Changeset - 3505a6be2988
[Not reviewed]
default
0 2 0
Søren Løvborg - 9 years ago 2017-02-23 20:26:27
sorenl@unity3d.com
repogroups: fix private repo recursion check

The purpose of this check is to ensure that we don't recursively assign
"default" user perms for a repo with the "private" flag set (because in
that case, the "default" user perms should always be "no access").

(The check, and this fix, is of course only applicable to Kallithea
instances that have anonymous access enabled to begin with.)

However, the check was only functional if the user was specified as a
username. This is apparently always the case when Kallithea is running,
but was not e.g. the case in the test suite, which consistently passed
a user ID instead of a username.

This commit ensures that the user is always resolved before the check is
made. There's no significant overhead to this, as the code immediately
calls RepoModel().grant_user_permission, which resolved the user anyway.
This change just moves the database lookup a bit earlier.

Fixing this revealed the matching test case to be broken, so it has been
fixed as well.

Down the road, we should eliminate Kallithea's bizarre practice of
passing around usernames and user IDs, in favor of passing actual User
objects. That'll get rid of mistakes like these, as well as repeated
needless database lookups.
2 files changed with 7 insertions and 3 deletions:
0 comments (0 inline, 0 general)
kallithea/model/repo_group.py
Show inline comments
 
@@ -9,387 +9,389 @@
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.repo_group
 
~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
repo group model for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jan 25, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import os
 
import logging
 
import traceback
 
import shutil
 
import datetime
 

	
 
from kallithea.lib.utils2 import LazyProperty
 

	
 
from kallithea.model.base import BaseModel
 
