Changeset - 6452215a54ee
[Not reviewed]
default
0 7 0
domruf - 9 years ago 2017-03-29 22:12:50
dominikruf@gmail.com
api: add get_pullrequest and comment_pullrequest methods

Modified by Mads Kiilerich, mainly to let the test helper function
create_pullrequest use model directly.
7 files changed with 315 insertions and 5 deletions:
0 comments (0 inline, 0 general)
docs/api/api.rst
Show inline comments
 
@@ -1047,127 +1047,216 @@ OUTPUT::
 
    error:  null
 

	
 
revoke_user_group_permission
 
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 

	
 
Revoke permission for a user group on the given repository.
 
This command can only be executed using the api_key of a user with admin rights.
 

	
 
INPUT::
 

	
 
    id : <id_for_response>
 
    api_key : "<api_key>"
 
    method  : "revoke_user_group_permission"
 
    args:     {
 
                "repoid" : "<reponame or repo_id>"
 
                "usersgroupid" : "<user group id or name>"
 
              }
 

	
 
OUTPUT::
 

	
 
    id : <id_given_in_input>
 
    result: {
 
              "msg" : "Revoked perm for group: `<usersgroupname>` in repo: `<reponame>`",
 
              "success": true
 
            }
 
    error:  null
 

	
 
get_changeset
 
^^^^^^^^^^^^^
 

	
 
Get information and review status for a given changeset. This command can only
 
be executed using the api_key of a user with read permissions to the
 
repository.
 

	
 
INPUT::
 

	
 
    id : <id_for_response>
 
    api_key : "<api_key>"
 
    method  : "get_changeset"
 
    args:     {
 
                "repoid" : "<reponame or repo_id>",
 
                "raw_id" : "<raw_id>",
 
                "with_reviews": "<bool> = Optional(False)"
 
              }
 

	
 
OUTPUT::
 

	
 
    id : <id_given_in_input>
 
    result: {
 
              "author":   "<full_author>",
 
              "date":     "<date_time_of_commit>",
 
              "message":  "<commit_message>",
 
              "raw_id":   "<raw_id>",
 
              "revision": "<numeric_revision>",
 
              "short_id": "<short_id>",
 
              "reviews": [{
 
                    "reviewer":   "<username>",
 
                    "modified_at": "<date_time_of_review>",  # iso 8601 date, server's timezone
 
                    "status":   "<status_of_review>",        # "under_review", "approved" or "rejected"
 
                 },
 
                 ...
 
              ]
 
            }
 
    error:  null
 

	
 
Example output::
 

	
 
    {
 
      "id" : 1,
 
      "error" : null,
 
      "result" : {
 
        "author" : {
 
          "email" : "user@example.com",
 
          "name" : "Kallithea Admin"
 
        },
 
        "changed" : [],
 
        "short_id" : "e1022d3d28df",
 
        "date" : "2017-03-28T09:09:03",
 
        "added" : [
 
          "README.rst"
 
        ],
 
        "removed" : [],
 
        "revision" : 0,
 
        "raw_id" : "e1022d3d28dfba02f626cde65dbe08f4ceb0e4e7",
 
        "message" : "Added file via Kallithea",
 
        "id" : "e1022d3d28dfba02f626cde65dbe08f4ceb0e4e7",
 
        "reviews" : [
 
          {
 
            "status" : "under_review",
 
            "modified_at" : "2017-03-28T09:17:08.618",
 
            "reviewer" : "user"
 
          }
 
        ]
 
      }
 
    }
 

	
 
get_pullrequest
 
^^^^^^^^^^^^^^^
 

	
 
Get information and review status for a given pull request. This command can only be executed
 
using the api_key of a user with read permissions to the original repository.
 

	
 
INPUT::
 

	
 
    id : <id_for_response>
 
    api_key : "<api_key>"
 
    method  : "get_pullrequest"
 
    args:     {
 
                "pullrequest_id" : "<pullrequest_id>",
 
              }
 

	
 
OUTPUT::
 

	
 
    id : <id_given_in_input>
 
    result: {
 
        "status": "<pull_request_status>",
 
        "pull_request_id": <pull_request_id>,
 
        "description": "<pull_request_description>",
 
        "title": "<pull_request_title>",
 
        "url": "<pull_request_url>",
 
        "reviewers": [
 
          {
 
            "username": "<user_name>",
 
          },
 
          ...
 
        ],
 
        "org_repo_url": "<repo_url>",
 
        "org_ref_parts": [
 
          "<ref_type>",
 
          "<ref_name>",
 
          "<raw_id>"
 
        ],
 
        "other_ref_parts": [
 
          "<ref_type>",
 
          "<ref_name>",
 
          "<raw_id>"
 
        ],
 
        "comments": [
 
          {
 
            "username": "<user_name>",
 
            "text": "<comment text>",
 
            "comment_id": "<comment_id>",
 
          },
 
          ...
 
        ],
 
        "owner": "<username>",
 
        "statuses": [
 
          {
 
            "status": "<status_of_review>",        # "under_review", "approved" or "rejected"
 
            "reviewer": "<user_name>",
 
            "modified_at": "<date_time_of_review>" # iso 8601 date, server's timezone
 
          },
 
          ...
 
        ],
 
        "revisions": [
 
          "<raw_id>",
 
          ...
 
        ]
 
    },
 
    error:  null
 

	
 
comment_pullrequest
 
^^^^^^^^^^^^^^^^^^^
 

	
 
Add comment, change status or close a given pull request. This command can only be executed
 
using the api_key of a user with read permissions to the original repository.
 

	
 
INPUT::
 

	
 
    id : <id_for_response>
 
    api_key : "<api_key>"
 
    method  : "comment_pullrequest"
 
    args:     {
 
                "pull_request_id":  "<pull_request_id>",
 
                "comment_msg":      Optional(''),
 
                "status":           Optional(None),     # "under_review", "approved" or "rejected"
 
                "close_pr":         Optional(False)",
 
              }
 

	
 
OUTPUT::
 

	
 
    id : <id_given_in_input>
 
    result: True
 
    error:  null
 

	
 

	
 
API access for web views
 
------------------------
 

	
 
API access can also be turned on for each web view in Kallithea that is
 
decorated with the ``@LoginRequired`` decorator. Some views use
 
``@LoginRequired(api_access=True)`` and are always available. By default only
 
RSS/Atom feed views are enabled. Other views are
 
only available if they have been whitelisted. Edit the
 
``api_access_controllers_whitelist`` option in your .ini file and define views
 
that should have API access enabled.
 

	
 
For example, to enable API access to patch/diff, raw file and archive::
 

	
 
    api_access_controllers_whitelist =
 
        ChangesetController:changeset_patch,
 
        ChangesetController:changeset_raw,
 
        FilesController:raw,
 
        FilesController:archivefile
 

	
 
After this change, a Kallithea view can be accessed without login using
 
bearer authentication, by including this header with the request::
 

	
 
    Authentication: Bearer <api_key>
 

	
 
Alternatively, the API key can be passed in the URL query string using
 
``?api_key=<api_key>``, though this is not recommended due to the increased
 
risk of API key leaks, and support will likely be removed in the future.
 

	
 
Exposing raw diffs is a good way to integrate with
 
third-party services like code review, or build farms that can download archives.
kallithea/controllers/api/api.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.api.api
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
API controller for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Aug 20, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import time
 
import traceback
 
import logging
 
from sqlalchemy import or_
 

	
 
from tg import request
 

	
 
from kallithea.controllers.api import JSONRPCController, JSONRPCError
 
from kallithea.lib.auth import (
 
    PasswordGenerator, AuthUser, HasPermissionAnyDecorator,
 
    HasPermissionAnyDecorator, HasPermissionAny, HasRepoPermissionLevel,
 
    HasRepoGroupPermissionLevel, HasUserGroupPermissionLevel)
 
from kallithea.lib.utils import map_groups, repo2db_mapper
 
from kallithea.lib.utils2 import (
 
    str2bool, time_to_datetime, safe_int, Optional, OAttr)
 
from kallithea.model.meta import Session
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.scm import ScmModel, UserGroupList
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.user import UserModel
 
from kallithea.model.user_group import UserGroupModel
 
from kallithea.model.gist import GistModel
 
from kallithea.model.changeset_status import ChangesetStatusModel
 
from kallithea.model.comment import ChangesetCommentsModel
 
from kallithea.model.pull_request import PullRequestModel
 
from kallithea.model.db import (
 
    Repository, Setting, UserIpMap, Permission, User, Gist,
 
    RepoGroup, UserGroup)
 
    RepoGroup, UserGroup, PullRequest, ChangesetStatus)
 
