Changeset - 2864cff1f12a
[Not reviewed]
default
0 1 0
Mads Kiilerich - 5 years ago 2020-08-23 14:50:56
mads@kiilerich.com
auth: compute AuthUser.user_group_permissions lazily
1 file changed with 47 insertions and 68 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/auth.py
Show inline comments
 
@@ -109,112 +109,48 @@ def check_password(password, hashed):
 
        return hashlib.sha256(password).hexdigest() == hashed
 
    try:
 
        return bcrypt.checkpw(safe_bytes(password), ascii_bytes(hashed))
 
    except ValueError as e:
 
        # bcrypt will throw ValueError 'Invalid hashed_password salt' on all password errors
 
        log.error('error from bcrypt checking password: %s', e)
 
        return False
 
    log.error('check_password failed - no method found for hash length %s', len(hashed))
 
    return False
 

	
 

	
 
PERM_WEIGHTS = Permission.PERM_WEIGHTS
 

	
 
def bump_permission(permissions, key, new_perm):
 
    """Add a new permission for key to permissions.
 
    Assuming the permissions are comparable, set the new permission if it
 
    has higher weight, else drop it and keep the old permission.
 
    """
 
    cur_perm = permissions[key]
 
    new_perm_val = PERM_WEIGHTS[new_perm]
 
    cur_perm_val = PERM_WEIGHTS[cur_perm]
 
    if new_perm_val > cur_perm_val:
 
        permissions[key] = new_perm
 

	
 
def get_user_permissions(user_id, user_is_admin):
 
    user_group_permissions = {}
 

	
 
    #======================================================================
 
    # fetch default permissions
 
    #======================================================================
 
    default_user_group_perms = Permission.get_default_user_group_perms(kallithea.DEFAULT_USER_ID)
 

	
 
    if user_is_admin:
 
        #==================================================================
 
        # admin users have all rights;
 
        # based on default permissions, just set everything to admin
 
        #==================================================================
 

	
 
        # user groups
 
        for perm in default_user_group_perms:
 
            u_k = perm.user_group.users_group_name
 
            p = 'usergroup.admin'
 
            user_group_permissions[u_k] = p
 
        return (user_group_permissions)
 

	
 
    #==================================================================
 
    # SET DEFAULTS GLOBAL, REPOS, REPOSITORY GROUPS
 
    #==================================================================
 

	
 
    # defaults for user groups taken from default user permission
 
    # on given user group
 
    for perm in default_user_group_perms:
 
        u_k = perm.user_group.users_group_name
 
        p = perm.permission.permission_name
 
        user_group_permissions[u_k] = p
 

	
 
    #======================================================================
 
    # !! PERMISSIONS FOR USER GROUPS !!
 
    #======================================================================
 
    # user group for user group permissions
 
    user_group_user_groups_perms = \
 
     Session().query(UserGroupUserGroupToPerm) \
 
     .join((UserGroup, UserGroupUserGroupToPerm.target_user_group_id
 
            == UserGroup.users_group_id)) \
 
     .join((UserGroupMember, UserGroupUserGroupToPerm.user_group_id
 
            == UserGroupMember.users_group_id)) \
 
     .filter(UserGroupMember.user_id == user_id) \
 
     .join((UserGroup, UserGroupMember.users_group_id ==
 
            UserGroup.users_group_id), aliased=True, from_joinpoint=True) \
 
     .filter(UserGroup.users_group_active == True) \
 
     .options(joinedload(UserGroupUserGroupToPerm.permission)) \
 
     .all()
 

	
 
    for perm in user_group_user_groups_perms:
 
        bump_permission(user_group_permissions,
 
            perm.target_user_group.users_group_name,
 
            perm.permission.permission_name)
 

	
 
    # user explicit permission for user groups
 
    user_user_groups_perms = Permission.get_default_user_group_perms(user_id)
 
    for perm in user_user_groups_perms:
 
        bump_permission(user_group_permissions,
 
            perm.user_group.users_group_name,
 
            perm.permission.permission_name)
 

	
 
    return (user_group_permissions)
 

	
 

	
 
