Changeset - a2eaa0054430
[Not reviewed]
beta
0 2 0
Marcin Kuzminski - 13 years ago 2012-08-14 01:02:53
marcin@python-works.com
fixed error when disabled anonymous access lead to error on server
2 files changed with 38 insertions and 0 deletions:
0 comments (0 inline, 0 general)
rhodecode/lib/auth.py
Show inline comments
 
@@ -231,192 +231,193 @@ def authenticate(username, password):
 
                _password = PasswordGenerator().gen_password(length=8)
 
                # create this user on the fly if it doesn't exist in rhodecode
 
                # database
 
                if user_model.create_ldap(username, _password, user_dn,
 
                                          user_attrs):
 
                    log.info('created new ldap user %s' % username)
 

	
 
                Session().commit()
 
                return True
 
            except (LdapUsernameError, LdapPasswordError,):
 
                pass
 
            except (Exception,):
 
                log.error(traceback.format_exc())
 
                pass
 
    return False
 

	
 

	
 
def login_container_auth(username):
 
    user = User.get_by_username(username)
 
    if user is None:
 
        user_attrs = {
 
            'name': username,
 
            'lastname': None,
 
            'email': None,
 
        }
 
        user = UserModel().create_for_container_auth(username, user_attrs)
 
        if not user:
 
            return None
 
        log.info('User %s was created by container authentication' % username)
 

	
 
    if not user.active:
 
        return None
 

	
 
    user.update_lastlogin()
 
    Session().commit()
 

	
 
    log.debug('User %s is now logged in by container authentication',
 
              user.username)
 
    return user
 

	
 

	
 
def get_container_username(environ, config):
 
    username = None
 

	
 
    if str2bool(config.get('container_auth_enabled', False)):
 
        from paste.httpheaders import REMOTE_USER
 
        username = REMOTE_USER(environ)
 

	
 
    if not username and str2bool(config.get('proxypass_auth_enabled', False)):
 
        username = environ.get('HTTP_X_FORWARDED_USER')
 

	
 
    if username:
 
        # Removing realm and domain from username
 
        username = username.partition('@')[0]
 
        username = username.rpartition('\\')[2]
 
        log.debug('Received username %s from container' % username)
 

	
 
    return username
 

	
 

	
 
class CookieStoreWrapper(object):
 

	
 
    def __init__(self, cookie_store):
 
        self.cookie_store = cookie_store
 

	
 
    def __repr__(self):
 
        return 'CookieStore<%s>' % (self.cookie_store)
 

	
 
    def get(self, key, other=None):
 
        if isinstance(self.cookie_store, dict):
 
            return self.cookie_store.get(key, other)
 
        elif isinstance(self.cookie_store, AuthUser):
 
            return self.cookie_store.__dict__.get(key, other)
 

	
 

	
 
class  AuthUser(object):
 
    """
 
    A simple object that handles all attributes of user in RhodeCode
 

	
 
    It does lookup based on API key,given user, or user present in session
 
    Then it fills all required information for such user. It also checks if
 
    anonymous access is enabled and if so, it returns default user as logged
 
    in
 
    """
 

	
 
    def __init__(self, user_id=None, api_key=None, username=None):
 

	
 
        self.user_id = user_id
 
        self.api_key = None
 
        self.username = username
 

	
 
        self.name = ''
 
        self.lastname = ''
 
        self.email = ''
 
        self.is_authenticated = False
 
        self.admin = False
 
        self.inherit_default_permissions = False
 
        self.permissions = {}
 
        self._api_key = api_key
 
        self.propagate_data()
 
        self._instance = None
 

	
 
    def propagate_data(self):
 
        user_model = UserModel()
 
        self.anonymous_user = User.get_by_username('default', cache=True)
 
        is_user_loaded = False
 

	
 
        # try go get user by api key
 
        if self._api_key and self._api_key != self.anonymous_user.api_key:
 
            log.debug('Auth User lookup by API KEY %s' % self._api_key)
 
            is_user_loaded = user_model.fill_data(self, api_key=self._api_key)
 
        # lookup by userid
 
        elif (self.user_id is not None and
 
              self.user_id != self.anonymous_user.user_id):
 
            log.debug('Auth User lookup by USER ID %s' % self.user_id)
 
            is_user_loaded = user_model.fill_data(self, user_id=self.user_id)
 
        # lookup by username
 
        elif self.username and \
 
            str2bool(config.get('container_auth_enabled', False)):
 

	
 
            log.debug('Auth User lookup by USER NAME %s' % self.username)
 
            dbuser = login_container_auth(self.username)
 
            if dbuser is not None:
 
                log.debug('filling all attributes to object')
 
                for k, v in dbuser.get_dict().items():
 
                    setattr(self, k, v)
 
                self.set_authenticated()
 
                is_user_loaded = True
 
        else:
 
            log.debug('No data in %s that could been used to log in' % self)
 

	
 
        if not is_user_loaded:
 
            # if we cannot authenticate user try anonymous
 
            if self.anonymous_user.active is True:
 
                user_model.fill_data(self, user_id=self.anonymous_user.user_id)
 
                # then we set this user is logged in
 
                self.is_authenticated = True
 
            else:
 
                self.user_id = None
 
                self.username = None
 
                self.is_authenticated = False
 

	
 
        if not self.username:
 
            self.username = 'None'
 

	
 
        log.debug('Auth User is now %s' % self)
 
        user_model.fill_perms(self)
 

	
 
    @property
 
    def is_admin(self):
 
        return self.admin
 

	
 
    def __repr__(self):
 
        return "<AuthUser('id:%s:%s|%s')>" % (self.user_id, self.username,
 
                                              self.is_authenticated)
 

	
 
    def set_authenticated(self, authenticated=True):
 
        if self.user_id != self.anonymous_user.user_id:
 
            self.is_authenticated = authenticated
 

	
 
    def get_cookie_store(self):
 
        return {'username': self.username,
 
                'user_id': self.user_id,
 
                'is_authenticated': self.is_authenticated}
 

	
 
    @classmethod
 
    def from_cookie_store(cls, cookie_store):
 
        """
 
        Creates AuthUser from a cookie store
 

	
 
        :param cls:
 
        :param cookie_store:
 
        """
 
        user_id = cookie_store.get('user_id')
 
        username = cookie_store.get('username')
 
        api_key = cookie_store.get('api_key')
 
        return AuthUser(user_id, api_key, username)
 

	
 

	
 
