Changeset - ffe4d5060d91
[Not reviewed]
stable
0 3 0
Mads Kiilerich - 9 years ago 2016-05-30 15:32:22
madski@unity3d.com
api: avoid duplicating group name when updating repo (Issue #37)

The api incorrectly passed repo.repo_name as repo_name, and the model update
function incorrectly always required repo_name.
3 files changed with 48 insertions and 9 deletions:
0 comments (0 inline, 0 general)
kallithea/controllers/api/api.py
Show inline comments
 
@@ -802,1540 +802,1537 @@ class ApiController(JSONRPCController):
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg" : "deleted user ID:<userid> <username>",
 
                      "user": null
 
                    }
 
            error:  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "failed to delete user ID:<userid> <username>"
 
          }
 

	
 
        """
 
        user = get_user_or_error(userid)
 

	
 
        try:
 
            UserModel().delete(userid)
 
            Session().commit()
 
            return dict(
 
                msg='deleted user ID:%s %s' % (user.user_id, user.username),
 
                user=None
 
            )
 
        except Exception:
 

	
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to delete user ID:%s %s'
 
                               % (user.user_id, user.username))
 

	
 
    # permission check inside
 
    def get_user_group(self, apiuser, usergroupid):
 
        """
 
        Gets an existing user group. This command can be executed only using api_key
 
        belonging to user with admin rights or user who has at least
 
        read access to user group.
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param usergroupid: id of user_group to edit
 
        :type usergroupid: str or int
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result : None if group not exist
 
                     {
 
                       "users_group_id" : "<id>",
 
                       "group_name" :     "<groupname>",
 
                       "active":          "<bool>",
 
                       "members" :  [<user_obj>,...]
 
                     }
 
            error : null
 

	
 
        """
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAnyApi('hg.admin')(user=apiuser):
 
            # check if we have at least read permission for this user group !
 
            _perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin',)
 
            if not HasUserGroupPermissionAny(*_perms)(
 
                    user=apiuser, user_group_name=user_group.users_group_name):
 
                raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
 

	
 
        data = user_group.get_api_data()
 
        return data
 

	
 
    # permission check inside
 
    def get_user_groups(self, apiuser):
 
        """
 
        Lists all existing user groups. This command can be executed only using
 
        api_key belonging to user with admin rights or user who has at least
 
        read access to user group.
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result : [<user_group_obj>,...]
 
            error : null
 
        """
 

	
 
        result = []
 
        _perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin',)
 
        extras = {'user': apiuser}
 
        for user_group in UserGroupList(UserGroupModel().get_all(),
 
                                        perm_set=_perms, extra_kwargs=extras):
 
            result.append(user_group.get_api_data())
 
        return result
 

	
 
    @HasPermissionAnyDecorator('hg.admin', 'hg.usergroup.create.true')
 
    def create_user_group(self, apiuser, group_name, description=Optional(''),
 
                          owner=Optional(OAttr('apiuser')), active=Optional(True)):
 
        """
 
        Creates new user group. This command can be executed only using api_key
 
        belonging to user with admin rights or an user who has create user group
 
        permission
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param group_name: name of new user group
 
        :type group_name: str
 
        :param description: group description
 
        :type description: str
 
        :param owner: owner of group. If not passed apiuser is the owner
 
        :type owner: Optional(str or int)
 
        :param active: group is active
 
        :type active: Optional(bool)
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg": "created new user group `<groupname>`",
 
                      "user_group": <user_group_object>
 
                    }
 
            error:  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "user group `<group name>` already exist"
 
            or
 
            "failed to create group `<group name>`"
 
          }
 

	
 
        """
 

	
 
        if UserGroupModel().get_by_name(group_name):
 
            raise JSONRPCError("user group `%s` already exist" % (group_name,))
 

	
 
        try:
 
            if isinstance(owner, Optional):
 
                owner = apiuser.user_id
 

	
 
            owner = get_user_or_error(owner)
 
            active = Optional.extract(active)
 
            description = Optional.extract(description)
 
            ug = UserGroupModel().create(name=group_name, description=description,
 
                                         owner=owner, active=active)
 
            Session().commit()
 
            return dict(
 
                msg='created new user group `%s`' % group_name,
 
                user_group=ug.get_api_data()
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to create group `%s`' % (group_name,))
 

	
 
    # permission check inside
 
    def update_user_group(self, apiuser, usergroupid, group_name=Optional(''),
 
                          description=Optional(''), owner=Optional(None),
 
                          active=Optional(True)):
 
        """
 
        Updates given usergroup.  This command can be executed only using api_key
 
        belonging to user with admin rights or an admin of given user group
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param usergroupid: id of user group to update
 
        :type usergroupid: str or int
 
        :param group_name: name of new user group
 
        :type group_name: str
 
        :param description: group description
 
        :type description: str
 
        :param owner: owner of group.
 
        :type owner: Optional(str or int)
 
        :param active: group is active
 
        :type active: Optional(bool)
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
            "msg": 'updated user group ID:<user group id> <user group name>',
 
            "user_group": <user_group_object>
 
          }
 
          error :  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "failed to update user group `<user group name>`"
 
          }
 

	
 
        """
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAnyApi('hg.admin')(user=apiuser):
 
            # check if we have admin permission for this user group !
 
            _perms = ('usergroup.admin',)
 
            if not HasUserGroupPermissionAny(*_perms)(
 
                    user=apiuser, user_group_name=user_group.users_group_name):
 
                raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
 

	
 
        if not isinstance(owner, Optional):
 
            owner = get_user_or_error(owner)
 

	
 
        updates = {}
 
        store_update(updates, group_name, 'users_group_name')
 
        store_update(updates, description, 'user_group_description')
 
        store_update(updates, owner, 'user')
 
        store_update(updates, active, 'users_group_active')
 
        try:
 
            UserGroupModel().update(user_group, updates)
 
            Session().commit()
 
            return dict(
 
                msg='updated user group ID:%s %s' % (user_group.users_group_id,
 
                                                     user_group.users_group_name),
 
                user_group=user_group.get_api_data()
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to update user group `%s`' % (usergroupid,))
 

	
 
    # permission check inside
 
    def delete_user_group(self, apiuser, usergroupid):
 
        """
 
        Delete given user group by user group id or name.
 
        This command can be executed only using api_key
 
        belonging to user with admin rights or an admin of given user group
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param usergroupid:
 
        :type usergroupid: int
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
            "msg": "deleted user group ID:<user_group_id> <user_group_name>"
 
          }
 
          error :  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "failed to delete user group ID:<user_group_id> <user_group_name>"
 
            or
 
            "RepoGroup assigned to <repo_groups_list>"
 
          }
 

	
 
        """
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAnyApi('hg.admin')(user=apiuser):
 
            # check if we have admin permission for this user group !
 
            _perms = ('usergroup.admin',)
 
            if not HasUserGroupPermissionAny(*_perms)(
 
                    user=apiuser, user_group_name=user_group.users_group_name):
 
                raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
 

	
 
        try:
 
            UserGroupModel().delete(user_group)
 
            Session().commit()
 
            return dict(
 
                msg='deleted user group ID:%s %s' %
 
                    (user_group.users_group_id, user_group.users_group_name),
 
                user_group=None
 
            )
 
        except UserGroupsAssignedException as e:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(str(e))
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to delete user group ID:%s %s' %
 
                               (user_group.users_group_id,
 
                                user_group.users_group_name)
 
            )
 

	
 
    # permission check inside
 
    def add_user_to_user_group(self, apiuser, usergroupid, userid):
 
        """
 
        Adds a user to a user group. If user exists in that group success will be
 
        `false`. This command can be executed only using api_key
 
        belonging to user with admin rights  or an admin of given user group
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param usergroupid:
 
        :type usergroupid: int
 
        :param userid:
 
        :type userid: int
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
              "success": True|False # depends on if member is in group
 
              "msg": "added member `<username>` to user group `<groupname>` |
 
                      User is already in that group"
 

	
 
          }
 
          error :  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "failed to add member to user group `<user_group_name>`"
 
          }
 

	
 
        """
 
        user = get_user_or_error(userid)
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAnyApi('hg.admin')(user=apiuser):
 
            # check if we have admin permission for this user group !
 
            _perms = ('usergroup.admin',)
 
            if not HasUserGroupPermissionAny(*_perms)(
 
                    user=apiuser, user_group_name=user_group.users_group_name):
 
                raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
 

	
 
        try:
 
            ugm = UserGroupModel().add_user_to_group(user_group, user)
 
            success = True if ugm != True else False
 
            msg = 'added member `%s` to user group `%s`' % (
 
                user.username, user_group.users_group_name
 
            )
 
            msg = msg if success else 'User is already in that group'
 
            Session().commit()
 

	
 
            return dict(
 
                success=success,
 
                msg=msg
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to add member to user group `%s`' % (
 
                    user_group.users_group_name,
 
                )
 
            )
 

	
 
    # permission check inside
 
    def remove_user_from_user_group(self, apiuser, usergroupid, userid):
 
        """
 
        Removes a user from a user group. If user is not in given group success will
 
        be `false`. This command can be executed only
 
        using api_key belonging to user with admin rights or an admin of given user group
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param usergroupid:
 
        :param userid:
 

	
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "success":  True|False,  # depends on if member is in group
 
                      "msg": "removed member <username> from user group <groupname> |
 
                              User wasn't in group"
 
                    }
 
            error:  null
 

	
 
        """
 
        user = get_user_or_error(userid)
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAnyApi('hg.admin')(user=apiuser):
 
            # check if we have admin permission for this user group !
 
            _perms = ('usergroup.admin',)
 
            if not HasUserGroupPermissionAny(*_perms)(
 
                    user=apiuser, user_group_name=user_group.users_group_name):
 
                raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
 

	
 
        try:
 
            success = UserGroupModel().remove_user_from_group(user_group, user)
 
            msg = 'removed member `%s` from user group `%s`' % (
 
                user.username, user_group.users_group_name
 
            )
 
            msg = msg if success else "User wasn't in group"
 
            Session().commit()
 
            return dict(success=success, msg=msg)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to remove member from user group `%s`' % (
 
                    user_group.users_group_name,
 
                )
 
            )
 

	
 
    # permission check inside
 
    def get_repo(self, apiuser, repoid):
 
        """
 
        Gets an existing repository by it's name or repository_id. Members will return
 
        either users_group or user associated to that repository. This command can be
 
        executed only using api_key belonging to user with admin
 
        rights or regular user that have at least read access to repository.
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
            {
 
                "repo_id" :          "<repo_id>",
 
                "repo_name" :        "<reponame>"
 
                "repo_type" :        "<repo_type>",
 
                "clone_uri" :        "<clone_uri>",
 
                "enable_downloads":  "<bool>",
 
                "enable_locking":    "<bool>",
 
                "enable_statistics": "<bool>",
 
                "private":           "<bool>",
 
                "created_on" :       "<date_time_created>",
 
                "description" :      "<description>",
 
                "landing_rev":       "<landing_rev>",
 
                "last_changeset":    {
 
                                       "author":   "<full_author>",
 
                                       "date":     "<date_time_of_commit>",
 
                                       "message":  "<commit_message>",
 
                                       "raw_id":   "<raw_id>",
 
                                       "revision": "<numeric_revision>",
 
                                       "short_id": "<short_id>"
 
                                     }
 
                "owner":             "<repo_owner>",
 
                "fork_of":           "<name_of_fork_parent>",
 
                "members" :     [
 
                                  {
 
                                    "name":     "<username>",
 
                                    "type" :    "user",
 
                                    "permission" : "repository.(read|write|admin)"
 
                                  },
 
                                  …
 
                                  {
 
                                    "name":     "<usergroup name>",
 
                                    "type" :    "user_group",
 
                                    "permission" : "usergroup.(read|write|admin)"
 
                                  },
 
                                  …
 
                                ]
 
                 "followers":   [<user_obj>, ...]
 
                 ]
 
            }
 
          }
 
          error :  null
 

	
 
        """
 
        repo = get_repo_or_error(repoid)
 

	
 
        if not HasPermissionAnyApi('hg.admin')(user=apiuser):
 
            # check if we have admin permission for this repo !
 
            perms = ('repository.admin', 'repository.write', 'repository.read')
 
            if not HasRepoPermissionAnyApi(*perms)(user=apiuser, repo_name=repo.repo_name):
 
                raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
        members = []
 
        followers = []
 
        for user in repo.repo_to_perm:
 
            perm = user.permission.permission_name
 
            user = user.user
 
            user_data = {
 
                'name': user.username,
 
                'type': "user",
 
                'permission': perm
 
            }
 
            members.append(user_data)
 

	
 
        for user_group in repo.users_group_to_perm:
 
            perm = user_group.permission.permission_name
 
            user_group = user_group.users_group
 
            user_group_data = {
 
                'name': user_group.users_group_name,
 
                'type': "user_group",
 
                'permission': perm
 
            }
 
            members.append(user_group_data)
 

	
 
        for user in repo.followers:
 
            followers.append(user.user.get_api_data())
 

	
 
        data = repo.get_api_data()
 
        data['members'] = members
 
        data['followers'] = followers
 
        return data
 

	
 
    # permission check inside
 
    def get_repos(self, apiuser):
 
        """
 
        Lists all existing repositories. This command can be executed only using
 
        api_key belonging to user with admin rights or regular user that have
 
        admin, write or read access to repository.
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: [
 
                      {
 
                        "repo_id" :          "<repo_id>",
 
                        "repo_name" :        "<reponame>"
 
                        "repo_type" :        "<repo_type>",
 
                        "clone_uri" :        "<clone_uri>",
 
                        "private": :         "<bool>",
 
                        "created_on" :       "<datetimecreated>",
 
                        "description" :      "<description>",
 
                        "landing_rev":       "<landing_rev>",
 
                        "owner":             "<repo_owner>",
 
                        "fork_of":           "<name_of_fork_parent>",
 
                        "enable_downloads":  "<bool>",
 
                        "enable_locking":    "<bool>",
 
                        "enable_statistics": "<bool>",
 
                      },
 
                      …
 
                    ]
 
            error:  null
 
        """
 
        result = []
 
        if not HasPermissionAnyApi('hg.admin')(user=apiuser):
 
            repos = RepoModel().get_all_user_repos(user=apiuser)
 
        else:
 
            repos = RepoModel().get_all()
 

	
 
        for repo in repos:
 
            result.append(repo.get_api_data())
 
        return result
 

	
 
    # permission check inside
 
    def get_repo_nodes(self, apiuser, repoid, revision, root_path,
 
                       ret_type=Optional('all')):
 
        """
 
        returns a list of nodes and it's children in a flat list for a given path
 
        at given revision. It's possible to specify ret_type to show only `files` or
 
        `dirs`.  This command can be executed only using api_key belonging to
 
        user with admin rights or regular user that have at least read access to repository.
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 
        :param revision: revision for which listing should be done
 
        :type revision: str
 
        :param root_path: path from which start displaying
 
        :type root_path: str
 
        :param ret_type: return type 'all|files|dirs' nodes
 
        :type ret_type: Optional(str)
 

	
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: [
 
                      {
 
                        "name" :        "<name>"
 
                        "type" :        "<type>",
 
                      },
 
                      …
 
                    ]
 
            error:  null
 
        """
 
        repo = get_repo_or_error(repoid)
 

	
 
        if not HasPermissionAnyApi('hg.admin')(user=apiuser):
 
            # check if we have admin permission for this repo !
 
            perms = ('repository.admin', 'repository.write', 'repository.read')
 
            if not HasRepoPermissionAnyApi(*perms)(user=apiuser, repo_name=repo.repo_name):
 
                raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
        ret_type = Optional.extract(ret_type)
 
        _map = {}
 
        try:
 
            _d, _f = ScmModel().get_nodes(repo, revision, root_path,
 
                                          flat=False)
 
            _map = {
 
                'all': _d + _f,
 
                'files': _f,
 
                'dirs': _d,
 
            }
 
            return _map[ret_type]
 
        except KeyError:
 
            raise JSONRPCError('ret_type must be one of %s'
 
                               % (','.join(_map.keys())))
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to get repo: `%s` nodes' % repo.repo_name
 
            )
 

	
 
    @HasPermissionAnyDecorator('hg.admin', 'hg.create.repository')
 
    def create_repo(self, apiuser, repo_name, owner=Optional(OAttr('apiuser')),
 
                    repo_type=Optional('hg'), description=Optional(''),
 
                    private=Optional(False), clone_uri=Optional(None),
 
                    landing_rev=Optional('rev:tip'),
 
                    enable_statistics=Optional(False),
 
                    enable_locking=Optional(False),
 
                    enable_downloads=Optional(False),
 
                    copy_permissions=Optional(False)):
 
        """
 
        Creates a repository. If repository name contains "/", all needed repository
 
        groups will be created. For example "foo/bar/baz" will create groups
 
        "foo", "bar" (with "foo" as parent), and create "baz" repository with
 
        "bar" as group. This command can be executed only using api_key
 
        belonging to user with admin rights or regular user that have create
 
        repository permission. Regular users cannot specify owner parameter
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param repo_name: repository name
 
        :type repo_name: str
 
        :param owner: user_id or username
 
        :type owner: Optional(str)
 
        :param repo_type: 'hg' or 'git'
 
        :type repo_type: Optional(str)
 
        :param description: repository description
 
        :type description: Optional(str)
 
        :param private:
 
        :type private: bool
 
        :param clone_uri:
 
        :type clone_uri: str
 
        :param landing_rev: <rev_type>:<rev>
 
        :type landing_rev: str
 
        :param enable_locking:
 
        :type enable_locking: bool
 
        :param enable_downloads:
 
        :type enable_downloads: bool
 
        :param enable_statistics:
 
        :type enable_statistics: bool
 
        :param copy_permissions: Copy permission from group that repository is
 
            being created.
 
        :type copy_permissions: bool
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg": "Created new repository `<reponame>`",
 
                      "success": true,
 
                      "task": "<celery task id or None if done sync>"
 
                    }
 
            error:  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
             'failed to create repository `<repo_name>`
 
          }
 

	
 
        """
 
        if not HasPermissionAnyApi('hg.admin')(user=apiuser):
 
            if not isinstance(owner, Optional):
 
                #forbid setting owner for non-admins
 
                raise JSONRPCError(
 
                    'Only Kallithea admin can specify `owner` param'
 
                )
 
        if isinstance(owner, Optional):
 
            owner = apiuser.user_id
 

	
 
        owner = get_user_or_error(owner)
 

	
 
        if RepoModel().get_by_repo_name(repo_name):
 
            raise JSONRPCError("repo `%s` already exist" % repo_name)
 

	
 
        defs = Setting.get_default_repo_settings(strip_prefix=True)
 
        if isinstance(private, Optional):
 
            private = defs.get('repo_private') or Optional.extract(private)
 
        if isinstance(repo_type, Optional):
 
            repo_type = defs.get('repo_type')
 
        if isinstance(enable_statistics, Optional):
 
            enable_statistics = defs.get('repo_enable_statistics')
 
        if isinstance(enable_locking, Optional):
 
            enable_locking = defs.get('repo_enable_locking')
 
        if isinstance(enable_downloads, Optional):
 
            enable_downloads = defs.get('repo_enable_downloads')
 

	
 
        clone_uri = Optional.extract(clone_uri)
 
        description = Optional.extract(description)
 
        landing_rev = Optional.extract(landing_rev)
 
        copy_permissions = Optional.extract(copy_permissions)
 

	
 
        try:
 
            repo_name_cleaned = repo_name.split('/')[-1]
 
            # create structure of groups and return the last group
 
            repo_group = map_groups(repo_name)
 
            data = dict(
 
                repo_name=repo_name_cleaned,
 
                repo_name_full=repo_name,
 
                repo_type=repo_type,
 
                repo_description=description,
 
                owner=owner,
 
                repo_private=private,
 
                clone_uri=clone_uri,
 
                repo_group=repo_group,
 
                repo_landing_rev=landing_rev,
 
                enable_statistics=enable_statistics,
 
                enable_locking=enable_locking,
 
                enable_downloads=enable_downloads,
 
                repo_copy_permissions=copy_permissions,
 
            )
 

	
 
            task = RepoModel().create(form_data=data, cur_user=owner)
 
            from celery.result import BaseAsyncResult
 
            task_id = None
 
            if isinstance(task, BaseAsyncResult):
 
                task_id = task.task_id
 
            # no commit, it's done in RepoModel, or async via celery
 
            return dict(
 
                msg="Created new repository `%s`" % (repo_name,),
 
                success=True,  # cannot return the repo data here since fork
 
                               # can be done async
 
                task=task_id
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to create repository `%s`' % (repo_name,))
 

	
 
    # permission check inside
 
    def update_repo(self, apiuser, repoid, name=Optional(None),
 
                    owner=Optional(OAttr('apiuser')),
 
                    group=Optional(None),
 
                    description=Optional(''), private=Optional(False),
 
                    clone_uri=Optional(None), landing_rev=Optional('rev:tip'),
 
                    enable_statistics=Optional(False),
 
                    enable_locking=Optional(False),
 
                    enable_downloads=Optional(False)):
 

	
 
        """
 
        Updates repo
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 
        :param name:
 
        :param owner:
 
        :param group:
 
        :param description:
 
        :param private:
 
        :param clone_uri:
 
        :param landing_rev:
 
        :param enable_statistics:
 
        :param enable_locking:
 
        :param enable_downloads:
 
        """
 
        repo = get_repo_or_error(repoid)
 
        if not HasPermissionAnyApi('hg.admin')(user=apiuser):
 
            # check if we have admin permission for this repo !
 
            if not HasRepoPermissionAnyApi('repository.admin')(user=apiuser,
 
                                                               repo_name=repo.repo_name):
 
                raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
            if (name != repo.repo_name and
 
                not HasPermissionAnyApi('hg.create.repository')(user=apiuser)
 
                ):
 
                raise JSONRPCError('no permission to create (or move) repositories')
 

	
 
            if not isinstance(owner, Optional):
 
                #forbid setting owner for non-admins
 
                raise JSONRPCError(
 
                    'Only Kallithea admin can specify `owner` param'
 
                )
 

	
 
        updates = {
 
            # update function requires this.
 
            'repo_name': repo.repo_name
 
        }
 
        updates = {}
 
        repo_group = group
 
        if not isinstance(repo_group, Optional):
 
            repo_group = get_repo_group_or_error(repo_group)
 
            repo_group = repo_group.group_id
 
        try:
 
            store_update(updates, name, 'repo_name')
 
            store_update(updates, repo_group, 'repo_group')
 
            store_update(updates, owner, 'user')
 
            store_update(updates, description, 'repo_description')
 
            store_update(updates, private, 'repo_private')
 
            store_update(updates, clone_uri, 'clone_uri')
 
            store_update(updates, landing_rev, 'repo_landing_rev')
 
            store_update(updates, enable_statistics, 'repo_enable_statistics')
 
            store_update(updates, enable_locking, 'repo_enable_locking')
 
            store_update(updates, enable_downloads, 'repo_enable_downloads')
 

	
 
            RepoModel().update(repo, **updates)
 
            Session().commit()
 
            return dict(
 
                msg='updated repo ID:%s %s' % (repo.repo_id, repo.repo_name),
 
                repository=repo.get_api_data()
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to update repo `%s`' % repoid)
 

	
 
    @HasPermissionAnyDecorator('hg.admin', 'hg.fork.repository')
 
    def fork_repo(self, apiuser, repoid, fork_name,
 
                  owner=Optional(OAttr('apiuser')),
 
                  description=Optional(''), copy_permissions=Optional(False),
 
                  private=Optional(False), landing_rev=Optional('rev:tip')):
 
        """
 
        Creates a fork of given repo. In case of using celery this will
 
        immediately return success message, while fork is going to be created
 
        asynchronous. This command can be executed only using api_key belonging to
 
        user with admin rights or regular user that have fork permission, and at least
 
        read access to forking repository. Regular users cannot specify owner parameter.
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 
        :param fork_name:
 
        :param owner:
 
        :param description:
 
        :param copy_permissions:
 
        :param private:
 
        :param landing_rev:
 

	
 
        INPUT::
 

	
 
            id : <id_for_response>
 
            api_key : "<api_key>"
 
            args:     {
 
                        "repoid" :          "<reponame or repo_id>",
 
                        "fork_name":        "<forkname>",
 
                        "owner":            "<username or user_id = Optional(=apiuser)>",
 
                        "description":      "<description>",
 
                        "copy_permissions": "<bool>",
 
                        "private":          "<bool>",
 
                        "landing_rev":      "<landing_rev>"
 
                      }
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg": "Created fork of `<reponame>` as `<forkname>`",
 
                      "success": true,
 
                      "task": "<celery task id or None if done sync>"
 
                    }
 
            error:  null
 

	
 
        """
 
        repo = get_repo_or_error(repoid)
 
        repo_name = repo.repo_name
 

	
 
        _repo = RepoModel().get_by_repo_name(fork_name)
 
        if _repo:
 
            type_ = 'fork' if _repo.fork else 'repo'
 
            raise JSONRPCError("%s `%s` already exist" % (type_, fork_name))
 

	
 
        if HasPermissionAnyApi('hg.admin')(user=apiuser):
 
            pass
 
        elif HasRepoPermissionAnyApi('repository.admin',
 
                                     'repository.write',
 
                                     'repository.read')(user=apiuser,
 
                                                        repo_name=repo.repo_name):
 
            if not isinstance(owner, Optional):
 
                #forbid setting owner for non-admins
 
                raise JSONRPCError(
 
                    'Only Kallithea admin can specify `owner` param'
 
                )
 

	
 
            if not HasPermissionAnyApi('hg.create.repository')(user=apiuser):
 
                raise JSONRPCError('no permission to create repositories')
 
        else:
 
            raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
        if isinstance(owner, Optional):
 
            owner = apiuser.user_id
 

	
 
        owner = get_user_or_error(owner)
 

	
 
        try:
 
            # create structure of groups and return the last group
 
            group = map_groups(fork_name)
 

	
 
            form_data = dict(
 
                repo_name=fork_name,
 
                repo_name_full=fork_name,
 
                repo_group=group,
 
                repo_type=repo.repo_type,
 
                description=Optional.extract(description),
 
                private=Optional.extract(private),
 
                copy_permissions=Optional.extract(copy_permissions),
 
                landing_rev=Optional.extract(landing_rev),
 
                update_after_clone=False,
 
                fork_parent_id=repo.repo_id,
 
            )
 
            task = RepoModel().create_fork(form_data, cur_user=owner)
 
            # no commit, it's done in RepoModel, or async via celery
 
            from celery.result import BaseAsyncResult
 
            task_id = None
 
            if isinstance(task, BaseAsyncResult):
 
                task_id = task.task_id
 
            return dict(
 
                msg='Created fork of `%s` as `%s`' % (repo.repo_name,
 
                                                      fork_name),
 
                success=True,  # cannot return the repo data here since fork
 
                               # can be done async
 
                task=task_id
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to fork repository `%s` as `%s`' % (repo_name,
 
                                                            fork_name)
 
            )
 

	
 
    # permission check inside
 
    def delete_repo(self, apiuser, repoid, forks=Optional('')):
 
        """
 
        Deletes a repository. This command can be executed only using api_key belonging
 
        to user with admin rights or regular user that have admin access to repository.
 
        When `forks` param is set it's possible to detach or delete forks of deleting
 
        repository
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 
        :param forks: `detach` or `delete`, what do do with attached forks for repo
 
        :type forks: Optional(str)
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg": "Deleted repository `<reponame>`",
 
                      "success": true
 
                    }
 
            error:  null
 

	
 
        """
 
        repo = get_repo_or_error(repoid)
 

	
 
        if not HasPermissionAnyApi('hg.admin')(user=apiuser):
 
            # check if we have admin permission for this repo !
 
            if not HasRepoPermissionAnyApi('repository.admin')(user=apiuser,
 
                                                           repo_name=repo.repo_name):
 
                raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
        try:
 
            handle_forks = Optional.extract(forks)
 
            _forks_msg = ''
 
            _forks = [f for f in repo.forks]
 
            if handle_forks == 'detach':
 
                _forks_msg = ' ' + 'Detached %s forks' % len(_forks)
 
            elif handle_forks == 'delete':
 
                _forks_msg = ' ' + 'Deleted %s forks' % len(_forks)
 
            elif _forks:
 
                raise JSONRPCError(
 
                    'Cannot delete `%s` it still contains attached forks' %
 
                    (repo.repo_name,)
 
                )
 

	
 
            RepoModel().delete(repo, forks=forks)
 
            Session().commit()
 
            return dict(
 
                msg='Deleted repository `%s`%s' % (repo.repo_name, _forks_msg),
 
                success=True
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to delete repository `%s`' % (repo.repo_name,)
 
            )
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def grant_user_permission(self, apiuser, repoid, userid, perm):
 
        """
 
        Grant permission for user on given repository, or update existing one
 
        if found. This command can be executed only using api_key belonging to user
 
        with admin rights.
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 
        :param userid:
 
        :param perm: (repository.(none|read|write|admin))
 
        :type perm: str
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg" : "Granted perm: `<perm>` for user: `<username>` in repo: `<reponame>`",
 
                      "success": true
 
                    }
 
            error:  null
 
        """
 
        repo = get_repo_or_error(repoid)
 
        user = get_user_or_error(userid)
 
        perm = get_perm_or_error(perm)
 

	
 
        try:
 

	
 
            RepoModel().grant_user_permission(repo=repo, user=user, perm=perm)
 

	
 
            Session().commit()
 
            return dict(
 
                msg='Granted perm: `%s` for user: `%s` in repo: `%s`' % (
 
                    perm.permission_name, user.username, repo.repo_name
 
                ),
 
                success=True
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to edit permission for user: `%s` in repo: `%s`' % (
 
                    userid, repoid
 
                )
 
            )
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def revoke_user_permission(self, apiuser, repoid, userid):
 
        """
 
        Revoke permission for user on given repository. This command can be executed
 
        only using api_key belonging to user with admin rights.
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 
        :param userid:
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg" : "Revoked perm for user: `<username>` in repo: `<reponame>`",
 
                      "success": true
 
                    }
 
            error:  null
 

	
 
        """
 

	
 
        repo = get_repo_or_error(repoid)
 
        user = get_user_or_error(userid)
 
        try:
 
            RepoModel().revoke_user_permission(repo=repo, user=user)
 
            Session().commit()
 
            return dict(
 
                msg='Revoked perm for user: `%s` in repo: `%s`' % (
 
                    user.username, repo.repo_name
 
                ),
 
                success=True
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to edit permission for user: `%s` in repo: `%s`' % (
 
                    userid, repoid
 
                )
 
            )
 

	
 
    # permission check inside
 
    def grant_user_group_permission(self, apiuser, repoid, usergroupid, perm):
 
        """
 
        Grant permission for user group on given repository, or update
 
        existing one if found. This command can be executed only using
 
        api_key belonging to user with admin rights.
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 
        :param usergroupid: id of usergroup
 
        :type usergroupid: str or int
 
        :param perm: (repository.(none|read|write|admin))
 
        :type perm: str
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
            "msg" : "Granted perm: `<perm>` for group: `<usersgroupname>` in repo: `<reponame>`",
 
            "success": true
 

	
 
          }
 
          error :  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "failed to edit permission for user group: `<usergroup>` in repo `<repo>`'
 
          }
 

	
 
        """
 
        repo = get_repo_or_error(repoid)
 
        perm = get_perm_or_error(perm)
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAnyApi('hg.admin')(user=apiuser):
 
            # check if we have admin permission for this repo !
 
            _perms = ('repository.admin',)
 
            if not HasRepoPermissionAnyApi(*_perms)(
 
                    user=apiuser, repo_name=repo.repo_name):
 
                raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
            # check if we have at least read permission for this user group !
 
            _perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin',)
 
            if not HasUserGroupPermissionAny(*_perms)(
 
                    user=apiuser, user_group_name=user_group.users_group_name):
 
                raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
 

	
 
        try:
 
            RepoModel().grant_user_group_permission(
 
                repo=repo, group_name=user_group, perm=perm)
 

	
 
            Session().commit()
 
            return dict(
 
                msg='Granted perm: `%s` for user group: `%s` in '
 
                    'repo: `%s`' % (
 
                        perm.permission_name, user_group.users_group_name,
 
                        repo.repo_name
 
                    ),
 
                success=True
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to edit permission for user group: `%s` in '
 
                'repo: `%s`' % (
 
                    usergroupid, repo.repo_name
 
                )
 
            )
 

	
 
    # permission check inside
 
    def revoke_user_group_permission(self, apiuser, repoid, usergroupid):
 
        """
 
        Revoke permission for user group on given repository. This command can be
 
        executed only using api_key belonging to user with admin rights.
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 
        :param usergroupid:
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg" : "Revoked perm for group: `<usersgroupname>` in repo: `<reponame>`",
 
                      "success": true
 
                    }
 
            error:  null
 
        """
 
        repo = get_repo_or_error(repoid)
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAnyApi('hg.admin')(user=apiuser):
 
            # check if we have admin permission for this repo !
 
            _perms = ('repository.admin',)
 
            if not HasRepoPermissionAnyApi(*_perms)(
 
                    user=apiuser, repo_name=repo.repo_name):
 
                raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
            # check if we have at least read permission for this user group !
 
            _perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin',)
 
            if not HasUserGroupPermissionAny(*_perms)(
 
                    user=apiuser, user_group_name=user_group.users_group_name):
 
                raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
 

	
 
        try:
 
            RepoModel().revoke_user_group_permission(
 
                repo=repo, group_name=user_group)
 

	
 
            Session().commit()
 
            return dict(
 
                msg='Revoked perm for user group: `%s` in repo: `%s`' % (
 
                    user_group.users_group_name, repo.repo_name
 
                ),
 
                success=True
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to edit permission for user group: `%s` in '
 
                'repo: `%s`' % (
 
                    user_group.users_group_name, repo.repo_name
 
                )
 
            )
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def get_repo_group(self, apiuser, repogroupid):
 
        """
 
        Returns given repo group together with permissions, and repositories
 
        inside the group
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param repogroupid: id/name of repository group
 
        :type repogroupid: str or int
 
        """
 
        repo_group = get_repo_group_or_error(repogroupid)
 

	
 
        members = []
 
        for user in repo_group.repo_group_to_perm:
 
            perm = user.permission.permission_name
 
            user = user.user
 
            user_data = {
 
                'name': user.username,
 
                'type': "user",
 
                'permission': perm
 
            }
 
            members.append(user_data)
 

	
 
        for user_group in repo_group.users_group_to_perm:
 
            perm = user_group.permission.permission_name
 
            user_group = user_group.users_group
 
            user_group_data = {
 
                'name': user_group.users_group_name,
 
                'type': "user_group",
 
                'permission': perm
 
            }
 
            members.append(user_group_data)
 

	
 
        data = repo_group.get_api_data()
 
        data["members"] = members
 
        return data
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def get_repo_groups(self, apiuser):
 
        """
 
        Returns all repository groups
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        """
 
        result = []
 
        for repo_group in RepoGroupModel().get_all():
 
            result.append(repo_group.get_api_data())
 
        return result
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def create_repo_group(self, apiuser, group_name, description=Optional(''),
 
                          owner=Optional(OAttr('apiuser')),
 
                          parent=Optional(None),
 
                          copy_permissions=Optional(False)):
 
        """
 
        Creates a repository group. This command can be executed only using
 
        api_key belonging to user with admin rights.
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param group_name:
 
        :type group_name:
 
        :param description:
 
        :type description:
 
        :param owner:
 
        :type owner:
 
        :param parent:
 
        :type parent:
 
        :param copy_permissions:
 
        :type copy_permissions:
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
              "msg": "created new repo group `<repo_group_name>`"
 
              "repo_group": <repogroup_object>
 
          }
 
          error :  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            failed to create repo group `<repogroupid>`
 
          }
 

	
 
        """
 
        if RepoGroup.get_by_group_name(group_name):
 
            raise JSONRPCError("repo group `%s` already exist" % (group_name,))
 

	
 
        if isinstance(owner, Optional):
 
            owner = apiuser.user_id
 
        group_description = Optional.extract(description)
 
        parent_group = Optional.extract(parent)
 
        if not isinstance(parent, Optional):
 
            parent_group = get_repo_group_or_error(parent_group)
 

	
 
        copy_permissions = Optional.extract(copy_permissions)
 
        try:
 
            repo_group = RepoGroupModel().create(
 
                group_name=group_name,
 
                group_description=group_description,
 
                owner=owner,
 
                parent=parent_group,
 
                copy_permissions=copy_permissions
 
            )
 
            Session().commit()
 
            return dict(
 
                msg='created new repo group `%s`' % group_name,
 
                repo_group=repo_group.get_api_data()
 
            )
 
        except Exception:
 

	
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to create repo group `%s`' % (group_name,))
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def update_repo_group(self, apiuser, repogroupid, group_name=Optional(''),
 
                          description=Optional(''),
 
                          owner=Optional(OAttr('apiuser')),
 
                          parent=Optional(None), enable_locking=Optional(False)):
 
        repo_group = get_repo_group_or_error(repogroupid)
 

	
 
        updates = {}
 
        try:
 
            store_update(updates, group_name, 'group_name')
 
            store_update(updates, description, 'group_description')
 
            store_update(updates, owner, 'owner')
 
            store_update(updates, parent, 'parent_group')
 
            store_update(updates, enable_locking, 'enable_locking')
 
            repo_group = RepoGroupModel().update(repo_group, updates)
 
            Session().commit()
 
            return dict(
 
                msg='updated repository group ID:%s %s' % (repo_group.group_id,
 
                                                           repo_group.group_name),
 
                repo_group=repo_group.get_api_data()
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to update repository group `%s`'
 
                               % (repogroupid,))
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def delete_repo_group(self, apiuser, repogroupid):
 
        """
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param repogroupid: name or id of repository group
 
        :type repogroupid: str or int
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
            'msg': 'deleted repo group ID:<repogroupid> <repogroupname>
 
            'repo_group': null
 
          }
 
          error :  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "failed to delete repo group ID:<repogroupid> <repogroupname>"
 
          }
 

	
 
        """
 
        repo_group = get_repo_group_or_error(repogroupid)
 

	
 
        try:
 
            RepoGroupModel().delete(repo_group)
 
            Session().commit()
 
            return dict(
 
                msg='deleted repo group ID:%s %s' %
 
                    (repo_group.group_id, repo_group.group_name),
 
                repo_group=None
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to delete repo group ID:%s %s' %
 
                               (repo_group.group_id, repo_group.group_name)
 
            )
 

	
 
    # permission check inside
 
    def grant_user_permission_to_repo_group(self, apiuser, repogroupid, userid,
 
                                            perm, apply_to_children=Optional('none')):
 
        """
 
        Grant permission for user on given repository group, or update existing
 
        one if found. This command can be executed only using api_key belonging
 
        to user with admin rights, or user who has admin right to given repository
 
        group.
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param repogroupid: name or id of repository group
 
        :type repogroupid: str or int
 
        :param userid:
 
        :param perm: (group.(none|read|write|admin))
 
        :type perm: str
 
        :param apply_to_children: 'none', 'repos', 'groups', 'all'
 
        :type apply_to_children: str
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg" : "Granted perm: `<perm>` (recursive:<apply_to_children>) for user: `<username>` in repo group: `<repo_group_name>`",
 
                      "success": true
 
                    }
 
            error:  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "failed to edit permission for user: `<userid>` in repo group: `<repo_group_name>`"
 
          }
 

	
 
        """
 

	
 
        repo_group = get_repo_group_or_error(repogroupid)
 

	
 
        if not HasPermissionAnyApi('hg.admin')(user=apiuser):
 
            # check if we have admin permission for this repo group !
 
            if not HasRepoGroupPermissionAnyApi('group.admin')(user=apiuser,
 
                                                               group_name=repo_group.group_name):
 
                raise JSONRPCError('repository group `%s` does not exist' % (repogroupid,))
 

	
 
        user = get_user_or_error(userid)
 
        perm = get_perm_or_error(perm, prefix='group.')
 
        apply_to_children = Optional.extract(apply_to_children)
 

	
 
        try:
 
            RepoGroupModel().add_permission(repo_group=repo_group,
 
                                            obj=user,
 
                                            obj_type="user",
 
                                            perm=perm,
 
                                            recursive=apply_to_children)
 
            Session().commit()
 
            return dict(
 
                msg='Granted perm: `%s` (recursive:%s) for user: `%s` in repo group: `%s`' % (
 
                    perm.permission_name, apply_to_children, user.username, repo_group.name
 
                ),
 
                success=True
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to edit permission for user: `%s` in repo group: `%s`' % (
 
                    userid, repo_group.name))
 

	
 
    # permission check inside
 
    def revoke_user_permission_from_repo_group(self, apiuser, repogroupid, userid,
 
                                               apply_to_children=Optional('none')):
 
        """
 
        Revoke permission for user on given repository group. This command can
 
        be executed only using api_key belonging to user with admin rights, or
 
        user who has admin right to given repository group.
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param repogroupid: name or id of repository group
 
        :type repogroupid: str or int
 
        :param userid:
 
        :type userid:
 
        :param apply_to_children: 'none', 'repos', 'groups', 'all'
 
        :type apply_to_children: str
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg" : "Revoked perm (recursive:<apply_to_children>) for user: `<username>` in repo group: `<repo_group_name>`",
 
                      "success": true
 
                    }
 
            error:  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "failed to edit permission for user: `<userid>` in repo group: `<repo_group_name>`"
 
          }
 

	
 
        """
 

	
 
        repo_group = get_repo_group_or_error(repogroupid)
 

	
 
        if not HasPermissionAnyApi('hg.admin')(user=apiuser):
 
            # check if we have admin permission for this repo group !
 
            if not HasRepoGroupPermissionAnyApi('group.admin')(user=apiuser,
 
                                                               group_name=repo_group.group_name):
 
                raise JSONRPCError('repository group `%s` does not exist' % (repogroupid,))
 

	
 
        user = get_user_or_error(userid)
 
        apply_to_children = Optional.extract(apply_to_children)
 

	
 
        try:
 
            RepoGroupModel().delete_permission(repo_group=repo_group,
 
                                               obj=user,
 
                                               obj_type="user",
 
                                               recursive=apply_to_children)
 

	
 
            Session().commit()
 
            return dict(
 
                msg='Revoked perm (recursive:%s) for user: `%s` in repo group: `%s`' % (
 
                    apply_to_children, user.username, repo_group.name
 
                ),
 
                success=True
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to edit permission for user: `%s` in repo group: `%s`' % (
 
                    userid, repo_group.name))
 

	
 
    # permission check inside
 
    def grant_user_group_permission_to_repo_group(
 
            self, apiuser, repogroupid, usergroupid, perm,
 
            apply_to_children=Optional('none'),):
 
        """
 
        Grant permission for user group on given repository group, or update
 
        existing one if found. This command can be executed only using
 
        api_key belonging to user with admin rights, or user who has admin
 
        right to given repository group.
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param repogroupid: name or id of repository group
 
        :type repogroupid: str or int
 
        :param usergroupid: id of usergroup
 
        :type usergroupid: str or int
 
        :param perm: (group.(none|read|write|admin))
 
        :type perm: str
 
        :param apply_to_children: 'none', 'repos', 'groups', 'all'
 
        :type apply_to_children: str
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
            "msg" : "Granted perm: `<perm>` (recursive:<apply_to_children>) for user group: `<usersgroupname>` in repo group: `<repo_group_name>`",
 
            "success": true