from kallithea.model.db import RepoGroup, Ui, UserRepoGroupToPerm, \
 
    User, Permission, UserGroupRepoGroupToPerm, UserGroup, Repository
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class RepoGroupModel(BaseModel):
 

	
 
    @LazyProperty
 
    def repos_path(self):
 
        """
 
        Gets the repositories root path from database
 
        """
 

	
 
        q = Ui.get_by_key('paths', '/')
 
        return q.ui_value
 

	
 
    def _create_default_perms(self, new_group):
 
        # create default permission
 
        default_perm = 'group.read'
 
        def_user = User.get_default_user()
 
        for p in def_user.user_perms:
 
            if p.permission.permission_name.startswith('group.'):
 
                default_perm = p.permission.permission_name
 
                break
 

	
 
        repo_group_to_perm = UserRepoGroupToPerm()
 
        repo_group_to_perm.permission = Permission.get_by_key(default_perm)
 

	
 
        repo_group_to_perm.group = new_group
 
        repo_group_to_perm.user_id = def_user.user_id
 
        return repo_group_to_perm
 

	
 
    def _create_group(self, group_name):
 
        """
 
        makes repository group on filesystem
 

	
 
        :param repo_name:
 
        :param parent_id:
 
        """
 

	
 
        create_path = os.path.join(self.repos_path, group_name)
 
        log.debug('creating new group in %s', create_path)
 

	
 
        if os.path.isdir(create_path):
 
            raise Exception('That directory already exists !')
 

	
 
        os.makedirs(create_path)
 
        log.debug('Created group in %s', create_path)
 

	
 
    def _rename_group(self, old, new):
 
        """
 
        Renames a group on filesystem
 

	
 
        :param group_name:
 
        """
 

	
 
        if old == new:
 
            log.debug('skipping group rename')
 
            return
 

	
 
        log.debug('renaming repository group from %s to %s', old, new)
 

	
 
        old_path = os.path.join(self.repos_path, old)
 
        new_path = os.path.join(self.repos_path, new)
 

	
 
        log.debug('renaming repos paths from %s to %s', old_path, new_path)
 

	
 
        if os.path.isdir(new_path):
 
            raise Exception('Was trying to rename to already '
 
                            'existing dir %s' % new_path)
 
        shutil.move(old_path, new_path)
 

	
 
    def _delete_group(self, group, force_delete=False):
 
        """
 
        Deletes a group from a filesystem
 

	
 
        :param group: instance of group from database
 
        :param force_delete: use shutil rmtree to remove all objects
 
        """
 
        paths = group.full_path.split(RepoGroup.url_sep())
 
        paths = os.sep.join(paths)
 

	
 
        rm_path = os.path.join(self.repos_path, paths)
 
        log.info("Removing group %s", rm_path)
 
        # delete only if that path really exists
 
        if os.path.isdir(rm_path):
 
            if force_delete:
 
                shutil.rmtree(rm_path)
 
            else:
 
                #archive that group`
 
                _now = datetime.datetime.now()
 
                _ms = str(_now.microsecond).rjust(6, '0')
 
                _d = 'rm__%s_GROUP_%s' % (_now.strftime('%Y%m%d_%H%M%S_' + _ms),
 
                                          group.name)
 
                shutil.move(rm_path, os.path.join(self.repos_path, _d))
 

	
 
    def create(self, group_name, group_description, owner, parent=None,
 
               just_db=False, copy_permissions=False):
 
        try:
 
            owner = User.guess_instance(owner)
 
            parent_group = RepoGroup.guess_instance(parent)
 
            new_repo_group = RepoGroup()
 
            new_repo_group.owner = owner
 
            new_repo_group.group_description = group_description or group_name
 
            new_repo_group.parent_group = parent_group
 
            new_repo_group.group_name = new_repo_group.get_new_name(group_name)
 

	
 
            self.sa.add(new_repo_group)
 

	
 
            # create an ADMIN permission for owner except if we're super admin,
 
            # later owner should go into the owner field of groups
 
            if not owner.is_admin:
 
                self.grant_user_permission(repo_group=new_repo_group,
 
                                           user=owner, perm='group.admin')
 

	
 
            if parent_group and copy_permissions:
 
                # copy permissions from parent
 
                user_perms = UserRepoGroupToPerm.query() \
 
                    .filter(UserRepoGroupToPerm.group == parent_group).all()
 

	
 
                group_perms = UserGroupRepoGroupToPerm.query() \
 
                    .filter(UserGroupRepoGroupToPerm.group == parent_group).all()
 

	
 
                for perm in user_perms:
 
                    # don't copy over the permission for user who is creating
 
                    # this group, if he is not super admin he get's admin
 
                    # permission set above
 
                    if perm.user != owner or owner.is_admin:
 
                        UserRepoGroupToPerm.create(perm.user, new_repo_group, perm.permission)
 

	
 
                for perm in group_perms:
 
                    UserGroupRepoGroupToPerm.create(perm.users_group, new_repo_group, perm.permission)
 
            else:
 
                perm_obj = self._create_default_perms(new_repo_group)
 
                self.sa.add(perm_obj)
 

	
 
            if not just_db:
 
                # we need to flush here, in order to check if database won't
 
                # throw any exceptions, create filesystem dirs at the very end
 
                self.sa.flush()
 
                self._create_group(new_repo_group.group_name)
 

	
 
            return new_repo_group
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def _update_permissions(self, repo_group, perms_new=None,
 
                            perms_updates=None, recursive=None,
 
                            check_perms=True):
 
        from kallithea.model.repo import RepoModel
 
        from kallithea.lib.auth import HasUserGroupPermissionAny
 

	
 
        if not perms_new:
 
            perms_new = []
 
        if not perms_updates:
 
            perms_updates = []
 

	
 
        def _set_perm_user(obj, user, perm):
 
            if isinstance(obj, RepoGroup):
 
                self.grant_user_permission(repo_group=obj, user=user, perm=perm)
 
            elif isinstance(obj, Repository):
 
                user = User.guess_instance(user)
 

	
 
                # private repos will not allow to change the default permissions
 
                # using recursive mode
 
                if obj.private and user == User.DEFAULT_USER:
 
                if obj.private and user.username == User.DEFAULT_USER:
 
                    return
 

	
 
                # we set group permission but we have to switch to repo
 
                # permission
 
                perm = perm.replace('group.', 'repository.')
 
                RepoModel().grant_user_permission(
 
                    repo=obj, user=user, perm=perm
 
                )
 

	
 
        def _set_perm_group(obj, users_group, perm):
 
            if isinstance(obj, RepoGroup):
 
                self.grant_user_group_permission(repo_group=obj,
 
                                                  group_name=users_group,
 
                                                  perm=perm)
 
            elif isinstance(obj, Repository):
 
                # we set group permission but we have to switch to repo
 
                # permission
 
                perm = perm.replace('group.', 'repository.')
 
                RepoModel().grant_user_group_permission(
 
                    repo=obj, group_name=users_group, perm=perm
 
                )
 

	
 
        # start updates
 
        updates = []
 
        log.debug('Now updating permissions for %s in recursive mode:%s',
 
                  repo_group, recursive)
 

	
 
        for obj in repo_group.recursive_groups_and_repos():
 
            # iterated obj is an instance of a repos group or repository in
 
            # that group, recursive option can be: none, repos, groups, all
 
            if recursive == 'all':
 
                pass
 
            elif recursive == 'repos':
 
                # skip groups, other than this one
 
                if isinstance(obj, RepoGroup) and not obj == repo_group:
 
                    continue
 
            elif recursive == 'groups':
 
                # skip repos
 
                if isinstance(obj, Repository):
 
                    continue
 
            else:  # recursive == 'none': # DEFAULT don't apply to iterated objects
 
                obj = repo_group
 
                # also we do a break at the end of this loop.
 

	
 
            # update permissions
 
            for member, perm, member_type in perms_updates:
 
                ## set for user
 
                if member_type == 'user':
 
                    # this updates also current one if found
 
                    _set_perm_user(obj, user=member, perm=perm)
 
                ## set for user group
 
                else:
 
                    #check if we have permissions to alter this usergroup
 
                    req_perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin')
 
                    if not check_perms or HasUserGroupPermissionAny(*req_perms)(member):
 
                        _set_perm_group(obj, users_group=member, perm=perm)
 
            # set new permissions
 
            for member, perm, member_type in perms_new:
 
                if member_type == 'user':
 
                    _set_perm_user(obj, user=member, perm=perm)
 
                else:
 
                    #check if we have permissions to alter this usergroup
 
                    req_perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin')
 
                    if not check_perms or HasUserGroupPermissionAny(*req_perms)(member):
 
                        _set_perm_group(obj, users_group=member, perm=perm)
 
            updates.append(obj)
 
            # if it's not recursive call for all,repos,groups
 
            # break the loop and don't proceed with other changes
 
            if recursive not in ['all', 'repos', 'groups']:
 
                break
 

	
 
        return updates
 

	
 
    def update(self, repo_group, form_data):
 

	
 
        try:
 
            repo_group = RepoGroup.guess_instance(repo_group)
 
            old_path = repo_group.full_path
 

	
 
            # change properties
 
            repo_group.group_description = form_data['group_description']
 
            repo_group.parent_group_id = form_data['parent_group_id']
 
            repo_group.enable_locking = form_data['enable_locking']
 

	
 
            repo_group.parent_group = RepoGroup.get(form_data['parent_group_id'])
 
            repo_group.group_name = repo_group.get_new_name(form_data['group_name'])
 
            new_path = repo_group.full_path
 
            self.sa.add(repo_group)
 

	
 
            # iterate over all members of this groups and do fixes
 
            # set locking if given
 
            # if obj is a repoGroup also fix the name of the group according
 
            # to the parent
 
            # if obj is a Repo fix it's name
 
            # this can be potentially heavy operation
 
            for obj in repo_group.recursive_groups_and_repos():
 
                #set the value from it's parent
 
                obj.enable_locking = repo_group.enable_locking
 
                if isinstance(obj, RepoGroup):
 
                    new_name = obj.get_new_name(obj.name)
 
                    log.debug('Fixing group %s to new name %s' \
 
                                % (obj.group_name, new_name))
 
                    obj.group_name = new_name
 
                elif isinstance(obj, Repository):
 
                    # we need to get all repositories from this new group and
 
                    # rename them accordingly to new group path
 
                    new_name = obj.get_new_name(obj.just_name)
 
                    log.debug('Fixing repo %s to new name %s' \
 
                                % (obj.repo_name, new_name))
 
                    obj.repo_name = new_name
 
                self.sa.add(obj)
 

	
 
            self._rename_group(old_path, new_path)
 

	
 
            return repo_group
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def delete(self, repo_group, force_delete=False):
 
        repo_group = RepoGroup.guess_instance(repo_group)
 
        try:
 
            self.sa.delete(repo_group)
 
            self._delete_group(repo_group, force_delete)
 
        except Exception:
 
            log.error('Error removing repo_group %s', repo_group)
 
            raise
 

	
 
    def add_permission(self, repo_group, obj, obj_type, perm, recursive):
 
        from kallithea.model.repo import RepoModel
 
        repo_group = RepoGroup.guess_instance(repo_group)
 
        perm = Permission.guess_instance(perm)
 

	
 
        for el in repo_group.recursive_groups_and_repos():
 
            # iterated obj is an instance of a repos group or repository in
 
            # that group, recursive option can be: none, repos, groups, all
 
            if recursive == 'all':
 
                pass
 
            elif recursive == 'repos':
 
                # skip groups, other than this one
 
                if isinstance(el, RepoGroup) and not el == repo_group:
 
                    continue
 
            elif recursive == 'groups':
 
                # skip repos
 
                if isinstance(el, Repository):
 
                    continue
 
            else:  # recursive == 'none': # DEFAULT don't apply to iterated objects
 
                el = repo_group
 
                # also we do a break at the end of this loop.
 

	
 
            if isinstance(el, RepoGroup):
 
                if obj_type == 'user':
 
                    RepoGroupModel().grant_user_permission(el, user=obj, perm=perm)
 
                elif obj_type == 'user_group':
 
                    RepoGroupModel().grant_user_group_permission(el, group_name=obj, perm=perm)
 
                else:
 
                    raise Exception('undefined object type %s' % obj_type)
 
            elif isinstance(el, Repository):
 
                # for repos we need to hotfix the name of permission
 
                _perm = perm.permission_name.replace('group.', 'repository.')
 
                if obj_type == 'user':
 
                    RepoModel().grant_user_permission(el, user=obj, perm=_perm)
 
                elif obj_type == 'user_group':
 
                    RepoModel().grant_user_group_permission(el, group_name=obj, perm=_perm)
 
                else:
 
                    raise Exception('undefined object type %s' % obj_type)
 
            else:
 
                raise Exception('el should be instance of Repository or '
 
                                'RepositoryGroup got %s instead' % type(el))
 

	
 
            # if it's not recursive call for all,repos,groups
 
            # break the loop and don't proceed with other changes
 
            if recursive not in ['all', 'repos', 'groups']:
 
                break
 

	
 
    def delete_permission(self, repo_group, obj, obj_type, recursive):
 
        """
 
        Revokes permission for repo_group for given obj(user or users_group),
 
        obj_type can be user or user group
 

	
 
        :param repo_group:
 
        :param obj: user or user group id
 
        :param obj_type: user or user group type
 
        :param recursive: recurse to all children of group
 
        """
 
        from kallithea.model.repo import RepoModel
 
        repo_group = RepoGroup.guess_instance(repo_group)
 

	
 
        for el in repo_group.recursive_groups_and_repos():
 
            # iterated obj is an instance of a repos group or repository in
 
            # that group, recursive option can be: none, repos, groups, all
 
            if recursive == 'all':