def set_available_permissions(config):
 
    """
 
    This function will propagate pylons globals with all available defined
 
    permission given in db. We don't want to check each time from db for new
 
    permissions since adding a new permission also requires application restart
 
    ie. to decorate new views with the newly created permission
 

	
 
    :param config: current pylons config instance
 

	
 
    """
 
    log.info('getting information about all available permissions')
 
    try:
 
        sa = meta.Session
 
        all_perms = sa.query(Permission).all()
rhodecode/tests/functional/test_home.py
Show inline comments
 
import time
 
from rhodecode.tests import *
 
from rhodecode.model.meta import Session
 
from rhodecode.model.db import User
 

	
 

	
 
class TestHomeController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='home', action='index'))
 
        #if global permission is set
 
        response.mustcontain('ADD REPOSITORY')
 
        response.mustcontain('href="/%s/summary"' % HG_REPO)
 

	
 
        response.mustcontain("""<img class="icon" title="Mercurial repository" """
 
                        """alt="Mercurial repository" src="/images/icons/hg"""
 
                        """icon.png"/>""")
 
        response.mustcontain("""<img class="icon" title="public repository" """
 
                        """alt="public repository" src="/images/icons/lock_"""
 
                        """open.png"/>""")
 

	
 
        response.mustcontain(
 
"""<a title="Marcin Kuzminski &amp;lt;marcin@python-works.com&amp;gt;:\n
 
merge" class="tooltip" href="/vcs_test_hg/changeset/27cd5cce30c96924232"""
 
"""dffcd24178a07ffeb5dfc">r173:27cd5cce30c9</a>"""
 
)
 

	
 
    def test_repo_summary_with_anonymous_access_disabled(self):
 
        anon = User.get_by_username('default')
 
        anon.active = False
 
        Session().add(anon)
 
        Session().commit()
 
        time.sleep(1.5)  # must sleep for cache (1s to expire)
 
        try:
 
            response = self.app.get(url(controller='summary',
 
                                        action='index', repo_name=HG_REPO),
 
                                        status=302)
 
            assert 'login' in response.location
 

	
 
        finally:
 
            anon = User.get_by_username('default')
 
            anon.active = True
 
            Session().add(anon)
 
            Session().commit()
 

	
 
    def test_index_with_anonymous_access_disabled(self):
 
        anon = User.get_by_username('default')
 
        anon.active = False
 
        Session().add(anon)
 
        Session().commit()
 
        time.sleep(1.5)  # must sleep for cache (1s to expire)
 
        try:
 
            response = self.app.get(url(controller='home', action='index'),
 
                                    status=302)
 
            assert 'login' in response.location
 
        finally:
 
            anon = User.get_by_username('default')
 
            anon.active = True
 
            Session().add(anon)
 
            Session().commit()
0 comments (0 inline, 0 general)