diff --git a/rhodecode/controllers/api/api.py b/rhodecode/controllers/api/api.py --- a/rhodecode/controllers/api/api.py +++ b/rhodecode/controllers/api/api.py @@ -31,18 +31,99 @@ import logging from rhodecode.controllers.api import JSONRPCController, JSONRPCError from rhodecode.lib.auth import HasPermissionAllDecorator, \ HasPermissionAnyDecorator, PasswordGenerator, AuthUser - +from rhodecode.lib.utils import map_groups from rhodecode.model.meta import Session from rhodecode.model.scm import ScmModel -from rhodecode.model.db import User, UsersGroup, Repository from rhodecode.model.repo import RepoModel from rhodecode.model.user import UserModel from rhodecode.model.users_group import UsersGroupModel -from rhodecode.lib.utils import map_groups +from rhodecode.model.permission import PermissionModel log = logging.getLogger(__name__) +class Optional(object): + """ + Defines an optional parameter:: + + param = param.getval() if isinstance(param, Optional) else param + param = param() if isinstance(param, Optional) else param + + is equivalent of:: + + param = Optional.extract(param) + + """ + def __init__(self, type_): + self.type_ = type_ + + def __repr__(self): + return '' % self.type_.__repr__() + + def __call__(self): + return self.getval() + + def getval(self): + """ + returns value from this Optional instance + """ + return self.type_ + + @classmethod + def extract(cls, val): + if isinstance(val, cls): + return val.getval() + return val + + +def get_user_or_error(userid): + """ + Get user by id or name or return JsonRPCError if not found + + :param userid: + """ + user = UserModel().get_user(userid) + if user is None: + raise JSONRPCError("user `%s` does not exist" % userid) + return user + + +def get_repo_or_error(repoid): + """ + Get repo by id or name or return JsonRPCError if not found + + :param userid: + """ + repo = RepoModel().get_repo(repoid) + if repo is None: + raise JSONRPCError('repository `%s` does not exist' % (repoid)) + return repo + + +def get_users_group_or_error(usersgroupid): + """ + Get users group by id or name or return JsonRPCError if not found + + :param userid: + """ + users_group = UsersGroupModel().get_group(usersgroupid) + if users_group is None: + raise JSONRPCError('users group `%s` does not exist' % usersgroupid) + return users_group + + +def get_perm_or_error(permid): + """ + Get permission by id or name or return JsonRPCError if not found + + :param userid: + """ + perm = PermissionModel().get_permission_by_name(permid) + if perm is None: + raise JSONRPCError('permission `%s` does not exist' % (permid)) + return perm + + class ApiController(JSONRPCController): """ API Controller @@ -60,23 +141,25 @@ class ApiController(JSONRPCController): """ @HasPermissionAllDecorator('hg.admin') - def pull(self, apiuser, repo_name): + def pull(self, apiuser, repoid): """ Dispatch pull action on given repo - - :param user: - :param repo_name: + :param apiuser: + :param repoid: """ - if Repository.is_valid(repo_name) is False: - raise JSONRPCError('Unknown repo "%s"' % repo_name) + repo = get_repo_or_error(repoid) try: - ScmModel().pull_changes(repo_name, self.rhodecode_user.username) - return 'Pulled from %s' % repo_name + ScmModel().pull_changes(repo.repo_name, + self.rhodecode_user.username) + return 'Pulled from `%s`' % repo.repo_name except Exception: - raise JSONRPCError('Unable to pull changes from "%s"' % repo_name) + log.error(traceback.format_exc()) + raise JSONRPCError( + 'Unable to pull changes from `%s`' % repo.repo_name + ) @HasPermissionAllDecorator('hg.admin') def get_user(self, apiuser, userid): @@ -84,26 +167,13 @@ class ApiController(JSONRPCController): Get a user by username :param apiuser: - :param username: + :param userid: """ - user = UserModel().get_user(userid) - if user is None: - return user - - return dict( - id=user.user_id, - username=user.username, - firstname=user.name, - lastname=user.lastname, - email=user.email, - emails=user.emails, - active=user.active, - admin=user.admin, - ldap_dn=user.ldap_dn, - last_login=user.last_login, - permissions=AuthUser(user_id=user.user_id).permissions - ) + user = get_user_or_error(userid) + data = user.get_api_data() + data['permissions'] = AuthUser(user_id=user.user_id).permissions + return data @HasPermissionAllDecorator('hg.admin') def get_users(self, apiuser): @@ -114,43 +184,34 @@ class ApiController(JSONRPCController): """ result = [] - for user in User.getAll(): - result.append( - dict( - id=user.user_id, - username=user.username, - firstname=user.name, - lastname=user.lastname, - email=user.email, - active=user.active, - admin=user.admin, - ldap_dn=user.ldap_dn, - last_login=user.last_login, - ) - ) + for user in UserModel().get_all(): + result.append(user.get_api_data()) return result @HasPermissionAllDecorator('hg.admin') - def create_user(self, apiuser, username, email, password, firstname=None, - lastname=None, active=True, admin=False, ldap_dn=None): + def create_user(self, apiuser, username, email, password, + firstname=Optional(None), lastname=Optional(None), + active=Optional(True), admin=Optional(False), + ldap_dn=Optional(None)): """ Create new user :param apiuser: :param username: + :param email: :param password: - :param email: - :param name: + :param firstname: :param lastname: :param active: :param admin: :param ldap_dn: """ - if User.get_by_username(username): - raise JSONRPCError("user %s already exist" % username) - if User.get_by_email(email, case_insensitive=True): - raise JSONRPCError("email %s already exist" % email) + if UserModel().get_by_username(username): + raise JSONRPCError("user `%s` already exist" % username) + + if UserModel().get_by_email(email, case_insensitive=True): + raise JSONRPCError("email `%s` already exist" % email) if ldap_dn: # generate temporary password if ldap_dn @@ -158,73 +219,73 @@ class ApiController(JSONRPCController): try: user = UserModel().create_or_update( - username, password, email, firstname, - lastname, active, admin, ldap_dn + username=Optional.extract(username), + password=Optional.extract(password), + email=Optional.extract(email), + firstname=Optional.extract(firstname), + lastname=Optional.extract(lastname), + active=Optional.extract(active), + admin=Optional.extract(admin), + ldap_dn=Optional.extract(ldap_dn) ) - Session.commit() + Session().commit() return dict( - id=user.user_id, - msg='created new user %s' % username, - user=dict( - id=user.user_id, - username=user.username, - firstname=user.name, - lastname=user.lastname, - email=user.email, - active=user.active, - admin=user.admin, - ldap_dn=user.ldap_dn, - last_login=user.last_login, - ) + msg='created new user `%s`' % username, + user=user.get_api_data() ) except Exception: log.error(traceback.format_exc()) - raise JSONRPCError('failed to create user %s' % username) + raise JSONRPCError('failed to create user `%s`' % username) @HasPermissionAllDecorator('hg.admin') - def update_user(self, apiuser, userid, username, password, email, - firstname, lastname, active, admin, ldap_dn): + def update_user(self, apiuser, userid, username=Optional(None), + email=Optional(None), firstname=Optional(None), + lastname=Optional(None), active=Optional(None), + admin=Optional(None), ldap_dn=Optional(None), + password=Optional(None)): """ Updates given user :param apiuser: + :param userid: :param username: - :param password: :param email: - :param name: + :param firstname: :param lastname: :param active: :param admin: :param ldap_dn: + :param password: """ - usr = UserModel().get_user(userid) - if not usr: - raise JSONRPCError("user ID:%s does not exist" % userid) + + user = get_user_or_error(userid) + + #return old attribute if Optional is passed. We don't change parameter + # so user doesn't get updated parameters + get = lambda attr, name: ( + getattr(user, name) if isinstance(attr, Optional) else attr + ) try: + user = UserModel().create_or_update( - username, password, email, firstname, - lastname, active, admin, ldap_dn + username=get(username, 'username'), + password=get(password, 'password'), + email=get(email, 'email'), + firstname=get(firstname, 'name'), + lastname=get(lastname, 'lastname'), + active=get(active, 'active'), + admin=get(admin, 'admin'), + ldap_dn=get(ldap_dn, 'ldap_dn') ) - Session.commit() + Session().commit() return dict( - id=usr.user_id, msg='updated user ID:%s %s' % (user.user_id, user.username), - user=dict( - id=user.user_id, - username=user.username, - firstname=user.name, - lastname=user.lastname, - email=user.email, - active=user.active, - admin=user.admin, - ldap_dn=user.ldap_dn, - last_login=user.last_login, - ) + user=user.get_api_data() ) except Exception: log.error(traceback.format_exc()) - raise JSONRPCError('failed to update user %s' % userid) + raise JSONRPCError('failed to update user `%s`' % userid) @HasPermissionAllDecorator('hg.admin') def delete_user(self, apiuser, userid): @@ -232,52 +293,40 @@ class ApiController(JSONRPCController): Deletes an user :param apiuser: + :param userid: """ - usr = UserModel().get_user(userid) - if not usr: - raise JSONRPCError("user ID:%s does not exist" % userid) + user = get_user_or_error(userid) try: UserModel().delete(userid) - Session.commit() + Session().commit() return dict( - id=usr.user_id, - msg='deleted user ID:%s %s' % (usr.user_id, usr.username) + msg='deleted user ID:%s %s' % (user.user_id, user.username), + user=None ) except Exception: log.error(traceback.format_exc()) - raise JSONRPCError('failed to delete ID:%s %s' % (usr.user_id, - usr.username)) + raise JSONRPCError('failed to delete ID:%s %s' % (user.user_id, + user.username)) @HasPermissionAllDecorator('hg.admin') - def get_users_group(self, apiuser, group_name): + def get_users_group(self, apiuser, usersgroupid): """" - Get users group by name + Get users group by name or id :param apiuser: - :param group_name: + :param usersgroupid: """ + users_group = get_users_group_or_error(usersgroupid) - users_group = UsersGroup.get_by_group_name(group_name) - if not users_group: - return None + data = users_group.get_api_data() members = [] for user in users_group.members: user = user.user - members.append(dict(id=user.user_id, - username=user.username, - firstname=user.name, - lastname=user.lastname, - email=user.email, - active=user.active, - admin=user.admin, - ldap=user.ldap_dn)) - - return dict(id=users_group.users_group_id, - group_name=users_group.users_group_name, - active=users_group.users_group_active, - members=members) + members.append(user.get_api_data()) + data['members'] = members + return data @HasPermissionAllDecorator('hg.admin') def get_users_groups(self, apiuser): @@ -288,107 +337,96 @@ class ApiController(JSONRPCController): """ result = [] - for users_group in UsersGroup.getAll(): - members = [] - for user in users_group.members: - user = user.user - members.append(dict(id=user.user_id, - username=user.username, - firstname=user.name, - lastname=user.lastname, - email=user.email, - active=user.active, - admin=user.admin, - ldap=user.ldap_dn)) - - result.append(dict(id=users_group.users_group_id, - group_name=users_group.users_group_name, - active=users_group.users_group_active, - members=members)) + for users_group in UsersGroupModel().get_all(): + result.append(users_group.get_api_data()) return result @HasPermissionAllDecorator('hg.admin') - def create_users_group(self, apiuser, group_name, active=True): + def create_users_group(self, apiuser, group_name, active=Optional(True)): """ Creates an new usergroup + :param apiuser: :param group_name: :param active: """ - if self.get_users_group(apiuser, group_name): - raise JSONRPCError("users group %s already exist" % group_name) + if UsersGroupModel().get_by_name(group_name): + raise JSONRPCError("users group `%s` already exist" % group_name) try: + active = Optional.extract(active) ug = UsersGroupModel().create(name=group_name, active=active) - Session.commit() - return dict(id=ug.users_group_id, - msg='created new users group %s' % group_name) + Session().commit() + return dict( + msg='created new users group `%s`' % group_name, + users_group=ug.get_api_data() + ) except Exception: log.error(traceback.format_exc()) - raise JSONRPCError('failed to create group %s' % group_name) + raise JSONRPCError('failed to create group `%s`' % group_name) @HasPermissionAllDecorator('hg.admin') - def add_user_to_users_group(self, apiuser, group_name, username): + def add_user_to_users_group(self, apiuser, usersgroupid, userid): """" Add a user to a users group :param apiuser: - :param group_name: - :param username: + :param usersgroupid: + :param userid: """ + user = get_user_or_error(userid) + users_group = get_users_group_or_error(usersgroupid) try: - users_group = UsersGroup.get_by_group_name(group_name) - if not users_group: - raise JSONRPCError('unknown users group %s' % group_name) - - user = User.get_by_username(username) - if user is None: - raise JSONRPCError('unknown user %s' % username) - ugm = UsersGroupModel().add_user_to_group(users_group, user) success = True if ugm != True else False - msg = 'added member %s to users group %s' % (username, group_name) + msg = 'added member `%s` to users group `%s`' % ( + user.username, users_group.users_group_name + ) msg = msg if success else 'User is already in that group' - Session.commit() + Session().commit() return dict( - id=ugm.users_group_member_id if ugm != True else None, success=success, msg=msg ) except Exception: log.error(traceback.format_exc()) - raise JSONRPCError('failed to add users group member') + raise JSONRPCError( + 'failed to add member to users group `%s`' % ( + users_group.users_group_name + ) + ) @HasPermissionAllDecorator('hg.admin') - def remove_user_from_users_group(self, apiuser, group_name, username): + def remove_user_from_users_group(self, apiuser, usersgroupid, userid): """ Remove user from a group - :param apiuser - :param group_name - :param username + :param apiuser: + :param usersgroupid: + :param userid: """ + user = get_user_or_error(userid) + users_group = get_users_group_or_error(usersgroupid) try: - users_group = UsersGroup.get_by_group_name(group_name) - if not users_group: - raise JSONRPCError('unknown users group %s' % group_name) - - user = User.get_by_username(username) - if user is None: - raise JSONRPCError('unknown user %s' % username) - - success = UsersGroupModel().remove_user_from_group(users_group, user) - msg = 'removed member %s from users group %s' % (username, group_name) + success = UsersGroupModel().remove_user_from_group(users_group, + user) + msg = 'removed member `%s` from users group `%s`' % ( + user.username, users_group.users_group_name + ) msg = msg if success else "User wasn't in group" - Session.commit() + Session().commit() return dict(success=success, msg=msg) except Exception: log.error(traceback.format_exc()) - raise JSONRPCError('failed to remove user from group') + raise JSONRPCError( + 'failed to remove member from users group `%s`' % ( + users_group.users_group_name + ) + ) @HasPermissionAnyDecorator('hg.admin') def get_repo(self, apiuser, repoid): @@ -396,54 +434,30 @@ class ApiController(JSONRPCController): Get repository by name :param apiuser: - :param repo_name: + :param repoid: """ - - repo = RepoModel().get_repo(repoid) - if repo is None: - raise JSONRPCError('unknown repository "%s"' % (repo or repoid)) + repo = get_repo_or_error(repoid) members = [] for user in repo.repo_to_perm: perm = user.permission.permission_name user = user.user - members.append( - dict( - type="user", - id=user.user_id, - username=user.username, - firstname=user.name, - lastname=user.lastname, - email=user.email, - active=user.active, - admin=user.admin, - ldap=user.ldap_dn, - permission=perm - ) - ) + user_data = user.get_api_data() + user_data['type'] = "user" + user_data['permission'] = perm + members.append(user_data) + for users_group in repo.users_group_to_perm: perm = users_group.permission.permission_name users_group = users_group.users_group - members.append( - dict( - type="users_group", - id=users_group.users_group_id, - name=users_group.users_group_name, - active=users_group.users_group_active, - permission=perm - ) - ) + users_group_data = users_group.get_api_data() + users_group_data['type'] = "users_group" + users_group_data['permission'] = perm + members.append(users_group_data) - return dict( - id=repo.repo_id, - repo_name=repo.repo_name, - type=repo.repo_type, - clone_uri=repo.clone_uri, - private=repo.private, - created_on=repo.created_on, - description=repo.description, - members=members - ) + data = repo.get_api_data() + data['members'] = members + return data @HasPermissionAnyDecorator('hg.admin') def get_repos(self, apiuser): @@ -454,22 +468,12 @@ class ApiController(JSONRPCController): """ result = [] - for repo in Repository.getAll(): - result.append( - dict( - id=repo.repo_id, - repo_name=repo.repo_name, - type=repo.repo_type, - clone_uri=repo.clone_uri, - private=repo.private, - created_on=repo.created_on, - description=repo.description, - ) - ) + for repo in RepoModel().get_all(): + result.append(repo.get_api_data()) return result @HasPermissionAnyDecorator('hg.admin') - def get_repo_nodes(self, apiuser, repo_name, revision, root_path, + def get_repo_nodes(self, apiuser, repoid, revision, root_path, ret_type='all'): """ returns a list of nodes and it's children @@ -477,13 +481,14 @@ class ApiController(JSONRPCController): to show only files or dirs :param apiuser: - :param repo_name: name of repository + :param repoid: name or id of repository :param revision: revision for which listing should be done :param root_path: path from which start displaying :param ret_type: return type 'all|files|dirs' nodes """ + repo = get_repo_or_error(repoid) try: - _d, _f = ScmModel().get_nodes(repo_name, revision, root_path, + _d, _f = ScmModel().get_nodes(repo, revision, root_path, flat=False) _map = { 'all': _d + _f, @@ -493,235 +498,242 @@ class ApiController(JSONRPCController): return _map[ret_type] except KeyError: raise JSONRPCError('ret_type must be one of %s' % _map.keys()) - except Exception, e: - raise JSONRPCError(e) + except Exception: + log.error(traceback.format_exc()) + raise JSONRPCError( + 'failed to get repo: `%s` nodes' % repo.repo_name + ) @HasPermissionAnyDecorator('hg.admin', 'hg.create.repository') - def create_repo(self, apiuser, repo_name, owner_name, description='', - repo_type='hg', private=False, clone_uri=None): + def create_repo(self, apiuser, repo_name, owner, repo_type, + description=Optional(''), private=Optional(False), + clone_uri=Optional(None), landing_rev=Optional('tip')): """ Create repository, if clone_url is given it makes a remote clone + if repo_name is withina group name the groups will be created + automatically if they aren't present :param apiuser: :param repo_name: - :param owner_name: + :param onwer: + :param repo_type: :param description: - :param repo_type: :param private: :param clone_uri: + :param landing_rev: """ + owner = get_user_or_error(owner) + + if RepoModel().get_by_repo_name(repo_name): + raise JSONRPCError("repo `%s` already exist" % repo_name) + + private = Optional.extract(private) + clone_uri = Optional.extract(clone_uri) + description = Optional.extract(description) + landing_rev = Optional.extract(landing_rev) try: - owner = User.get_by_username(owner_name) - if owner is None: - raise JSONRPCError('unknown user %s' % owner_name) - - if Repository.get_by_repo_name(repo_name): - raise JSONRPCError("repo %s already exist" % repo_name) - - groups = repo_name.split(Repository.url_sep()) - real_name = groups[-1] - # create structure of groups + # create structure of groups and return the last group group = map_groups(repo_name) - repo = RepoModel().create( - dict( - repo_name=real_name, - repo_name_full=repo_name, - description=description, - private=private, - repo_type=repo_type, - repo_group=group.group_id if group else None, - clone_uri=clone_uri - ) + repo = RepoModel().create_repo( + repo_name=repo_name, + repo_type=repo_type, + description=description, + owner=owner, + private=private, + clone_uri=clone_uri, + repos_group=group, + landing_rev=landing_rev, ) - Session.commit() + + Session().commit() return dict( - id=repo.repo_id, - msg="Created new repository %s" % (repo.repo_name), - repo=dict( - id=repo.repo_id, - repo_name=repo.repo_name, - type=repo.repo_type, - clone_uri=repo.clone_uri, - private=repo.private, - created_on=repo.created_on, - description=repo.description, - ) + msg="Created new repository `%s`" % (repo.repo_name), + repo=repo.get_api_data() ) except Exception: log.error(traceback.format_exc()) - raise JSONRPCError('failed to create repository %s' % repo_name) + raise JSONRPCError('failed to create repository `%s`' % repo_name) + +# @HasPermissionAnyDecorator('hg.admin') +# def fork_repo(self, apiuser, repoid, fork_name): +# repo = get_repo_or_error(repoid) +# +# try: +# form_data = dict( +# +# ) +# RepoModel().create_fork(form_data, cur_user=apiuser) +# return dict( +# msg='Created fork of `%s` as `%s`' % (repo.repo_name, +# fork_name), +# success=True +# ) +# except Exception: +# log.error(traceback.format_exc()) +# raise JSONRPCError( +# 'failed to fork repository `%s` as `%s`' % (repo.repo_name, +# fork_name) +# ) @HasPermissionAnyDecorator('hg.admin') - def fork_repo(self, apiuser, repoid): - repo = RepoModel().get_repo(repoid) - if repo is None: - raise JSONRPCError('unknown repository "%s"' % (repo or repoid)) - - RepoModel().create_fork(form_data, cur_user) - - - @HasPermissionAnyDecorator('hg.admin') - def delete_repo(self, apiuser, repo_name): + def delete_repo(self, apiuser, repoid): """ Deletes a given repository - :param repo_name: + :param apiuser: + :param repoid: """ - if not Repository.get_by_repo_name(repo_name): - raise JSONRPCError("repo %s does not exist" % repo_name) + repo = get_repo_or_error(repoid) + try: - RepoModel().delete(repo_name) - Session.commit() + RepoModel().delete(repo) + Session().commit() return dict( - msg='Deleted repository %s' % repo_name + msg='Deleted repository `%s`' % repo.repo_name, + success=True ) except Exception: log.error(traceback.format_exc()) - raise JSONRPCError('failed to delete repository %s' % repo_name) + raise JSONRPCError( + 'failed to delete repository `%s`' % repo.repo_name + ) @HasPermissionAnyDecorator('hg.admin') - def grant_user_permission(self, apiuser, repo_name, username, perm): + def grant_user_permission(self, apiuser, repoid, userid, perm): """ Grant permission for user on given repository, or update existing one if found - :param repo_name: - :param username: + :param repoid: + :param userid: :param perm: """ + repo = get_repo_or_error(repoid) + user = get_user_or_error(userid) + perm = get_perm_or_error(perm) try: - repo = Repository.get_by_repo_name(repo_name) - if repo is None: - raise JSONRPCError('unknown repository %s' % repo) - - user = User.get_by_username(username) - if user is None: - raise JSONRPCError('unknown user %s' % username) RepoModel().grant_user_permission(repo=repo, user=user, perm=perm) - Session.commit() + Session().commit() return dict( - msg='Granted perm: %s for user: %s in repo: %s' % ( - perm, username, repo_name - ) + msg='Granted perm: `%s` for user: `%s` in repo: `%s`' % ( + perm.permission_name, user.username, repo.repo_name + ), + success=True ) except Exception: log.error(traceback.format_exc()) raise JSONRPCError( - 'failed to edit permission %(repo)s for %(user)s' % dict( - user=username, repo=repo_name + 'failed to edit permission for user: `%s` in repo: `%s`' % ( + userid, repoid ) ) @HasPermissionAnyDecorator('hg.admin') - def revoke_user_permission(self, apiuser, repo_name, username): + def revoke_user_permission(self, apiuser, repoid, userid): """ Revoke permission for user on given repository - :param repo_name: - :param username: + :param apiuser: + :param repoid: + :param userid: """ + repo = get_repo_or_error(repoid) + user = get_user_or_error(userid) try: - repo = Repository.get_by_repo_name(repo_name) - if repo is None: - raise JSONRPCError('unknown repository %s' % repo) + + RepoModel().revoke_user_permission(repo=repo, user=user) - user = User.get_by_username(username) - if user is None: - raise JSONRPCError('unknown user %s' % username) - - RepoModel().revoke_user_permission(repo=repo_name, user=username) - - Session.commit() + Session().commit() return dict( - msg='Revoked perm for user: %s in repo: %s' % ( - username, repo_name - ) + msg='Revoked perm for user: `%s` in repo: `%s`' % ( + user.username, repo.repo_name + ), + success=True ) except Exception: log.error(traceback.format_exc()) raise JSONRPCError( - 'failed to edit permission %(repo)s for %(user)s' % dict( - user=username, repo=repo_name + 'failed to edit permission for user: `%s` in repo: `%s`' % ( + userid, repoid ) ) @HasPermissionAnyDecorator('hg.admin') - def grant_users_group_permission(self, apiuser, repo_name, group_name, perm): + def grant_users_group_permission(self, apiuser, repoid, usersgroupid, + perm): """ Grant permission for users group on given repository, or update existing one if found - :param repo_name: - :param group_name: + :param apiuser: + :param repoid: + :param usersgroupid: :param perm: """ + repo = get_repo_or_error(repoid) + perm = get_perm_or_error(perm) + users_group = get_users_group_or_error(usersgroupid) try: - repo = Repository.get_by_repo_name(repo_name) - if repo is None: - raise JSONRPCError('unknown repository %s' % repo) - - user_group = UsersGroup.get_by_group_name(group_name) - if user_group is None: - raise JSONRPCError('unknown users group %s' % user_group) - - RepoModel().grant_users_group_permission(repo=repo_name, - group_name=group_name, + RepoModel().grant_users_group_permission(repo=repo, + group_name=users_group, perm=perm) - Session.commit() + Session().commit() return dict( - msg='Granted perm: %s for group: %s in repo: %s' % ( - perm, group_name, repo_name + msg='Granted perm: `%s` for users group: `%s` in ' + 'repo: `%s`' % ( + perm.permission_name, users_group.users_group_name, + repo.repo_name + ), + success=True + ) + except Exception: + print traceback.format_exc() + log.error(traceback.format_exc()) + raise JSONRPCError( + 'failed to edit permission for users group: `%s` in ' + 'repo: `%s`' % ( + usersgroupid, repo.repo_name ) ) + + @HasPermissionAnyDecorator('hg.admin') + def revoke_users_group_permission(self, apiuser, repoid, usersgroupid): + """ + Revoke permission for users group on given repository + + :param apiuser: + :param repoid: + :param usersgroupid: + """ + repo = get_repo_or_error(repoid) + users_group = get_users_group_or_error(usersgroupid) + + try: + RepoModel().revoke_users_group_permission(repo=repo, + group_name=users_group) + + Session().commit() + return dict( + msg='Revoked perm for users group: `%s` in repo: `%s`' % ( + users_group.users_group_name, repo.repo_name + ), + success=True + ) except Exception: log.error(traceback.format_exc()) raise JSONRPCError( - 'failed to edit permission %(repo)s for %(usersgr)s' % dict( - usersgr=group_name, repo=repo_name + 'failed to edit permission for users group: `%s` in ' + 'repo: `%s`' % ( + users_group.users_group_name, repo.repo_name ) ) - - @HasPermissionAnyDecorator('hg.admin') - def revoke_users_group_permission(self, apiuser, repo_name, group_name): - """ - Revoke permission for users group on given repository - - :param repo_name: - :param group_name: - """ - - try: - repo = Repository.get_by_repo_name(repo_name) - if repo is None: - raise JSONRPCError('unknown repository %s' % repo) - - user_group = UsersGroup.get_by_group_name(group_name) - if user_group is None: - raise JSONRPCError('unknown users group %s' % user_group) - - RepoModel().revoke_users_group_permission(repo=repo_name, - group_name=group_name) - - Session.commit() - return dict( - msg='Revoked perm for group: %s in repo: %s' % ( - group_name, repo_name - ) - ) - except Exception: - log.error(traceback.format_exc()) - raise JSONRPCError( - 'failed to edit permission %(repo)s for %(usersgr)s' % dict( - usersgr=group_name, repo=repo_name - ) - )