kallithea/tests/models/test_user_permissions_on_repo_groups.py
Show inline comments
 
import functools
 

	
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.db import RepoGroup, User
 
from kallithea.model.db import RepoGroup, Repository, User
 

	
 
from kallithea.model.meta import Session
 
from kallithea.tests.models.common import _create_project_tree, check_tree_perms, \
 
    _get_perms, _check_expected_count, expected_count, _destroy_project_tree
 

	
 

	
 
test_u1_id = None
 
_get_repo_perms = None
 
_get_group_perms = None
 

	
 

	
 
def permissions_setup_func(group_name=u'g0', perm='group.read', recursive='all',
 
                           user_id=None):
 
    """
 
    Resets all permissions to perm attribute
 
    """
 
    if not user_id:
 
        user_id = test_u1_id
 
        permissions_setup_func(group_name, perm, recursive,
 
                               user_id=User.get_default_user().user_id)
 

	
 
    repo_group = RepoGroup.get_by_group_name(group_name=group_name)
 
    if not repo_group:
 
        raise Exception('Cannot get group %s' % group_name)
 

	
 
    # Start with a baseline that current group can read recursive
 
    perms_updates = [[user_id, 'group.read', 'user']]
 
    RepoGroupModel()._update_permissions(repo_group,
 
                                         perms_updates=perms_updates,
 
                                         recursive='all', check_perms=False)
 

	
 
    perms_updates = [[user_id, perm, 'user']]
 
    RepoGroupModel()._update_permissions(repo_group,
 
                                         perms_updates=perms_updates,
 
                                         recursive=recursive, check_perms=False)
 
    Session().commit()
 

	
 

	
 
