Changeset - 56451a7ca82f
[Not reviewed]
default
0 2 0
Thomas De Schampheleire - 5 years ago 2020-10-09 15:34:41
thomas.de_schampheleire@nokia.com
api: new method: edit_reviewers

Allow adding and removing reviewers of a pull request with the API call
'edit_reviewers', taking a pull request ID and a list of usernames or
userids to add and remove. For single-user operation, a string can be used
instead of a list.

Note that the 'kallithea-api' tool only accepts strings, so can only perform
single-user operations. Use python 'requests', 'curl' or similar instead if
you need to operate on multiple users at a time.
2 files changed with 362 insertions and 1 deletions:
0 comments (0 inline, 0 general)
kallithea/controllers/api/api.py
Show inline comments
 
@@ -2040,384 +2040,430 @@ class ApiController(JSONRPCController):
 
        :param repogroupid: name or id of repository group
 
        :type repogroupid: str or int
 
        :param usergroupid: id of usergroup
 
        :type usergroupid: str or int
 
        :param perm: (group.(none|read|write|admin))
 
        :type perm: str
 
        :param apply_to_children: 'none', 'repos', 'groups', 'all'
 
        :type apply_to_children: str
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
            "msg" : "Granted perm: `<perm>` (recursive:<apply_to_children>) for user group: `<usersgroupname>` in repo group: `<repo_group_name>`",
 
            "success": true
 

	
 
          }
 
          error :  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "failed to edit permission for user group: `<usergroup>` in repo group: `<repo_group_name>`"
 
          }
 

	
 
        """
 
        repo_group = get_repo_group_or_error(repogroupid)
 
        perm = get_perm_or_error(perm, prefix='group.')
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAny('hg.admin')():
 
            if not HasRepoGroupPermissionLevel('admin')(repo_group.group_name):
 
                raise JSONRPCError(
 
                    'repository group `%s` does not exist' % (repogroupid,))
 

	
 
            if not HasUserGroupPermissionLevel('read')(user_group.users_group_name):
 
                raise JSONRPCError(
 
                    'user group `%s` does not exist' % (usergroupid,))
 

	
 
        apply_to_children = Optional.extract(apply_to_children)
 

	
 
        try:
 
            RepoGroupModel().add_permission(repo_group=repo_group,
 
                                            obj=user_group,
 
                                            obj_type="user_group",
 
                                            perm=perm,
 
                                            recursive=apply_to_children)
 
            Session().commit()
 
            return dict(
 
                msg='Granted perm: `%s` (recursive:%s) for user group: `%s` in repo group: `%s`' % (
 
                    perm.permission_name, apply_to_children,
 
                    user_group.users_group_name, repo_group.name
 
                ),
 
                success=True
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to edit permission for user group: `%s` in '
 
                'repo group: `%s`' % (
 
                    usergroupid, repo_group.name
 
                )
 
            )
 

	
 
    # permission check inside
 
    def revoke_user_group_permission_from_repo_group(
 
            self, repogroupid, usergroupid,
 
            apply_to_children=Optional('none')):
 
        """
 
        Revoke permission for user group on given repository. This command can be
 
        executed only using api_key belonging to user with admin rights, or
 
        user who has admin right to given repository group.
 

	
 
        :param repogroupid: name or id of repository group
 
        :type repogroupid: str or int
 
        :param usergroupid:
 
        :param apply_to_children: 'none', 'repos', 'groups', 'all'
 
        :type apply_to_children: str
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg" : "Revoked perm (recursive:<apply_to_children>) for user group: `<usersgroupname>` in repo group: `<repo_group_name>`",
 
                      "success": true
 
                    }
 
            error:  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "failed to edit permission for user group: `<usergroup>` in repo group: `<repo_group_name>`"
 
          }
 

	
 

	
 
        """
 
        repo_group = get_repo_group_or_error(repogroupid)
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAny('hg.admin')():
 
            if not HasRepoGroupPermissionLevel('admin')(repo_group.group_name):
 
                raise JSONRPCError(
 
                    'repository group `%s` does not exist' % (repogroupid,))
 

	
 
            if not HasUserGroupPermissionLevel('read')(user_group.users_group_name):
 
                raise JSONRPCError(
 
                    'user group `%s` does not exist' % (usergroupid,))
 

	
 
        apply_to_children = Optional.extract(apply_to_children)
 

	
 
        try:
 
            RepoGroupModel().delete_permission(repo_group=repo_group,
 
                                               obj=user_group,
 
                                               obj_type="user_group",
 
                                               recursive=apply_to_children)
 
            Session().commit()
 
            return dict(
 
                msg='Revoked perm (recursive:%s) for user group: `%s` in repo group: `%s`' % (
 
                    apply_to_children, user_group.users_group_name, repo_group.name
 
                ),
 
                success=True
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to edit permission for user group: `%s` in repo group: `%s`' % (
 
                    user_group.users_group_name, repo_group.name
 
                )
 
            )
 

	
 
    def get_gist(self, gistid):
 
        """
 
        Get given gist by id
 

	
 
        :param gistid: id of private or public gist
 
        :type gistid: str
 
        """
 
        gist = get_gist_or_error(gistid)
 
        if not HasPermissionAny('hg.admin')():
 
            if gist.owner_id != request.authuser.user_id:
 
                raise JSONRPCError('gist `%s` does not exist' % (gistid,))
 
        return gist.get_api_data()
 

	
 
    def get_gists(self, userid=Optional(OAttr('apiuser'))):
 
        """
 
        Get all gists for given user. If userid is empty returned gists
 
        are for user who called the api
 

	
 
        :param userid: user to get gists for
 
        :type userid: Optional(str or int)
 
        """
 
        if not HasPermissionAny('hg.admin')():
 
            # make sure normal user does not pass someone else userid,
 
            # he is not allowed to do that
 
            if not isinstance(userid, Optional) and userid != request.authuser.user_id:
 
                raise JSONRPCError(
 
                    'userid is not the same as your user'
 
                )
 

	
 
        if isinstance(userid, Optional):
 
            user_id = request.authuser.user_id
 
        else:
 
            user_id = get_user_or_error(userid).user_id
 

	
 
        return [
 
            gist.get_api_data()
 
            for gist in Gist().query()
 
                .filter_by(is_expired=False)
 
                .filter(Gist.owner_id == user_id)
 
                .order_by(Gist.created_on.desc())
 
        ]
 

	
 
    def create_gist(self, files, owner=Optional(OAttr('apiuser')),
 
                    gist_type=Optional(Gist.GIST_PUBLIC), lifetime=Optional(-1),
 
                    description=Optional('')):
 

	
 
        """
 
        Creates new Gist
 

	
 
        :param files: files to be added to gist
 
            {'filename': {'content':'...', 'lexer': null},
 
             'filename2': {'content':'...', 'lexer': null}}
 
        :type files: dict
 
        :param owner: gist owner, defaults to api method caller
 
        :type owner: Optional(str or int)
 
        :param gist_type: type of gist 'public' or 'private'
 
        :type gist_type: Optional(str)
 
        :param lifetime: time in minutes of gist lifetime
 
        :type lifetime: Optional(int)
 
        :param description: gist description
 
        :type description: Optional(str)
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
            "msg": "created new gist",
 
            "gist": {}
 
          }
 
          error :  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "failed to create gist"
 
          }
 

	
 
        """
 
        try:
 
            if isinstance(owner, Optional):
 
                owner = request.authuser.user_id
 

	
 
            owner = get_user_or_error(owner)
 
            description = Optional.extract(description)
 
            gist_type = Optional.extract(gist_type)
 
            lifetime = Optional.extract(lifetime)
 

	
 
            gist = GistModel().create(description=description,
 
                                      owner=owner,
 
                                      ip_addr=request.ip_addr,
 
                                      gist_mapping=files,
 
                                      gist_type=gist_type,
 
                                      lifetime=lifetime)
 
            Session().commit()
 
            return dict(
 
                msg='created new gist',
 
                gist=gist.get_api_data()
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to create gist')
 

	
 
    # def update_gist(self, gistid, files, owner=Optional(OAttr('apiuser')),
 
    #                 gist_type=Optional(Gist.GIST_PUBLIC),
 
    #                 gist_lifetime=Optional(-1), gist_description=Optional('')):
 
    #     gist = get_gist_or_error(gistid)
 
    #     updates = {}
 

	
 
    # permission check inside
 
    def delete_gist(self, gistid):
 
        """
 
        Deletes existing gist
 

	
 
        :param gistid: id of gist to delete
 
        :type gistid: str
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
            "deleted gist ID: <gist_id>",
 
            "gist": null
 
          }
 
          error :  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "failed to delete gist ID:<gist_id>"
 
          }
 

	
 
        """
 
        gist = get_gist_or_error(gistid)
 
        if not HasPermissionAny('hg.admin')():
 
            if gist.owner_id != request.authuser.user_id:
 
                raise JSONRPCError('gist `%s` does not exist' % (gistid,))
 

	
 
        try:
 
            GistModel().delete(gist)
 
            Session().commit()
 
            return dict(
 
                msg='deleted gist ID:%s' % (gist.gist_access_id,),
 
                gist=None
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to delete gist ID:%s'
 
                               % (gist.gist_access_id,))
 

	
 
    # permission check inside
 
    def get_changesets(self, repoid, start=None, end=None, start_date=None,
 
                       end_date=None, branch_name=None, reverse=False, with_file_list=False, max_revisions=None):
 
        repo = get_repo_or_error(repoid)
 
        if not HasRepoPermissionLevel('read')(repo.repo_name):
 
            raise JSONRPCError('Access denied to repo %s' % repo.repo_name)
 

	
 
        format = "%Y-%m-%dT%H:%M:%S"
 
        try:
 
            return [e.__json__(with_file_list) for e in
 
                repo.scm_instance.get_changesets(start,
 
                                                 end,
 
                                                 datetime.strptime(start_date, format) if start_date else None,
 
                                                 datetime.strptime(end_date, format) if end_date else None,
 
                                                 branch_name,
 
                                                 reverse, max_revisions)]
 
        except EmptyRepositoryError as e:
 
            raise JSONRPCError('Repository is empty')
 

	
 
    # permission check inside
 
    def get_changeset(self, repoid, raw_id, with_reviews=Optional(False)):
 
        repo = get_repo_or_error(repoid)
 
        if not HasRepoPermissionLevel('read')(repo.repo_name):
 
            raise JSONRPCError('Access denied to repo %s' % repo.repo_name)
 
        changeset = repo.get_changeset(raw_id)
 
        if isinstance(changeset, EmptyChangeset):
 
            raise JSONRPCError('Changeset %s does not exist' % raw_id)
 

	
 
        info = dict(changeset.as_dict())
 

	
 
        with_reviews = Optional.extract(with_reviews)
 
        if with_reviews:
 
            reviews = ChangesetStatusModel().get_statuses(
 
                                repo.repo_name, raw_id)
 
            info["reviews"] = reviews
 

	
 
        return info
 

	
 
    # permission check inside
 
    def get_pullrequest(self, pullrequest_id):
 
        """
 
        Get given pull request by id
 
        """
 
        pull_request = PullRequest.get(pullrequest_id)
 
        if pull_request is None:
 
            raise JSONRPCError('pull request `%s` does not exist' % (pullrequest_id,))
 
        if not HasRepoPermissionLevel('read')(pull_request.org_repo.repo_name):
 
            raise JSONRPCError('not allowed')
 
        return pull_request.get_api_data()
 

	
 
    # permission check inside
 
    def comment_pullrequest(self, pull_request_id, comment_msg='', status=None, close_pr=False):
 
        """
 
        Add comment, close and change status of pull request.
 
        """
 
        apiuser = get_user_or_error(request.authuser.user_id)
 
        pull_request = PullRequest.get(pull_request_id)
 
        if pull_request is None:
 
            raise JSONRPCError('pull request `%s` does not exist' % (pull_request_id,))
 
        if (not HasRepoPermissionLevel('read')(pull_request.org_repo.repo_name)):
 
            raise JSONRPCError('No permission to add comment. User needs at least reading permissions'
 
                               ' to the source repository.')
 
        owner = apiuser.user_id == pull_request.owner_id
 
        reviewer = apiuser.user_id in [reviewer.user_id for reviewer in pull_request.reviewers]
 
        if close_pr and not (apiuser.admin or owner):
 
            raise JSONRPCError('No permission to close pull request. User needs to be admin or owner.')
 
        if status and not (apiuser.admin or owner or reviewer):
 
            raise JSONRPCError('No permission to change pull request status. User needs to be admin, owner or reviewer.')
 
        if pull_request.is_closed():
 
            raise JSONRPCError('pull request is already closed')
 

	
 
        comment = ChangesetCommentsModel().create(
 
            text=comment_msg,
 
            repo=pull_request.org_repo.repo_id,
 
            author=apiuser.user_id,
 
            pull_request=pull_request.pull_request_id,
 
            f_path=None,
 
            line_no=None,
 
            status_change=ChangesetStatus.get_status_lbl(status),
 
            closing_pr=close_pr
 
        )
 
        action_logger(apiuser,
 
                      'user_commented_pull_request:%s' % pull_request_id,
 
                      pull_request.org_repo, request.ip_addr)
 
        if status:
 
            ChangesetStatusModel().set_status(
 
                pull_request.org_repo_id,
 
                status,
 
                apiuser.user_id,
 
                comment,
 
                pull_request=pull_request_id
 
            )
 
        if close_pr:
 
            PullRequestModel().close_pull_request(pull_request_id)
 
            action_logger(apiuser,
 
                          'user_closed_pull_request:%s' % pull_request_id,
 
                          pull_request.org_repo, request.ip_addr)
 
        Session().commit()
 
        return True
 

	
 
    # permission check inside
 
    def edit_reviewers(self, pull_request_id, add=None, remove=None):
 
        """
 
        Add and/or remove one or more reviewers to a pull request, by username
 
        or user ID. Reviewers are specified either as a single-user string or
 
        as a JSON list of one or more strings.
 
        """
 
        if add is None and remove is None:
 
            raise JSONRPCError('''Invalid request. Neither 'add' nor 'remove' is specified.''')
 

	
 
        pull_request = PullRequest.get(pull_request_id)
 
        if pull_request is None:
 
            raise JSONRPCError('pull request `%s` does not exist' % (pull_request_id,))
 

	
 
        apiuser = get_user_or_error(request.authuser.user_id)
 
        is_owner = apiuser.user_id == pull_request.owner_id
 
        is_repo_admin = HasRepoPermissionLevel('admin')(pull_request.other_repo.repo_name)
 
        if not (apiuser.admin or is_repo_admin or is_owner):
 
            raise JSONRPCError('No permission to edit reviewers of this pull request. User needs to be admin or pull request owner.')
 
        if pull_request.is_closed():
 
            raise JSONRPCError('Cannot edit reviewers of a closed pull request.')
 

	
 
        if not isinstance(add, list):
 
            add = [add]
 
        if not isinstance(remove, list):
 
            remove = [remove]
 

	
 
        # look up actual user objects from given name or id. Bail out if unknown.
 
        add_objs = set(get_user_or_error(user) for user in add if user is not None)
 
        remove_objs = set(get_user_or_error(user) for user in remove if user is not None)
 

	
 
        new_reviewers = redundant_reviewers = set()
 
        if add_objs:
 
            new_reviewers, redundant_reviewers = PullRequestModel().add_reviewers(apiuser, pull_request, add_objs)
 
        if remove_objs:
 
            PullRequestModel().remove_reviewers(apiuser, pull_request, remove_objs)
 

	
 
        Session().commit()
 

	
 
        return {
 
            'added': [x.username for x in new_reviewers],
 
            'already_present': [x.username for x in redundant_reviewers],
 
            # NOTE: no explicit check that removed reviewers were actually present.
 
            'removed': [x.username for x in remove_objs],
 
        }
kallithea/tests/api/api_base.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
"""
 