from kallithea.lib.compat import json
 
from kallithea.lib.exceptions import (
 
    DefaultUserException, UserGroupsAssignedException)
 
from kallithea.lib.vcs.exceptions import ChangesetDoesNotExistError
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.utils import action_logger
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def store_update(updates, attr, name):
 
    """
 
    Stores param in updates dict if it's not instance of Optional
 
    allows easy updates of passed in params
 
    """
 
    if not isinstance(attr, Optional):
 
        updates[name] = attr
 

	
 

	
 
def get_user_or_error(userid):
 
    """
 
    Get user by id or name or return JsonRPCError if not found
 

	
 
    :param userid:
 
    """
 
    user = UserModel().get_user(userid)
 
    if user is None:
 
        raise JSONRPCError("user `%s` does not exist" % (userid,))
 
    return user
 

	
 

	
 
def get_repo_or_error(repoid):
 
    """
 
    Get repo by id or name or return JsonRPCError if not found
 

	
 
    :param repoid:
 
    """
 
    repo = RepoModel().get_repo(repoid)
 
    if repo is None:
 
        raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 
    return repo
 

	
 

	
 
def get_repo_group_or_error(repogroupid):
 
    """
 
    Get repo group by id or name or return JsonRPCError if not found
 

	
 
    :param repogroupid:
 
    """
 
    repo_group = RepoGroup.guess_instance(repogroupid)
 
    if repo_group is None:
 
        raise JSONRPCError(
 
            'repository group `%s` does not exist' % (repogroupid,))
 
    return repo_group
 

	
 

	
 
def get_user_group_or_error(usergroupid):
 
    """
 
    Get user group by id or name or return JsonRPCError if not found
 

	
 
    :param usergroupid:
 
    """
 
    user_group = UserGroupModel().get_group(usergroupid)
 
    if user_group is None:
 
        raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
 
    return user_group
 

	
 

	
 
def get_perm_or_error(permid, prefix=None):
 
    """
 
    Get permission by id or name or return JsonRPCError if not found
 

	
 
    :param permid:
 
    """
 
    perm = Permission.get_by_key(permid)
 
    if perm is None:
 
        raise JSONRPCError('permission `%s` does not exist' % (permid,))
 
    if prefix:
 
        if not perm.permission_name.startswith(prefix):
 
            raise JSONRPCError('permission `%s` is invalid, '
 
                               'should start with %s' % (permid, prefix))
 
    return perm
 

	
 

	
 
def get_gist_or_error(gistid):
 
    """
 
    Get gist by id or gist_access_id or return JsonRPCError if not found
 

	
 
    :param gistid:
 
    """
 
    gist = GistModel().get_gist(gistid)
 
    if gist is None:
 
        raise JSONRPCError('gist `%s` does not exist' % (gistid,))
 
    return gist
 

	
 

	
 
