Changeset - caa482f8fb5f
[Not reviewed]
default
0 3 0
Mads Kiilerich - 7 years ago 2018-05-29 12:25:41
mads@kiilerich.com
repos: only allow api repo creation in existing groups

Fix problem with '../something' paths being allowed; '..' will always exist and
can't be created.

This also introduce a small API change: Repository groups must now exist before
repositories can be created. This makes the API more explicit and simpler.

This issue was found and reported by
Kacper Szurek
https://security.szurek.pl/
3 files changed with 43 insertions and 23 deletions:
0 comments (0 inline, 0 general)
docs/api/api.rst
Show inline comments
 
@@ -703,196 +703,198 @@ OUTPUT::
 
                        "username": "<user_id>",
 
                        "text": "<comment text>",
 
                        "comment_id": "<comment_id>",
 
                      },
 
                      ...
 
                    ],
 
                    "owner": "<username>",
 
                    "statuses": [
 
                      {
 
                        "status": "<status_of_review>",        # "under_review", "approved" or "rejected"
 
                        "reviewer": "<user_id>",
 
                        "modified_at": "<date_time_of_review>" # iso 8601 date, server's timezone
 
                      },
 
                      ...
 
                    ],
 
                    "revisions": [
 
                      "<raw_id>",
 
                      ...
 
                    ]
 
                  },
 
                  ...
 
                ]
 
            }
 
    error:  null
 

	
 
get_repos
 
^^^^^^^^^
 

	
 
List all existing repositories.
 
This command can only be executed using the api_key of a user with admin rights,
 
or that of a regular user with at least read access to the repository.
 

	
 
INPUT::
 

	
 
    id : <id_for_response>
 
    api_key : "<api_key>"
 
    method :  "get_repos"
 
    args:     { }
 

	
 
OUTPUT::
 

	
 
    id : <id_given_in_input>
 
    result: [
 
              {
 
                "repo_id" :          "<repo_id>",
 
                "repo_name" :        "<reponame>"
 
                "repo_type" :        "<repo_type>",
 
                "clone_uri" :        "<clone_uri>",
 
                "private" :          "<bool>",
 
                "created_on" :       "<datetimecreated>",
 
                "description" :      "<description>",
 
                "landing_rev":       "<landing_rev>",
 
                "owner":             "<repo_owner>",
 
                "fork_of":           "<name_of_fork_parent>",
 
                "enable_downloads":  "<bool>",
 
                "enable_locking":    "<bool>",
 
                "enable_statistics": "<bool>",
 
              },
 
 
            ]
 
    error:  null
 

	
 
get_repo_nodes
 
^^^^^^^^^^^^^^
 

	
 
Return a list of files and directories for a given path at the given revision.
 
It is possible to specify ret_type to show only ``files`` or ``dirs``.
 
This command can only be executed using the api_key of a user with admin rights.
 

	
 
INPUT::
 

	
 
    id : <id_for_response>
 
    api_key : "<api_key>"
 
    method :  "get_repo_nodes"
 
    args:     {
 
                "repoid" : "<reponame or repo_id>"
 
                "revision"  : "<revision>",
 
                "root_path" : "<root_path>",
 
                "ret_type"  : "<ret_type> = Optional('all')"
 
              }
 

	
 
OUTPUT::
 

	
 
    id : <id_given_in_input>
 
    result: [
 
              {
 
                "name" :        "<name>"
 
                "type" :        "<type>",
 
              },
 
 
            ]
 
    error:  null
 

	
 
create_repo
 
^^^^^^^^^^^
 

	
 
Create a repository. If the repository name contains "/", all needed repository
 
groups will be created. For example "foo/bar/baz" will create repository groups
 
"foo", "bar" (with "foo" as parent), and create "baz" repository with
 
"bar" as group.
 
Create a repository. If the repository name contains "/", the repository will be
 
created in the repository group indicated by that path. Any such repository
 
groups need to exist before calling this method, or the call will fail.
 
For example "foo/bar/baz" will create a repository "baz" inside the repository
 
group "bar" which itself is in a repository group "foo", but both "foo" and
 
"bar" already need to exist before calling this method.
 
This command can only be executed using the api_key of a user with admin rights,
 
or that of a regular user with create repository permission.
 
Regular users cannot specify owner parameter.
 

	
 
