Changeset - 9d87b8d5ba00
[Not reviewed]
stable
0 3 0
Mads Kiilerich - 10 years ago 2015-07-07 02:19:55
madski@unity3d.com
auth: ignore permissions from in-active user groups (Issue #138)

Tests by Thomas De Schampheleire.

Additionally, rename the unused and seemingly search-replace-massacred
function revokehas_permrevoke_permgrant_perm_perm into revoke_perm.
3 files changed with 227 insertions and 1 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/auth.py
Show inline comments
 
@@ -267,24 +267,27 @@ def _cached_perms_data(user_id, user_is_
 
        'hg.fork.none', 'hg.fork.repository',
 
        'hg.create.none', 'hg.create.repository',
 
        'hg.usergroup.create.false', 'hg.usergroup.create.true'
 
    ])
 

	
 
    # USER GROUPS comes first
 
    # user group global permissions
 
    user_perms_from_users_groups = Session().query(UserGroupToPerm)\
 
        .options(joinedload(UserGroupToPerm.permission))\
 
        .join((UserGroupMember, UserGroupToPerm.users_group_id ==
 
               UserGroupMember.users_group_id))\
 
        .filter(UserGroupMember.user_id == uid)\
 
        .join((UserGroup, UserGroupMember.users_group_id ==
 
               UserGroup.users_group_id))\
 
        .filter(UserGroup.users_group_active == True)\
 
        .order_by(UserGroupToPerm.users_group_id)\
 
        .all()
 
    # need to group here by groups since user can be in more than
 
    # one group
 
    _grouped = [[x, list(y)] for x, y in
 
                itertools.groupby(user_perms_from_users_groups,
 
                                  lambda x:x.users_group)]
 
    for gr, perms in _grouped:
 
        # since user can be in multiple groups iterate over them and
 
        # select the lowest permissions first (more explicit)
 
        ##TODO: do this^^
 
        if not gr.inherit_default_permissions:
 
