Changeset - 6620542597d3
[Not reviewed]
stable
0 3 0
Mads Kiilerich - 10 years ago 2015-07-07 02:25:59
madski@unity3d.com
api: check repo create permissions for update_repo and fork_repo as for create-repo

Close loophole for creating repos everywhere.

Tests by Thomas De Schampheleire.
3 files changed with 75 insertions and 2 deletions:
0 comments (0 inline, 0 general)
docs/api/api.rst
Show inline comments
 
@@ -452,552 +452,553 @@ OUTPUT::
 
get_user_group
 
--------------
 

	
 
Get an existing user group.
 
This command can only be executed using the api_key of a user with admin rights.
 

	
 

	
 
INPUT::
 

	
 
    id : <id_for_response>
 
    api_key : "<api_key>"
 
    method :  "get_user_group"
 
    args :    {
 
                "usergroupid" : "<user group id or name>"
 
              }
 

	
 
OUTPUT::
 

	
 
    id : <id_given_in_input>
 
    result : None if group not exist
 
             {
 
               "users_group_id" : "<id>",
 
               "group_name" :     "<groupname>",
 
               "active":          "<bool>",
 
               "members" :  [
 
                              {
 
                                "user_id" :  "<user_id>",
 
                                "api_key" :  "<api_key>",
 
                                "username" : "<username>",
 
                                "firstname": "<firstname>",
 
                                "lastname" : "<lastname>",
 
                                "email" :    "<email>",
 
                                "emails":    "<list_of_all_additional_emails>",
 
                                "active" :   "<bool>",
 
                                "admin" :    "<bool>",
 
                                "ldap_dn" :  "<ldap_dn>",
 
                                "last_login": "<last_login>",
 
                              },
 
 
                            ]
 
             }
 
    error : null
 

	
 

	
 
get_user_groups
 
---------------
 

	
 
List all existing user groups.
 
This command can only be executed using the api_key of a user with admin rights.
 

	
 

	
 
INPUT::
 

	
 
    id : <id_for_response>
 
    api_key : "<api_key>"
 
    method :  "get_user_groups"
 
    args :    { }
 

	
 
OUTPUT::
 

	
 
    id : <id_given_in_input>
 
    result : [
 
               {
 
               "users_group_id" : "<id>",
 
               "group_name" :     "<groupname>",
 
               "active":          "<bool>",
 
               },
 
 
              ]
 
    error : null
 

	
 

	
 
create_user_group
 
-----------------
 

	
 
Create a new user group.
 
This command can only be executed using the api_key of a user with admin rights.
 

	
 

	
 
INPUT::
 

	
 
    id : <id_for_response>
 
    api_key : "<api_key>"
 
    method :  "create_user_group"
 
    args:     {
 
                "group_name": "<groupname>",
 
                "owner" :     "<onwer_name_or_id = Optional(=apiuser)>",
 
                "active":     "<bool> = Optional(True)"
 
              }
 

	
 
OUTPUT::
 

	
 
    id : <id_given_in_input>
 
    result: {
 
              "msg": "created new user group `<groupname>`",
 
              "users_group": {
 
                     "users_group_id" : "<id>",
 
                     "group_name" :     "<groupname>",
 
                     "active":          "<bool>",
 
               },
 
            }
 
    error:  null
 

	
 

	
 
add_user_to_user_group
 
----------------------
 

	
 
Adds a user to a user group. If the user already is in that group, success will be
 
``false``.
 
This command can only be executed using the api_key of a user with admin rights.
 

	
 

	
 
INPUT::
 

	
 
    id : <id_for_response>
 
    api_key : "<api_key>"
 
    method :  "add_user_user_group"
 
    args:     {
 
                "usersgroupid" : "<user group id or name>",
 
                "userid" : "<user_id or username>",
 
              }
 

	
 
OUTPUT::
 

	
 
    id : <id_given_in_input>
 
    result: {
 
              "success": True|False # depends on if member is in group
 
              "msg": "added member `<username>` to a user group `<groupname>` |
 
                      User is already in that group"
 
            }
 
    error:  null
 

	
 

	
 
remove_user_from_user_group
 
---------------------------
 

	
 
Remove a user from a user group. If the user isn't in the given group, success will
 
be ``false``.
 
This command can only be executed using the api_key of a user with admin rights.
 

	
 

	
 
INPUT::
 

	
 
    id : <id_for_response>
 
    api_key : "<api_key>"
 
    method :  "remove_user_from_user_group"
 
    args:     {
 
                "usersgroupid" : "<user group id or name>",
 
                "userid" : "<user_id or username>",
 
              }
 

	
 
OUTPUT::
 

	
 
    id : <id_given_in_input>
 
    result: {
 
              "success":  True|False,  # depends on if member is in group
 
              "msg": "removed member <username> from user group <groupname> |
 
                      User wasn't in group"
 
            }
 
    error:  null
 

	
 

	
 
get_repo
 
--------
 

	
 
Get an existing repository by its name or repository_id. Members will contain
 
either users_group or users associated to that repository.
 
This command can only be executed using the api_key of a user with admin rights,
 
or that of a regular user with at least read access to the repository.
 

	
 
INPUT::
 

	
 
    id : <id_for_response>
 
    api_key : "<api_key>"
 
    method :  "get_repo"
 
    args:     {
 
                "repoid" : "<reponame or repo_id>"
 
              }
 

	
 
OUTPUT::
 

	
 
    id : <id_given_in_input>
 
    result: None if repository does not exist or
 
            {
 
                "repo_id" :          "<repo_id>",
 
                "repo_name" :        "<reponame>"
 
                "repo_type" :        "<repo_type>",
 
                "clone_uri" :        "<clone_uri>",
 
                "enable_downloads":  "<bool>",
 
                "enable_locking":    "<bool>",
 
                "enable_statistics": "<bool>",
 
                "private":           "<bool>",
 
                "created_on" :       "<date_time_created>",
 
                "description" :      "<description>",
 
                "landing_rev":       "<landing_rev>",
 
                "last_changeset":    {
 
                                       "author":   "<full_author>",
 
                                       "date":     "<date_time_of_commit>",
 
                                       "message":  "<commit_message>",
 
                                       "raw_id":   "<raw_id>",
 
                                       "revision": "<numeric_revision>",
 
                                       "short_id": "<short_id>"
 
                                     }
 
                "owner":             "<repo_owner>",
 
                "fork_of":           "<name_of_fork_parent>",
 
                "members" :     [
 
                                  {
 
                                    "type":        "user",
 
                                    "user_id" :    "<user_id>",
 
                                    "api_key" :    "<api_key>",
 
                                    "username" :   "<username>",
 
                                    "firstname":   "<firstname>",
 
                                    "lastname" :   "<lastname>",
 
                                    "email" :      "<email>",
 
                                    "emails":      "<list_of_all_additional_emails>",
 
                                    "active" :     "<bool>",
 
                                    "admin" :      "<bool>",
 
                                    "ldap_dn" :    "<ldap_dn>",
 
                                    "last_login":  "<last_login>",
 
                                    "permission" : "repository.(read|write|admin)"
 
                                  },
 
 
                                  {
 
                                    "type":      "users_group",
 
                                    "id" :       "<usersgroupid>",
 
                                    "name" :     "<usersgroupname>",
 
                                    "active":    "<bool>",
 
                                    "permission" : "repository.(read|write|admin)"
 
                                  },
 
 
                                ]
 
                 "followers":   [
 
                                  {
 
                                    "user_id" :     "<user_id>",
 
                                    "username" :    "<username>",
 
                                    "api_key" :     "<api_key>",
 
                                    "firstname":    "<firstname>",
 
                                    "lastname" :    "<lastname>",
 
                                    "email" :       "<email>",
 
                                    "emails":       "<list_of_all_additional_emails>",
 
                                    "ip_addresses": "<list_of_ip_addresses_for_user>",
 
                                    "active" :      "<bool>",
 
                                    "admin" :       "<bool>",
 
                                    "ldap_dn" :     "<ldap_dn>",
 
                                    "last_login":   "<last_login>",
 
                                  },
 
 
                 ]
 
            }
 
    error:  null
 

	
 

	
 
get_repos
 
---------
 

	
 
List all existing repositories.
 
This command can only be executed using the api_key of a user with admin rights,
 
or that of a regular user with at least read access to the repository.
 

	
 

	
 
INPUT::
 

	
 
    id : <id_for_response>
 
    api_key : "<api_key>"
 
    method :  "get_repos"
 
    args:     { }
 

	
 
OUTPUT::
 

	
 
    id : <id_given_in_input>
 
    result: [
 
              {
 
                "repo_id" :          "<repo_id>",
 
                "repo_name" :        "<reponame>"
 
                "repo_type" :        "<repo_type>",
 
                "clone_uri" :        "<clone_uri>",
 
                "private": :         "<bool>",
 
                "created_on" :       "<datetimecreated>",
 
                "description" :      "<description>",
 
                "landing_rev":       "<landing_rev>",
 
                "owner":             "<repo_owner>",
 
                "fork_of":           "<name_of_fork_parent>",
 
                "enable_downloads":  "<bool>",
 
                "enable_locking":    "<bool>",
 
                "enable_statistics": "<bool>",
 
              },
 
 
            ]
 
    error:  null
 

	
 

	
 