kallithea/model/repo.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.repo
 
~~~~~~~~~~~~~~~~~~~~
 

	
 
Repository model for kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jun 5, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 

	
 
"""
 

	
 
import os
 
import shutil
 
import logging
 
import traceback
 
from datetime import datetime
 
from sqlalchemy.orm import subqueryload
 

	
 
from kallithea.lib.utils import make_ui
 
from kallithea.lib.vcs.backends import get_backend
 
from kallithea.lib.compat import json
 
from kallithea.lib.utils2 import LazyProperty, safe_str, safe_unicode, \
 
    remove_prefix, obfuscate_url_pw, get_current_authuser
 
from kallithea.lib.caching_query import FromCache
 
from kallithea.lib.hooks import log_delete_repository
 

	
 
from kallithea.model import BaseModel
 
from kallithea.model.db import Repository, UserRepoToPerm, UserGroupRepoToPerm, \
 
    UserRepoGroupToPerm, UserGroupRepoGroupToPerm, User, Permission, \
 
    Statistics, UserGroup, Ui, RepoGroup, RepositoryField
 

	
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import HasRepoPermissionAny, HasUserGroupPermissionAny
 
from kallithea.lib.exceptions import AttachedForksError
 
from kallithea.model.scm import UserGroupList
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class RepoModel(BaseModel):
 

	
 
    cls = Repository
 
    URL_SEPARATOR = Repository.url_sep()
 

	
 
    def _get_user_group(self, users_group):
 
        return self._get_instance(UserGroup, users_group,
 
                                  callback=UserGroup.get_by_group_name)
 

	
 
    def _get_repo_group(self, repo_group):
 
        return self._get_instance(RepoGroup, repo_group,
 
                                  callback=RepoGroup.get_by_group_name)
 

	
 
    def _create_default_perms(self, repository, private):
 
        # create default permission
 
        default = 'repository.read'
 
        def_user = User.get_default_user()
 
        for p in def_user.user_perms:
 
            if p.permission.permission_name.startswith('repository.'):
 
                default = p.permission.permission_name
 
                break
 

	
 
        default_perm = 'repository.none' if private else default
 

	
 
        repo_to_perm = UserRepoToPerm()
 
        repo_to_perm.permission = Permission.get_by_key(default_perm)
 

	
 
        repo_to_perm.repository = repository
 
        repo_to_perm.user_id = def_user.user_id
 

	
 
        return repo_to_perm
 

	
 
    @LazyProperty
 
    def repos_path(self):
 
        """
 
        Gets the repositories root path from database
 
        """
 

	
 
        q = self.sa.query(Ui).filter(Ui.ui_key == '/').one()
 
        return q.ui_value
 

	
 
    def get(self, repo_id, cache=False):
 
        repo = self.sa.query(Repository) \
 
            .filter(Repository.repo_id == repo_id)
 

	
 
        if cache:
 
            repo = repo.options(FromCache("sql_cache_short",
 
                                          "get_repo_%s" % repo_id))
 
        return repo.scalar()
 

	
 
    def get_repo(self, repository):
 
        return self._get_repo(repository)
 

	
 
    def get_by_repo_name(self, repo_name, cache=False):
 
        repo = self.sa.query(Repository) \
 
            .filter(Repository.repo_name == repo_name)
 

	
 
        if cache:
 
            repo = repo.options(FromCache("sql_cache_short",
 
                                          "get_repo_%s" % repo_name))
 
        return repo.scalar()
 

	
 
    def get_all_user_repos(self, user):
 
        """
 
        Gets all repositories that user have at least read access
 

	
 
        :param user:
 
        """
 
        from kallithea.lib.auth import AuthUser
 
        user = self._get_user(user)
 
        repos = AuthUser(user_id=user.user_id).permissions['repositories']
 
        access_check = lambda r: r[1] in ['repository.read',
 
                                          'repository.write',
 
                                          'repository.admin']
 
        repos = [x[0] for x in filter(access_check, repos.items())]
 
        return Repository.query().filter(Repository.repo_name.in_(repos))
 

	
 
    def get_users_js(self):
 
        users = self.sa.query(User).filter(User.active == True).all()
 
        return json.dumps([
 
            {
 
                'id': u.user_id,
 
                'fname': h.escape(u.name),
 
                'lname': h.escape(u.lastname),
 
                'nname': u.username,
 
                'gravatar_lnk': h.gravatar_url(u.email, size=28),
 
                'gravatar_size': 14,
 
            } for u in users]
 
        )
 

	
 
    def get_user_groups_js(self):
 
        user_groups = self.sa.query(UserGroup) \
 
            .filter(UserGroup.users_group_active == True) \
 
            .options(subqueryload(UserGroup.members)) \
 
            .all()
 
        user_groups = UserGroupList(user_groups, perm_set=['usergroup.read',
 
                                                           'usergroup.write',
 
                                                           'usergroup.admin'])
 
        return json.dumps([
 
            {
 
                'id': gr.users_group_id,
 
                'grname': gr.users_group_name,
 
                'grmembers': len(gr.members),
 
            } for gr in user_groups]
 
        )
 

	
 
    @classmethod
 
    def _render_datatable(cls, tmpl, *args, **kwargs):
 
        import kallithea
 
        from pylons import tmpl_context as c
 
        from pylons.i18n.translation import _
 

	
 
        _tmpl_lookup = kallithea.CONFIG['pylons.app_globals'].mako_lookup
 
        template = _tmpl_lookup.get_template('data_table/_dt_elements.html')
 

	
 
        tmpl = template.get_def(tmpl)
 
        kwargs.update(dict(_=_, h=h, c=c))
 
        return tmpl.render(*args, **kwargs)
 

	
 
    @classmethod
 
    def update_repoinfo(cls, repositories=None):
 
        if repositories is None:
 
            repositories = Repository.getAll()
 
        for repo in repositories:
 
            repo.update_changeset_cache()
 

	
 
    def get_repos_as_dict(self, repos_list=None, admin=False, perm_check=True,
 
                          super_user_actions=False):
 
        _render = self._render_datatable
 
        from pylons import tmpl_context as c
 

	
 
        def quick_menu(repo_name):
 
            return _render('quick_menu', repo_name)
 

	
 
        def repo_lnk(name, rtype, rstate, private, fork_of):
 
            return _render('repo_name', name, rtype, rstate, private, fork_of,
 
                           short_name=not admin, admin=False)
 

	
 
        def last_change(last_change):
 
            return _render("last_change", last_change)
 

	
 
        def rss_lnk(repo_name):
 
            return _render("rss", repo_name)
 

	
 
        def atom_lnk(repo_name):
 
            return _render("atom", repo_name)
 

	
 
        def last_rev(repo_name, cs_cache):
 
            return _render('revision', repo_name, cs_cache.get('revision'),
 
                           cs_cache.get('raw_id'), cs_cache.get('author'),
 
                           cs_cache.get('message'))
 

	
 
        def desc(desc):
 
            return h.urlify_text(desc, truncate=60, stylize=c.visual.stylify_metatags)
 

	
 
        def state(repo_state):
 
            return _render("repo_state", repo_state)
 

	
 
        def repo_actions(repo_name):
 
            return _render('repo_actions', repo_name, super_user_actions)
 

	
 
        def owner_actions(user_id, username):
 
            return _render('user_name', user_id, username)
 

	
 
        repos_data = []
 
        for repo in repos_list:
 
            if perm_check:
 
                # check permission at this level
 
                if not HasRepoPermissionAny(
 
                        'repository.read', 'repository.write',
 
                        'repository.admin'
 
                )(repo.repo_name, 'get_repos_as_dict check'):
 
                    continue
 
            cs_cache = repo.changeset_cache
 
            row = {
 
                "menu": quick_menu(repo.repo_name),
 
                "raw_name": repo.repo_name.lower(),
 
                "name": repo_lnk(repo.repo_name, repo.repo_type,
 
                                 repo.repo_state, repo.private, repo.fork),
 
                "last_change": last_change(repo.last_db_change),
 
                "last_changeset": last_rev(repo.repo_name, cs_cache),
 
                "last_rev_raw": cs_cache.get('revision'),
 
                "desc": desc(repo.description),
 
                "owner": h.person(repo.user),
 
                "state": state(repo.repo_state),
 
                "rss": rss_lnk(repo.repo_name),
 
                "atom": atom_lnk(repo.repo_name),
 

	
 
            }
 
            if admin:
 
                row.update({
 
                    "action": repo_actions(repo.repo_name),
 
                    "owner": owner_actions(repo.user.user_id,
 
                                           h.person(repo.user))
 
                })
 
            repos_data.append(row)
 

	
 
        return {
 
            "totalRecords": len(repos_list),
 
            "startIndex": 0,
 
            "sort": "name",
 
            "dir": "asc",
 
            "records": repos_data
 
        }
 

	
 
    def _get_defaults(self, repo_name):
 
        """
 
        Gets information about repository, and returns a dict for
 
        usage in forms
 

	
 
        :param repo_name:
 
        """
 

	
 
        repo_info = Repository.get_by_repo_name(repo_name)
 

	
 
        if repo_info is None:
 
            return None
 

	
 
        defaults = repo_info.get_dict()
 
        group, repo_name, repo_name_full = repo_info.groups_and_repo
 
        defaults['repo_name'] = repo_name
 
        defaults['repo_group'] = getattr(group[-1] if group else None,
 
                                         'group_id', None)
 

	
 
        for strip, k in [(0, 'repo_type'), (1, 'repo_enable_downloads'),
 
                         (1, 'repo_description'), (1, 'repo_enable_locking'),
 
                         (1, 'repo_landing_rev'), (0, 'clone_uri'),
 
                         (1, 'repo_private'), (1, 'repo_enable_statistics')]:
 
            attr = k
 
            if strip:
 
                attr = remove_prefix(k, 'repo_')
 

	
 
            val = defaults[attr]
 
            if k == 'repo_landing_rev':
 
                val = ':'.join(defaults[attr])
 
            defaults[k] = val
 
            if k == 'clone_uri':
 
                defaults['clone_uri_hidden'] = repo_info.clone_uri_hidden
 

	
 
        # fill owner
 
        if repo_info.user:
 
            defaults.update({'user': repo_info.user.username})
 
        else:
 
            replacement_user = User.query().filter(User.admin ==
 
                                                   True).first().username
 
            defaults.update({'user': replacement_user})
 

	
 
        # fill repository users
 
        for p in repo_info.repo_to_perm:
 
            defaults.update({'u_perm_%s' % p.user.username:
 
                                 p.permission.permission_name})
 

	
 
        # fill repository groups
 
        for p in repo_info.users_group_to_perm:
 
            defaults.update({'g_perm_%s' % p.users_group.users_group_name:
 
                                 p.permission.permission_name})
 

	
 
        return defaults
 

	
 
    def update(self, repo, **kwargs):
 
        try:
 
            cur_repo = self._get_repo(repo)
 
            org_repo_name = cur_repo.repo_name
 
            if 'user' in kwargs:
 
                cur_repo.user = User.get_by_username(kwargs['user'])
 

	
 
            if 'repo_group' in kwargs:
 
                cur_repo.group = RepoGroup.get(kwargs['repo_group'])
 
                cur_repo.repo_name = cur_repo.get_new_name(cur_repo.just_name)
 
            log.debug('Updating repo %s with params:%s', cur_repo, kwargs)
 
            for k in ['repo_enable_downloads',
 
                      'repo_description',
 
                      'repo_enable_locking',
 
                      'repo_landing_rev',
 
                      'repo_private',
 
                      'repo_enable_statistics',
 
                      ]:
 
                if k in kwargs:
 
                    setattr(cur_repo, remove_prefix(k, 'repo_'), kwargs[k])
 
            clone_uri = kwargs.get('clone_uri')
 
            if clone_uri is not None and clone_uri != cur_repo.clone_uri_hidden:
 
                cur_repo.clone_uri = clone_uri
 

	
 
            new_name = cur_repo.get_new_name(kwargs['repo_name'])
 
            cur_repo.repo_name = new_name
 
            if 'repo_name' in kwargs:
 
                cur_repo.repo_name = cur_repo.get_new_name(kwargs['repo_name'])
 

	
 
            #if private flag is set, reset default permission to NONE
 

	
 
            if kwargs.get('repo_private'):
 
                EMPTY_PERM = 'repository.none'
 
                RepoModel().grant_user_permission(
 
                    repo=cur_repo, user='default', perm=EMPTY_PERM
 
                )
 
                #handle extra fields
 
            for field in filter(lambda k: k.startswith(RepositoryField.PREFIX),
 
                                kwargs):
 
                k = RepositoryField.un_prefix_key(field)
 
                ex_field = RepositoryField.get_by_key_name(key=k, repo=cur_repo)
 
                if ex_field:
 
                    ex_field.field_value = kwargs[field]
 
                    self.sa.add(ex_field)
 
            self.sa.add(cur_repo)
 

	
 
            if org_repo_name != new_name:
 
            if org_repo_name != cur_repo.repo_name:
 
                # rename repository
 
                self._rename_filesystem_repo(old=org_repo_name, new=new_name)
 
                self._rename_filesystem_repo(old=org_repo_name, new=cur_repo.repo_name)
 

	
 
            return cur_repo
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def _create_repo(self, repo_name, repo_type, description, owner,
 
                     private=False, clone_uri=None, repo_group=None,
 
                     landing_rev='rev:tip', fork_of=None,
 
                     copy_fork_permissions=False, enable_statistics=False,
 
                     enable_locking=False, enable_downloads=False,
 
                     copy_group_permissions=False, state=Repository.STATE_PENDING):
 
        """
 
        Create repository inside database with PENDING state. This should only be
 
        executed by create() repo, with exception of importing existing repos.
 

	
 
        """
 
        from kallithea.model.scm import ScmModel
 

	
 
        owner = self._get_user(owner)
 
        fork_of = self._get_repo(fork_of)
 
        repo_group = self._get_repo_group(repo_group)
 
        try:
 
            repo_name = safe_unicode(repo_name)
 
            description = safe_unicode(description)
 
            # repo name is just a name of repository
 
            # while repo_name_full is a full qualified name that is combined
 
            # with name and path of group
 
            repo_name_full = repo_name
 
            repo_name = repo_name.split(self.URL_SEPARATOR)[-1]
 

	
 
            new_repo = Repository()
 
            new_repo.repo_state = state
 
            new_repo.enable_statistics = False
 
            new_repo.repo_name = repo_name_full
 
            new_repo.repo_type = repo_type
 
            new_repo.user = owner
 
            new_repo.group = repo_group
 
            new_repo.description = description or repo_name
 
            new_repo.private = private
 
            new_repo.clone_uri = clone_uri
 
            new_repo.landing_rev = landing_rev
 

	
 
            new_repo.enable_statistics = enable_statistics
 
            new_repo.enable_locking = enable_locking
 
            new_repo.enable_downloads = enable_downloads
 

	
 
            if repo_group:
 
                new_repo.enable_locking = repo_group.enable_locking
 

	
 
            if fork_of:
 
                parent_repo = fork_of
 
                new_repo.fork = parent_repo
 

	
 
            self.sa.add(new_repo)
 

	
 
            if fork_of and copy_fork_permissions:
 
                repo = fork_of
 
                user_perms = UserRepoToPerm.query() \
 
                    .filter(UserRepoToPerm.repository == repo).all()
 
                group_perms = UserGroupRepoToPerm.query() \
 
                    .filter(UserGroupRepoToPerm.repository == repo).all()
 

	
 
                for perm in user_perms:
 
                    UserRepoToPerm.create(perm.user, new_repo, perm.permission)
 

	
 
                for perm in group_perms:
 
                    UserGroupRepoToPerm.create(perm.users_group, new_repo,
 
                                               perm.permission)
 

	
 
            elif repo_group and copy_group_permissions:
 

	
 
                user_perms = UserRepoGroupToPerm.query() \
 
                    .filter(UserRepoGroupToPerm.group == repo_group).all()
 

	
 
                group_perms = UserGroupRepoGroupToPerm.query() \
 
                    .filter(UserGroupRepoGroupToPerm.group == repo_group).all()
 

	
 
                for perm in user_perms:
 
                    perm_name = perm.permission.permission_name.replace('group.', 'repository.')
 
                    perm_obj = Permission.get_by_key(perm_name)
 
                    UserRepoToPerm.create(perm.user, new_repo, perm_obj)
 

	
 
                for perm in group_perms:
 
                    perm_name = perm.permission.permission_name.replace('group.', 'repository.')
 
                    perm_obj = Permission.get_by_key(perm_name)
 
                    UserGroupRepoToPerm.create(perm.users_group, new_repo, perm_obj)
 

	
 
            else:
 
                perm_obj = self._create_default_perms(new_repo, private)
 
                self.sa.add(perm_obj)
 

	
 
            # now automatically start following this repository as owner
 
            ScmModel(self.sa).toggle_following_repo(new_repo.repo_id,
 
                                                    owner.user_id)
 
            # we need to flush here, in order to check if database won't
 
            # throw any exceptions, create filesystem dirs at the very end
 
            self.sa.flush()
 
            return new_repo
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def create(self, form_data, cur_user):
 
        """
 
        Create repository using celery tasks
 

	
 
        :param form_data:
 
        :param cur_user:
 
        """
 
        from kallithea.lib.celerylib import tasks, run_task
 
        return run_task(tasks.create_repo, form_data, cur_user)
 

	
 
    def _update_permissions(self, repo, perms_new=None, perms_updates=None,
 
                            check_perms=True):
 
        if not perms_new:
 
            perms_new = []
 
        if not perms_updates:
 
            perms_updates = []
 

	
 
        # update permissions
 
        for member, perm, member_type in perms_updates:
 
            if member_type == 'user':
 
                # this updates existing one
 
                self.grant_user_permission(
 
                    repo=repo, user=member, perm=perm
 
                )
 
            else:
 
                #check if we have permissions to alter this usergroup
 
                req_perms = (
 
                    'usergroup.read', 'usergroup.write', 'usergroup.admin')
 
                if not check_perms or HasUserGroupPermissionAny(*req_perms)(
 
                        member):
 
                    self.grant_user_group_permission(
 
                        repo=repo, group_name=member, perm=perm
 
                    )
 
            # set new permissions
 
        for member, perm, member_type in perms_new:
 
            if member_type == 'user':
 
                self.grant_user_permission(
 
                    repo=repo, user=member, perm=perm
 
                )
 
            else:
 
                #check if we have permissions to alter this usergroup
 
                req_perms = (
 
                    'usergroup.read', 'usergroup.write', 'usergroup.admin')
 
                if not check_perms or HasUserGroupPermissionAny(*req_perms)(
 
                        member):
 
                    self.grant_user_group_permission(
 
                        repo=repo, group_name=member, perm=perm
 
                    )
 

	
 
    def create_fork(self, form_data, cur_user):
 
        """
 
        Simple wrapper into executing celery task for fork creation
 

	
 
        :param form_data:
 
        :param cur_user:
 
        """
 
        from kallithea.lib.celerylib import tasks, run_task
 
        return run_task(tasks.create_repo_fork, form_data, cur_user)
 

	
 
    def delete(self, repo, forks=None, fs_remove=True, cur_user=None):
 
        """
 
        Delete given repository, forks parameter defines what do do with
 
        attached forks. Throws AttachedForksError if deleted repo has attached
 
        forks
 

	
 
        :param repo:
 
        :param forks: str 'delete' or 'detach'
 
        :param fs_remove: remove(archive) repo from filesystem
 
        """
 
        if not cur_user:
 
            cur_user = getattr(get_current_authuser(), 'username', None)
 
        repo = self._get_repo(repo)
 
        if repo is not None:
 
            if forks == 'detach':
 
                for r in repo.forks:
 
                    r.fork = None
 
                    self.sa.add(r)
 
            elif forks == 'delete':
 
                for r in repo.forks:
 
                    self.delete(r, forks='delete')
 
            elif [f for f in repo.forks]:
 
                raise AttachedForksError()
 

	
 
            old_repo_dict = repo.get_dict()
 
            try:
 
                self.sa.delete(repo)
 
                if fs_remove:
 
                    self._delete_filesystem_repo(repo)
 
                else:
 
                    log.debug('skipping removal from filesystem')
 
                log_delete_repository(old_repo_dict,
 
                                      deleted_by=cur_user)
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                raise
 

	
 
    def grant_user_permission(self, repo, user, perm):
 
        """
 
        Grant permission for user on given repository, or update existing one
 
        if found
 

	
 
        :param repo: Instance of Repository, repository_id, or repository name
 
        :param user: Instance of User, user_id or username
 
        :param perm: Instance of Permission, or permission_name
 
        """
 
        user = self._get_user(user)
 
        repo = self._get_repo(repo)
 
        permission = self._get_perm(perm)
 

	
 
        # check if we have that permission already
 
        obj = self.sa.query(UserRepoToPerm) \
 
            .filter(UserRepoToPerm.user == user) \
 
            .filter(UserRepoToPerm.repository == repo) \
 
            .scalar()
 
        if obj is None:
 
            # create new !
 
            obj = UserRepoToPerm()
 
        obj.repository = repo
 
        obj.user = user
 
        obj.permission = permission
 
        self.sa.add(obj)
 
        log.debug('Granted perm %s to %s on %s', perm, user, repo)
 
        return obj
 

	
 
    def revoke_user_permission(self, repo, user):
 
        """
 
        Revoke permission for user on given repository
 

	
 
        :param repo: Instance of Repository, repository_id, or repository name
 
        :param user: Instance of User, user_id or username
 
        """
 

	
 
        user = self._get_user(user)
 
        repo = self._get_repo(repo)
 

	
 
        obj = self.sa.query(UserRepoToPerm) \
 
            .filter(UserRepoToPerm.repository == repo) \
 
            .filter(UserRepoToPerm.user == user) \
 
            .scalar()
 
        if obj is not None:
 
            self.sa.delete(obj)
 
            log.debug('Revoked perm on %s on %s', repo, user)
 

	
 
    def grant_user_group_permission(self, repo, group_name, perm):
 
        """
 
        Grant permission for user group on given repository, or update
 
        existing one if found
 

	
 
        :param repo: Instance of Repository, repository_id, or repository name
 
        :param group_name: Instance of UserGroup, users_group_id,
 
            or user group name
 
        :param perm: Instance of Permission, or permission_name
 
        """
 
        repo = self._get_repo(repo)
 
        group_name = self._get_user_group(group_name)
 
        permission = self._get_perm(perm)
 

	
 
        # check if we have that permission already
 
        obj = self.sa.query(UserGroupRepoToPerm) \
 
            .filter(UserGroupRepoToPerm.users_group == group_name) \
 
            .filter(UserGroupRepoToPerm.repository == repo) \
 
            .scalar()
 

	
 
        if obj is None:
 
            # create new
 
            obj = UserGroupRepoToPerm()
 

	
 
        obj.repository = repo
 
        obj.users_group = group_name
 
        obj.permission = permission
 
        self.sa.add(obj)
 
        log.debug('Granted perm %s to %s on %s', perm, group_name, repo)
 
        return obj
 

	
 
    def revoke_user_group_permission(self, repo, group_name):
 
        """
 
        Revoke permission for user group on given repository
 

	
 
        :param repo: Instance of Repository, repository_id, or repository name
 
        :param group_name: Instance of UserGroup, users_group_id,
 
            or user group name
 
        """
 
        repo = self._get_repo(repo)
 
        group_name = self._get_user_group(group_name)
 

	
 
        obj = self.sa.query(UserGroupRepoToPerm) \
 
            .filter(UserGroupRepoToPerm.repository == repo) \
 
            .filter(UserGroupRepoToPerm.users_group == group_name) \
 
            .scalar()
 
        if obj is not None:
 
            self.sa.delete(obj)
 
            log.debug('Revoked perm to %s on %s', repo, group_name)
 

	
 
    def delete_stats(self, repo_name):
 
        """
 
        removes stats for given repo
 

	
 
        :param repo_name:
 
        """
 
        repo = self._get_repo(repo_name)
 
        try:
 
            obj = self.sa.query(Statistics) \
 
                .filter(Statistics.repository == repo).scalar()
 
            if obj is not None:
 
                self.sa.delete(obj)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def _create_filesystem_repo(self, repo_name, repo_type, repo_group,
 
                                clone_uri=None, repo_store_location=None):
 
        """
 
        Makes repository on filesystem. Operation is group aware, meaning that it will create
 
        a repository within a group, and alter the paths accordingly to the group location.
 

	
 
        :param repo_name:
 
        :param alias:
 
        :param parent:
 
        :param clone_uri:
 
        :param repo_store_location:
 
        """
 
        from kallithea.lib.utils import is_valid_repo, is_valid_repo_group
 
        from kallithea.model.scm import ScmModel
 

	
 
        if '/' in repo_name:
 
            raise ValueError('repo_name must not contain groups got `%s`' % repo_name)
 

	
 
        if isinstance(repo_group, RepoGroup):
 
            new_parent_path = os.sep.join(repo_group.full_path_splitted)
 
        else:
 
            new_parent_path = repo_group or ''
 

	
 
        if repo_store_location:
 
            _paths = [repo_store_location]
 
        else:
 
            _paths = [self.repos_path, new_parent_path, repo_name]
 
            # we need to make it str for mercurial
 
        repo_path = os.path.join(*map(lambda x: safe_str(x), _paths))
 

	
 
        # check if this path is not a repository
 
        if is_valid_repo(repo_path, self.repos_path):
 
            raise Exception('This path %s is a valid repository' % repo_path)
 

	
 
        # check if this path is a group
 
        if is_valid_repo_group(repo_path, self.repos_path):
 
            raise Exception('This path %s is a valid group' % repo_path)
 

	
 
        log.info('creating repo %s in %s from url: `%s`',
 
            repo_name, safe_unicode(repo_path),
 
            obfuscate_url_pw(clone_uri))
 

	
 
        backend = get_backend(repo_type)
 

	
 
        if repo_type == 'hg':
 
            baseui = make_ui('db', clear_session=False)
 
            # patch and reset hooks section of UI config to not run any
 
            # hooks on creating remote repo
 
            for k, v in baseui.configitems('hooks'):
 
                baseui.setconfig('hooks', k, None)
 

	
 
            repo = backend(repo_path, create=True, src_url=clone_uri, baseui=baseui)
 
        elif repo_type == 'git':
 
            repo = backend(repo_path, create=True, src_url=clone_uri, bare=True)
 
            # add kallithea hook into this repo
 
            ScmModel().install_git_hooks(repo=repo)
 
        else:
 
            raise Exception('Not supported repo_type %s expected hg/git' % repo_type)
 

	
 
        log.debug('Created repo %s with %s backend',
 
                  safe_unicode(repo_name), safe_unicode(repo_type))
 
        return repo
 

	
 
    def _rename_filesystem_repo(self, old, new):
 
        """
 
        renames repository on filesystem
 

	
 
        :param old: old name
 
        :param new: new name
 
        """
 
        log.info('renaming repo from %s to %s', old, new)
 

	
 
        old_path = safe_str(os.path.join(self.repos_path, old))
 
        new_path = safe_str(os.path.join(self.repos_path, new))
 
        if os.path.isdir(new_path):
 
            raise Exception(
 
                'Was trying to rename to already existing dir %s' % new_path
 
            )
 
        shutil.move(old_path, new_path)
 

	
 
    def _delete_filesystem_repo(self, repo):
 
        """
 
        removes repo from filesystem, the removal is actually done by
 
        renaming dir to a 'rm__*' prefix which Kallithea will skip.
 
        It can be undeleted later by reverting the rename.
 

	
 
        :param repo: repo object
 
        """
 
        rm_path = safe_str(os.path.join(self.repos_path, repo.repo_name))
 
        log.info("Removing %s", rm_path)
 

	
 
        _now = datetime.now()
 
        _ms = str(_now.microsecond).rjust(6, '0')
 
        _d = 'rm__%s__%s' % (_now.strftime('%Y%m%d_%H%M%S_' + _ms),
 
                             repo.just_name)
 
        if repo.group:
 
            args = repo.group.full_path_splitted + [_d]
 
            _d = os.path.join(*args)
 
        shutil.move(rm_path, safe_str(os.path.join(self.repos_path, _d)))
kallithea/tests/api/api_base.py
Show inline comments
 
@@ -406,1536 +406,1577 @@ class _BaseTestApi(object):
 

	
 
    def test_api_lock_repo_lock_aquire_non_admin_not_his_repo(self):
 
        id_, params = _build_data(self.apikey_regular, 'lock',
 
                                  repoid=self.REPO,
 
                                  locked=True)
 
        response = api_call(self, params)
 
        expected = 'repository `%s` does not exist' % (self.REPO)
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_lock_repo_lock_release(self):
 
        id_, params = _build_data(self.apikey, 'lock',
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  repoid=self.REPO,
 
                                  locked=False)
 
        response = api_call(self, params)
 
        expected = {
 
            'repo': self.REPO,
 
            'locked': False,
 
            'locked_since': None,
 
            'locked_by': TEST_USER_ADMIN_LOGIN,
 
            'lock_state_changed': True,
 
            'msg': ('User `%s` set lock state for repo `%s` to `%s`'
 
                    % (TEST_USER_ADMIN_LOGIN, self.REPO, False))
 
        }
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_lock_repo_lock_aquire_optional_userid(self):
 
        id_, params = _build_data(self.apikey, 'lock',
 
                                  repoid=self.REPO,
 
                                  locked=True)
 
        response = api_call(self, params)
 
        time_ = response.json['result']['locked_since']
 
        expected = {
 
            'repo': self.REPO,
 
            'locked': True,
 
            'locked_since': time_,
 
            'locked_by': TEST_USER_ADMIN_LOGIN,
 
            'lock_state_changed': True,
 
            'msg': ('User `%s` set lock state for repo `%s` to `%s`'
 
                    % (TEST_USER_ADMIN_LOGIN, self.REPO, True))
 
        }
 

	
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_lock_repo_lock_optional_locked(self):
 
        id_, params = _build_data(self.apikey, 'lock',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 
        time_ = response.json['result']['locked_since']
 
        expected = {
 
            'repo': self.REPO,
 
            'locked': True,
 
            'locked_since': time_,
 
            'locked_by': TEST_USER_ADMIN_LOGIN,
 
            'lock_state_changed': False,
 
            'msg': ('Repo `%s` locked by `%s` on `%s`.'
 
                    % (self.REPO, TEST_USER_ADMIN_LOGIN,
 
                       json.dumps(time_to_datetime(time_))))
 
        }
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_lock_repo_lock_optional_not_locked(self):
 
        repo_name = 'api_not_locked'
 
        repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
 
                            cur_user=self.TEST_USER_LOGIN)
 
        self.assertEqual(repo.locked, [None, None])
 
        try:
 
            id_, params = _build_data(self.apikey, 'lock',
 
                                      repoid=repo.repo_id)
 
            response = api_call(self, params)
 
            expected = {
 
                'repo': repo_name,
 
                'locked': False,
 
                'locked_since': None,
 
                'locked_by': None,
 
                'lock_state_changed': False,
 
                'msg': ('Repo `%s` not locked.' % (repo_name,))
 
            }
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    @mock.patch.object(Repository, 'lock', crash)
 
    def test_api_lock_error(self):
 
        id_, params = _build_data(self.apikey, 'lock',
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  repoid=self.REPO,
 
                                  locked=True)
 
        response = api_call(self, params)
 

	
 
        expected = 'Error occurred locking repository `%s`' % self.REPO
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_locks_regular_user(self):
 
        id_, params = _build_data(self.apikey_regular, 'get_locks')
 
        response = api_call(self, params)
 
        expected = []
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_locks_with_userid_regular_user(self):
 
        id_, params = _build_data(self.apikey_regular, 'get_locks',
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 
        expected = 'userid is not the same as your user'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_locks(self):
 
        id_, params = _build_data(self.apikey, 'get_locks')
 
        response = api_call(self, params)
 
        expected = []
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_locks_with_one_locked_repo(self):
 
        repo_name = 'api_delete_me'
 
        repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
 
                                   cur_user=self.TEST_USER_LOGIN)
 
        Repository.lock(repo, User.get_by_username(self.TEST_USER_LOGIN).user_id)
 
        try:
 
            id_, params = _build_data(self.apikey, 'get_locks')
 
            response = api_call(self, params)
 
            expected = [repo.get_api_data()]
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_get_locks_with_one_locked_repo_for_specific_user(self):
 
        repo_name = 'api_delete_me'
 
        repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
 
                                   cur_user=self.TEST_USER_LOGIN)
 
        Repository.lock(repo, User.get_by_username(self.TEST_USER_LOGIN).user_id)
 
        try:
 
            id_, params = _build_data(self.apikey, 'get_locks',
 
                                      userid=self.TEST_USER_LOGIN)
 
            response = api_call(self, params)
 
            expected = [repo.get_api_data()]
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_get_locks_with_userid(self):
 
        id_, params = _build_data(self.apikey, 'get_locks',
 
                                  userid=TEST_USER_REGULAR_LOGIN)
 
        response = api_call(self, params)
 
        expected = []
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_create_existing_user(self):
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=TEST_USER_ADMIN_LOGIN,
 
                                  email='test@example.com',
 
                                  password='trololo')
 
        response = api_call(self, params)
 

	
 
        expected = "user `%s` already exist" % TEST_USER_ADMIN_LOGIN
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_user_with_existing_email(self):
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=TEST_USER_ADMIN_LOGIN + 'new',
 
                                  email=TEST_USER_REGULAR_EMAIL,
 
                                  password='trololo')
 
        response = api_call(self, params)
 

	
 
        expected = "email `%s` already exist" % TEST_USER_REGULAR_EMAIL
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_user(self):
 
        username = 'test_new_api_user'
 
        email = username + "@example.com"
 

	
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=username,
 
                                  email=email,
 
                                  password='trololo')
 
        response = api_call(self, params)
 

	
 
        usr = User.get_by_username(username)
 
        ret = dict(
 
            msg='created new user `%s`' % username,
 
            user=jsonify(usr.get_api_data())
 
        )
 

	
 
        try:
 
            expected = ret
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user(usr.user_id)
 

	
 
    def test_api_create_user_without_password(self):
 
        username = 'test_new_api_user_passwordless'
 
        email = username + "@example.com"
 

	
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=username,
 
                                  email=email)
 
        response = api_call(self, params)
 

	
 
        usr = User.get_by_username(username)
 
        ret = dict(
 
            msg='created new user `%s`' % username,
 
            user=jsonify(usr.get_api_data())
 
        )
 
        try:
 
            expected = ret
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user(usr.user_id)
 

	
 
    def test_api_create_user_with_extern_name(self):
 
        username = 'test_new_api_user_passwordless'
 
        email = username + "@example.com"
 

	
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=username,
 
                                  email=email, extern_name='internal')
 
        response = api_call(self, params)
 

	
 
        usr = User.get_by_username(username)
 
        ret = dict(
 
            msg='created new user `%s`' % username,
 
            user=jsonify(usr.get_api_data())
 
        )
 
        try:
 
            expected = ret
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user(usr.user_id)
 

	
 
    @mock.patch.object(UserModel, 'create_or_update', crash)
 
    def test_api_create_user_when_exception_happened(self):
 

	
 
        username = 'test_new_api_user'
 
        email = username + "@example.com"
 

	
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=username,
 
                                  email=email,
 
                                  password='trololo')
 
        response = api_call(self, params)
 
        expected = 'failed to create user `%s`' % username
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_delete_user(self):
 
        usr = UserModel().create_or_update(username=u'test_user',
 
                                           password=u'qweqwe',
 
                                           email=u'u232@example.com',
 
                                           firstname=u'u1', lastname=u'u1')
 
        Session().commit()
 
        username = usr.username
 
        email = usr.email
 
        usr_id = usr.user_id
 
        ## DELETE THIS USER NOW
 

	
 
        id_, params = _build_data(self.apikey, 'delete_user',
 
                                  userid=username, )
 
        response = api_call(self, params)
 

	
 
        ret = {'msg': 'deleted user ID:%s %s' % (usr_id, username),
 
               'user': None}
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(UserModel, 'delete', crash)
 
    def test_api_delete_user_when_exception_happened(self):
 
        usr = UserModel().create_or_update(username=u'test_user',
 
                                           password=u'qweqwe',
 
                                           email=u'u232@example.com',
 
                                           firstname=u'u1', lastname=u'u1')
 
        Session().commit()
 
        username = usr.username
 

	
 
        id_, params = _build_data(self.apikey, 'delete_user',
 
                                  userid=username, )
 
        response = api_call(self, params)
 
        ret = 'failed to delete user ID:%s %s' % (usr.user_id,
 
                                                  usr.username)
 
        expected = ret
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @parameterized.expand([('firstname', 'new_username'),
 
                           ('lastname', 'new_username'),
 
                           ('email', 'new_username'),
 
                           ('admin', True),
 
                           ('admin', False),
 
                           ('extern_type', 'ldap'),
 
                           ('extern_type', None),
 
                           ('extern_name', 'test'),
 
                           ('extern_name', None),
 
                           ('active', False),
 
                           ('active', True),
 
                           ('password', 'newpass')
 
    ])
 
    def test_api_update_user(self, name, expected):
 
        usr = User.get_by_username(self.TEST_USER_LOGIN)
 
        kw = {name: expected,
 
              'userid': usr.user_id}
 
        id_, params = _build_data(self.apikey, 'update_user', **kw)
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'updated user ID:%s %s' % (
 
                usr.user_id, self.TEST_USER_LOGIN),
 
            'user': jsonify(User \
 
                .get_by_username(self.TEST_USER_LOGIN) \
 
                .get_api_data())
 
        }
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_update_user_no_changed_params(self):
 
        usr = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        ret = jsonify(usr.get_api_data())
 
        id_, params = _build_data(self.apikey, 'update_user',
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 

	
 
        response = api_call(self, params)
 
        ret = {
 
            'msg': 'updated user ID:%s %s' % (
 
                usr.user_id, TEST_USER_ADMIN_LOGIN),
 
            'user': ret
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_update_user_by_user_id(self):
 
        usr = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        ret = jsonify(usr.get_api_data())
 
        id_, params = _build_data(self.apikey, 'update_user',
 
                                  userid=usr.user_id)
 

	
 
        response = api_call(self, params)
 
        ret = {
 
            'msg': 'updated user ID:%s %s' % (
 
                usr.user_id, TEST_USER_ADMIN_LOGIN),
 
            'user': ret
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_update_user_default_user(self):
 
        usr = User.get_default_user()
 
        id_, params = _build_data(self.apikey, 'update_user',
 
                                  userid=usr.user_id)
 

	
 
        response = api_call(self, params)
 
        expected = 'editing default user is forbidden'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(UserModel, 'update_user', crash)
 
    def test_api_update_user_when_exception_happens(self):
 
        usr = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        ret = jsonify(usr.get_api_data())
 
        id_, params = _build_data(self.apikey, 'update_user',
 
                                  userid=usr.user_id)
 

	
 
        response = api_call(self, params)
 
        ret = 'failed to update user `%s`' % usr.user_id
 

	
 
        expected = ret
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_repo(self):
 
        new_group = 'some_new_group'
 
        make_user_group(new_group)
 
        RepoModel().grant_user_group_permission(repo=self.REPO,
 
                                                group_name=new_group,
 
                                                perm='repository.read')
 
        Session().commit()
 
        id_, params = _build_data(self.apikey, 'get_repo',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 

	
 
        repo = RepoModel().get_by_repo_name(self.REPO)
 
        ret = repo.get_api_data()
 

	
 
        members = []
 
        followers = []
 
        for user in repo.repo_to_perm:
 
            perm = user.permission.permission_name
 
            user = user.user
 
            user_data = {'name': user.username, 'type': "user",
 
                         'permission': perm}
 
            members.append(user_data)
 

	
 
        for user_group in repo.users_group_to_perm:
 
            perm = user_group.permission.permission_name
 
            user_group = user_group.users_group
 
            user_group_data = {'name': user_group.users_group_name,
 
                               'type': "user_group", 'permission': perm}
 
            members.append(user_group_data)
 

	
 
        for user in repo.followers:
 
            followers.append(user.user.get_api_data())
 

	
 
        ret['members'] = members
 
        ret['followers'] = followers
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_user_group(new_group)
 

	
 
    @parameterized.expand([
 
        ('repository.admin',),
 
        ('repository.write',),
 
        ('repository.read',),
 
    ])
 
    def test_api_get_repo_by_non_admin(self, grant_perm):
 
        RepoModel().grant_user_permission(repo=self.REPO,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm=grant_perm)
 
        Session().commit()
 
        id_, params = _build_data(self.apikey_regular, 'get_repo',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 

	
 
        repo = RepoModel().get_by_repo_name(self.REPO)
 
        ret = repo.get_api_data()
 

	
 
        members = []
 
        followers = []
 
        self.assertEqual(2, len(repo.repo_to_perm))
 
        for user in repo.repo_to_perm:
 
            perm = user.permission.permission_name
 
            user_obj = user.user
 
            user_data = {'name': user_obj.username, 'type': "user",
 
                         'permission': perm}
 
            members.append(user_data)
 

	
 
        for user_group in repo.users_group_to_perm:
 
            perm = user_group.permission.permission_name
 
            user_group_obj = user_group.users_group
 
            user_group_data = {'name': user_group_obj.users_group_name,
 
                               'type': "user_group", 'permission': perm}
 
            members.append(user_group_data)
 

	
 
        for user in repo.followers:
 
            followers.append(user.user.get_api_data())
 

	
 
        ret['members'] = members
 
        ret['followers'] = followers
 

	
 
        expected = ret
 
        try:
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            RepoModel().revoke_user_permission(self.REPO, self.TEST_USER_LOGIN)
 

	
 
    def test_api_get_repo_by_non_admin_no_permission_to_repo(self):
 
        RepoModel().grant_user_permission(repo=self.REPO,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm='repository.none')
 

	
 
        id_, params = _build_data(self.apikey_regular, 'get_repo',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 

	
 
        expected = 'repository `%s` does not exist' % (self.REPO)
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_repo_that_doesn_not_exist(self):
 
        id_, params = _build_data(self.apikey, 'get_repo',
 
                                  repoid='no-such-repo')
 
        response = api_call(self, params)
 

	
 
        ret = 'repository `%s` does not exist' % 'no-such-repo'
 
        expected = ret
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_repos(self):
 
        id_, params = _build_data(self.apikey, 'get_repos')
 
        response = api_call(self, params)
 

	
 
        result = []
 
        for repo in RepoModel().get_all():
 
            result.append(repo.get_api_data())
 
        ret = jsonify(result)
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_repos_non_admin(self):
 
        id_, params = _build_data(self.apikey_regular, 'get_repos')
 
        response = api_call(self, params)
 

	
 
        result = []
 
        for repo in RepoModel().get_all_user_repos(self.TEST_USER_LOGIN):
 
            result.append(repo.get_api_data())
 
        ret = jsonify(result)
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @parameterized.expand([('all', 'all'),
 
                           ('dirs', 'dirs'),
 
                           ('files', 'files'), ])
 
    def test_api_get_repo_nodes(self, name, ret_type):
 
        rev = 'tip'
 
        path = '/'
 
        id_, params = _build_data(self.apikey, 'get_repo_nodes',
 
                                  repoid=self.REPO, revision=rev,
 
                                  root_path=path,
 
                                  ret_type=ret_type)
 
        response = api_call(self, params)
 

	
 
        # we don't the actual return types here since it's tested somewhere
 
        # else
 
        expected = response.json['result']
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_repo_nodes_bad_revisions(self):
 
        rev = 'i-dont-exist'
 
        path = '/'
 
        id_, params = _build_data(self.apikey, 'get_repo_nodes',
 
                                  repoid=self.REPO, revision=rev,
 
                                  root_path=path, )
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to get repo: `%s` nodes' % self.REPO
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_repo_nodes_bad_path(self):
 
        rev = 'tip'
 
        path = '/idontexits'
 
        id_, params = _build_data(self.apikey, 'get_repo_nodes',
 
                                  repoid=self.REPO, revision=rev,
 
                                  root_path=path, )
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to get repo: `%s` nodes' % self.REPO
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_repo_nodes_bad_ret_type(self):
 
        rev = 'tip'
 
        path = '/'
 
        ret_type = 'error'
 
        id_, params = _build_data(self.apikey, 'get_repo_nodes',
 
                                  repoid=self.REPO, revision=rev,
 
                                  root_path=path,
 
                                  ret_type=ret_type)
 
        response = api_call(self, params)
 

	
 
        expected = ('ret_type must be one of %s'
 
                    % (','.join(['files', 'dirs', 'all'])))
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @parameterized.expand([('all', 'all', 'repository.write'),
 
                           ('dirs', 'dirs', 'repository.admin'),
 
                           ('files', 'files', 'repository.read'), ])
 
    def test_api_get_repo_nodes_by_regular_user(self, name, ret_type, grant_perm):
 
        RepoModel().grant_user_permission(repo=self.REPO,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm=grant_perm)
 
        Session().commit()
 

	
 
        rev = 'tip'
 
        path = '/'
 
        id_, params = _build_data(self.apikey_regular, 'get_repo_nodes',
 
                                  repoid=self.REPO, revision=rev,
 
                                  root_path=path,
 
                                  ret_type=ret_type)
 
        response = api_call(self, params)
 

	
 
        # we don't the actual return types here since it's tested somewhere
 
        # else
 
        expected = response.json['result']
 
        try:
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            RepoModel().revoke_user_permission(self.REPO, self.TEST_USER_LOGIN)
 

	
 
    def test_api_create_repo(self):
 
        repo_name = 'api-repo'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,
 
        )
 
        response = api_call(self, params)
 

	
 
        repo = RepoModel().get_by_repo_name(repo_name)
 
        self.assertNotEqual(repo, None)
 
        ret = {
 
            'msg': 'Created new repository `%s`' % repo_name,
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_create_repo_and_repo_group(self):
 
        repo_name = 'my_gr/api-repo'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,)
 
        response = api_call(self, params)
 
        print params
 
        repo = RepoModel().get_by_repo_name(repo_name)
 
        self.assertNotEqual(repo, None)
 
        ret = {
 
            'msg': 'Created new repository `%s`' % repo_name,
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 
        fixture.destroy_repo_group('my_gr')
 

	
 
    def test_api_create_repo_in_repo_group_without_permission(self):
 
        repo_group_name = '%s/api-repo-repo' % TEST_REPO_GROUP
 
        repo_name = '%s/api-repo' % repo_group_name
 

	
 
        rg = fixture.create_repo_group(repo_group_name)
 
        Session().commit()
 
        RepoGroupModel().grant_user_permission(repo_group_name,
 
                                               self.TEST_USER_LOGIN,
 
                                               'group.none')
 
        Session().commit()
 

	
 
        id_, params = _build_data(self.apikey_regular, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  repo_type=self.REPO_TYPE,
 
        )
 
        response = api_call(self, params)
 

	
 
        # Current result when API access control is different from Web:
 
        ret = {
 
            'msg': 'Created new repository `%s`' % repo_name,
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
        # Expected and arguably more correct result:
 
        #expected = 'failed to create repository `%s`' % repo_name
 
        #self._compare_error(id_, expected, given=response.body)
 

	
 
        fixture.destroy_repo_group(repo_group_name)
 

	
 
    def test_api_create_repo_unknown_owner(self):
 
        repo_name = 'api-repo'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=owner,
 
                                  repo_type=self.REPO_TYPE,
 
        )
 
        response = api_call(self, params)
 
        expected = 'user `%s` does not exist' % owner
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_repo_dont_specify_owner(self):
 
        repo_name = 'api-repo'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  repo_type=self.REPO_TYPE,
 
        )
 
        response = api_call(self, params)
 

	
 
        repo = RepoModel().get_by_repo_name(repo_name)
 
        self.assertNotEqual(repo, None)
 
        ret = {
 
            'msg': 'Created new repository `%s`' % repo_name,
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_create_repo_by_non_admin(self):
 
        repo_name = 'api-repo'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey_regular, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  repo_type=self.REPO_TYPE,
 
        )
 
        response = api_call(self, params)
 

	
 
        repo = RepoModel().get_by_repo_name(repo_name)
 
        self.assertNotEqual(repo, None)
 
        ret = {
 
            'msg': 'Created new repository `%s`' % repo_name,
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_create_repo_by_non_admin_specify_owner(self):
 
        repo_name = 'api-repo'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey_regular, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  repo_type=self.REPO_TYPE,
 
                                  owner=owner)
 
        response = api_call(self, params)
 

	
 
        expected = 'Only Kallithea admin can specify `owner` param'
 
        self._compare_error(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_create_repo_exists(self):
 
        repo_name = self.REPO
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,)
 
        response = api_call(self, params)
 
        expected = "repo `%s` already exist" % repo_name
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(RepoModel, 'create', crash)
 
    def test_api_create_repo_exception_occurred(self):
 
        repo_name = 'api-repo'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,)
 
        response = api_call(self, params)
 
        expected = 'failed to create repository `%s`' % repo_name
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @parameterized.expand([
 
        ('owner', {'owner': TEST_USER_REGULAR_LOGIN}),
 
        ('description', {'description': 'new description'}),
 
        ('active', {'active': True}),
 
        ('active', {'active': False}),
 
        ('clone_uri', {'clone_uri': 'http://example.com/repo'}),
 
        ('clone_uri', {'clone_uri': None}),
 
        ('landing_rev', {'landing_rev': 'branch:master'}),
 
        ('enable_statistics', {'enable_statistics': True}),
 
        ('enable_locking', {'enable_locking': True}),
 
        ('enable_downloads', {'enable_downloads': True}),
 
        ('name', {'name': 'new_repo_name'}),
 
        ('repo_group', {'group': 'test_group_for_update'}),
 
    ])
 
    def test_api_update_repo(self, changing_attr, updates):
 
        repo_name = 'api_update_me'
 
        repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        if changing_attr == 'repo_group':
 
            fixture.create_repo_group(updates['group'])
 

	
 
        id_, params = _build_data(self.apikey, 'update_repo',
 
                                  repoid=repo_name, **updates)
 
        response = api_call(self, params)
 
        if changing_attr == 'name':
 
            repo_name = updates['name']
 
        if changing_attr == 'repo_group':
 
            repo_name = '/'.join([updates['group'], repo_name])
 
        try:
 
            expected = {
 
                'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo_name),
 
                'repository': repo.get_api_data()
 
            }
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 
            if changing_attr == 'repo_group':
 
                fixture.destroy_repo_group(updates['group'])
 

	
 
    @parameterized.expand([
 
        ('owner', {'owner': TEST_USER_REGULAR_LOGIN}),
 
        ('description', {'description': u'new description'}),
 
        ('active', {'active': True}),
 
        ('active', {'active': False}),
 
        ('clone_uri', {'clone_uri': 'http://example.com/repo'}),
 
        ('clone_uri', {'clone_uri': None}),
 
        ('landing_rev', {'landing_rev': 'branch:master'}),
 
        ('enable_statistics', {'enable_statistics': True}),
 
        ('enable_locking', {'enable_locking': True}),
 
        ('enable_downloads', {'enable_downloads': True}),
 
        ('name', {'name': u'new_repo_name'}),
 
        ('repo_group', {'group': u'test_group_for_update'}),
 
    ])
 
    def test_api_update_group_repo(self, changing_attr, updates):
 
        group_name = u'lololo'
 
        fixture.create_repo_group(group_name)
 
        repo_name = u'%s/api_update_me' % group_name
 
        repo = fixture.create_repo(repo_name, repo_group=group_name, repo_type=self.REPO_TYPE)
 
        if changing_attr == 'repo_group':
 
            fixture.create_repo_group(updates['group'])
 

	
 
        id_, params = _build_data(self.apikey, 'update_repo',
 
                                  repoid=repo_name, **updates)
 
        response = api_call(self, params)
 
        if changing_attr == 'name':
 
            repo_name = u'%s/%s' % (group_name, updates['name'])
 
        if changing_attr == 'repo_group':
 
            repo_name = u'/'.join([updates['group'], repo_name.rsplit('/', 1)[-1]])
 
        try:
 
            expected = {
 
                'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo_name),
 
                'repository': repo.get_api_data()
 
            }
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 
            if changing_attr == 'repo_group':
 
                fixture.destroy_repo_group(updates['group'])
 
        fixture.destroy_repo_group(group_name)
 

	
 
    def test_api_update_repo_repo_group_does_not_exist(self):
 
        repo_name = 'admin_owned'
 
        fixture.create_repo(repo_name)
 
        updates = {'group': 'test_group_for_update'}
 
        id_, params = _build_data(self.apikey, 'update_repo',
 
                                  repoid=repo_name, **updates)
 
        response = api_call(self, params)
 
        try:
 
            expected = 'repository group `%s` does not exist' % updates['group']
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_update_repo_regular_user_not_allowed(self):
 
        repo_name = 'admin_owned'
 
        fixture.create_repo(repo_name)
 
        updates = {'active': False}
 
        id_, params = _build_data(self.apikey_regular, 'update_repo',
 
                                  repoid=repo_name, **updates)
 
        response = api_call(self, params)
 
        try:
 
            expected = 'repository `%s` does not exist' % repo_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    @mock.patch.object(RepoModel, 'update', crash)
 
    def test_api_update_repo_exception_occurred(self):
 
        repo_name = 'api_update_me'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        id_, params = _build_data(self.apikey, 'update_repo',
 
                                  repoid=repo_name, owner=TEST_USER_ADMIN_LOGIN,)
 
        response = api_call(self, params)
 
        try:
 
            expected = 'failed to update repo `%s`' % repo_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_update_repo_regular_user_change_repo_name(self):
 
        repo_name = 'admin_owned'
 
        new_repo_name = 'new_repo_name'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        RepoModel().grant_user_permission(repo=repo_name,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm='repository.admin')
 
        UserModel().revoke_perm('default', 'hg.create.repository')
 
        UserModel().grant_perm('default', 'hg.create.none')
 
        updates = {'name': new_repo_name}
 
        id_, params = _build_data(self.apikey_regular, 'update_repo',
 
                                  repoid=repo_name, **updates)
 
        response = api_call(self, params)
 
        try:
 
            expected = 'no permission to create (or move) repositories'
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 
            fixture.destroy_repo(new_repo_name)
 

	
 
    def test_api_update_repo_regular_user_change_repo_name_allowed(self):
 
        repo_name = 'admin_owned'
 
        new_repo_name = 'new_repo_name'
 
        repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        RepoModel().grant_user_permission(repo=repo_name,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm='repository.admin')
 
        UserModel().revoke_perm('default', 'hg.create.none')
 
        UserModel().grant_perm('default', 'hg.create.repository')
 
        updates = {'name': new_repo_name}
 
        id_, params = _build_data(self.apikey_regular, 'update_repo',
 
                                  repoid=repo_name, **updates)
 
        response = api_call(self, params)
 
        try:
 
            expected = {
 
                'msg': 'updated repo ID:%s %s' % (repo.repo_id, new_repo_name),
 
                'repository': repo.get_api_data()
 
            }
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 
            fixture.destroy_repo(new_repo_name)
 

	
 
    def test_api_update_repo_regular_user_change_owner(self):
 
        repo_name = 'admin_owned'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        RepoModel().grant_user_permission(repo=repo_name,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm='repository.admin')
 
        updates = {'owner': TEST_USER_ADMIN_LOGIN}
 
        id_, params = _build_data(self.apikey_regular, 'update_repo',
 
                                  repoid=repo_name, **updates)
 
        response = api_call(self, params)
 
        try:
 
            expected = 'Only Kallithea admin can specify `owner` param'
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_delete_repo(self):
 
        repo_name = 'api_delete_me'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 

	
 
        id_, params = _build_data(self.apikey, 'delete_repo',
 
                                  repoid=repo_name, )
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Deleted repository `%s`' % repo_name,
 
            'success': True
 
        }
 
        try:
 
            expected = ret
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_delete_repo_by_non_admin(self):
 
        repo_name = 'api_delete_me'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
 
                            cur_user=self.TEST_USER_LOGIN)
 
        id_, params = _build_data(self.apikey_regular, 'delete_repo',
 
                                  repoid=repo_name, )
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Deleted repository `%s`' % repo_name,
 
            'success': True
 
        }
 
        try:
 
            expected = ret
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_delete_repo_by_non_admin_no_permission(self):
 
        repo_name = 'api_delete_me'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        try:
 
            id_, params = _build_data(self.apikey_regular, 'delete_repo',
 
                                      repoid=repo_name, )
 
            response = api_call(self, params)
 
            expected = 'repository `%s` does not exist' % (repo_name)
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_delete_repo_exception_occurred(self):
 
        repo_name = 'api_delete_me'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        try:
 
            with mock.patch.object(RepoModel, 'delete', crash):
 
                id_, params = _build_data(self.apikey, 'delete_repo',
 
                                          repoid=repo_name, )
 
                response = api_call(self, params)
 

	
 
                expected = 'failed to delete repository `%s`' % repo_name
 
                self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_fork_repo(self):
 
        fork_name = 'api-repo-fork'
 
        id_, params = _build_data(self.apikey, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
        )
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Created fork of `%s` as `%s`' % (self.REPO,
 
                                                     fork_name),
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(fork_name)
 

	
 
    def test_api_fork_repo_non_admin(self):
 
        fork_name = 'api-repo-fork'
 
        id_, params = _build_data(self.apikey_regular, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
        )
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Created fork of `%s` as `%s`' % (self.REPO,
 
                                                     fork_name),
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(fork_name)
 

	
 
    def test_api_fork_repo_non_admin_specify_owner(self):
 
        fork_name = 'api-repo-fork'
 
        id_, params = _build_data(self.apikey_regular, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
        )
 
        response = api_call(self, params)
 
        expected = 'Only Kallithea admin can specify `owner` param'
 
        self._compare_error(id_, expected, given=response.body)
 
        fixture.destroy_repo(fork_name)
 

	
 
    def test_api_fork_repo_non_admin_no_permission_to_fork(self):
 
        RepoModel().grant_user_permission(repo=self.REPO,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm='repository.none')
 
        fork_name = 'api-repo-fork'
 
        id_, params = _build_data(self.apikey_regular, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
        )
 
        response = api_call(self, params)
 
        expected = 'repository `%s` does not exist' % (self.REPO)
 
        self._compare_error(id_, expected, given=response.body)
 
        fixture.destroy_repo(fork_name)
 

	
 
    @parameterized.expand([('read', 'repository.read'),
 
                           ('write', 'repository.write'),
 
                           ('admin', 'repository.admin')])
 
    def test_api_fork_repo_non_admin_no_create_repo_permission(self, name, perm):
 
        fork_name = 'api-repo-fork'
 
        # regardless of base repository permission, forking is disallowed
 
        # when repository creation is disabled
 
        RepoModel().grant_user_permission(repo=self.REPO,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm=perm)
 
        UserModel().revoke_perm('default', 'hg.create.repository')
 
        UserModel().grant_perm('default', 'hg.create.none')
 
        id_, params = _build_data(self.apikey_regular, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
        )
 
        response = api_call(self, params)
 
        expected = 'no permission to create repositories'
 
        self._compare_error(id_, expected, given=response.body)
 
        fixture.destroy_repo(fork_name)
 

	
 
    def test_api_fork_repo_unknown_owner(self):
 
        fork_name = 'api-repo-fork'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
                                  owner=owner,
 
        )
 
        response = api_call(self, params)
 
        expected = 'user `%s` does not exist' % owner
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_fork_repo_fork_exists(self):
 
        fork_name = 'api-repo-fork'
 
        fixture.create_fork(self.REPO, fork_name)
 

	
 
        try:
 
            fork_name = 'api-repo-fork'
 

	
 
            id_, params = _build_data(self.apikey, 'fork_repo',
 
                                      repoid=self.REPO,
 
                                      fork_name=fork_name,
 
                                      owner=TEST_USER_ADMIN_LOGIN,
 
            )
 
            response = api_call(self, params)
 

	
 
            expected = "fork `%s` already exist" % fork_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(fork_name)
 

	
 
    def test_api_fork_repo_repo_exists(self):
 
        fork_name = self.REPO
 

	
 
        id_, params = _build_data(self.apikey, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
        )
 
        response = api_call(self, params)
 

	
 
        expected = "repo `%s` already exist" % fork_name
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(RepoModel, 'create_fork', crash)
 
    def test_api_fork_repo_exception_occurred(self):
 
        fork_name = 'api-repo-fork'
 
        id_, params = _build_data(self.apikey, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
        )
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to fork repository `%s` as `%s`' % (self.REPO,
 
                                                               fork_name)
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_group(self):
 
        id_, params = _build_data(self.apikey, 'get_user_group',
 
                                  usergroupid=TEST_USER_GROUP)
 
        response = api_call(self, params)
 

	
 
        user_group = UserGroupModel().get_group(TEST_USER_GROUP)
 
        members = []
 
        for user in user_group.members:
 
            user = user.user
 
            members.append(user.get_api_data())
 

	
 
        ret = user_group.get_api_data()
 
        ret['members'] = members
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_groups(self):
 
        gr_name = 'test_user_group2'
 
        make_user_group(gr_name)
 

	
 
        id_, params = _build_data(self.apikey, 'get_user_groups', )
 
        response = api_call(self, params)
 

	
 
        try:
 
            expected = []
 
            for gr_name in [TEST_USER_GROUP, 'test_user_group2']:
 
                user_group = UserGroupModel().get_group(gr_name)
 
                ret = user_group.get_api_data()
 
                expected.append(ret)
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    def test_api_create_user_group(self):
 
        group_name = 'some_new_group'
 
        id_, params = _build_data(self.apikey, 'create_user_group',
 
                                  group_name=group_name)
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'created new user group `%s`' % group_name,
 
            'user_group': jsonify(UserGroupModel() \
 
                .get_by_name(group_name) \
 
                .get_api_data())
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
        fixture.destroy_user_group(group_name)
 

	
 
    def test_api_get_user_group_that_exist(self):
 
        id_, params = _build_data(self.apikey, 'create_user_group',
 
                                  group_name=TEST_USER_GROUP)
 
        response = api_call(self, params)
 

	
 
        expected = "user group `%s` already exist" % TEST_USER_GROUP
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(UserGroupModel, 'create', crash)
 
    def test_api_get_user_group_exception_occurred(self):
 
        group_name = 'exception_happens'
 
        id_, params = _build_data(self.apikey, 'create_user_group',
 
                                  group_name=group_name)
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to create group `%s`' % group_name
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @parameterized.expand([('group_name', {'group_name': 'new_group_name'}),
 
                           ('group_name', {'group_name': 'test_group_for_update'}),
 
                           ('owner', {'owner': TEST_USER_REGULAR_LOGIN}),
 
                           ('active', {'active': False}),
 
                           ('active', {'active': True})])
 
    def test_api_update_user_group(self, changing_attr, updates):
 
        gr_name = 'test_group_for_update'
 
        user_group = fixture.create_user_group(gr_name)
 
        id_, params = _build_data(self.apikey, 'update_user_group',
 
                                  usergroupid=gr_name, **updates)
 
        response = api_call(self, params)
 
        try:
 
            expected = {
 
               'msg': 'updated user group ID:%s %s' % (user_group.users_group_id,
 
                                                     user_group.users_group_name),
 
               'user_group': user_group.get_api_data()
 
            }
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            if changing_attr == 'group_name':
 
                # switch to updated name for proper cleanup
 
                gr_name = updates['group_name']
 
            fixture.destroy_user_group(gr_name)
 

	
 
    @mock.patch.object(UserGroupModel, 'update', crash)
 
    def test_api_update_user_group_exception_occurred(self):
 
        gr_name = 'test_group'
 
        fixture.create_user_group(gr_name)
 
        id_, params = _build_data(self.apikey, 'update_user_group',
 
                                  usergroupid=gr_name)
 
        response = api_call(self, params)
 
        try:
 
            expected = 'failed to update user group `%s`' % gr_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    def test_api_add_user_to_user_group(self):
 
        gr_name = 'test_group'
 
        fixture.create_user_group(gr_name)
 
        id_, params = _build_data(self.apikey, 'add_user_to_user_group',
 
                                  usergroupid=gr_name,
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 
        try:
 
            expected = {
 
            'msg': 'added member `%s` to user group `%s`' % (
 
                    TEST_USER_ADMIN_LOGIN, gr_name),
 
            'success': True
 
            }
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    def test_api_add_user_to_user_group_that_doesnt_exist(self):
 
        id_, params = _build_data(self.apikey, 'add_user_to_user_group',
 
                                  usergroupid='false-group',
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 

	
 
        expected = 'user group `%s` does not exist' % 'false-group'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(UserGroupModel, 'add_user_to_group', crash)
 
    def test_api_add_user_to_user_group_exception_occurred(self):
 
        gr_name = 'test_group'
 
        fixture.create_user_group(gr_name)
 
        id_, params = _build_data(self.apikey, 'add_user_to_user_group',
 
                                  usergroupid=gr_name,
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 

	
 
        try:
 
            expected = 'failed to add member to user group `%s`' % gr_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    def test_api_remove_user_from_user_group(self):
 
        gr_name = 'test_group_3'
 
        gr = fixture.create_user_group(gr_name)
 
        UserGroupModel().add_user_to_group(gr, user=TEST_USER_ADMIN_LOGIN)
 
        Session().commit()
 
        id_, params = _build_data(self.apikey, 'remove_user_from_user_group',
 
                                  usergroupid=gr_name,
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 

	
 
        try:
 
            expected = {
 
                'msg': 'removed member `%s` from user group `%s`' % (
 
                    TEST_USER_ADMIN_LOGIN, gr_name
 
                ),
 
                'success': True}
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    @mock.patch.object(UserGroupModel, 'remove_user_from_group', crash)
 
    def test_api_remove_user_from_user_group_exception_occurred(self):
 
        gr_name = 'test_group_3'
 
        gr = fixture.create_user_group(gr_name)
 
        UserGroupModel().add_user_to_group(gr, user=TEST_USER_ADMIN_LOGIN)
 
        Session().commit()
 
        id_, params = _build_data(self.apikey, 'remove_user_from_user_group',
 
                                  usergroupid=gr_name,
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 
        try:
 
            expected = 'failed to remove member from user group `%s`' % gr_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    def test_api_delete_user_group(self):
 
        gr_name = 'test_group'
 
        ugroup = fixture.create_user_group(gr_name)
 
        gr_id = ugroup.users_group_id
 
        id_, params = _build_data(self.apikey, 'delete_user_group',
 
                                  usergroupid=gr_name,
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 

	
 
        try:
 
            expected = {
 
                'user_group': None,
 
                'msg': 'deleted user group ID:%s %s' % (gr_id, gr_name)
 
            }
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            if UserGroupModel().get_by_name(gr_name):
 
                fixture.destroy_user_group(gr_name)
 

	
 
    def test_api_delete_user_group_that_is_assigned(self):
 
        gr_name = 'test_group'
 
        ugroup = fixture.create_user_group(gr_name)
 
        gr_id = ugroup.users_group_id
 

	
 
        ugr_to_perm = RepoModel().grant_user_group_permission(self.REPO, gr_name, 'repository.write')
 
        msg = 'User Group assigned to %s' % ugr_to_perm.repository.repo_name
 

	
 
        id_, params = _build_data(self.apikey, 'delete_user_group',
 
                                  usergroupid=gr_name,
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 

	
 
        try:
 
            expected = msg
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            if UserGroupModel().get_by_name(gr_name):
 
                fixture.destroy_user_group(gr_name)
 

	
 
    def test_api_delete_user_group_exception_occurred(self):
 
        gr_name = 'test_group'
 
        ugroup = fixture.create_user_group(gr_name)
 
        gr_id = ugroup.users_group_id
 
        id_, params = _build_data(self.apikey, 'delete_user_group',
 
                                  usergroupid=gr_name,
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 

	
 
        try:
 
            with mock.patch.object(UserGroupModel, 'delete', crash):
 
                response = api_call(self, params)
 
                expected = 'failed to delete user group ID:%s %s' % (gr_id, gr_name)
 
                self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    @parameterized.expand([('none', 'repository.none'),
 
                           ('read', 'repository.read'),
 
                           ('write', 'repository.write'),
 
                           ('admin', 'repository.admin')])
 
    def test_api_grant_user_permission(self, name, perm):
 
        id_, params = _build_data(self.apikey,
 
                                  'grant_user_permission',
 
                                  repoid=self.REPO,
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  perm=perm)
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Granted perm: `%s` for user: `%s` in repo: `%s`' % (
 
                perm, TEST_USER_ADMIN_LOGIN, self.REPO
 
            ),
 
            'success': True
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_grant_user_permission_wrong_permission(self):
 
        perm = 'haha.no.permission'
 
        id_, params = _build_data(self.apikey,
 
                                  'grant_user_permission',
 
                                  repoid=self.REPO,
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  perm=perm)
 
        response = api_call(self, params)
 

	
 
        expected = 'permission `%s` does not exist' % perm
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(RepoModel, 'grant_user_permission', crash)
 
    def test_api_grant_user_permission_exception_when_adding(self):
 
        perm = 'repository.read'
 
        id_, params = _build_data(self.apikey,
 
                                  'grant_user_permission',
 
                                  repoid=self.REPO,
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  perm=perm)
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to edit permission for user: `%s` in repo: `%s`' % (
 
            TEST_USER_ADMIN_LOGIN, self.REPO
 
        )
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_revoke_user_permission(self):
 
        id_, params = _build_data(self.apikey,
 
                                  'revoke_user_permission',
 
                                  repoid=self.REPO,
 
                                  userid=TEST_USER_ADMIN_LOGIN, )
 
        response = api_call(self, params)
 

	
 
        expected = {
 
            'msg': 'Revoked perm for user: `%s` in repo: `%s`' % (
 
                TEST_USER_ADMIN_LOGIN, self.REPO
 
            ),
 
            'success': True
 
        }
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(RepoModel, 'revoke_user_permission', crash)
 
    def test_api_revoke_user_permission_exception_when_adding(self):
 
        id_, params = _build_data(self.apikey,
 
                                  'revoke_user_permission',
 
                                  repoid=self.REPO,
 
                                  userid=TEST_USER_ADMIN_LOGIN, )
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to edit permission for user: `%s` in repo: `%s`' % (
 
            TEST_USER_ADMIN_LOGIN, self.REPO
 
        )
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @parameterized.expand([('none', 'repository.none'),
 
                           ('read', 'repository.read'),
 
                           ('write', 'repository.write'),
 
                           ('admin', 'repository.admin')])
 
    def test_api_grant_user_group_permission(self, name, perm):
 
        id_, params = _build_data(self.apikey,
 
                                  'grant_user_group_permission',
 
                                  repoid=self.REPO,
 
                                  usergroupid=TEST_USER_GROUP,
 
                                  perm=perm)
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Granted perm: `%s` for user group: `%s` in repo: `%s`' % (
 
                perm, TEST_USER_GROUP, self.REPO
 
            ),
 
            'success': True
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_grant_user_group_permission_wrong_permission(self):
 
        perm = 'haha.no.permission'
 
        id_, params = _build_data(self.apikey,
 
                                  'grant_user_group_permission',
 
                                  repoid=self.REPO,
 
                                  usergroupid=TEST_USER_GROUP,
 
                                  perm=perm)
 
        response = api_call(self, params)
 

	
 
        expected = 'permission `%s` does not exist' % perm
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(RepoModel, 'grant_user_group_permission', crash)
 
    def test_api_grant_user_group_permission_exception_when_adding(self):
 
        perm = 'repository.read'
 
        id_, params = _build_data(self.apikey,
 
                                  'grant_user_group_permission',
 
                                  repoid=self.REPO,
 
                                  usergroupid=TEST_USER_GROUP,
 
                                  perm=perm)
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to edit permission for user group: `%s` in repo: `%s`' % (
 
            TEST_USER_GROUP, self.REPO
 
        )
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_revoke_user_group_permission(self):
 
        RepoModel().grant_user_group_permission(repo=self.REPO,
 
                                                group_name=TEST_USER_GROUP,
 
                                                perm='repository.read')
 
        Session().commit()
 
        id_, params = _build_data(self.apikey,
 
                                  'revoke_user_group_permission',
 
                                  repoid=self.REPO,
 
                                  usergroupid=TEST_USER_GROUP, )
 
        response = api_call(self, params)
 

	
 
        expected = {
 
            'msg': 'Revoked perm for user group: `%s` in repo: `%s`' % (
 
                TEST_USER_GROUP, self.REPO
 
            ),
 
            'success': True
 
        }
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(RepoModel, 'revoke_user_group_permission', crash)
 
    def test_api_revoke_user_group_permission_exception_when_adding(self):
 
        id_, params = _build_data(self.apikey,
 
                                  'revoke_user_group_permission',
 
                                  repoid=self.REPO,
 
                                  usergroupid=TEST_USER_GROUP, )
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to edit permission for user group: `%s` in repo: `%s`' % (
 
            TEST_USER_GROUP, self.REPO
 
        )
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @parameterized.expand([
 
        ('none', 'group.none', 'none'),
 
        ('read', 'group.read', 'none'),
 
        ('write', 'group.write', 'none'),
 
        ('admin', 'group.admin', 'none'),
 

	
 
        ('none', 'group.none', 'all'),
 
        ('read', 'group.read', 'all'),
 
        ('write', 'group.write', 'all'),
 
        ('admin', 'group.admin', 'all'),
 

	
 
        ('none', 'group.none', 'repos'),
 
        ('read', 'group.read', 'repos'),
 
        ('write', 'group.write', 'repos'),
 
        ('admin', 'group.admin', 'repos'),
 

	
 
        ('none', 'group.none', 'groups'),
 
        ('read', 'group.read', 'groups'),
 
        ('write', 'group.write', 'groups'),
 
        ('admin', 'group.admin', 'groups'),
 
    ])
 
    def test_api_grant_user_permission_to_repo_group(self, name, perm, apply_to_children):
 
        id_, params = _build_data(self.apikey,
 
                                  'grant_user_permission_to_repo_group',
 
                                  repogroupid=TEST_REPO_GROUP,
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  perm=perm, apply_to_children=apply_to_children)
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Granted perm: `%s` (recursive:%s) for user: `%s` in repo group: `%s`' % (
 
                perm, apply_to_children, TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
 
            ),
 
            'success': True
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @parameterized.expand([
 
        ('none_fails', 'group.none', 'none', False, False),
 
        ('read_fails', 'group.read', 'none', False, False),
 
        ('write_fails', 'group.write', 'none', False, False),
 
        ('admin_fails', 'group.admin', 'none', False, False),
 

	
 
        # with granted perms
 
        ('none_ok', 'group.none', 'none', True, True),
 
        ('read_ok', 'group.read', 'none', True, True),
 
        ('write_ok', 'group.write', 'none', True, True),
 
        ('admin_ok', 'group.admin', 'none', True, True),
 
    ])
 
    def test_api_grant_user_permission_to_repo_group_by_regular_user(
 
            self, name, perm, apply_to_children, grant_admin, access_ok):
 
        if grant_admin:
 
            RepoGroupModel().grant_user_permission(TEST_REPO_GROUP,
 
                                                   self.TEST_USER_LOGIN,
 
                                                   'group.admin')
 
            Session().commit()
 

	
 
        id_, params = _build_data(self.apikey_regular,
 
                                  'grant_user_permission_to_repo_group',
 
                                  repogroupid=TEST_REPO_GROUP,
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  perm=perm, apply_to_children=apply_to_children)
 
        response = api_call(self, params)
 
        if access_ok:
 
            ret = {
 
                'msg': 'Granted perm: `%s` (recursive:%s) for user: `%s` in repo group: `%s`' % (
 
                    perm, apply_to_children, TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
 
                ),
 
                'success': True
 
            }
 
            expected = ret
 
            self._compare_ok(id_, expected, given=response.body)
0 comments (0 inline, 0 general)