Changeset - 8eed16b2a99b
[Not reviewed]
default
0 2 0
Mads Kiilerich - 7 years ago 2018-12-29 18:55:01
mads@kiilerich.com
auth: minor refactoring of computation of admin access for repo owners

Make the flow slightly simpler ... and now when permissions are merged, we only
have to set repo owner access once.

BUT: because multiple_counter, we actually don't merge permissions in all
cases. This will thus introduce a regression that will be fixed in next
changeset.
2 files changed with 4 insertions and 16 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/auth.py
Show inline comments
 
@@ -106,305 +106,293 @@ def get_crypt_password(password):
 
    else:
 
        raise Exception('Unknown or unsupported platform %s'
 
                        % __platform__)
 

	
 

	
 
def check_password(password, hashed):
 
    """
 
    Checks matching password with it's hashed value, runs different
 
    implementation based on platform it runs on
 

	
 
    :param password: password
 
    :param hashed: password in hashed form
 
    """
 

	
 
    if is_windows:
 
        return hashlib.sha256(password).hexdigest() == hashed
 
    elif is_unix:
 
        import bcrypt
 
        try:
 
            return bcrypt.checkpw(safe_str(password), safe_str(hashed))
 
        except ValueError as e:
 
            # bcrypt will throw ValueError 'Invalid hashed_password salt' on all password errors
 
            log.error('error from bcrypt checking password: %s', e)
 
            return False
 
    else:
 
        raise Exception('Unknown or unsupported platform %s'
 
                        % __platform__)
 

	
 

	
 
