Changeset - b4d1e85265c1
[Not reviewed]
default
0 9 0
Søren Løvborg - 9 years ago 2017-02-20 19:31:48
sorenl@unity3d.com
auth: simplify repository group permission checks

In practice, Kallithea has the 'group.admin' permission imply the
'group.write' permission, which again implies 'group.read'.

This codifies this practice by replacing HasRepoGroupPermissionAny
"perm function" with the new HasRepoGroupLevel function, reducing the
risk of errors and saving quite a lot of typing.
9 files changed with 59 insertions and 66 deletions:
0 comments (0 inline, 0 general)
kallithea/controllers/admin/repo_groups.py
Show inline comments
 
@@ -41,7 +41,7 @@ from kallithea.config.routing import url
 
from kallithea.lib import helpers as h
 
from kallithea.lib.compat import json
 
from kallithea.lib.auth import LoginRequired, \
 
    HasRepoGroupPermissionAnyDecorator, HasRepoGroupPermissionAny, \
 
    HasRepoGroupPermissionLevelDecorator, HasRepoGroupPermissionLevel, \
 
    HasPermissionAny
 
from kallithea.lib.base import BaseController, render
 
from kallithea.model.db import RepoGroup, Repository
 
@@ -68,7 +68,7 @@ class RepoGroupsController(BaseControlle
 
        exclude is used for not moving group to itself TODO: also exclude descendants
 
        Note: only admin can create top level groups
 
        """
 
        repo_groups = AvailableRepoGroupChoices([], ['group.admin'], extras)
 
        repo_groups = AvailableRepoGroupChoices([], 'admin', extras)
 
        exclude_group_ids = set(rg.group_id for rg in exclude)
 
        c.repo_groups = [rg for rg in repo_groups
 
                         if rg[0] not in exclude_group_ids]
 
@@ -110,7 +110,7 @@ class RepoGroupsController(BaseControlle
 

	
 
    def index(self, format='html'):
 
        _list = RepoGroup.query(sorted=True).all()
 
        group_iter = RepoGroupList(_list, perm_set=['group.admin'])
 
        group_iter = RepoGroupList(_list, perm_level='admin')
 
        repo_groups_data = []
 
        total_records = len(group_iter)
 
        _tmpl_lookup = kallithea.CONFIG['pylons.app_globals'].mako_lookup
 
@@ -197,7 +197,7 @@ class RepoGroupsController(BaseControlle
 
            group_id = safe_int(request.GET.get('parent_group'))
 
            group = RepoGroup.get(group_id) if group_id else None
 
            group_name = group.group_name if group else None
 
            if HasRepoGroupPermissionAny('group.admin')(group_name, 'group create'):
 
            if HasRepoGroupPermissionLevel('admin')(group_name, 'group create'):
 
                pass
 
            else:
 
                raise HTTPForbidden()
 
@@ -205,7 +205,7 @@ class RepoGroupsController(BaseControlle
 
        self.__load_defaults()
 
        return render('admin/repo_groups/repo_group_add.html')
 

	
 
    @HasRepoGroupPermissionAnyDecorator('group.admin')
 
    @HasRepoGroupPermissionLevelDecorator('admin')
 
    def update(self, group_name):
 
        c.repo_group = RepoGroup.guess_instance(group_name)
 
        self.__load_defaults(extras=[c.repo_group.parent_group],
 
@@ -251,7 +251,7 @@ class RepoGroupsController(BaseControlle
 

	
 
        raise HTTPFound(location=url('edit_repo_group', group_name=group_name))
 

	
 
    @HasRepoGroupPermissionAnyDecorator('group.admin')
 
    @HasRepoGroupPermissionLevelDecorator('admin')
 
    def delete(self, group_name):
 
        gr = c.repo_group = RepoGroup.guess_instance(group_name)
 
        repos = gr.repositories.all()
 
@@ -292,8 +292,7 @@ class RepoGroupsController(BaseControlle
 
            return self.show(group_name)
 
        raise HTTPNotFound
 

	
 
    @HasRepoGroupPermissionAnyDecorator('group.read', 'group.write',
 
                                         'group.admin')
 
    @HasRepoGroupPermissionLevelDecorator('read')
 
    def show(self, group_name):
 
        c.active = 'settings'
 

	
 
@@ -310,7 +309,7 @@ class RepoGroupsController(BaseControlle
 

	
 
        return render('admin/repo_groups/repo_group_show.html')
 

	
 
    @HasRepoGroupPermissionAnyDecorator('group.admin')
 
    @HasRepoGroupPermissionLevelDecorator('admin')
 
    def edit(self, group_name):
 
        c.active = 'settings'
 

	
 
@@ -326,14 +325,14 @@ class RepoGroupsController(BaseControlle
 
            force_defaults=False
 
        )
 

	
 
    @HasRepoGroupPermissionAnyDecorator('group.admin')
 
    @HasRepoGroupPermissionLevelDecorator('admin')
 
    def edit_repo_group_advanced(self, group_name):
 
        c.active = 'advanced'
 
        c.repo_group = RepoGroup.guess_instance(group_name)
 

	
 
        return render('admin/repo_groups/repo_group_edit.html')
 

	
 
    @HasRepoGroupPermissionAnyDecorator('group.admin')
 
    @HasRepoGroupPermissionLevelDecorator('admin')
 
    def edit_repo_group_perms(self, group_name):
 
        c.active = 'perms'
 
        c.repo_group = RepoGroup.guess_instance(group_name)
 
@@ -347,7 +346,7 @@ class RepoGroupsController(BaseControlle
 
            force_defaults=False
 
        )
 

	
 
    @HasRepoGroupPermissionAnyDecorator('group.admin')
 
    @HasRepoGroupPermissionLevelDecorator('admin')
 
    def update_perms(self, group_name):
 
        """
 
        Update permissions for given repository group
 
@@ -378,7 +377,7 @@ class RepoGroupsController(BaseControlle
 
        h.flash(_('Repository group permissions updated'), category='success')
 
        raise HTTPFound(location=url('edit_repo_group_perms', group_name=group_name))
 

	
 
    @HasRepoGroupPermissionAnyDecorator('group.admin')
 
    @HasRepoGroupPermissionLevelDecorator('admin')
 
    def delete_perms(self, group_name):
 
        try:
 
            obj_type = request.POST.get('obj_type')
kallithea/controllers/admin/repos.py
Show inline comments
 
@@ -76,12 +76,13 @@ class ReposController(BaseRepoController
 

	
 
    def __load_defaults(self, repo=None):
 
        top_perms = ['hg.create.repository']
 
        repo_group_perms = ['group.admin']
 
        if HasPermissionAny('hg.create.write_on_repogroup.true')():
 
            repo_group_perms.append('group.write')
 
            repo_group_perm_level = 'write'
 
        else:
 
            repo_group_perm_level = 'admin'
 
        extras = [] if repo is None else [repo.group]
 

	
 
        c.repo_groups = AvailableRepoGroupChoices(top_perms, repo_group_perms, extras)
 
        c.repo_groups = AvailableRepoGroupChoices(top_perms, repo_group_perm_level, extras)
 

	
 
        c.landing_revs_choices, c.landing_revs = ScmModel().get_repo_landing_revs(repo)
 

	
kallithea/controllers/api/api.py
Show inline comments
 
@@ -36,7 +36,7 @@ from kallithea.controllers.api import JS
 
from kallithea.lib.auth import (
 
    PasswordGenerator, AuthUser, HasPermissionAnyDecorator,
 
    HasPermissionAnyDecorator, HasPermissionAny, HasRepoPermissionLevel,
 
    HasRepoGroupPermissionAny, HasUserGroupPermissionAny)
 
    HasRepoGroupPermissionLevel, HasUserGroupPermissionAny)
 
from kallithea.lib.utils import map_groups, repo2db_mapper
 
from kallithea.lib.utils2 import (
 
    str2bool, time_to_datetime, safe_int, Optional, OAttr)
 
@@ -2111,8 +2111,7 @@ class ApiController(JSONRPCController):
 
        repo_group = get_repo_group_or_error(repogroupid)
 

	
 
        if not HasPermissionAny('hg.admin')():
 
            # check if we have admin permission for this repo group !
 
            if not HasRepoGroupPermissionAny('group.admin')(group_name=repo_group.group_name):
 
            if not HasRepoGroupPermissionLevel('admin')(repo_group.group_name):
 
                raise JSONRPCError('repository group `%s` does not exist' % (repogroupid,))
 

	
 
        user = get_user_or_error(userid)
 
@@ -2175,8 +2174,7 @@ class ApiController(JSONRPCController):
 
        repo_group = get_repo_group_or_error(repogroupid)
 

	
 
        if not HasPermissionAny('hg.admin')():
 
            # check if we have admin permission for this repo group !
 
            if not HasRepoGroupPermissionAny('group.admin')(group_name=repo_group.group_name):
 
            if not HasRepoGroupPermissionLevel('admin')(repo_group.group_name):
 
                raise JSONRPCError('repository group `%s` does not exist' % (repogroupid,))
 

	
 
        user = get_user_or_error(userid)
 
@@ -2243,10 +2241,7 @@ class ApiController(JSONRPCController):
 
        perm = get_perm_or_error(perm, prefix='group.')
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAny('hg.admin')():
 
            # check if we have admin permission for this repo group !
 
            _perms = ('group.admin',)
 
            if not HasRepoGroupPermissionAny(*_perms)(
 
                    group_name=repo_group.group_name):
 
            if not HasRepoGroupPermissionLevel('admin')(repo_group.group_name):
 
                raise JSONRPCError(
 
                    'repository group `%s` does not exist' % (repogroupid,))
 

	
 
@@ -2319,10 +2314,7 @@ class ApiController(JSONRPCController):
 
        repo_group = get_repo_group_or_error(repogroupid)
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAny('hg.admin')():
 
            # check if we have admin permission for this repo group !
 
            _perms = ('group.admin',)
 
            if not HasRepoGroupPermissionAny(*_perms)(
 
                    group_name=repo_group.group_name):
 
            if not HasRepoGroupPermissionLevel('admin')(repo_group.group_name):
 
                raise JSONRPCError(
 
                    'repository group `%s` does not exist' % (repogroupid,))
 

	
kallithea/controllers/forks.py
Show inline comments
 
@@ -56,10 +56,11 @@ class ForksController(BaseRepoController
 
        super(ForksController, self).__before__()
 

	
 
    def __load_defaults(self):
 
        repo_group_perms = ['group.admin']
 
        if HasPermissionAny('hg.create.write_on_repogroup.true')():
 
            repo_group_perms.append('group.write')
 
        c.repo_groups = AvailableRepoGroupChoices(['hg.create.repository'], repo_group_perms)
 
            repo_group_perm_level = 'write'
 
        else:
 
            repo_group_perm_level = 'admin'
 
        c.repo_groups = AvailableRepoGroupChoices(['hg.create.repository'], repo_group_perm_level)
 

	
 
        c.landing_revs_choices, c.landing_revs = ScmModel().get_repo_landing_revs()
 

	
kallithea/lib/auth.py
Show inline comments
 
@@ -549,6 +549,18 @@ class AuthUser(object):
 
            self.username, level, repo_name, purpose, ok, actual_perm)
 
        return ok
 

	
 
    def has_repository_group_permission_level(self, repo_group_name, level, purpose=None):
 
        required_perms = {
 
            'read': ['group.read', 'group.write', 'group.admin'],
 
            'write': ['group.write', 'group.admin'],
 
            'admin': ['group.admin'],
 
        }[level]
 
        actual_perm = self.permissions['repositories_groups'].get(repo_group_name)
 
        ok = actual_perm in required_perms
 
        log.debug('Checking if user %r can %r repo group %r (%s): %s (has %r)',
 
            self.username, level, repo_group_name, purpose, ok, actual_perm)
 
        return ok
 

	
 
    @property
 
    def api_keys(self):
 
        return self._get_api_keys()
 
@@ -859,17 +871,15 @@ class HasRepoPermissionLevelDecorator(_P
 
        return user.has_repository_permission_level(repo_name, level)
 

	
 

	
 
class HasRepoGroupPermissionAnyDecorator(_PermsDecorator):
 
class HasRepoGroupPermissionLevelDecorator(_PermsDecorator):
 
    """
 
    Checks the user has any of given permissions for the requested repository group.
 
    """
 

	
 
    def check_permissions(self, user):
 
        repo_group_name = get_repo_group_slug(request)
 
        try:
 
            return user.permissions['repositories_groups'][repo_group_name] in self.required_perms
 
        except KeyError:
 
            return False
 
        (level,) = self.required_perms
 
        return user.has_repository_group_permission_level(repo_group_name, level)
 

	
 

	
 
class HasUserGroupPermissionAnyDecorator(_PermsDecorator):
 
@@ -925,17 +935,11 @@ class HasRepoPermissionLevel(_PermsFunct
 
        return request.user.has_repository_permission_level(repo_name, level, purpose)
 

	
 

	
 
class HasRepoGroupPermissionAny(_PermsFunction):
 
class HasRepoGroupPermissionLevel(_PermsFunction):
 

	
 
    def __call__(self, group_name, purpose=None):
 
        try:
 
            ok = request.user.permissions['repositories_groups'][group_name] in self.required_perms
 
        except KeyError:
 
            ok = False
 

	
 
        log.debug('Check %s for %s for repo group %s (%s): %s' %
 
            (request.user.username, self.required_perms, group_name, purpose, ok))
 
        return ok
 
        (level,) = self.required_perms
 
        return request.user.has_repository_group_permission_level(group_name, level, purpose)
 

	
 

	
 
class HasUserGroupPermissionAny(_PermsFunction):
kallithea/lib/helpers.py
Show inline comments
 
@@ -778,7 +778,7 @@ def action_parser(user_log, feed=False, 
 
# PERMS
 
#==============================================================================
 
from kallithea.lib.auth import HasPermissionAny, \
 
    HasRepoPermissionLevel, HasRepoGroupPermissionAny
 
    HasRepoPermissionLevel, HasRepoGroupPermissionLevel
 

	
 

	
 
#==============================================================================
kallithea/model/scm.py
Show inline comments
 
@@ -49,7 +49,7 @@ from kallithea import BACKENDS
 
from kallithea.lib import helpers as h
 
from kallithea.lib.utils2 import safe_str, safe_unicode, get_server_url, \
 
    _set_extras
 
from kallithea.lib.auth import HasRepoPermissionLevel, HasRepoGroupPermissionAny, \
 
from kallithea.lib.auth import HasRepoPermissionLevel, HasRepoGroupPermissionLevel, \
 
    HasUserGroupPermissionAny, HasPermissionAny, HasPermissionAny
 
from kallithea.lib.utils import get_filesystem_repos, make_ui, \
 
    action_logger
 
@@ -123,13 +123,10 @@ class RepoList(_PermCheckIterator):
 

	
 
class RepoGroupList(_PermCheckIterator):
 

	
 
    def __init__(self, db_repo_group_list, perm_set=None, extra_kwargs=None):
 
        if not perm_set:
 
            perm_set = ['group.read', 'group.write', 'group.admin']
 

	
 
    def __init__(self, db_repo_group_list, perm_level, extra_kwargs=None):
 
        super(RepoGroupList, self).__init__(obj_list=db_repo_group_list,
 
                    obj_attr='group_name', perm_set=perm_set,
 
                    perm_checker=HasRepoGroupPermissionAny,
 
                    obj_attr='group_name', perm_set=[perm_level],
 
                    perm_checker=HasRepoGroupPermissionLevel,
 
                    extra_kwargs=extra_kwargs)
 

	
 

	
 
@@ -222,7 +219,7 @@ class ScmModel(BaseModel):
 
        if groups is None:
 
            groups = RepoGroup.query() \
 
                .filter(RepoGroup.parent_group_id == None).all()
 
        return RepoGroupList(groups)
 
        return RepoGroupList(groups, perm_level='read')
 

	
 
    def mark_for_invalidation(self, repo_name):
 
        """
 
@@ -784,7 +781,7 @@ class ScmModel(BaseModel):
 
            else:
 
                log.debug('skipping writing hook file')
 

	
 
def AvailableRepoGroupChoices(top_perms, repo_group_perms, extras=()):
 
def AvailableRepoGroupChoices(top_perms, repo_group_perm_level, extras=()):
 
    """Return group_id,string tuples with choices for all the repo groups where
 
    the user has the necessary permissions.
 

	
 
@@ -794,7 +791,7 @@ def AvailableRepoGroupChoices(top_perms,
 
    if HasPermissionAny('hg.admin')('available repo groups'):
 
        groups.append(None)
 
    else:
 
        groups = list(RepoGroupList(groups, perm_set=repo_group_perms))
 
        groups = list(RepoGroupList(groups, perm_level=repo_group_perm_level))
 
        if top_perms and HasPermissionAny(*top_perms)('available repo groups'):
 
            groups.append(None)
 
        for extra in extras:
kallithea/model/validators.py
Show inline comments
 
@@ -35,7 +35,7 @@ from kallithea.lib.utils2 import str2boo
 
from kallithea.model.db import RepoGroup, Repository, UserGroup, User
 
from kallithea.lib.exceptions import LdapImportError
 
from kallithea.config.routing import ADMIN_PREFIX
 
from kallithea.lib.auth import HasRepoGroupPermissionAny, HasPermissionAny
 
from kallithea.lib.auth import HasRepoGroupPermissionLevel, HasPermissionAny
 

	
 
# silence warnings and pylint
 
UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set, \
 
@@ -502,9 +502,9 @@ def CanWriteGroup(old_data=None):
 

	
 
            # create repositories with write permission on group is set to true
 
            create_on_write = HasPermissionAny('hg.create.write_on_repogroup.true')()
 
            group_admin = HasRepoGroupPermissionAny('group.admin')(gr_name,
 
            group_admin = HasRepoGroupPermissionLevel('admin')(gr_name,
 
                                            'can write into group validator')
 
            group_write = HasRepoGroupPermissionAny('group.write')(gr_name,
 
            group_write = HasRepoGroupPermissionLevel('write')(gr_name,
 
                                            'can write into group validator')
 
            forbidden = not (group_admin or (group_write and create_on_write))
 
            can_create_repos = HasPermissionAny('hg.admin', 'hg.create.repository')
 
@@ -555,8 +555,7 @@ def CanCreateGroup(can_create_in_root=Fa
 
                return
 

	
 
            forbidden_in_root = gr is None and not can_create_in_root
 
            val = HasRepoGroupPermissionAny('group.admin')
 
            forbidden = not val(gr_name, 'can create group validator')
 
            forbidden = not HasRepoGroupPermissionLevel('admin')(gr_name, 'can create group validator')
 
            if forbidden_in_root or forbidden:
 
                msg = self.message('permission_denied', state)
 
                raise formencode.Invalid(msg, value, state,
kallithea/templates/index_base.html
Show inline comments
 
@@ -18,13 +18,13 @@
 
                    gr_name = c.group.group_name if c.group else None
 
                    # create repositories with write permission on group is set to true
 
                    create_on_write = h.HasPermissionAny('hg.create.write_on_repogroup.true')()
 
                    group_admin = h.HasRepoGroupPermissionAny('group.admin')(gr_name, 'can write into group index page')
 
                    group_write = h.HasRepoGroupPermissionAny('group.write')(gr_name, 'can write into group index page')
 
                    group_admin = h.HasRepoGroupPermissionLevel('admin')(gr_name, 'can write into group index page')
 
                    group_write = h.HasRepoGroupPermissionLevel('write')(gr_name, 'can write into group index page')
 
                %>
 
                %if h.HasPermissionAny('hg.admin','hg.create.repository')() or (group_admin or (group_write and create_on_write)):
 
                  %if c.group:
 
                        <a href="${h.url('new_repo',parent_group=c.group.group_id)}" class="btn btn-default btn-xs"><i class="icon-plus"></i> ${_('Add Repository')}</a>
 
                        %if h.HasPermissionAny('hg.admin')() or h.HasRepoGroupPermissionAny('group.admin')(c.group.group_name):
 
                        %if h.HasPermissionAny('hg.admin')() or h.HasRepoGroupPermissionLevel('admin')(c.group.group_name):
 
                            <a href="${h.url('new_repos_group', parent_group=c.group.group_id)}" class="btn btn-default btn-xs"><i class="icon-plus"></i> ${_('Add Repository Group')}</a>
 
                        %endif
 
                  %else:
 
@@ -34,7 +34,7 @@
 
                    %endif
 
                  %endif
 
                %endif
 
                %if c.group and h.HasRepoGroupPermissionAny('group.admin')(c.group.group_name):
 
                %if c.group and h.HasRepoGroupPermissionLevel('admin')(c.group.group_name):
 
                    <a href="${h.url('edit_repo_group',group_name=c.group.group_name)}" title="${_('You have admin right to this group, and can edit it')}" class="btn btn-default btn-xs"><i class="icon-pencil"></i> ${_('Edit Repository Group')}</a>
 
                %endif
 
                </li>
0 comments (0 inline, 0 general)