Changeset - 6d0573ba0721
[Not reviewed]
default
0 3 0
Mads Kiilerich - 7 years ago 2018-12-29 19:16:56
mads@kiilerich.com
auth: drop "multiple_counter" from computing permissions

This seems to have been something about having some permissions override
existing permissions. It is not clear to me why anybody should want that.

test_user_group_permissions_on_repo_groups.py seems to have been testing for
something we don't want. The new behaviour seems more reasonable. The test user
is inhering access from the default user, and thus in this case getting read
access (except when private).
3 files changed with 10 insertions and 19 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/auth.py
Show inline comments
 
@@ -96,469 +96,460 @@ def get_crypt_password(password):
 
    Cryptographic function used for password hashing based on pybcrypt
 
    or Python's own OpenSSL wrapper on windows
 

	
 
    :param password: password to hash
 
    """
 
    if is_windows:
 
        return hashlib.sha256(password).hexdigest()
 
    elif is_unix:
 
        import bcrypt
 
        return bcrypt.hashpw(safe_str(password), bcrypt.gensalt(10))
 
    else:
 
        raise Exception('Unknown or unsupported platform %s'
 
                        % __platform__)
 

	
 

	
 
def check_password(password, hashed):
 
    """
 
    Checks matching password with it's hashed value, runs different
 
    implementation based on platform it runs on
 

	
 
    :param password: password
 
    :param hashed: password in hashed form
 
    """
 

	
 
    if is_windows:
 
        return hashlib.sha256(password).hexdigest() == hashed
 
    elif is_unix:
 
        import bcrypt
 
        try:
 
            return bcrypt.checkpw(safe_str(password), safe_str(hashed))
 
        except ValueError as e:
 
            # bcrypt will throw ValueError 'Invalid hashed_password salt' on all password errors
 
            log.error('error from bcrypt checking password: %s', e)
 
            return False
 
    else:
 
        raise Exception('Unknown or unsupported platform %s'
 
                        % __platform__)
 

	
 

	
 
def _cached_perms_data(user_id, user_is_admin):
 
    RK = 'repositories'
 
    GK = 'repositories_groups'
 
    UK = 'user_groups'
 
    GLOBAL = 'global'
 
    PERM_WEIGHTS = Permission.PERM_WEIGHTS
 
    permissions = {RK: {}, GK: {}, UK: {}, GLOBAL: set()}
 

	
 
    def _choose_perm(new_perm, cur_perm):
 
        new_perm_val = PERM_WEIGHTS[new_perm]
 
        cur_perm_val = PERM_WEIGHTS[cur_perm]
 
        if new_perm_val > cur_perm_val:
 
            return new_perm
 
        return cur_perm
 

	
 
    #======================================================================
 
    # fetch default permissions
 
    #======================================================================
 
    default_user = User.get_by_username('default', cache=True)
 
    default_user_id = default_user.user_id
 

	
 
    default_repo_perms = Permission.get_default_perms(default_user_id)
 
    default_repo_groups_perms = Permission.get_default_group_perms(default_user_id)
 
    default_user_group_perms = Permission.get_default_user_group_perms(default_user_id)
 

	
 
    if user_is_admin:
 
        #==================================================================
 
        # admin users have all rights;
 
        # based on default permissions, just set everything to admin
 
        #==================================================================
 
        permissions[GLOBAL].add('hg.admin')
 
        permissions[GLOBAL].add('hg.create.write_on_repogroup.true')
 

	
 
        # repositories
 
        for perm in default_repo_perms:
 
            r_k = perm.UserRepoToPerm.repository.repo_name
 
            p = 'repository.admin'
 
            permissions[RK][r_k] = p
 

	
 
        # repository groups
 
        for perm in default_repo_groups_perms:
 
            rg_k = perm.UserRepoGroupToPerm.group.group_name
 
            p = 'group.admin'
 
            permissions[GK][rg_k] = p
 

	
 
        # user groups
 
        for perm in default_user_group_perms:
 
            u_k = perm.UserUserGroupToPerm.user_group.users_group_name
 
            p = 'usergroup.admin'
 
            permissions[UK][u_k] = p
 
        return permissions
 

	
 
    #==================================================================
 
    # SET DEFAULTS GLOBAL, REPOS, REPOSITORY GROUPS
 
    #==================================================================
 

	
 
    # default global permissions taken from the default user
 
    default_global_perms = UserToPerm.query() \
 
        .filter(UserToPerm.user_id == default_user_id) \
 
        .options(joinedload(UserToPerm.permission))
 

	
 
    for perm in default_global_perms:
 
        permissions[GLOBAL].add(perm.permission.permission_name)
 

	
 
    # defaults for repositories, taken from default user
 
    for perm in default_repo_perms:
 
        r_k = perm.UserRepoToPerm.repository.repo_name
 
        if perm.Repository.owner_id == user_id:
 
            p = 'repository.admin'
 
        elif perm.Repository.private:
 
            p = 'repository.none'
 
        else:
 
            p = perm.Permission.permission_name
 
        permissions[RK][r_k] = p
 

	
 
    # defaults for repository groups taken from default user permission
 
    # on given group
 
    for perm in default_repo_groups_perms:
 
        rg_k = perm.UserRepoGroupToPerm.group.group_name
 
        p = perm.Permission.permission_name
 
        permissions[GK][rg_k] = p
 

	
 
    # defaults for user groups taken from default user permission
 
    # on given user group
 
    for perm in default_user_group_perms:
 
        u_k = perm.UserUserGroupToPerm.user_group.users_group_name
 
        p = perm.Permission.permission_name
 
        permissions[UK][u_k] = p
 

	
 
    #======================================================================
 
    # !! Augment GLOBALS with user permissions if any found !!
 
    #======================================================================
 

	
 
    # USER GROUPS comes first
 
    # user group global permissions
 
    user_perms_from_users_groups = Session().query(UserGroupToPerm) \
 
        .options(joinedload(UserGroupToPerm.permission)) \
 
        .join((UserGroupMember, UserGroupToPerm.users_group_id ==
 
               UserGroupMember.users_group_id)) \
 
        .filter(UserGroupMember.user_id == user_id) \
 
        .join((UserGroup, UserGroupMember.users_group_id ==
 
               UserGroup.users_group_id)) \
 
        .filter(UserGroup.users_group_active == True) \
 
        .order_by(UserGroupToPerm.users_group_id) \
 
        .all()
 
    # need to group here by groups since user can be in more than
 
    # one group
 
    _grouped = [[x, list(y)] for x, y in
 
                itertools.groupby(user_perms_from_users_groups,
 
                                  lambda x:x.users_group)]
 
    for gr, perms in _grouped:
 
        for perm in perms:
 
            permissions[GLOBAL].add(perm.permission.permission_name)
 

	
 
    # user specific global permissions
 
    user_perms = Session().query(UserToPerm) \
 
            .options(joinedload(UserToPerm.permission)) \
 
            .filter(UserToPerm.user_id == user_id).all()
 

	
 
    for perm in user_perms:
 
        permissions[GLOBAL].add(perm.permission.permission_name)
 

	
 
    # for each kind of global permissions, only keep the one with heighest weight
 
    kind_max_perm = {}
 
    for perm in sorted(permissions[GLOBAL], key=lambda n: PERM_WEIGHTS[n]):
 
        kind = perm.rsplit('.', 1)[0]
 
        kind_max_perm[kind] = perm
 
    permissions[GLOBAL] = set(kind_max_perm.values())
 
    ## END GLOBAL PERMISSIONS
 

	
 
    #======================================================================
 
    # !! PERMISSIONS FOR REPOSITORIES !!
 
    #======================================================================
 
    #======================================================================
 
    # check if user is part of user groups for this repository and
 
    # fill in his permission from it.
 
    #======================================================================
 

	
 
    # user group for repositories permissions
 
    user_repo_perms_from_users_groups = \
 
     Session().query(UserGroupRepoToPerm, Permission, Repository,) \
 
        .join((Repository, UserGroupRepoToPerm.repository_id ==
 
               Repository.repo_id)) \
 
        .join((Permission, UserGroupRepoToPerm.permission_id ==
 
               Permission.permission_id)) \
 
        .join((UserGroup, UserGroupRepoToPerm.users_group_id ==
 
               UserGroup.users_group_id)) \
 
        .filter(UserGroup.users_group_active == True) \
 
        .join((UserGroupMember, UserGroupRepoToPerm.users_group_id ==
 
               UserGroupMember.users_group_id)) \
 
        .filter(UserGroupMember.user_id == user_id) \
 
        .all()
 

	
 
    multiple_counter = collections.defaultdict(int)
 
    for perm in user_repo_perms_from_users_groups:
 
        r_k = perm.UserGroupRepoToPerm.repository.repo_name
 
        multiple_counter[r_k] += 1
 
        cur_perm = permissions[RK][r_k]
 
        p = perm.Permission.permission_name
 
        cur_perm = permissions[RK][r_k]
 
        if multiple_counter[r_k] > 1:
 
            p = _choose_perm(p, cur_perm)
 
        p = _choose_perm(p, cur_perm)
 
        permissions[RK][r_k] = p
 

	
 
    # user permissions for repositories
 
    user_repo_perms = Permission.get_default_perms(user_id)
 
    for perm in user_repo_perms:
 
        r_k = perm.UserRepoToPerm.repository.repo_name
 
        cur_perm = permissions[RK][r_k]
 
        p = perm.Permission.permission_name
 
        p = _choose_perm(p, cur_perm)
 
        permissions[RK][r_k] = p
 

	
 
    #======================================================================
 
    # !! PERMISSIONS FOR REPOSITORY GROUPS !!
 
    #======================================================================
 
    #======================================================================
 
    # check if user is part of user groups for this repository groups and
 
    # fill in his permission from it.
 
    #======================================================================
 
    # user group for repo groups permissions
 
    user_repo_group_perms_from_users_groups = \
 
     Session().query(UserGroupRepoGroupToPerm, Permission, RepoGroup) \
 
     .join((RepoGroup, UserGroupRepoGroupToPerm.group_id == RepoGroup.group_id)) \
 
     .join((Permission, UserGroupRepoGroupToPerm.permission_id
 
            == Permission.permission_id)) \
 
     .join((UserGroup, UserGroupRepoGroupToPerm.users_group_id ==
 
            UserGroup.users_group_id)) \
 
     .filter(UserGroup.users_group_active == True) \
 
     .join((UserGroupMember, UserGroupRepoGroupToPerm.users_group_id
 
            == UserGroupMember.users_group_id)) \
 
     .filter(UserGroupMember.user_id == user_id) \
 
     .all()
 

	
 
    multiple_counter = collections.defaultdict(int)
 
    for perm in user_repo_group_perms_from_users_groups:
 
        g_k = perm.UserGroupRepoGroupToPerm.group.group_name
 
        multiple_counter[g_k] += 1
 
        p = perm.Permission.permission_name
 
        cur_perm = permissions[GK][g_k]
 
        if multiple_counter[g_k] > 1:
 
            p = _choose_perm(p, cur_perm)
 
        p = _choose_perm(p, cur_perm)
 
        permissions[GK][g_k] = p
 

	
 
    # user explicit permissions for repository groups
 
    user_repo_groups_perms = Permission.get_default_group_perms(user_id)
 
    for perm in user_repo_groups_perms:
 
        rg_k = perm.UserRepoGroupToPerm.group.group_name
 
        p = perm.Permission.permission_name
 
        cur_perm = permissions[GK][rg_k]
 
        p = _choose_perm(p, cur_perm)
 
        permissions[GK][rg_k] = p
 

	
 
    #======================================================================
 
    # !! PERMISSIONS FOR USER GROUPS !!
 
    #======================================================================
 
    # user group for user group permissions
 
    user_group_user_groups_perms = \
 
     Session().query(UserGroupUserGroupToPerm, Permission, UserGroup) \
 
     .join((UserGroup, UserGroupUserGroupToPerm.target_user_group_id
 
            == UserGroup.users_group_id)) \
 
     .join((Permission, UserGroupUserGroupToPerm.permission_id
 
            == Permission.permission_id)) \
 
     .join((UserGroupMember, UserGroupUserGroupToPerm.user_group_id
 
            == UserGroupMember.users_group_id)) \
 
     .filter(UserGroupMember.user_id == user_id) \
 
     .join((UserGroup, UserGroupMember.users_group_id ==
 
            UserGroup.users_group_id), aliased=True, from_joinpoint=True) \
 
     .filter(UserGroup.users_group_active == True) \
 
     .all()
 

	
 
    multiple_counter = collections.defaultdict(int)
 
    for perm in user_group_user_groups_perms:
 
        g_k = perm.UserGroupUserGroupToPerm.target_user_group.users_group_name
 
        multiple_counter[g_k] += 1
 
        p = perm.Permission.permission_name
 
        cur_perm = permissions[UK][g_k]
 
        if multiple_counter[g_k] > 1:
 
            p = _choose_perm(p, cur_perm)
 
        p = _choose_perm(p, cur_perm)
 
        permissions[UK][g_k] = p
 

	
 
    # user explicit permission for user groups
 
    user_user_groups_perms = Permission.get_default_user_group_perms(user_id)
 
    for perm in user_user_groups_perms:
 
        u_k = perm.UserUserGroupToPerm.user_group.users_group_name
 
        p = perm.Permission.permission_name
 
        cur_perm = permissions[UK][u_k]
 
        p = _choose_perm(p, cur_perm)
 
        permissions[UK][u_k] = p
 

	
 
    return permissions
 

	
 

	
 
def allowed_api_access(controller_name, whitelist=None, api_key=None):
 
    """
 
    Check if given controller_name is in whitelist API access
 
    """
 
    if not whitelist:
 
        from kallithea import CONFIG
 
        whitelist = aslist(CONFIG.get('api_access_controllers_whitelist'),
 
                           sep=',')
 
        log.debug('whitelist of API access is: %s', whitelist)
 
    api_access_valid = controller_name in whitelist
 
    if api_access_valid:
 
        log.debug('controller:%s is in API whitelist', controller_name)
 
    else:
 
        msg = 'controller: %s is *NOT* in API whitelist' % (controller_name)
 
        if api_key:
 
            # if we use API key and don't have access it's a warning
 
            log.warning(msg)
 
        else:
 
            log.debug(msg)
 
    return api_access_valid
 

	
 

	
 
class AuthUser(object):
 
    """
 
    Represents a Kallithea user, including various authentication and
 
    authorization information. Typically used to store the current user,
 
    but is also used as a generic user information data structure in
 
    parts of the code, e.g. user management.
 

	
 
    Constructed from a database `User` object, a user ID or cookie dict,
 
    it looks up the user (if needed) and copies all attributes to itself,
 
    adding various non-persistent data. If lookup fails but anonymous
 
    access to Kallithea is enabled, the default user is loaded instead.
 

	
 
    `AuthUser` does not by itself authenticate users and the constructor
 
    sets the `is_authenticated` field to False. It's up to other parts
 
    of the code to check e.g. if a supplied password is correct, and if
 
    so, set `is_authenticated` to True.
 

	
 
    However, `AuthUser` does refuse to load a user that is not `active`.
 

	
 
    Note that Kallithea distinguishes between the default user (an actual
 
    user in the database with username "default") and "no user" (no actual
 
    User object, AuthUser filled with blank values and username "None").
 

	
 
    If the default user is active, that will always be used instead of
 
    "no user". On the other hand, if the default user is disabled (and
 
    there is no login information), we instead get "no user"; this should
 
    only happen on the login page (as all other requests are redirected).
 

	
 
    `is_default_user` specifically checks if the AuthUser is the user named
 
    "default". Use `is_anonymous` to check for both "default" and "no user".
 
    """
 

	
 
    def __init__(self, user_id=None, dbuser=None, authenticating_api_key=None,
 
            is_external_auth=False):
 

	
 
        self.is_authenticated = False
 
        self.is_external_auth = is_external_auth
 
        self.authenticating_api_key = authenticating_api_key
 

	
 
        # These attributes will be overridden by fill_data, below, unless the
 
        # requested user cannot be found and the default anonymous user is
 
        # not enabled.
 
        self.user_id = None
 
        self.username = None
 
        self.api_key = None
 
        self.name = ''
 
        self.lastname = ''
 
        self.email = ''
 
        self.admin = False
 

	
 
        # Look up database user, if necessary.
 
        if user_id is not None:
 
            log.debug('Auth User lookup by USER ID %s', user_id)
 
            dbuser = UserModel().get(user_id)
 
        else:
 
            # Note: dbuser is allowed to be None.
 
            log.debug('Auth User lookup by database user %s', dbuser)
 

	
 
        is_user_loaded = self._fill_data(dbuser)
 

	
 
        # If user cannot be found, try falling back to anonymous.
 
        if is_user_loaded:
 
            assert dbuser is not None
 
            self.is_default_user = dbuser.is_default_user
 
        else:
 
            default_user = User.get_default_user(cache=True)
 
            is_user_loaded = self._fill_data(default_user)
 
            self.is_default_user = is_user_loaded
 

	
 
        self.is_anonymous = not is_user_loaded or self.is_default_user
 

	
 
        if not self.username:
 
            self.username = 'None'
 

	
 
        log.debug('Auth User is now %s', self)
 

	
 
    def _fill_data(self, dbuser):
 
        """
 
        Copies database fields from a `db.User` to this `AuthUser`. Does
 
        not copy `api_keys` and `permissions` attributes.
 

	
 
        Checks that `dbuser` is `active` (and not None) before copying;
 
        returns True on success.
 
        """
 
        if dbuser is not None and dbuser.active:
 
            log.debug('filling %s data', dbuser)
 
            for k, v in dbuser.get_dict().iteritems():
 
                assert k not in ['api_keys', 'permissions']
 
                setattr(self, k, v)
 
            return True
 
        return False
 

	
 
    @LazyProperty
 
    def permissions(self):
 
        return self.__get_perms(user=self, cache=False)
 

	
 
    def has_repository_permission_level(self, repo_name, level, purpose=None):
 
        required_perms = {
 
            'read': ['repository.read', 'repository.write', 'repository.admin'],
 
            'write': ['repository.write', 'repository.admin'],
 
            'admin': ['repository.admin'],
 
        }[level]
 
        actual_perm = self.permissions['repositories'].get(repo_name)
 
        ok = actual_perm in required_perms
 
        log.debug('Checking if user %r can %r repo %r (%s): %s (has %r)',
 
            self.username, level, repo_name, purpose, ok, actual_perm)
 
        return ok
 

	
 
    def has_repository_group_permission_level(self, repo_group_name, level, purpose=None):
 
        required_perms = {
 
            'read': ['group.read', 'group.write', 'group.admin'],
 
            'write': ['group.write', 'group.admin'],
 
            'admin': ['group.admin'],
 
        }[level]
 
        actual_perm = self.permissions['repositories_groups'].get(repo_group_name)
 
        ok = actual_perm in required_perms
 
        log.debug('Checking if user %r can %r repo group %r (%s): %s (has %r)',
 
            self.username, level, repo_group_name, purpose, ok, actual_perm)
 
        return ok
 

	
 
    def has_user_group_permission_level(self, user_group_name, level, purpose=None):
 
        required_perms = {
 
            'read': ['usergroup.read', 'usergroup.write', 'usergroup.admin'],
 
            'write': ['usergroup.write', 'usergroup.admin'],
 
            'admin': ['usergroup.admin'],
 
        }[level]
 
        actual_perm = self.permissions['user_groups'].get(user_group_name)
 
        ok = actual_perm in required_perms
 
        log.debug('Checking if user %r can %r user group %r (%s): %s (has %r)',
 
            self.username, level, user_group_name, purpose, ok, actual_perm)
 
        return ok
 

	
 
    @property
 
    def api_keys(self):
 
        return self._get_api_keys()
 

	
 
    def __get_perms(self, user, cache=False):
 
        """
 
        Fills user permission attribute with permissions taken from database
 
        works for permissions given for repositories, and for permissions that
 
        are granted to groups
 

	
 
        :param user: `AuthUser` instance
 
        """
 
        user_id = user.user_id
 
        user_is_admin = user.is_admin
 

	
 
        log.debug('Getting PERMISSION tree')
 
        compute = conditional_cache('short_term', 'cache_desc',
 
                                    condition=cache, func=_cached_perms_data)
 
        return compute(user_id, user_is_admin)
 

	
 
    def _get_api_keys(self):
 
        api_keys = [self.api_key]
 
        for api_key in UserApiKeys.query() \
 
                .filter_by(user_id=self.user_id, is_expired=False):
kallithea/tests/models/test_permissions.py
Show inline comments
 
@@ -410,263 +410,263 @@ class TestPermissions(TestController):
 
        user_model.grant_perm(usr, 'hg.fork.none')
 

	
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 

	
 
        assert u1_auth.permissions['global'] == set(['hg.create.none', 'hg.fork.none',
 
                              'hg.register.manual_activate',
 
                              'hg.extern_activate.auto',
 
                              'repository.read', 'group.read',
 
                              'usergroup.read',
 
                              'hg.create.write_on_repogroup.true'])
 

	
 
    def test_inactive_user_group_does_not_affect_global_permissions_inverse(self):
 
        # Add user to inactive user group, set specific permissions on user
 
        # group and and verify it really is inactive.
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        # disable fork and create on user group
 
        user_group_model.revoke_perm(self.ug1, perm='hg.create.repository')
 
        user_group_model.grant_perm(self.ug1, perm='hg.create.none')
 
        user_group_model.revoke_perm(self.ug1, perm='hg.fork.repository')
 
        user_group_model.grant_perm(self.ug1, perm='hg.fork.none')
 

	
 
        user_model = UserModel()
 
        # enable fork and create on default user
 
        usr = 'default'
 
        user_model.revoke_perm(usr, 'hg.create.none')
 
        user_model.grant_perm(usr, 'hg.create.repository')
 
        user_model.revoke_perm(usr, 'hg.fork.none')
 
        user_model.grant_perm(usr, 'hg.fork.repository')
 

	
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 

	
 
        assert u1_auth.permissions['global'] == set(['hg.create.repository', 'hg.fork.repository',
 
                              'hg.register.manual_activate',
 
                              'hg.extern_activate.auto',
 
                              'repository.read', 'group.read',
 
                              'usergroup.read',
 
                              'hg.create.write_on_repogroup.true'])
 

	
 
    def test_inactive_user_group_does_not_affect_repo_permissions(self):
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        # note: make u2 repo owner rather than u1, because the owner always has
 
        # admin permissions
 
        self.test_repo = fixture.create_repo(name=u'myownrepo',
 
                                             repo_type='hg',
 
                                             cur_user=self.u2)
 

	
 
        # enable admin access for user group on repo
 
        RepoModel().grant_user_group_permission(self.test_repo,
 
                                                group_name=self.ug1,
 
                                                perm='repository.admin')
 
        # enable only write access for default user on repo
 
        RepoModel().grant_user_permission(self.test_repo,
 
                                          user='default',
 
                                          perm='repository.write')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories']['myownrepo'] == 'repository.write'
 

	
 
    def test_inactive_user_group_does_not_affect_repo_permissions_inverse(self):
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        # note: make u2 repo owner rather than u1, because the owner always has
 
        # admin permissions
 
        self.test_repo = fixture.create_repo(name=u'myownrepo',
 
                                             repo_type='hg',
 
                                             cur_user=self.u2)
 

	
 
        # enable only write access for user group on repo
 
        RepoModel().grant_user_group_permission(self.test_repo,
 
                                                group_name=self.ug1,
 
                                                perm='repository.write')
 
        # enable admin access for default user on repo
 
        RepoModel().grant_user_permission(self.test_repo,
 
                                          user='default',
 
                                          perm='repository.admin')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories']['myownrepo'] == 'repository.admin'
 

	
 
    def test_inactive_user_group_does_not_affect_repo_group_permissions(self):
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        self.g1 = fixture.create_repo_group(u'group1', skip_if_exists=True)
 

	
 
        # enable admin access for user group on repo group
 
        RepoGroupModel().grant_user_group_permission(self.g1,
 
                                                     group_name=self.ug1,
 
                                                     perm='group.admin')
 
        # enable only write access for default user on repo group
 
        RepoGroupModel().grant_user_permission(self.g1,
 
                                               user='default',
 
                                               perm='group.write')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories_groups'] == {u'group1': u'group.write'}
 

	
 
    def test_inactive_user_group_does_not_affect_repo_group_permissions_inverse(self):
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        self.g1 = fixture.create_repo_group(u'group1', skip_if_exists=True)
 

	
 
        # enable only write access for user group on repo group
 
        RepoGroupModel().grant_user_group_permission(self.g1,
 
                                                     group_name=self.ug1,
 
                                                     perm='group.write')
 
        # enable admin access for default user on repo group
 
        RepoGroupModel().grant_user_permission(self.g1,
 
                                               user='default',
 
                                               perm='group.admin')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories_groups'] == {u'group1': u'group.admin'}
 

	
 
    def test_inactive_user_group_does_not_affect_user_group_permissions(self):
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        self.ug2 = fixture.create_user_group(u'G2')
 

	
 
        # enable admin access for user group on user group
 
        UserGroupModel().grant_user_group_permission(self.ug2,
 
                                                     user_group=self.ug1,
 
                                                     perm='usergroup.admin')
 
        # enable only write access for default user on user group
 
        UserGroupModel().grant_user_permission(self.ug2,
 
                                               user='default',
 
                                               perm='usergroup.write')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['user_groups'][u'G1'] == u'usergroup.read'
 
        assert u1_auth.permissions['user_groups'][u'G2'] == u'usergroup.write'
 

	
 
    def test_inactive_user_group_does_not_affect_user_group_permissions_inverse(self):
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        user_group_model = UserGroupModel()
 
        user_group_model.add_user_to_group(self.ug1, self.u1)
 
        user_group_model.update(self.ug1, {'users_group_active': False})
 

	
 
        self.ug2 = fixture.create_user_group(u'G2')
 

	
 
        # enable only write access for user group on user group
 
        UserGroupModel().grant_user_group_permission(self.ug2,
 
                                                     user_group=self.ug1,
 
                                                     perm='usergroup.write')
 
        # enable admin access for default user on user group
 
        UserGroupModel().grant_user_permission(self.ug2,
 
                                               user='default',
 
                                               perm='usergroup.admin')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['user_groups'][u'G1'] == u'usergroup.read'
 
        assert u1_auth.permissions['user_groups'][u'G2'] == u'usergroup.admin'
 

	
 
    def test_owner_permissions_doesnot_get_overwritten_by_group(self):
 
        # create repo as USER,
 
        self.test_repo = fixture.create_repo(name=u'myownrepo',
 
                                             repo_type='hg',
 
                                             cur_user=self.u1)
 

	
 
        # he has permissions of admin as owner
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories']['myownrepo'] == 'repository.admin'
 
        # set his permission as user group, he should still be admin
 
        self.ug1 = fixture.create_user_group(u'G1')
 
        UserGroupModel().add_user_to_group(self.ug1, self.u1)
 
        RepoModel().grant_user_group_permission(self.test_repo,
 
                                                 group_name=self.ug1,
 
                                                 perm='repository.none')
 

	
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories']['myownrepo'] == 'repository.none' # temporarily, because multiple_counter
 
        assert u1_auth.permissions['repositories']['myownrepo'] == 'repository.admin'
 

	
 
    def test_owner_permissions_doesnot_get_overwritten_by_others(self):
 
        # create repo as USER,
 
        self.test_repo = fixture.create_repo(name=u'myownrepo',
 
                                             repo_type='hg',
 
                                             cur_user=self.u1)
 

	
 
        # he has permissions of admin as owner
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories']['myownrepo'] == 'repository.admin'
 
        # set his permission as user, he should still be admin
 
        RepoModel().grant_user_permission(self.test_repo, user=self.u1,
 
                                          perm='repository.none')
 
        Session().commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        assert u1_auth.permissions['repositories']['myownrepo'] == 'repository.admin'
 

	
 
    def _test_def_perm_equal(self, user, change_factor=0):
 
        perms = UserToPerm.query() \
 
                .filter(UserToPerm.user == user) \
 
                .all()
 
        assert len(perms) == len(Permission.DEFAULT_USER_PERMISSIONS,)+change_factor, perms
 

	
 
    def test_set_default_permissions(self):
 
        PermissionModel().create_default_permissions(user=self.u1)
 
        self._test_def_perm_equal(user=self.u1)
 

	
 
    def test_set_default_permissions_after_one_is_missing(self):
 
        PermissionModel().create_default_permissions(user=self.u1)
 
        self._test_def_perm_equal(user=self.u1)
 
        # now we delete one, it should be re-created after another call
 
        perms = UserToPerm.query() \
 
                .filter(UserToPerm.user == self.u1) \
 
                .all()
 
        Session().delete(perms[0])
 
        Session().commit()
 

	
 
        self._test_def_perm_equal(user=self.u1, change_factor=-1)
 

	
 
        # create missing one !
 
        PermissionModel().create_default_permissions(user=self.u1)
 
        self._test_def_perm_equal(user=self.u1)
 

	
 
    @parametrize('perm,modify_to', [
 
        ('repository.read', 'repository.none'),
 
        ('group.read', 'group.none'),
 
        ('usergroup.read', 'usergroup.none'),
 
        ('hg.create.repository', 'hg.create.none'),
 
        ('hg.fork.repository', 'hg.fork.none'),
 
        ('hg.register.manual_activate', 'hg.register.auto_activate',)
 
    ])
 
    def test_set_default_permissions_after_modification(self, perm, modify_to):
 
        PermissionModel().create_default_permissions(user=self.u1)
 
        self._test_def_perm_equal(user=self.u1)
 

	
 
        old = Permission.get_by_key(perm)
 
        new = Permission.get_by_key(modify_to)
 
        assert old is not None
 
        assert new is not None
 

	
 
        # now modify permissions
 
        p = UserToPerm.query() \
 
                .filter(UserToPerm.user == self.u1) \
 
                .filter(UserToPerm.permission == old) \
 
                .one()
 
        p.permission = new
 
        Session().commit()
 

	
 
        PermissionModel().create_default_permissions(user=self.u1)
 
        self._test_def_perm_equal(user=self.u1)
kallithea/tests/models/test_user_group_permissions_on_repo_groups.py
Show inline comments
 
import functools
 

	
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.db import RepoGroup
 

	
 
from kallithea.model.meta import Session
 
from kallithea.tests.models.common import _create_project_tree, check_tree_perms, \
 
    _get_perms, _check_expected_count, expected_count, _destroy_project_tree
 
from kallithea.model.user_group import UserGroupModel
 
from kallithea.tests.fixture import Fixture
 

	
 
fixture = Fixture()
 

	
 
test_u2_id = None
 
test_u2_gr_id = None
 
_get_repo_perms = None
 
_get_group_perms = None
 

	
 

	
 
def permissions_setup_func(group_name=u'g0', perm='group.read', recursive='all'):
 
    """
 
    Resets all permissions to perm attribute
 
    """
 
    repo_group = RepoGroup.get_by_group_name(group_name=group_name)
 
    if not repo_group:
 
        raise Exception('Cannot get group %s' % group_name)
 

	
 
    # Start with a baseline that current group can read recursive
 
    perms_updates = [[test_u2_gr_id, 'group.read', 'users_group']]
 
    RepoGroupModel()._update_permissions(repo_group,
 
                                         perms_updates=perms_updates,
 
                                         recursive='all', check_perms=False)
 

	
 
    perms_updates = [[test_u2_gr_id, perm, 'users_group']]
 
    RepoGroupModel()._update_permissions(repo_group,
 
                                         perms_updates=perms_updates,
 
                                         recursive=recursive, check_perms=False)
 
    Session().commit()
 

	
 

	
 
def setup_module():
 
    global test_u2_id, test_u2_gr_id, _get_repo_perms, _get_group_perms
 
    test_u2 = _create_project_tree()
 
    Session().commit()
 
    test_u2_id = test_u2.user_id
 

	
 
    gr1 = fixture.create_user_group(u'perms_group_1')
 
    Session().commit()
 
    test_u2_gr_id = gr1.users_group_id
 
    UserGroupModel().add_user_to_group(gr1, user=test_u2_id)
 
    Session().commit()
 

	
 
    _get_repo_perms = functools.partial(_get_perms, key='repositories',
 
                                        test_u1_id=test_u2_id)
 
    _get_group_perms = functools.partial(_get_perms, key='repositories_groups',
 
                                         test_u1_id=test_u2_id)
 

	
 

	
 
def teardown_module():
 
    _destroy_project_tree(test_u2_id)
 
    fixture.destroy_user_group(u'perms_group_1')
 

	
 

	
 
def test_user_permissions_on_group_without_recursive_mode():
 
    # set permission to g0 non-recursive mode
 
    recursive = 'none'
 
    group = u'g0'
 
    permissions_setup_func(group, 'group.write', recursive=recursive)
 

	
 
    items = [x for x in _get_repo_perms(group, recursive)]
 
    expected = 0
 
    assert len(items) == expected, ' %s != %s' % (len(items), expected)
 
    for name, perm in items:
 
        check_tree_perms(name, perm, group, 'repository.read')
 

	
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    expected = 1
 
    assert len(items) == expected, ' %s != %s' % (len(items), expected)
 
    for name, perm in items:
 
        check_tree_perms(name, perm, group, 'group.write')
 

	
 

	
 
def test_user_permissions_on_group_without_recursive_mode_subgroup():
 
    # set permission to g0 non-recursive mode
 
    recursive = 'none'
 
    group = u'g0/g0_1'
 
    permissions_setup_func(group, 'group.write', recursive=recursive)
 

	
 
    items = [x for x in _get_repo_perms(group, recursive)]
 
    expected = 0
 
    assert len(items) == expected, ' %s != %s' % (len(items), expected)
 
    for name, perm in items:
 
        check_tree_perms(name, perm, group, 'repository.read')
 

	
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    expected = 1
 
    assert len(items) == expected, ' %s != %s' % (len(items), expected)
 
    for name, perm in items:
 
        check_tree_perms(name, perm, group, 'group.write')
 

	
 

	
 
def test_user_permissions_on_group_with_recursive_mode():
 

	
 
    # set permission to g0 recursive mode, all children including
 
    # other repos and groups should have this permission now set !
 
    recursive = 'all'
 
    group = u'g0'
 
    permissions_setup_func(group, 'group.write', recursive=recursive)
 

	
 
    repo_items = [x for x in _get_repo_perms(group, recursive)]
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    _check_expected_count(items, repo_items, expected_count(group, True))
 

	
 
    for name, perm in repo_items:
 
        check_tree_perms(name, perm, group, 'repository.write')
 

	
 
    for name, perm in items:
 
        check_tree_perms(name, perm, group, 'group.write')
 

	
 

	
 
def test_user_permissions_on_group_with_recursive_mode_inner_group():
 
    ## set permission to g0_3 group to none
 
    recursive = 'all'
 
    group = u'g0/g0_3'
 
    permissions_setup_func(group, 'group.none', recursive=recursive)
 

	
 
    repo_items = [x for x in _get_repo_perms(group, recursive)]
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    _check_expected_count(items, repo_items, expected_count(group, True))
 

	
 
    for name, perm in repo_items:
 
        check_tree_perms(name, perm, group, 'repository.none')
 
        check_tree_perms(name, perm, group, 'repository.none' if name.endswith('_private') else 'repository.read')
 

	
 
    for name, perm in items:
 
        check_tree_perms(name, perm, group, 'group.none')
 
        check_tree_perms(name, perm, group, 'group.read')
 

	
 

	
 
def test_user_permissions_on_group_with_recursive_mode_deepest():
 
    ## set permission to g0_3 group to none
 
    ## set permission to g0/g0_1/g0_1_1 group to write
 
    recursive = 'all'
 
    group = u'g0/g0_1/g0_1_1'
 
    permissions_setup_func(group, 'group.write', recursive=recursive)
 

	
 
    repo_items = [x for x in _get_repo_perms(group, recursive)]
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    _check_expected_count(items, repo_items, expected_count(group, True))
 

	
 
    for name, perm in repo_items:
 
        check_tree_perms(name, perm, group, 'repository.write')
 

	
 
    for name, perm in items:
 
        check_tree_perms(name, perm, group, 'group.write')
 

	
 

	
 
def test_user_permissions_on_group_with_recursive_mode_only_with_repos():
 
    ## set permission to g0_3 group to none
 
    ## set permission to g0/g0_2 group to admin
 
    recursive = 'all'
 
    group = u'g0/g0_2'
 
    permissions_setup_func(group, 'group.admin', recursive=recursive)
 

	
 
    repo_items = [x for x in _get_repo_perms(group, recursive)]
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    _check_expected_count(items, repo_items, expected_count(group, True))
 

	
 
    for name, perm in repo_items:
 
        check_tree_perms(name, perm, group, 'repository.admin')
 

	
 
    for name, perm in items:
 
        check_tree_perms(name, perm, group, 'group.admin')
 

	
 

	
 
def test_user_permissions_on_group_with_recursive_mode_on_repos():
 
    # set permission to g0/g0_1 with recursive mode on just repositories
 
    recursive = 'repos'
 
    group = u'g0/g0_1'
 
    perm = 'group.write'
 
    permissions_setup_func(group, perm, recursive=recursive)
 

	
 
    repo_items = [x for x in _get_repo_perms(group, recursive)]
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    _check_expected_count(items, repo_items, expected_count(group, True))
 

	
 
    for name, perm in repo_items:
 
        check_tree_perms(name, perm, group, 'repository.write')
 

	
 
    for name, perm in items:
 
        # permission is set with repos only mode, but we also change the permission
 
        # on the group we trigger the apply to children from, thus we need
 
        # to change its permission check
 
        old_perm = 'group.read'
 
        if name == group:
 
            old_perm = perm
 
        check_tree_perms(name, perm, group, old_perm)
 

	
 

	
 
def test_user_permissions_on_group_with_recursive_mode_on_repo_groups():
 
    # set permission to g0/g0_1 with recursive mode on just repository groups
 
    recursive = 'groups'
 
    group = u'g0/g0_1'
 
    perm = 'group.none'
 
    permissions_setup_func(group, perm, recursive=recursive)
 

	
 
    repo_items = [x for x in _get_repo_perms(group, recursive)]
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    _check_expected_count(items, repo_items, expected_count(group, True))
 

	
 
    for name, perm in repo_items:
 
        check_tree_perms(name, perm, group, 'repository.read')
 

	
 
    for name, perm in items:
 
        check_tree_perms(name, perm, group, 'group.none')
 
        check_tree_perms(name, perm, group, 'group.read')
0 comments (0 inline, 0 general)