class ApiController(JSONRPCController):
 
    """
 
    API Controller
 

	
 
    The authenticated user can be found as request.authuser.
 

	
 
@@ -2413,96 +2416,158 @@ class ApiController(JSONRPCController):
 
          result : null
 
          error :  {
 
            "failed to create gist"
 
          }
 

	
 
        """
 
        try:
 
            if isinstance(owner, Optional):
 
                owner = request.authuser.user_id
 

	
 
            owner = get_user_or_error(owner)
 
            description = Optional.extract(description)
 
            gist_type = Optional.extract(gist_type)
 
            lifetime = Optional.extract(lifetime)
 

	
 
            gist = GistModel().create(description=description,
 
                                      owner=owner,
 
                                      gist_mapping=files,
 
                                      gist_type=gist_type,
 
                                      lifetime=lifetime)
 
            Session().commit()
 
            return dict(
 
                msg='created new gist',
 
                gist=gist.get_api_data()
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to create gist')
 

	
 
    # def update_gist(self, gistid, files, owner=Optional(OAttr('apiuser')),
 
    #                 gist_type=Optional(Gist.GIST_PUBLIC),
 
    #                 gist_lifetime=Optional(-1), gist_description=Optional('')):
 
    #     gist = get_gist_or_error(gistid)
 
    #     updates = {}
 

	
 
    # permission check inside
 
    def delete_gist(self, gistid):
 
        """
 
        Deletes existing gist
 

	
 
        :param gistid: id of gist to delete
 
        :type gistid: str
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
            "deleted gist ID: <gist_id>",
 
            "gist": null
 
          }
 
          error :  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "failed to delete gist ID:<gist_id>"
 
          }
 

	
 
        """
 
        gist = get_gist_or_error(gistid)
 
        if not HasPermissionAny('hg.admin')():
 
            if gist.owner_id != request.authuser.user_id:
 
                raise JSONRPCError('gist `%s` does not exist' % (gistid,))
 

	
 
        try:
 
            GistModel().delete(gist)
 
            Session().commit()
 
            return dict(
 
                msg='deleted gist ID:%s' % (gist.gist_access_id,),
 
                gist=None
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to delete gist ID:%s'
 
                               % (gist.gist_access_id,))
 

	
 
    # permission check inside
 
    def get_changeset(self, repoid, raw_id, with_reviews=Optional(False)):
 
        repo = get_repo_or_error(repoid)
 
        if not HasRepoPermissionLevel('read')(repo.repo_name):
 
            raise JSONRPCError('Access denied to repo %s' % repo.repo_name)
 
        changeset = repo.get_changeset(raw_id)
 
        if isinstance(changeset, EmptyChangeset):
 
            raise JSONRPCError('Changeset %s does not exist' % raw_id)
 

	
 
        info = dict(changeset.as_dict())
 

	
 
        with_reviews = Optional.extract(with_reviews)
 
        if with_reviews:
 
                reviews = ChangesetStatusModel().get_statuses(
 
                                    repo.repo_name, raw_id)
 
                info["reviews"] = reviews
 

	
 
        return info
 

	
 
    # permission check inside
 
    def get_pullrequest(self, pullrequest_id):
 
        """
 
        Get given pull request by id
 
        """
 
        pull_request = PullRequest.get(pullrequest_id)
 
        if pull_request is None:
 
            raise JSONRPCError('pull request `%s` does not exist' % (pullrequest_id,))
 
        if not HasRepoPermissionLevel('read')(pull_request.org_repo.repo_name):
 
            raise JSONRPCError('not allowed')
 
        return pull_request.get_api_data()
 

	
 
    # permission check inside
 
    def comment_pullrequest(self, pull_request_id, comment_msg='', status=None, close_pr=False):
 
        """
 
        Add comment, close and change status of pull request.
 
        """
 
        apiuser = get_user_or_error(request.authuser.user_id)
 
        pull_request = PullRequest.get(pull_request_id)
 
        if pull_request is None:
 
            raise JSONRPCError('pull request `%s` does not exist' % (pull_request_id,))
 
        if (not HasRepoPermissionLevel('read')(pull_request.org_repo.repo_name)):
 
            raise JSONRPCError('No permission to add comment. User needs at least reading permissions'
 
                               ' to the source repository.')
 
        owner = apiuser.user_id == pull_request.owner_id
 
        reviewer = apiuser.user_id in [reviewer.user_id for reviewer in pull_request.reviewers]
 
        if close_pr and not (apiuser.admin or owner):
 
            raise JSONRPCError('No permission to close pull request. User needs to be admin or owner.')
 
        if status and not (apiuser.admin or owner or reviewer):
 
            raise JSONRPCError('No permission to change pull request status. User needs to be admin, owner or reviewer.')
 
        if pull_request.is_closed():
 
            raise JSONRPCError('pull request is already closed')
 

	
 
        comment = ChangesetCommentsModel().create(
 
            text=comment_msg,
 
            repo=pull_request.org_repo.repo_id,
 
            author=apiuser.user_id,
 
            pull_request=pull_request.pull_request_id,
 
            f_path=None,
 
            line_no=None,
 
            status_change=(ChangesetStatus.get_status_lbl(status)),
 
            closing_pr=close_pr
 
        )
 
        action_logger(apiuser,
 
                      'user_commented_pull_request:%s' % pull_request_id,
 
                      pull_request.org_repo, request.ip_addr)
 
        if status:
 
            ChangesetStatusModel().set_status(
 
                pull_request.org_repo_id,
 
                status,
 
                apiuser.user_id,
 
                comment,
 
                pull_request=pull_request_id
 
            )
 
        if close_pr:
 
            PullRequestModel().close_pull_request(pull_request_id)
 
            action_logger(apiuser,
 
                          'user_closed_pull_request:%s' % pull_request_id,
 
                          pull_request.org_repo, request.ip_addr)
 
        Session().commit()
 
        return True
kallithea/model/db.py
Show inline comments
 
@@ -1422,193 +1422,198 @@ class Repository(Base, BaseDbModel):
 
        Returns comments for this repository grouped by revisions
 

	
 
        :param revisions: filter query by revisions only
 
        """
 
        cmts = ChangesetComment.query() \
 
            .filter(ChangesetComment.repo == self)
 
        if revisions is not None:
 
            if not revisions:
 
                return {} # don't use sql 'in' on empty set
 
            cmts = cmts.filter(ChangesetComment.revision.in_(revisions))
 
        grouped = collections.defaultdict(list)
 
        for cmt in cmts.all():
 
            grouped[cmt.revision].append(cmt)
 
        return grouped
 

	
 
    def statuses(self, revisions):
 
        """
 
        Returns statuses for this repository.
 
        PRs without any votes do _not_ show up as unreviewed.
 

	
 
        :param revisions: list of revisions to get statuses for
 
        """
 
        if not revisions:
 
            return {}
 

	
 
        statuses = ChangesetStatus.query() \
 
            .filter(ChangesetStatus.repo == self) \
 
            .filter(ChangesetStatus.version == 0) \
 
            .filter(ChangesetStatus.revision.in_(revisions))
 

	
 
        grouped = {}
 
        for stat in statuses.all():
 
            pr_id = pr_nice_id = pr_repo = None
 
            if stat.pull_request:
 
                pr_id = stat.pull_request.pull_request_id
 
                pr_nice_id = PullRequest.make_nice_id(pr_id)
 
                pr_repo = stat.pull_request.other_repo.repo_name
 
            grouped[stat.revision] = [str(stat.status), stat.status_lbl,
 
                                      pr_id, pr_repo, pr_nice_id,
 
                                      stat.author]
 
        return grouped
 

	
 
    def _repo_size(self):
 
        from kallithea.lib import helpers as h
 
        log.debug('calculating repository size...')
 
        return h.format_byte_size(self.scm_instance.size)
 

	
 
    #==========================================================================
 
    # SCM CACHE INSTANCE
 
    #==========================================================================
 

	
 
    def set_invalidate(self):
 
        """
 
        Mark caches of this repo as invalid.
 
        """
 
        CacheInvalidation.set_invalidate(self.repo_name)
 

	
 
    _scm_instance = None
 

	
 
    @property
 
    def scm_instance(self):
 
        if self._scm_instance is None:
 
            self._scm_instance = self.scm_instance_cached()
 
        return self._scm_instance
 

	
 
    def scm_instance_cached(self, valid_cache_keys=None):
 
        @cache_region('long_term', 'scm_instance_cached')
 
        def _c(repo_name): # repo_name is just for the cache key
 
            log.debug('Creating new %s scm_instance and populating cache', repo_name)
 
            return self.scm_instance_no_cache()
 
        rn = self.repo_name
 

	
 
        valid = CacheInvalidation.test_and_set_valid(rn, None, valid_cache_keys=valid_cache_keys)
 
        if not valid:
 
            log.debug('Cache for %s invalidated, getting new object', rn)
 
            region_invalidate(_c, None, 'scm_instance_cached', rn)
 
        else:
 
            log.debug('Trying to get scm_instance of %s from cache', rn)
 
        return _c(rn)
 

	
 
    def scm_instance_no_cache(self):
 
        repo_full_path = safe_str(self.repo_full_path)
 
        alias = get_scm(repo_full_path)[0]
 
        log.debug('Creating instance of %s repository from %s',
 
                  alias, self.repo_full_path)
 
        backend = get_backend(alias)
 

	
 
        if alias == 'hg':
 
            repo = backend(repo_full_path, create=False,
 
                           baseui=self._ui)
 
        else:
 
            repo = backend(repo_full_path, create=False)
 

	
 
        return repo
 

	
 
    def __json__(self):
 
        return dict(landing_rev = self.landing_rev)
 
        return dict(
 
            repo_id=self.repo_id,
 
            repo_name=self.repo_name,
 
            landing_rev=self.landing_rev,
 
        )
 

	
 

	
 
class RepoGroup(Base, BaseDbModel):
 
    __tablename__ = 'groups'
 
    __table_args__ = (
 
        CheckConstraint('group_id != group_parent_id', name='ck_groups_no_self_parent'),
 
        _table_args_default_dict,
 
    )
 
    __mapper_args__ = {'order_by': 'group_name'} # TODO: Deprecated as of SQLAlchemy 1.1.
 

	
 
    SEP = ' &raquo; '
 

	
 
    group_id = Column(Integer(), primary_key=True)
 
    group_name = Column(Unicode(255), nullable=False, unique=True) # full path
 
    parent_group_id = Column('group_parent_id', Integer(), ForeignKey('groups.group_id'), nullable=True)
 
    group_description = Column(Unicode(10000), nullable=False)
 
    enable_locking = Column(Boolean(), nullable=False, default=False)
 
    owner_id = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 

	
 
    repo_group_to_perm = relationship('UserRepoGroupToPerm', cascade='all', order_by='UserRepoGroupToPerm.group_to_perm_id')
 
    users_group_to_perm = relationship('UserGroupRepoGroupToPerm', cascade='all')
 
    parent_group = relationship('RepoGroup', remote_side=group_id)
 
    owner = relationship('User')
 

	
 
    @classmethod
 
    def query(cls, sorted=False):
 
        """Add RepoGroup-specific helpers for common query constructs.
 

	
 
        sorted: if True, apply the default ordering (name, case insensitive).
 
        """
 
        q = super(RepoGroup, cls).query()
 

	
 
        if sorted:
 
            q = q.order_by(func.lower(RepoGroup.group_name))
 

	
 
        return q
 

	
 
    def __init__(self, group_name='', parent_group=None):
 
        self.group_name = group_name
 
        self.parent_group = parent_group
 

	
 
    def __unicode__(self):
 
        return u"<%s('id:%s:%s')>" % (self.__class__.__name__, self.group_id,
 
                                      self.group_name)
 

	
 
    @classmethod
 
    def _generate_choice(cls, repo_group):
 
        """Return tuple with group_id and name as html literal"""
 
        from webhelpers.html import literal
 
        if repo_group is None:
 
            return (-1, u'-- %s --' % _('top level'))
 
        return repo_group.group_id, literal(cls.SEP.join(repo_group.full_path_splitted))
 

	
 
    @classmethod
 
    def groups_choices(cls, groups):
 
        """Return tuples with group_id and name as html literal."""
 
        return sorted((cls._generate_choice(g) for g in groups),
 
                      key=lambda c: c[1].split(cls.SEP))
 

	
 
    @classmethod
 
    def url_sep(cls):
 
        return URL_SEP
 

	
 
    @classmethod
 
    def guess_instance(cls, value):
 
        return super(RepoGroup, cls).guess_instance(value, RepoGroup.get_by_group_name)
 

	
 
    @classmethod
 
    def get_by_group_name(cls, group_name, cache=False, case_insensitive=False):
 
        group_name = group_name.rstrip('/')
 
        if case_insensitive:
 
            gr = cls.query() \
 
                .filter(func.lower(cls.group_name) == func.lower(group_name))
 
        else:
 
            gr = cls.query() \
 
                .filter(cls.group_name == group_name)
 
        if cache:
 
            gr = gr.options(FromCache(
 
                            "sql_cache_short",
 
                            "get_group_%s" % _hash_key(group_name)
 
                            )
 
            )
 
        return gr.scalar()
 

	
 
    @property
 
    def parents(self):
 
        groups = []
 
        group = self.parent_group
 
        while group is not None:
 
            groups.append(group)
 
            group = group.parent_group
 
            assert group not in groups, group # avoid recursion on bad db content
 
        groups.reverse()
 
        return groups
 

	
 
    @property
 
@@ -2145,400 +2150,427 @@ class CacheInvalidation(Base, BaseDbMode
 
        prefix = kallithea.CONFIG.get('instance_id', '')
 
        return "%s%s" % (prefix, key)
 

	
 
    @classmethod
 
    def set_invalidate(cls, repo_name):
 
        """
 
        Mark all caches of a repo as invalid in the database.
 
        """
 
        inv_objs = Session().query(cls).filter(cls.cache_args == repo_name).all()
 
        log.debug('for repo %s got %s invalidation objects',
 
                  safe_str(repo_name), inv_objs)
 

	
 
        for inv_obj in inv_objs:
 
            log.debug('marking %s key for invalidation based on repo_name=%s',
 
                      inv_obj, safe_str(repo_name))
 
            Session().delete(inv_obj)
 
        Session().commit()
 

	
 
    @classmethod
 
    def test_and_set_valid(cls, repo_name, kind, valid_cache_keys=None):
 
        """
 
        Mark this cache key as active and currently cached.
 
        Return True if the existing cache registration still was valid.
 
        Return False to indicate that it had been invalidated and caches should be refreshed.
 
        """
 

	
 
        key = (repo_name + '_' + kind) if kind else repo_name
 
        cache_key = cls._get_cache_key(key)
 

	
 
        if valid_cache_keys and cache_key in valid_cache_keys:
 
            return True
 

	
 
        inv_obj = cls.query().filter(cls.cache_key == cache_key).scalar()
 
        if inv_obj is None:
 
            inv_obj = cls(cache_key, repo_name)
 
            Session().add(inv_obj)
 
        elif inv_obj.cache_active:
 
            return True
 
        inv_obj.cache_active = True
 
        try:
 
            Session().commit()
 
        except sqlalchemy.exc.IntegrityError:
 
            log.error('commit of CacheInvalidation failed - retrying')
 
            Session().rollback()
 
            inv_obj = cls.query().filter(cls.cache_key == cache_key).scalar()
 
            if inv_obj is None:
 
                log.error('failed to create CacheInvalidation entry')
 
                # TODO: fail badly?
 
            # else: TOCTOU - another thread added the key at the same time; no further action required
 
        return False
 

	
 
    @classmethod
 
    def get_valid_cache_keys(cls):
 
        """
 
        Return opaque object with information of which caches still are valid
 
        and can be used without checking for invalidation.
 
        """
 
        return set(inv_obj.cache_key for inv_obj in cls.query().filter(cls.cache_active).all())
 

	
 

	
 
class ChangesetComment(Base, BaseDbModel):
 
    __tablename__ = 'changeset_comments'
 
    __table_args__ = (
 
        Index('cc_revision_idx', 'revision'),
 
        Index('cc_pull_request_id_idx', 'pull_request_id'),
 
        _table_args_default_dict,
 
    )
 

	
 
    comment_id = Column(Integer(), primary_key=True)
 
    repo_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
 
    revision = Column(String(40), nullable=True)
 
    pull_request_id = Column(Integer(), ForeignKey('pull_requests.pull_request_id'), nullable=True)
 
    line_no = Column(Unicode(10), nullable=True)
 
    f_path = Column(Unicode(1000), nullable=True)
 
    author_id = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
 
    text = Column(UnicodeText(), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    modified_at = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 

	
 
    author = relationship('User')
 
    repo = relationship('Repository')
 
    # status_change is frequently used directly in templates - make it a lazy
 
    # join to avoid fetching each related ChangesetStatus on demand.
 
    # There will only be one ChangesetStatus referencing each comment so the join will not explode.
 
    status_change = relationship('ChangesetStatus',
 
                                 cascade="all, delete-orphan", lazy='joined')
 
    pull_request = relationship('PullRequest')
 

	
 
    def url(self):
 
        anchor = "comment-%s" % self.comment_id
 
        import kallithea.lib.helpers as h
 
        if self.revision:
 
            return h.url('changeset_home', repo_name=self.repo.repo_name, revision=self.revision, anchor=anchor)
 
        elif self.pull_request_id is not None:
 
            return self.pull_request.url(anchor=anchor)
 

	
 
    def __json__(self):
 
        return dict(
 
            comment_id=self.comment_id,
 
            username=self.author.username,
 
            text=self.text,
 
        )
 

	
 
    def deletable(self):
 
        return self.created_on > datetime.datetime.now() - datetime.timedelta(minutes=5)
 

	
 

	
 
class ChangesetStatus(Base, BaseDbModel):
 
    __tablename__ = 'changeset_statuses'
 
    __table_args__ = (
 
        Index('cs_revision_idx', 'revision'),
 
        Index('cs_version_idx', 'version'),
 
        Index('cs_pull_request_id_idx', 'pull_request_id'),
 
        Index('cs_changeset_comment_id_idx', 'changeset_comment_id'),
 
        Index('cs_pull_request_id_user_id_version_idx', 'pull_request_id', 'user_id', 'version'),
 
        Index('cs_repo_id_pull_request_id_idx', 'repo_id', 'pull_request_id'),
 
        UniqueConstraint('repo_id', 'revision', 'version'),
 
        _table_args_default_dict,
 
    )
 

	
 
    STATUS_NOT_REVIEWED = DEFAULT = 'not_reviewed'
 
    STATUS_APPROVED = 'approved'
 
    STATUS_REJECTED = 'rejected' # is shown as "Not approved" - TODO: change database content / scheme
 
    STATUS_UNDER_REVIEW = 'under_review'
 

	
 
    STATUSES = [
 
        (STATUS_NOT_REVIEWED, _("Not reviewed")),  # (no icon) and default
 
        (STATUS_UNDER_REVIEW, _("Under review")),
 
        (STATUS_REJECTED, _("Not approved")),
 
        (STATUS_APPROVED, _("Approved")),
 
    ]
 
    STATUSES_DICT = dict(STATUSES)
 

	
 
    changeset_status_id = Column(Integer(), primary_key=True)
 
    repo_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    revision = Column(String(40), nullable=True)
 
    status = Column(String(128), nullable=False, default=DEFAULT)
 
    comment_id = Column('changeset_comment_id', Integer(), ForeignKey('changeset_comments.comment_id'), nullable=False)
 
    modified_at = Column(DateTime(), nullable=False, default=datetime.datetime.now)
 
    version = Column(Integer(), nullable=False, default=0)
 
    pull_request_id = Column(Integer(), ForeignKey('pull_requests.pull_request_id'), nullable=True)
 

	
 
    author = relationship('User')
 
    repo = relationship('Repository')
 
    comment = relationship('ChangesetComment')
 
    pull_request = relationship('PullRequest')
 

	
 
    def __unicode__(self):
 
        return u"<%s('%s:%s')>" % (
 
            self.__class__.__name__,
 
            self.status, self.author
 
        )
 

	
 
    @classmethod
 
    def get_status_lbl(cls, value):
 
        return cls.STATUSES_DICT.get(value)
 

	
 
    @property
 
    def status_lbl(self):
 
        return ChangesetStatus.get_status_lbl(self.status)
 

	
 
    def __json__(self):
 
        return dict(
 
            status=self.status,
 
            modified_at=self.modified_at,
 
            reviewer=self.author.username,
 
            )
 

	
 

	
 
class PullRequest(Base, BaseDbModel):
 
    __tablename__ = 'pull_requests'
 
    __table_args__ = (
 
        Index('pr_org_repo_id_idx', 'org_repo_id'),
 
        Index('pr_other_repo_id_idx', 'other_repo_id'),
 
        _table_args_default_dict,
 
    )
 

	
 
    # values for .status
 
    STATUS_NEW = u'new'
 
    STATUS_CLOSED = u'closed'
 

	
 
    pull_request_id = Column(Integer(), primary_key=True)
 
    title = Column(Unicode(255), nullable=False)
 
    description = Column(UnicodeText(), nullable=False)
 
    status = Column(Unicode(255), nullable=False, default=STATUS_NEW) # only for closedness, not approve/reject/etc
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    updated_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    owner_id = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
 
    _revisions = Column('revisions', UnicodeText(), nullable=False)
 
    org_repo_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
 
    org_ref = Column(Unicode(255), nullable=False)
 
    other_repo_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
 
    other_ref = Column(Unicode(255), nullable=False)
 

	
 
    @hybrid_property
 
    def revisions(self):
 
        return self._revisions.split(':')
 

	
 
    @revisions.setter
 
    def revisions(self, val):
 
        self._revisions = safe_unicode(':'.join(val))
 

	
 
    @property
 
    def org_ref_parts(self):
 
        return self.org_ref.split(':')
 

	
 
    @property
 
    def other_ref_parts(self):
 
        return self.other_ref.split(':')
 

	
 
    owner = relationship('User')
 
    reviewers = relationship('PullRequestReviewer',
 
                             cascade="all, delete-orphan")
 
    org_repo = relationship('Repository', primaryjoin='PullRequest.org_repo_id==Repository.repo_id')
 
    other_repo = relationship('Repository', primaryjoin='PullRequest.other_repo_id==Repository.repo_id')
 
    statuses = relationship('ChangesetStatus', order_by='ChangesetStatus.changeset_status_id')
 
    comments = relationship('ChangesetComment', order_by='ChangesetComment.comment_id',
 
                             cascade="all, delete-orphan")
 

	
 
    @classmethod
 
    def query(cls, reviewer_id=None, include_closed=True, sorted=False):
 
        """Add PullRequest-specific helpers for common query constructs.
 

	
 
        reviewer_id: only PRs with the specified user added as reviewer.
 

	
 
        include_closed: if False, do not include closed PRs.
 

	
 
        sorted: if True, apply the default ordering (newest first).
 
        """
 
        q = super(PullRequest, cls).query()
 

	
 
        if reviewer_id is not None:
 
            q = q.join(PullRequestReviewer).filter(PullRequestReviewer.user_id == reviewer_id)
 

	
 
        if not include_closed:
 
            q = q.filter(PullRequest.status != PullRequest.STATUS_CLOSED)
 

	
 
        if sorted:
 
            q = q.order_by(PullRequest.created_on.desc())
 

	
 
        return q
 

	
 
    def get_reviewer_users(self):
 
        """Like .reviewers, but actually returning the users"""
 
        return User.query() \
 
            .join(PullRequestReviewer) \
 
            .filter(PullRequestReviewer.pull_request == self) \
 
            .order_by(PullRequestReviewer.pull_request_reviewers_id) \
 
            .all()
 

	
 
    def is_closed(self):
 
        return self.status == self.STATUS_CLOSED
 

	
 
    def user_review_status(self, user_id):
 
        """Return the user's latest status votes on PR"""
 
        # note: no filtering on repo - that would be redundant
 
        status = ChangesetStatus.query() \
 
            .filter(ChangesetStatus.pull_request == self) \
 
            .filter(ChangesetStatus.user_id == user_id) \
 
            .order_by(ChangesetStatus.version) \
 
            .first()
 
        return str(status.status) if status else ''
 

	
 
    @classmethod
 
    def make_nice_id(cls, pull_request_id):
 
        '''Return pull request id nicely formatted for displaying'''
 
        return '#%s' % pull_request_id
 

	
 
    def nice_id(self):
 
        '''Return the id of this pull request, nicely formatted for displaying'''
 
        return self.make_nice_id(self.pull_request_id)
 

	
 
    def get_api_data(self):
 
        return self.__json__()
 

	
 
    def __json__(self):
 
        return dict(
 
            revisions=self.revisions
 
            pull_request_id=self.pull_request_id,
 
            url=self.url(),
 
            reviewers=self.reviewers,
 
            revisions=self.revisions,
 
            owner=self.owner.username,
 
            title=self.title,
 
            description=self.description,
 
            org_repo_url=self.org_repo.clone_url(),
 
            org_ref_parts=self.org_ref_parts,
 
            other_ref_parts=self.other_ref_parts,
 
            status=self.status,
 
            comments=self.comments,
 
            statuses=self.statuses,
 
        )
 

	
 
    def url(self, **kwargs):
 
        canonical = kwargs.pop('canonical', None)
 
        import kallithea.lib.helpers as h
 
        b = self.org_ref_parts[1]
 
        if b != self.other_ref_parts[1]:
 
            s = '/_/' + b
 
        else:
 
            s = '/_/' + self.title
 
        kwargs['extra'] = urlreadable(s)
 
        if canonical:
 
            return h.canonical_url('pullrequest_show', repo_name=self.other_repo.repo_name,
 
                                   pull_request_id=self.pull_request_id, **kwargs)
 
        return h.url('pullrequest_show', repo_name=self.other_repo.repo_name,
 
                     pull_request_id=self.pull_request_id, **kwargs)
 

	
 
class PullRequestReviewer(Base, BaseDbModel):
 
    __tablename__ = 'pull_request_reviewers'
 
    __table_args__ = (
 
        Index('pull_request_reviewers_user_id_idx', 'user_id'),
 
        _table_args_default_dict,
 
    )
 

	
 
    def __init__(self, user=None, pull_request=None):
 
        self.user = user
 
        self.pull_request = pull_request
 

	
 
    pull_request_reviewers_id = Column('pull_requests_reviewers_id', Integer(), primary_key=True)
 
    pull_request_id = Column(Integer(), ForeignKey('pull_requests.pull_request_id'), nullable=False)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 

	
 
    user = relationship('User')
 
    pull_request = relationship('PullRequest')
 

	
 
    def __json__(self):
 
        return dict(
 
            username=self.user.username if self.user else None,
 
        )
 

	
 

	
 
class Notification(Base, BaseDbModel):
 
    __tablename__ = 'notifications'
 
    __table_args__ = (
 
        Index('notification_type_idx', 'type'),
 
        _table_args_default_dict,
 
    )
 

	
 
    TYPE_CHANGESET_COMMENT = u'cs_comment'
 
    TYPE_MESSAGE = u'message'
 
    TYPE_MENTION = u'mention' # not used
 
    TYPE_REGISTRATION = u'registration'
 
    TYPE_PULL_REQUEST = u'pull_request'
 
    TYPE_PULL_REQUEST_COMMENT = u'pull_request_comment'
 

	
 
    notification_id = Column(Integer(), primary_key=True)
 
    subject = Column(Unicode(512), nullable=False)
 
    body = Column(UnicodeText(), nullable=False)
 
    created_by = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    type_ = Column('type', Unicode(255), nullable=False)
 

	
 
    created_by_user = relationship('User')
 
    notifications_to_users = relationship('UserNotification', cascade="all, delete-orphan")
 

	
 
    @property
 
    def recipients(self):
 
        return [x.user for x in UserNotification.query()
 
                .filter(UserNotification.notification == self)
 
                .order_by(UserNotification.user_id.asc()).all()]
 

	
 
    @classmethod
 
    def create(cls, created_by, subject, body, recipients, type_=None):
 
        if type_ is None:
 
            type_ = Notification.TYPE_MESSAGE
 

	
 
        notification = cls()
 
        notification.created_by_user = created_by
 
        notification.subject = subject
 
        notification.body = body
 
        notification.type_ = type_
 
        notification.created_on = datetime.datetime.now()
 

	
 
        for recipient in recipients:
 
            un = UserNotification()
 
            un.notification = notification
 
            un.user_id = recipient.user_id
 
            # Mark notifications to self "pre-read" - should perhaps just be skipped
 
            if recipient == created_by:
 
                un.read = True
 
            Session().add(un)
 

	
 
        Session().add(notification)
 
        Session().flush() # assign notification.notification_id
 
        return notification
 

	
 
    @property
 
    def description(self):
 
        from kallithea.model.notification import NotificationModel
 
        return NotificationModel().make_description(self)
 

	
 

	
 
class UserNotification(Base, BaseDbModel):
 
    __tablename__ = 'user_to_notification'
 
    __table_args__ = (
 
        UniqueConstraint('user_id', 'notification_id'),
 
        _table_args_default_dict,
 
    )
 

	
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), primary_key=True)
 
    notification_id = Column(Integer(), ForeignKey('notifications.notification_id'), primary_key=True)
 
    read = Column(Boolean, nullable=False, default=False)
 
    sent_on = Column(DateTime(timezone=False), nullable=True) # FIXME: not nullable?
 

	
 
    user = relationship('User')
 
    notification = relationship('Notification')
 

	
 
    def mark_as_read(self):
 
        self.read = True
 

	
 

	
 