get_repo_nodes
 
--------------
 

	
 
Return a list of files and directories for a given path at the given revision.
 
It is possible to specify ret_type to show only ``files`` or ``dirs``.
 
This command can only be executed using the api_key of a user with admin rights.
 

	
 

	
 
INPUT::
 

	
 
    id : <id_for_response>
 
    api_key : "<api_key>"
 
    method :  "get_repo_nodes"
 
    args:     {
 
                "repoid" : "<reponame or repo_id>"
 
                "revision"  : "<revision>",
 
                "root_path" : "<root_path>",
 
                "ret_type"  : "<ret_type> = Optional('all')"
 
              }
 

	
 
OUTPUT::
 

	
 
    id : <id_given_in_input>
 
    result: [
 
              {
 
                "name" :        "<name>"
 
                "type" :        "<type>",
 
              },
 
 
            ]
 
    error:  null
 

	
 

	
 
create_repo
 
-----------
 

	
 
Create a repository. If the repository name contains "/", all needed repository
 
groups will be created. For example "foo/bar/baz" will create repository groups
 
"foo", "bar" (with "foo" as parent), and create "baz" repository with
 
"bar" as group.
 
This command can only be executed using the api_key of a user with admin rights,
 
or that of a regular user with create repository permission.
 
Regular users cannot specify owner parameter.
 

	
 

	
 
INPUT::
 

	
 
    id : <id_for_response>
 
    api_key : "<api_key>"
 
    method :  "create_repo"
 
    args:     {
 
                "repo_name" :        "<reponame>",
 
                "owner" :            "<onwer_name_or_id = Optional(=apiuser)>",
 
                "repo_type" :        "<repo_type> = Optional('hg')",
 
                "description" :      "<description> = Optional('')",
 
                "private" :          "<bool> = Optional(False)",
 
                "clone_uri" :        "<clone_uri> = Optional(None)",
 
                "landing_rev" :      "<landing_rev> = Optional('tip')",
 
                "enable_downloads":  "<bool> = Optional(False)",
 
                "enable_locking":    "<bool> = Optional(False)",
 
                "enable_statistics": "<bool> = Optional(False)",
 
              }
 

	
 
OUTPUT::
 

	
 
    id : <id_given_in_input>
 
    result: {
 
              "msg": "Created new repository `<reponame>`",
 
              "repo": {
 
                "repo_id" :          "<repo_id>",
 
                "repo_name" :        "<reponame>"
 
                "repo_type" :        "<repo_type>",
 
                "clone_uri" :        "<clone_uri>",
 
                "private": :         "<bool>",
 
                "created_on" :       "<datetimecreated>",
 
                "description" :      "<description>",
 
                "landing_rev":       "<landing_rev>",
 
                "owner":             "<username or user_id>",
 
                "fork_of":           "<name_of_fork_parent>",
 
                "enable_downloads":  "<bool>",
 
                "enable_locking":    "<bool>",
 
                "enable_statistics": "<bool>",
 
              },
 
            }
 
    error:  null
 

	
 

	
 
fork_repo
 
---------
 

	
 
Create a fork of the given repo. If using Celery, this will
 
return success message immediately and a fork will be created
 
asynchronously.
 
This command can only be executed using the api_key of a user with admin rights,
 
or that of a regular user with fork permission and at least read access to the repository.
 
This command can only be executed using the api_key of a user with admin
 
rights, or with the global fork permission, by a regular user with create
 
repository permission and at least read access to the repository.
 
Regular users cannot specify owner parameter.
 

	
 

	
 
INPUT::
 

	
 
    id : <id_for_response>
 
    api_key : "<api_key>"
 
    method :  "fork_repo"
 
    args:     {
 
                "repoid" :          "<reponame or repo_id>",
 
                "fork_name":        "<forkname>",
 
                "owner":            "<username or user_id = Optional(=apiuser)>",
 
                "description":      "<description>",
 
                "copy_permissions": "<bool>",
 
                "private":          "<bool>",
 
                "landing_rev":      "<landing_rev>"
 

	
 
              }
 

	
 
OUTPUT::
 

	
 
    id : <id_given_in_input>
 
    result: {
 
              "msg": "Created fork of `<reponame>` as `<forkname>`",
 
              "success": true
 
            }
 
    error:  null
 

	
 

	
 
delete_repo
 
-----------
 

	
 
Delete a repository.
 
This command can only be executed using the api_key of a user with admin rights,
 
or that of a regular user with admin access to the repository.
 
When ``forks`` param is set it is possible to detach or delete forks of the deleted repository.
 

	
 

	
 
INPUT::
 

	
 
    id : <id_for_response>
 
    api_key : "<api_key>"
 
    method :  "delete_repo"
 
    args:     {
 
                "repoid" : "<reponame or repo_id>",
 
                "forks"  : "`delete` or `detach` = Optional(None)"
 
              }
 

	
 
OUTPUT::
 

	
 
    id : <id_given_in_input>
 
    result: {
 
              "msg": "Deleted repository `<reponame>`",
 
              "success": true
 
            }
 
    error:  null
 

	
 

	
 
grant_user_permission
 
---------------------
 

	
 
Grant permission for a user on the given repository, or update the existing one if found.
 
This command can only be executed using the api_key of a user with admin rights.
 

	
 

	
 
INPUT::
 

	
 
    id : <id_for_response>
 
    api_key : "<api_key>"
 
    method :  "grant_user_permission"
 
    args:     {
 
                "repoid" : "<reponame or repo_id>"
 
                "userid" : "<username or user_id>"
 
                "perm" :       "(repository.(none|read|write|admin))",
 
              }
 

	
 
OUTPUT::
 

	
 
    id : <id_given_in_input>
 
    result: {
 
              "msg" : "Granted perm: `<perm>` for user: `<username>` in repo: `<reponame>`",
 
              "success": true
 
            }
 
    error:  null
 

	
 

	
 
revoke_user_permission
 
----------------------
 

	
 
Revoke permission for a user on the given repository.
 
This command can only be executed using the api_key of a user with admin rights.
 

	
 

	
 
INPUT::
 

	
 
    id : <id_for_response>
 
    api_key : "<api_key>"
 
    method  : "revoke_user_permission"
 
    args:     {
 
                "repoid" : "<reponame or repo_id>"
 
                "userid" : "<username or user_id>"
 
              }
 

	
 
OUTPUT::
 

	
 
    id : <id_given_in_input>
 
    result: {
 
              "msg" : "Revoked perm for user: `<username>` in repo: `<reponame>`",
 
              "success": true
 
            }
 
    error:  null
 

	
 

	
 
grant_user_group_permission
 
---------------------------
 

	
 
Grant permission for a user group on the given repository, or update the
 
existing one if found.
 
This command can only be executed using the api_key of a user with admin rights.
 

	
 

	
 
INPUT::
 

	
 
    id : <id_for_response>
 
    api_key : "<api_key>"
 
    method :  "grant_user_group_permission"
 
    args:     {
 
                "repoid" : "<reponame or repo_id>"
 
                "usersgroupid" : "<user group id or name>"
 
                "perm" : "(repository.(none|read|write|admin))",
 
              }
 

	
 
OUTPUT::
 

	
 
    id : <id_given_in_input>
 
    result: {
 
              "msg" : "Granted perm: `<perm>` for group: `<usersgroupname>` in repo: `<reponame>`",
 
              "success": true
 
            }
 
    error:  null
 

	
 

	
 
revoke_user_group_permission
 
----------------------------
 

	
 
Revoke permission for a user group on the given repository.
 
This command can only be executed using the api_key of a user with admin rights.
 

	
 
INPUT::
 

	
 
    id : <id_for_response>
 
    api_key : "<api_key>"
 
    method  : "revoke_user_group_permission"
 
    args:     {
 
                "repoid" : "<reponame or repo_id>"
 
                "usersgroupid" : "<user group id or name>"
 
              }
 

	
 
OUTPUT::
 

	
 
    id : <id_given_in_input>
 
    result: {
 
              "msg" : "Revoked perm for group: `<usersgroupname>` in repo: `<reponame>`",
 
              "success": true
 
            }
 
    error:  null
kallithea/controllers/api/api.py
Show inline comments
 
