Changeset - 6620542597d3
[Not reviewed]
stable
0 3 0
Mads Kiilerich - 10 years ago 2015-07-07 02:25:59
madski@unity3d.com
api: check repo create permissions for update_repo and fork_repo as for create-repo

Close loophole for creating repos everywhere.

Tests by Thomas De Schampheleire.
3 files changed with 75 insertions and 2 deletions:
0 comments (0 inline, 0 general)
docs/api/api.rst
Show inline comments
 
@@ -824,26 +824,27 @@ OUTPUT::
 
                "enable_statistics": "<bool>",
 
              },
 
            }
 
    error:  null
 

	
 

	
 
fork_repo
 
---------
 

	
 
Create a fork of the given repo. If using Celery, this will
 
return success message immediately and a fork will be created
 
asynchronously.
 
This command can only be executed using the api_key of a user with admin rights,
 
or that of a regular user with fork permission and at least read access to the repository.
 
This command can only be executed using the api_key of a user with admin
 
rights, or with the global fork permission, by a regular user with create
 
repository permission and at least read access to the repository.
 
Regular users cannot specify owner parameter.
 

	
 

	
 
INPUT::
 

	
 
    id : <id_for_response>
 
    api_key : "<api_key>"
 
    method :  "fork_repo"
 
    args:     {
 
                "repoid" :          "<reponame or repo_id>",
 
                "fork_name":        "<forkname>",
 
                "owner":            "<username or user_id = Optional(=apiuser)>",
kallithea/controllers/api/api.py
Show inline comments
 
@@ -1547,24 +1547,29 @@ class ApiController(JSONRPCController):
 
        :param landing_rev:
 
        :param enable_statistics:
 
        :param enable_locking:
 
        :param enable_downloads:
 
        """
 
        repo = get_repo_or_error(repoid)
 
        if not HasPermissionAnyApi('hg.admin')(user=apiuser):
 
            # check if we have admin permission for this repo !
 
            if not HasRepoPermissionAnyApi('repository.admin')(user=apiuser,
 
                                                               repo_name=repo.repo_name):
 
                raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
            if (name != repo.repo_name and
 
                not HasPermissionAnyApi('hg.create.repository')(user=apiuser)
 
                ):
 
                raise JSONRPCError('no permission to create (or move) repositories')
 

	
 
        updates = {
 
            # update function requires this.
 
            'repo_name': repo.repo_name
 
        }
 
        repo_group = group
 
        if not isinstance(repo_group, Optional):
 
            repo_group = get_repo_group_or_error(repo_group)
 
            repo_group = repo_group.group_id
 
        try:
 
            store_update(updates, name, 'repo_name')
 
            store_update(updates, repo_group, 'repo_group')
 
            store_update(updates, owner, 'user')
 
@@ -1644,24 +1649,27 @@ class ApiController(JSONRPCController):
 

	
 
        if HasPermissionAnyApi('hg.admin')(user=apiuser):
 
            pass
 
        elif HasRepoPermissionAnyApi('repository.admin',
 
                                     'repository.write',
 
                                     'repository.read')(user=apiuser,
 
                                                        repo_name=repo.repo_name):
 
            if not isinstance(owner, Optional):
 
                #forbid setting owner for non-admins
 
                raise JSONRPCError(
 
                    'Only Kallithea admin can specify `owner` param'
 
                )
 

	
 
            if not HasPermissionAnyApi('hg.create.repository')(user=apiuser):
 
                raise JSONRPCError('no permission to create repositories')
 
        else:
 
            raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
        if isinstance(owner, Optional):
 
            owner = apiuser.user_id
 

	
 
        owner = get_user_or_error(owner)
 

	
 
        try:
 
            # create structure of groups and return the last group
 
            group = map_groups(fork_name)
 

	
kallithea/tests/api/api_base.py
Show inline comments
 
@@ -1169,24 +1169,67 @@ class BaseTestApi(object):
 
    def test_api_update_repo_exception_occurred(self):
 
        repo_name = 'api_update_me'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        id_, params = _build_data(self.apikey, 'update_repo',
 
                                  repoid=repo_name, owner=TEST_USER_ADMIN_LOGIN,)
 
        response = api_call(self, params)
 
        try:
 
            expected = 'failed to update repo `%s`' % repo_name
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 

	
 
    def test_api_update_repo_regular_user_change_repo_name(self):
 
        repo_name = 'admin_owned'
 
        new_repo_name = 'new_repo_name'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        RepoModel().grant_user_permission(repo=repo_name,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm='repository.admin')
 
        UserModel().revoke_perm('default', 'hg.create.repository')
 
        UserModel().grant_perm('default', 'hg.create.none')
 
        updates = {'name': new_repo_name}
 
        id_, params = _build_data(self.apikey_regular, 'update_repo',
 
                                  repoid=repo_name, **updates)
 
        response = api_call(self, params)
 
        try:
 
            expected = 'no permission to create (or move) repositories'
 
            self._compare_error(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 
            fixture.destroy_repo(new_repo_name)
 

	
 
    def test_api_update_repo_regular_user_change_repo_name_allowed(self):
 
        repo_name = 'admin_owned'
 
        new_repo_name = 'new_repo_name'
 
        repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        RepoModel().grant_user_permission(repo=repo_name,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm='repository.admin')
 
        UserModel().revoke_perm('default', 'hg.create.none')
 
        UserModel().grant_perm('default', 'hg.create.repository')
 
        updates = {'name': new_repo_name}
 
        id_, params = _build_data(self.apikey_regular, 'update_repo',
 
                                  repoid=repo_name, **updates)
 
        response = api_call(self, params)
 
        try:
 
            expected = {
 
                'msg': 'updated repo ID:%s %s' % (repo.repo_id, new_repo_name),
 
                'repository': repo.get_api_data()
 
            }
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_repo(repo_name)
 
            fixture.destroy_repo(new_repo_name)
 

	
 
    def test_api_delete_repo(self):
 
        repo_name = 'api_delete_me'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 

	
 
        id_, params = _build_data(self.apikey, 'delete_repo',
 
                                  repoid=repo_name, )
 
        response = api_call(self, params)
 

	
 
        ret = {
 
            'msg': 'Deleted repository `%s`' % repo_name,
 
            'success': True
 
        }
 
@@ -1294,24 +1337,45 @@ class BaseTestApi(object):
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm='repository.none')
 
        fork_name = 'api-repo-fork'
 
        id_, params = _build_data(self.apikey_regular, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
        )
 
        response = api_call(self, params)
 
        expected = 'repository `%s` does not exist' % (self.REPO)
 
        self._compare_error(id_, expected, given=response.body)
 
        fixture.destroy_repo(fork_name)
 

	
 
    @parameterized.expand([('read', 'repository.read'),
 
                           ('write', 'repository.write'),
 
                           ('admin', 'repository.admin')])
 
    def test_api_fork_repo_non_admin_no_create_repo_permission(self, name, perm):
 
        fork_name = 'api-repo-fork'
 
        # regardless of base repository permission, forking is disallowed
 
        # when repository creation is disabled
 
        RepoModel().grant_user_permission(repo=self.REPO,
 
                                          user=self.TEST_USER_LOGIN,
 
                                          perm=perm)
 
        UserModel().revoke_perm('default', 'hg.create.repository')
 
        UserModel().grant_perm('default', 'hg.create.none')
 
        id_, params = _build_data(self.apikey_regular, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
        )
 
        response = api_call(self, params)
 
        expected = 'no permission to create repositories'
 
        self._compare_error(id_, expected, given=response.body)
 
        fixture.destroy_repo(fork_name)
 

	
 
    def test_api_fork_repo_unknown_owner(self):
 
        fork_name = 'api-repo-fork'
 
        owner = 'i-dont-exist'
 
        id_, params = _build_data(self.apikey, 'fork_repo',
 
                                  repoid=self.REPO,
 
                                  fork_name=fork_name,
 
                                  owner=owner,
 
        )
 
        response = api_call(self, params)
 
        expected = 'user `%s` does not exist' % owner
 
        self._compare_error(id_, expected, given=response.body)
 

	
0 comments (0 inline, 0 general)