# HG changeset patch # User Søren Løvborg # Date 2017-02-20 18:40:32 # Node ID ca77c6da2d34aaa4ccc34a6929b2452c153d661a # Parent b4d1e85265c1c6c760c65c1a6e9d25516444d2b0 auth: simplify user group permission checks In practice, Kallithea has the 'usergroup.admin' permission imply the 'usergroup.write' permission, which again implies 'usergroup.read'. This codifies this practice by replacing the HasUserGroupPermissionAny "perm function" with the new HasUserGroupPermissionLevel function, reducing the risk of errors and saving quite a lot of typing. diff --git a/kallithea/controllers/admin/user_groups.py b/kallithea/controllers/admin/user_groups.py --- a/kallithea/controllers/admin/user_groups.py +++ b/kallithea/controllers/admin/user_groups.py @@ -45,7 +45,7 @@ from kallithea.lib.exceptions import Use RepoGroupAssignmentError from kallithea.lib.utils2 import safe_unicode, safe_int from kallithea.lib.auth import LoginRequired, \ - HasUserGroupPermissionAnyDecorator, HasPermissionAnyDecorator + HasUserGroupPermissionLevelDecorator, HasPermissionAnyDecorator from kallithea.lib.base import BaseController, render from kallithea.model.scm import UserGroupList from kallithea.model.user_group import UserGroupModel @@ -92,7 +92,7 @@ class UserGroupsController(BaseControlle _list = UserGroup.query() \ .order_by(func.lower(UserGroup.users_group_name)) \ .all() - group_iter = UserGroupList(_list, perm_set=['usergroup.admin']) + group_iter = UserGroupList(_list, perm_level='admin') user_groups_data = [] total_records = len(group_iter) _tmpl_lookup = kallithea.CONFIG['pylons.app_globals'].mako_lookup @@ -165,7 +165,7 @@ class UserGroupsController(BaseControlle def new(self, format='html'): return render('admin/user_groups/user_group_add.html') - @HasUserGroupPermissionAnyDecorator('usergroup.admin') + @HasUserGroupPermissionLevelDecorator('admin') def update(self, id): c.user_group = UserGroup.get_or_404(id) c.active = 'settings' @@ -211,7 +211,7 @@ class UserGroupsController(BaseControlle 
raise HTTPFound(location=url('edit_users_group', id=id)) - @HasUserGroupPermissionAnyDecorator('usergroup.admin') + @HasUserGroupPermissionLevelDecorator('admin') def delete(self, id): usr_gr = UserGroup.get_or_404(id) try: @@ -226,7 +226,7 @@ class UserGroupsController(BaseControlle category='error') raise HTTPFound(location=url('users_groups')) - @HasUserGroupPermissionAnyDecorator('usergroup.admin') + @HasUserGroupPermissionLevelDecorator('admin') def edit(self, id, format='html'): c.user_group = UserGroup.get_or_404(id) c.active = 'settings' @@ -241,7 +241,7 @@ class UserGroupsController(BaseControlle force_defaults=False ) - @HasUserGroupPermissionAnyDecorator('usergroup.admin') + @HasUserGroupPermissionLevelDecorator('admin') def edit_perms(self, id): c.user_group = UserGroup.get_or_404(id) c.active = 'perms' @@ -267,7 +267,7 @@ class UserGroupsController(BaseControlle force_defaults=False ) - @HasUserGroupPermissionAnyDecorator('usergroup.admin') + @HasUserGroupPermissionLevelDecorator('admin') def update_perms(self, id): """ grant permission for given usergroup @@ -291,7 +291,7 @@ class UserGroupsController(BaseControlle h.flash(_('User group permissions updated'), category='success') raise HTTPFound(location=url('edit_user_group_perms', id=id)) - @HasUserGroupPermissionAnyDecorator('usergroup.admin') + @HasUserGroupPermissionLevelDecorator('admin') def delete_perms(self, id): try: obj_type = request.POST.get('obj_type') @@ -319,7 +319,7 @@ class UserGroupsController(BaseControlle category='error') raise HTTPInternalServerError() - @HasUserGroupPermissionAnyDecorator('usergroup.admin') + @HasUserGroupPermissionLevelDecorator('admin') def edit_default_perms(self, id): c.user_group = UserGroup.get_or_404(id) c.active = 'default_perms' @@ -368,7 +368,7 @@ class UserGroupsController(BaseControlle force_defaults=False ) - @HasUserGroupPermissionAnyDecorator('usergroup.admin') + @HasUserGroupPermissionLevelDecorator('admin') def update_default_perms(self, id): 
user_group = UserGroup.get_or_404(id) @@ -408,7 +408,7 @@ class UserGroupsController(BaseControlle raise HTTPFound(location=url('edit_user_group_default_perms', id=id)) - @HasUserGroupPermissionAnyDecorator('usergroup.admin') + @HasUserGroupPermissionLevelDecorator('admin') def edit_advanced(self, id): c.user_group = UserGroup.get_or_404(id) c.active = 'advanced' @@ -417,7 +417,7 @@ class UserGroupsController(BaseControlle return render('admin/user_groups/user_group_edit.html') - @HasUserGroupPermissionAnyDecorator('usergroup.admin') + @HasUserGroupPermissionLevelDecorator('admin') def edit_members(self, id): c.user_group = UserGroup.get_or_404(id) c.active = 'members' diff --git a/kallithea/controllers/api/api.py b/kallithea/controllers/api/api.py --- a/kallithea/controllers/api/api.py +++ b/kallithea/controllers/api/api.py @@ -36,7 +36,7 @@ from kallithea.controllers.api import JS from kallithea.lib.auth import ( PasswordGenerator, AuthUser, HasPermissionAnyDecorator, HasPermissionAnyDecorator, HasPermissionAny, HasRepoPermissionLevel, - HasRepoGroupPermissionLevel, HasUserGroupPermissionAny) + HasRepoGroupPermissionLevel, HasUserGroupPermissionLevel) from kallithea.lib.utils import map_groups, repo2db_mapper from kallithea.lib.utils2 import ( str2bool, time_to_datetime, safe_int, Optional, OAttr) @@ -820,10 +820,7 @@ class ApiController(JSONRPCController): """ user_group = get_user_group_or_error(usergroupid) if not HasPermissionAny('hg.admin')(): - # check if we have at least read permission for this user group ! 
- _perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin',) - if not HasUserGroupPermissionAny(*_perms)( - user_group_name=user_group.users_group_name): + if not HasUserGroupPermissionLevel('read')(user_group.users_group_name): raise JSONRPCError('user group `%s` does not exist' % (usergroupid,)) data = user_group.get_api_data() @@ -845,9 +842,7 @@ class ApiController(JSONRPCController): """ result = [] - _perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin',) - for user_group in UserGroupList(UserGroup.query().all(), - perm_set=_perms): + for user_group in UserGroupList(UserGroup.query().all(), perm_level='read'): result.append(user_group.get_api_data()) return result @@ -949,10 +944,7 @@ class ApiController(JSONRPCController): """ user_group = get_user_group_or_error(usergroupid) if not HasPermissionAny('hg.admin')(): - # check if we have admin permission for this user group ! - _perms = ('usergroup.admin',) - if not HasUserGroupPermissionAny(*_perms)( - user_group_name=user_group.users_group_name): + if not HasUserGroupPermissionLevel('admin')(user_group.users_group_name): raise JSONRPCError('user group `%s` does not exist' % (usergroupid,)) if not isinstance(owner, Optional): @@ -1006,10 +998,7 @@ class ApiController(JSONRPCController): """ user_group = get_user_group_or_error(usergroupid) if not HasPermissionAny('hg.admin')(): - # check if we have admin permission for this user group ! - _perms = ('usergroup.admin',) - if not HasUserGroupPermissionAny(*_perms)( - user_group_name=user_group.users_group_name): + if not HasUserGroupPermissionLevel('admin')(user_group.users_group_name): raise JSONRPCError('user group `%s` does not exist' % (usergroupid,)) try: @@ -1065,10 +1054,7 @@ class ApiController(JSONRPCController): user = get_user_or_error(userid) user_group = get_user_group_or_error(usergroupid) if not HasPermissionAny('hg.admin')(): - # check if we have admin permission for this user group ! 
- _perms = ('usergroup.admin',) - if not HasUserGroupPermissionAny(*_perms)( - user_group_name=user_group.users_group_name): + if not HasUserGroupPermissionLevel('admin')(user_group.users_group_name): raise JSONRPCError('user group `%s` does not exist' % (usergroupid,)) try: @@ -1117,10 +1103,7 @@ class ApiController(JSONRPCController): user = get_user_or_error(userid) user_group = get_user_group_or_error(usergroupid) if not HasPermissionAny('hg.admin')(): - # check if we have admin permission for this user group ! - _perms = ('usergroup.admin',) - if not HasUserGroupPermissionAny(*_perms)( - user_group_name=user_group.users_group_name): + if not HasUserGroupPermissionLevel('admin')(user_group.users_group_name): raise JSONRPCError('user group `%s` does not exist' % (usergroupid,)) try: @@ -1812,10 +1795,7 @@ class ApiController(JSONRPCController): if not HasRepoPermissionLevel('admin')(repo.repo_name): raise JSONRPCError('repository `%s` does not exist' % (repoid,)) - # check if we have at least read permission for this user group ! - _perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin',) - if not HasUserGroupPermissionAny(*_perms)( - user_group_name=user_group.users_group_name): + if not HasUserGroupPermissionLevel('read')(user_group.users_group_name): raise JSONRPCError('user group `%s` does not exist' % (usergroupid,)) try: @@ -1865,10 +1845,7 @@ class ApiController(JSONRPCController): if not HasRepoPermissionLevel('admin')(repo.repo_name): raise JSONRPCError('repository `%s` does not exist' % (repoid,)) - # check if we have at least read permission for this user group ! 
- _perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin',) - if not HasUserGroupPermissionAny(*_perms)( - user_group_name=user_group.users_group_name): + if not HasUserGroupPermissionLevel('read')(user_group.users_group_name): raise JSONRPCError('user group `%s` does not exist' % (usergroupid,)) try: @@ -2245,10 +2222,7 @@ class ApiController(JSONRPCController): raise JSONRPCError( 'repository group `%s` does not exist' % (repogroupid,)) - # check if we have at least read permission for this user group ! - _perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin',) - if not HasUserGroupPermissionAny(*_perms)( - user_group_name=user_group.users_group_name): + if not HasUserGroupPermissionLevel('read')(user_group.users_group_name): raise JSONRPCError( 'user group `%s` does not exist' % (usergroupid,)) @@ -2318,10 +2292,7 @@ class ApiController(JSONRPCController): raise JSONRPCError( 'repository group `%s` does not exist' % (repogroupid,)) - # check if we have at least read permission for this user group ! 
- _perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin',) - if not HasUserGroupPermissionAny(*_perms)( - user_group_name=user_group.users_group_name): + if not HasUserGroupPermissionLevel('read')(user_group.users_group_name): raise JSONRPCError( 'user group `%s` does not exist' % (usergroupid,)) diff --git a/kallithea/lib/auth.py b/kallithea/lib/auth.py --- a/kallithea/lib/auth.py +++ b/kallithea/lib/auth.py @@ -561,6 +561,18 @@ class AuthUser(object): self.username, level, repo_group_name, purpose, ok, actual_perm) return ok + def has_user_group_permission_level(self, user_group_name, level, purpose=None): + required_perms = { + 'read': ['usergroup.read', 'usergroup.write', 'usergroup.admin'], + 'write': ['usergroup.write', 'usergroup.admin'], + 'admin': ['usergroup.admin'], + }[level] + actual_perm = self.permissions['user_groups'].get(user_group_name) + ok = actual_perm in required_perms + log.debug('Checking if user %r can %r user group %r (%s): %s (has %r)', + self.username, level, user_group_name, purpose, ok, actual_perm) + return ok + @property def api_keys(self): return self._get_api_keys() @@ -882,7 +894,7 @@ class HasRepoGroupPermissionLevelDecorat return user.has_repository_group_permission_level(repo_group_name, level) -class HasUserGroupPermissionAnyDecorator(_PermsDecorator): +class HasUserGroupPermissionLevelDecorator(_PermsDecorator): """ Checks for access permission for any of given predicates for specific user group. 
In order to fulfill the request any of predicates must be meet @@ -890,10 +902,8 @@ class HasUserGroupPermissionAnyDecorator def check_permissions(self, user): user_group_name = get_user_group_slug(request) - try: - return user.permissions['user_groups'][user_group_name] in self.required_perms - except KeyError: - return False + (level,) = self.required_perms + return user.has_user_group_permission_level(user_group_name, level) #============================================================================== @@ -942,17 +952,11 @@ class HasRepoGroupPermissionLevel(_Perms return request.user.has_repository_group_permission_level(group_name, level, purpose) -class HasUserGroupPermissionAny(_PermsFunction): +class HasUserGroupPermissionLevel(_PermsFunction): def __call__(self, user_group_name, purpose=None): - try: - ok = request.user.permissions['user_groups'][user_group_name] in self.required_perms - except KeyError: - ok = False - - log.debug('Check %s %s for user group %s (%s): %s' % - (request.user.username, self.required_perms, user_group_name, purpose, ok)) - return ok + (level,) = self.required_perms + return request.user.has_user_group_permission_level(user_group_name, level, purpose) #============================================================================== diff --git a/kallithea/model/repo.py b/kallithea/model/repo.py --- a/kallithea/model/repo.py +++ b/kallithea/model/repo.py @@ -47,7 +47,7 @@ from kallithea.model.db import Repositor Statistics, UserGroup, Ui, RepoGroup, RepositoryField from kallithea.lib import helpers as h -from kallithea.lib.auth import HasRepoPermissionLevel, HasUserGroupPermissionAny +from kallithea.lib.auth import HasRepoPermissionLevel, HasUserGroupPermissionLevel from kallithea.lib.exceptions import AttachedForksError from kallithea.model.scm import UserGroupList @@ -144,9 +144,7 @@ class RepoModel(BaseModel): .order_by(UserGroup.users_group_name) \ .options(subqueryload(UserGroup.members)) \ .all() - user_groups = 
UserGroupList(user_groups, perm_set=['usergroup.read', - 'usergroup.write', - 'usergroup.admin']) + user_groups = UserGroupList(user_groups, perm_level='read') return json.dumps([ { 'id': gr.users_group_id, @@ -468,11 +466,8 @@ class RepoModel(BaseModel): repo=repo, user=member, perm=perm ) else: - #check if we have permissions to alter this usergroup - req_perms = ( - 'usergroup.read', 'usergroup.write', 'usergroup.admin') - if not check_perms or HasUserGroupPermissionAny(*req_perms)( - member): + #check if we have permissions to alter this usergroup's access + if not check_perms or HasUserGroupPermissionLevel('read')(member): self.grant_user_group_permission( repo=repo, group_name=member, perm=perm ) @@ -483,11 +478,8 @@ class RepoModel(BaseModel): repo=repo, user=member, perm=perm ) else: - #check if we have permissions to alter this usergroup - req_perms = ( - 'usergroup.read', 'usergroup.write', 'usergroup.admin') - if not check_perms or HasUserGroupPermissionAny(*req_perms)( - member): + #check if we have permissions to alter this usergroup's access + if not check_perms or HasUserGroupPermissionLevel('read')(member): self.grant_user_group_permission( repo=repo, group_name=member, perm=perm ) diff --git a/kallithea/model/repo_group.py b/kallithea/model/repo_group.py --- a/kallithea/model/repo_group.py +++ b/kallithea/model/repo_group.py @@ -187,7 +187,7 @@ class RepoGroupModel(BaseModel): perms_updates=None, recursive=None, check_perms=True): from kallithea.model.repo import RepoModel - from kallithea.lib.auth import HasUserGroupPermissionAny + from kallithea.lib.auth import HasUserGroupPermissionLevel if not perms_new: perms_new = [] @@ -255,18 +255,16 @@ class RepoGroupModel(BaseModel): _set_perm_user(obj, user=member, perm=perm) ## set for user group else: - #check if we have permissions to alter this usergroup - req_perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin') - if not check_perms or HasUserGroupPermissionAny(*req_perms)(member): + 
#check if we have permissions to alter this usergroup's access + if not check_perms or HasUserGroupPermissionLevel('read')(member): _set_perm_group(obj, users_group=member, perm=perm) # set new permissions for member, perm, member_type in perms_new: if member_type == 'user': _set_perm_user(obj, user=member, perm=perm) else: - #check if we have permissions to alter this usergroup - req_perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin') - if not check_perms or HasUserGroupPermissionAny(*req_perms)(member): + #check if we have permissions to alter this usergroup's access + if not check_perms or HasUserGroupPermissionLevel('read')(member): _set_perm_group(obj, users_group=member, perm=perm) updates.append(obj) # if it's not recursive call for all,repos,groups diff --git a/kallithea/model/scm.py b/kallithea/model/scm.py --- a/kallithea/model/scm.py +++ b/kallithea/model/scm.py @@ -50,7 +50,7 @@ from kallithea.lib import helpers as h from kallithea.lib.utils2 import safe_str, safe_unicode, get_server_url, \ _set_extras from kallithea.lib.auth import HasRepoPermissionLevel, HasRepoGroupPermissionLevel, \ - HasUserGroupPermissionAny, HasPermissionAny, HasPermissionAny + HasUserGroupPermissionLevel, HasPermissionAny, HasPermissionAny from kallithea.lib.utils import get_filesystem_repos, make_ui, \ action_logger from kallithea.model.base import BaseModel @@ -132,13 +132,10 @@ class RepoGroupList(_PermCheckIterator): class UserGroupList(_PermCheckIterator): - def __init__(self, db_user_group_list, perm_set=None, extra_kwargs=None): - if not perm_set: - perm_set = ['usergroup.read', 'usergroup.write', 'usergroup.admin'] - + def __init__(self, db_user_group_list, perm_level, extra_kwargs=None): super(UserGroupList, self).__init__(obj_list=db_user_group_list, - obj_attr='users_group_name', perm_set=perm_set, - perm_checker=HasUserGroupPermissionAny, + obj_attr='users_group_name', perm_set=[perm_level], + perm_checker=HasUserGroupPermissionLevel, 
extra_kwargs=extra_kwargs) diff --git a/kallithea/model/user_group.py b/kallithea/model/user_group.py --- a/kallithea/model/user_group.py +++ b/kallithea/model/user_group.py @@ -57,7 +57,7 @@ class UserGroupModel(BaseModel): def _update_permissions(self, user_group, perms_new=None, perms_updates=None): - from kallithea.lib.auth import HasUserGroupPermissionAny + from kallithea.lib.auth import HasUserGroupPermissionLevel if not perms_new: perms_new = [] if not perms_updates: @@ -71,9 +71,8 @@ class UserGroupModel(BaseModel): user_group=user_group, user=member, perm=perm ) else: - #check if we have permissions to alter this usergroup - if HasUserGroupPermissionAny('usergroup.read', 'usergroup.write', - 'usergroup.admin')(member): + #check if we have permissions to alter this usergroup's access + if HasUserGroupPermissionLevel('read')(member): self.grant_user_group_permission( target_user_group=user_group, user_group=member, perm=perm ) @@ -84,9 +83,8 @@ class UserGroupModel(BaseModel): user_group=user_group, user=member, perm=perm ) else: - #check if we have permissions to alter this usergroup - if HasUserGroupPermissionAny('usergroup.read', 'usergroup.write', - 'usergroup.admin')(member): + #check if we have permissions to alter this usergroup's access + if HasUserGroupPermissionLevel('read')(member): self.grant_user_group_permission( target_user_group=user_group, user_group=member, perm=perm )