def setup_module():
 
    global test_u1_id, _get_repo_perms, _get_group_perms
 
    test_u1 = _create_project_tree()
 
    Session().commit()
 
    test_u1_id = test_u1.user_id
 
    _get_repo_perms = functools.partial(_get_perms, key='repositories',
 
                                        test_u1_id=test_u1_id)
 
    _get_group_perms = functools.partial(_get_perms, key='repositories_groups',
 
                                         test_u1_id=test_u1_id)
 

	
 

	
 
def teardown_module():
 
    _destroy_project_tree(test_u1_id)
 

	
 

	
 
def test_user_permissions_on_group_without_recursive_mode():
 
    # set permission to g0 non-recursive mode
 
    recursive = 'none'
 
    group = u'g0'
 
    permissions_setup_func(group, 'group.write', recursive=recursive)
 

	
 
    items = [x for x in _get_repo_perms(group, recursive)]
 
    expected = 0
 
    assert len(items) == expected, ' %s != %s' % (len(items), expected)
 
    for name, perm in items:
 
        check_tree_perms(name, perm, group, 'repository.read')
 

	
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    expected = 1
 
    assert len(items) == expected, ' %s != %s' % (len(items), expected)
 
    for name, perm in items:
 
        check_tree_perms(name, perm, group, 'group.write')
 

	
 

	
 