class AuthUser(object):
 
    """
 
    Represents a Kallithea user, including various authentication and
 
    authorization information. Typically used to store the current user,
 
    but is also used as a generic user information data structure in
 
    parts of the code, e.g. user management.
 

	
 
    Constructed from a database `User` object, a user ID or cookie dict,
 
    it looks up the user (if needed) and copies all attributes to itself,
 
    adding various non-persistent data. If lookup fails but anonymous
 
    access to Kallithea is enabled, the default user is loaded instead.
 

	
 
    `AuthUser` does not by itself authenticate users. It's up to other parts of
 
    the code to check e.g. if a supplied password is correct, and if so, trust
 
    the AuthUser object as an authenticated user.
 

	
 
    However, `AuthUser` does refuse to load a user that is not `active`.
 

	
 
    Note that Kallithea distinguishes between the default user (an actual
 
    user in the database with username "default") and "no user" (no actual
 
    User object, AuthUser filled with blank values and username "None").
 

	
 
    If the default user is active, that will always be used instead of
 
    "no user". On the other hand, if the default user is disabled (and
 
@@ -258,52 +194,48 @@ class AuthUser(object):
 

	
 
        # Look up database user, if necessary.
 
        if user_id is not None:
 
            assert dbuser is None
 
            log.debug('Auth User lookup by USER ID %s', user_id)
 
            dbuser = UserModel().get(user_id)
 
            assert dbuser is not None
 
        else:
 
            assert dbuser is not None
 
            log.debug('Auth User lookup by database user %s', dbuser)
 

	
 
        log.debug('filling %s data', dbuser)
 
        self.is_anonymous = dbuser.is_default_user
 
        if dbuser.is_default_user and not dbuser.active:
 
            self.username = 'None'
 
            self.is_default_user = False
 
        else:
 
            # copy non-confidential database fields from a `db.User` to this `AuthUser`.
 
            for k, v in dbuser.get_dict().items():
 
                assert k not in ['api_keys', 'permissions']
 
                setattr(self, k, v)
 
            self.is_default_user = dbuser.is_default_user
 
        log.debug('Auth User is now %s', self)
 

	
 
        log.debug('Getting PERMISSION tree for %s', self)
 
        (self.user_group_permissions,
 
        )= get_user_permissions(self.user_id, self.is_admin)
 

	
 
    @LazyProperty
 
    def global_permissions(self):
 
        log.debug('Getting global permissions for %s', self)
 

	
 
        if self.is_admin:
 
            return set(['hg.admin'])
 

	
 
        global_permissions = set()
 

	
 
        # default global permissions from the default user
 
        default_global_perms = UserToPerm.query() \
 
            .filter(UserToPerm.user_id == kallithea.DEFAULT_USER_ID) \
 
            .options(joinedload(UserToPerm.permission))
 
        for perm in default_global_perms:
 
            global_permissions.add(perm.permission.permission_name)
 

	
 
        # user group global permissions
 
        user_perms_from_users_groups = Session().query(UserGroupToPerm) \
 
            .options(joinedload(UserGroupToPerm.permission)) \
 
            .join((UserGroupMember, UserGroupToPerm.users_group_id ==
 
                   UserGroupMember.users_group_id)) \
 
            .filter(UserGroupMember.user_id == self.user_id) \
 
            .join((UserGroup, UserGroupMember.users_group_id ==
 
                   UserGroup.users_group_id)) \
 
@@ -408,48 +340,95 @@ class AuthUser(object):
 
                Session().query(UserGroupRepoGroupToPerm) \
 
                .join((UserGroup, UserGroupRepoGroupToPerm.users_group_id ==
 
                       UserGroup.users_group_id)) \
 
                .filter(UserGroup.users_group_active == True) \
 
                .join((UserGroupMember, UserGroupRepoGroupToPerm.users_group_id
 
                       == UserGroupMember.users_group_id)) \
 
                .filter(UserGroupMember.user_id == self.user_id) \
 
                .options(joinedload(UserGroupRepoGroupToPerm.permission)) \
 
                .all()
 
            for perm in user_repo_group_perms_from_users_groups:
 
                bump_permission(repository_group_permissions,
 
                    perm.group.group_name,
 
                    perm.permission.permission_name)
 

	
 
            # user explicit permissions for repository groups
 
            user_repo_groups_perms = Permission.get_default_group_perms(self.user_id)
 
            for perm in user_repo_groups_perms:
 
                bump_permission(repository_group_permissions,
 
                    perm.group.group_name,
 
                    perm.permission.permission_name)
 

	
 
        return repository_group_permissions
 

	
 
    @LazyProperty
 
    def user_group_permissions(self):
 
        log.debug('Getting user group permissions for %s', self)
 
        user_group_permissions = {}
 
        default_user_group_perms = Permission.get_default_user_group_perms(kallithea.DEFAULT_USER_ID)
 

	
 
        if self.is_admin:
 
            for perm in default_user_group_perms:
 
                u_k = perm.user_group.users_group_name
 
                p = 'usergroup.admin'
 
                user_group_permissions[u_k] = p
 

	
 
        else:
 
            # defaults for user groups taken from default user permission
 
            # on given user group
 
            for perm in default_user_group_perms:
 
                u_k = perm.user_group.users_group_name
 
                p = perm.permission.permission_name
 
                user_group_permissions[u_k] = p
 

	
 
            # user group for user group permissions
 
            user_group_user_groups_perms = \
 
                Session().query(UserGroupUserGroupToPerm) \
 
                .join((UserGroup, UserGroupUserGroupToPerm.target_user_group_id
 
                       == UserGroup.users_group_id)) \
 
                .join((UserGroupMember, UserGroupUserGroupToPerm.user_group_id
 
                       == UserGroupMember.users_group_id)) \
 
                .filter(UserGroupMember.user_id == self.user_id) \
 
                .join((UserGroup, UserGroupMember.users_group_id ==
 
                       UserGroup.users_group_id), aliased=True, from_joinpoint=True) \
 
                .filter(UserGroup.users_group_active == True) \
 
                .options(joinedload(UserGroupUserGroupToPerm.permission)) \
 
                .all()
 
            for perm in user_group_user_groups_perms:
 
                bump_permission(user_group_permissions,
 
                    perm.target_user_group.users_group_name,
 
                    perm.permission.permission_name)
 

	
 
            # user explicit permission for user groups
 
            user_user_groups_perms = Permission.get_default_user_group_perms(self.user_id)
 
            for perm in user_user_groups_perms:
 
                bump_permission(user_group_permissions,
 
                    perm.user_group.users_group_name,
 
                    perm.permission.permission_name)
 

	
 
        return user_group_permissions
 

	
 
    @LazyProperty
 
    def permissions(self):
 
        """dict with all 4 kind of permissions - mainly for backwards compatibility"""
 
        return {
 
            'global': self.global_permissions,
 
            'repositories': self.repository_permissions,
 
            'repositories_groups': self.repository_group_permissions,
 
            'user_groups': self.user_group_permissions,
 
        }
 

	
 
    def has_repository_permission_level(self, repo_name, level, purpose=None):
 
        required_perms = {
 
            'read': ['repository.read', 'repository.write', 'repository.admin'],
 
            'write': ['repository.write', 'repository.admin'],
 
            'admin': ['repository.admin'],
 
        }[level]
 
        actual_perm = self.repository_permissions.get(repo_name)
 
        ok = actual_perm in required_perms
 
        log.debug('Checking if user %r can %r repo %r (%s): %s (has %r)',
 
            self.username, level, repo_name, purpose, ok, actual_perm)
 
        return ok
 

	
 
    def has_repository_group_permission_level(self, repo_group_name, level, purpose=None):
 
        required_perms = {
 
            'read': ['group.read', 'group.write', 'group.admin'],
0 comments (0 inline, 0 general)