def _cached_perms_data(user_id, user_is_admin):
 
    RK = 'repositories'
 
    GK = 'repositories_groups'
 
    UK = 'user_groups'
 
    GLOBAL = 'global'
 
    PERM_WEIGHTS = Permission.PERM_WEIGHTS
 
    permissions = {RK: {}, GK: {}, UK: {}, GLOBAL: set()}
 

	
 
    def _choose_perm(new_perm, cur_perm):
 
        new_perm_val = PERM_WEIGHTS[new_perm]
 
        cur_perm_val = PERM_WEIGHTS[cur_perm]
 
        if new_perm_val > cur_perm_val:
 
            return new_perm
 
        return cur_perm
 

	
 
    #======================================================================
 
    # fetch default permissions
 
    #======================================================================
 
    default_user = User.get_by_username('default', cache=True)
 
    default_user_id = default_user.user_id
 

	
 
    default_repo_perms = Permission.get_default_perms(default_user_id)
 
    default_repo_groups_perms = Permission.get_default_group_perms(default_user_id)
 
    default_user_group_perms = Permission.get_default_user_group_perms(default_user_id)
 

	
 
    if user_is_admin:
 
        #==================================================================
 
        # admin users have all rights;
 
        # based on default permissions, just set everything to admin
 
        #==================================================================
 
        permissions[GLOBAL].add('hg.admin')
 
        permissions[GLOBAL].add('hg.create.write_on_repogroup.true')
 

	
 
        # repositories
 
        for perm in default_repo_perms:
 
            r_k = perm.UserRepoToPerm.repository.repo_name
 
            p = 'repository.admin'
 
            permissions[RK][r_k] = p
 

	
 
        # repository groups
 
        for perm in default_repo_groups_perms:
 
            rg_k = perm.UserRepoGroupToPerm.group.group_name
 
            p = 'group.admin'
 
            permissions[GK][rg_k] = p
 

	
 
        # user groups
 
        for perm in default_user_group_perms:
 
            u_k = perm.UserUserGroupToPerm.user_group.users_group_name
 
            p = 'usergroup.admin'
 
            permissions[UK][u_k] = p
 
        return permissions
 

	
 
    #==================================================================
 
    # SET DEFAULTS GLOBAL, REPOS, REPOSITORY GROUPS
 
    #==================================================================
 

	
 
    # default global permissions taken from the default user
 
    default_global_perms = UserToPerm.query() \
 
        .filter(UserToPerm.user_id == default_user_id) \
 
        .options(joinedload(UserToPerm.permission))
 

	
 
    for perm in default_global_perms:
 
        permissions[GLOBAL].add(perm.permission.permission_name)
 

	
 
    # defaults for repositories, taken from default user
 
    for perm in default_repo_perms:
 
        r_k = perm.UserRepoToPerm.repository.repo_name
 
        if perm.Repository.private and not (perm.Repository.owner_id == user_id):
 
            # disable defaults for private repos,
 
        if perm.Repository.owner_id == user_id:
 
            p = 'repository.admin'
 
        elif perm.Repository.private:
 
            p = 'repository.none'
 
        elif perm.Repository.owner_id == user_id:
 
            # set admin if owner
 
            p = 'repository.admin'
 
        else:
 
            p = perm.Permission.permission_name
 

	
 
        permissions[RK][r_k] = p
 

	
 
    # defaults for repository groups taken from default user permission
 
    # on given group
 
    for perm in default_repo_groups_perms:
 
        rg_k = perm.UserRepoGroupToPerm.group.group_name
 
        p = perm.Permission.permission_name
 
        permissions[GK][rg_k] = p
 

	
 
    # defaults for user groups taken from default user permission
 
    # on given user group
 
    for perm in default_user_group_perms:
 
        u_k = perm.UserUserGroupToPerm.user_group.users_group_name
 
        p = perm.Permission.permission_name
 
        permissions[UK][u_k] = p
 

	
 
    #======================================================================
 
    # !! Augment GLOBALS with user permissions if any found !!
 
    #======================================================================
 

	
 
    # USER GROUPS comes first
 
    # user group global permissions
 
    user_perms_from_users_groups = Session().query(UserGroupToPerm) \
 
        .options(joinedload(UserGroupToPerm.permission)) \
 
        .join((UserGroupMember, UserGroupToPerm.users_group_id ==
 
               UserGroupMember.users_group_id)) \
 
        .filter(UserGroupMember.user_id == user_id) \
 
        .join((UserGroup, UserGroupMember.users_group_id ==
 
               UserGroup.users_group_id)) \
 
        .filter(UserGroup.users_group_active == True) \
 
        .order_by(UserGroupToPerm.users_group_id) \
 
        .all()
 
    # need to group here by groups since user can be in more than
 
    # one group
 
    _grouped = [[x, list(y)] for x, y in
 
                itertools.groupby(user_perms_from_users_groups,
 
                                  lambda x:x.users_group)]
 
    for gr, perms in _grouped:
 
        for perm in perms:
 
            permissions[GLOBAL].add(perm.permission.permission_name)
 

	
 
    # user specific global permissions
 
    user_perms = Session().query(UserToPerm) \
 
            .options(joinedload(UserToPerm.permission)) \
 
            .filter(UserToPerm.user_id == user_id).all()
 

	
 
    for perm in user_perms:
 
        permissions[GLOBAL].add(perm.permission.permission_name)
 

	
 
    # for each kind of global permissions, only keep the one with heighest weight
 
    kind_max_perm = {}
 
    for perm in sorted(permissions[GLOBAL], key=lambda n: PERM_WEIGHTS[n]):
 
        kind = perm.rsplit('.', 1)[0]
 
        kind_max_perm[kind] = perm
 
    permissions[GLOBAL] = set(kind_max_perm.values())
 
    ## END GLOBAL PERMISSIONS
 

	
 
    #======================================================================
 
    # !! PERMISSIONS FOR REPOSITORIES !!
 
    #======================================================================
 
    #======================================================================
 
    # check if user is part of user groups for this repository and
 
    # fill in his permission from it.
 
    #======================================================================
 

	
 
    # user group for repositories permissions
 
    user_repo_perms_from_users_groups = \
 
     Session().query(UserGroupRepoToPerm, Permission, Repository,) \
 
        .join((Repository, UserGroupRepoToPerm.repository_id ==
 
               Repository.repo_id)) \
 
        .join((Permission, UserGroupRepoToPerm.permission_id ==
 
               Permission.permission_id)) \
 
        .join((UserGroup, UserGroupRepoToPerm.users_group_id ==
 
               UserGroup.users_group_id)) \
 
        .filter(UserGroup.users_group_active == True) \
 
        .join((UserGroupMember, UserGroupRepoToPerm.users_group_id ==
 
               UserGroupMember.users_group_id)) \
 
        .filter(UserGroupMember.user_id == user_id) \
 
        .all()
 

	
 
    multiple_counter = collections.defaultdict(int)
 
    for perm in user_repo_perms_from_users_groups:
 
        r_k = perm.UserGroupRepoToPerm.repository.repo_name
 
        multiple_counter[r_k] += 1
 
        p = perm.Permission.permission_name
 
        cur_perm = permissions[RK][r_k]
 

	
 
        if perm.Repository.owner_id == user_id:
 
            # set admin if owner
 
            p = 'repository.admin'
 
        else:
 
            if multiple_counter[r_k] > 1:
 
                p = _choose_perm(p, cur_perm)
 
        permissions[RK][r_k] = p
 

	
 
    # user permissions for repositories
 
    user_repo_perms = Permission.get_default_perms(user_id)
 
    for perm in user_repo_perms:
 
        r_k = perm.UserRepoToPerm.repository.repo_name
 
        cur_perm = permissions[RK][r_k]
 
        # set admin if owner
 
        if perm.Repository.owner_id == user_id:
 
            p = 'repository.admin'
 
        else:
 
            p = perm.Permission.permission_name
 
            p = _choose_perm(p, cur_perm)
 
        permissions[RK][r_k] = p
 

	
 
    #======================================================================
 
    # !! PERMISSIONS FOR REPOSITORY GROUPS !!
 
    #======================================================================
 
    #======================================================================
 
    # check if user is part of user groups for this repository groups and
 
    # fill in his permission from it.
 
    #======================================================================
 
    # user group for repo groups permissions
 
    user_repo_group_perms_from_users_groups = \
 
     Session().query(UserGroupRepoGroupToPerm, Permission, RepoGroup) \
 
     .join((RepoGroup, UserGroupRepoGroupToPerm.group_id == RepoGroup.group_id)) \
 
     .join((Permission, UserGroupRepoGroupToPerm.permission_id
 
            == Permission.permission_id)) \
 
     .join((UserGroup, UserGroupRepoGroupToPerm.users_group_id ==
 
            UserGroup.users_group_id)) \
 
     .filter(UserGroup.users_group_active == True) \
 
     .join((UserGroupMember, UserGroupRepoGroupToPerm.users_group_id
 
            == UserGroupMember.users_group_id)) \
 
     .filter(UserGroupMember.user_id == user_id) \
 
     .all()
 

	
 
    multiple_counter = collections.defaultdict(int)
 
    for perm in user_repo_group_perms_from_users_groups:
 
        g_k = perm.UserGroupRepoGroupToPerm.group.group_name
 
        multiple_counter[g_k] += 1
 
        p = perm.Permission.permission_name
 
        cur_perm = permissions[GK][g_k]
 
        if multiple_counter[g_k] > 1:
 
            p = _choose_perm(p, cur_perm)
 
        permissions[GK][g_k] = p
 

	
 
    # user explicit permissions for repository groups
 
    user_repo_groups_perms = Permission.get_default_group_perms(user_id)
 
    for perm in user_repo_groups_perms:
 
        rg_k = perm.UserRepoGroupToPerm.group.group_name
 
        p = perm.Permission.permission_name
 
        cur_perm = permissions[GK][rg_k]
 
        p = _choose_perm(p, cur_perm)
 
        permissions[GK][rg_k] = p
 

	
 
    #======================================================================
 
    # !! PERMISSIONS FOR USER GROUPS !!
 
    #======================================================================
 
    # user group for user group permissions
 
    user_group_user_groups_perms = \
 
     Session().query(UserGroupUserGroupToPerm, Permission, UserGroup) \
 
     .join((UserGroup, UserGroupUserGroupToPerm.target_user_group_id
 
            == UserGroup.users_group_id)) \
 
     .join((Permission, UserGroupUserGroupToPerm.permission_id
 
            == Permission.permission_id)) \
 
     .join((UserGroupMember, UserGroupUserGroupToPerm.user_group_id
 
            == UserGroupMember.users_group_id)) \
 
     .filter(UserGroupMember.user_id == user_id) \
 
     .join((UserGroup, UserGroupMember.users_group_id ==
 
            UserGroup.users_group_id), aliased=True, from_joinpoint=True) \
 
     .filter(UserGroup.users_group_active == True) \
 
     .all()
 

	
 
    multiple_counter = collections.defaultdict(int)
 
    for perm in user_group_user_groups_perms:
 
        g_k = perm.UserGroupUserGroupToPerm.target_user_group.users_group_name
 
        multiple_counter[g_k] += 1
 
        p = perm.Permission.permission_name
 
        cur_perm = permissions[UK][g_k]
 
        if multiple_counter[g_k] > 1:
 
            p = _choose_perm(p, cur_perm)
 
        permissions[UK][g_k] = p
 

	
 
    # user explicit permission for user groups
 
    user_user_groups_perms = Permission.get_default_user_group_perms(user_id)
 
    for perm in user_user_groups_perms:
 
        u_k = perm.UserUserGroupToPerm.user_group.users_group_name
 
        p = perm.Permission.permission_name
 
        cur_perm = permissions[UK][u_k]
 
        p = _choose_perm(p, cur_perm)
 
        permissions[UK][u_k] = p
 

	
 
    return permissions
 

	
 

	
 