def test_user_permissions_on_group_without_recursive_mode_subgroup():
 
    # set permission to g0 non-recursive mode
 
    recursive = 'none'
 
    group = u'g0/g0_1'
 
    permissions_setup_func(group, 'group.write', recursive=recursive)
 

	
 
    items = [x for x in _get_repo_perms(group, recursive)]
 
    expected = 0
 
    assert len(items) == expected, ' %s != %s' % (len(items), expected)
 
    for name, perm in items:
 
        check_tree_perms(name, perm, group, 'repository.read')
 

	
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    expected = 1
 
    assert len(items) == expected, ' %s != %s' % (len(items), expected)
 
    for name, perm in items:
 
        check_tree_perms(name, perm, group, 'group.write')
 

	
 

	
 
def test_user_permissions_on_group_with_recursive_mode():
 

	
 
    # set permission to g0 recursive mode, all children including
 
    # other repos and groups should have this permission now set !
 
    recursive = 'all'
 
    group = u'g0'
 
    permissions_setup_func(group, 'group.write', recursive=recursive)
 

	
 
    repo_items = [x for x in _get_repo_perms(group, recursive)]
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    _check_expected_count(items, repo_items, expected_count(group, True))
 

	
 
    for name, perm in repo_items:
 
        check_tree_perms(name, perm, group, 'repository.write')
 

	
 
    for name, perm in items:
 
        check_tree_perms(name, perm, group, 'group.write')
 

	
 

	
 