class Gist(Base, BaseDbModel):
 
    __tablename__ = 'gists'
 
    __table_args__ = (
 
        Index('g_gist_access_id_idx', 'gist_access_id'),
 
        Index('g_created_on_idx', 'created_on'),
 
        _table_args_default_dict,
 
    )
 

	
 
    GIST_PUBLIC = u'public'
 
    GIST_PRIVATE = u'private'
 
    DEFAULT_FILENAME = u'gistfile1.txt'
 

	
 
    gist_id = Column(Integer(), primary_key=True)
 
    gist_access_id = Column(Unicode(250), nullable=False)
 
    gist_description = Column(UnicodeText(), nullable=False)
kallithea/tests/api/api_base.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
"""
 
Tests for the JSON-RPC web api.
 
"""
 

	
 
import os
 
import random
 
import mock
 
import re
 

	
 
from kallithea.tests.base import *
 
from kallithea.tests.fixture import Fixture
 
from kallithea.lib.compat import json
 
from kallithea.lib.auth import AuthUser
 
from kallithea.model.user import UserModel
 
from kallithea.model.user_group import UserGroupModel
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.meta import Session
 
from kallithea.model.scm import ScmModel
 
from kallithea.model.gist import GistModel
 
from kallithea.model.db import Repository, User, Setting, Ui
 