@@ -1175,865 +1175,873 @@ class ApiController(JSONRPCController):
 

	
 
        try:
 
            success = UserGroupModel().remove_user_from_group(user_group, user)
 
            msg = 'removed member `%s` from user group `%s`' % (
 
                user.username, user_group.users_group_name
 
            )
 
            msg = msg if success else "User wasn't in group"
 
            Session().commit()
 
            return dict(success=success, msg=msg)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to remove member from user group `%s`' % (
 
                    user_group.users_group_name,
 
                )
 
            )
 

	
 
    # permission check inside
 
    def get_repo(self, apiuser, repoid):
 
        """
 
        Gets an existing repository by it's name or repository_id. Members will return
 
        either users_group or user associated to that repository. This command can be
 
        executed only using api_key belonging to user with admin
 
        rights or regular user that have at least read access to repository.
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
            {
 
                "repo_id" :          "<repo_id>",
 
                "repo_name" :        "<reponame>"
 
                "repo_type" :        "<repo_type>",
 
                "clone_uri" :        "<clone_uri>",
 
                "enable_downloads":  "<bool>",
 
                "enable_locking":    "<bool>",
 
                "enable_statistics": "<bool>",
 
                "private":           "<bool>",
 
                "created_on" :       "<date_time_created>",
 
                "description" :      "<description>",
 
                "landing_rev":       "<landing_rev>",
 
                "last_changeset":    {
 
                                       "author":   "<full_author>",
 
                                       "date":     "<date_time_of_commit>",
 
                                       "message":  "<commit_message>",
 
                                       "raw_id":   "<raw_id>",
 
                                       "revision": "<numeric_revision>",
 
                                       "short_id": "<short_id>"
 
                                     }
 
                "owner":             "<repo_owner>",
 
                "fork_of":           "<name_of_fork_parent>",
 
                "members" :     [
 
                                  {
 
                                    "name":     "<username>",
 
                                    "type" :    "user",
 
                                    "permission" : "repository.(read|write|admin)"
 
                                  },
 
 
                                  {
 
                                    "name":     "<usergroup name>",
 
                                    "type" :    "user_group",
 
                                    "permission" : "usergroup.(read|write|admin)"
 
                                  },
 
 
                                ]
 
                 "followers":   [<user_obj>, ...]
 
                 ]
 
            }
 
          }
 
          error :  null
 

	
 
        """
 
        repo = get_repo_or_error(repoid)
 

	
 
        if not HasPermissionAnyApi('hg.admin')(user=apiuser):
 
            # check if we have admin permission for this repo !
 
            perms = ('repository.admin', 'repository.write', 'repository.read')
 
            if not HasRepoPermissionAnyApi(*perms)(user=apiuser, repo_name=repo.repo_name):
 
                raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
        members = []
 
        followers = []
 
        for user in repo.repo_to_perm:
 
            perm = user.permission.permission_name
 
            user = user.user
 
            user_data = {
 
                'name': user.username,
 
                'type': "user",
 
                'permission': perm
 
            }
 
            members.append(user_data)
 

	
 
        for user_group in repo.users_group_to_perm:
 
            perm = user_group.permission.permission_name
 
            user_group = user_group.users_group
 
            user_group_data = {
 
                'name': user_group.users_group_name,
 
                'type': "user_group",
 
                'permission': perm
 
            }
 
            members.append(user_group_data)
 

	
 
        for user in repo.followers:
 
            followers.append(user.user.get_api_data())
 

	
 
        data = repo.get_api_data()
 
        data['members'] = members
 
        data['followers'] = followers
 
        return data
 

	
 
    # permission check inside
 
    def get_repos(self, apiuser):
 
        """
 
        Lists all existing repositories. This command can be executed only using
 
        api_key belonging to user with admin rights or regular user that have
 
        admin, write or read access to repository.
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: [
 
                      {
 
                        "repo_id" :          "<repo_id>",
 
                        "repo_name" :        "<reponame>"
 
                        "repo_type" :        "<repo_type>",
 
                        "clone_uri" :        "<clone_uri>",
 
                        "private": :         "<bool>",
 
                        "created_on" :       "<datetimecreated>",
 
                        "description" :      "<description>",
 
                        "landing_rev":       "<landing_rev>",
 
                        "owner":             "<repo_owner>",
 
                        "fork_of":           "<name_of_fork_parent>",
 
                        "enable_downloads":  "<bool>",
 
                        "enable_locking":    "<bool>",
 
                        "enable_statistics": "<bool>",
 
                      },
 
 
                    ]
 
            error:  null
 
        """
 
        result = []
 
        if not HasPermissionAnyApi('hg.admin')(user=apiuser):
 
            repos = RepoModel().get_all_user_repos(user=apiuser)
 
        else:
 
            repos = RepoModel().get_all()
 

	
 
        for repo in repos:
 
            result.append(repo.get_api_data())
 
        return result
 

	
 
    # permission check inside
 
    def get_repo_nodes(self, apiuser, repoid, revision, root_path,
 
                       ret_type=Optional('all')):
 
        """
 
        returns a list of nodes and it's children in a flat list for a given path
 
        at given revision. It's possible to specify ret_type to show only `files` or
 
        `dirs`.  This command can be executed only using api_key belonging to
 
        user with admin rights or regular user that have at least read access to repository.
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 
        :param revision: revision for which listing should be done
 
        :type revision: str
 
        :param root_path: path from which start displaying
 
        :type root_path: str
 
        :param ret_type: return type 'all|files|dirs' nodes
 
        :type ret_type: Optional(str)
 

	
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: [
 
                      {
 
                        "name" :        "<name>"
 
                        "type" :        "<type>",
 
                      },
 
 
                    ]
 
            error:  null
 
        """
 
        repo = get_repo_or_error(repoid)
 

	
 
        if not HasPermissionAnyApi('hg.admin')(user=apiuser):
 
            # check if we have admin permission for this repo !
 
            perms = ('repository.admin', 'repository.write', 'repository.read')
 
            if not HasRepoPermissionAnyApi(*perms)(user=apiuser, repo_name=repo.repo_name):
 
                raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
        ret_type = Optional.extract(ret_type)
 
        _map = {}
 
        try:
 
            _d, _f = ScmModel().get_nodes(repo, revision, root_path,
 
                                          flat=False)
 
            _map = {
 
                'all': _d + _f,
 
                'files': _f,
 
                'dirs': _d,
 
            }
 
            return _map[ret_type]
 
        except KeyError:
 
            raise JSONRPCError('ret_type must be one of %s'
 
                               % (','.join(_map.keys())))
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to get repo: `%s` nodes' % repo.repo_name
 
            )
 

	
 
    @HasPermissionAnyDecorator('hg.admin', 'hg.create.repository')
 
    def create_repo(self, apiuser, repo_name, owner=Optional(OAttr('apiuser')),
 
                    repo_type=Optional('hg'), description=Optional(''),
 
                    private=Optional(False), clone_uri=Optional(None),
 
                    landing_rev=Optional('rev:tip'),
 
                    enable_statistics=Optional(False),
 
                    enable_locking=Optional(False),
 
                    enable_downloads=Optional(False),
 
                    copy_permissions=Optional(False)):
 
        """
 
        Creates a repository. If repository name contains "/", all needed repository
 
        groups will be created. For example "foo/bar/baz" will create groups
 
        "foo", "bar" (with "foo" as parent), and create "baz" repository with
 
        "bar" as group. This command can be executed only using api_key
 
        belonging to user with admin rights or regular user that have create
 
        repository permission. Regular users cannot specify owner parameter
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param repo_name: repository name
 
        :type repo_name: str
 
        :param owner: user_id or username
 
        :type owner: Optional(str)
 
        :param repo_type: 'hg' or 'git'
 
        :type repo_type: Optional(str)
 
        :param description: repository description
 
        :type description: Optional(str)
 
        :param private:
 
        :type private: bool
 
        :param clone_uri:
 
        :type clone_uri: str
 
        :param landing_rev: <rev_type>:<rev>
 
        :type landing_rev: str
 
        :param enable_locking:
 
        :type enable_locking: bool
 
        :param enable_downloads:
 
        :type enable_downloads: bool
 
        :param enable_statistics:
 
        :type enable_statistics: bool
 
        :param copy_permissions: Copy permission from group that repository is
 
            being created.
 
        :type copy_permissions: bool
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg": "Created new repository `<reponame>`",
 
                      "success": true,
 
                      "task": "<celery task id or None if done sync>"
 
                    }
 
            error:  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
             'failed to create repository `<repo_name>`
 
          }
 

	
 
        """
 
        if not HasPermissionAnyApi('hg.admin')(user=apiuser):
 
            if not isinstance(owner, Optional):
 
                #forbid setting owner for non-admins
 
                raise JSONRPCError(
 
                    'Only Kallithea admin can specify `owner` param'
 
                )
 
        if isinstance(owner, Optional):
 
            owner = apiuser.user_id
 

	
 
        owner = get_user_or_error(owner)
 

	
 
        if RepoModel().get_by_repo_name(repo_name):
 
            raise JSONRPCError("repo `%s` already exist" % repo_name)
 

	
 
        defs = Setting.get_default_repo_settings(strip_prefix=True)
 
        if isinstance(private, Optional):
 
            private = defs.get('repo_private') or Optional.extract(private)
 
        if isinstance(repo_type, Optional):
 
            repo_type = defs.get('repo_type')
 
        if isinstance(enable_statistics, Optional):
 
            enable_statistics = defs.get('repo_enable_statistics')
 
        if isinstance(enable_locking, Optional):
 
            enable_locking = defs.get('repo_enable_locking')
 
        if isinstance(enable_downloads, Optional):
 
            enable_downloads = defs.get('repo_enable_downloads')
 

	
 
        clone_uri = Optional.extract(clone_uri)
 
        description = Optional.extract(description)
 
        landing_rev = Optional.extract(landing_rev)
 
        copy_permissions = Optional.extract(copy_permissions)
 

	
 
        try:
 
            repo_name_cleaned = repo_name.split('/')[-1]
 
            # create structure of groups and return the last group
 
            repo_group = map_groups(repo_name)
 
            data = dict(
 
                repo_name=repo_name_cleaned,
 
                repo_name_full=repo_name,
 
                repo_type=repo_type,
 
                repo_description=description,
 
                owner=owner,
 
                repo_private=private,
 
                clone_uri=clone_uri,
 
                repo_group=repo_group,
 
                repo_landing_rev=landing_rev,
 
                enable_statistics=enable_statistics,
 
                enable_locking=enable_locking,
 
                enable_downloads=enable_downloads,
 
                repo_copy_permissions=copy_permissions,
 
            )
 

	
 
            task = RepoModel().create(form_data=data, cur_user=owner)
 
            from celery.result import BaseAsyncResult
 
            task_id = None
 
            if isinstance(task, BaseAsyncResult):
 
                task_id = task.task_id
 
            # no commit, it's done in RepoModel, or async via celery
 
            return dict(
 
                msg="Created new repository `%s`" % (repo_name,),
 
                success=True,  # cannot return the repo data here since fork
 
                               # can be done async
 
                task=task_id
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to create repository `%s`' % (repo_name,))
 

	
 
    # permission check inside
 
    def update_repo(self, apiuser, repoid, name=Optional(None),
 
                    owner=Optional(OAttr('apiuser')),
 
                    group=Optional(None),
 
                    description=Optional(''), private=Optional(False),
 
                    clone_uri=Optional(None), landing_rev=Optional('rev:tip'),
 
                    enable_statistics=Optional(False),
 
                    enable_locking=Optional(False),
 
                    enable_downloads=Optional(False)):
 

	
 
        """
 
        Updates repo
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 
        :param name:
 
        :param owner:
 
        :param group:
 
        :param description:
 
        :param private:
 
        :param clone_uri:
 
        :param landing_rev:
 
        :param enable_statistics:
 
        :param enable_locking:
 
        :param enable_downloads:
 
        """
 
        repo = get_repo_or_error(repoid)
 
        if not HasPermissionAnyApi('hg.admin')(user=apiuser):
 
            # check if we have admin permission for this repo !
 
            if not HasRepoPermissionAnyApi('repository.admin')(user=apiuser,
 
                                                               repo_name=repo.repo_name):
 
                raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
            if (name != repo.repo_name and
 
                not HasPermissionAnyApi('hg.create.repository')(user=apiuser)
 
                ):
 
                raise JSONRPCError('no permission to create (or move) repositories')
 

	
 
        updates = {
 
            # update function requires this.
 
            'repo_name': repo.repo_name
 
        }
 
        repo_group = group
 
        if not isinstance(repo_group, Optional):
 
            repo_group = get_repo_group_or_error(repo_group)
 
            repo_group = repo_group.group_id
 
        try:
 
            store_update(updates, name, 'repo_name')
 
            store_update(updates, repo_group, 'repo_group')
 
            store_update(updates, owner, 'user')
 
            store_update(updates, description, 'repo_description')
 
            store_update(updates, private, 'repo_private')
 
            store_update(updates, clone_uri, 'clone_uri')
 
            store_update(updates, landing_rev, 'repo_landing_rev')
 
            store_update(updates, enable_statistics, 'repo_enable_statistics')
 
            store_update(updates, enable_locking, 'repo_enable_locking')
 
            store_update(updates, enable_downloads, 'repo_enable_downloads')
 

	
 
            RepoModel().update(repo, **updates)
 
            Session().commit()
 
            return dict(
 
                msg='updated repo ID:%s %s' % (repo.repo_id, repo.repo_name),
 
                repository=repo.get_api_data()
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to update repo `%s`' % repoid)
 

	
 
    @HasPermissionAnyDecorator('hg.admin', 'hg.fork.repository')
 
    def fork_repo(self, apiuser, repoid, fork_name,
 
                  owner=Optional(OAttr('apiuser')),
 
                  description=Optional(''), copy_permissions=Optional(False),
 
                  private=Optional(False), landing_rev=Optional('rev:tip')):
 
        """
 
        Creates a fork of given repo. In case of using celery this will
 
        immediately return success message, while fork is going to be created
 
        asynchronous. This command can be executed only using api_key belonging to
 
        user with admin rights or regular user that have fork permission, and at least
 
        read access to forking repository. Regular users cannot specify owner parameter.
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 
        :param fork_name:
 
        :param owner:
 
        :param description:
 
        :param copy_permissions:
 
        :param private:
 
        :param landing_rev:
 

	
 
        INPUT::
 

	
 
            id : <id_for_response>
 
            api_key : "<api_key>"
 
            args:     {
 
                        "repoid" :          "<reponame or repo_id>",
 
                        "fork_name":        "<forkname>",
 
                        "owner":            "<username or user_id = Optional(=apiuser)>",
 
                        "description":      "<description>",
 
                        "copy_permissions": "<bool>",
 
                        "private":          "<bool>",
 
                        "landing_rev":      "<landing_rev>"
 
                      }
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg": "Created fork of `<reponame>` as `<forkname>`",
 
                      "success": true,
 
                      "task": "<celery task id or None if done sync>"
 
                    }
 
            error:  null
 

	
 
        """
 
        repo = get_repo_or_error(repoid)
 
        repo_name = repo.repo_name
 

	
 
        _repo = RepoModel().get_by_repo_name(fork_name)
 
        if _repo:
 
            type_ = 'fork' if _repo.fork else 'repo'
 
            raise JSONRPCError("%s `%s` already exist" % (type_, fork_name))
 

	
 
        if HasPermissionAnyApi('hg.admin')(user=apiuser):
 
            pass
 
        elif HasRepoPermissionAnyApi('repository.admin',
 
                                     'repository.write',
 
                                     'repository.read')(user=apiuser,
 
                                                        repo_name=repo.repo_name):
 
            if not isinstance(owner, Optional):
 
                #forbid setting owner for non-admins
 
                raise JSONRPCError(
 
                    'Only Kallithea admin can specify `owner` param'
 
                )
 

	
 
            if not HasPermissionAnyApi('hg.create.repository')(user=apiuser):
 
                raise JSONRPCError('no permission to create repositories')
 
        else:
 
            raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
        if isinstance(owner, Optional):
 
            owner = apiuser.user_id
 

	
 
        owner = get_user_or_error(owner)
 

	
 
        try:
 
            # create structure of groups and return the last group
 
            group = map_groups(fork_name)
 

	
 
            form_data = dict(
 
                repo_name=fork_name,
 
                repo_name_full=fork_name,
 
                repo_group=group,
 
                repo_type=repo.repo_type,
 
                description=Optional.extract(description),
 
                private=Optional.extract(private),
 
                copy_permissions=Optional.extract(copy_permissions),
 
                landing_rev=Optional.extract(landing_rev),
 
                update_after_clone=False,
 
                fork_parent_id=repo.repo_id,
 
            )
 
            task = RepoModel().create_fork(form_data, cur_user=owner)
 
            # no commit, it's done in RepoModel, or async via celery
 
            from celery.result import BaseAsyncResult
 
            task_id = None
 
            if isinstance(task, BaseAsyncResult):
 
                task_id = task.task_id
 
            return dict(
 
                msg='Created fork of `%s` as `%s`' % (repo.repo_name,
 
                                                      fork_name),
 
                success=True,  # cannot return the repo data here since fork
 
                               # can be done async
 
                task=task_id
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to fork repository `%s` as `%s`' % (repo_name,
 
                                                            fork_name)
 
            )
 

	
 
    # permission check inside
 
    def delete_repo(self, apiuser, repoid, forks=Optional('')):
 
        """
 
        Deletes a repository. This command can be executed only using api_key belonging
 
        to user with admin rights or regular user that have admin access to repository.
 
        When `forks` param is set it's possible to detach or delete forks of deleting
 
        repository
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 
        :param forks: `detach` or `delete`, what do do with attached forks for repo
 
        :type forks: Optional(str)
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg": "Deleted repository `<reponame>`",
 
                      "success": true
 
                    }
 
            error:  null
 

	
 
        """
 
        repo = get_repo_or_error(repoid)
 

	
 
        if not HasPermissionAnyApi('hg.admin')(user=apiuser):
 
            # check if we have admin permission for this repo !
 
            if not HasRepoPermissionAnyApi('repository.admin')(user=apiuser,
 
                                                           repo_name=repo.repo_name):
 
                raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
        try:
 
            handle_forks = Optional.extract(forks)
 
            _forks_msg = ''
 
            _forks = [f for f in repo.forks]
 
            if handle_forks == 'detach':
 
                _forks_msg = ' ' + 'Detached %s forks' % len(_forks)
 
            elif handle_forks == 'delete':
 
                _forks_msg = ' ' + 'Deleted %s forks' % len(_forks)
 
            elif _forks:
 
                raise JSONRPCError(
 
                    'Cannot delete `%s` it still contains attached forks' %
 
                    (repo.repo_name,)
 
                )
 

	
 
            RepoModel().delete(repo, forks=forks)
 
            Session().commit()
 
            return dict(
 
                msg='Deleted repository `%s`%s' % (repo.repo_name, _forks_msg),
 
                success=True
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to delete repository `%s`' % (repo.repo_name,)
 
            )
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def grant_user_permission(self, apiuser, repoid, userid, perm):
 
        """
 
        Grant permission for user on given repository, or update existing one
 
        if found. This command can be executed only using api_key belonging to user
 
        with admin rights.
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 
        :param userid:
 
        :param perm: (repository.(none|read|write|admin))
 
        :type perm: str
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg" : "Granted perm: `<perm>` for user: `<username>` in repo: `<reponame>`",
 
                      "success": true
 
                    }
 
            error:  null
 
        """
 
        repo = get_repo_or_error(repoid)
 
        user = get_user_or_error(userid)
 
        perm = get_perm_or_error(perm)
 

	
 
        try:
 

	
 
            RepoModel().grant_user_permission(repo=repo, user=user, perm=perm)
 

	
 
            Session().commit()
 
            return dict(
 
                msg='Granted perm: `%s` for user: `%s` in repo: `%s`' % (
 
                    perm.permission_name, user.username, repo.repo_name
 
                ),
 
                success=True
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to edit permission for user: `%s` in repo: `%s`' % (
 
                    userid, repoid
 
                )
 
            )
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def revoke_user_permission(self, apiuser, repoid, userid):
 
        """
 
        Revoke permission for user on given repository. This command can be executed
 
        only using api_key belonging to user with admin rights.
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 
        :param userid:
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg" : "Revoked perm for user: `<username>` in repo: `<reponame>`",
 
                      "success": true
 
                    }
 
            error:  null
 

	
 
        """
 

	
 
        repo = get_repo_or_error(repoid)
 
        user = get_user_or_error(userid)
 
        try:
 
            RepoModel().revoke_user_permission(repo=repo, user=user)
 
            Session().commit()
 
            return dict(
 
                msg='Revoked perm for user: `%s` in repo: `%s`' % (
 
                    user.username, repo.repo_name
 
                ),
 
                success=True
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to edit permission for user: `%s` in repo: `%s`' % (
 
                    userid, repoid
 
                )
 
            )
 

	
 
    # permission check inside
 
    def grant_user_group_permission(self, apiuser, repoid, usergroupid, perm):
 
        """
 
        Grant permission for user group on given repository, or update
 
        existing one if found. This command can be executed only using
 
        api_key belonging to user with admin rights.
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 
        :param usergroupid: id of usergroup
 
        :type usergroupid: str or int
 
        :param perm: (repository.(none|read|write|admin))
 
        :type perm: str
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
            "msg" : "Granted perm: `<perm>` for group: `<usersgroupname>` in repo: `<reponame>`",
 
            "success": true
 

	
 
          }
 
          error :  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "failed to edit permission for user group: `<usergroup>` in repo `<repo>`'
 
          }
 

	
 
        """
 
        repo = get_repo_or_error(repoid)
 
        perm = get_perm_or_error(perm)
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAnyApi('hg.admin')(user=apiuser):
 
            # check if we have admin permission for this repo !
 
            _perms = ('repository.admin',)
 
            if not HasRepoPermissionAnyApi(*_perms)(
 
                    user=apiuser, repo_name=repo.repo_name):
 
                raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
            # check if we have at least read permission for this user group !
 
            _perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin',)
 
            if not HasUserGroupPermissionAny(*_perms)(
 
                    user=apiuser, user_group_name=user_group.users_group_name):
 
                raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
 

	
 
        try:
 
            RepoModel().grant_user_group_permission(
 
                repo=repo, group_name=user_group, perm=perm)
 

	
 
            Session().commit()
 
            return dict(
 
                msg='Granted perm: `%s` for user group: `%s` in '
 
                    'repo: `%s`' % (
 
                        perm.permission_name, user_group.users_group_name,
 
                        repo.repo_name
 
                    ),
 
                success=True
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to edit permission for user group: `%s` in '
 
                'repo: `%s`' % (
 
                    usergroupid, repo.repo_name
 
                )
 
            )
 

	
 
    # permission check inside
 
    def revoke_user_group_permission(self, apiuser, repoid, usergroupid):
 
        """
 
        Revoke permission for user group on given repository. This command can be
 
        executed only using api_key belonging to user with admin rights.
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 
        :param usergroupid:
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg" : "Revoked perm for group: `<usersgroupname>` in repo: `<reponame>`",
 
                      "success": true
 
                    }
 
            error:  null
 
        """
 
        repo = get_repo_or_error(repoid)
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAnyApi('hg.admin')(user=apiuser):
 
            # check if we have admin permission for this repo !
 
            _perms = ('repository.admin',)
 
            if not HasRepoPermissionAnyApi(*_perms)(
 
                    user=apiuser, repo_name=repo.repo_name):
 
                raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
            # check if we have at least read permission for this user group !
 
            _perms = ('usergroup.read', 'usergroup.write', 'usergroup.admin',)
 
            if not HasUserGroupPermissionAny(*_perms)(
 
                    user=apiuser, user_group_name=user_group.users_group_name):
 
                raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
 

	
 
        try:
 
            RepoModel().revoke_user_group_permission(
 
                repo=repo, group_name=user_group)
 

	
 
            Session().commit()
 
            return dict(
 
                msg='Revoked perm for user group: `%s` in repo: `%s`' % (
 
                    user_group.users_group_name, repo.repo_name
 
                ),
 
                success=True
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to edit permission for user group: `%s` in '
 
                'repo: `%s`' % (
 
                    user_group.users_group_name, repo.repo_name
 
                )
 
            )
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def get_repo_group(self, apiuser, repogroupid):
 
        """
 
        Returns given repo group together with permissions, and repositories
 
        inside the group
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param repogroupid: id/name of repository group
 
        :type repogroupid: str or int
 
        """
 
        repo_group = get_repo_group_or_error(repogroupid)
 

	
 
        members = []
 
        for user in repo_group.repo_group_to_perm:
 
            perm = user.permission.permission_name
 
            user = user.user
 
            user_data = {
 
                'name': user.username,
 
                'type': "user",
 
                'permission': perm
 
            }
 
            members.append(user_data)
 

	
 
        for user_group in repo_group.users_group_to_perm:
 
            perm = user_group.permission.permission_name
 
            user_group = user_group.users_group
 
            user_group_data = {
 
                'name': user_group.users_group_name,
 
                'type': "user_group",
 
                'permission': perm
 
            }
 
            members.append(user_group_data)
 

	
 
        data = repo_group.get_api_data()
 
        data["members"] = members
 
        return data
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def get_repo_groups(self, apiuser):
 
        """
 
        Returns all repository groups
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        """
 
        result = []
 
        for repo_group in RepoGroupModel().get_all():
 
            result.append(repo_group.get_api_data())
 
        return result
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def create_repo_group(self, apiuser, group_name, description=Optional(''),
 
                          owner=Optional(OAttr('apiuser')),
 
                          parent=Optional(None),
 
                          copy_permissions=Optional(False)):
 
        """
 
        Creates a repository group. This command can be executed only using
 
        api_key belonging to user with admin rights.
 

	
 
        :param apiuser: filled automatically from apikey
 
        :type apiuser: AuthUser
 
        :param group_name:
kallithea/tests/api/api_base.py
Show inline comments
 
@@ -797,893 +797,957 @@ class BaseTestApi(object):
 
            members.append(user_group_data)
 

	
 
        for user in repo.followers:
 
            followers.append(user.user.get_api_data())
 

	
 
        ret['members'] = members
 
        ret['followers'] = followers
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_user_group(new_group)
 

	
 
    @parameterized.expand([
 
        ('repository.admin',),
 
        ('repository.write',),
 
        ('repository.read',),
 
    ])
 
    def test_api_get_repo_by_non_admin(self, grant_perm):
 
        RepoModel().grant_user_permission(repo=self.REPO,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm=grant_perm)
 
        Session().commit()
 
        id_, params = _build_data(self.apikey_regular, 'get_repo',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 

	
 
        repo = RepoModel().get_by_repo_name(self.REPO)
 
        ret = repo.get_api_data()
 

	
 
        members = []
 
        followers = []
 
        self.assertEqual(2, len(repo.repo_to_perm))
 
        for user in repo.repo_to_perm:
 
            perm = user.permission.permission_name
 
            user_obj = user.user
 
            user_data = {'name': user_obj.username, 'type': "user",
 
                         'permission': perm}
 
            members.append(user_data)
 

	
 
        for user_group in repo.users_group_to_perm:
 
            perm = user_group.permission.permission_name
 
            user_group_obj = user_group.users_group
 
            user_group_data = {'name': user_group_obj.users_group_name,
 
                               'type': "user_group", 'permission': perm}
 
            members.append(user_group_data)
 

	
 
        for user in repo.followers:
 
            followers.append(user.user.get_api_data())
 

	
 
        ret['members'] = members
 
        ret['followers'] = followers
 

	
 
        expected = ret
 
        try:
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            RepoModel().revoke_user_permission(self.REPO, self.TEST_USER_LOGIN)
 

	
 
    def test_api_get_repo_by_non_admin_no_permission_to_repo(self):
 
        RepoModel().grant_user_permission(repo=self.REPO,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm='repository.none')
 

	
 
        id_, params = _build_data(self.apikey_regular, 'get_repo',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 

	
 
        expected = 'repository `%s` does not exist' % (self.REPO)
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_repo_that_doesn_not_exist(self):
 
        id_, params = _build_data(self.apikey, 'get_repo',
 
                                  repoid='no-such-repo')
 
        response = api_call(self, params)
 

	
 
        ret = 'repository `%s` does not exist' % 'no-such-repo'
 
        expected = ret
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_repos(self):
 
        id_, params = _build_data(self.apikey, 'get_repos')
 
        response = api_call(self, params)
 

	
 
        result = []
 
        for repo in RepoModel().get_all():
 
            result.append(repo.get_api_data())
 
        ret = jsonify(result)
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_repos_non_admin(self):
 
        id_, params = _build_data(self.apikey_regular, 'get_repos')
 
        response = api_call(self, params)
 

	
 
        result = []
 
        for repo in RepoModel().get_all_user_repos(self.TEST_USER_LOGIN):
 
            result.append(repo.get_api_data())
 
        ret = jsonify(result)
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @parameterized.expand([('all', 'all'),
 
                           ('dirs', 'dirs'),
 
                           ('files', 'files'), ])
 
    def test_api_get_repo_nodes(self, name, ret_type):
 
        rev = 'tip'
 
        path = '/'
 
        id_, params = _build_data(self.apikey, 'get_repo_nodes',
 
                                  repoid=self.REPO, revision=rev,
 
                                  root_path=path,
 
                                  ret_type=ret_type)
 
        response = api_call(self, params)
 

	
 
        # we don't the actual return types here since it's tested somewhere
 
        # else
 
        expected = response.json['result']
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_repo_nodes_bad_revisions(self):
 
        rev = 'i-dont-exist'
 
        path = '/'
 
        id_, params = _build_data(self.apikey, 'get_repo_nodes',
 
                                  repoid=self.REPO, revision=rev,
 
                                  root_path=path, )
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to get repo: `%s` nodes' % self.REPO
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_repo_nodes_bad_path(self):
 
        rev = 'tip'
 
        path = '/idontexits'
 
        id_, params = _build_data(self.apikey, 'get_repo_nodes',
 
                                  repoid=self.REPO, revision=rev,
 
                                  root_path=path, )
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to get repo: `%s` nodes' % self.REPO
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_repo_nodes_bad_ret_type(self):
 
        rev = 'tip'
 
        path = '/'
 
        ret_type = 'error'
 
        id_, params = _build_data(self.apikey, 'get_repo_nodes',
 
                                  repoid=self.REPO, revision=rev,
 
                                  root_path=path,
 
                                  ret_type=ret_type)
 
        response = api_call(self, params)
 

	
 
        expected = ('ret_type must be one of %s'
 
                    % (','.join(['files', 'dirs', 'all'])))
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @parameterized.expand([('all', 'all', 'repository.write'),
 
                           ('dirs', 'dirs', 'repository.admin'),
 
                           ('files', 'files', 'repository.read'), ])
 
    def test_api_get_repo_nodes_by_regular_user(self, name, ret_type, grant_perm):
 
        RepoModel().grant_user_permission(repo=self.REPO,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm=grant_perm)
 
        Session().commit()
 

	
 
        rev = 'tip'
 
        path = '/'
 
        id_, params = _build_data(self.apikey_regular, 'get_repo_nodes',
 
                                  repoid=self.REPO, revision=rev,
 
                                  root_path=path,
 
                                  ret_type=ret_type)
 
        response = api_call(self, params)
 

	
 
        # we don't the actual return types here since it's tested somewhere
 
        # else
 
        expected = response.json['result']
 
        try:
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            RepoModel().revoke_user_permission(self.REPO, self.TEST_USER_LOGIN)
 

	
 
    def test_api_create_repo(self):
 
        repo_name = 'api-repo'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,
 
        )
 
        response = api_call(self, params)
 

	
 
        repo = RepoModel().get_by_repo_name(repo_name)
 
        self.assertNotEqual(repo, None)
 
        ret = {
 
            'msg': 'Created new repository `%s`' % repo_name,
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_create_repo_in_group(self):
 
        repo_name = 'my_gr/api-repo'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,)
 
        response = api_call(self, params)
 
        print params
 
        repo = RepoModel().get_by_repo_name(repo_name)
 
        self.assertNotEqual(repo, None)
 
        ret = {
 
            'msg': 'Created new repository `%s`' % repo_name,
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 
        fixture.destroy_repo_group('my_gr')
 

	
 
    def test_api_create_repo_unknown_owner(self):
 
        repo_name = 'api-repo'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=owner,
 
                                  repo_type=self.REPO_TYPE,
 
        )
 
        response = api_call(self, params)
 
        expected = 'user `%s` does not exist' % owner
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_repo_dont_specify_owner(self):
 
        repo_name = 'api-repo'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  repo_type=self.REPO_TYPE,
 
        )
 
        response = api_call(self, params)
 

	
 
        repo = RepoModel().get_by_repo_name(repo_name)
 
        self.assertNotEqual(repo, None)
 
        ret = {
 
            'msg': 'Created new repository `%s`' % repo_name,
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_create_repo_by_non_admin(self):
 
        repo_name = 'api-repo'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey_regular, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  repo_type=self.REPO_TYPE,
 
        )
 
        response = api_call(self, params)
 

	
 
        repo = RepoModel().get_by_repo_name(repo_name)
 
        self.assertNotEqual(repo, None)
 
        ret = {
 
            'msg': 'Created new repository `%s`' % repo_name,
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_create_repo_by_non_admin_specify_owner(self):
 
        repo_name = 'api-repo'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey_regular, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  repo_type=self.REPO_TYPE,
 
                                  owner=owner)
 
        response = api_call(self, params)
 

	
 
        expected = 'Only Kallithea admin can specify `owner` param'
 
        self._compare_error(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_create_repo_exists(self):
 
        repo_name = self.REPO
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,)
 
        response = api_call(self, params)
 
        expected = "repo `%s` already exist" % repo_name
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(RepoModel, 'create', crash)
 
    def test_api_create_repo_exception_occurred(self):
 
        repo_name = 'api-repo'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,)
 
        response = api_call(self, params)
 
        expected = 'failed to create repository `%s`' % repo_name
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @parameterized.expand([
 
        ('owner', {'owner': TEST_USER_REGULAR_LOGIN}),
 
        ('description', {'description': 'new description'}),
 
        ('active', {'active': True}),
 
        ('active', {'active': False}),
 
        ('clone_uri', {'clone_uri': 'http://foo.com/repo'}),
 
        ('clone_uri', {'clone_uri': None}),
 
        ('landing_rev', {'landing_rev': 'branch:master'}),
 
        ('enable_statistics', {'enable_statistics': True}),
 
        ('enable_locking', {'enable_locking': True}),
 
        ('enable_downloads', {'enable_downloads': True}),
 
        ('name', {'name': 'new_repo_name'}),
 
        ('repo_group', {'group': 'test_group_for_update'}),
 
    ])
 
    def test_api_update_repo(self, changing_attr, updates):
 
        repo_name = 'api_update_me'
 
        repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        if changing_attr == 'repo_group':
 
            fixture.create_repo_group(updates['group'])
 

	
 
        id_, params = _build_data(self.apikey, 'update_repo',
 
                                  repoid=repo_name, **updates)
 
        response = api_call(self, params)
 
        if changing_attr == 'name':
 
            repo_name = updates['name']
 
        if changing_attr == 'repo_group':
 
            repo_name = '/'.join([updates['group'], repo_name])
 
        try:
 
            expected = {
 
                'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo_name),
 
                'repository': repo.get_api_data()
 
            }
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 
            if changing_attr == 'repo_group':
 
                fixture.destroy_repo_group(updates['group'])
 

	
 
    def test_api_update_repo_repo_group_does_not_exist(self):
 
        repo_name = 'admin_owned'
 
        fixture.create_repo(repo_name)
 
        updates = {'group': 'test_group_for_update'}
 
        id_, params = _build_data(self.apikey, 'update_repo',
 
                                  repoid=repo_name, **updates)
 
        response = api_call(self, params)
 
        try:
 
            expected = 'repository group `%s` does not exist' % updates['group']
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_update_repo_regular_user_not_allowed(self):
 
        repo_name = 'admin_owned'
 
        fixture.create_repo(repo_name)
 
        updates = {'active': False}
 
        id_, params = _build_data(self.apikey_regular, 'update_repo',
 
                                  repoid=repo_name, **updates)
 
        response = api_call(self, params)
 
        try:
 
            expected = 'repository `%s` does not exist' % repo_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    @mock.patch.object(RepoModel, 'update', crash)
 
    def test_api_update_repo_exception_occurred(self):
 
        repo_name = 'api_update_me'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        id_, params = _build_data(self.apikey, 'update_repo',
 
                                  repoid=repo_name, owner=TEST_USER_ADMIN_LOGIN,)
 
        response = api_call(self, params)
 
        try:
 
            expected = 'failed to update repo `%s`' % repo_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_update_repo_regular_user_change_repo_name(self):
 
        repo_name = 'admin_owned'
 
        new_repo_name = 'new_repo_name'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        RepoModel().grant_user_permission(repo=repo_name,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm='repository.admin')
 
        UserModel().revoke_perm('default', 'hg.create.repository')
 
        UserModel().grant_perm('default', 'hg.create.none')
 
        updates = {'name': new_repo_name}
 
        id_, params = _build_data(self.apikey_regular, 'update_repo',
 
                                  repoid=repo_name, **updates)
 
        response = api_call(self, params)
 
        try:
 
            expected = 'no permission to create (or move) repositories'
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 
            fixture.destroy_repo(new_repo_name)
 

	
 
    def test_api_update_repo_regular_user_change_repo_name_allowed(self):
 
        repo_name = 'admin_owned'
 
        new_repo_name = 'new_repo_name'
 
        repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        RepoModel().grant_user_permission(repo=repo_name,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm='repository.admin')
 
        UserModel().revoke_perm('default', 'hg.create.none')
 
        UserModel().grant_perm('default', 'hg.create.repository')
 
        updates = {'name': new_repo_name}
 
        id_, params = _build_data(self.apikey_regular, 'update_repo',
 
                                  repoid=repo_name, **updates)
 
        response = api_call(self, params)
 
        try:
 
            expected = {
 
                'msg': 'updated repo ID:%s %s' % (repo.repo_id, new_repo_name),
 
                'repository': repo.get_api_data()
 
            }
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 
            fixture.destroy_repo(new_repo_name)
 

	
 
    def test_api_delete_repo(self):
 
        repo_name = 'api_delete_me'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 

	
 
        id_, params = _build_data(self.apikey, 'delete_repo',
 
                                  repoid=repo_name, )
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Deleted repository `%s`' % repo_name,
 
            'success': True
 
        }
 
        try:
 
            expected = ret
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_delete_repo_by_non_admin(self):
 
        repo_name = 'api_delete_me'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
 
                            cur_user=self.TEST_USER_LOGIN)
 
        id_, params = _build_data(self.apikey_regular, 'delete_repo',
 
                                  repoid=repo_name, )
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Deleted repository `%s`' % repo_name,
 
            'success': True
 
        }
 
        try:
 
            expected = ret
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_delete_repo_by_non_admin_no_permission(self):
 
        repo_name = 'api_delete_me'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        try:
 
            id_, params = _build_data(self.apikey_regular, 'delete_repo',
 
                                      repoid=repo_name, )
 
            response = api_call(self, params)
 
            expected = 'repository `%s` does not exist' % (repo_name)
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_delete_repo_exception_occurred(self):
 
        repo_name = 'api_delete_me'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        try:
 
            with mock.patch.object(RepoModel, 'delete', crash):
 
                id_, params = _build_data(self.apikey, 'delete_repo',
 
                                          repoid=repo_name, )
 
                response = api_call(self, params)
 

	
 
                expected = 'failed to delete repository `%s`' % repo_name
 
                self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_fork_repo(self):
 
        fork_name = 'api-repo-fork'
 
        id_, params = _build_data(self.apikey, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
        )
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Created fork of `%s` as `%s`' % (self.REPO,
 
                                                     fork_name),
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(fork_name)
 

	
 
    def test_api_fork_repo_non_admin(self):
 
        fork_name = 'api-repo-fork'
 
        id_, params = _build_data(self.apikey_regular, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
        )
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Created fork of `%s` as `%s`' % (self.REPO,
 
                                                     fork_name),
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(fork_name)
 

	
 
    def test_api_fork_repo_non_admin_specify_owner(self):
 
        fork_name = 'api-repo-fork'
 
        id_, params = _build_data(self.apikey_regular, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
        )
 
        response = api_call(self, params)
 
        expected = 'Only Kallithea admin can specify `owner` param'
 
        self._compare_error(id_, expected, given=response.body)
 
        fixture.destroy_repo(fork_name)
 

	
 
    def test_api_fork_repo_non_admin_no_permission_to_fork(self):
 
        RepoModel().grant_user_permission(repo=self.REPO,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm='repository.none')
 
        fork_name = 'api-repo-fork'
 
        id_, params = _build_data(self.apikey_regular, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
        )
 
        response = api_call(self, params)
 
        expected = 'repository `%s` does not exist' % (self.REPO)
 
        self._compare_error(id_, expected, given=response.body)
 
        fixture.destroy_repo(fork_name)
 

	
 
    @parameterized.expand([('read', 'repository.read'),
 
                           ('write', 'repository.write'),
 
                           ('admin', 'repository.admin')])
 
    def test_api_fork_repo_non_admin_no_create_repo_permission(self, name, perm):
 
        fork_name = 'api-repo-fork'
 
        # regardless of base repository permission, forking is disallowed
 
        # when repository creation is disabled
 
        RepoModel().grant_user_permission(repo=self.REPO,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm=perm)
 
        UserModel().revoke_perm('default', 'hg.create.repository')
 
        UserModel().grant_perm('default', 'hg.create.none')
 
        id_, params = _build_data(self.apikey_regular, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
        )
 
        response = api_call(self, params)
 
        expected = 'no permission to create repositories'
 
        self._compare_error(id_, expected, given=response.body)
 
        fixture.destroy_repo(fork_name)
 

	
 
    def test_api_fork_repo_unknown_owner(self):
 
        fork_name = 'api-repo-fork'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
                                  owner=owner,
 
        )
 
        response = api_call(self, params)
 
        expected = 'user `%s` does not exist' % owner
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_fork_repo_fork_exists(self):
 
        fork_name = 'api-repo-fork'
 
        fixture.create_fork(self.REPO, fork_name)
 

	
 
        try:
 
            fork_name = 'api-repo-fork'
 

	
 
            id_, params = _build_data(self.apikey, 'fork_repo',
 
                                      repoid=self.REPO,
 
                                      fork_name=fork_name,
 
                                      owner=TEST_USER_ADMIN_LOGIN,
 
            )
 
            response = api_call(self, params)
 

	
 
            expected = "fork `%s` already exist" % fork_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(fork_name)
 

	
 
    def test_api_fork_repo_repo_exists(self):
 
        fork_name = self.REPO
 

	
 
        id_, params = _build_data(self.apikey, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
        )
 
        response = api_call(self, params)
 

	
 
        expected = "repo `%s` already exist" % fork_name
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(RepoModel, 'create_fork', crash)
 
    def test_api_fork_repo_exception_occurred(self):
 
        fork_name = 'api-repo-fork'
 
        id_, params = _build_data(self.apikey, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
        )
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to fork repository `%s` as `%s`' % (self.REPO,
 
                                                               fork_name)
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_group(self):
 
        id_, params = _build_data(self.apikey, 'get_user_group',
 
                                  usergroupid=TEST_USER_GROUP)
 
        response = api_call(self, params)
 

	
 
        user_group = UserGroupModel().get_group(TEST_USER_GROUP)
 
        members = []
 
        for user in user_group.members:
 
            user = user.user
 
            members.append(user.get_api_data())
 

	
 
        ret = user_group.get_api_data()
 
        ret['members'] = members
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_groups(self):
 
        gr_name = 'test_user_group2'
 
        make_user_group(gr_name)
 

	
 
        id_, params = _build_data(self.apikey, 'get_user_groups', )
 
        response = api_call(self, params)
 

	
 
        try:
 
            expected = []
 
            for gr_name in [TEST_USER_GROUP, 'test_user_group2']:
 
                user_group = UserGroupModel().get_group(gr_name)
 
                ret = user_group.get_api_data()
 
                expected.append(ret)
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    def test_api_create_user_group(self):
 
        group_name = 'some_new_group'
 
        id_, params = _build_data(self.apikey, 'create_user_group',
 
                                  group_name=group_name)
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'created new user group `%s`' % group_name,
 
            'user_group': jsonify(UserGroupModel() \
 
                .get_by_name(group_name) \
 
                .get_api_data())
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
        fixture.destroy_user_group(group_name)
 

	
 
    def test_api_get_user_group_that_exist(self):
 
        id_, params = _build_data(self.apikey, 'create_user_group',
 
                                  group_name=TEST_USER_GROUP)
 
        response = api_call(self, params)
 

	
 
        expected = "user group `%s` already exist" % TEST_USER_GROUP
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(UserGroupModel, 'create', crash)
 
    def test_api_get_user_group_exception_occurred(self):
 
        group_name = 'exception_happens'
 
        id_, params = _build_data(self.apikey, 'create_user_group',
 
                                  group_name=group_name)
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to create group `%s`' % group_name
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @parameterized.expand([('group_name', {'group_name': 'new_group_name'}),
 
                           ('group_name', {'group_name': 'test_group_for_update'}),
 
                           ('owner', {'owner': TEST_USER_REGULAR_LOGIN}),
 
                           ('active', {'active': False}),
 
                           ('active', {'active': True})])
 
    def test_api_update_user_group(self, changing_attr, updates):
 
        gr_name = 'test_group_for_update'
 
        user_group = fixture.create_user_group(gr_name)
 
        id_, params = _build_data(self.apikey, 'update_user_group',
 
                                  usergroupid=gr_name, **updates)
 
        response = api_call(self, params)
 
        try:
 
            expected = {
 
               'msg': 'updated user group ID:%s %s' % (user_group.users_group_id,
 
                                                     user_group.users_group_name),
 
               'user_group': user_group.get_api_data()
 
            }
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            if changing_attr == 'group_name':
 
                # switch to updated name for proper cleanup
 
                gr_name = updates['group_name']
 
            fixture.destroy_user_group(gr_name)
 

	
 
    @mock.patch.object(UserGroupModel, 'update', crash)
 
    def test_api_update_user_group_exception_occurred(self):
 
        gr_name = 'test_group'
 
        fixture.create_user_group(gr_name)
 
        id_, params = _build_data(self.apikey, 'update_user_group',
 
                                  usergroupid=gr_name)
 
        response = api_call(self, params)
 
        try:
 
            expected = 'failed to update user group `%s`' % gr_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    def test_api_add_user_to_user_group(self):
 
        gr_name = 'test_group'
 
        fixture.create_user_group(gr_name)
 
        id_, params = _build_data(self.apikey, 'add_user_to_user_group',
 
                                  usergroupid=gr_name,
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 
        try:
 
            expected = {
 
            'msg': 'added member `%s` to user group `%s`' % (
 
                    TEST_USER_ADMIN_LOGIN, gr_name),
 
            'success': True
 
            }
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    def test_api_add_user_to_user_group_that_doesnt_exist(self):
 
        id_, params = _build_data(self.apikey, 'add_user_to_user_group',
 
                                  usergroupid='false-group',
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 

	
 
        expected = 'user group `%s` does not exist' % 'false-group'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(UserGroupModel, 'add_user_to_group', crash)
 
    def test_api_add_user_to_user_group_exception_occurred(self):
 
        gr_name = 'test_group'
 
        fixture.create_user_group(gr_name)
 
        id_, params = _build_data(self.apikey, 'add_user_to_user_group',
 
                                  usergroupid=gr_name,
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 

	
 
        try:
 
            expected = 'failed to add member to user group `%s`' % gr_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    def test_api_remove_user_from_user_group(self):
 
        gr_name = 'test_group_3'
 
        gr = fixture.create_user_group(gr_name)
 
        UserGroupModel().add_user_to_group(gr, user=TEST_USER_ADMIN_LOGIN)
 
        Session().commit()
 
        id_, params = _build_data(self.apikey, 'remove_user_from_user_group',
 
                                  usergroupid=gr_name,
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 

	
 
        try:
 
            expected = {
 
                'msg': 'removed member `%s` from user group `%s`' % (
 
                    TEST_USER_ADMIN_LOGIN, gr_name
 
                ),
 
                'success': True}
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    @mock.patch.object(UserGroupModel, 'remove_user_from_group', crash)
 
    def test_api_remove_user_from_user_group_exception_occurred(self):
 
        gr_name = 'test_group_3'
 
        gr = fixture.create_user_group(gr_name)
 
        UserGroupModel().add_user_to_group(gr, user=TEST_USER_ADMIN_LOGIN)
 
        Session().commit()
 
        id_, params = _build_data(self.apikey, 'remove_user_from_user_group',
 
                                  usergroupid=gr_name,
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 
        try:
 
            expected = 'failed to remove member from user group `%s`' % gr_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    def test_api_delete_user_group(self):
 
        gr_name = 'test_group'
 
        ugroup = fixture.create_user_group(gr_name)
 
        gr_id = ugroup.users_group_id
 
        id_, params = _build_data(self.apikey, 'delete_user_group',
 
                                  usergroupid=gr_name,
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 

	
 
        try:
 
            expected = {
 
                'user_group': None,
 
                'msg': 'deleted user group ID:%s %s' % (gr_id, gr_name)
 
            }
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            if UserGroupModel().get_by_name(gr_name):
 
                fixture.destroy_user_group(gr_name)
 

	
 
    def test_api_delete_user_group_that_is_assigned(self):
 
        gr_name = 'test_group'
 
        ugroup = fixture.create_user_group(gr_name)
 
        gr_id = ugroup.users_group_id
 

	
 
        ugr_to_perm = RepoModel().grant_user_group_permission(self.REPO, gr_name, 'repository.write')
 
        msg = 'User Group assigned to %s' % ugr_to_perm.repository.repo_name
 

	
 
        id_, params = _build_data(self.apikey, 'delete_user_group',
 
                                  usergroupid=gr_name,
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 

	
 
        try:
 
            expected = msg
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            if UserGroupModel().get_by_name(gr_name):
 
                fixture.destroy_user_group(gr_name)
 

	
 
    def test_api_delete_user_group_exception_occurred(self):
 
        gr_name = 'test_group'
 
        ugroup = fixture.create_user_group(gr_name)
 
        gr_id = ugroup.users_group_id
 
        id_, params = _build_data(self.apikey, 'delete_user_group',
 
                                  usergroupid=gr_name,
 
                                  userid=TEST_USER_ADMIN_LOGIN)
 

	
 
        try:
 
            with mock.patch.object(UserGroupModel, 'delete', crash):
 
                response = api_call(self, params)
 
                expected = 'failed to delete user group ID:%s %s' % (gr_id, gr_name)
 
                self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user_group(gr_name)
 

	
 
    @parameterized.expand([('none', 'repository.none'),
 
                           ('read', 'repository.read'),
 
                           ('write', 'repository.write'),
 
                           ('admin', 'repository.admin')])
 
    def test_api_grant_user_permission(self, name, perm):
 
        id_, params = _build_data(self.apikey,
 
                                  'grant_user_permission',
 
                                  repoid=self.REPO,
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  perm=perm)
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Granted perm: `%s` for user: `%s` in repo: `%s`' % (
 
                perm, TEST_USER_ADMIN_LOGIN, self.REPO
 
            ),
 
            'success': True
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_grant_user_permission_wrong_permission(self):
 
        perm = 'haha.no.permission'
 
        id_, params = _build_data(self.apikey,
 
                                  'grant_user_permission',
 
                                  repoid=self.REPO,
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  perm=perm)
 
        response = api_call(self, params)
 

	
 
        expected = 'permission `%s` does not exist' % perm
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(RepoModel, 'grant_user_permission', crash)
 
    def test_api_grant_user_permission_exception_when_adding(self):
 
        perm = 'repository.read'
 
        id_, params = _build_data(self.apikey,
 
                                  'grant_user_permission',
 
                                  repoid=self.REPO,
 
                                  userid=TEST_USER_ADMIN_LOGIN,
 
                                  perm=perm)
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to edit permission for user: `%s` in repo: `%s`' % (
 
            TEST_USER_ADMIN_LOGIN, self.REPO
 
        )
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_revoke_user_permission(self):
 
        id_, params = _build_data(self.apikey,
 
                                  'revoke_user_permission',
 
                                  repoid=self.REPO,
 
                                  userid=TEST_USER_ADMIN_LOGIN, )
 
        response = api_call(self, params)
 

	
 
        expected = {
 
            'msg': 'Revoked perm for user: `%s` in repo: `%s`' % (
 
                TEST_USER_ADMIN_LOGIN, self.REPO
 
            ),
 
            'success': True
 
        }
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(RepoModel, 'revoke_user_permission', crash)
 
    def test_api_revoke_user_permission_exception_when_adding(self):
 
        id_, params = _build_data(self.apikey,
 
                                  'revoke_user_permission',
 
                                  repoid=self.REPO,
 
                                  userid=TEST_USER_ADMIN_LOGIN, )
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to edit permission for user: `%s` in repo: `%s`' % (
 
            TEST_USER_ADMIN_LOGIN, self.REPO
 
        )
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @parameterized.expand([('none', 'repository.none'),
 
                           ('read', 'repository.read'),
 
                           ('write', 'repository.write'),
 
                           ('admin', 'repository.admin')])
 
    def test_api_grant_user_group_permission(self, name, perm):
 
        id_, params = _build_data(self.apikey,
 
                                  'grant_user_group_permission',
 
                                  repoid=self.REPO,
 
                                  usergroupid=TEST_USER_GROUP,
 
                                  perm=perm)
 
        response = api_call(self, params)
 

	
 
        ret = {
0 comments (0 inline, 0 general)