def test_user_permissions_on_group_with_recursive_mode_for_default_user():
 

	
 
    # set permission to g0 recursive mode, all children including
 
    # other repos and groups should have this permission now set !
 
    recursive = 'all'
 
    group = u'g0'
 
    default_user_id = User.get_default_user().user_id
 
    permissions_setup_func(group, 'group.write', recursive=recursive,
 
                           user_id=default_user_id)
 

	
 
    # change default to get perms for default user
 
    _get_repo_perms = functools.partial(_get_perms, key='repositories',
 
                                        test_u1_id=default_user_id)
 
    _get_group_perms = functools.partial(_get_perms, key='repositories_groups',
 
                                         test_u1_id=default_user_id)
 

	
 
    repo_items = [x for x in _get_repo_perms(group, recursive)]
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    _check_expected_count(items, repo_items, expected_count(group, True))
 

	
 
    for name, perm in repo_items:
 
        check_tree_perms(name, perm, group, 'repository.write')
 
        # default user permissions do not "recurse into" private repos
 
        is_private = Repository.get_by_repo_name(name).private
 
        check_tree_perms(name, perm, group, 'repository.none' if is_private else 'repository.write')
 

	
 
    for name, perm in items:
 
        check_tree_perms(name, perm, group, 'group.write')
 

	
 

	
 
def test_user_permissions_on_group_with_recursive_mode_inner_group():
 
    ## set permission to g0_3 group to none
 
    recursive = 'all'
 
    group = u'g0/g0_3'
 
    permissions_setup_func(group, 'group.none', recursive=recursive)
 

	
 
    repo_items = [x for x in _get_repo_perms(group, recursive)]
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    _check_expected_count(items, repo_items, expected_count(group, True))
 

	
 
    for name, perm in repo_items:
 
        check_tree_perms(name, perm, group, 'repository.none')
 

	
 
    for name, perm in items:
 
        check_tree_perms(name, perm, group, 'group.none')
 

	
 

	
 
def test_user_permissions_on_group_with_recursive_mode_deepest():
 
    ## set permission to g0_3 group to none
 
    recursive = 'all'
 
    group = u'g0/g0_1/g0_1_1'
 
    permissions_setup_func(group, 'group.write', recursive=recursive)
 

	
 
    repo_items = [x for x in _get_repo_perms(group, recursive)]
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    _check_expected_count(items, repo_items, expected_count(group, True))
 

	
 
    for name, perm in repo_items:
 
        check_tree_perms(name, perm, group, 'repository.write')
 

	
 
    for name, perm in items:
 
        check_tree_perms(name, perm, group, 'group.write')
 

	
 

	
 
def test_user_permissions_on_group_with_recursive_mode_only_with_repos():
 
    ## set permission to g0_3 group to none
 
    recursive = 'all'
 
    group = u'g0/g0_2'
 
    permissions_setup_func(group, 'group.admin', recursive=recursive)
 

	
 
    repo_items = [x for x in _get_repo_perms(group, recursive)]
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    _check_expected_count(items, repo_items, expected_count(group, True))
 

	
 
    for name, perm in repo_items:
 
        check_tree_perms(name, perm, group, 'repository.admin')
 

	
 
    for name, perm in items:
 
        check_tree_perms(name, perm, group, 'group.admin')
 

	
 

	
 