from kallithea.model.changeset_status import ChangesetStatusModel
 
from kallithea.model.db import Repository, User, Setting, Ui, PullRequest, ChangesetStatus
 
from kallithea.lib.utils2 import time_to_datetime
 

	
 

	
 
API_URL = '/_admin/api'
 
TEST_USER_GROUP = u'test_user_group'
 
TEST_REPO_GROUP = u'test_repo_group'
 

	
 
fixture = Fixture()
 

	
 

	
 
def _build_data(apikey, method, **kw):
 
    """
 
    Builds API data with given random ID
 

	
 
    :param random_id:
 
    """
 
    random_id = random.randrange(1, 9999)
 
    return random_id, json.dumps({
 
        "id": random_id,
 
        "api_key": apikey,
 
        "method": method,
 
        "args": kw
 
    })
 

	
 

	
 
jsonify = lambda obj: json.loads(json.dumps(obj))
 

	
 

	
 
def crash(*args, **kwargs):
 
    raise Exception('Total Crash !')
 

	
 

	
 
def api_call(test_obj, params):
 
    response = test_obj.app.post(API_URL, content_type='application/json',
 
                                 params=params)
 
    return response
 

	
 

	
 
## helpers
 
def make_user_group(name=TEST_USER_GROUP):
 
    gr = fixture.create_user_group(name, cur_user=TEST_USER_ADMIN_LOGIN)
 
    UserGroupModel().add_user_to_group(user_group=gr,
 
                                       user=TEST_USER_ADMIN_LOGIN)
 
    Session().commit()
 
    return gr
 

	
 

	
 
