# HG changeset patch # User Søren Løvborg # Date 2017-02-20 19:31:48 # Node ID b4d1e85265c1c6c760c65c1a6e9d25516444d2b0 # Parent a17c8e5f6712eac28ce9db16d34c9cb41dda56d6 auth: simplify repository group permission checks In practice, Kallithea has the 'group.admin' permission imply the 'group.write' permission, which again implies 'group.read'. This codifies this practice by replacing HasRepoGroupPermissionAny "perm function" with the new HasRepoGroupLevel function, reducing the risk of errors and saving quite a lot of typing. diff --git a/kallithea/controllers/admin/repo_groups.py b/kallithea/controllers/admin/repo_groups.py --- a/kallithea/controllers/admin/repo_groups.py +++ b/kallithea/controllers/admin/repo_groups.py @@ -41,7 +41,7 @@ from kallithea.config.routing import url from kallithea.lib import helpers as h from kallithea.lib.compat import json from kallithea.lib.auth import LoginRequired, \ - HasRepoGroupPermissionAnyDecorator, HasRepoGroupPermissionAny, \ + HasRepoGroupPermissionLevelDecorator, HasRepoGroupPermissionLevel, \ HasPermissionAny from kallithea.lib.base import BaseController, render from kallithea.model.db import RepoGroup, Repository @@ -68,7 +68,7 @@ class RepoGroupsController(BaseControlle exclude is used for not moving group to itself TODO: also exclude descendants Note: only admin can create top level groups """ - repo_groups = AvailableRepoGroupChoices([], ['group.admin'], extras) + repo_groups = AvailableRepoGroupChoices([], 'admin', extras) exclude_group_ids = set(rg.group_id for rg in exclude) c.repo_groups = [rg for rg in repo_groups if rg[0] not in exclude_group_ids] @@ -110,7 +110,7 @@ class RepoGroupsController(BaseControlle def index(self, format='html'): _list = RepoGroup.query(sorted=True).all() - group_iter = RepoGroupList(_list, perm_set=['group.admin']) + group_iter = RepoGroupList(_list, perm_level='admin') repo_groups_data = [] total_records = len(group_iter) _tmpl_lookup = kallithea.CONFIG['pylons.app_globals'].mako_lookup @@ -197,7 +197,7 @@ class RepoGroupsController(BaseControlle group_id = safe_int(request.GET.get('parent_group')) group = RepoGroup.get(group_id) if group_id else None group_name = group.group_name if group else None - if HasRepoGroupPermissionAny('group.admin')(group_name, 'group create'): + if HasRepoGroupPermissionLevel('admin')(group_name, 'group create'): pass else: raise HTTPForbidden() @@ -205,7 +205,7 @@ class RepoGroupsController(BaseControlle self.__load_defaults() return render('admin/repo_groups/repo_group_add.html') - @HasRepoGroupPermissionAnyDecorator('group.admin') + @HasRepoGroupPermissionLevelDecorator('admin') def update(self, group_name): c.repo_group = RepoGroup.guess_instance(group_name) self.__load_defaults(extras=[c.repo_group.parent_group], @@ -251,7 +251,7 @@ class RepoGroupsController(BaseControlle raise HTTPFound(location=url('edit_repo_group', group_name=group_name)) - @HasRepoGroupPermissionAnyDecorator('group.admin') + @HasRepoGroupPermissionLevelDecorator('admin') def delete(self, group_name): gr = c.repo_group = RepoGroup.guess_instance(group_name) repos = gr.repositories.all() @@ -292,8 +292,7 @@ class RepoGroupsController(BaseControlle return self.show(group_name) raise HTTPNotFound - @HasRepoGroupPermissionAnyDecorator('group.read', 'group.write', - 'group.admin') + @HasRepoGroupPermissionLevelDecorator('read') def show(self, group_name): c.active = 'settings' @@ -310,7 +309,7 @@ class RepoGroupsController(BaseControlle return render('admin/repo_groups/repo_group_show.html') - @HasRepoGroupPermissionAnyDecorator('group.admin') + @HasRepoGroupPermissionLevelDecorator('admin') def edit(self, group_name): c.active = 'settings' @@ -326,14 +325,14 @@ class RepoGroupsController(BaseControlle force_defaults=False ) - @HasRepoGroupPermissionAnyDecorator('group.admin') + @HasRepoGroupPermissionLevelDecorator('admin') def edit_repo_group_advanced(self, group_name): c.active = 'advanced' c.repo_group = RepoGroup.guess_instance(group_name) return render('admin/repo_groups/repo_group_edit.html') - @HasRepoGroupPermissionAnyDecorator('group.admin') + @HasRepoGroupPermissionLevelDecorator('admin') def edit_repo_group_perms(self, group_name): c.active = 'perms' c.repo_group = RepoGroup.guess_instance(group_name) @@ -347,7 +346,7 @@ class RepoGroupsController(BaseControlle force_defaults=False ) - @HasRepoGroupPermissionAnyDecorator('group.admin') + @HasRepoGroupPermissionLevelDecorator('admin') def update_perms(self, group_name): """ Update permissions for given repository group @@ -378,7 +377,7 @@ class RepoGroupsController(BaseControlle h.flash(_('Repository group permissions updated'), category='success') raise HTTPFound(location=url('edit_repo_group_perms', group_name=group_name)) - @HasRepoGroupPermissionAnyDecorator('group.admin') + @HasRepoGroupPermissionLevelDecorator('admin') def delete_perms(self, group_name): try: obj_type = request.POST.get('obj_type') diff --git a/kallithea/controllers/admin/repos.py b/kallithea/controllers/admin/repos.py --- a/kallithea/controllers/admin/repos.py +++ b/kallithea/controllers/admin/repos.py @@ -76,12 +76,13 @@ class ReposController(BaseRepoController def __load_defaults(self, repo=None): top_perms = ['hg.create.repository'] - repo_group_perms = ['group.admin'] if HasPermissionAny('hg.create.write_on_repogroup.true')(): - repo_group_perms.append('group.write') + repo_group_perm_level = 'write' + else: + repo_group_perm_level = 'admin' extras = [] if repo is None else [repo.group] - c.repo_groups = AvailableRepoGroupChoices(top_perms, repo_group_perms, extras) + c.repo_groups = AvailableRepoGroupChoices(top_perms, repo_group_perm_level, extras) c.landing_revs_choices, c.landing_revs = ScmModel().get_repo_landing_revs(repo) diff --git a/kallithea/controllers/api/api.py b/kallithea/controllers/api/api.py --- a/kallithea/controllers/api/api.py +++ b/kallithea/controllers/api/api.py @@ -36,7 +36,7 @@ from kallithea.controllers.api import JS from kallithea.lib.auth import ( PasswordGenerator, AuthUser, HasPermissionAnyDecorator, HasPermissionAnyDecorator, HasPermissionAny, HasRepoPermissionLevel, - HasRepoGroupPermissionAny, HasUserGroupPermissionAny) + HasRepoGroupPermissionLevel, HasUserGroupPermissionAny) from kallithea.lib.utils import map_groups, repo2db_mapper from kallithea.lib.utils2 import ( str2bool, time_to_datetime, safe_int, Optional, OAttr) @@ -2111,8 +2111,7 @@ class ApiController(JSONRPCController): repo_group = get_repo_group_or_error(repogroupid) if not HasPermissionAny('hg.admin')(): - # check if we have admin permission for this repo group ! - if not HasRepoGroupPermissionAny('group.admin')(group_name=repo_group.group_name): + if not HasRepoGroupPermissionLevel('admin')(repo_group.group_name): raise JSONRPCError('repository group `%s` does not exist' % (repogroupid,)) user = get_user_or_error(userid) @@ -2175,8 +2174,7 @@ class ApiController(JSONRPCController): repo_group = get_repo_group_or_error(repogroupid) if not HasPermissionAny('hg.admin')(): - # check if we have admin permission for this repo group ! - if not HasRepoGroupPermissionAny('group.admin')(group_name=repo_group.group_name): + if not HasRepoGroupPermissionLevel('admin')(repo_group.group_name): raise JSONRPCError('repository group `%s` does not exist' % (repogroupid,)) user = get_user_or_error(userid) @@ -2243,10 +2241,7 @@ class ApiController(JSONRPCController): perm = get_perm_or_error(perm, prefix='group.') user_group = get_user_group_or_error(usergroupid) if not HasPermissionAny('hg.admin')(): - # check if we have admin permission for this repo group ! - _perms = ('group.admin',) - if not HasRepoGroupPermissionAny(*_perms)( - group_name=repo_group.group_name): + if not HasRepoGroupPermissionLevel('admin')(repo_group.group_name): raise JSONRPCError( 'repository group `%s` does not exist' % (repogroupid,)) @@ -2319,10 +2314,7 @@ class ApiController(JSONRPCController): repo_group = get_repo_group_or_error(repogroupid) user_group = get_user_group_or_error(usergroupid) if not HasPermissionAny('hg.admin')(): - # check if we have admin permission for this repo group ! - _perms = ('group.admin',) - if not HasRepoGroupPermissionAny(*_perms)( - group_name=repo_group.group_name): + if not HasRepoGroupPermissionLevel('admin')(repo_group.group_name): raise JSONRPCError( 'repository group `%s` does not exist' % (repogroupid,)) diff --git a/kallithea/controllers/forks.py b/kallithea/controllers/forks.py --- a/kallithea/controllers/forks.py +++ b/kallithea/controllers/forks.py @@ -56,10 +56,11 @@ class ForksController(BaseRepoController super(ForksController, self).__before__() def __load_defaults(self): - repo_group_perms = ['group.admin'] if HasPermissionAny('hg.create.write_on_repogroup.true')(): - repo_group_perms.append('group.write') - c.repo_groups = AvailableRepoGroupChoices(['hg.create.repository'], repo_group_perms) + repo_group_perm_level = 'write' + else: + repo_group_perm_level = 'admin' + c.repo_groups = AvailableRepoGroupChoices(['hg.create.repository'], repo_group_perm_level) c.landing_revs_choices, c.landing_revs = ScmModel().get_repo_landing_revs() diff --git a/kallithea/lib/auth.py b/kallithea/lib/auth.py --- a/kallithea/lib/auth.py +++ b/kallithea/lib/auth.py @@ -549,6 +549,18 @@ class AuthUser(object): self.username, level, repo_name, purpose, ok, actual_perm) return ok + def has_repository_group_permission_level(self, repo_group_name, level, purpose=None): + required_perms = { + 'read': ['group.read', 'group.write', 'group.admin'], + 'write': ['group.write', 'group.admin'], + 'admin': ['group.admin'], + }[level] + actual_perm = self.permissions['repositories_groups'].get(repo_group_name) + ok = actual_perm in required_perms + log.debug('Checking if user %r can %r repo group %r (%s): %s (has %r)', + self.username, level, repo_group_name, purpose, ok, actual_perm) + return ok + @property def api_keys(self): return self._get_api_keys() @@ -859,17 +871,15 @@ class HasRepoPermissionLevelDecorator(_P return user.has_repository_permission_level(repo_name, level) -class HasRepoGroupPermissionAnyDecorator(_PermsDecorator): +class HasRepoGroupPermissionLevelDecorator(_PermsDecorator): """ Checks the user has any of given permissions for the requested repository group. """ def check_permissions(self, user): repo_group_name = get_repo_group_slug(request) - try: - return user.permissions['repositories_groups'][repo_group_name] in self.required_perms - except KeyError: - return False + (level,) = self.required_perms + return user.has_repository_group_permission_level(repo_group_name, level) class HasUserGroupPermissionAnyDecorator(_PermsDecorator): @@ -925,17 +935,11 @@ class HasRepoPermissionLevel(_PermsFunct return request.user.has_repository_permission_level(repo_name, level, purpose) -class HasRepoGroupPermissionAny(_PermsFunction): +class HasRepoGroupPermissionLevel(_PermsFunction): def __call__(self, group_name, purpose=None): - try: - ok = request.user.permissions['repositories_groups'][group_name] in self.required_perms - except KeyError: - ok = False - - log.debug('Check %s for %s for repo group %s (%s): %s' % - (request.user.username, self.required_perms, group_name, purpose, ok)) - return ok + (level,) = self.required_perms + return request.user.has_repository_group_permission_level(group_name, level, purpose) class HasUserGroupPermissionAny(_PermsFunction): diff --git a/kallithea/lib/helpers.py b/kallithea/lib/helpers.py --- a/kallithea/lib/helpers.py +++ b/kallithea/lib/helpers.py @@ -778,7 +778,7 @@ def action_parser(user_log, feed=False, # PERMS #============================================================================== from kallithea.lib.auth import HasPermissionAny, \ - HasRepoPermissionLevel, HasRepoGroupPermissionAny + HasRepoPermissionLevel, HasRepoGroupPermissionLevel #============================================================================== diff --git a/kallithea/model/scm.py b/kallithea/model/scm.py --- a/kallithea/model/scm.py +++ b/kallithea/model/scm.py @@ -49,7 +49,7 @@ from kallithea import BACKENDS from kallithea.lib import helpers as h from kallithea.lib.utils2 import safe_str, safe_unicode, get_server_url, \ _set_extras -from kallithea.lib.auth import HasRepoPermissionLevel, HasRepoGroupPermissionAny, \ +from kallithea.lib.auth import HasRepoPermissionLevel, HasRepoGroupPermissionLevel, \ HasUserGroupPermissionAny, HasPermissionAny, HasPermissionAny from kallithea.lib.utils import get_filesystem_repos, make_ui, \ action_logger @@ -123,13 +123,10 @@ class RepoList(_PermCheckIterator): class RepoGroupList(_PermCheckIterator): - def __init__(self, db_repo_group_list, perm_set=None, extra_kwargs=None): - if not perm_set: - perm_set = ['group.read', 'group.write', 'group.admin'] - + def __init__(self, db_repo_group_list, perm_level, extra_kwargs=None): super(RepoGroupList, self).__init__(obj_list=db_repo_group_list, - obj_attr='group_name', perm_set=perm_set, - perm_checker=HasRepoGroupPermissionAny, + obj_attr='group_name', perm_set=[perm_level], + perm_checker=HasRepoGroupPermissionLevel, extra_kwargs=extra_kwargs) @@ -222,7 +219,7 @@ class ScmModel(BaseModel): if groups is None: groups = RepoGroup.query() \ .filter(RepoGroup.parent_group_id == None).all() - return RepoGroupList(groups) + return RepoGroupList(groups, perm_level='read') def mark_for_invalidation(self, repo_name): """ @@ -784,7 +781,7 @@ class ScmModel(BaseModel): else: log.debug('skipping writing hook file') -def AvailableRepoGroupChoices(top_perms, repo_group_perms, extras=()): +def AvailableRepoGroupChoices(top_perms, repo_group_perm_level, extras=()): """Return group_id,string tuples with choices for all the repo groups where the user has the necessary permissions. @@ -794,7 +791,7 @@ def AvailableRepoGroupChoices(top_perms, if HasPermissionAny('hg.admin')('available repo groups'): groups.append(None) else: - groups = list(RepoGroupList(groups, perm_set=repo_group_perms)) + groups = list(RepoGroupList(groups, perm_level=repo_group_perm_level)) if top_perms and HasPermissionAny(*top_perms)('available repo groups'): groups.append(None) for extra in extras: diff --git a/kallithea/model/validators.py b/kallithea/model/validators.py --- a/kallithea/model/validators.py +++ b/kallithea/model/validators.py @@ -35,7 +35,7 @@ from kallithea.lib.utils2 import str2boo from kallithea.model.db import RepoGroup, Repository, UserGroup, User from kallithea.lib.exceptions import LdapImportError from kallithea.config.routing import ADMIN_PREFIX -from kallithea.lib.auth import HasRepoGroupPermissionAny, HasPermissionAny +from kallithea.lib.auth import HasRepoGroupPermissionLevel, HasPermissionAny # silence warnings and pylint UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set, \ @@ -502,9 +502,9 @@ def CanWriteGroup(old_data=None): # create repositories with write permission on group is set to true create_on_write = HasPermissionAny('hg.create.write_on_repogroup.true')() - group_admin = HasRepoGroupPermissionAny('group.admin')(gr_name, + group_admin = HasRepoGroupPermissionLevel('admin')(gr_name, 'can write into group validator') - group_write = HasRepoGroupPermissionAny('group.write')(gr_name, + group_write = HasRepoGroupPermissionLevel('write')(gr_name, 'can write into group validator') forbidden = not (group_admin or (group_write and create_on_write)) can_create_repos = HasPermissionAny('hg.admin', 'hg.create.repository') @@ -555,8 +555,7 @@ def CanCreateGroup(can_create_in_root=Fa return forbidden_in_root = gr is None and not can_create_in_root - val = HasRepoGroupPermissionAny('group.admin') - forbidden = not val(gr_name, 'can create group validator') + forbidden = not HasRepoGroupPermissionLevel('admin')(gr_name, 'can create group validator') if forbidden_in_root or forbidden: msg = self.message('permission_denied', state) raise formencode.Invalid(msg, value, state, diff --git a/kallithea/templates/index_base.html b/kallithea/templates/index_base.html --- a/kallithea/templates/index_base.html +++ b/kallithea/templates/index_base.html @@ -18,13 +18,13 @@ gr_name = c.group.group_name if c.group else None # create repositories with write permission on group is set to true create_on_write = h.HasPermissionAny('hg.create.write_on_repogroup.true')() - group_admin = h.HasRepoGroupPermissionAny('group.admin')(gr_name, 'can write into group index page') - group_write = h.HasRepoGroupPermissionAny('group.write')(gr_name, 'can write into group index page') + group_admin = h.HasRepoGroupPermissionLevel('admin')(gr_name, 'can write into group index page') + group_write = h.HasRepoGroupPermissionLevel('write')(gr_name, 'can write into group index page') %> %if h.HasPermissionAny('hg.admin','hg.create.repository')() or (group_admin or (group_write and create_on_write)): %if c.group: ${_('Add Repository')} - %if h.HasPermissionAny('hg.admin')() or h.HasRepoGroupPermissionAny('group.admin')(c.group.group_name): + %if h.HasPermissionAny('hg.admin')() or h.HasRepoGroupPermissionLevel('admin')(c.group.group_name): ${_('Add Repository Group')} %endif %else: @@ -34,7 +34,7 @@ %endif %endif %endif - %if c.group and h.HasRepoGroupPermissionAny('group.admin')(c.group.group_name): + %if c.group and h.HasRepoGroupPermissionLevel('admin')(c.group.group_name): ${_('Edit Repository Group')} %endif