def test_user_permissions_on_group_with_recursive_repo_mode_for_default_user():
 
    # set permission to g0/g0_1 recursive repos only mode, all children including
 
    # other repos should have this permission now set, inner groups are excluded!
 
    recursive = 'repos'
 
    group = u'g0/g0_1'
 
    perm = 'group.none'
 
    default_user_id = User.get_default_user().user_id
 

	
 
    permissions_setup_func(group, perm, recursive=recursive,
 
                           user_id=default_user_id)
 

	
 
    # change default to get perms for default user
 
    _get_repo_perms = functools.partial(_get_perms, key='repositories',
 
                                        test_u1_id=default_user_id)
 
    _get_group_perms = functools.partial(_get_perms, key='repositories_groups',
 
                                         test_u1_id=default_user_id)
 

	
 
    repo_items = [x for x in _get_repo_perms(group, recursive)]
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    _check_expected_count(items, repo_items, expected_count(group, True))
 

	
 
    for name, perm in repo_items:
 
        check_tree_perms(name, perm, group, 'repository.none')
 

	
 
    for name, perm in items:
 
        # permission is set with repos only mode, but we also change the permission
 
        # on the group we trigger the apply to children from, thus we need
 
        # to change its permission check
 
        old_perm = 'group.read'
 
        if name == group:
 
            old_perm = perm
 
        check_tree_perms(name, perm, group, old_perm)
 

	
 

	
 
def test_user_permissions_on_group_with_recursive_repo_mode_inner_group():
 
    ## set permission to g0_3 group to none, with recursive repos only
 
    recursive = 'repos'
 
    group = u'g0/g0_3'
 
    perm = 'group.none'
 
    permissions_setup_func(group, perm, recursive=recursive)
 

	
 
    repo_items = [x for x in _get_repo_perms(group, recursive)]
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    _check_expected_count(items, repo_items, expected_count(group, True))
 

	
 
    for name, perm in repo_items:
 
        check_tree_perms(name, perm, group, 'repository.none')
 

	
 
    for name, perm in items:
 
        # permission is set with repos only mode, but we also change the permission
 
        # on the group we trigger the apply to children from, thus we need
 
        # to change its permission check
 
        old_perm = 'group.read'
 
        if name == group:
 
            old_perm = perm
 
        check_tree_perms(name, perm, group, old_perm)
 

	
 

	
 
def test_user_permissions_on_group_with_recursive_group_mode_for_default_user():
 
    # set permission to g0/g0_1 with recursive groups only mode, all children including
 
    # other groups should have this permission now set. repositories should
 
    # remain intact as we use groups only mode !
 
    recursive = 'groups'
 
    group = u'g0/g0_1'
 
    default_user_id = User.get_default_user().user_id
 
    permissions_setup_func(group, 'group.write', recursive=recursive,
 
                           user_id=default_user_id)
 

	
 
    # change default to get perms for default user
 
    _get_repo_perms = functools.partial(_get_perms, key='repositories',
 
                                        test_u1_id=default_user_id)
 
    _get_group_perms = functools.partial(_get_perms, key='repositories_groups',
 
                                         test_u1_id=default_user_id)
 

	
 
    repo_items = [x for x in _get_repo_perms(group, recursive)]
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    _check_expected_count(items, repo_items, expected_count(group, True))
 

	
 
    for name, perm in repo_items:
 
        check_tree_perms(name, perm, group, 'repository.read')
 

	
 
    for name, perm in items:
 
        check_tree_perms(name, perm, group, 'group.write')
 

	
 

	
 
def test_user_permissions_on_group_with_recursive_group_mode_inner_group():
 
    ## set permission to g0_3 group to none, with recursive mode for groups only
 
    recursive = 'groups'
 
    group = u'g0/g0_3'
 
    permissions_setup_func(group, 'group.none', recursive=recursive)
 

	
 
    repo_items = [x for x in _get_repo_perms(group, recursive)]
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    _check_expected_count(items, repo_items, expected_count(group, True))
 

	
 
    for name, perm in repo_items:
 
        check_tree_perms(name, perm, group, 'repository.read')
 

	
 
    for name, perm in items:
 
        check_tree_perms(name, perm, group, 'group.none')
0 comments (0 inline, 0 general)