def make_repo_group(name=TEST_REPO_GROUP):
 
    gr = fixture.create_repo_group(name, cur_user=TEST_USER_ADMIN_LOGIN)
 
    Session().commit()
 
    return gr
 

	
 

	
 
class _BaseTestApi(object):
 
    REPO = None
 
    REPO_TYPE = None
 

	
 
    @classmethod
 
    def setup_class(cls):
 
        cls.usr = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        cls.apikey = cls.usr.api_key
 
        cls.test_user = UserModel().create_or_update(
 
            username='test-api',
 
            password='test',
 
            email='test@example.com',
 
            firstname=u'first',
 
            lastname=u'last'
 
        )
 
        Session().commit()
 
        cls.TEST_USER_LOGIN = cls.test_user.username
 
        cls.apikey_regular = cls.test_user.api_key
 

	
 
    @classmethod
 
    def teardown_class(cls):
 
        pass
 

	
 
    def setup_method(self, method):
 
        make_user_group()
 
        make_repo_group()
 

	
 
    def teardown_method(self, method):
 
        fixture.destroy_user_group(TEST_USER_GROUP)
 
        fixture.destroy_gists()
 
        fixture.destroy_repo_group(TEST_REPO_GROUP)
 

	
 
    def _compare_ok(self, id_, expected, given):
 
        expected = jsonify({
 
            'id': id_,
 
            'error': None,
 
            'result': expected
 
        })
 
        given = json.loads(given)
 
        assert expected == given
 

	
 
    def _compare_error(self, id_, expected, given):
 
        expected = jsonify({
 
@@ -2413,96 +2415,182 @@ class _BaseTestApi(object):
 

	
 
    def test_api_delete_gist(self):
 
        gist_id = fixture.create_gist().gist_access_id
 
        id_, params = _build_data(self.apikey, 'delete_gist',
 
                                  gistid=gist_id)
 
        response = api_call(self, params)
 
        expected = {'gist': None, 'msg': 'deleted gist ID:%s' % gist_id}
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_delete_gist_regular_user(self):
 
        gist_id = fixture.create_gist(owner=self.TEST_USER_LOGIN).gist_access_id
 
        id_, params = _build_data(self.apikey_regular, 'delete_gist',
 
                                  gistid=gist_id)
 
        response = api_call(self, params)
 
        expected = {'gist': None, 'msg': 'deleted gist ID:%s' % gist_id}
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_delete_gist_regular_user_no_permission(self):
 
        gist_id = fixture.create_gist().gist_access_id
 
        id_, params = _build_data(self.apikey_regular, 'delete_gist',
 
                                  gistid=gist_id)
 
        response = api_call(self, params)
 
        expected = 'gist `%s` does not exist' % (gist_id,)
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(GistModel, 'delete', crash)
 
    def test_api_delete_gist_exception_occurred(self):
 
        gist_id = fixture.create_gist().gist_access_id
 
        id_, params = _build_data(self.apikey, 'delete_gist',
 
                                  gistid=gist_id)
 
        response = api_call(self, params)
 
        expected = 'failed to delete gist ID:%s' % (gist_id,)
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_ip(self):
 
        id_, params = _build_data(self.apikey, 'get_ip')
 
        response = api_call(self, params)
 
        expected = {
 
            'server_ip_addr': '0.0.0.0',
 
            'user_ips': []
 
        }
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_server_info(self):
 
        id_, params = _build_data(self.apikey, 'get_server_info')
 
        response = api_call(self, params)
 
        expected = Setting.get_server_info()
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_changeset(self):
 
        review = fixture.review_changeset(self.REPO, self.TEST_REVISION, "approved")
 
        id_, params = _build_data(self.apikey, 'get_changeset',
 
                                  repoid=self.REPO, raw_id = self.TEST_REVISION)
 
        response = api_call(self, params)
 
        result = json.loads(response.body)["result"]
 
        assert result["raw_id"] == self.TEST_REVISION
 
        assert not result.has_key("reviews")
 

	
 
    def test_api_get_changeset_with_reviews(self):
 
        reviewobjs = fixture.review_changeset(self.REPO, self.TEST_REVISION, "approved")
 
        id_, params = _build_data(self.apikey, 'get_changeset',
 
                                  repoid=self.REPO, raw_id = self.TEST_REVISION,
 
                                  with_reviews = True)
 
        response = api_call(self, params)
 
        result = json.loads(response.body)["result"]
 
        assert result["raw_id"] == self.TEST_REVISION
 
        assert result.has_key("reviews")
 
        assert len(result["reviews"]) == 1
 
        review = result["reviews"][0]
 
        expected = {
 
            'status': 'approved',
 
            'modified_at': reviewobjs[0].modified_at.isoformat()[:-3],
 
            'reviewer': 'test_admin',
 
        }
 
        assert review == expected
 

	
 
    def test_api_get_changeset_that_does_not_exist(self):
 
        """ Fetch changeset status for non-existant changeset.
 
        revision id is the above git hash used in the test above with the
 
        last 3 nibbles replaced with 0xf.  Should not exist for git _or_ hg.
 
        """
 
        id_, params = _build_data(self.apikey, 'get_changeset',
 
                                  repoid=self.REPO, raw_id = '7ab37bc680b4aa72c34d07b230c866c28e9fcfff')
 
        response = api_call(self, params)
 
        expected = u'Changeset %s does not exist' % ('7ab37bc680b4aa72c34d07b230c866c28e9fcfff',)
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_changeset_without_permission(self):
 
        review = fixture.review_changeset(self.REPO, self.TEST_REVISION, "approved")
 
        RepoModel().revoke_user_permission(repo=self.REPO, user=self.TEST_USER_LOGIN)
 
        RepoModel().revoke_user_permission(repo=self.REPO, user="default")
 
        id_, params = _build_data(self.apikey_regular, 'get_changeset',
 
                                  repoid=self.REPO, raw_id = self.TEST_REVISION)
 
        response = api_call(self, params)
 
        expected = u'Access denied to repo %s' % self.REPO
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_pullrequest(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'get test')
 
        random_id = random.randrange(1, 9999)
 
        params = json.dumps({
 
            "id": random_id,
 
            "api_key": self.apikey,
 
            "method": 'get_pullrequest',
 
            "args": {"pullrequest_id": pull_request_id},
 
        })
 
        response = api_call(self, params)
 
        pullrequest = PullRequest().get(pull_request_id)
 
        expected = {
 
            "status": "new",
 
            "pull_request_id": pull_request_id,
 
            "description": "No description",
 
            "url": "/%s/pull-request/%s/_/%s" % (self.REPO, pull_request_id, "stable"),
 
            "reviewers": [{"username": "test_regular"}],
 
            "org_repo_url": "http://localhost:80/%s" % self.REPO,
 
            "org_ref_parts": ["branch", "stable", self.TEST_PR_SRC],
 
            "other_ref_parts": ["branch", "default", self.TEST_PR_DST],
 
            "comments": [{"username": TEST_USER_ADMIN_LOGIN, "text": "",
 
                         "comment_id": pullrequest.comments[0].comment_id}],
 
            "owner": TEST_USER_ADMIN_LOGIN,
 
            "statuses": [{"status": "under_review", "reviewer": TEST_USER_ADMIN_LOGIN, "modified_at": "2000-01-01T00:00:00.000"} for i in range(0, len(self.TEST_PR_REVISIONS))],
 
            "title": "get test",
 
            "revisions": self.TEST_PR_REVISIONS,
 
        }
 
        self._compare_ok(random_id, expected,
 
                         given=re.sub("\d\d\d\d\-\d\d\-\d\dT\d\d\:\d\d\:\d\d\.\d\d\d",
 
                                      "2000-01-01T00:00:00.000", response.body))
 

	
 
    def test_api_close_pullrequest(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'close test')
 
        random_id = random.randrange(1, 9999)
 
        params = json.dumps({
 
            "id": random_id,
 
            "api_key": self.apikey,
 
            "method": "comment_pullrequest",
 
            "args": {"pull_request_id": pull_request_id, "close_pr": True},
 
        })
 
        response = api_call(self, params)
 
        self._compare_ok(random_id, True, given=response.body)
 
        pullrequest = PullRequest().get(pull_request_id)
 
        assert pullrequest.comments[-1].text == ''
 
        assert pullrequest.status == PullRequest.STATUS_CLOSED
 
        assert pullrequest.is_closed() == True
 

	
 
    def test_api_status_pullrequest(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, "status test")
 

	
 
        random_id = random.randrange(1, 9999)
 
        params = json.dumps({
 
            "id": random_id,
 
            "api_key": User.get_by_username(TEST_USER_REGULAR2_LOGIN).api_key,
 
            "method": "comment_pullrequest",
 
            "args": {"pull_request_id": pull_request_id, "status": ChangesetStatus.STATUS_APPROVED},
 
        })
 
        response = api_call(self, params)
 
        pullrequest = PullRequest().get(pull_request_id)
 
        self._compare_error(random_id, "No permission to change pull request status. User needs to be admin, owner or reviewer.", given=response.body)
 
        assert ChangesetStatus.STATUS_UNDER_REVIEW == ChangesetStatusModel().calculate_pull_request_result(pullrequest)[2]
 
        params = json.dumps({
 
            "id": random_id,
 
            "api_key": User.get_by_username(TEST_USER_REGULAR_LOGIN).api_key,
 
            "method": "comment_pullrequest",
 
            "args": {"pull_request_id": pull_request_id, "status": ChangesetStatus.STATUS_APPROVED},
 
        })
 
        response = api_call(self, params)
 
        self._compare_ok(random_id, True, given=response.body)
 
        pullrequest = PullRequest().get(pull_request_id)
 
        assert ChangesetStatus.STATUS_APPROVED == ChangesetStatusModel().calculate_pull_request_result(pullrequest)[2]
 

	
 
    def test_api_comment_pullrequest(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, "comment test")
 
        random_id = random.randrange(1, 9999)
 
        params = json.dumps({
 
            "id": random_id,
 
            "api_key": self.apikey,
 
            "method": "comment_pullrequest",
 
            "args": {"pull_request_id": pull_request_id, "comment_msg": "Looks good to me"},
 
        })
 
        response = api_call(self, params)
 
        self._compare_ok(random_id, True, given=response.body)
 
        pullrequest = PullRequest().get(pull_request_id)
 
        assert pullrequest.comments[-1].text == u'Looks good to me'
kallithea/tests/api/test_api_git.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
from kallithea.tests.base import TestController, GIT_REPO, GIT_TEST_REVISION
 
from kallithea.tests.api.api_base import _BaseTestApi
 

	
 

	
 
class TestGitApi(_BaseTestApi, TestController):
 
    REPO = GIT_REPO
 
    REPO_TYPE = 'git'
 
    TEST_REVISION = GIT_TEST_REVISION
 
    TEST_PR_SRC = u'c60f01b77c42dce653d6b1d3b04689862c261929'
 
    TEST_PR_DST = u'10cddef6b794696066fb346434014f0a56810218'
 
    TEST_PR_REVISIONS = [u'1bead5880d2dbe831762bf7fb439ba2919b75fdd',
 
                         u'9bcd3ecfc8832a8cd881c1c1bbe2d13ffa9d94c7',
 
                         u'283de4dfca8479875a1befb8d4059f3bbb725145',
 
                         u'c60f01b77c42dce653d6b1d3b04689862c261929']
kallithea/tests/api/test_api_hg.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
from kallithea.tests.base import TestController, HG_REPO, HG_TEST_REVISION
 
from kallithea.tests.api.api_base import _BaseTestApi
 

	
 

	
 
class TestHgApi(_BaseTestApi, TestController):
 
    REPO = HG_REPO
 
    REPO_TYPE = 'hg'
 
    TEST_REVISION = HG_TEST_REVISION
 
    TEST_PR_SRC = u'4f7e2131323e0749a740c0a56ab68ae9269c562a'
 
    TEST_PR_DST = u'92831aebf2f8dd4879e897024b89d09af214df1c'
 
    TEST_PR_REVISIONS = [u'720bbdb27665d6262b313e8a541b654d0cbd5b27',
 
                         u'f41649565a9e89919a588a163e717b4084f8a3b1',
 
                         u'94f45ed825a113e61af7e141f44ca578374abef0',
 
                         u'fef5bfe1dc17611d5fb59a7f6f95c55c3606f933',
 
                         u'4f7e2131323e0749a740c0a56ab68ae9269c562a']
kallithea/tests/fixture.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
"""
 
Helpers for fixture generation
 
"""
 

	
 
import logging
 
import os
 
import shutil
 
import tarfile
 
from os.path import dirname
 

	
 
import mock
 
from tg import request
 
from tg.util.webtest import test_context
 

	
 
from kallithea.model.db import Repository, User, RepoGroup, UserGroup, Gist, ChangesetStatus
 
from kallithea.model.meta import Session
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.user import UserModel
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.user_group import UserGroupModel
 
from kallithea.model.gist import GistModel
 
from kallithea.model.scm import ScmModel
 
from kallithea.model.comment import ChangesetCommentsModel
 
from kallithea.model.changeset_status import ChangesetStatusModel
 
from kallithea.model.pull_request import CreatePullRequestAction#, CreatePullRequestIterationAction, PullRequestModel
 
from kallithea.lib import helpers
 
from kallithea.lib.auth import AuthUser
 
from kallithea.lib.db_manage import DbManage
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.tests.base import invalidate_all_caches, GIT_REPO, HG_REPO, TESTS_TMP_PATH, TEST_USER_ADMIN_LOGIN
 
from kallithea.tests.base import invalidate_all_caches, GIT_REPO, HG_REPO, \
 
    TESTS_TMP_PATH, TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 
FIXTURES = os.path.join(dirname(dirname(os.path.abspath(__file__))), 'tests', 'fixtures')
 

	
 

	
 
def error_function(*args, **kwargs):
 
    raise Exception('Total Crash !')
 

	
 

	
 
class Fixture(object):
 

	
 
    def __init__(self):
 
        pass
 

	
 
    def anon_access(self, status):
 
        """
 
        Context manager for controlling anonymous access.
 
        Anon access will be set and committed, but restored again when exiting the block.
 

	
 
        Usage:
 

	
 
        fixture = Fixture()
 
        with fixture.anon_access(False):
 
            stuff
 
        """
 

	
 
        class context(object):
 
            def __enter__(self):
 
                anon = User.get_default_user()
 
                self._before = anon.active
 
                anon.active = status
 
                Session().commit()
 
                invalidate_all_caches()
 

	
 
            def __exit__(self, exc_type, exc_val, exc_tb):
 
                anon = User.get_default_user()
 
                anon.active = self._before
 
                Session().commit()
 

	
 
        return context()
 

	
 
    def _get_repo_create_params(self, **custom):
 
        """Return form values to be validated through RepoForm"""
 
        defs = dict(
 
            repo_name=None,
 
            repo_type='hg',
 
            clone_uri='',
 
            repo_group=u'-1',
 
            repo_description=u'DESC',
 
            repo_private=False,
 
            repo_landing_rev='rev:tip',
 
            repo_copy_permissions=False,
 
            repo_state=Repository.STATE_CREATED,
 
        )
 
        defs.update(custom)
 
        if 'repo_name_full' not in custom:
 
            defs.update({'repo_name_full': defs['repo_name']})
 

	
 
        # fix the repo name if passed as repo_name_full
 
        if defs['repo_name']:
 
            defs['repo_name'] = defs['repo_name'].split('/')[-1]
 

	
 
        return defs
 

	
 
    def _get_repo_group_create_params(self, **custom):
 
        """Return form values to be validated through RepoGroupForm"""
 
        defs = dict(
 
            group_name=None,
 
            group_description=u'DESC',
 
            parent_group_id=u'-1',
 
            perms_updates=[],
 
            perms_new=[],
 
            enable_locking=False,
 
            recursive=False
 
        )
 
        defs.update(custom)
 

	
 
        return defs
 

	
 
    def _get_user_create_params(self, name, **custom):
 
        defs = dict(
 
            username=name,
 
            password='qweqwe',
 
            email='%s+test@example.com' % name,
 
            firstname=u'TestUser',
 
            lastname=u'Test',
 
            active=True,
 
            admin=False,
 
            extern_type='internal',
 
            extern_name=None
 
        )
 
        defs.update(custom)
 

	
 
        return defs
 
@@ -224,175 +232,190 @@ class Fixture(object):
 

	
 
    def create_user_group(self, name, **kwargs):
 
        if 'skip_if_exists' in kwargs:
 
            del kwargs['skip_if_exists']
 
            gr = UserGroup.get_by_group_name(group_name=name)
 
            if gr:
 
                return gr
 
        form_data = self._get_user_group_create_params(name, **kwargs)
 
        owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
 
        user_group = UserGroupModel().create(
 
            name=form_data['users_group_name'],
 
            description=form_data['user_group_description'],
 
            owner=owner, active=form_data['users_group_active'],
 
            group_data=form_data['user_group_data'])
 
        Session().commit()
 
        user_group = UserGroup.get_by_group_name(user_group.users_group_name)
 
        return user_group
 

	
 
    def destroy_user_group(self, usergroupid):
 
        UserGroupModel().delete(user_group=usergroupid, force=True)
 
        Session().commit()
 

	
 
    def create_gist(self, **kwargs):
 
        form_data = {
 
            'description': u'new-gist',
 
            'owner': TEST_USER_ADMIN_LOGIN,
 
            'gist_type': Gist.GIST_PUBLIC,
 
            'lifetime': -1,
 
            'gist_mapping': {'filename1.txt':{'content':'hello world'},}
 
        }
 
        form_data.update(kwargs)
 
        gist = GistModel().create(
 
            description=form_data['description'],owner=form_data['owner'],
 
            gist_mapping=form_data['gist_mapping'], gist_type=form_data['gist_type'],
 
            lifetime=form_data['lifetime']
 
        )
 
        Session().commit()
 

	
 
        return gist
 

	
 
    def destroy_gists(self, gistid=None):
 
        for g in Gist.query():
 
            if gistid:
 
                if gistid == g.gist_access_id:
 
                    GistModel().delete(g)
 
            else:
 
                GistModel().delete(g)
 
        Session().commit()
 

	
 
    def load_resource(self, resource_name, strip=True):
 
        with open(os.path.join(FIXTURES, resource_name), 'rb') as f:
 
            source = f.read()
 
            if strip:
 
                source = source.strip()
 

	
 
        return source
 

	
 
    def commit_change(self, repo, filename, content, message, vcs_type,
 
                      parent=None, newfile=False, author=None):
 
        repo = Repository.get_by_repo_name(repo)
 
        _cs = parent
 
        if parent is None:
 
            _cs = EmptyChangeset(alias=vcs_type)
 
        if author is None:
 
            author = TEST_USER_ADMIN_LOGIN
 

	
 
        if newfile:
 
            nodes = {
 
                filename: {
 
                    'content': content
 
                }
 
            }
 
            cs = ScmModel().create_nodes(
 
                user=TEST_USER_ADMIN_LOGIN, repo=repo,
 
                message=message,
 
                nodes=nodes,
 
                parent_cs=_cs,
 
                author=author,
 
            )
 
        else:
 
            cs = ScmModel().commit_change(
 
                repo=repo.scm_instance, repo_name=repo.repo_name,
 
                cs=parent, user=TEST_USER_ADMIN_LOGIN,
 
                author=author,
 
                message=message,
 
                content=content,
 
                f_path=filename
 
            )
 
        return cs
 

	
 
    def review_changeset(self, repo, revision, status, author=TEST_USER_ADMIN_LOGIN):
 
        comment = ChangesetCommentsModel().create(u"review comment", repo, author, revision=revision, send_email=False)
 
        csm = ChangesetStatusModel().set_status(repo, ChangesetStatus.STATUS_APPROVED, author, comment, revision=revision)
 
        Session().commit()
 
        return csm
 

	
 
    def create_pullrequest(self, testcontroller, repo_name, pr_src_rev, pr_dst_rev, title='title'):
 
        org_ref = 'branch:stable:%s' % pr_src_rev
 
        other_ref = 'branch:default:%s' % pr_dst_rev
 
        with test_context(testcontroller.app): # needed to be able to mock request user
 
            org_repo = other_repo = Repository.get_by_repo_name(repo_name)
 
            owner_user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
            reviewers = [User.get_by_username(TEST_USER_REGULAR_LOGIN)]
 
            request.authuser = request.user = AuthUser(dbuser=owner_user)
 
            # creating a PR sends a message with an absolute URL - without routing that requires mocking
 
            with mock.patch.object(helpers, 'url', (lambda arg, qualified=False, **kwargs: ('https://localhost' if qualified else '') + '/fake/' + arg)):
 
                cmd = CreatePullRequestAction(org_repo, other_repo, org_ref, other_ref, title, 'No description', owner_user, reviewers)
 
                pull_request = cmd.execute()
 
            Session().commit()
 
        return pull_request.pull_request_id
 

	
 

	
 
#==============================================================================
 
# Global test environment setup
 
#==============================================================================
 

	
 
def create_test_env(repos_test_path, config):
 
    """
 
    Makes a fresh database and
 
    install test repository into tmp dir
 
    """
 

	
 
    # PART ONE create db
 
    dbconf = config['sqlalchemy.url']
 
    log.debug('making test db %s', dbconf)
 

	
 
    # create test dir if it doesn't exist
 
    if not os.path.isdir(repos_test_path):
 
        log.debug('Creating testdir %s', repos_test_path)
 
        os.makedirs(repos_test_path)
 

	
 
    dbmanage = DbManage(dbconf=dbconf, root=config['here'],
 
                        tests=True)
 
    dbmanage.create_tables(override=True)
 
    # for tests dynamically set new root paths based on generated content
 
    dbmanage.create_settings(dbmanage.config_prompt(repos_test_path))
 
    dbmanage.create_default_user()
 
    dbmanage.admin_prompt()
 
    dbmanage.create_permissions()
 
    dbmanage.populate_default_permissions()
 
    Session().commit()
 
    # PART TWO make test repo
 
    log.debug('making test vcs repositories')
 

	
 
    idx_path = config['index_dir']
 
    data_path = config['cache_dir']
 

	
 
    #clean index and data
 
    if idx_path and os.path.exists(idx_path):
 
        log.debug('remove %s', idx_path)
 
        shutil.rmtree(idx_path)
 

	
 
    if data_path and os.path.exists(data_path):
 
        log.debug('remove %s', data_path)
 
        shutil.rmtree(data_path)
 

	
 
    #CREATE DEFAULT TEST REPOS
 
    tar = tarfile.open(os.path.join(FIXTURES, 'vcs_test_hg.tar.gz'))
 
    tar.extractall(os.path.join(TESTS_TMP_PATH, HG_REPO))
 
    tar.close()
 

	
 
    tar = tarfile.open(os.path.join(FIXTURES, 'vcs_test_git.tar.gz'))
 
    tar.extractall(os.path.join(TESTS_TMP_PATH, GIT_REPO))
 
    tar.close()
 

	
 
    #LOAD VCS test stuff
 
    from kallithea.tests.vcs import setup_package
 
    setup_package()
 

	
 

	
 
def create_test_index(repo_location, config, full_index):
 
    """
 
    Makes default test index
 
    """
 

	
 
    from kallithea.lib.indexers.daemon import WhooshIndexingDaemon
 
    from kallithea.lib.pidlock import DaemonLock, LockHeld
 

	
 
    index_location = os.path.join(config['index_dir'])
 
    if not os.path.exists(index_location):
 
        os.makedirs(index_location)
 

	
 
    try:
 
        l = DaemonLock(file_=os.path.join(dirname(index_location), 'make_index.lock'))
 
        WhooshIndexingDaemon(index_location=index_location,
 
                             repo_location=repo_location) \
 
            .run(full_index=full_index)
 
        l.release()
 
    except LockHeld:
 
        pass
0 comments (0 inline, 0 general)