Changeset - b580691553f5
[Not reviewed]
default
0 1 0
Søren Løvborg - 10 years ago 2015-07-26 14:10:44
kwi@kwi.dk
auth: turn dead AuthUser code into assertion

The result of db.User.get_dict never contains the keys 'api_keys' or
'permissions'. The keys returned by get_dict are 1) all the User table
columns, 2) the keys explicitly defined in User.__json__, and 3) the
keys defined in User.get_api_data, none of which include the two
blacklisted keys.

'api_keys' would be returned if __json__ called get_api_data with
argument details=True; but currently that is not the case.

In case there's a reason why these two keys must never appear in an
AuthUser object, the check has not been removed entirely; instead, it's
been turned into an assertion. This way, it will be noticed if __json__
is later modified to request detailed API data, for instance.
1 file changed with 2 insertions and 2 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/auth.py
Show inline comments
 
@@ -436,194 +436,194 @@ def allowed_api_access(controller_name, 
 
    Check if given controller_name is in whitelist API access
 
    """
 
    if not whitelist:
 
        from kallithea import CONFIG
 
        whitelist = aslist(CONFIG.get('api_access_controllers_whitelist'),
 
                           sep=',')
 
        log.debug('whitelist of API access is: %s' % (whitelist))
 
    api_access_valid = controller_name in whitelist
 
    if api_access_valid:
 
        log.debug('controller:%s is in API whitelist' % (controller_name))
 
    else:
 
        msg = 'controller: %s is *NOT* in API whitelist' % (controller_name)
 
        if api_key:
 
            #if we use API key and don't have access it's a warning
 
            log.warning(msg)
 
        else:
 
            log.debug(msg)
 
    return api_access_valid
 

	
 

	
 
class AuthUser(object):
 
    """
 
    Represents a Kallithea user, including various authentication and
 
    authorization information. Typically used to store the current user,
 
    but is also used as a generic user information data structure in
 
    parts of the code, e.g. user management.
 

	
 
    Constructed from a database `User` object, a user ID or cookie dict,
 
    it looks up the user (if needed) and copies all attributes to itself,
 
    adding various non-persistent data. If lookup fails but anonymous
 
    access to Kallithea is enabled, the default user is loaded instead.
 

	
 
    `AuthUser` does not by itself authenticate users and the constructor
 
    sets the `is_authenticated` field to False, except when falling back
 
    to the default anonymous user (if enabled). It's up to other parts
 
    of the code to check e.g. if a supplied password is correct, and if
 
    so, set `is_authenticated` to True.
 

	
 
    However, `AuthUser` does refuse to load a user that is not `active`.
 
    """
 

	
 
    def __init__(self, user_id=None, dbuser=None,
 
            is_external_auth=False):
 

	
 
        self.is_authenticated = False
 
        self.is_external_auth = is_external_auth
 

	
 
        user_model = UserModel()
 
        self.anonymous_user = User.get_default_user(cache=True)
 

	
 
        # These attributes will be overriden by fill_data, below, unless the
 
        # requested user cannot be found and the default anonymous user is
 
        # not enabled.
 
        self.user_id = None
 
        self.username = None
 
        self.api_key = None
 
        self.name = ''
 
        self.lastname = ''
 
        self.email = ''
 
        self.admin = False
 
        self.inherit_default_permissions = False
 

	
 
        # Look up database user, if necessary.
 
        if user_id is not None:
 
            log.debug('Auth User lookup by USER ID %s' % user_id)
 
            dbuser = user_model.get(user_id)
 
        else:
 
            # Note: dbuser is allowed to be None.
 
            log.debug('Auth User lookup by database user %s', dbuser)
 

	
 
        is_user_loaded = self._fill_data(dbuser)
 

	
 
        # If user cannot be found, try falling back to anonymous.
 
        if not is_user_loaded:
 
            is_user_loaded =  self._fill_data(self.anonymous_user)
 

	
 
        # The anonymous user is always "logged in".
 
        if self.user_id == self.anonymous_user.user_id:
 
            self.is_authenticated = True
 

	
 
        if not self.username:
 
            self.username = 'None'
 

	
 
        log.debug('Auth User is now %s' % self)
 

	
 
    def _fill_data(self, dbuser):
 
        """
 
        Copies database fields from a `db.User` to this `AuthUser`. Does
 
        not copy `api_keys` and `permissions` attributes.
 

	
 
        Checks that `dbuser` is `active` (and not None) before copying;
 
        returns True on success.
 
        """
 
        if dbuser is not None and dbuser.active:
 
            log.debug('filling %s data', dbuser)
 
            for k, v in dbuser.get_dict().iteritems():
 
                if k not in ['api_keys', 'permissions']:
 
                    setattr(self, k, v)
 
                assert k not in ['api_keys', 'permissions']
 
                setattr(self, k, v)
 
            return True
 
        return False
 

	
 
    @LazyProperty
 
    def permissions(self):
 
        return self.__get_perms(user=self, cache=False)
 

	
 
    @property
 
    def api_keys(self):
 
        return self._get_api_keys()
 

	
 
    def __get_perms(self, user, explicit=True, algo='higherwin', cache=False):
 
        """
 
        Fills user permission attribute with permissions taken from database
 
        works for permissions given for repositories, and for permissions that
 
        are granted to groups
 

	
 
        :param user: `AuthUser` instance
 
        :param explicit: In case there are permissions both for user and a group
 
            that user is part of, explicit flag will define if user will
 
            explicitly override permissions from group, if it's False it will
 
            make decision based on the algo
 
        :param algo: algorithm to decide what permission should be choose if
 
            it's multiple defined, eg user in two different groups. It also
 
            decides if explicit flag is turned off how to specify the permission
 
            for case when user is in a group + have defined separate permission
 
        """
 
        user_id = user.user_id
 
        user_is_admin = user.is_admin
 
        user_inherit_default_permissions = user.inherit_default_permissions
 

	
 
        log.debug('Getting PERMISSION tree')
 
        compute = conditional_cache('short_term', 'cache_desc',
 
                                    condition=cache, func=_cached_perms_data)
 
        return compute(user_id, user_is_admin,
 
                       user_inherit_default_permissions, explicit, algo)
 

	
 
    def _get_api_keys(self):
 
        api_keys = [self.api_key]
 
        for api_key in UserApiKeys.query()\
 
                .filter(UserApiKeys.user_id == self.user_id)\
 
                .filter(or_(UserApiKeys.expires == -1,
 
                            UserApiKeys.expires >= time.time())).all():
 
            api_keys.append(api_key.api_key)
 

	
 
        return api_keys
 

	
 
    @property
 
    def is_admin(self):
 
        return self.admin
 

	
 
    @property
 
    def repositories_admin(self):
 
        """
 
        Returns list of repositories you're an admin of
 
        """
 
        return [x[0] for x in self.permissions['repositories'].iteritems()
 
                if x[1] == 'repository.admin']
 

	
 
    @property
 
    def repository_groups_admin(self):
 
        """
 
        Returns list of repository groups you're an admin of
 
        """
 
        return [x[0] for x in self.permissions['repositories_groups'].iteritems()
 
                if x[1] == 'group.admin']
 

	
 
    @property
 
    def user_groups_admin(self):
 
        """
 
        Returns list of user groups you're an admin of
 
        """
 
        return [x[0] for x in self.permissions['user_groups'].iteritems()
 
                if x[1] == 'usergroup.admin']
 

	
 
    @staticmethod
 
    def check_ip_allowed(user, ip_addr):
 
        """
 
        Check if the given IP address (a `str`) is allowed for the given
 
        user (an `AuthUser` or `db.User`).
 
        """
 
        allowed_ips = AuthUser.get_allowed_ips(user.user_id, cache=True,
 
            inherit_from_default=user.inherit_default_permissions)
 
        if check_ip_access(source_ip=ip_addr, allowed_ips=allowed_ips):
 
            log.debug('IP:%s is in range of %s' % (ip_addr, allowed_ips))
 
            return True
 
        else:
 
            log.info('Access for IP:%s forbidden, '
 
                     'not in %s' % (ip_addr, allowed_ips))
 
            return False
 

	
 
    def __repr__(self):
 
        return "<AuthUser('id:%s[%s] auth:%s')>"\
 
            % (self.user_id, self.username, self.is_authenticated)
 

	
 
    def set_authenticated(self, authenticated=True):
0 comments (0 inline, 0 general)