INPUT::
 

	
 
    id : <id_for_response>
 
    api_key : "<api_key>"
 
    method :  "create_repo"
 
    args:     {
 
                "repo_name" :        "<reponame>",
 
                "owner" :            "<owner_name_or_id = Optional(=apiuser)>",
 
                "repo_type" :        "<repo_type> = Optional('hg')",
 
                "description" :      "<description> = Optional('')",
 
                "private" :          "<bool> = Optional(False)",
 
                "clone_uri" :        "<clone_uri> = Optional(None)",
 
                "landing_rev" :      "<landing_rev> = Optional('tip')",
 
                "enable_downloads":  "<bool> = Optional(False)",
 
                "enable_locking":    "<bool> = Optional(False)",
 
                "enable_statistics": "<bool> = Optional(False)",
 
              }
 

	
 
OUTPUT::
 

	
 
    id : <id_given_in_input>
 
    result: {
 
              "msg": "Created new repository `<reponame>`",
 
              "repo": {
 
                "repo_id" :          "<repo_id>",
 
                "repo_name" :        "<reponame>"
 
                "repo_type" :        "<repo_type>",
 
                "clone_uri" :        "<clone_uri>",
 
                "private" :          "<bool>",
 
                "created_on" :       "<datetimecreated>",
 
                "description" :      "<description>",
 
                "landing_rev":       "<landing_rev>",
 
                "owner":             "<username or user_id>",
 
                "fork_of":           "<name_of_fork_parent>",
 
                "enable_downloads":  "<bool>",
 
                "enable_locking":    "<bool>",
 
                "enable_statistics": "<bool>",
 
              },
 
            }
 
    error:  null
 

	
 
update_repo
 
^^^^^^^^^^^
 

	
 
Update a repository.
 
This command can only be executed using the api_key of a user with admin rights,
 
or that of a regular user with create repository permission.
 
Regular users cannot specify owner parameter.
 

	
 
INPUT::
 

	
 
    id : <id_for_response>
 
    api_key : "<api_key>"
 
    method :  "update_repo"
 
    args:     {
 
                "repoid" :           "<reponame or repo_id>"
 
                "name" :             "<reponame> = Optional('')",
 
                "group" :            "<group_id> = Optional(None)",
 
                "owner" :            "<owner_name_or_id = Optional(=apiuser)>",
 
                "description" :      "<description> = Optional('')",
 
                "private" :          "<bool> = Optional(False)",
 
                "clone_uri" :        "<clone_uri> = Optional(None)",
 
                "landing_rev" :      "<landing_rev> = Optional('tip')",
 
                "enable_downloads":  "<bool> = Optional(False)",
 
                "enable_locking":    "<bool> = Optional(False)",
 
                "enable_statistics": "<bool> = Optional(False)",
 
              }
 

	
 