@@ -317,24 +320,27 @@ def _cached_perms_data(user_id, user_is_
 
    # check if user is part of user groups for this repository and
 
    # fill in his permission from it. _choose_perm decides of which
 
    # permission should be selected based on selected method
 
    #======================================================================
 

	
 
    # user group for repositories permissions
 
    user_repo_perms_from_users_groups = \
 
     Session().query(UserGroupRepoToPerm, Permission, Repository,)\
 
        .join((Repository, UserGroupRepoToPerm.repository_id ==
 
               Repository.repo_id))\
 
        .join((Permission, UserGroupRepoToPerm.permission_id ==
 
               Permission.permission_id))\
 
        .join((UserGroup, UserGroupRepoToPerm.users_group_id ==
 
               UserGroup.users_group_id))\
 
        .filter(UserGroup.users_group_active == True)\
 
        .join((UserGroupMember, UserGroupRepoToPerm.users_group_id ==
 
               UserGroupMember.users_group_id))\
 
        .filter(UserGroupMember.user_id == uid)\
 
        .all()
 

	
 
    multiple_counter = collections.defaultdict(int)
 
    for perm in user_repo_perms_from_users_groups:
 
        r_k = perm.UserGroupRepoToPerm.repository.repo_name
 
        multiple_counter[r_k] += 1
 
        p = perm.Permission.permission_name
 
        cur_perm = permissions[RK][r_k]
 

	
 
@@ -366,24 +372,27 @@ def _cached_perms_data(user_id, user_is_
 
    #======================================================================
 
    #======================================================================
 
    # check if user is part of user groups for this repository groups and
 
    # fill in his permission from it. _choose_perm decides of which
 
    # permission should be selected based on selected method
 
    #======================================================================
 
    # user group for repo groups permissions
 
    user_repo_group_perms_from_users_groups = \
 
     Session().query(UserGroupRepoGroupToPerm, Permission, RepoGroup)\
 
     .join((RepoGroup, UserGroupRepoGroupToPerm.group_id == RepoGroup.group_id))\
 
     .join((Permission, UserGroupRepoGroupToPerm.permission_id
 
            == Permission.permission_id))\
 
     .join((UserGroup, UserGroupRepoGroupToPerm.users_group_id ==
 
            UserGroup.users_group_id))\
 
     .filter(UserGroup.users_group_active == True)\
 
     .join((UserGroupMember, UserGroupRepoGroupToPerm.users_group_id
 
            == UserGroupMember.users_group_id))\
 
     .filter(UserGroupMember.user_id == uid)\
 
     .all()
 

	
 
    multiple_counter = collections.defaultdict(int)
 
    for perm in user_repo_group_perms_from_users_groups:
 
        g_k = perm.UserGroupRepoGroupToPerm.group.group_name
 
        multiple_counter[g_k] += 1
 
        p = perm.Permission.permission_name
 
        cur_perm = permissions[GK][g_k]
 
        if multiple_counter[g_k] > 1:
 
@@ -404,24 +413,27 @@ def _cached_perms_data(user_id, user_is_
 
    # !! PERMISSIONS FOR USER GROUPS !!
 
    #======================================================================
 
    # user group for user group permissions
 
    user_group_user_groups_perms = \
 
     Session().query(UserGroupUserGroupToPerm, Permission, UserGroup)\
 
     .join((UserGroup, UserGroupUserGroupToPerm.target_user_group_id
 
            == UserGroup.users_group_id))\
 
     .join((Permission, UserGroupUserGroupToPerm.permission_id
 
            == Permission.permission_id))\
 
     .join((UserGroupMember, UserGroupUserGroupToPerm.user_group_id
 
            == UserGroupMember.users_group_id))\
 
     .filter(UserGroupMember.user_id == uid)\
 
     .join((UserGroup, UserGroupMember.users_group_id ==
 
            UserGroup.users_group_id), aliased=True, from_joinpoint=True)\
 
     .filter(UserGroup.users_group_active == True)\
 
     .all()
 

	
 
    multiple_counter = collections.defaultdict(int)
 
    for perm in user_group_user_groups_perms:
 
        g_k = perm.UserGroupUserGroupToPerm.target_user_group.users_group_name
 
        multiple_counter[g_k] += 1
 
        p = perm.Permission.permission_name
 
        cur_perm = permissions[UK][g_k]
 
        if multiple_counter[g_k] > 1:
 
            p = _choose_perm(p, cur_perm)
 
        permissions[UK][g_k] = p
 

	
kallithea/model/user_group.py
Show inline comments
 
@@ -237,25 +237,25 @@ class UserGroupModel(BaseModel):
 
            .filter(UserGroupToPerm.users_group == user_group)\
 
            .filter(UserGroupToPerm.permission == perm)\
 
            .scalar()
 
        if _perm:
 
            return
 

	
 
        new = UserGroupToPerm()
 
        new.users_group = user_group
 
        new.permission = perm
 
        self.sa.add(new)
 
        return new
 

	
 
    def revokehas_permrevoke_permgrant_perm_perm(self, user_group, perm):
 
    def revoke_perm(self, user_group, perm):
 
        user_group = self._get_user_group(user_group)
 
        perm = self._get_perm(perm)
 

	
 
        obj = UserGroupToPerm.query()\
 
            .filter(UserGroupToPerm.users_group == user_group)\
 
            .filter(UserGroupToPerm.permission == perm).scalar()
 
        if obj:
 
            self.sa.delete(obj)
 

	
 
    def grant_user_permission(self, user_group, user, perm):
 
        """
 
        Grant permission for user on given user group, or update
kallithea/tests/models/test_permissions.py
Show inline comments
 
@@ -52,24 +52,26 @@ class TestPermissions(BaseTestCase):
 

	
 
        UserModel().delete(self.u1)
 
        UserModel().delete(self.u2)
 
        UserModel().delete(self.u3)
 
        UserModel().delete(self.a1)
 
        if hasattr(self, 'g1'):
 
            RepoGroupModel().delete(self.g1.group_id)
 
        if hasattr(self, 'g2'):
 
            RepoGroupModel().delete(self.g2.group_id)
 

	
 
        if hasattr(self, 'ug1'):
 
            UserGroupModel().delete(self.ug1, force=True)
 
        if hasattr(self, 'ug2'):
 
            UserGroupModel().delete(self.ug2, force=True)
 

	
 
        Session().commit()
 

	
 
    def test_default_perms_set(self):
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        perms = {
 
            'repositories_groups': {},
 
            'global': set([u'hg.create.repository', u'repository.read',
 
                           u'hg.register.manual_activate']),
 
            'repositories': {u'vcs_test_hg': u'repository.read'}
 
        }
 
        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
 
@@ -425,24 +427,236 @@ class TestPermissions(BaseTestCase):
 
        self.u1.inherit_default_permissions = False
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        # this user will have non inherited permissions from he's
 
        # explicitly set permissions
 
        self.assertEqual(u1_auth.permissions['global'],
 
                         set(['hg.create.repository', 'hg.fork.repository',
 
                              'hg.register.manual_activate',
 
                              'hg.extern_activate.auto',
 
                              'repository.read', 'group.read',
 
                              'usergroup.read', 'hg.create.write_on_repogroup.true']))
 

	
 
    def test_inactive_user_group_does_not_affect_global_permissions(self):
 
        # Issue #138: Inactive User Groups affecting permissions
 
        # Add user to inactive user group, set specific permissions on user
 
        # group and disable inherit-from-default. User permissions should still
 
        # inherit from default.
 
        self.ug1 = fixture.create_user_group('G1')
 
        self.ug1.inherit_default_permissions = False
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        # enable fork and create on user group
 
        user_group_model.revoke_perm(self.ug1, perm='hg.create.none')
 
        user_group_model.grant_perm(self.ug1, perm='hg.create.repository')
 
        user_group_model.revoke_perm(self.ug1, perm='hg.fork.none')
 
        user_group_model.grant_perm(self.ug1, perm='hg.fork.repository')
 

	
 
        user_model = UserModel()
 
        # disable fork and create on default user
 
        usr = 'default'
 
        user_model.revoke_perm(usr, 'hg.create.repository')
 
        user_model.grant_perm(usr, 'hg.create.none')
 
        user_model.revoke_perm(usr, 'hg.fork.repository')
 
        user_model.grant_perm(usr, 'hg.fork.none')
 

	
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 

	
 
        self.assertEqual(u1_auth.permissions['global'],
 
                         set(['hg.create.none', 'hg.fork.none',
 
                              'hg.register.manual_activate',
 
                              'hg.extern_activate.auto',
 
                              'repository.read', 'group.read',
 
                              'usergroup.read',
 
                              'hg.create.write_on_repogroup.true']))
 

	
 
    def test_inactive_user_group_does_not_affect_global_permissions_inverse(self):
 
        # Issue #138: Inactive User Groups affecting permissions
 
        # Add user to inactive user group, set specific permissions on user
 
        # group and disable inherit-from-default. User permissions should still
 
        # inherit from default.
 
        self.ug1 = fixture.create_user_group('G1')
 
        self.ug1.inherit_default_permissions = False
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        # disable fork and create on user group
 
        user_group_model.revoke_perm(self.ug1, perm='hg.create.repository')
 
        user_group_model.grant_perm(self.ug1, perm='hg.create.none')
 
        user_group_model.revoke_perm(self.ug1, perm='hg.fork.repository')
 
        user_group_model.grant_perm(self.ug1, perm='hg.fork.none')
 

	
 
        user_model = UserModel()
 
        # enable fork and create on default user
 
        usr = 'default'
 
        user_model.revoke_perm(usr, 'hg.create.none')
 
        user_model.grant_perm(usr, 'hg.create.repository')
 
        user_model.revoke_perm(usr, 'hg.fork.none')
 
        user_model.grant_perm(usr, 'hg.fork.repository')
 

	
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 

	
 
        self.assertEqual(u1_auth.permissions['global'],
 
                         set(['hg.create.repository', 'hg.fork.repository',
 
                              'hg.register.manual_activate',
 
                              'hg.extern_activate.auto',
 
                              'repository.read', 'group.read',
 
                              'usergroup.read',
 
                              'hg.create.write_on_repogroup.true']))
 

	
 
    def test_inactive_user_group_does_not_affect_repo_permissions(self):
 
        self.ug1 = fixture.create_user_group('G1')
 
        self.ug1.inherit_default_permissions = False
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        # note: make u2 repo owner rather than u1, because the owner always has
 
        # admin permissions
 
        self.test_repo = fixture.create_repo(name='myownrepo',
 
                                             repo_type='hg',
 
                                             cur_user=self.u2)
 

	
 
        # enable admin access for user group on repo
 
        RepoModel().grant_user_group_permission(self.test_repo,
 
                                                group_name=self.ug1,
 
                                                perm='repository.admin')
 
        # enable only write access for default user on repo
 
        RepoModel().grant_user_permission(self.test_repo,
 
                                          user='default',
 
                                          perm='repository.write')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories']['myownrepo'],
 
                         'repository.write')
 

	
 
    def test_inactive_user_group_does_not_affect_repo_permissions_inverse(self):
 
        self.ug1 = fixture.create_user_group('G1')
 
        self.ug1.inherit_default_permissions = False
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        # note: make u2 repo owner rather than u1, because the owner always has
 
        # admin permissions
 
        self.test_repo = fixture.create_repo(name='myownrepo',
 
                                             repo_type='hg',
 
                                             cur_user=self.u2)
 

	
 
        # enable only write access for user group on repo
 
        RepoModel().grant_user_group_permission(self.test_repo,
 
                                                group_name=self.ug1,
 
                                                perm='repository.write')
 
        # enable admin access for default user on repo
 
        RepoModel().grant_user_permission(self.test_repo,
 
                                          user='default',
 
                                          perm='repository.admin')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories']['myownrepo'],
 
                         'repository.admin')
 

	
 
    def test_inactive_user_group_does_not_affect_repo_group_permissions(self):
 
        self.ug1 = fixture.create_user_group('G1')
 
        self.ug1.inherit_default_permissions = False
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        self.g1 = fixture.create_repo_group('group1', skip_if_exists=True)
 

	
 
        # enable admin access for user group on repo group
 
        RepoGroupModel().grant_user_group_permission(self.g1,
 
                                                     group_name=self.ug1,
 
                                                     perm='group.admin')
 
        # enable only write access for default user on repo group
 
        RepoGroupModel().grant_user_permission(self.g1,
 
                                               user='default',
 
                                               perm='group.write')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                         {u'group1': u'group.write'})
 

	
 
    def test_inactive_user_group_does_not_affect_repo_group_permissions_inverse(self):
 
        self.ug1 = fixture.create_user_group('G1')
 
        self.ug1.inherit_default_permissions = False
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        self.g1 = fixture.create_repo_group('group1', skip_if_exists=True)
 

	
 
        # enable only write access for user group on repo group
 
        RepoGroupModel().grant_user_group_permission(self.g1,
 
                                                     group_name=self.ug1,
 
                                                     perm='group.write')
 
        # enable admin access for default user on repo group
 
        RepoGroupModel().grant_user_permission(self.g1,
 
                                               user='default',
 
                                               perm='group.admin')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                         {u'group1': u'group.admin'})
 

	
 
    def test_inactive_user_group_does_not_affect_user_group_permissions(self):
 
        self.ug1 = fixture.create_user_group('G1')
 
        self.ug1.inherit_default_permissions = False
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        self.ug2 = fixture.create_user_group('G2')
 

	
 
        # enable admin access for user group on user group
 
        UserGroupModel().grant_user_group_permission(self.ug2,
 
                                                     user_group=self.ug1,
 
                                                     perm='usergroup.admin')
 
        # enable only write access for default user on user group
 
        UserGroupModel().grant_user_permission(self.ug2,
 
                                               user='default',
 
                                               perm='usergroup.write')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['user_groups'][u'G1'], u'usergroup.read')
 
        self.assertEqual(u1_auth.permissions['user_groups'][u'G2'], u'usergroup.write')
 

	
 
    def test_inactive_user_group_does_not_affect_user_group_permissions_inverse(self):
 
        self.ug1 = fixture.create_user_group('G1')
 
        self.ug1.inherit_default_permissions = False
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        self.ug2 = fixture.create_user_group('G2')
 

	
 
        # enable only write access for user group on user group
 
        UserGroupModel().grant_user_group_permission(self.ug2,
 
                                                     user_group=self.ug1,
 
                                                     perm='usergroup.write')
 
        # enable admin access for default user on user group
 
        UserGroupModel().grant_user_permission(self.ug2,
 
                                               user='default',
 
                                               perm='usergroup.admin')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['user_groups'][u'G1'], u'usergroup.read')
 
        self.assertEqual(u1_auth.permissions['user_groups'][u'G2'], u'usergroup.admin')
 

	
 
    def test_owner_permissions_doesnot_get_overwritten_by_group(self):
 
        #create repo as USER,
 
        self.test_repo = fixture.create_repo(name='myownrepo',
 
                                             repo_type='hg',
 
                                             cur_user=self.u1)
 

	
 
        #he has permissions of admin as owner
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories']['myownrepo'],
 
                         'repository.admin')
 
        #set his permission as user group, he should still be admin
 
        self.ug1 = fixture.create_user_group('G1')
0 comments (0 inline, 0 general)