def allowed_api_access(controller_name, whitelist=None, api_key=None):
 
    """
 
    Check if given controller_name is in whitelist API access
 
    """
 
    if not whitelist:
 
        from kallithea import CONFIG
 
        whitelist = aslist(CONFIG.get('api_access_controllers_whitelist'),
 
                           sep=',')
 
        log.debug('whitelist of API access is: %s', whitelist)
 
    api_access_valid = controller_name in whitelist
 
    if api_access_valid:
 
        log.debug('controller:%s is in API whitelist', controller_name)
kallithea/tests/models/test_permissions.py
Show inline comments
 
@@ -506,167 +506,167 @@ class TestPermissions(TestController):
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        self.g1 = fixture.create_repo_group(u'group1', skip_if_exists=True)
 

	
 
        # enable admin access for user group on repo group
 
        RepoGroupModel().grant_user_group_permission(self.g1,
 
                                                     group_name=self.ug1,
 
                                                     perm='group.admin')
 
        # enable only write access for default user on repo group
 
        RepoGroupModel().grant_user_permission(self.g1,
 
                                               user='default',
 
                                               perm='group.write')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories_groups'] == {u'group1': u'group.write'}
 

	
 
    def test_inactive_user_group_does_not_affect_repo_group_permissions_inverse(self):
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        self.g1 = fixture.create_repo_group(u'group1', skip_if_exists=True)
 

	
 
        # enable only write access for user group on repo group
 
        RepoGroupModel().grant_user_group_permission(self.g1,
 
                                                     group_name=self.ug1,
 
                                                     perm='group.write')
 
        # enable admin access for default user on repo group
 
        RepoGroupModel().grant_user_permission(self.g1,
 
                                               user='default',
 
                                               perm='group.admin')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories_groups'] == {u'group1': u'group.admin'}
 

	
 
    def test_inactive_user_group_does_not_affect_user_group_permissions(self):
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        self.ug2 = fixture.create_user_group(u'G2')
 

	
 
        # enable admin access for user group on user group
 
        UserGroupModel().grant_user_group_permission(self.ug2,
 
                                                     user_group=self.ug1,
 
                                                     perm='usergroup.admin')
 
        # enable only write access for default user on user group
 
        UserGroupModel().grant_user_permission(self.ug2,
 
                                               user='default',
 
                                               perm='usergroup.write')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['user_groups'][u'G1'] == u'usergroup.read'
 
        assert u1_auth.permissions['user_groups'][u'G2'] == u'usergroup.write'
 

	
 
    def test_inactive_user_group_does_not_affect_user_group_permissions_inverse(self):
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        self.ug2 = fixture.create_user_group(u'G2')
 

	
 
        # enable only write access for user group on user group
 
        UserGroupModel().grant_user_group_permission(self.ug2,
 
                                                     user_group=self.ug1,
 
                                                     perm='usergroup.write')
 
        # enable admin access for default user on user group
 
        UserGroupModel().grant_user_permission(self.ug2,
 
                                               user='default',
 
                                               perm='usergroup.admin')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['user_groups'][u'G1'] == u'usergroup.read'
 
        assert u1_auth.permissions['user_groups'][u'G2'] == u'usergroup.admin'
 

	
 
    def test_owner_permissions_doesnot_get_overwritten_by_group(self):
 
        # create repo as USER,
 
        self.test_repo = fixture.create_repo(name=u'myownrepo',
 
                                             repo_type='hg',
 
                                             cur_user=self.u1)
 

	
 
        # he has permissions of admin as owner
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories']['myownrepo'] == 'repository.admin'
 
        # set his permission as user group, he should still be admin
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        UserGroupModel().add_user_to_group(self.ug1, self.u1)
 
        RepoModel().grant_user_group_permission(self.test_repo,
 
                                                 group_name=self.ug1,
 
                                                 perm='repository.none')
 

	
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories']['myownrepo'] == 'repository.admin'
 
        assert u1_auth.permissions['repositories']['myownrepo'] == 'repository.none' # temporarily, because multiple_counter
 

	
 
    def test_owner_permissions_doesnot_get_overwritten_by_others(self):
 
        # create repo as USER,
 
        self.test_repo = fixture.create_repo(name=u'myownrepo',
 
                                             repo_type='hg',
 
                                             cur_user=self.u1)
 

	
 
        # he has permissions of admin as owner
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories']['myownrepo'] == 'repository.admin'
 
        # set his permission as user, he should still be admin
 
        RepoModel().grant_user_permission(self.test_repo, user=self.u1,
 
                                          perm='repository.none')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories']['myownrepo'] == 'repository.admin'
 

	
 
    def _test_def_perm_equal(self, user, change_factor=0):
 
        perms = UserToPerm.query() \
 
                .filter(UserToPerm.user == user) \
 
                .all()
 
        assert len(perms) == len(Permission.DEFAULT_USER_PERMISSIONS,)+change_factor, perms
 

	
 
    def test_set_default_permissions(self):
 
        PermissionModel().create_default_permissions(user=self.u1)
 
        self._test_def_perm_equal(user=self.u1)
 

	
 
    def test_set_default_permissions_after_one_is_missing(self):
 
        PermissionModel().create_default_permissions(user=self.u1)
 
        self._test_def_perm_equal(user=self.u1)
 
        # now we delete one, it should be re-created after another call
 
        perms = UserToPerm.query() \
 
                .filter(UserToPerm.user == self.u1) \
 
                .all()
 
        Session().delete(perms[0])
 
        Session().commit()
 

	
 
        self._test_def_perm_equal(user=self.u1, change_factor=-1)
 

	
 
        # create missing one !
 
        PermissionModel().create_default_permissions(user=self.u1)
 
        self._test_def_perm_equal(user=self.u1)
 

	
 
    @parametrize('perm,modify_to', [
 
        ('repository.read', 'repository.none'),
 
        ('group.read', 'group.none'),
 
        ('usergroup.read', 'usergroup.none'),
 
        ('hg.create.repository', 'hg.create.none'),
 
        ('hg.fork.repository', 'hg.fork.none'),
 
        ('hg.register.manual_activate', 'hg.register.auto_activate',)
 
    ])
 
    def test_set_default_permissions_after_modification(self, perm, modify_to):
 
        PermissionModel().create_default_permissions(user=self.u1)
 
        self._test_def_perm_equal(user=self.u1)
 

	
 
        old = Permission.get_by_key(perm)
 
        new = Permission.get_by_key(modify_to)
 
        assert old is not None
 
        assert new is not None
 

	
 
        # now modify permissions
 
        p = UserToPerm.query() \
 
                .filter(UserToPerm.user == self.u1) \
 
                .filter(UserToPerm.permission == old) \
 
                .one()
 
        p.permission = new
 
        Session().commit()
 

	
 
        PermissionModel().create_default_permissions(user=self.u1)
 
        self._test_def_perm_equal(user=self.u1)
0 comments (0 inline, 0 general)