Tests for the JSON-RPC web api.
 
"""
 

	
 
import os
 
import random
 
import re
 

	
 
import mock
 
import pytest
 

	
 
from kallithea.lib import ext_json
 
from kallithea.lib.auth import AuthUser
 
from kallithea.lib.utils2 import ascii_bytes
 
from kallithea.model.changeset_status import ChangesetStatusModel
 
from kallithea.model.db import ChangesetStatus, PullRequest, RepoGroup, Repository, Setting, Ui, User
 
from kallithea.model.db import ChangesetStatus, PullRequest, PullRequestReviewer, RepoGroup, Repository, Setting, Ui, User
 
from kallithea.model.gist import GistModel
 
from kallithea.model.meta import Session
 
from kallithea.model.pull_request import PullRequestModel
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.scm import ScmModel
 
from kallithea.model.user import UserModel
 
from kallithea.model.user_group import UserGroupModel
 
from kallithea.tests import base
 
from kallithea.tests.fixture import Fixture, raise_exception
 

	
 

	
 
API_URL = '/_admin/api'
 
TEST_USER_GROUP = 'test_user_group'
 
TEST_REPO_GROUP = 'test_repo_group'
 

	
 
fixture = Fixture()
 

	
 

	
 
def _build_data(apikey, method, **kw):
 
    """
 
    Builds API data with given random ID
 
    For convenience, the json is returned as str
 
    """
 
    random_id = random.randrange(1, 9999)
 
    return random_id, ext_json.dumps({
 
        "id": random_id,
 
        "api_key": apikey,
 
        "method": method,
 
        "args": kw
 
    })
 

	
 

	
 
jsonify = lambda obj: ext_json.loads(ext_json.dumps(obj))
 

	
 

	
 
def api_call(test_obj, params):
 
    response = test_obj.app.post(API_URL, content_type='application/json',
 
                                 params=params)
 
    return response
 

	
 

	
 
## helpers
 
def make_user_group(name=TEST_USER_GROUP):
 
    gr = fixture.create_user_group(name, cur_user=base.TEST_USER_ADMIN_LOGIN)
 
    UserGroupModel().add_user_to_group(user_group=gr,
 
                                       user=base.TEST_USER_ADMIN_LOGIN)
 
    Session().commit()
 
    return gr
 

	
 

	
 
def make_repo_group(name=TEST_REPO_GROUP):
 
    gr = fixture.create_repo_group(name, cur_user=base.TEST_USER_ADMIN_LOGIN)
 
    Session().commit()
 
    return gr
 

	
 

	
 
class _BaseTestApi(object):
 
    REPO = None
 
    REPO_TYPE = None
 

	
 
    @classmethod
 
    def setup_class(cls):
 
        cls.usr = User.get_by_username(base.TEST_USER_ADMIN_LOGIN)
 
        cls.apikey = cls.usr.api_key
 
        cls.test_user = UserModel().create_or_update(
 
            username='test-api',
 
            password='test',
 
            email='test@example.com',
 
            firstname='first',
 
            lastname='last'
 
        )
 
        Session().commit()
 
        cls.TEST_USER_LOGIN = cls.test_user.username
 
        cls.apikey_regular = cls.test_user.api_key
 

	
 
    @classmethod
 
    def teardown_class(cls):
 
        pass
 

	
 
    def setup_method(self, method):
 
        make_user_group()
 
        make_repo_group()
 

	
 
    def teardown_method(self, method):
 
        fixture.destroy_user_group(TEST_USER_GROUP)
 
        fixture.destroy_gists()
 
        fixture.destroy_repo_group(TEST_REPO_GROUP)
 

	
 
    def _compare_ok(self, id_, expected, given):
 
        expected = jsonify({
 
            'id': id_,
 
            'error': None,
 
            'result': expected
 
        })
 
        given = ext_json.loads(given)
 
        assert expected == given, (expected, given)
 

	
 
    def _compare_error(self, id_, expected, given):
 
        expected = jsonify({
 
            'id': id_,
 
            'error': expected,
 
            'result': None
 
        })
 
        given = ext_json.loads(given)
 
        assert expected == given, (expected, given)
 

	
 
    def test_Optional_object(self):
 
        from kallithea.controllers.api.api import Optional
 

	
 
        option1 = Optional(None)
 
        assert '<Optional:%s>' % None == repr(option1)
 
        assert option1() is None
 

	
 
        assert 1 == Optional.extract(Optional(1))
 
        assert 'trololo' == Optional.extract('trololo')
 

	
 
    def test_Optional_OAttr(self):
 
        from kallithea.controllers.api.api import OAttr, Optional
 

	
 
        option1 = Optional(OAttr('apiuser'))
 
        assert 'apiuser' == Optional.extract(option1)
 

	
 
    def test_OAttr_object(self):
 
        from kallithea.controllers.api.api import OAttr
 

	
 
        oattr1 = OAttr('apiuser')
 
        assert '<OptionalAttr:apiuser>' == repr(oattr1)
 
        assert oattr1() == oattr1
 

	
 
    def test_api_wrong_key(self):
 
        id_, params = _build_data('trololo', 'get_user')
 
        response = api_call(self, params)
 

	
 
        expected = 'Invalid API key'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_missing_non_optional_param(self):
 
        id_, params = _build_data(self.apikey, 'get_repo')
 
        response = api_call(self, params)
 

	
 
        expected = 'Missing non optional `repoid` arg in JSON DATA'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_missing_non_optional_param_args_null(self):
 
        id_, params = _build_data(self.apikey, 'get_repo')
 
        params = params.replace('"args": {}', '"args": null')
 
        response = api_call(self, params)
 

	
 
        expected = 'Missing non optional `repoid` arg in JSON DATA'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_missing_non_optional_param_args_bad(self):
 
        id_, params = _build_data(self.apikey, 'get_repo')
 
        params = params.replace('"args": {}', '"args": 1')
 
        response = api_call(self, params)
 

	
 
        expected = 'Missing non optional `repoid` arg in JSON DATA'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_args_is_null(self):
 
        id_, params = _build_data(self.apikey, 'get_users', )
 
        params = params.replace('"args": {}', '"args": null')
 
        response = api_call(self, params)
 
        assert response.status == '200 OK'
 

	
 
    def test_api_args_is_bad(self):
 
        id_, params = _build_data(self.apikey, 'get_users', )
 
        params = params.replace('"args": {}', '"args": 1')
 
        response = api_call(self, params)
 
        assert response.status == '200 OK'
 

	
 
    def test_api_args_different_args(self):
 
        import string
 
        expected = {
 
            'ascii_letters': string.ascii_letters,
 
            'ws': string.whitespace,
 
            'printables': string.printable
 
        }
 
        id_, params = _build_data(self.apikey, 'test', args=expected)
 
        response = api_call(self, params)
 
        assert response.status == '200 OK'
 
        self._compare_ok(id_, expected, response.body)
 

	
 
    def test_api_get_users(self):
 
        id_, params = _build_data(self.apikey, 'get_users', )
 
        response = api_call(self, params)
 
        ret_all = []
 
        _users = User.query().filter_by(is_default_user=False) \
 
            .order_by(User.username).all()
 
        for usr in _users:
 
            ret = usr.get_api_data()
 
            ret_all.append(jsonify(ret))
 
        expected = ret_all
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user(self):
 
        id_, params = _build_data(self.apikey, 'get_user',
 
                                  userid=base.TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 

	
 
        usr = User.get_by_username(base.TEST_USER_ADMIN_LOGIN)
 
        ret = usr.get_api_data()
 
        ret['permissions'] = AuthUser(dbuser=usr).permissions
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_that_does_not_exist(self):
 
        id_, params = _build_data(self.apikey, 'get_user',
 
                                  userid='trololo')
 
        response = api_call(self, params)
 

	
 
        expected = "user `%s` does not exist" % 'trololo'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_without_giving_userid(self):
 
        id_, params = _build_data(self.apikey, 'get_user')
 
        response = api_call(self, params)
 

	
 
        usr = User.get_by_username(base.TEST_USER_ADMIN_LOGIN)
 
        ret = usr.get_api_data()
 
        ret['permissions'] = AuthUser(dbuser=usr).permissions
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_without_giving_userid_non_admin(self):
 
        id_, params = _build_data(self.apikey_regular, 'get_user')
 
        response = api_call(self, params)
 

	
 
        usr = User.get_by_username(self.TEST_USER_LOGIN)
 
        ret = usr.get_api_data()
 
        ret['permissions'] = AuthUser(dbuser=usr).permissions
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_with_giving_userid_non_admin(self):
 
        id_, params = _build_data(self.apikey_regular, 'get_user',
 
                                  userid=self.TEST_USER_LOGIN)
 
        response = api_call(self, params)
 

	
 
        expected = 'userid is not the same as your user'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_pull_remote(self):
 
        # Note: pulling from local repos is a mis-feature - it will bypass access control
 
        # ... but ok, if the path already has been set in the database
 
        repo_name = 'test_pull'
 
        r = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        # hack around that clone_uri can't be set to to a local path
 
        # (as shown by test_api_create_repo_clone_uri_local)
 
        r.clone_uri = os.path.join(Ui.get_by_key('paths', '/').ui_value, self.REPO)
 
        Session().commit()
 

	
 
        pre_cached_tip = [repo.get_api_data()['last_changeset']['short_id'] for repo in Repository.query().filter(Repository.repo_name == repo_name)]
 

	
 
        id_, params = _build_data(self.apikey, 'pull',
 
                                  repoid=repo_name,)
 
        response = api_call(self, params)
 

	
 
        expected = {'msg': 'Pulled from `%s`' % repo_name,
 
                    'repository': repo_name}
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
        post_cached_tip = [repo.get_api_data()['last_changeset']['short_id'] for repo in Repository.query().filter(Repository.repo_name == repo_name)]
 

	
 
        fixture.destroy_repo(repo_name)
 

	
 
        assert pre_cached_tip != post_cached_tip
 

	
 
    def test_api_pull_fork(self):
 
        fork_name = 'fork'
 
        fixture.create_fork(self.REPO, fork_name)
 
        id_, params = _build_data(self.apikey, 'pull',
 
                                  repoid=fork_name,)
 
        response = api_call(self, params)
 

	
 
        expected = {'msg': 'Pulled from `%s`' % fork_name,
 
                    'repository': fork_name}
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
        fixture.destroy_repo(fork_name)
 

	
 
    def test_api_pull_error_no_remote_no_fork(self):
 
        # should fail because no clone_uri is set
 
        id_, params = _build_data(self.apikey, 'pull',
 
                                  repoid=self.REPO, )
 
        response = api_call(self, params)
 

	
 
        expected = 'Unable to pull changes from `%s`' % self.REPO
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_pull_custom_remote(self):
 
        repo_name = 'test_pull_custom_remote'
 
        fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 

	
 
        custom_remote_path = os.path.join(Ui.get_by_key('paths', '/').ui_value, self.REPO)
 

	
 
        id_, params = _build_data(self.apikey, 'pull',
 
                                  repoid=repo_name,
 
                                  clone_uri=custom_remote_path)
 
        response = api_call(self, params)
 

	
 
        expected = {'msg': 'Pulled from `%s`' % repo_name,
 
                    'repository': repo_name}
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_rescan_repos(self):
 
        id_, params = _build_data(self.apikey, 'rescan_repos')
 
        response = api_call(self, params)
 

	
 
        expected = {'added': [], 'removed': []}
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(ScmModel, 'repo_scan', raise_exception)
 
    def test_api_rescann_error(self):
 
        id_, params = _build_data(self.apikey, 'rescan_repos', )
 
        response = api_call(self, params)
 

	
 
        expected = 'Error occurred during rescan repositories action'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_existing_user(self):
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=base.TEST_USER_ADMIN_LOGIN,
 
                                  email='test@example.com',
 
                                  password='trololo')
 
        response = api_call(self, params)
 

	
 
        expected = "user `%s` already exist" % base.TEST_USER_ADMIN_LOGIN
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_user_with_existing_email(self):
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=base.TEST_USER_ADMIN_LOGIN + 'new',
 
                                  email=base.TEST_USER_REGULAR_EMAIL,
 
                                  password='trololo')
 
        response = api_call(self, params)
 

	
 
        expected = "email `%s` already exist" % base.TEST_USER_REGULAR_EMAIL
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_user(self):
 
        username = 'test_new_api_user'
 
        email = username + "@example.com"
 

	
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=username,
 
                                  email=email,
 
                                  password='trololo')
 
        response = api_call(self, params)
 

	
 
        usr = User.get_by_username(username)
 
        ret = dict(
 
            msg='created new user `%s`' % username,
 
            user=jsonify(usr.get_api_data())
 
        )
 

	
 
        try:
 
            expected = ret
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user(usr.user_id)
 

	
 
    def test_api_create_user_without_password(self):
 
        username = 'test_new_api_user_passwordless'
 
        email = username + "@example.com"
 

	
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=username,
 
                                  email=email)
 
        response = api_call(self, params)
 

	
 
        usr = User.get_by_username(username)
 
        ret = dict(
 
            msg='created new user `%s`' % username,
 
            user=jsonify(usr.get_api_data())
 
        )
 
        try:
 
            expected = ret
 
            self._compare_ok(id_, expected, given=response.body)
 
        finally:
 
            fixture.destroy_user(usr.user_id)
 
@@ -2147,384 +2148,698 @@ class _BaseTestApi(object):
 
    ])
 
    def test_api_revoke_user_group_permission_from_repo_group_by_regular_user(
 
            self, name, apply_to_children, grant_admin, access_ok):
 
        RepoGroupModel().grant_user_permission(repo_group=TEST_REPO_GROUP,
 
                                               user=base.TEST_USER_ADMIN_LOGIN,
 
                                               perm='group.read',)
 
        Session().commit()
 

	
 
        if grant_admin:
 
            RepoGroupModel().grant_user_permission(TEST_REPO_GROUP,
 
                                                   self.TEST_USER_LOGIN,
 
                                                   'group.admin')
 
            Session().commit()
 

	
 
        id_, params = _build_data(self.apikey_regular,
 
                                  'revoke_user_group_permission_from_repo_group',
 
                                  repogroupid=TEST_REPO_GROUP,
 
                                  usergroupid=TEST_USER_GROUP,
 
                                  apply_to_children=apply_to_children,)
 
        response = api_call(self, params)
 
        if access_ok:
 
            expected = {
 
                'msg': 'Revoked perm (recursive:%s) for user group: `%s` in repo group: `%s`' % (
 
                    apply_to_children, base.TEST_USER_ADMIN_LOGIN, TEST_REPO_GROUP
 
                ),
 
                'success': True
 
            }
 
            self._compare_ok(id_, expected, given=response.body)
 
        else:
 
            expected = 'repository group `%s` does not exist' % TEST_REPO_GROUP
 
            self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(RepoGroupModel, 'revoke_user_group_permission', raise_exception)
 
    def test_api_revoke_user_group_permission_from_repo_group_exception_when_adding(self):
 
        id_, params = _build_data(self.apikey, 'revoke_user_group_permission_from_repo_group',
 
                                  repogroupid=TEST_REPO_GROUP,
 
                                  usergroupid=TEST_USER_GROUP,)
 
        response = api_call(self, params)
 

	
 
        expected = 'failed to edit permission for user group: `%s` in repo group: `%s`' % (
 
            TEST_USER_GROUP, TEST_REPO_GROUP
 
        )
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_gist(self):
 
        gist = fixture.create_gist()
 
        gist_id = gist.gist_access_id
 
        gist_created_on = gist.created_on
 
        id_, params = _build_data(self.apikey, 'get_gist',
 
                                  gistid=gist_id, )
 
        response = api_call(self, params)
 

	
 
        expected = {
 
            'access_id': gist_id,
 
            'created_on': gist_created_on,
 
            'description': 'new-gist',
 
            'expires': -1.0,
 
            'gist_id': int(gist_id),
 
            'type': 'public',
 
            'url': 'http://localhost:80/_admin/gists/%s' % gist_id
 
        }
 

	
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_gist_that_does_not_exist(self):
 
        id_, params = _build_data(self.apikey_regular, 'get_gist',
 
                                  gistid='12345', )
 
        response = api_call(self, params)
 
        expected = 'gist `%s` does not exist' % ('12345',)
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_gist_private_gist_without_permission(self):
 
        gist = fixture.create_gist()
 
        gist_id = gist.gist_access_id
 
        gist_created_on = gist.created_on
 
        id_, params = _build_data(self.apikey_regular, 'get_gist',
 
                                  gistid=gist_id, )
 
        response = api_call(self, params)
 

	
 
        expected = 'gist `%s` does not exist' % gist_id
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_gists(self):
 
        fixture.create_gist()
 
        fixture.create_gist()
 

	
 
        id_, params = _build_data(self.apikey, 'get_gists')
 
        response = api_call(self, params)
 
        expected = response.json
 
        assert len(response.json['result']) == 2
 
        #self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_gists_regular_user(self):
 
        # by admin
 
        fixture.create_gist()
 
        fixture.create_gist()
 

	
 
        # by reg user
 
        fixture.create_gist(owner=self.TEST_USER_LOGIN)
 
        fixture.create_gist(owner=self.TEST_USER_LOGIN)
 
        fixture.create_gist(owner=self.TEST_USER_LOGIN)
 

	
 
        id_, params = _build_data(self.apikey_regular, 'get_gists')
 
        response = api_call(self, params)
 
        expected = response.json
 
        assert len(response.json['result']) == 3
 
        #self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_gists_only_for_regular_user(self):
 
        # by admin
 
        fixture.create_gist()
 
        fixture.create_gist()
 

	
 
        # by reg user
 
        fixture.create_gist(owner=self.TEST_USER_LOGIN)
 
        fixture.create_gist(owner=self.TEST_USER_LOGIN)
 
        fixture.create_gist(owner=self.TEST_USER_LOGIN)
 

	
 
        id_, params = _build_data(self.apikey, 'get_gists',
 
                                  userid=self.TEST_USER_LOGIN)
 
        response = api_call(self, params)
 
        expected = response.json
 
        assert len(response.json['result']) == 3
 
        #self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_gists_regular_user_with_different_userid(self):
 
        id_, params = _build_data(self.apikey_regular, 'get_gists',
 
                                  userid=base.TEST_USER_ADMIN_LOGIN)
 
        response = api_call(self, params)
 
        expected = 'userid is not the same as your user'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_gist(self):
 
        id_, params = _build_data(self.apikey_regular, 'create_gist',
 
                                  lifetime=10,
 
                                  description='foobar-gist',
 
                                  gist_type='public',
 
                                  files={'foobar': {'content': 'foo'}})
 
        response = api_call(self, params)
 
        expected = {
 
            'gist': {
 
                'access_id': response.json['result']['gist']['access_id'],
 
                'created_on': response.json['result']['gist']['created_on'],
 
                'description': 'foobar-gist',
 
                'expires': response.json['result']['gist']['expires'],
 
                'gist_id': response.json['result']['gist']['gist_id'],
 
                'type': 'public',
 
                'url': response.json['result']['gist']['url']
 
            },
 
            'msg': 'created new gist'
 
        }
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(GistModel, 'create', raise_exception)
 
    def test_api_create_gist_exception_occurred(self):
 
        id_, params = _build_data(self.apikey_regular, 'create_gist',
 
                                  files={})
 
        response = api_call(self, params)
 
        expected = 'failed to create gist'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_delete_gist(self):
 
        gist_id = fixture.create_gist().gist_access_id
 
        id_, params = _build_data(self.apikey, 'delete_gist',
 
                                  gistid=gist_id)
 
        response = api_call(self, params)
 
        expected = {'gist': None, 'msg': 'deleted gist ID:%s' % gist_id}
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_delete_gist_regular_user(self):
 
        gist_id = fixture.create_gist(owner=self.TEST_USER_LOGIN).gist_access_id
 
        id_, params = _build_data(self.apikey_regular, 'delete_gist',
 
                                  gistid=gist_id)
 
        response = api_call(self, params)
 
        expected = {'gist': None, 'msg': 'deleted gist ID:%s' % gist_id}
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_delete_gist_regular_user_no_permission(self):
 
        gist_id = fixture.create_gist().gist_access_id
 
        id_, params = _build_data(self.apikey_regular, 'delete_gist',
 
                                  gistid=gist_id)
 
        response = api_call(self, params)
 
        expected = 'gist `%s` does not exist' % (gist_id,)
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(GistModel, 'delete', raise_exception)
 
    def test_api_delete_gist_exception_occurred(self):
 
        gist_id = fixture.create_gist().gist_access_id
 
        id_, params = _build_data(self.apikey, 'delete_gist',
 
                                  gistid=gist_id)
 
        response = api_call(self, params)
 
        expected = 'failed to delete gist ID:%s' % (gist_id,)
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_ip(self):
 
        id_, params = _build_data(self.apikey, 'get_ip')
 
        response = api_call(self, params)
 
        expected = {
 
            'server_ip_addr': '0.0.0.0',
 
            'user_ips': []
 
        }
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_server_info(self):
 
        id_, params = _build_data(self.apikey, 'get_server_info')
 
        response = api_call(self, params)
 
        expected = Setting.get_server_info()
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_changesets(self):
 
        id_, params = _build_data(self.apikey, 'get_changesets',
 
                                  repoid=self.REPO, start=0, end=2)
 
        response = api_call(self, params)
 
        result = ext_json.loads(response.body)["result"]
 
        assert len(result) == 3
 
        assert 'message' in result[0]
 
        assert 'added' not in result[0]
 

	
 
    def test_api_get_changesets_with_max_revisions(self):
 
        id_, params = _build_data(self.apikey, 'get_changesets',
 
                                  repoid=self.REPO, start_date="2011-02-24T00:00:00", max_revisions=10)
 
        response = api_call(self, params)
 
        result = ext_json.loads(response.body)["result"]
 
        assert len(result) == 10
 
        assert 'message' in result[0]
 
        assert 'added' not in result[0]
 

	
 
    def test_api_get_changesets_with_branch(self):
 
        if self.REPO == 'vcs_test_hg':
 
            branch = 'stable'
 
        else:
 
            pytest.skip("skipping due to missing branches in git test repo")
 
        id_, params = _build_data(self.apikey, 'get_changesets',
 
                                  repoid=self.REPO, branch_name=branch, start_date="2011-02-24T00:00:00")
 
        response = api_call(self, params)
 
        result = ext_json.loads(response.body)["result"]
 
        assert len(result) == 5
 
        assert 'message' in result[0]
 
        assert 'added' not in result[0]
 

	
 
    def test_api_get_changesets_with_file_list(self):
 
        id_, params = _build_data(self.apikey, 'get_changesets',
 
                                  repoid=self.REPO, start_date="2010-04-07T23:30:30", end_date="2010-04-08T00:31:14", with_file_list=True)
 
        response = api_call(self, params)
 
        result = ext_json.loads(response.body)["result"]
 
        assert len(result) == 3
 
        assert 'message' in result[0]
 
        assert 'added' in result[0]
 

	
 
    def test_api_get_changeset(self):
 
        review = fixture.review_changeset(self.REPO, self.TEST_REVISION, "approved")
 
        id_, params = _build_data(self.apikey, 'get_changeset',
 
                                  repoid=self.REPO, raw_id=self.TEST_REVISION)
 
        response = api_call(self, params)
 
        result = ext_json.loads(response.body)["result"]
 
        assert result["raw_id"] == self.TEST_REVISION
 
        assert "reviews" not in result
 

	
 
    def test_api_get_changeset_with_reviews(self):
 
        reviewobjs = fixture.review_changeset(self.REPO, self.TEST_REVISION, "approved")
 
        id_, params = _build_data(self.apikey, 'get_changeset',
 
                                  repoid=self.REPO, raw_id=self.TEST_REVISION,
 
                                  with_reviews=True)
 
        response = api_call(self, params)
 
        result = ext_json.loads(response.body)["result"]
 
        assert result["raw_id"] == self.TEST_REVISION
 
        assert "reviews" in result
 
        assert len(result["reviews"]) == 1
 
        review = result["reviews"][0]
 
        expected = {
 
            'status': 'approved',
 
            'modified_at': reviewobjs[0].modified_at.replace(microsecond=0).isoformat(),
 
            'reviewer': 'test_admin',
 
        }
 
        assert review == expected
 

	
 
    def test_api_get_changeset_that_does_not_exist(self):
 
        """ Fetch changeset status for non-existant changeset.
 
        revision id is the above git hash used in the test above with the
 
        last 3 nibbles replaced with 0xf.  Should not exist for git _or_ hg.
 
        """
 
        id_, params = _build_data(self.apikey, 'get_changeset',
 
                                  repoid=self.REPO, raw_id = '7ab37bc680b4aa72c34d07b230c866c28e9fcfff')
 
        response = api_call(self, params)
 
        expected = 'Changeset %s does not exist' % ('7ab37bc680b4aa72c34d07b230c866c28e9fcfff',)
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_changeset_without_permission(self):
 
        review = fixture.review_changeset(self.REPO, self.TEST_REVISION, "approved")
 
        RepoModel().revoke_user_permission(repo=self.REPO, user=self.TEST_USER_LOGIN)
 
        RepoModel().revoke_user_permission(repo=self.REPO, user="default")
 
        id_, params = _build_data(self.apikey_regular, 'get_changeset',
 
                                  repoid=self.REPO, raw_id=self.TEST_REVISION)
 
        response = api_call(self, params)
 
        expected = 'Access denied to repo %s' % self.REPO
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_pullrequest(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'get test')
 
        random_id = random.randrange(1, 9999)
 
        params = ascii_bytes(ext_json.dumps({
 
            "id": random_id,
 
            "api_key": self.apikey,
 
            "method": 'get_pullrequest',
 
            "args": {"pullrequest_id": pull_request_id},
 
        }))
 
        response = api_call(self, params)
 
        pullrequest = PullRequest().get(pull_request_id)
 
        expected = {
 
            "status": "new",
 
            "pull_request_id": pull_request_id,
 
            "description": "No description",
 
            "url": "/%s/pull-request/%s/_/%s" % (self.REPO, pull_request_id, "stable"),
 
            "reviewers": [{"username": "test_regular"}],
 
            "org_repo_url": "http://localhost:80/%s" % self.REPO,
 
            "org_ref_parts": ["branch", "stable", self.TEST_PR_SRC],
 
            "other_ref_parts": ["branch", "default", self.TEST_PR_DST],
 
            "comments": [{"username": base.TEST_USER_ADMIN_LOGIN, "text": "",
 
                         "comment_id": pullrequest.comments[0].comment_id}],
 
            "owner": base.TEST_USER_ADMIN_LOGIN,
 
            "statuses": [{"status": "under_review", "reviewer": base.TEST_USER_ADMIN_LOGIN, "modified_at": "2000-01-01T00:00:00"} for i in range(0, len(self.TEST_PR_REVISIONS))],
 
            "title": "get test",
 
            "revisions": self.TEST_PR_REVISIONS,
 
            "created_on": "2000-01-01T00:00:00",
 
            "updated_on": "2000-01-01T00:00:00",
 
        }
 
        self._compare_ok(random_id, expected,
 
                         given=re.sub(br"\d\d\d\d\-\d\d\-\d\dT\d\d\:\d\d\:\d\d",
 
                                      b"2000-01-01T00:00:00", response.body))
 

	
 
    def test_api_close_pullrequest(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'close test')
 
        random_id = random.randrange(1, 9999)
 
        params = ascii_bytes(ext_json.dumps({
 
            "id": random_id,
 
            "api_key": self.apikey,
 
            "method": "comment_pullrequest",
 
            "args": {"pull_request_id": pull_request_id, "close_pr": True},
 
        }))
 
        response = api_call(self, params)
 
        self._compare_ok(random_id, True, given=response.body)
 
        pullrequest = PullRequest().get(pull_request_id)
 
        assert pullrequest.comments[-1].text == ''
 
        assert pullrequest.status == PullRequest.STATUS_CLOSED
 
        assert pullrequest.is_closed() == True
 

	
 
    def test_api_status_pullrequest(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, "status test")
 

	
 
        random_id = random.randrange(1, 9999)
 
        params = ascii_bytes(ext_json.dumps({
 
            "id": random_id,
 
            "api_key": User.get_by_username(base.TEST_USER_REGULAR2_LOGIN).api_key,
 
            "method": "comment_pullrequest",
 
            "args": {"pull_request_id": pull_request_id, "status": ChangesetStatus.STATUS_APPROVED},
 
        }))
 
        response = api_call(self, params)
 
        pullrequest = PullRequest().get(pull_request_id)
 
        self._compare_error(random_id, "No permission to change pull request status. User needs to be admin, owner or reviewer.", given=response.body)
 
        assert ChangesetStatus.STATUS_UNDER_REVIEW == ChangesetStatusModel().calculate_pull_request_result(pullrequest)[2]
 
        params = ascii_bytes(ext_json.dumps({
 
            "id": random_id,
 
            "api_key": User.get_by_username(base.TEST_USER_REGULAR_LOGIN).api_key,
 
            "method": "comment_pullrequest",
 
            "args": {"pull_request_id": pull_request_id, "status": ChangesetStatus.STATUS_APPROVED},
 
        }))
 
        response = api_call(self, params)
 
        self._compare_ok(random_id, True, given=response.body)
 
        pullrequest = PullRequest().get(pull_request_id)
 
        assert ChangesetStatus.STATUS_APPROVED == ChangesetStatusModel().calculate_pull_request_result(pullrequest)[2]
 

	
 
    def test_api_comment_pullrequest(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, "comment test")
 
        random_id = random.randrange(1, 9999)
 
        params = ascii_bytes(ext_json.dumps({
 
            "id": random_id,
 
            "api_key": self.apikey,
 
            "method": "comment_pullrequest",
 
            "args": {"pull_request_id": pull_request_id, "comment_msg": "Looks good to me"},
 
        }))
 
        response = api_call(self, params)
 
        self._compare_ok(random_id, True, given=response.body)
 
        pullrequest = PullRequest().get(pull_request_id)
 
        assert pullrequest.comments[-1].text == 'Looks good to me'
 

	
 
    def test_api_edit_reviewers_add_single(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
 
        pullrequest = PullRequest().get(pull_request_id)
 
        pullrequest.owner = self.test_user
 
        random_id = random.randrange(1, 9999)
 
        params = ascii_bytes(ext_json.dumps({
 
            "id": random_id,
 
            "api_key": self.apikey_regular,
 
            "method": "edit_reviewers",
 
            "args": {"pull_request_id": pull_request_id, "add": base.TEST_USER_REGULAR2_LOGIN},
 
        }))
 
        response = api_call(self, params)
 
        expected = { 'added': [base.TEST_USER_REGULAR2_LOGIN], 'already_present': [], 'removed': [] }
 

	
 
        self._compare_ok(random_id, expected, given=response.body)
 
        assert User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) in pullrequest.get_reviewer_users()
 

	
 
    def test_api_edit_reviewers_add_nonexistent(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
 
        pullrequest = PullRequest().get(pull_request_id)
 
        pullrequest.owner = self.test_user
 
        random_id = random.randrange(1, 9999)
 
        params = ascii_bytes(ext_json.dumps({
 
            "id": random_id,
 
            "api_key": self.apikey_regular,
 
            "method": "edit_reviewers",
 
            "args": {"pull_request_id": pull_request_id, "add": 999},
 
        }))
 
        response = api_call(self, params)
 

	
 
        self._compare_error(random_id, "user `999` does not exist", given=response.body)
 

	
 
    def test_api_edit_reviewers_add_multiple(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
 
        pullrequest = PullRequest().get(pull_request_id)
 
        pullrequest.owner = self.test_user
 
        random_id = random.randrange(1, 9999)
 
        params = ascii_bytes(ext_json.dumps({
 
            "id": random_id,
 
            "api_key": self.apikey_regular,
 
            "method": "edit_reviewers",
 
            "args": {
 
                "pull_request_id": pull_request_id,
 
                "add": [ self.TEST_USER_LOGIN, base.TEST_USER_REGULAR2_LOGIN ]
 
            },
 
        }))
 
        response = api_call(self, params)
 
        # list order depends on python sorting hash, which is randomized
 
        assert set(ext_json.loads(response.body)['result']['added']) == set([base.TEST_USER_REGULAR2_LOGIN, self.TEST_USER_LOGIN])
 
        assert set(ext_json.loads(response.body)['result']['already_present']) == set()
 
        assert set(ext_json.loads(response.body)['result']['removed']) == set()
 

	
 
        assert User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) in pullrequest.get_reviewer_users()
 
        assert User.get_by_username(self.TEST_USER_LOGIN) in pullrequest.get_reviewer_users()
 

	
 
    def test_api_edit_reviewers_add_already_present(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
 
        pullrequest = PullRequest().get(pull_request_id)
 
        pullrequest.owner = self.test_user
 
        random_id = random.randrange(1, 9999)
 
        params = ascii_bytes(ext_json.dumps({
 
            "id": random_id,
 
            "api_key": self.apikey_regular,
 
            "method": "edit_reviewers",
 
            "args": {
 
                "pull_request_id": pull_request_id,
 
                "add": [ base.TEST_USER_REGULAR_LOGIN, base.TEST_USER_REGULAR2_LOGIN ]
 
            },
 
        }))
 
        response = api_call(self, params)
 
        expected = { 'added': [base.TEST_USER_REGULAR2_LOGIN],
 
                     'already_present': [base.TEST_USER_REGULAR_LOGIN],
 
                     'removed': [],
 
                   }
 

	
 
        self._compare_ok(random_id, expected, given=response.body)
 
        assert User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
 
        assert User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) in pullrequest.get_reviewer_users()
 

	
 
    def test_api_edit_reviewers_add_closed(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
 
        pullrequest = PullRequest().get(pull_request_id)
 
        pullrequest.owner = self.test_user
 
        PullRequestModel().close_pull_request(pull_request_id)
 
        random_id = random.randrange(1, 9999)
 
        params = ascii_bytes(ext_json.dumps({
 
            "id": random_id,
 
            "api_key": self.apikey_regular,
 
            "method": "edit_reviewers",
 
            "args": {"pull_request_id": pull_request_id, "add": base.TEST_USER_REGULAR2_LOGIN},
 
        }))
 
        response = api_call(self, params)
 
        self._compare_error(random_id, "Cannot edit reviewers of a closed pull request.", given=response.body)
 

	
 
    def test_api_edit_reviewers_add_not_owner(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
 
        pullrequest = PullRequest().get(pull_request_id)
 
        pullrequest.owner = User.get_by_username(base.TEST_USER_REGULAR_LOGIN)
 
        random_id = random.randrange(1, 9999)
 
        params = ascii_bytes(ext_json.dumps({
 
            "id": random_id,
 
            "api_key": self.apikey_regular,
 
            "method": "edit_reviewers",
 
            "args": {"pull_request_id": pull_request_id, "add": base.TEST_USER_REGULAR2_LOGIN},
 
        }))
 
        response = api_call(self, params)
 
        self._compare_error(random_id, "No permission to edit reviewers of this pull request. User needs to be admin or pull request owner.", given=response.body)
 

	
 

	
 
    def test_api_edit_reviewers_remove_single(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
 
        pullrequest = PullRequest().get(pull_request_id)
 
        assert User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
 

	
 
        pullrequest.owner = self.test_user
 
        random_id = random.randrange(1, 9999)
 
        params = ascii_bytes(ext_json.dumps({
 
            "id": random_id,
 
            "api_key": self.apikey_regular,
 
            "method": "edit_reviewers",
 
            "args": {"pull_request_id": pull_request_id, "remove": base.TEST_USER_REGULAR_LOGIN},
 
        }))
 
        response = api_call(self, params)
 

	
 
        expected = { 'added': [],
 
                     'already_present': [],
 
                     'removed': [base.TEST_USER_REGULAR_LOGIN],
 
                   }
 
        self._compare_ok(random_id, expected, given=response.body)
 
        assert User.get_by_username(base.TEST_USER_REGULAR_LOGIN) not in pullrequest.get_reviewer_users()
 

	
 
    def test_api_edit_reviewers_remove_nonexistent(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
 
        pullrequest = PullRequest().get(pull_request_id)
 
        assert User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
 

	
 
        pullrequest.owner = self.test_user
 
        random_id = random.randrange(1, 9999)
 
        params = ascii_bytes(ext_json.dumps({
 
            "id": random_id,
 
            "api_key": self.apikey_regular,
 
            "method": "edit_reviewers",
 
            "args": {"pull_request_id": pull_request_id, "remove": 999},
 
        }))
 
        response = api_call(self, params)
 

	
 
        self._compare_error(random_id, "user `999` does not exist", given=response.body)
 
        assert User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
 

	
 
    def test_api_edit_reviewers_remove_nonpresent(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
 
        pullrequest = PullRequest().get(pull_request_id)
 
        assert User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
 
        assert User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) not in pullrequest.get_reviewer_users()
 

	
 
        pullrequest.owner = self.test_user
 
        random_id = random.randrange(1, 9999)
 
        params = ascii_bytes(ext_json.dumps({
 
            "id": random_id,
 
            "api_key": self.apikey_regular,
 
            "method": "edit_reviewers",
 
            "args": {"pull_request_id": pull_request_id, "remove": base.TEST_USER_REGULAR2_LOGIN},
 
        }))
 
        response = api_call(self, params)
 

	
 
        # NOTE: no explicit indication that removed user was not even a reviewer
 
        expected = { 'added': [],
 
                     'already_present': [],
 
                     'removed': [base.TEST_USER_REGULAR2_LOGIN],
 
                   }
 
        self._compare_ok(random_id, expected, given=response.body)
 
        assert User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
 
        assert User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) not in pullrequest.get_reviewer_users()
 

	
 
    def test_api_edit_reviewers_remove_multiple(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
 
        pullrequest = PullRequest().get(pull_request_id)
 
        prr = PullRequestReviewer(User.get_by_username(base.TEST_USER_REGULAR2_LOGIN), pullrequest)
 
        Session().add(prr)
 
        Session().commit()
 

	
 
        assert User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
 
        assert User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) in pullrequest.get_reviewer_users()
 

	
 
        pullrequest.owner = self.test_user
 
        random_id = random.randrange(1, 9999)
 
        params = ascii_bytes(ext_json.dumps({
 
            "id": random_id,
 
            "api_key": self.apikey_regular,
 
            "method": "edit_reviewers",
 
            "args": {"pull_request_id": pull_request_id, "remove": [ base.TEST_USER_REGULAR_LOGIN, base.TEST_USER_REGULAR2_LOGIN ] },
 
        }))
 
        response = api_call(self, params)
 

	
 
        # list order depends on python sorting hash, which is randomized
 
        assert set(ext_json.loads(response.body)['result']['added']) == set()
 
        assert set(ext_json.loads(response.body)['result']['already_present']) == set()
 
        assert set(ext_json.loads(response.body)['result']['removed']) == set([base.TEST_USER_REGULAR_LOGIN, base.TEST_USER_REGULAR2_LOGIN])
 
        assert User.get_by_username(base.TEST_USER_REGULAR_LOGIN) not in pullrequest.get_reviewer_users()
 
        assert User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) not in pullrequest.get_reviewer_users()
 

	
 
    def test_api_edit_reviewers_remove_closed(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
 
        pullrequest = PullRequest().get(pull_request_id)
 
        assert User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
 
        PullRequestModel().close_pull_request(pull_request_id)
 

	
 
        pullrequest.owner = self.test_user
 
        random_id = random.randrange(1, 9999)
 
        params = ascii_bytes(ext_json.dumps({
 
            "id": random_id,
 
            "api_key": self.apikey_regular,
 
            "method": "edit_reviewers",
 
            "args": {"pull_request_id": pull_request_id, "remove": base.TEST_USER_REGULAR_LOGIN},
 
        }))
 
        response = api_call(self, params)
 

	
 
        self._compare_error(random_id, "Cannot edit reviewers of a closed pull request.", given=response.body)
 
        assert User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
 

	
 
    def test_api_edit_reviewers_remove_not_owner(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
 
        pullrequest = PullRequest().get(pull_request_id)
 
        assert User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
 

	
 
        pullrequest.owner = User.get_by_username(base.TEST_USER_REGULAR_LOGIN)
 
        random_id = random.randrange(1, 9999)
 
        params = ascii_bytes(ext_json.dumps({
 
            "id": random_id,
 
            "api_key": self.apikey_regular,
 
            "method": "edit_reviewers",
 
            "args": {"pull_request_id": pull_request_id, "remove": base.TEST_USER_REGULAR_LOGIN},
 
        }))
 
        response = api_call(self, params)
 

	
 
        self._compare_error(random_id, "No permission to edit reviewers of this pull request. User needs to be admin or pull request owner.", given=response.body)
 
        assert User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
 

	
 
    def test_api_edit_reviewers_add_remove_single(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
 
        pullrequest = PullRequest().get(pull_request_id)
 
        assert User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
 
        assert User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) not in pullrequest.get_reviewer_users()
 

	
 
        pullrequest.owner = self.test_user
 
        random_id = random.randrange(1, 9999)
 
        params = ascii_bytes(ext_json.dumps({
 
            "id": random_id,
 
            "api_key": self.apikey_regular,
 
            "method": "edit_reviewers",
 
            "args": {"pull_request_id": pull_request_id,
 
                     "add": base.TEST_USER_REGULAR2_LOGIN,
 
                     "remove": base.TEST_USER_REGULAR_LOGIN
 
                    },
 
        }))
 
        response = api_call(self, params)
 

	
 
        expected = { 'added': [base.TEST_USER_REGULAR2_LOGIN],
 
                     'already_present': [],
 
                     'removed': [base.TEST_USER_REGULAR_LOGIN],
 
                   }
 
        self._compare_ok(random_id, expected, given=response.body)
 
        assert User.get_by_username(base.TEST_USER_REGULAR_LOGIN) not in pullrequest.get_reviewer_users()
 
        assert User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) in pullrequest.get_reviewer_users()
 

	
 
    def test_api_edit_reviewers_add_remove_multiple(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
 
        pullrequest = PullRequest().get(pull_request_id)
 
        prr = PullRequestReviewer(User.get_by_username(base.TEST_USER_ADMIN_LOGIN), pullrequest)
 
        Session().add(prr)
 
        Session().commit()
 
        assert User.get_by_username(base.TEST_USER_ADMIN_LOGIN) in pullrequest.get_reviewer_users()
 
        assert User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
 
        assert User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) not in pullrequest.get_reviewer_users()
 

	
 
        pullrequest.owner = self.test_user
 
        random_id = random.randrange(1, 9999)
 
        params = ascii_bytes(ext_json.dumps({
 
            "id": random_id,
 
            "api_key": self.apikey_regular,
 
            "method": "edit_reviewers",
 
            "args": {"pull_request_id": pull_request_id,
 
                     "add": [ base.TEST_USER_REGULAR2_LOGIN ],
 
                     "remove": [ base.TEST_USER_REGULAR_LOGIN, base.TEST_USER_ADMIN_LOGIN ],
 
                    },
 
        }))
 
        response = api_call(self, params)
 

	
 
        # list order depends on python sorting hash, which is randomized
 
        assert set(ext_json.loads(response.body)['result']['added']) == set([base.TEST_USER_REGULAR2_LOGIN])
 
        assert set(ext_json.loads(response.body)['result']['already_present']) == set()
 
        assert set(ext_json.loads(response.body)['result']['removed']) == set([base.TEST_USER_REGULAR_LOGIN, base.TEST_USER_ADMIN_LOGIN])
 
        assert User.get_by_username(base.TEST_USER_ADMIN_LOGIN) not in pullrequest.get_reviewer_users()
 
        assert User.get_by_username(base.TEST_USER_REGULAR_LOGIN) not in pullrequest.get_reviewer_users()
 
        assert User.get_by_username(base.TEST_USER_REGULAR2_LOGIN) in pullrequest.get_reviewer_users()
 

	
 
    def test_api_edit_reviewers_invalid_params(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'edit reviewer test')
 
        pullrequest = PullRequest().get(pull_request_id)
 
        assert User.get_by_username(base.TEST_USER_REGULAR_LOGIN) in pullrequest.get_reviewer_users()
 

	
 
        pullrequest.owner = User.get_by_username(base.TEST_USER_REGULAR_LOGIN)
 
        random_id = random.randrange(1, 9999)
 
        params = ascii_bytes(ext_json.dumps({
 
            "id": random_id,
 
            "api_key": self.apikey_regular,
 
            "method": "edit_reviewers",
 
            "args": {"pull_request_id": pull_request_id},
 
        }))
 
        response = api_call(self, params)
 

	
 
        self._compare_error(random_id, "Invalid request. Neither 'add' nor 'remove' is specified.", given=response.body)
 
        assert ext_json.loads(response.body)['result'] is None
0 comments (0 inline, 0 general)