OUTPUT::
 

	
 
    id : <id_given_in_input>
 
    result: {
 
              "msg": "updated repo ID:repo_id `<reponame>`",
 
              "repository": {
 
                "repo_id" :          "<repo_id>",
 
                "repo_name" :        "<reponame>"
 
                "repo_type" :        "<repo_type>",
 
                "clone_uri" :        "<clone_uri>",
 
                "private":           "<bool>",
 
                "created_on" :       "<datetimecreated>",
 
                "description" :      "<description>",
 
                "landing_rev":       "<landing_rev>",
 
                "owner":             "<username or user_id>",
 
                "fork_of":           "<name_of_fork_parent>",
 
                "enable_downloads":  "<bool>",
 
                "enable_locking":    "<bool>",
 
                "enable_statistics": "<bool>",
 
                "last_changeset":    {
 
                                       "author":   "<full_author>",
 
                                       "date":     "<date_time_of_commit>",
 
                                       "message":  "<commit_message>",
 
                                       "raw_id":   "<raw_id>",
kallithea/controllers/api/api.py
Show inline comments
 
@@ -1257,458 +1257,466 @@ class ApiController(JSONRPCController):
 
                      {
 
                        "repo_id" :          "<repo_id>",
 
                        "repo_name" :        "<reponame>"
 
                        "repo_type" :        "<repo_type>",
 
                        "clone_uri" :        "<clone_uri>",
 
                        "private": :         "<bool>",
 
                        "created_on" :       "<datetimecreated>",
 
                        "description" :      "<description>",
 
                        "landing_rev":       "<landing_rev>",
 
                        "owner":             "<repo_owner>",
 
                        "fork_of":           "<name_of_fork_parent>",
 
                        "enable_downloads":  "<bool>",
 
                        "enable_locking":    "<bool>",
 
                        "enable_statistics": "<bool>",
 
                      },
 
 
                    ]
 
            error:  null
 
        """
 
        if not HasPermissionAny('hg.admin')():
 
            repos = RepoModel().get_all_user_repos(user=request.authuser.user_id)
 
        else:
 
            repos = Repository.query()
 

	
 
        return [
 
            repo.get_api_data()
 
            for repo in repos
 
        ]
 

	
 
    # permission check inside
 
    def get_repo_nodes(self, repoid, revision, root_path,
 
                       ret_type=Optional('all')):
 
        """
 
        returns a list of nodes and it's children in a flat list for a given path
 
        at given revision. It's possible to specify ret_type to show only `files` or
 
        `dirs`.  This command can be executed only using api_key belonging to
 
        user with admin rights or regular user that have at least read access to repository.
 

	
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 
        :param revision: revision for which listing should be done
 
        :type revision: str
 
        :param root_path: path from which start displaying
 
        :type root_path: str
 
        :param ret_type: return type 'all|files|dirs' nodes
 
        :type ret_type: Optional(str)
 

	
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: [
 
                      {
 
                        "name" :        "<name>"
 
                        "type" :        "<type>",
 
                      },
 
 
                    ]
 
            error:  null
 
        """
 
        repo = get_repo_or_error(repoid)
 

	
 
        if not HasPermissionAny('hg.admin')():
 
            if not HasRepoPermissionLevel('read')(repo.repo_name):
 
                raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
        ret_type = Optional.extract(ret_type)
 
        _map = {}
 
        try:
 
            _d, _f = ScmModel().get_nodes(repo, revision, root_path,
 
                                          flat=False)
 
            _map = {
 
                'all': _d + _f,
 
                'files': _f,
 
                'dirs': _d,
 
            }
 
            return _map[ret_type]
 
        except KeyError:
 
            raise JSONRPCError('ret_type must be one of %s'
 
                               % (','.join(_map.keys())))
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to get repo: `%s` nodes' % repo.repo_name
 
            )
 

	
 
    @HasPermissionAnyDecorator('hg.admin', 'hg.create.repository')
 
    def create_repo(self, repo_name, owner=Optional(OAttr('apiuser')),
 
                    repo_type=Optional('hg'), description=Optional(''),
 
                    private=Optional(False), clone_uri=Optional(None),
 
                    landing_rev=Optional('rev:tip'),
 
                    enable_statistics=Optional(False),
 
                    enable_locking=Optional(False),
 
                    enable_downloads=Optional(False),
 
                    copy_permissions=Optional(False)):
 
        """
 
        Creates a repository. If repository name contains "/", all needed repository
 
        groups will be created. For example "foo/bar/baz" will create groups
 
        "foo", "bar" (with "foo" as parent), and create "baz" repository with
 
        Creates a repository. The repository name contains the full path, but the
 
        parent repository group must exist. For example "foo/bar/baz" require the groups
 
        "foo" and "bar" (with "foo" as parent), and create "baz" repository with
 
        "bar" as group. This command can be executed only using api_key
 
        belonging to user with admin rights or regular user that have create
 
        repository permission. Regular users cannot specify owner parameter
 

	
 
        :param repo_name: repository name
 
        :type repo_name: str
 
        :param owner: user_id or username
 
        :type owner: Optional(str)
 
        :param repo_type: 'hg' or 'git'
 
        :type repo_type: Optional(str)
 
        :param description: repository description
 
        :type description: Optional(str)
 
        :param private:
 
        :type private: bool
 
        :param clone_uri:
 
        :type clone_uri: str
 
        :param landing_rev: <rev_type>:<rev>
 
        :type landing_rev: str
 
        :param enable_locking:
 
        :type enable_locking: bool
 
        :param enable_downloads:
 
        :type enable_downloads: bool
 
        :param enable_statistics:
 
        :type enable_statistics: bool
 
        :param copy_permissions: Copy permission from group that repository is
 
            being created.
 
        :type copy_permissions: bool
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg": "Created new repository `<reponame>`",
 
                      "success": true,
 
                      "task": "<celery task id or None if done sync>"
 
                    }
 
            error:  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
             'failed to create repository `<repo_name>`
 
          }
 

	
 
        """
 
        if not HasPermissionAny('hg.admin')():
 
            if not isinstance(owner, Optional):
 
                # forbid setting owner for non-admins
 
                raise JSONRPCError(
 
                    'Only Kallithea admin can specify `owner` param'
 
                )
 
        if isinstance(owner, Optional):
 
            owner = request.authuser.user_id
 

	
 
        owner = get_user_or_error(owner)
 

	
 
        if RepoModel().get_by_repo_name(repo_name):
 
            raise JSONRPCError("repo `%s` already exist" % repo_name)
 

	
 
        defs = Setting.get_default_repo_settings(strip_prefix=True)
 
        if isinstance(private, Optional):
 
            private = defs.get('repo_private') or Optional.extract(private)
 
        if isinstance(repo_type, Optional):
 
            repo_type = defs.get('repo_type')
 
        if isinstance(enable_statistics, Optional):
 
            enable_statistics = defs.get('repo_enable_statistics')
 
        if isinstance(enable_locking, Optional):
 
            enable_locking = defs.get('repo_enable_locking')
 
        if isinstance(enable_downloads, Optional):
 
            enable_downloads = defs.get('repo_enable_downloads')
 

	
 
        clone_uri = Optional.extract(clone_uri)
 
        description = Optional.extract(description)
 
        landing_rev = Optional.extract(landing_rev)
 
        copy_permissions = Optional.extract(copy_permissions)
 

	
 
        try:
 
            repo_name_cleaned = repo_name.split('/')[-1]
 
            # create structure of groups and return the last group
 
            repo_group = map_groups(repo_name)
 
            repo_name_parts = repo_name.split('/')
 
            repo_group = None
 
            if len(repo_name_parts) > 1:
 
                group_name = '/'.join(repo_name_parts[:-1])
 
                repo_group = RepoGroup.get_by_group_name(group_name)
 
                if repo_group is None:
 
                    raise JSONRPCError("repo group `%s` not found" % group_name)
 
            data = dict(
 
                repo_name=repo_name_cleaned,
 
                repo_name=repo_name_parts[-1],
 
                repo_name_full=repo_name,
 
                repo_type=repo_type,
 
                repo_description=description,
 
                owner=owner,
 
                repo_private=private,
 
                clone_uri=clone_uri,
 
                repo_group=repo_group,
 
                repo_landing_rev=landing_rev,
 
                enable_statistics=enable_statistics,
 
                enable_locking=enable_locking,
 
                enable_downloads=enable_downloads,
 
                repo_copy_permissions=copy_permissions,
 
            )
 

	
 
            task = RepoModel().create(form_data=data, cur_user=owner)
 
            task_id = task.task_id
 
            # no commit, it's done in RepoModel, or async via celery
 
            return dict(
 
                msg="Created new repository `%s`" % (repo_name,),
 
                success=True,  # cannot return the repo data here since fork
 
                               # can be done async
 
                task=task_id
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to create repository `%s`' % (repo_name,))
 

	
 
    # permission check inside
 
    def update_repo(self, repoid, name=Optional(None),
 
                    owner=Optional(OAttr('apiuser')),
 
                    group=Optional(None),
 
                    description=Optional(''), private=Optional(False),
 
                    clone_uri=Optional(None), landing_rev=Optional('rev:tip'),
 
                    enable_statistics=Optional(False),
 
                    enable_locking=Optional(False),
 
                    enable_downloads=Optional(False)):
 

	
 
        """
 
        Updates repo
 

	
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 
        :param name:
 
        :param owner:
 
        :param group:
 
        :param description:
 
        :param private:
 
        :param clone_uri:
 
        :param landing_rev:
 
        :param enable_statistics:
 
        :param enable_locking:
 
        :param enable_downloads:
 
        """
 
        repo = get_repo_or_error(repoid)
 
        if not HasPermissionAny('hg.admin')():
 
            if not HasRepoPermissionLevel('admin')(repo.repo_name):
 
                raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
            if (name != repo.repo_name and
 
                not HasPermissionAny('hg.create.repository')()
 
                ):
 
                raise JSONRPCError('no permission to create (or move) repositories')
 

	
 
            if not isinstance(owner, Optional):
 
                # forbid setting owner for non-admins
 
                raise JSONRPCError(
 
                    'Only Kallithea admin can specify `owner` param'
 
                )
 

	
 
        updates = {}
 
        repo_group = group
 
        if not isinstance(repo_group, Optional):
 
            repo_group = get_repo_group_or_error(repo_group)
 
            repo_group = repo_group.group_id
 
        try:
 
            store_update(updates, name, 'repo_name')
 
            store_update(updates, repo_group, 'repo_group')
 
            store_update(updates, owner, 'owner')
 
            store_update(updates, description, 'repo_description')
 
            store_update(updates, private, 'repo_private')
 
            store_update(updates, clone_uri, 'clone_uri')
 
            store_update(updates, landing_rev, 'repo_landing_rev')
 
            store_update(updates, enable_statistics, 'repo_enable_statistics')
 
            store_update(updates, enable_locking, 'repo_enable_locking')
 
            store_update(updates, enable_downloads, 'repo_enable_downloads')
 

	
 
            RepoModel().update(repo, **updates)
 
            Session().commit()
 
            return dict(
 
                msg='updated repo ID:%s %s' % (repo.repo_id, repo.repo_name),
 
                repository=repo.get_api_data()
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to update repo `%s`' % repoid)
 

	
 
    @HasPermissionAnyDecorator('hg.admin', 'hg.fork.repository')
 
    def fork_repo(self, repoid, fork_name,
 
                  owner=Optional(OAttr('apiuser')),
 
                  description=Optional(''), copy_permissions=Optional(False),
 
                  private=Optional(False), landing_rev=Optional('rev:tip')):
 
        """
 
        Creates a fork of given repo. In case of using celery this will
 
        immediately return success message, while fork is going to be created
 
        asynchronous. This command can be executed only using api_key belonging to
 
        user with admin rights or regular user that have fork permission, and at least
 
        read access to forking repository. Regular users cannot specify owner parameter.
 

	
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 
        :param fork_name:
 
        :param owner:
 
        :param description:
 
        :param copy_permissions:
 
        :param private:
 
        :param landing_rev:
 

	
 
        INPUT::
 

	
 
            id : <id_for_response>
 
            api_key : "<api_key>"
 
            args:     {
 
                        "repoid" :          "<reponame or repo_id>",
 
                        "fork_name":        "<forkname>",
 
                        "owner":            "<username or user_id = Optional(=apiuser)>",
 
                        "description":      "<description>",
 
                        "copy_permissions": "<bool>",
 
                        "private":          "<bool>",
 
                        "landing_rev":      "<landing_rev>"
 
                      }
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg": "Created fork of `<reponame>` as `<forkname>`",
 
                      "success": true,
 
                      "task": "<celery task id or None if done sync>"
 
                    }
 
            error:  null
 

	
 
        """
 
        repo = get_repo_or_error(repoid)
 
        repo_name = repo.repo_name
 

	
 
        _repo = RepoModel().get_by_repo_name(fork_name)
 
        if _repo:
 
            type_ = 'fork' if _repo.fork else 'repo'
 
            raise JSONRPCError("%s `%s` already exist" % (type_, fork_name))
 

	
 
        if HasPermissionAny('hg.admin')():
 
            pass
 
        elif HasRepoPermissionLevel('read')(repo.repo_name):
 
            if not isinstance(owner, Optional):
 
                # forbid setting owner for non-admins
 
                raise JSONRPCError(
 
                    'Only Kallithea admin can specify `owner` param'
 
                )
 

	
 
            if not HasPermissionAny('hg.create.repository')():
 
                raise JSONRPCError('no permission to create repositories')
 
        else:
 
            raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
        if isinstance(owner, Optional):
 
            owner = request.authuser.user_id
 

	
 
        owner = get_user_or_error(owner)
 

	
 
        try:
 
            # create structure of groups and return the last group
 
            group = map_groups(fork_name)
 
            fork_base_name = fork_name.rsplit('/', 1)[-1]
 
            fork_name_parts = fork_name.split('/')
 
            repo_group = None
 
            if len(fork_name_parts) > 1:
 
                group_name = '/'.join(fork_name_parts[:-1])
 
                repo_group = RepoGroup.get_by_group_name(group_name)
 
                if repo_group is None:
 
                    raise JSONRPCError("repo group `%s` not found" % group_name)
 

	
 
            form_data = dict(
 
                repo_name=fork_base_name,
 
                repo_name=fork_name_parts[-1],
 
                repo_name_full=fork_name,
 
                repo_group=group,
 
                repo_group=repo_group,
 
                repo_type=repo.repo_type,
 
                description=Optional.extract(description),
 
                private=Optional.extract(private),
 
                copy_permissions=Optional.extract(copy_permissions),
 
                landing_rev=Optional.extract(landing_rev),
 
                update_after_clone=False,
 
                fork_parent_id=repo.repo_id,
 
            )
 
            task = RepoModel().create_fork(form_data, cur_user=owner)
 
            # no commit, it's done in RepoModel, or async via celery
 
            task_id = task.task_id
 
            return dict(
 
                msg='Created fork of `%s` as `%s`' % (repo.repo_name,
 
                                                      fork_name),
 
                success=True,  # cannot return the repo data here since fork
 
                               # can be done async
 
                task=task_id
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to fork repository `%s` as `%s`' % (repo_name,
 
                                                            fork_name)
 
            )
 

	
 
    # permission check inside
 
    def delete_repo(self, repoid, forks=Optional('')):
 
        """
 
        Deletes a repository. This command can be executed only using api_key belonging
 
        to user with admin rights or regular user that have admin access to repository.
 
        When `forks` param is set it's possible to detach or delete forks of deleting
 
        repository
 

	
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 
        :param forks: `detach` or `delete`, what do do with attached forks for repo
 
        :type forks: Optional(str)
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg": "Deleted repository `<reponame>`",
 
                      "success": true
 
                    }
 
            error:  null
 

	
 
        """
 
        repo = get_repo_or_error(repoid)
 

	
 
        if not HasPermissionAny('hg.admin')():
 
            if not HasRepoPermissionLevel('admin')(repo.repo_name):
 
                raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
        try:
 
            handle_forks = Optional.extract(forks)
 
            _forks_msg = ''
 
            _forks = [f for f in repo.forks]
 
            if handle_forks == 'detach':
 
                _forks_msg = ' ' + 'Detached %s forks' % len(_forks)
 
            elif handle_forks == 'delete':
 
                _forks_msg = ' ' + 'Deleted %s forks' % len(_forks)
 
            elif _forks:
 
                raise JSONRPCError(
 
                    'Cannot delete `%s` it still contains attached forks' %
 
                    (repo.repo_name,)
 
                )
 

	
 
            RepoModel().delete(repo, forks=forks)
 
            Session().commit()
 
            return dict(
 
                msg='Deleted repository `%s`%s' % (repo.repo_name, _forks_msg),
 
                success=True
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to delete repository `%s`' % (repo.repo_name,)
 
            )
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def grant_user_permission(self, repoid, userid, perm):
 
        """
 
        Grant permission for user on given repository, or update existing one
 
        if found. This command can be executed only using api_key belonging to user
 
        with admin rights.
 

	
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 
        :param userid:
 
        :param perm: (repository.(none|read|write|admin))
 
        :type perm: str
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
kallithea/tests/api/api_base.py
Show inline comments
 
@@ -972,332 +972,342 @@ class _BaseTestApi(object):
 

	
 
        expected = 'failed to get repo: `%s` nodes' % self.REPO
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_repo_nodes_bad_path(self):
 
        rev = 'tip'
 
        path = '/idontexits'
 
        id_, params = _build_data(self.apikey, 'get_repo_nodes',
 
                                  repoid=self.REPO, revision=rev,
 
                                  root_path=path, )
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to get repo: `%s` nodes' % self.REPO
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_repo_nodes_bad_ret_type(self):
 
        rev = 'tip'
 
        path = '/'
 
        ret_type = 'error'
 
        id_, params = _build_data(self.apikey, 'get_repo_nodes',
 
                                  repoid=self.REPO, revision=rev,
 
                                  root_path=path,
 
                                  ret_type=ret_type)
 
        response = api_call(self, params)
 

	
 
        expected = ('ret_type must be one of %s'
 
                    % (','.join(['files', 'dirs', 'all'])))
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @parametrize('name,ret_type,grant_perm', [
 
        ('all', 'all', 'repository.write'),
 
        ('dirs', 'dirs', 'repository.admin'),
 
        ('files', 'files', 'repository.read'),
 
    ])
 
    def test_api_get_repo_nodes_by_regular_user(self, name, ret_type, grant_perm):
 
        RepoModel().grant_user_permission(repo=self.REPO,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm=grant_perm)
 
        Session().commit()
 

	
 
        rev = 'tip'
 
        path = '/'
 
        id_, params = _build_data(self.apikey_regular, 'get_repo_nodes',
 
                                  repoid=self.REPO, revision=rev,
 
                                  root_path=path,
 
                                  ret_type=ret_type)
 
        response = api_call(self, params)
 

	
 
        # we don't the actual return types here since it's tested somewhere
 
        # else
 
        expected = response.json['result']
 
        try:
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            RepoModel().revoke_user_permission(self.REPO, self.TEST_USER_LOGIN)
 

	
 
    def test_api_create_repo(self):
 
        repo_name = u'api-repo'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,
 
        )
 
        response = api_call(self, params)
 

	
 
        repo = RepoModel().get_by_repo_name(repo_name)
 
        assert repo is not None
 
        ret = {
 
            'msg': 'Created new repository `%s`' % repo_name,
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_create_repo_clone_uri_local(self):
 
        # cloning from local repo was a mis-feature - it would bypass access control
 
        # TODO: introduce other test coverage of actual remote cloning
 
        clone_uri = os.path.join(TESTS_TMP_PATH, self.REPO)
 
        repo_name = u'api-repo'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,
 
                                  clone_uri=clone_uri,
 
        )
 
        response = api_call(self, params)
 
        expected = "failed to create repository `%s`" % repo_name
 
        self._compare_error(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_create_repo_and_repo_group(self):
 
        repo_group_name = u'my_gr'
 
        repo_name = u'%s/api-repo' % repo_group_name
 

	
 
        # repo creation can no longer also create repo group
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,)
 
        response = api_call(self, params)
 
        expected = u'repo group `%s` not found' % repo_group_name
 
        self._compare_error(id_, expected, given=response.body)
 
        assert RepoModel().get_by_repo_name(repo_name) is None
 

	
 
        # create group before creating repo
 
        rg = fixture.create_repo_group(repo_group_name)
 
        Session().commit()
 

	
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,)
 
        response = api_call(self, params)
 
        expected = {
 
            'msg': 'Created new repository `%s`' % repo_name,
 
            'success': True,
 
            'task': None,
 
        }
 
        self._compare_ok(id_, expected, given=response.body)
 
        repo = RepoModel().get_by_repo_name(repo_name)
 
        assert repo is not None
 

	
 
        fixture.destroy_repo(repo_name)
 
        fixture.destroy_repo_group(repo_group_name)
 

	
 
    def test_api_create_repo_in_repo_group_without_permission(self):
 
        repo_group_name = u'%s/api-repo-repo' % TEST_REPO_GROUP
 
        repo_name = u'%s/api-repo' % repo_group_name
 

	
 
        rg = fixture.create_repo_group(repo_group_name)
 
        Session().commit()
 
        RepoGroupModel().grant_user_permission(repo_group_name,
 
                                               self.TEST_USER_LOGIN,
 
                                               'group.none')
 
        Session().commit()
 

	
 
        id_, params = _build_data(self.apikey_regular, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  repo_type=self.REPO_TYPE,
 
        )
 
        response = api_call(self, params)
 

	
 
        # Current result when API access control is different from Web:
 
        ret = {
 
            'msg': 'Created new repository `%s`' % repo_name,
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
        # Expected and arguably more correct result:
 
        #expected = 'failed to create repository `%s`' % repo_name
 
        #self._compare_error(id_, expected, given=response.body)
 

	
 
        fixture.destroy_repo_group(repo_group_name)
 

	
 
    def test_api_create_repo_unknown_owner(self):
 
        repo_name = u'api-repo'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=owner,
 
                                  repo_type=self.REPO_TYPE,
 
        )
 
        response = api_call(self, params)
 
        expected = 'user `%s` does not exist' % owner
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_repo_dont_specify_owner(self):
 
        repo_name = u'api-repo'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  repo_type=self.REPO_TYPE,
 
        )
 
        response = api_call(self, params)
 

	
 
        repo = RepoModel().get_by_repo_name(repo_name)
 
        assert repo is not None
 
        ret = {
 
            'msg': 'Created new repository `%s`' % repo_name,
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_create_repo_by_non_admin(self):
 
        repo_name = u'api-repo'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey_regular, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  repo_type=self.REPO_TYPE,
 
        )
 
        response = api_call(self, params)
 

	
 
        repo = RepoModel().get_by_repo_name(repo_name)
 
        assert repo is not None
 
        ret = {
 
            'msg': 'Created new repository `%s`' % repo_name,
 
            'success': True,
 
            'task': None,
 
        }
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_create_repo_by_non_admin_specify_owner(self):
 
        repo_name = u'api-repo'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey_regular, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  repo_type=self.REPO_TYPE,
 
                                  owner=owner)
 
        response = api_call(self, params)
 

	
 
        expected = 'Only Kallithea admin can specify `owner` param'
 
        self._compare_error(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_create_repo_exists(self):
 
        repo_name = self.REPO
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,)
 
        response = api_call(self, params)
 
        expected = "repo `%s` already exist" % repo_name
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_repo_dot_dot(self):
 
        # FIXME: it should only be possible to create repositories in existing repo groups - and '..' can't be used
 
        # it is only possible to create repositories in existing repo groups - and '..' can't be used
 
        group_name = '%s/..' % TEST_REPO_GROUP
 
        repo_name = '%s/%s' % (group_name, 'could-be-outside')
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,)
 
        response = api_call(self, params)
 
        expected = {
 
            u'msg': u"Created new repository `%s`" % repo_name,
 
            u'success': True,
 
            u'task': None,
 
        }
 
        self._compare_ok(id_, expected, given=response.body)
 
        expected = u'repo group `%s` not found' % group_name
 
        self._compare_error(id_, expected, given=response.body)
 
        fixture.destroy_repo(repo_name)
 

	
 
    @mock.patch.object(RepoModel, 'create', crash)
 
    def test_api_create_repo_exception_occurred(self):
 
        repo_name = u'api-repo'
 
        id_, params = _build_data(self.apikey, 'create_repo',
 
                                  repo_name=repo_name,
 
                                  owner=TEST_USER_ADMIN_LOGIN,
 
                                  repo_type=self.REPO_TYPE,)
 
        response = api_call(self, params)
 
        expected = 'failed to create repository `%s`' % repo_name
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @parametrize('changing_attr,updates', [
 
        ('owner', {'owner': TEST_USER_REGULAR_LOGIN}),
 
        ('description', {'description': u'new description'}),
 
        ('clone_uri', {'clone_uri': 'http://example.com/repo'}), # will fail - pulling from non-existing repo should fail
 
        ('clone_uri', {'clone_uri': '/repo'}), # will fail - pulling from local repo was a mis-feature - it would bypass access control
 
        ('clone_uri', {'clone_uri': None}),
 
        ('landing_rev', {'landing_rev': 'branch:master'}),
 
        ('enable_statistics', {'enable_statistics': True}),
 
        ('enable_locking', {'enable_locking': True}),
 
        ('enable_downloads', {'enable_downloads': True}),
 
        ('name', {'name': u'new_repo_name'}),
 
        ('repo_group', {'group': u'test_group_for_update'}),
 
    ])
 
    def test_api_update_repo(self, changing_attr, updates):
 
        repo_name = u'api_update_me'
 
        repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        if changing_attr == 'repo_group':
 
            fixture.create_repo_group(updates['group'])
 

	
 
        id_, params = _build_data(self.apikey, 'update_repo',
 
                                  repoid=repo_name, **updates)
 
        response = api_call(self, params)
 
        if changing_attr == 'name':
 
            repo_name = updates['name']
 
        if changing_attr == 'repo_group':
 
            repo_name = u'/'.join([updates['group'], repo_name])
 
        try:
 
            if changing_attr == 'clone_uri' and updates['clone_uri']:
 
                expected = u'failed to update repo `%s`' % repo_name
 
                self._compare_error(id_, expected, given=response.body)
 
            else:
 
                expected = {
 
                    'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo_name),
 
                    'repository': repo.get_api_data()
 
                }
 
                self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 
            if changing_attr == 'repo_group':
 
                fixture.destroy_repo_group(updates['group'])
 

	
 
    @parametrize('changing_attr,updates', [
 
        ('owner', {'owner': TEST_USER_REGULAR_LOGIN}),
 
        ('description', {'description': u'new description'}),
 
        ('clone_uri', {'clone_uri': 'http://example.com/repo'}), # will fail - pulling from non-existing repo should fail
 
        ('clone_uri', {'clone_uri': '/repo'}), # will fail - pulling from local repo was a mis-feature - it would bypass access control
 
        ('clone_uri', {'clone_uri': None}),
 
        ('landing_rev', {'landing_rev': 'branch:master'}),
 
        ('enable_statistics', {'enable_statistics': True}),
 
        ('enable_locking', {'enable_locking': True}),
 
        ('enable_downloads', {'enable_downloads': True}),
 
        ('name', {'name': u'new_repo_name'}),
 
        ('repo_group', {'group': u'test_group_for_update'}),
 
    ])
 
    def test_api_update_group_repo(self, changing_attr, updates):
 
        group_name = u'lololo'
 
        fixture.create_repo_group(group_name)
 
        repo_name = u'%s/api_update_me' % group_name
 
        repo = fixture.create_repo(repo_name, repo_group=group_name, repo_type=self.REPO_TYPE)
 
        if changing_attr == 'repo_group':
 
            fixture.create_repo_group(updates['group'])
 

	
 
        id_, params = _build_data(self.apikey, 'update_repo',
 
                                  repoid=repo_name, **updates)
 
        response = api_call(self, params)
 
        if changing_attr == 'name':
 
            repo_name = u'%s/%s' % (group_name, updates['name'])
 
        if changing_attr == 'repo_group':
 
            repo_name = u'/'.join([updates['group'], repo_name.rsplit('/', 1)[-1]])
 
        try:
 
            if changing_attr == 'clone_uri' and updates['clone_uri']:
 
                expected = u'failed to update repo `%s`' % repo_name
 
                self._compare_error(id_, expected, given=response.body)
 
            else:
 
                expected = {
 
                    'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo_name),
 
                    'repository': repo.get_api_data()
 
                }
 
                self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 
            if changing_attr == 'repo_group':
 
                fixture.destroy_repo_group(updates['group'])
0 comments (0 inline, 0 general)