Changeset - 548357c4301d
[Not reviewed]
default
0 6 0
Mads Kiilerich - 6 years ago 2019-08-04 01:39:04
mads@kiilerich.com
Grafted from: 065b18816a85
flake8: fix E117 over-indented
6 files changed with 26 insertions and 26 deletions:
0 comments (0 inline, 0 general)
kallithea/controllers/api/__init__.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.api
 
~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
JSON RPC controller
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Aug 20, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import inspect
 
import itertools
 
import logging
 
import time
 
import traceback
 
import types
 

	
 
from tg import Response, TGController, request, response
 
from webob.exc import HTTPError, HTTPException
 

	
 
from kallithea.lib.auth import AuthUser
 
from kallithea.lib.base import _get_access_path
 
from kallithea.lib.base import _get_ip_addr as _get_ip
 
from kallithea.lib.compat import json
 
from kallithea.lib.utils2 import safe_str, safe_unicode
 
from kallithea.model.db import User
 

	
 

	
 
log = logging.getLogger('JSONRPC')
 

	
 

	
 
class JSONRPCError(BaseException):
 

	
 
    def __init__(self, message):
 
        self.message = message
 
        super(JSONRPCError, self).__init__()
 

	
 
    def __str__(self):
 
        return safe_str(self.message)
 

	
 

	
 
class JSONRPCErrorResponse(Response, HTTPException):
 
    """
 
    Generate a Response object with a JSON-RPC error body
 
    """
 

	
 
    def __init__(self, message=None, retid=None, code=None):
 
        HTTPException.__init__(self, message, self)
 
        Response.__init__(self,
 
                          json_body=dict(id=retid, result=None, error=message),
 
                          status=code,
 
                          content_type='application/json')
 

	
 

	
 
class JSONRPCController(TGController):
 
    """
 
     A WSGI-speaking JSON-RPC controller class
 

	
 
     See the specification:
 
     <http://json-rpc.org/wiki/specification>`.
 

	
 
     Valid controller return values should be json-serializable objects.
 

	
 
     Sub-classes should catch their exceptions and raise JSONRPCError
 
     if they want to pass meaningful errors to the client.
 

	
 
     """
 

	
 
    def _get_ip_addr(self, environ):
 
        return _get_ip(environ)
 

	
 
    def _get_method_args(self):
 
        """
 
        Return `self._rpc_args` to dispatched controller method
 
        chosen by __call__
 
        """
 
        return self._rpc_args
 

	
 
    def _dispatch(self, state, remainder=None):
 
        """
 
        Parse the request body as JSON, look up the method on the
 
        controller and if it exists, dispatch to it.
 
        """
 
        # Since we are here we should respond as JSON
 
        response.content_type = 'application/json'
 

	
 
        environ = state.request.environ
 
        start = time.time()
 
        ip_addr = self._get_ip_addr(environ)
 
        self._req_id = None
 
        if 'CONTENT_LENGTH' not in environ:
 
            log.debug("No Content-Length")
 
            raise JSONRPCErrorResponse(retid=self._req_id,
 
                                       message="No Content-Length in request")
 
        else:
 
            length = environ['CONTENT_LENGTH'] or 0
 
            length = int(environ['CONTENT_LENGTH'])
 
            log.debug('Content-Length: %s', length)
 

	
 
        if length == 0:
 
            raise JSONRPCErrorResponse(retid=self._req_id,
 
                                       message="Content-Length is 0")
 

	
 
        raw_body = environ['wsgi.input'].read(length)
 

	
 
        try:
 
            json_body = json.loads(raw_body)
 
        except ValueError as e:
 
            # catch JSON errors Here
 
            raise JSONRPCErrorResponse(retid=self._req_id,
 
                                       message="JSON parse error ERR:%s RAW:%r"
 
                                                % (e, raw_body))
 

	
 
        # check AUTH based on API key
 
        try:
 
            self._req_api_key = json_body['api_key']
 
            self._req_id = json_body['id']
 
            self._req_method = json_body['method']
 
            self._request_params = json_body['args']
 
            if not isinstance(self._request_params, dict):
 
                self._request_params = {}
 

	
 
            log.debug('method: %s, params: %s',
 
                      self._req_method, self._request_params)
 
        except KeyError as e:
 
            raise JSONRPCErrorResponse(retid=self._req_id,
 
                                       message='Incorrect JSON query missing %s' % e)
 

	
 
        # check if we can find this session using api_key
 
        try:
 
            u = User.get_by_api_key(self._req_api_key)
 
            auth_user = AuthUser.make(dbuser=u, ip_addr=ip_addr)
 
            if auth_user is None:
 
                raise JSONRPCErrorResponse(retid=self._req_id,
 
                                           message='Invalid API key')
 
        except Exception as e:
 
            raise JSONRPCErrorResponse(retid=self._req_id,
 
                                       message='Invalid API key')
 

	
 
        request.authuser = auth_user
 
        request.ip_addr = ip_addr
 

	
 
        self._error = None
 
        try:
 
            self._func = self._find_method()
 
        except AttributeError as e:
 
            raise JSONRPCErrorResponse(retid=self._req_id,
 
                                       message=str(e))
 

	
 
        # now that we have a method, add self._req_params to
 
        # self.kargs and dispatch control to WGIController
 
        argspec = inspect.getargspec(self._func)
 
        arglist = argspec[0][1:]
 
        defaults = map(type, argspec[3] or [])
 
        default_empty = types.NotImplementedType
 

	
 
        # kw arguments required by this method
 
        func_kwargs = dict(itertools.izip_longest(reversed(arglist), reversed(defaults),
 
                                                  fillvalue=default_empty))
 

	
 
        # This attribute will need to be first param of a method that uses
 
        # api_key, which is translated to instance of user at that name
 
        USER_SESSION_ATTR = 'apiuser'
 

	
 
        # get our arglist and check if we provided them as args
 
        for arg, default in func_kwargs.iteritems():
 
            if arg == USER_SESSION_ATTR:
 
                # USER_SESSION_ATTR is something translated from API key and
 
                # this is checked before so we don't need validate it
 
                continue
 

	
 
            # skip the required param check if it's default value is
 
            # NotImplementedType (default_empty)
 
            if default == default_empty and arg not in self._request_params:
 
                raise JSONRPCErrorResponse(
 
                    retid=self._req_id,
 
                    message='Missing non optional `%s` arg in JSON DATA' % arg,
 
                )
 

	
 
        extra = set(self._request_params).difference(func_kwargs)
 
        if extra:
 
                raise JSONRPCErrorResponse(
 
                    retid=self._req_id,
 
                    message='Unknown %s arg in JSON DATA' %
 
                            ', '.join('`%s`' % arg for arg in extra),
 
                )
 
            raise JSONRPCErrorResponse(
 
                retid=self._req_id,
 
                message='Unknown %s arg in JSON DATA' %
 
                        ', '.join('`%s`' % arg for arg in extra),
 
            )
 

	
 
        self._rpc_args = {}
 
        self._rpc_args.update(self._request_params)
 
        self._rpc_args['action'] = self._req_method
 
        self._rpc_args['environ'] = environ
 

	
 
        log.info('IP: %s Request to %s time: %.3fs' % (
 
            self._get_ip_addr(environ),
 
            safe_unicode(_get_access_path(environ)), time.time() - start)
 
        )
 

	
 
        state.set_action(self._rpc_call, [])
 
        state.set_params(self._rpc_args)
 
        return state
 

	
 
    def _rpc_call(self, action, environ, **rpc_args):
 
        """
 
        Call the specified RPC Method
 
        """
 
        raw_response = ''
 
        try:
 
            raw_response = getattr(self, action)(**rpc_args)
 
            if isinstance(raw_response, HTTPError):
 
                self._error = str(raw_response)
 
        except JSONRPCError as e:
 
            self._error = safe_str(e)
 
        except Exception as e:
 
            log.error('Encountered unhandled exception: %s',
 
                      traceback.format_exc(),)
 
            json_exc = JSONRPCError('Internal server error')
 
            self._error = safe_str(json_exc)
 

	
 
        if self._error is not None:
 
            raw_response = None
 

	
 
        response = dict(id=self._req_id, result=raw_response, error=self._error)
 
        try:
 
            return json.dumps(response)
 
        except TypeError as e:
 
            log.error('API FAILED. Error encoding response: %s', e)
 
            return json.dumps(
 
                dict(
 
                    id=self._req_id,
 
                    result=None,
 
                    error="Error encoding response"
 
                )
 
            )
 

	
 
    def _find_method(self):
 
        """
 
        Return method named by `self._req_method` in controller if able
 
        """
 
        log.debug('Trying to find JSON-RPC method: %s', self._req_method)
 
        if self._req_method.startswith('_'):
 
            raise AttributeError("Method not allowed")
 

	
 
        try:
 
            func = getattr(self, self._req_method, None)
 
        except UnicodeEncodeError:
 
            raise AttributeError("Problem decoding unicode in requested "
 
                                 "method name.")
 

	
 
        if isinstance(func, types.MethodType):
 
            return func
 
        else:
 
            raise AttributeError("No such method: %s" % (self._req_method,))
kallithea/controllers/api/api.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.api.api
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
API controller for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Aug 20, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import traceback
 
from datetime import datetime
 

	
 
from tg import request
 

	
 
from kallithea.controllers.api import JSONRPCController, JSONRPCError
 
from kallithea.lib.auth import (
 
    AuthUser, HasPermissionAny, HasPermissionAnyDecorator, HasRepoGroupPermissionLevel, HasRepoPermissionLevel, HasUserGroupPermissionLevel)
 
from kallithea.lib.exceptions import DefaultUserException, UserGroupsAssignedException
 
from kallithea.lib.utils import action_logger, repo2db_mapper
 
from kallithea.lib.utils2 import OAttr, Optional
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.vcs.exceptions import EmptyRepositoryError
 
from kallithea.model.changeset_status import ChangesetStatusModel
 
from kallithea.model.comment import ChangesetCommentsModel
 
from kallithea.model.db import ChangesetStatus, Gist, Permission, PullRequest, RepoGroup, Repository, Setting, User, UserGroup, UserIpMap
 
from kallithea.model.gist import GistModel
 
from kallithea.model.meta import Session
 
from kallithea.model.pull_request import PullRequestModel
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.scm import ScmModel, UserGroupList
 
from kallithea.model.user import UserModel
 
from kallithea.model.user_group import UserGroupModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def store_update(updates, attr, name):
 
    """
 
    Stores param in updates dict if it's not instance of Optional
 
    allows easy updates of passed in params
 
    """
 
    if not isinstance(attr, Optional):
 
        updates[name] = attr
 

	
 

	
 
def get_user_or_error(userid):
 
    """
 
    Get user by id or name or return JsonRPCError if not found
 

	
 
    :param userid:
 
    """
 
    user = UserModel().get_user(userid)
 
    if user is None:
 
        raise JSONRPCError("user `%s` does not exist" % (userid,))
 
    return user
 

	
 

	
 
def get_repo_or_error(repoid):
 
    """
 
    Get repo by id or name or return JsonRPCError if not found
 

	
 
    :param repoid:
 
    """
 
    repo = RepoModel().get_repo(repoid)
 
    if repo is None:
 
        raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 
    return repo
 

	
 

	
 
def get_repo_group_or_error(repogroupid):
 
    """
 
    Get repo group by id or name or return JsonRPCError if not found
 

	
 
    :param repogroupid:
 
    """
 
    repo_group = RepoGroup.guess_instance(repogroupid)
 
    if repo_group is None:
 
        raise JSONRPCError(
 
            'repository group `%s` does not exist' % (repogroupid,))
 
    return repo_group
 

	
 

	
 
def get_user_group_or_error(usergroupid):
 
    """
 
    Get user group by id or name or return JsonRPCError if not found
 

	
 
    :param usergroupid:
 
    """
 
    user_group = UserGroupModel().get_group(usergroupid)
 
    if user_group is None:
 
        raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
 
    return user_group
 

	
 

	
 
def get_perm_or_error(permid, prefix=None):
 
    """
 
    Get permission by id or name or return JsonRPCError if not found
 

	
 
    :param permid:
 
    """
 
    perm = Permission.get_by_key(permid)
 
    if perm is None:
 
        raise JSONRPCError('permission `%s` does not exist' % (permid,))
 
    if prefix:
 
        if not perm.permission_name.startswith(prefix):
 
            raise JSONRPCError('permission `%s` is invalid, '
 
                               'should start with %s' % (permid, prefix))
 
    return perm
 

	
 

	
 
def get_gist_or_error(gistid):
 
    """
 
    Get gist by id or gist_access_id or return JsonRPCError if not found
 

	
 
    :param gistid:
 
    """
 
    gist = GistModel().get_gist(gistid)
 
    if gist is None:
 
        raise JSONRPCError('gist `%s` does not exist' % (gistid,))
 
    return gist
 

	
 

	
 
class ApiController(JSONRPCController):
 
    """
 
    API Controller
 

	
 
    The authenticated user can be found as request.authuser.
 

	
 
    Example function::
 

	
 
        def func(arg1, arg2,...):
 
            pass
 

	
 
    Each function should also **raise** JSONRPCError for any
 
    errors that happens.
 
    """
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def test(self, args):
 
        return args
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def pull(self, repoid, clone_uri=Optional(None)):
 
        """
 
        Triggers a pull from remote location on given repo. Can be used to
 
        automatically keep remote repos up to date. This command can be executed
 
        only using api_key belonging to user with admin rights
 

	
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 
        :param clone_uri: repository URI to pull from (optional)
 
        :type clone_uri: str
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
            "msg": "Pulled from `<repository name>`"
 
            "repository": "<repository name>"
 
          }
 
          error :  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "Unable to pull changes from `<reponame>`"
 
          }
 

	
 
        """
 

	
 
        repo = get_repo_or_error(repoid)
 

	
 
        try:
 
            ScmModel().pull_changes(repo.repo_name,
 
                                    request.authuser.username,
 
                                    request.ip_addr,
 
                                    clone_uri=Optional.extract(clone_uri))
 
            return dict(
 
                msg='Pulled from `%s`' % repo.repo_name,
 
                repository=repo.repo_name
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'Unable to pull changes from `%s`' % repo.repo_name
 
            )
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def rescan_repos(self, remove_obsolete=Optional(False)):
 
        """
 
        Triggers rescan repositories action. If remove_obsolete is set
 
        than also delete repos that are in database but not in the filesystem.
 
        aka "clean zombies". This command can be executed only using api_key
 
        belonging to user with admin rights.
 

	
 
        :param remove_obsolete: deletes repositories from
 
            database that are not found on the filesystem
 
        :type remove_obsolete: Optional(bool)
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
            'added': [<added repository name>,...]
 
            'removed': [<removed repository name>,...]
 
          }
 
          error :  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            'Error occurred during rescan repositories action'
 
          }
 

	
 
        """
 

	
 
        try:
 
            rm_obsolete = Optional.extract(remove_obsolete)
 
            added, removed = repo2db_mapper(ScmModel().repo_scan(),
 
                                            remove_obsolete=rm_obsolete)
 
            return {'added': added, 'removed': removed}
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'Error occurred during rescan repositories action'
 
            )
 

	
 
    def invalidate_cache(self, repoid):
 
        """
 
        Invalidate cache for repository.
 
        This command can be executed only using api_key belonging to user with admin
 
        rights or regular user that have write or admin or write access to repository.
 

	
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
            'msg': Cache for repository `<repository name>` was invalidated,
 
            'repository': <repository name>
 
          }
 
          error :  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            'Error occurred during cache invalidation action'
 
          }
 

	
 
        """
 
        repo = get_repo_or_error(repoid)
 
        if not HasPermissionAny('hg.admin')():
 
            if not HasRepoPermissionLevel('write')(repo.repo_name):
 
                raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
        try:
 
            ScmModel().mark_for_invalidation(repo.repo_name)
 
            return dict(
 
                msg='Cache for repository `%s` was invalidated' % (repoid,),
 
                repository=repo.repo_name
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'Error occurred during cache invalidation action'
 
            )
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def get_ip(self, userid=Optional(OAttr('apiuser'))):
 
        """
 
        Shows IP address as seen from Kallithea server, together with all
 
        defined IP addresses for given user. If userid is not passed data is
 
        returned for user who's calling this function.
 
        This command can be executed only using api_key belonging to user with
 
        admin rights.
 

	
 
        :param userid: username to show ips for
 
        :type userid: Optional(str or int)
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result : {
 
                         "server_ip_addr": "<ip_from_clien>",
 
                         "user_ips": [
 
                                        {
 
                                           "ip_addr": "<ip_with_mask>",
 
                                           "ip_range": ["<start_ip>", "<end_ip>"],
 
                                        },
 
                                        ...
 
                                     ]
 
            }
 

	
 
        """
 
        if isinstance(userid, Optional):
 
            userid = request.authuser.user_id
 
        user = get_user_or_error(userid)
 
        ips = UserIpMap.query().filter(UserIpMap.user == user).all()
 
        return dict(
 
            server_ip_addr=request.ip_addr,
 
            user_ips=ips
 
        )
 

	
 
    # alias for old
 
    show_ip = get_ip
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def get_server_info(self):
 
        """
 
        return server info, including Kallithea version and installed packages
 

	
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
            'modules': [<module name>,...]
 
            'py_version': <python version>,
 
            'platform': <platform type>,
 
            'kallithea_version': <kallithea version>
 
          }
 
          error :  null
 
        """
 
        return Setting.get_server_info()
 

	
 
    def get_user(self, userid=Optional(OAttr('apiuser'))):
 
        """
 
        Gets a user by username or user_id, Returns empty result if user is
 
        not found. If userid param is skipped it is set to id of user who is
 
        calling this method. This command can be executed only using api_key
 
        belonging to user with admin rights, or regular users that cannot
 
        specify different userid than theirs
 

	
 
        :param userid: user to get data for
 
        :type userid: Optional(str or int)
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: None if user does not exist or
 
                    {
 
                        "user_id" :     "<user_id>",
 
                        "api_key" :     "<api_key>",
 
                        "api_keys":     "[<list of all API keys including additional ones>]"
 
                        "username" :    "<username>",
 
                        "firstname":    "<firstname>",
 
                        "lastname" :    "<lastname>",
 
                        "email" :       "<email>",
 
                        "emails":       "[<list of all emails including additional ones>]",
 
                        "ip_addresses": "[<ip_address_for_user>,...]",
 
                        "active" :      "<bool: user active>",
 
                        "admin" :       "<bool: user is admin>",
 
                        "extern_name" : "<extern_name>",
 
                        "extern_type" : "<extern type>
 
                        "last_login":   "<last_login>",
 
                        "permissions": {
 
                            "global": ["hg.create.repository",
 
                                       "repository.read",
 
                                       "hg.register.manual_activate"],
 
                            "repositories": {"repo1": "repository.none"},
 
                            "repositories_groups": {"Group1": "group.read"}
 
                         },
 
                    }
 

	
 
            error:  null
 

	
 
        """
 
        if not HasPermissionAny('hg.admin')():
 
            # make sure normal user does not pass someone else userid,
 
            # he is not allowed to do that
 
            if not isinstance(userid, Optional) and userid != request.authuser.user_id:
 
                raise JSONRPCError(
 
                    'userid is not the same as your user'
 
                )
 

	
 
        if isinstance(userid, Optional):
 
            userid = request.authuser.user_id
 

	
 
        user = get_user_or_error(userid)
 
        data = user.get_api_data()
 
        data['permissions'] = AuthUser(user_id=user.user_id).permissions
 
        return data
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def get_users(self):
 
        """
 
        Lists all existing users. This command can be executed only using api_key
 
        belonging to user with admin rights.
 

	
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: [<user_object>, ...]
 
            error:  null
 
        """
 

	
 
        return [
 
            user.get_api_data()
 
            for user in User.query()
 
                .order_by(User.username)
 
                .filter_by(is_default_user=False)
 
        ]
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def create_user(self, username, email, password=Optional(''),
 
                    firstname=Optional(u''), lastname=Optional(u''),
 
                    active=Optional(True), admin=Optional(False),
 
                    extern_type=Optional(User.DEFAULT_AUTH_TYPE),
 
                    extern_name=Optional('')):
 
        """
 
        Creates new user. Returns new user object. This command can
 
        be executed only using api_key belonging to user with admin rights.
 

	
 
        :param username: new username
 
        :type username: str or int
 
        :param email: email
 
        :type email: str
 
        :param password: password
 
        :type password: Optional(str)
 
        :param firstname: firstname
 
        :type firstname: Optional(str)
 
        :param lastname: lastname
 
        :type lastname: Optional(str)
 
        :param active: active
 
        :type active: Optional(bool)
 
        :param admin: admin
 
        :type admin: Optional(bool)
 
        :param extern_name: name of extern
 
        :type extern_name: Optional(str)
 
        :param extern_type: extern_type
 
        :type extern_type: Optional(str)
 

	
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg" : "created new user `<username>`",
 
                      "user": <user_obj>
 
                    }
 
            error:  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "user `<username>` already exist"
 
            or
 
            "email `<email>` already exist"
 
            or
 
            "failed to create user `<username>`"
 
          }
 

	
 
        """
 

	
 
        if User.get_by_username(username):
 
            raise JSONRPCError("user `%s` already exist" % (username,))
 

	
 
        if User.get_by_email(email):
 
            raise JSONRPCError("email `%s` already exist" % (email,))
 

	
 
        try:
 
            user = UserModel().create_or_update(
 
                username=Optional.extract(username),
 
                password=Optional.extract(password),
 
                email=Optional.extract(email),
 
                firstname=Optional.extract(firstname),
 
                lastname=Optional.extract(lastname),
 
                active=Optional.extract(active),
 
                admin=Optional.extract(admin),
 
                extern_type=Optional.extract(extern_type),
 
                extern_name=Optional.extract(extern_name)
 
            )
 
            Session().commit()
 
            return dict(
 
                msg='created new user `%s`' % username,
 
                user=user.get_api_data()
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to create user `%s`' % (username,))
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def update_user(self, userid, username=Optional(None),
 
                    email=Optional(None), password=Optional(None),
 
                    firstname=Optional(None), lastname=Optional(None),
 
                    active=Optional(None), admin=Optional(None),
 
                    extern_type=Optional(None), extern_name=Optional(None)):
 
        """
 
        updates given user if such user exists. This command can
 
        be executed only using api_key belonging to user with admin rights.
 

	
 
        :param userid: userid to update
 
        :type userid: str or int
 
        :param username: new username
 
        :type username: str or int
 
        :param email: email
 
        :type email: str
 
        :param password: password
 
        :type password: Optional(str)
 
        :param firstname: firstname
 
        :type firstname: Optional(str)
 
        :param lastname: lastname
 
        :type lastname: Optional(str)
 
        :param active: active
 
        :type active: Optional(bool)
 
        :param admin: admin
 
        :type admin: Optional(bool)
 
        :param extern_name:
 
        :type extern_name: Optional(str)
 
        :param extern_type:
 
        :type extern_type: Optional(str)
 

	
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg" : "updated user ID:<userid> <username>",
 
                      "user": <user_object>,
 
                    }
 
            error:  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "failed to update user `<username>`"
 
          }
 

	
 
        """
 

	
 
        user = get_user_or_error(userid)
 

	
 
        # only non optional arguments will be stored in updates
 
        updates = {}
 

	
 
        try:
 

	
 
            store_update(updates, username, 'username')
 
            store_update(updates, password, 'password')
 
            store_update(updates, email, 'email')
 
            store_update(updates, firstname, 'name')
 
            store_update(updates, lastname, 'lastname')
 
            store_update(updates, active, 'active')
 
            store_update(updates, admin, 'admin')
 
            store_update(updates, extern_name, 'extern_name')
 
            store_update(updates, extern_type, 'extern_type')
 

	
 
            user = UserModel().update_user(user, **updates)
 
            Session().commit()
 
            return dict(
 
                msg='updated user ID:%s %s' % (user.user_id, user.username),
 
                user=user.get_api_data()
 
            )
 
        except DefaultUserException:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('editing default user is forbidden')
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to update user `%s`' % (userid,))
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def delete_user(self, userid):
 
        """
 
        deletes given user if such user exists. This command can
 
        be executed only using api_key belonging to user with admin rights.
 

	
 
        :param userid: user to delete
 
        :type userid: str or int
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg" : "deleted user ID:<userid> <username>",
 
                      "user": null
 
                    }
 
            error:  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "failed to delete user ID:<userid> <username>"
 
          }
 

	
 
        """
 
        user = get_user_or_error(userid)
 

	
 
        try:
 
            UserModel().delete(userid)
 
            Session().commit()
 
            return dict(
 
                msg='deleted user ID:%s %s' % (user.user_id, user.username),
 
                user=None
 
            )
 
        except Exception:
 

	
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to delete user ID:%s %s'
 
                               % (user.user_id, user.username))
 

	
 
    # permission check inside
 
    def get_user_group(self, usergroupid):
 
        """
 
        Gets an existing user group. This command can be executed only using api_key
 
        belonging to user with admin rights or user who has at least
 
        read access to user group.
 

	
 
        :param usergroupid: id of user_group to edit
 
        :type usergroupid: str or int
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result : None if group not exist
 
                     {
 
                       "users_group_id" : "<id>",
 
                       "group_name" :     "<groupname>",
 
                       "active":          "<bool>",
 
                       "members" :  [<user_obj>,...]
 
                     }
 
            error : null
 

	
 
        """
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAny('hg.admin')():
 
            if not HasUserGroupPermissionLevel('read')(user_group.users_group_name):
 
                raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
 

	
 
        data = user_group.get_api_data()
 
        return data
 

	
 
    # permission check inside
 
    def get_user_groups(self):
 
        """
 
        Lists all existing user groups. This command can be executed only using
 
        api_key belonging to user with admin rights or user who has at least
 
        read access to user group.
 

	
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result : [<user_group_obj>,...]
 
            error : null
 
        """
 

	
 
        return [
 
            user_group.get_api_data()
 
            for user_group in UserGroupList(UserGroup.query().all(), perm_level='read')
 
        ]
 

	
 
    @HasPermissionAnyDecorator('hg.admin', 'hg.usergroup.create.true')
 
    def create_user_group(self, group_name, description=Optional(u''),
 
                          owner=Optional(OAttr('apiuser')), active=Optional(True)):
 
        """
 
        Creates new user group. This command can be executed only using api_key
 
        belonging to user with admin rights or an user who has create user group
 
        permission
 

	
 
        :param group_name: name of new user group
 
        :type group_name: str
 
        :param description: group description
 
        :type description: str
 
        :param owner: owner of group. If not passed apiuser is the owner
 
        :type owner: Optional(str or int)
 
        :param active: group is active
 
        :type active: Optional(bool)
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg": "created new user group `<groupname>`",
 
                      "user_group": <user_group_object>
 
                    }
 
            error:  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "user group `<group name>` already exist"
 
            or
 
            "failed to create group `<group name>`"
 
          }
 

	
 
        """
 

	
 
        if UserGroupModel().get_by_name(group_name):
 
            raise JSONRPCError("user group `%s` already exist" % (group_name,))
 

	
 
        try:
 
            if isinstance(owner, Optional):
 
                owner = request.authuser.user_id
 

	
 
            owner = get_user_or_error(owner)
 
            active = Optional.extract(active)
 
            description = Optional.extract(description)
 
            ug = UserGroupModel().create(name=group_name, description=description,
 
                                         owner=owner, active=active)
 
            Session().commit()
 
            return dict(
 
                msg='created new user group `%s`' % group_name,
 
                user_group=ug.get_api_data()
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to create group `%s`' % (group_name,))
 

	
 
    # permission check inside
 
    def update_user_group(self, usergroupid, group_name=Optional(''),
 
                          description=Optional(''), owner=Optional(None),
 
                          active=Optional(True)):
 
        """
 
        Updates given usergroup.  This command can be executed only using api_key
 
        belonging to user with admin rights or an admin of given user group
 

	
 
        :param usergroupid: id of user group to update
 
        :type usergroupid: str or int
 
        :param group_name: name of new user group
 
        :type group_name: str
 
        :param description: group description
 
        :type description: str
 
        :param owner: owner of group.
 
        :type owner: Optional(str or int)
 
        :param active: group is active
 
        :type active: Optional(bool)
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
            "msg": 'updated user group ID:<user group id> <user group name>',
 
            "user_group": <user_group_object>
 
          }
 
          error :  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "failed to update user group `<user group name>`"
 
          }
 

	
 
        """
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAny('hg.admin')():
 
            if not HasUserGroupPermissionLevel('admin')(user_group.users_group_name):
 
                raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
 

	
 
        if not isinstance(owner, Optional):
 
            owner = get_user_or_error(owner)
 

	
 
        updates = {}
 
        store_update(updates, group_name, 'users_group_name')
 
        store_update(updates, description, 'user_group_description')
 
        store_update(updates, owner, 'owner')
 
        store_update(updates, active, 'users_group_active')
 
        try:
 
            UserGroupModel().update(user_group, updates)
 
            Session().commit()
 
            return dict(
 
                msg='updated user group ID:%s %s' % (user_group.users_group_id,
 
                                                     user_group.users_group_name),
 
                user_group=user_group.get_api_data()
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to update user group `%s`' % (usergroupid,))
 

	
 
    # permission check inside
 
    def delete_user_group(self, usergroupid):
 
        """
 
        Delete given user group by user group id or name.
 
        This command can be executed only using api_key
 
        belonging to user with admin rights or an admin of given user group
 

	
 
        :param usergroupid:
 
        :type usergroupid: int
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
            "msg": "deleted user group ID:<user_group_id> <user_group_name>"
 
          }
 
          error :  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "failed to delete user group ID:<user_group_id> <user_group_name>"
 
            or
 
            "RepoGroup assigned to <repo_groups_list>"
 
          }
 

	
 
        """
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAny('hg.admin')():
 
            if not HasUserGroupPermissionLevel('admin')(user_group.users_group_name):
 
                raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
 

	
 
        try:
 
            UserGroupModel().delete(user_group)
 
            Session().commit()
 
            return dict(
 
                msg='deleted user group ID:%s %s' %
 
                    (user_group.users_group_id, user_group.users_group_name),
 
                user_group=None
 
            )
 
        except UserGroupsAssignedException as e:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(str(e))
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to delete user group ID:%s %s' %
 
                               (user_group.users_group_id,
 
                                user_group.users_group_name)
 
                               )
 

	
 
    # permission check inside
 
    def add_user_to_user_group(self, usergroupid, userid):
 
        """
 
        Adds a user to a user group. If user exists in that group success will be
 
        `false`. This command can be executed only using api_key
 
        belonging to user with admin rights  or an admin of given user group
 

	
 
        :param usergroupid:
 
        :type usergroupid: int
 
        :param userid:
 
        :type userid: int
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
              "success": True|False # depends on if member is in group
 
              "msg": "added member `<username>` to user group `<groupname>` |
 
                      User is already in that group"
 

	
 
          }
 
          error :  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "failed to add member to user group `<user_group_name>`"
 
          }
 

	
 
        """
 
        user = get_user_or_error(userid)
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAny('hg.admin')():
 
            if not HasUserGroupPermissionLevel('admin')(user_group.users_group_name):
 
                raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
 

	
 
        try:
 
            ugm = UserGroupModel().add_user_to_group(user_group, user)
 
            success = True if ugm is not True else False
 
            msg = 'added member `%s` to user group `%s`' % (
 
                user.username, user_group.users_group_name
 
            )
 
            msg = msg if success else 'User is already in that group'
 
            Session().commit()
 

	
 
            return dict(
 
                success=success,
 
                msg=msg
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to add member to user group `%s`' % (
 
                    user_group.users_group_name,
 
                )
 
            )
 

	
 
    # permission check inside
 
    def remove_user_from_user_group(self, usergroupid, userid):
 
        """
 
        Removes a user from a user group. If user is not in given group success will
 
        be `false`. This command can be executed only
 
        using api_key belonging to user with admin rights or an admin of given user group
 

	
 
        :param usergroupid:
 
        :param userid:
 

	
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "success":  True|False,  # depends on if member is in group
 
                      "msg": "removed member <username> from user group <groupname> |
 
                              User wasn't in group"
 
                    }
 
            error:  null
 

	
 
        """
 
        user = get_user_or_error(userid)
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAny('hg.admin')():
 
            if not HasUserGroupPermissionLevel('admin')(user_group.users_group_name):
 
                raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
 

	
 
        try:
 
            success = UserGroupModel().remove_user_from_group(user_group, user)
 
            msg = 'removed member `%s` from user group `%s`' % (
 
                user.username, user_group.users_group_name
 
            )
 
            msg = msg if success else "User wasn't in group"
 
            Session().commit()
 
            return dict(success=success, msg=msg)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to remove member from user group `%s`' % (
 
                    user_group.users_group_name,
 
                )
 
            )
 

	
 
    # permission check inside
 
    def get_repo(self, repoid,
 
                 with_revision_names=Optional(False),
 
                 with_pullrequests=Optional(False)):
 
        """
 
        Gets an existing repository by it's name or repository_id. Members will return
 
        either users_group or user associated to that repository. This command can be
 
        executed only using api_key belonging to user with admin
 
        rights or regular user that have at least read access to repository.
 

	
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
            {
 
                "repo_id" :          "<repo_id>",
 
                "repo_name" :        "<reponame>"
 
                "repo_type" :        "<repo_type>",
 
                "clone_uri" :        "<clone_uri>",
 
                "enable_downloads":  "<bool>",
 
                "enable_statistics": "<bool>",
 
                "private":           "<bool>",
 
                "created_on" :       "<date_time_created>",
 
                "description" :      "<description>",
 
                "landing_rev":       "<landing_rev>",
 
                "last_changeset":    {
 
                                       "author":   "<full_author>",
 
                                       "date":     "<date_time_of_commit>",
 
                                       "message":  "<commit_message>",
 
                                       "raw_id":   "<raw_id>",
 
                                       "revision": "<numeric_revision>",
 
                                       "short_id": "<short_id>"
 
                                     }
 
                "owner":             "<repo_owner>",
 
                "fork_of":           "<name_of_fork_parent>",
 
                "members" :     [
 
                                  {
 
                                    "name":     "<username>",
 
                                    "type" :    "user",
 
                                    "permission" : "repository.(read|write|admin)"
 
                                  },
 
                                  …
 
                                  {
 
                                    "name":     "<usergroup name>",
 
                                    "type" :    "user_group",
 
                                    "permission" : "usergroup.(read|write|admin)"
 
                                  },
 
                                  …
 
                                ]
 
                 "followers":   [<user_obj>, ...],
 
                 <if with_revision_names == True>
 
                 "tags": {
 
                            "<tagname>": "<raw_id>",
 
                            ...
 
                         },
 
                 "branches": {
 
                            "<branchname>": "<raw_id>",
 
                            ...
 
                         },
 
                 "bookmarks": {
 
                            "<bookmarkname>": "<raw_id>",
 
                            ...
 
                         },
 
            }
 
          }
 
          error :  null
 

	
 
        """
 
        repo = get_repo_or_error(repoid)
 

	
 
        if not HasPermissionAny('hg.admin')():
 
            if not HasRepoPermissionLevel('read')(repo.repo_name):
 
                raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
        members = []
 
        for user in repo.repo_to_perm:
 
            perm = user.permission.permission_name
 
            user = user.user
 
            user_data = {
 
                'name': user.username,
 
                'type': "user",
 
                'permission': perm
 
            }
 
            members.append(user_data)
 

	
 
        for user_group in repo.users_group_to_perm:
 
            perm = user_group.permission.permission_name
 
            user_group = user_group.users_group
 
            user_group_data = {
 
                'name': user_group.users_group_name,
 
                'type': "user_group",
 
                'permission': perm
 
            }
 
            members.append(user_group_data)
 

	
 
        followers = [
 
            uf.user.get_api_data()
 
            for uf in repo.followers
 
        ]
 

	
 
        data = repo.get_api_data(with_revision_names=Optional.extract(with_revision_names),
 
                                 with_pullrequests=Optional.extract(with_pullrequests))
 
        data['members'] = members
 
        data['followers'] = followers
 
        return data
 

	
 
    # permission check inside
 
    def get_repos(self):
 
        """
 
        Lists all existing repositories. This command can be executed only using
 
        api_key belonging to user with admin rights or regular user that have
 
        admin, write or read access to repository.
 

	
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: [
 
                      {
 
                        "repo_id" :          "<repo_id>",
 
                        "repo_name" :        "<reponame>"
 
                        "repo_type" :        "<repo_type>",
 
                        "clone_uri" :        "<clone_uri>",
 
                        "private": :         "<bool>",
 
                        "created_on" :       "<datetimecreated>",
 
                        "description" :      "<description>",
 
                        "landing_rev":       "<landing_rev>",
 
                        "owner":             "<repo_owner>",
 
                        "fork_of":           "<name_of_fork_parent>",
 
                        "enable_downloads":  "<bool>",
 
                        "enable_statistics": "<bool>",
 
                      },
 
                      …
 
                    ]
 
            error:  null
 
        """
 
        if not HasPermissionAny('hg.admin')():
 
            repos = RepoModel().get_all_user_repos(user=request.authuser.user_id)
 
        else:
 
            repos = Repository.query()
 

	
 
        return [
 
            repo.get_api_data()
 
            for repo in repos
 
        ]
 

	
 
    # permission check inside
 
    def get_repo_nodes(self, repoid, revision, root_path,
 
                       ret_type=Optional('all')):
 
        """
 
        returns a list of nodes and it's children in a flat list for a given path
 
        at given revision. It's possible to specify ret_type to show only `files` or
 
        `dirs`.  This command can be executed only using api_key belonging to
 
        user with admin rights or regular user that have at least read access to repository.
 

	
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 
        :param revision: revision for which listing should be done
 
        :type revision: str
 
        :param root_path: path from which start displaying
 
        :type root_path: str
 
        :param ret_type: return type 'all|files|dirs' nodes
 
        :type ret_type: Optional(str)
 

	
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: [
 
                      {
 
                        "name" :        "<name>"
 
                        "type" :        "<type>",
 
                      },
 
                      …
 
                    ]
 
            error:  null
 
        """
 
        repo = get_repo_or_error(repoid)
 

	
 
        if not HasPermissionAny('hg.admin')():
 
            if not HasRepoPermissionLevel('read')(repo.repo_name):
 
                raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
        ret_type = Optional.extract(ret_type)
 
        _map = {}
 
        try:
 
            _d, _f = ScmModel().get_nodes(repo, revision, root_path,
 
                                          flat=False)
 
            _map = {
 
                'all': _d + _f,
 
                'files': _f,
 
                'dirs': _d,
 
            }
 
            return _map[ret_type]
 
        except KeyError:
 
            raise JSONRPCError('ret_type must be one of %s'
 
                               % (','.join(_map.keys())))
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to get repo: `%s` nodes' % repo.repo_name
 
            )
 

	
 
    @HasPermissionAnyDecorator('hg.admin', 'hg.create.repository')
 
    def create_repo(self, repo_name, owner=Optional(OAttr('apiuser')),
 
                    repo_type=Optional('hg'), description=Optional(''),
 
                    private=Optional(False), clone_uri=Optional(None),
 
                    landing_rev=Optional('rev:tip'),
 
                    enable_statistics=Optional(False),
 
                    enable_downloads=Optional(False),
 
                    copy_permissions=Optional(False)):
 
        """
 
        Creates a repository. The repository name contains the full path, but the
 
        parent repository group must exist. For example "foo/bar/baz" require the groups
 
        "foo" and "bar" (with "foo" as parent), and create "baz" repository with
 
        "bar" as group. This command can be executed only using api_key
 
        belonging to user with admin rights or regular user that have create
 
        repository permission. Regular users cannot specify owner parameter
 

	
 
        :param repo_name: repository name
 
        :type repo_name: str
 
        :param owner: user_id or username
 
        :type owner: Optional(str)
 
        :param repo_type: 'hg' or 'git'
 
        :type repo_type: Optional(str)
 
        :param description: repository description
 
        :type description: Optional(str)
 
        :param private:
 
        :type private: bool
 
        :param clone_uri:
 
        :type clone_uri: str
 
        :param landing_rev: <rev_type>:<rev>
 
        :type landing_rev: str
 
        :param enable_downloads:
 
        :type enable_downloads: bool
 
        :param enable_statistics:
 
        :type enable_statistics: bool
 
        :param copy_permissions: Copy permission from group that repository is
 
            being created.
 
        :type copy_permissions: bool
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg": "Created new repository `<reponame>`",
 
                      "success": true,
 
                      "task": "<celery task id or None if done sync>"
 
                    }
 
            error:  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
             'failed to create repository `<repo_name>`
 
          }
 

	
 
        """
 
        if not HasPermissionAny('hg.admin')():
 
            if not isinstance(owner, Optional):
 
                # forbid setting owner for non-admins
 
                raise JSONRPCError(
 
                    'Only Kallithea admin can specify `owner` param'
 
                )
 
        if isinstance(owner, Optional):
 
            owner = request.authuser.user_id
 

	
 
        owner = get_user_or_error(owner)
 

	
 
        if RepoModel().get_by_repo_name(repo_name):
 
            raise JSONRPCError("repo `%s` already exist" % repo_name)
 

	
 
        defs = Setting.get_default_repo_settings(strip_prefix=True)
 
        if isinstance(private, Optional):
 
            private = defs.get('repo_private') or Optional.extract(private)
 
        if isinstance(repo_type, Optional):
 
            repo_type = defs.get('repo_type')
 
        if isinstance(enable_statistics, Optional):
 
            enable_statistics = defs.get('repo_enable_statistics')
 
        if isinstance(enable_downloads, Optional):
 
            enable_downloads = defs.get('repo_enable_downloads')
 

	
 
        clone_uri = Optional.extract(clone_uri)
 
        description = Optional.extract(description)
 
        landing_rev = Optional.extract(landing_rev)
 
        copy_permissions = Optional.extract(copy_permissions)
 

	
 
        try:
 
            repo_name_parts = repo_name.split('/')
 
            repo_group = None
 
            if len(repo_name_parts) > 1:
 
                group_name = '/'.join(repo_name_parts[:-1])
 
                repo_group = RepoGroup.get_by_group_name(group_name)
 
                if repo_group is None:
 
                    raise JSONRPCError("repo group `%s` not found" % group_name)
 
            data = dict(
 
                repo_name=repo_name_parts[-1],
 
                repo_name_full=repo_name,
 
                repo_type=repo_type,
 
                repo_description=description,
 
                owner=owner,
 
                repo_private=private,
 
                clone_uri=clone_uri,
 
                repo_group=repo_group,
 
                repo_landing_rev=landing_rev,
 
                enable_statistics=enable_statistics,
 
                enable_downloads=enable_downloads,
 
                repo_copy_permissions=copy_permissions,
 
            )
 

	
 
            task = RepoModel().create(form_data=data, cur_user=owner)
 
            task_id = task.task_id
 
            # no commit, it's done in RepoModel, or async via celery
 
            return dict(
 
                msg="Created new repository `%s`" % (repo_name,),
 
                success=True,  # cannot return the repo data here since fork
 
                               # can be done async
 
                task=task_id
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to create repository `%s`' % (repo_name,))
 

	
 
    # permission check inside
 
    def update_repo(self, repoid, name=Optional(None),
 
                    owner=Optional(OAttr('apiuser')),
 
                    group=Optional(None),
 
                    description=Optional(''), private=Optional(False),
 
                    clone_uri=Optional(None), landing_rev=Optional('rev:tip'),
 
                    enable_statistics=Optional(False),
 
                    enable_downloads=Optional(False)):
 

	
 
        """
 
        Updates repo
 

	
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 
        :param name:
 
        :param owner:
 
        :param group:
 
        :param description:
 
        :param private:
 
        :param clone_uri:
 
        :param landing_rev:
 
        :param enable_statistics:
 
        :param enable_downloads:
 
        """
 
        repo = get_repo_or_error(repoid)
 
        if not HasPermissionAny('hg.admin')():
 
            if not HasRepoPermissionLevel('admin')(repo.repo_name):
 
                raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
            if (name != repo.repo_name and
 
                not HasPermissionAny('hg.create.repository')()
 
                ):
 
                raise JSONRPCError('no permission to create (or move) repositories')
 

	
 
            if not isinstance(owner, Optional):
 
                # forbid setting owner for non-admins
 
                raise JSONRPCError(
 
                    'Only Kallithea admin can specify `owner` param'
 
                )
 

	
 
        updates = {}
 
        repo_group = group
 
        if not isinstance(repo_group, Optional):
 
            repo_group = get_repo_group_or_error(repo_group)
 
            repo_group = repo_group.group_id
 
        try:
 
            store_update(updates, name, 'repo_name')
 
            store_update(updates, repo_group, 'repo_group')
 
            store_update(updates, owner, 'owner')
 
            store_update(updates, description, 'repo_description')
 
            store_update(updates, private, 'repo_private')
 
            store_update(updates, clone_uri, 'clone_uri')
 
            store_update(updates, landing_rev, 'repo_landing_rev')
 
            store_update(updates, enable_statistics, 'repo_enable_statistics')
 
            store_update(updates, enable_downloads, 'repo_enable_downloads')
 

	
 
            RepoModel().update(repo, **updates)
 
            Session().commit()
 
            return dict(
 
                msg='updated repo ID:%s %s' % (repo.repo_id, repo.repo_name),
 
                repository=repo.get_api_data()
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to update repo `%s`' % repoid)
 

	
 
    @HasPermissionAnyDecorator('hg.admin', 'hg.fork.repository')
 
    def fork_repo(self, repoid, fork_name,
 
                  owner=Optional(OAttr('apiuser')),
 
                  description=Optional(''), copy_permissions=Optional(False),
 
                  private=Optional(False), landing_rev=Optional('rev:tip')):
 
        """
 
        Creates a fork of given repo. In case of using celery this will
 
        immediately return success message, while fork is going to be created
 
        asynchronous. This command can be executed only using api_key belonging to
 
        user with admin rights or regular user that have fork permission, and at least
 
        read access to forking repository. Regular users cannot specify owner parameter.
 

	
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 
        :param fork_name:
 
        :param owner:
 
        :param description:
 
        :param copy_permissions:
 
        :param private:
 
        :param landing_rev:
 

	
 
        INPUT::
 

	
 
            id : <id_for_response>
 
            api_key : "<api_key>"
 
            args:     {
 
                        "repoid" :          "<reponame or repo_id>",
 
                        "fork_name":        "<forkname>",
 
                        "owner":            "<username or user_id = Optional(=apiuser)>",
 
                        "description":      "<description>",
 
                        "copy_permissions": "<bool>",
 
                        "private":          "<bool>",
 
                        "landing_rev":      "<landing_rev>"
 
                      }
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg": "Created fork of `<reponame>` as `<forkname>`",
 
                      "success": true,
 
                      "task": "<celery task id or None if done sync>"
 
                    }
 
            error:  null
 

	
 
        """
 
        repo = get_repo_or_error(repoid)
 
        repo_name = repo.repo_name
 

	
 
        _repo = RepoModel().get_by_repo_name(fork_name)
 
        if _repo:
 
            type_ = 'fork' if _repo.fork else 'repo'
 
            raise JSONRPCError("%s `%s` already exist" % (type_, fork_name))
 

	
 
        if HasPermissionAny('hg.admin')():
 
            pass
 
        elif HasRepoPermissionLevel('read')(repo.repo_name):
 
            if not isinstance(owner, Optional):
 
                # forbid setting owner for non-admins
 
                raise JSONRPCError(
 
                    'Only Kallithea admin can specify `owner` param'
 
                )
 

	
 
            if not HasPermissionAny('hg.create.repository')():
 
                raise JSONRPCError('no permission to create repositories')
 
        else:
 
            raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
        if isinstance(owner, Optional):
 
            owner = request.authuser.user_id
 

	
 
        owner = get_user_or_error(owner)
 

	
 
        try:
 
            fork_name_parts = fork_name.split('/')
 
            repo_group = None
 
            if len(fork_name_parts) > 1:
 
                group_name = '/'.join(fork_name_parts[:-1])
 
                repo_group = RepoGroup.get_by_group_name(group_name)
 
                if repo_group is None:
 
                    raise JSONRPCError("repo group `%s` not found" % group_name)
 

	
 
            form_data = dict(
 
                repo_name=fork_name_parts[-1],
 
                repo_name_full=fork_name,
 
                repo_group=repo_group,
 
                repo_type=repo.repo_type,
 
                description=Optional.extract(description),
 
                private=Optional.extract(private),
 
                copy_permissions=Optional.extract(copy_permissions),
 
                landing_rev=Optional.extract(landing_rev),
 
                update_after_clone=False,
 
                fork_parent_id=repo.repo_id,
 
            )
 
            task = RepoModel().create_fork(form_data, cur_user=owner)
 
            # no commit, it's done in RepoModel, or async via celery
 
            task_id = task.task_id
 
            return dict(
 
                msg='Created fork of `%s` as `%s`' % (repo.repo_name,
 
                                                      fork_name),
 
                success=True,  # cannot return the repo data here since fork
 
                               # can be done async
 
                task=task_id
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to fork repository `%s` as `%s`' % (repo_name,
 
                                                            fork_name)
 
            )
 

	
 
    # permission check inside
 
    def delete_repo(self, repoid, forks=Optional('')):
 
        """
 
        Deletes a repository. This command can be executed only using api_key belonging
 
        to user with admin rights or regular user that have admin access to repository.
 
        When `forks` param is set it's possible to detach or delete forks of deleting
 
        repository
 

	
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 
        :param forks: `detach` or `delete`, what do do with attached forks for repo
 
        :type forks: Optional(str)
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg": "Deleted repository `<reponame>`",
 
                      "success": true
 
                    }
 
            error:  null
 

	
 
        """
 
        repo = get_repo_or_error(repoid)
 

	
 
        if not HasPermissionAny('hg.admin')():
 
            if not HasRepoPermissionLevel('admin')(repo.repo_name):
 
                raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
        try:
 
            handle_forks = Optional.extract(forks)
 
            _forks_msg = ''
 
            _forks = [f for f in repo.forks]
 
            if handle_forks == 'detach':
 
                _forks_msg = ' ' + 'Detached %s forks' % len(_forks)
 
            elif handle_forks == 'delete':
 
                _forks_msg = ' ' + 'Deleted %s forks' % len(_forks)
 
            elif _forks:
 
                raise JSONRPCError(
 
                    'Cannot delete `%s` it still contains attached forks' %
 
                    (repo.repo_name,)
 
                )
 

	
 
            RepoModel().delete(repo, forks=forks)
 
            Session().commit()
 
            return dict(
 
                msg='Deleted repository `%s`%s' % (repo.repo_name, _forks_msg),
 
                success=True
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to delete repository `%s`' % (repo.repo_name,)
 
            )
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def grant_user_permission(self, repoid, userid, perm):
 
        """
 
        Grant permission for user on given repository, or update existing one
 
        if found. This command can be executed only using api_key belonging to user
 
        with admin rights.
 

	
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 
        :param userid:
 
        :param perm: (repository.(none|read|write|admin))
 
        :type perm: str
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg" : "Granted perm: `<perm>` for user: `<username>` in repo: `<reponame>`",
 
                      "success": true
 
                    }
 
            error:  null
 
        """
 
        repo = get_repo_or_error(repoid)
 
        user = get_user_or_error(userid)
 
        perm = get_perm_or_error(perm)
 

	
 
        try:
 

	
 
            RepoModel().grant_user_permission(repo=repo, user=user, perm=perm)
 

	
 
            Session().commit()
 
            return dict(
 
                msg='Granted perm: `%s` for user: `%s` in repo: `%s`' % (
 
                    perm.permission_name, user.username, repo.repo_name
 
                ),
 
                success=True
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to edit permission for user: `%s` in repo: `%s`' % (
 
                    userid, repoid
 
                )
 
            )
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def revoke_user_permission(self, repoid, userid):
 
        """
 
        Revoke permission for user on given repository. This command can be executed
 
        only using api_key belonging to user with admin rights.
 

	
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 
        :param userid:
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg" : "Revoked perm for user: `<username>` in repo: `<reponame>`",
 
                      "success": true
 
                    }
 
            error:  null
 

	
 
        """
 

	
 
        repo = get_repo_or_error(repoid)
 
        user = get_user_or_error(userid)
 
        try:
 
            RepoModel().revoke_user_permission(repo=repo, user=user)
 
            Session().commit()
 
            return dict(
 
                msg='Revoked perm for user: `%s` in repo: `%s`' % (
 
                    user.username, repo.repo_name
 
                ),
 
                success=True
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to edit permission for user: `%s` in repo: `%s`' % (
 
                    userid, repoid
 
                )
 
            )
 

	
 
    # permission check inside
 
    def grant_user_group_permission(self, repoid, usergroupid, perm):
 
        """
 
        Grant permission for user group on given repository, or update
 
        existing one if found. This command can be executed only using
 
        api_key belonging to user with admin rights.
 

	
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 
        :param usergroupid: id of usergroup
 
        :type usergroupid: str or int
 
        :param perm: (repository.(none|read|write|admin))
 
        :type perm: str
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
            "msg" : "Granted perm: `<perm>` for group: `<usersgroupname>` in repo: `<reponame>`",
 
            "success": true
 

	
 
          }
 
          error :  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "failed to edit permission for user group: `<usergroup>` in repo `<repo>`'
 
          }
 

	
 
        """
 
        repo = get_repo_or_error(repoid)
 
        perm = get_perm_or_error(perm)
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAny('hg.admin')():
 
            if not HasRepoPermissionLevel('admin')(repo.repo_name):
 
                raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
            if not HasUserGroupPermissionLevel('read')(user_group.users_group_name):
 
                raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
 

	
 
        try:
 
            RepoModel().grant_user_group_permission(
 
                repo=repo, group_name=user_group, perm=perm)
 

	
 
            Session().commit()
 
            return dict(
 
                msg='Granted perm: `%s` for user group: `%s` in '
 
                    'repo: `%s`' % (
 
                        perm.permission_name, user_group.users_group_name,
 
                        repo.repo_name
 
                    ),
 
                success=True
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to edit permission for user group: `%s` in '
 
                'repo: `%s`' % (
 
                    usergroupid, repo.repo_name
 
                )
 
            )
 

	
 
    # permission check inside
 
    def revoke_user_group_permission(self, repoid, usergroupid):
 
        """
 
        Revoke permission for user group on given repository. This command can be
 
        executed only using api_key belonging to user with admin rights.
 

	
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 
        :param usergroupid:
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg" : "Revoked perm for group: `<usersgroupname>` in repo: `<reponame>`",
 
                      "success": true
 
                    }
 
            error:  null
 
        """
 
        repo = get_repo_or_error(repoid)
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAny('hg.admin')():
 
            if not HasRepoPermissionLevel('admin')(repo.repo_name):
 
                raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
            if not HasUserGroupPermissionLevel('read')(user_group.users_group_name):
 
                raise JSONRPCError('user group `%s` does not exist' % (usergroupid,))
 

	
 
        try:
 
            RepoModel().revoke_user_group_permission(
 
                repo=repo, group_name=user_group)
 

	
 
            Session().commit()
 
            return dict(
 
                msg='Revoked perm for user group: `%s` in repo: `%s`' % (
 
                    user_group.users_group_name, repo.repo_name
 
                ),
 
                success=True
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to edit permission for user group: `%s` in '
 
                'repo: `%s`' % (
 
                    user_group.users_group_name, repo.repo_name
 
                )
 
            )
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def get_repo_group(self, repogroupid):
 
        """
 
        Returns given repo group together with permissions, and repositories
 
        inside the group
 

	
 
        :param repogroupid: id/name of repository group
 
        :type repogroupid: str or int
 
        """
 
        repo_group = get_repo_group_or_error(repogroupid)
 

	
 
        members = []
 
        for user in repo_group.repo_group_to_perm:
 
            perm = user.permission.permission_name
 
            user = user.user
 
            user_data = {
 
                'name': user.username,
 
                'type': "user",
 
                'permission': perm
 
            }
 
            members.append(user_data)
 

	
 
        for user_group in repo_group.users_group_to_perm:
 
            perm = user_group.permission.permission_name
 
            user_group = user_group.users_group
 
            user_group_data = {
 
                'name': user_group.users_group_name,
 
                'type': "user_group",
 
                'permission': perm
 
            }
 
            members.append(user_group_data)
 

	
 
        data = repo_group.get_api_data()
 
        data["members"] = members
 
        return data
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def get_repo_groups(self):
 
        """
 
        Returns all repository groups
 

	
 
        """
 
        return [
 
            repo_group.get_api_data()
 
            for repo_group in RepoGroup.query()
 
        ]
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def create_repo_group(self, group_name, description=Optional(''),
 
                          owner=Optional(OAttr('apiuser')),
 
                          parent=Optional(None),
 
                          copy_permissions=Optional(False)):
 
        """
 
        Creates a repository group. This command can be executed only using
 
        api_key belonging to user with admin rights.
 

	
 
        :param group_name:
 
        :type group_name:
 
        :param description:
 
        :type description:
 
        :param owner:
 
        :type owner:
 
        :param parent:
 
        :type parent:
 
        :param copy_permissions:
 
        :type copy_permissions:
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
              "msg": "created new repo group `<repo_group_name>`"
 
              "repo_group": <repogroup_object>
 
          }
 
          error :  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            failed to create repo group `<repogroupid>`
 
          }
 

	
 
        """
 
        if RepoGroup.get_by_group_name(group_name):
 
            raise JSONRPCError("repo group `%s` already exist" % (group_name,))
 

	
 
        if isinstance(owner, Optional):
 
            owner = request.authuser.user_id
 
        group_description = Optional.extract(description)
 
        parent_group = Optional.extract(parent)
 
        if not isinstance(parent, Optional):
 
            parent_group = get_repo_group_or_error(parent_group)
 

	
 
        copy_permissions = Optional.extract(copy_permissions)
 
        try:
 
            repo_group = RepoGroupModel().create(
 
                group_name=group_name,
 
                group_description=group_description,
 
                owner=owner,
 
                parent=parent_group,
 
                copy_permissions=copy_permissions
 
            )
 
            Session().commit()
 
            return dict(
 
                msg='created new repo group `%s`' % group_name,
 
                repo_group=repo_group.get_api_data()
 
            )
 
        except Exception:
 

	
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to create repo group `%s`' % (group_name,))
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def update_repo_group(self, repogroupid, group_name=Optional(''),
 
                          description=Optional(''),
 
                          owner=Optional(OAttr('apiuser')),
 
                          parent=Optional(None)):
 
        repo_group = get_repo_group_or_error(repogroupid)
 

	
 
        updates = {}
 
        try:
 
            store_update(updates, group_name, 'group_name')
 
            store_update(updates, description, 'group_description')
 
            store_update(updates, owner, 'owner')
 
            store_update(updates, parent, 'parent_group')
 
            repo_group = RepoGroupModel().update(repo_group, updates)
 
            Session().commit()
 
            return dict(
 
                msg='updated repository group ID:%s %s' % (repo_group.group_id,
 
                                                           repo_group.group_name),
 
                repo_group=repo_group.get_api_data()
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to update repository group `%s`'
 
                               % (repogroupid,))
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def delete_repo_group(self, repogroupid):
 
        """
 

	
 
        :param repogroupid: name or id of repository group
 
        :type repogroupid: str or int
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
            'msg': 'deleted repo group ID:<repogroupid> <repogroupname>
 
            'repo_group': null
 
          }
 
          error :  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "failed to delete repo group ID:<repogroupid> <repogroupname>"
 
          }
 

	
 
        """
 
        repo_group = get_repo_group_or_error(repogroupid)
 

	
 
        try:
 
            RepoGroupModel().delete(repo_group)
 
            Session().commit()
 
            return dict(
 
                msg='deleted repo group ID:%s %s' %
 
                    (repo_group.group_id, repo_group.group_name),
 
                repo_group=None
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to delete repo group ID:%s %s' %
 
                               (repo_group.group_id, repo_group.group_name)
 
                               )
 

	
 
    # permission check inside
 
    def grant_user_permission_to_repo_group(self, repogroupid, userid,
 
                                            perm, apply_to_children=Optional('none')):
 
        """
 
        Grant permission for user on given repository group, or update existing
 
        one if found. This command can be executed only using api_key belonging
 
        to user with admin rights, or user who has admin right to given repository
 
        group.
 

	
 
        :param repogroupid: name or id of repository group
 
        :type repogroupid: str or int
 
        :param userid:
 
        :param perm: (group.(none|read|write|admin))
 
        :type perm: str
 
        :param apply_to_children: 'none', 'repos', 'groups', 'all'
 
        :type apply_to_children: str
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg" : "Granted perm: `<perm>` (recursive:<apply_to_children>) for user: `<username>` in repo group: `<repo_group_name>`",
 
                      "success": true
 
                    }
 
            error:  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "failed to edit permission for user: `<userid>` in repo group: `<repo_group_name>`"
 
          }
 

	
 
        """
 

	
 
        repo_group = get_repo_group_or_error(repogroupid)
 

	
 
        if not HasPermissionAny('hg.admin')():
 
            if not HasRepoGroupPermissionLevel('admin')(repo_group.group_name):
 
                raise JSONRPCError('repository group `%s` does not exist' % (repogroupid,))
 

	
 
        user = get_user_or_error(userid)
 
        perm = get_perm_or_error(perm, prefix='group.')
 
        apply_to_children = Optional.extract(apply_to_children)
 

	
 
        try:
 
            RepoGroupModel().add_permission(repo_group=repo_group,
 
                                            obj=user,
 
                                            obj_type="user",
 
                                            perm=perm,
 
                                            recursive=apply_to_children)
 
            Session().commit()
 
            return dict(
 
                msg='Granted perm: `%s` (recursive:%s) for user: `%s` in repo group: `%s`' % (
 
                    perm.permission_name, apply_to_children, user.username, repo_group.name
 
                ),
 
                success=True
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to edit permission for user: `%s` in repo group: `%s`' % (
 
                    userid, repo_group.name))
 

	
 
    # permission check inside
 
    def revoke_user_permission_from_repo_group(self, repogroupid, userid,
 
                                               apply_to_children=Optional('none')):
 
        """
 
        Revoke permission for user on given repository group. This command can
 
        be executed only using api_key belonging to user with admin rights, or
 
        user who has admin right to given repository group.
 

	
 
        :param repogroupid: name or id of repository group
 
        :type repogroupid: str or int
 
        :param userid:
 
        :type userid:
 
        :param apply_to_children: 'none', 'repos', 'groups', 'all'
 
        :type apply_to_children: str
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg" : "Revoked perm (recursive:<apply_to_children>) for user: `<username>` in repo group: `<repo_group_name>`",
 
                      "success": true
 
                    }
 
            error:  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "failed to edit permission for user: `<userid>` in repo group: `<repo_group_name>`"
 
          }
 

	
 
        """
 

	
 
        repo_group = get_repo_group_or_error(repogroupid)
 

	
 
        if not HasPermissionAny('hg.admin')():
 
            if not HasRepoGroupPermissionLevel('admin')(repo_group.group_name):
 
                raise JSONRPCError('repository group `%s` does not exist' % (repogroupid,))
 

	
 
        user = get_user_or_error(userid)
 
        apply_to_children = Optional.extract(apply_to_children)
 

	
 
        try:
 
            RepoGroupModel().delete_permission(repo_group=repo_group,
 
                                               obj=user,
 
                                               obj_type="user",
 
                                               recursive=apply_to_children)
 

	
 
            Session().commit()
 
            return dict(
 
                msg='Revoked perm (recursive:%s) for user: `%s` in repo group: `%s`' % (
 
                    apply_to_children, user.username, repo_group.name
 
                ),
 
                success=True
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to edit permission for user: `%s` in repo group: `%s`' % (
 
                    userid, repo_group.name))
 

	
 
    # permission check inside
 
    def grant_user_group_permission_to_repo_group(
 
            self, repogroupid, usergroupid, perm,
 
            apply_to_children=Optional('none')):
 
        """
 
        Grant permission for user group on given repository group, or update
 
        existing one if found. This command can be executed only using
 
        api_key belonging to user with admin rights, or user who has admin
 
        right to given repository group.
 

	
 
        :param repogroupid: name or id of repository group
 
        :type repogroupid: str or int
 
        :param usergroupid: id of usergroup
 
        :type usergroupid: str or int
 
        :param perm: (group.(none|read|write|admin))
 
        :type perm: str
 
        :param apply_to_children: 'none', 'repos', 'groups', 'all'
 
        :type apply_to_children: str
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
            "msg" : "Granted perm: `<perm>` (recursive:<apply_to_children>) for user group: `<usersgroupname>` in repo group: `<repo_group_name>`",
 
            "success": true
 

	
 
          }
 
          error :  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "failed to edit permission for user group: `<usergroup>` in repo group: `<repo_group_name>`"
 
          }
 

	
 
        """
 
        repo_group = get_repo_group_or_error(repogroupid)
 
        perm = get_perm_or_error(perm, prefix='group.')
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAny('hg.admin')():
 
            if not HasRepoGroupPermissionLevel('admin')(repo_group.group_name):
 
                raise JSONRPCError(
 
                    'repository group `%s` does not exist' % (repogroupid,))
 

	
 
            if not HasUserGroupPermissionLevel('read')(user_group.users_group_name):
 
                raise JSONRPCError(
 
                    'user group `%s` does not exist' % (usergroupid,))
 

	
 
        apply_to_children = Optional.extract(apply_to_children)
 

	
 
        try:
 
            RepoGroupModel().add_permission(repo_group=repo_group,
 
                                            obj=user_group,
 
                                            obj_type="user_group",
 
                                            perm=perm,
 
                                            recursive=apply_to_children)
 
            Session().commit()
 
            return dict(
 
                msg='Granted perm: `%s` (recursive:%s) for user group: `%s` in repo group: `%s`' % (
 
                    perm.permission_name, apply_to_children,
 
                    user_group.users_group_name, repo_group.name
 
                ),
 
                success=True
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to edit permission for user group: `%s` in '
 
                'repo group: `%s`' % (
 
                    usergroupid, repo_group.name
 
                )
 
            )
 

	
 
    # permission check inside
 
    def revoke_user_group_permission_from_repo_group(
 
            self, repogroupid, usergroupid,
 
            apply_to_children=Optional('none')):
 
        """
 
        Revoke permission for user group on given repository. This command can be
 
        executed only using api_key belonging to user with admin rights, or
 
        user who has admin right to given repository group.
 

	
 
        :param repogroupid: name or id of repository group
 
        :type repogroupid: str or int
 
        :param usergroupid:
 
        :param apply_to_children: 'none', 'repos', 'groups', 'all'
 
        :type apply_to_children: str
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg" : "Revoked perm (recursive:<apply_to_children>) for user group: `<usersgroupname>` in repo group: `<repo_group_name>`",
 
                      "success": true
 
                    }
 
            error:  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "failed to edit permission for user group: `<usergroup>` in repo group: `<repo_group_name>`"
 
          }
 

	
 

	
 
        """
 
        repo_group = get_repo_group_or_error(repogroupid)
 
        user_group = get_user_group_or_error(usergroupid)
 
        if not HasPermissionAny('hg.admin')():
 
            if not HasRepoGroupPermissionLevel('admin')(repo_group.group_name):
 
                raise JSONRPCError(
 
                    'repository group `%s` does not exist' % (repogroupid,))
 

	
 
            if not HasUserGroupPermissionLevel('read')(user_group.users_group_name):
 
                raise JSONRPCError(
 
                    'user group `%s` does not exist' % (usergroupid,))
 

	
 
        apply_to_children = Optional.extract(apply_to_children)
 

	
 
        try:
 
            RepoGroupModel().delete_permission(repo_group=repo_group,
 
                                               obj=user_group,
 
                                               obj_type="user_group",
 
                                               recursive=apply_to_children)
 
            Session().commit()
 
            return dict(
 
                msg='Revoked perm (recursive:%s) for user group: `%s` in repo group: `%s`' % (
 
                    apply_to_children, user_group.users_group_name, repo_group.name
 
                ),
 
                success=True
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to edit permission for user group: `%s` in repo group: `%s`' % (
 
                    user_group.users_group_name, repo_group.name
 
                )
 
            )
 

	
 
    def get_gist(self, gistid):
 
        """
 
        Get given gist by id
 

	
 
        :param gistid: id of private or public gist
 
        :type gistid: str
 
        """
 
        gist = get_gist_or_error(gistid)
 
        if not HasPermissionAny('hg.admin')():
 
            if gist.owner_id != request.authuser.user_id:
 
                raise JSONRPCError('gist `%s` does not exist' % (gistid,))
 
        return gist.get_api_data()
 

	
 
    def get_gists(self, userid=Optional(OAttr('apiuser'))):
 
        """
 
        Get all gists for given user. If userid is empty returned gists
 
        are for user who called the api
 

	
 
        :param userid: user to get gists for
 
        :type userid: Optional(str or int)
 
        """
 
        if not HasPermissionAny('hg.admin')():
 
            # make sure normal user does not pass someone else userid,
 
            # he is not allowed to do that
 
            if not isinstance(userid, Optional) and userid != request.authuser.user_id:
 
                raise JSONRPCError(
 
                    'userid is not the same as your user'
 
                )
 

	
 
        if isinstance(userid, Optional):
 
            user_id = request.authuser.user_id
 
        else:
 
            user_id = get_user_or_error(userid).user_id
 

	
 
        return [
 
            gist.get_api_data()
 
            for gist in Gist().query()
 
                .filter_by(is_expired=False)
 
                .filter(Gist.owner_id == user_id)
 
                .order_by(Gist.created_on.desc())
 
        ]
 

	
 
    def create_gist(self, files, owner=Optional(OAttr('apiuser')),
 
                    gist_type=Optional(Gist.GIST_PUBLIC), lifetime=Optional(-1),
 
                    description=Optional('')):
 

	
 
        """
 
        Creates new Gist
 

	
 
        :param files: files to be added to gist
 
            {'filename': {'content':'...', 'lexer': null},
 
             'filename2': {'content':'...', 'lexer': null}}
 
        :type files: dict
 
        :param owner: gist owner, defaults to api method caller
 
        :type owner: Optional(str or int)
 
        :param gist_type: type of gist 'public' or 'private'
 
        :type gist_type: Optional(str)
 
        :param lifetime: time in minutes of gist lifetime
 
        :type lifetime: Optional(int)
 
        :param description: gist description
 
        :type description: Optional(str)
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
            "msg": "created new gist",
 
            "gist": {}
 
          }
 
          error :  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "failed to create gist"
 
          }
 

	
 
        """
 
        try:
 
            if isinstance(owner, Optional):
 
                owner = request.authuser.user_id
 

	
 
            owner = get_user_or_error(owner)
 
            description = Optional.extract(description)
 
            gist_type = Optional.extract(gist_type)
 
            lifetime = Optional.extract(lifetime)
 

	
 
            gist = GistModel().create(description=description,
 
                                      owner=owner,
 
                                      ip_addr=request.ip_addr,
 
                                      gist_mapping=files,
 
                                      gist_type=gist_type,
 
                                      lifetime=lifetime)
 
            Session().commit()
 
            return dict(
 
                msg='created new gist',
 
                gist=gist.get_api_data()
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to create gist')
 

	
 
    # def update_gist(self, gistid, files, owner=Optional(OAttr('apiuser')),
 
    #                 gist_type=Optional(Gist.GIST_PUBLIC),
 
    #                 gist_lifetime=Optional(-1), gist_description=Optional('')):
 
    #     gist = get_gist_or_error(gistid)
 
    #     updates = {}
 

	
 
    # permission check inside
 
    def delete_gist(self, gistid):
 
        """
 
        Deletes existing gist
 

	
 
        :param gistid: id of gist to delete
 
        :type gistid: str
 

	
 
        OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : {
 
            "deleted gist ID: <gist_id>",
 
            "gist": null
 
          }
 
          error :  null
 

	
 
        ERROR OUTPUT::
 

	
 
          id : <id_given_in_input>
 
          result : null
 
          error :  {
 
            "failed to delete gist ID:<gist_id>"
 
          }
 

	
 
        """
 
        gist = get_gist_or_error(gistid)
 
        if not HasPermissionAny('hg.admin')():
 
            if gist.owner_id != request.authuser.user_id:
 
                raise JSONRPCError('gist `%s` does not exist' % (gistid,))
 

	
 
        try:
 
            GistModel().delete(gist)
 
            Session().commit()
 
            return dict(
 
                msg='deleted gist ID:%s' % (gist.gist_access_id,),
 
                gist=None
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to delete gist ID:%s'
 
                               % (gist.gist_access_id,))
 

	
 
    # permission check inside
 
    def get_changesets(self, repoid, start=None, end=None, start_date=None,
 
                       end_date=None, branch_name=None, reverse=False, with_file_list=False, max_revisions=None):
 
        repo = get_repo_or_error(repoid)
 
        if not HasRepoPermissionLevel('read')(repo.repo_name):
 
            raise JSONRPCError('Access denied to repo %s' % repo.repo_name)
 

	
 
        format = "%Y-%m-%dT%H:%M:%S"
 
        try:
 
            return [e.__json__(with_file_list) for e in
 
                repo.scm_instance.get_changesets(start,
 
                                                 end,
 
                                                 datetime.strptime(start_date, format) if start_date else None,
 
                                                 datetime.strptime(end_date, format) if end_date else None,
 
                                                 branch_name,
 
                                                 reverse, max_revisions)]
 
        except EmptyRepositoryError as e:
 
            raise JSONRPCError(e.message)
 

	
 
    # permission check inside
 
    def get_changeset(self, repoid, raw_id, with_reviews=Optional(False)):
 
        repo = get_repo_or_error(repoid)
 
        if not HasRepoPermissionLevel('read')(repo.repo_name):
 
            raise JSONRPCError('Access denied to repo %s' % repo.repo_name)
 
        changeset = repo.get_changeset(raw_id)
 
        if isinstance(changeset, EmptyChangeset):
 
            raise JSONRPCError('Changeset %s does not exist' % raw_id)
 

	
 
        info = dict(changeset.as_dict())
 

	
 
        with_reviews = Optional.extract(with_reviews)
 
        if with_reviews:
 
                reviews = ChangesetStatusModel().get_statuses(
 
                                    repo.repo_name, raw_id)
 
                info["reviews"] = reviews
 
            reviews = ChangesetStatusModel().get_statuses(
 
                                repo.repo_name, raw_id)
 
            info["reviews"] = reviews
 

	
 
        return info
 

	
 
    # permission check inside
 
    def get_pullrequest(self, pullrequest_id):
 
        """
 
        Get given pull request by id
 
        """
 
        pull_request = PullRequest.get(pullrequest_id)
 
        if pull_request is None:
 
            raise JSONRPCError('pull request `%s` does not exist' % (pullrequest_id,))
 
        if not HasRepoPermissionLevel('read')(pull_request.org_repo.repo_name):
 
            raise JSONRPCError('not allowed')
 
        return pull_request.get_api_data()
 

	
 
    # permission check inside
 
    def comment_pullrequest(self, pull_request_id, comment_msg=u'', status=None, close_pr=False):
 
        """
 
        Add comment, close and change status of pull request.
 
        """
 
        apiuser = get_user_or_error(request.authuser.user_id)
 
        pull_request = PullRequest.get(pull_request_id)
 
        if pull_request is None:
 
            raise JSONRPCError('pull request `%s` does not exist' % (pull_request_id,))
 
        if (not HasRepoPermissionLevel('read')(pull_request.org_repo.repo_name)):
 
            raise JSONRPCError('No permission to add comment. User needs at least reading permissions'
 
                               ' to the source repository.')
 
        owner = apiuser.user_id == pull_request.owner_id
 
        reviewer = apiuser.user_id in [reviewer.user_id for reviewer in pull_request.reviewers]
 
        if close_pr and not (apiuser.admin or owner):
 
            raise JSONRPCError('No permission to close pull request. User needs to be admin or owner.')
 
        if status and not (apiuser.admin or owner or reviewer):
 
            raise JSONRPCError('No permission to change pull request status. User needs to be admin, owner or reviewer.')
 
        if pull_request.is_closed():
 
            raise JSONRPCError('pull request is already closed')
 

	
 
        comment = ChangesetCommentsModel().create(
 
            text=comment_msg,
 
            repo=pull_request.org_repo.repo_id,
 
            author=apiuser.user_id,
 
            pull_request=pull_request.pull_request_id,
 
            f_path=None,
 
            line_no=None,
 
            status_change=(ChangesetStatus.get_status_lbl(status)),
 
            closing_pr=close_pr
 
        )
 
        action_logger(apiuser,
 
                      'user_commented_pull_request:%s' % pull_request_id,
 
                      pull_request.org_repo, request.ip_addr)
 
        if status:
 
            ChangesetStatusModel().set_status(
 
                pull_request.org_repo_id,
 
                status,
 
                apiuser.user_id,
 
                comment,
 
                pull_request=pull_request_id
 
            )
 
        if close_pr:
 
            PullRequestModel().close_pull_request(pull_request_id)
 
            action_logger(apiuser,
 
                          'user_closed_pull_request:%s' % pull_request_id,
 
                          pull_request.org_repo, request.ip_addr)
 
        Session().commit()
 
        return True
kallithea/lib/utils2.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.utils2
 
~~~~~~~~~~~~~~~~~~~~
 

	
 
Some simple helper functions.
 
Note: all these functions should be independent of Kallithea classes, i.e.
 
models, controllers, etc.  to prevent import cycles.
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jan 5, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import binascii
 
import datetime
 
import os
 
import pwd
 
import re
 
import time
 
import urllib
 

	
 
import urlobject
 
from tg.i18n import ugettext as _
 
from tg.i18n import ungettext
 
from webhelpers2.text import collapse, remove_formatting, strip_tags
 

	
 
from kallithea.lib.compat import json
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 

	
 

	
 
def str2bool(_str):
 
    """
 
    returns True/False value from given string, it tries to translate the
 
    string into boolean
 

	
 
    :param _str: string value to translate into boolean
 
    :rtype: boolean
 
    :returns: boolean from given string
 
    """
 
    if _str is None:
 
        return False
 
    if _str in (True, False):
 
        return _str
 
    _str = str(_str).strip().lower()
 
    return _str in ('t', 'true', 'y', 'yes', 'on', '1')
 

	
 

	
 
def aslist(obj, sep=None, strip=True):
 
    """
 
    Returns given string separated by sep as list
 

	
 
    :param obj:
 
    :param sep:
 
    :param strip:
 
    """
 
    if isinstance(obj, (basestring)):
 
        lst = obj.split(sep)
 
        if strip:
 
            lst = [v.strip() for v in lst]
 
        return lst
 
    elif isinstance(obj, (list, tuple)):
 
        return obj
 
    elif obj is None:
 
        return []
 
    else:
 
        return [obj]
 

	
 

	
 
def convert_line_endings(line, mode):
 
    """
 
    Converts a given line  "line end" according to given mode
 

	
 
    Available modes are::
 
        0 - Unix
 
        1 - Mac
 
        2 - DOS
 

	
 
    :param line: given line to convert
 
    :param mode: mode to convert to
 
    :rtype: str
 
    :return: converted line according to mode
 
    """
 
    from string import replace
 

	
 
    if mode == 0:
 
            line = replace(line, '\r\n', '\n')
 
            line = replace(line, '\r', '\n')
 
        line = replace(line, '\r\n', '\n')
 
        line = replace(line, '\r', '\n')
 
    elif mode == 1:
 
            line = replace(line, '\r\n', '\r')
 
            line = replace(line, '\n', '\r')
 
        line = replace(line, '\r\n', '\r')
 
        line = replace(line, '\n', '\r')
 
    elif mode == 2:
 
            line = re.sub("\r(?!\n)|(?<!\r)\n", "\r\n", line)
 
        line = re.sub("\r(?!\n)|(?<!\r)\n", "\r\n", line)
 
    return line
 

	
 

	
 
def detect_mode(line, default):
 
    """
 
    Detects line break for given line, if line break couldn't be found
 
    given default value is returned
 

	
 
    :param line: str line
 
    :param default: default
 
    :rtype: int
 
    :return: value of line end on of 0 - Unix, 1 - Mac, 2 - DOS
 
    """
 
    if line.endswith('\r\n'):
 
        return 2
 
    elif line.endswith('\n'):
 
        return 0
 
    elif line.endswith('\r'):
 
        return 1
 
    else:
 
        return default
 

	
 

	
 
def generate_api_key():
 
    """
 
    Generates a random (presumably unique) API key.
 

	
 
    This value is used in URLs and "Bearer" HTTP Authorization headers,
 
    which in practice means it should only contain URL-safe characters
 
    (RFC 3986):
 

	
 
        unreserved = ALPHA / DIGIT / "-" / "." / "_" / "~"
 
    """
 
    # Hexadecimal certainly qualifies as URL-safe.
 
    return binascii.hexlify(os.urandom(20))
 

	
 

	
 
def safe_int(val, default=None):
 
    """
 
    Returns int() of val if val is not convertable to int use default
 
    instead
 

	
 
    :param val:
 
    :param default:
 
    """
 

	
 
    try:
 
        val = int(val)
 
    except (ValueError, TypeError):
 
        val = default
 

	
 
    return val
 

	
 

	
 
def safe_unicode(str_, from_encoding=None):
 
    """
 
    safe unicode function. Does few trick to turn str_ into unicode
 

	
 
    In case of UnicodeDecode error we try to return it with encoding detected
 
    by chardet library if it fails fallback to unicode with errors replaced
 

	
 
    :param str_: string to decode
 
    :rtype: unicode
 
    :returns: unicode object
 
    """
 
    if isinstance(str_, unicode):
 
        return str_
 

	
 
    if not from_encoding:
 
        import kallithea
 
        DEFAULT_ENCODINGS = aslist(kallithea.CONFIG.get('default_encoding',
 
                                                        'utf-8'), sep=',')
 
        from_encoding = DEFAULT_ENCODINGS
 

	
 
    if not isinstance(from_encoding, (list, tuple)):
 
        from_encoding = [from_encoding]
 

	
 
    try:
 
        return unicode(str_)
 
    except UnicodeDecodeError:
 
        pass
 

	
 
    for enc in from_encoding:
 
        try:
 
            return unicode(str_, enc)
 
        except UnicodeDecodeError:
 
            pass
 

	
 
    try:
 
        import chardet
 
        encoding = chardet.detect(str_)['encoding']
 
        if encoding is None:
 
            raise Exception()
 
        return str_.decode(encoding)
 
    except (ImportError, UnicodeDecodeError, Exception):
 
        return unicode(str_, from_encoding[0], 'replace')
 

	
 

	
 
def safe_str(unicode_, to_encoding=None):
 
    """
 
    safe str function. Does few trick to turn unicode_ into string
 

	
 
    In case of UnicodeEncodeError we try to return it with encoding detected
 
    by chardet library if it fails fallback to string with errors replaced
 

	
 
    :param unicode_: unicode to encode
 
    :rtype: str
 
    :returns: str object
 
    """
 

	
 
    # if it's not basestr cast to str
 
    if not isinstance(unicode_, basestring):
 
        return str(unicode_)
 

	
 
    if isinstance(unicode_, str):
 
        return unicode_
 

	
 
    if not to_encoding:
 
        import kallithea
 
        DEFAULT_ENCODINGS = aslist(kallithea.CONFIG.get('default_encoding',
 
                                                        'utf-8'), sep=',')
 
        to_encoding = DEFAULT_ENCODINGS
 

	
 
    if not isinstance(to_encoding, (list, tuple)):
 
        to_encoding = [to_encoding]
 

	
 
    for enc in to_encoding:
 
        try:
 
            return unicode_.encode(enc)
 
        except UnicodeEncodeError:
 
            pass
 

	
 
    try:
 
        import chardet
 
        encoding = chardet.detect(unicode_)['encoding']
 
        if encoding is None:
 
            raise UnicodeEncodeError()
 

	
 
        return unicode_.encode(encoding)
 
    except (ImportError, UnicodeEncodeError):
 
        return unicode_.encode(to_encoding[0], 'replace')
 

	
 

	
 
def remove_suffix(s, suffix):
 
    if s.endswith(suffix):
 
        s = s[:-1 * len(suffix)]
 
    return s
 

	
 

	
 
def remove_prefix(s, prefix):
 
    if s.startswith(prefix):
 
        s = s[len(prefix):]
 
    return s
 

	
 

	
 
def age(prevdate, show_short_version=False, now=None):
 
    """
 
    turns a datetime into an age string.
 
    If show_short_version is True, then it will generate a not so accurate but shorter string,
 
    example: 2days ago, instead of 2 days and 23 hours ago.
 

	
 
    :param prevdate: datetime object
 
    :param show_short_version: if it should approximate the date and return a shorter string
 
    :rtype: unicode
 
    :returns: unicode words describing age
 
    """
 
    now = now or datetime.datetime.now()
 
    order = ['year', 'month', 'day', 'hour', 'minute', 'second']
 
    deltas = {}
 
    future = False
 

	
 
    if prevdate > now:
 
        now, prevdate = prevdate, now
 
        future = True
 
    if future:
 
        prevdate = prevdate.replace(microsecond=0)
 
    # Get date parts deltas
 
    from dateutil import relativedelta
 
    for part in order:
 
        d = relativedelta.relativedelta(now, prevdate)
 
        deltas[part] = getattr(d, part + 's')
 

	
 
    # Fix negative offsets (there is 1 second between 10:59:59 and 11:00:00,
 
    # not 1 hour, -59 minutes and -59 seconds)
 
    for num, length in [(5, 60), (4, 60), (3, 24)]:  # seconds, minutes, hours
 
        part = order[num]
 
        carry_part = order[num - 1]
 

	
 
        if deltas[part] < 0:
 
            deltas[part] += length
 
            deltas[carry_part] -= 1
 

	
 
    # Same thing for days except that the increment depends on the (variable)
 
    # number of days in the month
 
    month_lengths = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
 
    if deltas['day'] < 0:
 
        if prevdate.month == 2 and (prevdate.year % 4 == 0 and
 
            (prevdate.year % 100 != 0 or prevdate.year % 400 == 0)):
 
            deltas['day'] += 29
 
        else:
 
            deltas['day'] += month_lengths[prevdate.month - 1]
 

	
 
        deltas['month'] -= 1
 

	
 
    if deltas['month'] < 0:
 
        deltas['month'] += 12
 
        deltas['year'] -= 1
 

	
 
    # In short version, we want nicer handling of ages of more than a year
 
    if show_short_version:
 
        if deltas['year'] == 1:
 
            # ages between 1 and 2 years: show as months
 
            deltas['month'] += 12
 
            deltas['year'] = 0
 
        if deltas['year'] >= 2:
 
            # ages 2+ years: round
 
            if deltas['month'] > 6:
 
                deltas['year'] += 1
 
                deltas['month'] = 0
 

	
 
    # Format the result
 
    fmt_funcs = {
 
        'year': lambda d: ungettext(u'%d year', '%d years', d) % d,
 
        'month': lambda d: ungettext(u'%d month', '%d months', d) % d,
 
        'day': lambda d: ungettext(u'%d day', '%d days', d) % d,
 
        'hour': lambda d: ungettext(u'%d hour', '%d hours', d) % d,
 
        'minute': lambda d: ungettext(u'%d minute', '%d minutes', d) % d,
 
        'second': lambda d: ungettext(u'%d second', '%d seconds', d) % d,
 
    }
 

	
 
    for i, part in enumerate(order):
 
        value = deltas[part]
 
        if value == 0:
 
            continue
 

	
 
        if i < 5:
 
            sub_part = order[i + 1]
 
            sub_value = deltas[sub_part]
 
        else:
 
            sub_value = 0
 

	
 
        if sub_value == 0 or show_short_version:
 
            if future:
 
                return _('in %s') % fmt_funcs[part](value)
 
            else:
 
                return _('%s ago') % fmt_funcs[part](value)
 
        if future:
 
            return _('in %s and %s') % (fmt_funcs[part](value),
 
                fmt_funcs[sub_part](sub_value))
 
        else:
 
            return _('%s and %s ago') % (fmt_funcs[part](value),
 
                fmt_funcs[sub_part](sub_value))
 

	
 
    return _('just now')
 

	
 

	
 
def uri_filter(uri):
 
    """
 
    Removes user:password from given url string
 

	
 
    :param uri:
 
    :rtype: unicode
 
    :returns: filtered list of strings
 
    """
 
    if not uri:
 
        return ''
 

	
 
    proto = ''
 

	
 
    for pat in ('https://', 'http://', 'git://'):
 
        if uri.startswith(pat):
 
            uri = uri[len(pat):]
 
            proto = pat
 
            break
 

	
 
    # remove passwords and username
 
    uri = uri[uri.find('@') + 1:]
 

	
 
    # get the port
 
    cred_pos = uri.find(':')
 
    if cred_pos == -1:
 
        host, port = uri, None
 
    else:
 
        host, port = uri[:cred_pos], uri[cred_pos + 1:]
 

	
 
    return filter(None, [proto, host, port])
 

	
 

	
 
def credentials_filter(uri):
 
    """
 
    Returns a url with removed credentials
 

	
 
    :param uri:
 
    """
 

	
 
    uri = uri_filter(uri)
 
    # check if we have port
 
    if len(uri) > 2 and uri[2]:
 
        uri[2] = ':' + uri[2]
 

	
 
    return ''.join(uri)
 

	
 

	
 
def get_clone_url(clone_uri_tmpl, prefix_url, repo_name, repo_id, username=None):
 
    parsed_url = urlobject.URLObject(prefix_url)
 
    prefix = safe_unicode(urllib.unquote(parsed_url.path.rstrip('/')))
 
    try:
 
        system_user = pwd.getpwuid(os.getuid()).pw_name
 
    except Exception: # TODO: support all systems - especially Windows
 
        system_user = 'kallithea' # hardcoded default value ...
 
    args = {
 
        'scheme': parsed_url.scheme,
 
        'user': safe_unicode(urllib.quote(safe_str(username or ''))),
 
        'netloc': parsed_url.netloc + prefix,  # like "hostname:port/prefix" (with optional ":port" and "/prefix")
 
        'prefix': prefix, # undocumented, empty or starting with /
 
        'repo': repo_name,
 
        'repoid': str(repo_id),
 
        'system_user': safe_unicode(system_user),
 
        'hostname': parsed_url.hostname,
 
    }
 
    url = re.sub('{([^{}]+)}', lambda m: args.get(m.group(1), m.group(0)), clone_uri_tmpl)
 

	
 
    # remove leading @ sign if it's present. Case of empty user
 
    url_obj = urlobject.URLObject(url)
 
    if not url_obj.username:
 
        url_obj = url_obj.with_username(None)
 

	
 
    return safe_unicode(url_obj)
 

	
 

	
 
def get_changeset_safe(repo, rev):
 
    """
 
    Safe version of get_changeset if this changeset doesn't exists for a
 
    repo it returns a Dummy one instead
 

	
 
    :param repo:
 
    :param rev:
 
    """
 
    from kallithea.lib.vcs.backends.base import BaseRepository
 
    from kallithea.lib.vcs.exceptions import RepositoryError
 
    from kallithea.lib.vcs.backends.base import EmptyChangeset
 
    if not isinstance(repo, BaseRepository):
 
        raise Exception('You must pass an Repository '
 
                        'object as first argument got %s', type(repo))
 

	
 
    try:
 
        cs = repo.get_changeset(rev)
 
    except (RepositoryError, LookupError):
 
        cs = EmptyChangeset(requested_revision=rev)
 
    return cs
 

	
 

	
 
def datetime_to_time(dt):
 
    if dt:
 
        return time.mktime(dt.timetuple())
 

	
 

	
 
def time_to_datetime(tm):
 
    if tm:
 
        if isinstance(tm, basestring):
 
            try:
 
                tm = float(tm)
 
            except ValueError:
 
                return
 
        return datetime.datetime.fromtimestamp(tm)
 

	
 

	
 
# Must match regexp in kallithea/public/js/base.js MentionsAutoComplete()
 
# Check char before @ - it must not look like we are in an email addresses.
 
# Matching is greedy so we don't have to look beyond the end.
 
MENTIONS_REGEX = re.compile(r'(?:^|(?<=[^a-zA-Z0-9]))@([a-zA-Z0-9][-_.a-zA-Z0-9]*[a-zA-Z0-9])')
 

	
 

	
 
def extract_mentioned_usernames(text):
 
    r"""
 
    Returns list of (possible) usernames @mentioned in given text.
 

	
 
    >>> extract_mentioned_usernames('@1-2.a_X,@1234 not@not @ddd@not @n @ee @ff @gg, @gg;@hh @n\n@zz,')
 
    ['1-2.a_X', '1234', 'ddd', 'ee', 'ff', 'gg', 'gg', 'hh', 'zz']
 
    """
 
    return MENTIONS_REGEX.findall(text)
 

	
 

	
 
def extract_mentioned_users(text):
 
    """ Returns set of actual database Users @mentioned in given text. """
 
    from kallithea.model.db import User
 
    result = set()
 
    for name in extract_mentioned_usernames(text):
 
        user = User.get_by_username(name, case_insensitive=True)
 
        if user is not None and not user.is_default_user:
 
            result.add(user)
 
    return result
 

	
 

	
 
class AttributeDict(dict):
 
    def __getattr__(self, attr):
 
        return self.get(attr, None)
 
    __setattr__ = dict.__setitem__
 
    __delattr__ = dict.__delitem__
 

	
 

	
 
def obfuscate_url_pw(engine):
 
    from sqlalchemy.engine import url as sa_url
 
    from sqlalchemy.exc import ArgumentError
 
    try:
 
        _url = sa_url.make_url(engine or '')
 
    except ArgumentError:
 
        return engine
 
    if _url.password:
 
        _url.password = 'XXXXX'
 
    return str(_url)
 

	
 

	
 
def get_hook_environment():
 
    """
 
    Get hook context by deserializing the global KALLITHEA_EXTRAS environment
 
    variable.
 

	
 
    Called early in Git out-of-process hooks to get .ini config path so the
 
    basic environment can be configured properly. Also used in all hooks to get
 
    information about the action that triggered it.
 
    """
 

	
 
    try:
 
        extras = json.loads(os.environ['KALLITHEA_EXTRAS'])
 
    except KeyError:
 
        raise Exception("Environment variable KALLITHEA_EXTRAS not found")
 

	
 
    try:
 
        for k in ['username', 'repository', 'scm', 'action', 'ip']:
 
            extras[k]
 
    except KeyError:
 
        raise Exception('Missing key %s in KALLITHEA_EXTRAS %s' % (k, extras))
 

	
 
    return AttributeDict(extras)
 

	
 

	
 
def set_hook_environment(username, ip_addr, repo_name, repo_alias, action=None):
 
    """Prepare global context for running hooks by serializing data in the
 
    global KALLITHEA_EXTRAS environment variable.
 

	
 
    Most importantly, this allow Git hooks to do proper logging and updating of
 
    caches after pushes.
 

	
 
    Must always be called before anything with hooks are invoked.
 
    """
 
    from kallithea import CONFIG
 
    extras = {
 
        'ip': ip_addr, # used in log_push/pull_action action_logger
 
        'username': username,
 
        'action': action or 'push_local', # used in log_push_action_raw_ids action_logger
 
        'repository': repo_name,
 
        'scm': repo_alias, # used to pick hack in log_push_action_raw_ids
 
        'config': CONFIG['__file__'], # used by git hook to read config
 
    }
 
    os.environ['KALLITHEA_EXTRAS'] = json.dumps(extras)
 

	
 

	
 
def get_current_authuser():
 
    """
 
    Gets kallithea user from threadlocal tmpl_context variable if it's
 
    defined, else returns None.
 
    """
 
    from tg import tmpl_context
 
    if hasattr(tmpl_context, 'authuser'):
 
        return tmpl_context.authuser
 

	
 
    return None
 

	
 

	
 
class OptionalAttr(object):
 
    """
 
    Special Optional Option that defines other attribute. Example::
 

	
 
        def test(apiuser, userid=Optional(OAttr('apiuser')):
 
            user = Optional.extract(userid)
 
            # calls
 

	
 
    """
 

	
 
    def __init__(self, attr_name):
 
        self.attr_name = attr_name
 

	
 
    def __repr__(self):
 
        return '<OptionalAttr:%s>' % self.attr_name
 

	
 
    def __call__(self):
 
        return self
 

	
 

	
 
# alias
 
OAttr = OptionalAttr
 

	
 

	
 
class Optional(object):
 
    """
 
    Defines an optional parameter::
 

	
 
        param = param.getval() if isinstance(param, Optional) else param
 
        param = param() if isinstance(param, Optional) else param
 

	
 
    is equivalent of::
 

	
 
        param = Optional.extract(param)
 

	
 
    """
 

	
 
    def __init__(self, type_):
 
        self.type_ = type_
 

	
 
    def __repr__(self):
 
        return '<Optional:%s>' % self.type_.__repr__()
 

	
 
    def __call__(self):
 
        return self.getval()
 

	
 
    def getval(self):
 
        """
 
        returns value from this Optional instance
 
        """
 
        if isinstance(self.type_, OAttr):
 
            # use params name
 
            return self.type_.attr_name
 
        return self.type_
 

	
 
    @classmethod
 
    def extract(cls, val):
 
        """
 
        Extracts value from Optional() instance
 

	
 
        :param val:
 
        :return: original value if it's not Optional instance else
 
            value of instance
 
        """
 
        if isinstance(val, cls):
 
            return val.getval()
 
        return val
 

	
 

	
 
def urlreadable(s, _cleanstringsub=re.compile('[^-a-zA-Z0-9./]+').sub):
 
    return _cleanstringsub('_', safe_str(s)).rstrip('_')
 

	
 

	
 
def recursive_replace(str_, replace=' '):
 
    """
 
    Recursive replace of given sign to just one instance
 

	
 
    :param str_: given string
 
    :param replace: char to find and replace multiple instances
 

	
 
    Examples::
 
    >>> recursive_replace("Mighty---Mighty-Bo--sstones",'-')
 
    'Mighty-Mighty-Bo-sstones'
 
    """
 

	
 
    if str_.find(replace * 2) == -1:
 
        return str_
 
    else:
 
        str_ = str_.replace(replace * 2, replace)
 
        return recursive_replace(str_, replace)
 

	
 

	
 
def repo_name_slug(value):
 
    """
 
    Return slug of name of repository
 
    This function is called on each creation/modification
 
    of repository to prevent bad names in repo
 
    """
 

	
 
    slug = remove_formatting(value)
 
    slug = strip_tags(slug)
 

	
 
    for c in r"""`?=[]\;'"<>,/~!@#$%^&*()+{}|: """:
 
        slug = slug.replace(c, '-')
 
    slug = recursive_replace(slug, '-')
 
    slug = collapse(slug, '-')
 
    return slug
 

	
 

	
 
def ask_ok(prompt, retries=4, complaint='Yes or no please!'):
 
    while True:
 
        ok = raw_input(prompt)
 
        if ok in ('y', 'ye', 'yes'):
 
            return True
 
        if ok in ('n', 'no', 'nop', 'nope'):
 
            return False
 
        retries = retries - 1
 
        if retries < 0:
 
            raise IOError
 
        print complaint
kallithea/lib/vcs/backends/git/repository.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
"""
 
    vcs.backends.git.repository
 
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
    Git repository implementation.
 

	
 
    :created_on: Apr 8, 2010
 
    :copyright: (c) 2010-2011 by Marcin Kuzminski, Lukasz Balcerzak.
 
"""
 

	
 
import errno
 
import logging
 
import os
 
import posixpath
 
import re
 
import time
 
import urllib
 
import urllib2
 
from collections import OrderedDict
 

	
 
from dulwich.config import ConfigFile
 
from dulwich.objects import Tag
 
from dulwich.repo import NotGitRepository, Repo
 

	
 
from kallithea.lib.vcs import subprocessio
 
from kallithea.lib.vcs.backends.base import BaseRepository, CollectionGenerator
 
from kallithea.lib.vcs.conf import settings
 
from kallithea.lib.vcs.exceptions import (
 
    BranchDoesNotExistError, ChangesetDoesNotExistError, EmptyRepositoryError, RepositoryError, TagAlreadyExistError, TagDoesNotExistError)
 
from kallithea.lib.vcs.utils import date_fromtimestamp, makedate, safe_str, safe_unicode
 
from kallithea.lib.vcs.utils.hgcompat import hg_url, httpbasicauthhandler, httpdigestauthhandler
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.lib.vcs.utils.paths import abspath, get_user_home
 

	
 
from .changeset import GitChangeset
 
from .inmemory import GitInMemoryChangeset
 
from .workdir import GitWorkdir
 

	
 

	
 
SHA_PATTERN = re.compile(r'^([0-9a-fA-F]{12}|[0-9a-fA-F]{40})$')
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class GitRepository(BaseRepository):
 
    """
 
    Git repository backend.
 
    """
 
    DEFAULT_BRANCH_NAME = 'master'
 
    scm = 'git'
 

	
 
    def __init__(self, repo_path, create=False, src_url=None,
 
                 update_after_clone=False, bare=False):
 

	
 
        self.path = safe_unicode(abspath(repo_path))
 
        self.repo = self._get_repo(create, src_url, update_after_clone, bare)
 
        self.bare = self.repo.bare
 

	
 
    @property
 
    def _config_files(self):
 
        return [
 
            self.bare and abspath(self.path, 'config')
 
                      or abspath(self.path, '.git', 'config'),
 
             abspath(get_user_home(), '.gitconfig'),
 
         ]
 

	
 
    @property
 
    def _repo(self):
 
        return self.repo
 

	
 
    @property
 
    def head(self):
 
        try:
 
            return self._repo.head()
 
        except KeyError:
 
            return None
 

	
 
    @property
 
    def _empty(self):
 
        """
 
        Checks if repository is empty ie. without any changesets
 
        """
 

	
 
        try:
 
            self.revisions[0]
 
        except (KeyError, IndexError):
 
            return True
 
        return False
 

	
 
    @LazyProperty
 
    def revisions(self):
 
        """
 
        Returns list of revisions' ids, in ascending order.  Being lazy
 
        attribute allows external tools to inject shas from cache.
 
        """
 
        return self._get_all_revisions()
 

	
 
    @classmethod
 
    def _run_git_command(cls, cmd, **opts):
 
        """
 
        Runs given ``cmd`` as git command and returns tuple
 
        (stdout, stderr).
 

	
 
        :param cmd: git command to be executed
 
        :param opts: env options to pass into Subprocess command
 
        """
 

	
 
        if '_bare' in opts:
 
            _copts = []
 
            del opts['_bare']
 
        else:
 
            _copts = ['-c', 'core.quotepath=false', ]
 
        safe_call = False
 
        if '_safe' in opts:
 
            # no exc on failure
 
            del opts['_safe']
 
            safe_call = True
 

	
 
        assert isinstance(cmd, list), cmd
 

	
 
        gitenv = os.environ
 
        # need to clean fix GIT_DIR !
 
        if 'GIT_DIR' in gitenv:
 
            del gitenv['GIT_DIR']
 
        gitenv['GIT_CONFIG_NOGLOBAL'] = '1'
 

	
 
        _git_path = settings.GIT_EXECUTABLE_PATH
 
        cmd = [_git_path] + _copts + cmd
 

	
 
        try:
 
            _opts = dict(
 
                env=gitenv,
 
                shell=False,
 
            )
 
            _opts.update(opts)
 
            p = subprocessio.SubprocessIOChunker(cmd, **_opts)
 
        except (EnvironmentError, OSError) as err:
 
            tb_err = ("Couldn't run git command (%s).\n"
 
                      "Original error was:%s\n" % (cmd, err))
 
            log.error(tb_err)
 
            if safe_call:
 
                return '', err
 
            else:
 
                raise RepositoryError(tb_err)
 

	
 
        return ''.join(p.output), ''.join(p.error)
 

	
 
    def run_git_command(self, cmd):
 
        opts = {}
 
        if os.path.isdir(self.path):
 
            opts['cwd'] = self.path
 
        return self._run_git_command(cmd, **opts)
 

	
 
    @classmethod
 
    def _check_url(cls, url):
 
        """
 
        Function will check given url and try to verify if it's a valid
 
        link. Sometimes it may happened that git will issue basic
 
        auth request that can cause whole API to hang when used from python
 
        or other external calls.
 

	
 
        On failures it'll raise urllib2.HTTPError, exception is also thrown
 
        when the return code is non 200
 
        """
 

	
 
        # check first if it's not an local url
 
        if os.path.isdir(url) or url.startswith('file:'):
 
            return True
 

	
 
        if url.startswith('git://'):
 
            return True
 

	
 
        if '+' in url[:url.find('://')]:
 
            url = url[url.find('+') + 1:]
 

	
 
        handlers = []
 
        url_obj = hg_url(url)
 
        test_uri, authinfo = url_obj.authinfo()
 
        url_obj.passwd = '*****'
 
        cleaned_uri = str(url_obj)
 

	
 
        if not test_uri.endswith('info/refs'):
 
            test_uri = test_uri.rstrip('/') + '/info/refs'
 

	
 
        if authinfo:
 
            # create a password manager
 
            passmgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
 
            passmgr.add_password(*authinfo)
 

	
 
            handlers.extend((httpbasicauthhandler(passmgr),
 
                             httpdigestauthhandler(passmgr)))
 

	
 
        o = urllib2.build_opener(*handlers)
 
        o.addheaders = [('User-Agent', 'git/1.7.8.0')]  # fake some git
 

	
 
        q = {"service": 'git-upload-pack'}
 
        qs = '?%s' % urllib.urlencode(q)
 
        cu = "%s%s" % (test_uri, qs)
 
        req = urllib2.Request(cu, None, {})
 

	
 
        try:
 
            resp = o.open(req)
 
            if resp.code != 200:
 
                raise Exception('Return Code is not 200')
 
        except Exception as e:
 
            # means it cannot be cloned
 
            raise urllib2.URLError("[%s] org_exc: %s" % (cleaned_uri, e))
 

	
 
        # now detect if it's proper git repo
 
        gitdata = resp.read()
 
        if 'service=git-upload-pack' not in gitdata:
 
            raise urllib2.URLError(
 
                "url [%s] does not look like an git" % (cleaned_uri))
 

	
 
        return True
 

	
 
    def _get_repo(self, create, src_url=None, update_after_clone=False,
 
                  bare=False):
 
        if create and os.path.exists(self.path):
 
            raise RepositoryError("Location already exist")
 
        if src_url and not create:
 
            raise RepositoryError("Create should be set to True if src_url is "
 
                                  "given (clone operation creates repository)")
 
        try:
 
            if create and src_url:
 
                GitRepository._check_url(src_url)
 
                self.clone(src_url, update_after_clone, bare)
 
                return Repo(self.path)
 
            elif create:
 
                os.makedirs(self.path)
 
                if bare:
 
                    return Repo.init_bare(self.path)
 
                else:
 
                    return Repo.init(self.path)
 
            else:
 
                return Repo(self.path)
 
        except (NotGitRepository, OSError) as err:
 
            raise RepositoryError(err)
 

	
 
    def _get_all_revisions(self):
 
        # we must check if this repo is not empty, since later command
 
        # fails if it is. And it's cheaper to ask than throw the subprocess
 
        # errors
 
        try:
 
            self._repo.head()
 
        except KeyError:
 
            return []
 

	
 
        rev_filter = settings.GIT_REV_FILTER
 
        cmd = ['rev-list', rev_filter, '--reverse', '--date-order']
 
        try:
 
            so, se = self.run_git_command(cmd)
 
        except RepositoryError:
 
            # Can be raised for empty repositories
 
            return []
 
        return so.splitlines()
 

	
 
    def _get_all_revisions2(self):
 
        # alternate implementation using dulwich
 
        includes = [x[1][0] for x in self._parsed_refs.iteritems()
 
                    if x[1][1] != 'T']
 
        return [c.commit.id for c in self._repo.get_walker(include=includes)]
 

	
 
    def _get_revision(self, revision):
 
        """
 
        For git backend we always return integer here. This way we ensure
 
        that changeset's revision attribute would become integer.
 
        """
 

	
 
        is_null = lambda o: len(o) == revision.count('0')
 

	
 
        if self._empty:
 
            raise EmptyRepositoryError("There are no changesets yet")
 

	
 
        if revision in (None, '', 'tip', 'HEAD', 'head', -1):
 
            return self.revisions[-1]
 

	
 
        is_bstr = isinstance(revision, (str, unicode))
 
        if ((is_bstr and revision.isdigit() and len(revision) < 12)
 
            or isinstance(revision, int) or is_null(revision)):
 
            try:
 
                revision = self.revisions[int(revision)]
 
            except IndexError:
 
                msg = ("Revision %s does not exist for %s" % (revision, self))
 
                raise ChangesetDoesNotExistError(msg)
 

	
 
        elif is_bstr:
 
            # get by branch/tag name
 
            _ref_revision = self._parsed_refs.get(revision)
 
            if _ref_revision:  # and _ref_revision[1] in ['H', 'RH', 'T']:
 
                return _ref_revision[0]
 

	
 
            _tags_shas = self.tags.values()
 
            # maybe it's a tag ? we don't have them in self.revisions
 
            if revision in _tags_shas:
 
                return _tags_shas[_tags_shas.index(revision)]
 

	
 
            elif not SHA_PATTERN.match(revision) or revision not in self.revisions:
 
                msg = ("Revision %s does not exist for %s" % (revision, self))
 
                raise ChangesetDoesNotExistError(msg)
 

	
 
        # Ensure we return full id
 
        if not SHA_PATTERN.match(str(revision)):
 
            raise ChangesetDoesNotExistError("Given revision %s not recognized"
 
                % revision)
 
        return revision
 

	
 
    def get_ref_revision(self, ref_type, ref_name):
 
        """
 
        Returns ``MercurialChangeset`` object representing repository's
 
        changeset at the given ``revision``.
 
        """
 
        return self._get_revision(ref_name)
 

	
 
    def _get_archives(self, archive_name='tip'):
 

	
 
        for i in [('zip', '.zip'), ('gz', '.tar.gz'), ('bz2', '.tar.bz2')]:
 
                yield {"type": i[0], "extension": i[1], "node": archive_name}
 
            yield {"type": i[0], "extension": i[1], "node": archive_name}
 

	
 
    def _get_url(self, url):
 
        """
 
        Returns normalized url. If schema is not given, would fall to
 
        filesystem (``file:///``) schema.
 
        """
 
        url = safe_str(url)
 
        if url != 'default' and '://' not in url:
 
            url = ':///'.join(('file', url))
 
        return url
 

	
 
    def get_hook_location(self):
 
        """
 
        returns absolute path to location where hooks are stored
 
        """
 
        loc = os.path.join(self.path, 'hooks')
 
        if not self.bare:
 
            loc = os.path.join(self.path, '.git', 'hooks')
 
        return loc
 

	
 
    @LazyProperty
 
    def name(self):
 
        return os.path.basename(self.path)
 

	
 
    @LazyProperty
 
    def last_change(self):
 
        """
 
        Returns last change made on this repository as datetime object
 
        """
 
        return date_fromtimestamp(self._get_mtime(), makedate()[1])
 

	
 
    def _get_mtime(self):
 
        try:
 
            return time.mktime(self.get_changeset().date.timetuple())
 
        except RepositoryError:
 
            idx_loc = '' if self.bare else '.git'
 
            # fallback to filesystem
 
            in_path = os.path.join(self.path, idx_loc, "index")
 
            he_path = os.path.join(self.path, idx_loc, "HEAD")
 
            if os.path.exists(in_path):
 
                return os.stat(in_path).st_mtime
 
            else:
 
                return os.stat(he_path).st_mtime
 

	
 
    @LazyProperty
 
    def description(self):
 
        undefined_description = u'unknown'
 
        _desc = self._repo.get_description()
 
        return safe_unicode(_desc or undefined_description)
 

	
 
    @LazyProperty
 
    def contact(self):
 
        undefined_contact = u'Unknown'
 
        return undefined_contact
 

	
 
    @property
 
    def branches(self):
 
        if not self.revisions:
 
            return {}
 
        sortkey = lambda ctx: ctx[0]
 
        _branches = [(x[0], x[1][0])
 
                     for x in self._parsed_refs.iteritems() if x[1][1] == 'H']
 
        return OrderedDict(sorted(_branches, key=sortkey, reverse=False))
 

	
 
    @LazyProperty
 
    def closed_branches(self):
 
        return {}
 

	
 
    @LazyProperty
 
    def tags(self):
 
        return self._get_tags()
 

	
 
    def _get_tags(self):
 
        if not self.revisions:
 
            return {}
 

	
 
        sortkey = lambda ctx: ctx[0]
 
        _tags = [(x[0], x[1][0])
 
                 for x in self._parsed_refs.iteritems() if x[1][1] == 'T']
 
        return OrderedDict(sorted(_tags, key=sortkey, reverse=True))
 

	
 
    def tag(self, name, user, revision=None, message=None, date=None,
 
            **kwargs):
 
        """
 
        Creates and returns a tag for the given ``revision``.
 

	
 
        :param name: name for new tag
 
        :param user: full username, i.e.: "Joe Doe <joe.doe@example.com>"
 
        :param revision: changeset id for which new tag would be created
 
        :param message: message of the tag's commit
 
        :param date: date of tag's commit
 

	
 
        :raises TagAlreadyExistError: if tag with same name already exists
 
        """
 
        if name in self.tags:
 
            raise TagAlreadyExistError("Tag %s already exists" % name)
 
        changeset = self.get_changeset(revision)
 
        message = message or "Added tag %s for commit %s" % (name,
 
            changeset.raw_id)
 
        self._repo.refs["refs/tags/%s" % name] = changeset._commit.id
 

	
 
        self._parsed_refs = self._get_parsed_refs()
 
        self.tags = self._get_tags()
 
        return changeset
 

	
 
    def remove_tag(self, name, user, message=None, date=None):
 
        """
 
        Removes tag with the given ``name``.
 

	
 
        :param name: name of the tag to be removed
 
        :param user: full username, i.e.: "Joe Doe <joe.doe@example.com>"
 
        :param message: message of the tag's removal commit
 
        :param date: date of tag's removal commit
 

	
 
        :raises TagDoesNotExistError: if tag with given name does not exists
 
        """
 
        if name not in self.tags:
 
            raise TagDoesNotExistError("Tag %s does not exist" % name)
 
        tagpath = posixpath.join(self._repo.refs.path, 'refs', 'tags', name)
 
        try:
 
            os.remove(tagpath)
 
            self._parsed_refs = self._get_parsed_refs()
 
            self.tags = self._get_tags()
 
        except OSError as e:
 
            raise RepositoryError(e.strerror)
 

	
 
    @LazyProperty
 
    def bookmarks(self):
 
        """
 
        Gets bookmarks for this repository
 
        """
 
        return {}
 

	
 
    @LazyProperty
 
    def _parsed_refs(self):
 
        return self._get_parsed_refs()
 

	
 
    def _get_parsed_refs(self):
 
        # cache the property
 
        _repo = self._repo
 
        refs = _repo.get_refs()
 
        keys = [('refs/heads/', 'H'),
 
                ('refs/remotes/origin/', 'RH'),
 
                ('refs/tags/', 'T')]
 
        _refs = {}
 
        for ref, sha in refs.iteritems():
 
            for k, type_ in keys:
 
                if ref.startswith(k):
 
                    _key = ref[len(k):]
 
                    if type_ == 'T':
 
                        obj = _repo.get_object(sha)
 
                        if isinstance(obj, Tag):
 
                            sha = _repo.get_object(sha).object[1]
 
                    _refs[_key] = [sha, type_]
 
                    break
 
        return _refs
 

	
 
    def _heads(self, reverse=False):
 
        refs = self._repo.get_refs()
 
        heads = {}
 

	
 
        for key, val in refs.items():
 
            for ref_key in ['refs/heads/', 'refs/remotes/origin/']:
 
                if key.startswith(ref_key):
 
                    n = key[len(ref_key):]
 
                    if n not in ['HEAD']:
 
                        heads[n] = val
 

	
 
        return heads if reverse else dict((y, x) for x, y in heads.iteritems())
 

	
 
    def get_changeset(self, revision=None):
 
        """
 
        Returns ``GitChangeset`` object representing commit from git repository
 
        at the given revision or head (most recent commit) if None given.
 
        """
 
        if isinstance(revision, GitChangeset):
 
            return revision
 
        revision = self._get_revision(revision)
 
        changeset = GitChangeset(repository=self, revision=revision)
 
        return changeset
 

	
 
    def get_changesets(self, start=None, end=None, start_date=None,
 
           end_date=None, branch_name=None, reverse=False, max_revisions=None):
 
        """
 
        Returns iterator of ``GitChangeset`` objects from start to end (both
 
        are inclusive), in ascending date order (unless ``reverse`` is set).
 

	
 
        :param start: changeset ID, as str; first returned changeset
 
        :param end: changeset ID, as str; last returned changeset
 
        :param start_date: if specified, changesets with commit date less than
 
          ``start_date`` would be filtered out from returned set
 
        :param end_date: if specified, changesets with commit date greater than
 
          ``end_date`` would be filtered out from returned set
 
        :param branch_name: if specified, changesets not reachable from given
 
          branch would be filtered out from returned set
 
        :param reverse: if ``True``, returned generator would be reversed
 
          (meaning that returned changesets would have descending date order)
 

	
 
        :raise BranchDoesNotExistError: If given ``branch_name`` does not
 
            exist.
 
        :raise ChangesetDoesNotExistError: If changeset for given ``start`` or
 
          ``end`` could not be found.
 

	
 
        """
 
        if branch_name and branch_name not in self.branches:
 
            raise BranchDoesNotExistError("Branch '%s' not found"
 
                                          % branch_name)
 
        # actually we should check now if it's not an empty repo to not spaw
 
        # subprocess commands
 
        if self._empty:
 
            raise EmptyRepositoryError("There are no changesets yet")
 

	
 
        # %H at format means (full) commit hash, initial hashes are retrieved
 
        # in ascending date order
 
        cmd = ['log', '--date-order', '--reverse', '--pretty=format:%H']
 
        if max_revisions:
 
            cmd += ['--max-count=%s' % max_revisions]
 
        if start_date:
 
            cmd += ['--since', start_date.strftime('%m/%d/%y %H:%M:%S')]
 
        if end_date:
 
            cmd += ['--until', end_date.strftime('%m/%d/%y %H:%M:%S')]
 
        if branch_name:
 
            cmd.append(branch_name)
 
        else:
 
            cmd.append(settings.GIT_REV_FILTER)
 

	
 
        revs = self.run_git_command(cmd)[0].splitlines()
 
        start_pos = 0
 
        end_pos = len(revs)
 
        if start:
 
            _start = self._get_revision(start)
 
            try:
 
                start_pos = revs.index(_start)
 
            except ValueError:
 
                pass
 

	
 
        if end is not None:
 
            _end = self._get_revision(end)
 
            try:
 
                end_pos = revs.index(_end)
 
            except ValueError:
 
                pass
 

	
 
        if None not in [start, end] and start_pos > end_pos:
 
            raise RepositoryError('start cannot be after end')
 

	
 
        if end_pos is not None:
 
            end_pos += 1
 

	
 
        revs = revs[start_pos:end_pos]
 
        if reverse:
 
            revs = reversed(revs)
 
        return CollectionGenerator(self, revs)
 

	
 
    def get_diff(self, rev1, rev2, path=None, ignore_whitespace=False,
 
                 context=3):
 
        """
 
        Returns (git like) *diff*, as plain text. Shows changes introduced by
 
        ``rev2`` since ``rev1``.
 

	
 
        :param rev1: Entry point from which diff is shown. Can be
 
          ``self.EMPTY_CHANGESET`` - in this case, patch showing all
 
          the changes since empty state of the repository until ``rev2``
 
        :param rev2: Until which revision changes should be shown.
 
        :param ignore_whitespace: If set to ``True``, would not show whitespace
 
          changes. Defaults to ``False``.
 
        :param context: How many lines before/after changed lines should be
 
          shown. Defaults to ``3``. Due to limitations in Git, if
 
          value passed-in is greater than ``2**31-1``
 
          (``2147483647``), it will be set to ``2147483647``
 
          instead. If negative value is passed-in, it will be set to
 
          ``0`` instead.
 
        """
 

	
 
        # Git internally uses a signed long int for storing context
 
        # size (number of lines to show before and after the
 
        # differences). This can result in integer overflow, so we
 
        # ensure the requested context is smaller by one than the
 
        # number that would cause the overflow. It is highly unlikely
 
        # that a single file will contain that many lines, so this
 
        # kind of change should not cause any realistic consequences.
 
        overflowed_long_int = 2**31
 

	
 
        if context >= overflowed_long_int:
 
            context = overflowed_long_int - 1
 

	
 
        # Negative context values make no sense, and will result in
 
        # errors. Ensure this does not happen.
 
        if context < 0:
 
            context = 0
 

	
 
        flags = ['-U%s' % context, '--full-index', '--binary', '-p', '-M', '--abbrev=40']
 
        if ignore_whitespace:
 
            flags.append('-w')
 

	
 
        if hasattr(rev1, 'raw_id'):
 
            rev1 = getattr(rev1, 'raw_id')
 

	
 
        if hasattr(rev2, 'raw_id'):
 
            rev2 = getattr(rev2, 'raw_id')
 

	
 
        if rev1 == self.EMPTY_CHANGESET:
 
            rev2 = self.get_changeset(rev2).raw_id
 
            cmd = ['show'] + flags + [rev2]
 
        else:
 
            rev1 = self.get_changeset(rev1).raw_id
 
            rev2 = self.get_changeset(rev2).raw_id
 
            cmd = ['diff'] + flags + [rev1, rev2]
 

	
 
        if path:
 
            cmd += ['--', path]
 

	
 
        stdout, stderr = self.run_git_command(cmd)
 
        # TODO: don't ignore stderr
 
        # If we used 'show' command, strip first few lines (until actual diff
 
        # starts)
 
        if rev1 == self.EMPTY_CHANGESET:
 
            parts = stdout.split('\ndiff ', 1)
 
            if len(parts) > 1:
 
                stdout = 'diff ' + parts[1]
 
        return stdout
 

	
 
    @LazyProperty
 
    def in_memory_changeset(self):
 
        """
 
        Returns ``GitInMemoryChangeset`` object for this repository.
 
        """
 
        return GitInMemoryChangeset(self)
 

	
 
    def clone(self, url, update_after_clone=True, bare=False):
 
        """
 
        Tries to clone changes from external location.
 

	
 
        :param update_after_clone: If set to ``False``, git won't checkout
 
          working directory
 
        :param bare: If set to ``True``, repository would be cloned into
 
          *bare* git repository (no working directory at all).
 
        """
 
        url = self._get_url(url)
 
        cmd = ['clone', '-q']
 
        if bare:
 
            cmd.append('--bare')
 
        elif not update_after_clone:
 
            cmd.append('--no-checkout')
 
        cmd += ['--', url, self.path]
 
        # If error occurs run_git_command raises RepositoryError already
 
        self.run_git_command(cmd)
 

	
 
    def pull(self, url):
 
        """
 
        Tries to pull changes from external location.
 
        """
 
        url = self._get_url(url)
 
        cmd = ['pull', '--ff-only', url]
 
        # If error occurs run_git_command raises RepositoryError already
 
        self.run_git_command(cmd)
 

	
 
    def fetch(self, url):
 
        """
 
        Tries to pull changes from external location.
 
        """
 
        url = self._get_url(url)
 
        so, se = self.run_git_command(['ls-remote', '-h', url])
 
        cmd = ['fetch', url, '--']
 
        for line in (x for x in so.splitlines()):
 
            sha, ref = line.split('\t')
 
            cmd.append('+%s:%s' % (ref, ref))
 
        self.run_git_command(cmd)
 

	
 
    def _update_server_info(self):
 
        """
 
        runs gits update-server-info command in this repo instance
 
        """
 
        from dulwich.server import update_server_info
 
        try:
 
            update_server_info(self._repo)
 
        except OSError as e:
 
            if e.errno not in [errno.ENOENT, errno.EROFS]:
 
                raise
 
            # Workaround for dulwich crashing on for example its own dulwich/tests/data/repos/simple_merge.git/info/refs.lock
 
            log.error('Ignoring %s running update-server-info: %s', type(e).__name__, e)
 

	
 
    @LazyProperty
 
    def workdir(self):
 
        """
 
        Returns ``Workdir`` instance for this repository.
 
        """
 
        return GitWorkdir(self)
 

	
 
    def get_config_value(self, section, name, config_file=None):
 
        """
 
        Returns configuration value for a given [``section``] and ``name``.
 

	
 
        :param section: Section we want to retrieve value from
 
        :param name: Name of configuration we want to retrieve
 
        :param config_file: A path to file which should be used to retrieve
 
          configuration from (might also be a list of file paths)
 
        """
 
        if config_file is None:
 
            config_file = []
 
        elif isinstance(config_file, basestring):
 
            config_file = [config_file]
 

	
 
        def gen_configs():
 
            for path in config_file + self._config_files:
 
                try:
 
                    yield ConfigFile.from_path(path)
 
                except (IOError, OSError, ValueError):
 
                    continue
 

	
 
        for config in gen_configs():
 
            try:
 
                return config.get(section, name)
 
            except KeyError:
 
                continue
 
        return None
 

	
 
    def get_user_name(self, config_file=None):
 
        """
 
        Returns user's name from global configuration file.
 

	
 
        :param config_file: A path to file which should be used to retrieve
 
          configuration from (might also be a list of file paths)
 
        """
 
        return self.get_config_value('user', 'name', config_file)
 

	
 
    def get_user_email(self, config_file=None):
 
        """
 
        Returns user's email from global configuration file.
 

	
 
        :param config_file: A path to file which should be used to retrieve
 
          configuration from (might also be a list of file paths)
 
        """
 
        return self.get_config_value('user', 'email', config_file)
kallithea/lib/vcs/utils/progressbar.py
Show inline comments
 
# encoding: UTF-8
 
import datetime
 
import string
 
import sys
 

	
 
from kallithea.lib.vcs.utils.filesize import filesizeformat
 

	
 

	
 
class ProgressBarError(Exception):
 
    pass
 

	
 

	
 
class AlreadyFinishedError(ProgressBarError):
 
    pass
 

	
 

	
 
class ProgressBar(object):
 

	
 
    default_elements = ['percentage', 'bar', 'steps']
 

	
 
    def __init__(self, steps=100, stream=None, elements=None):
 
        self.step = 0
 
        self.steps = steps
 
        self.stream = stream or sys.stderr
 
        self.bar_char = '='
 
        self.width = 50
 
        self.separator = ' | '
 
        self.elements = elements or self.default_elements
 
        self.started = None
 
        self.finished = False
 
        self.steps_label = 'Step'
 
        self.time_label = 'Time'
 
        self.eta_label = 'ETA'
 
        self.speed_label = 'Speed'
 
        self.transfer_label = 'Transfer'
 

	
 
    def __str__(self):
 
        return self.get_line()
 

	
 
    def __iter__(self):
 
        start = self.step
 
        end = self.steps + 1
 
        for x in xrange(start, end):
 
            self.render(x)
 
            yield x
 

	
 
    def get_separator(self):
 
        return self.separator
 

	
 
    def get_bar_char(self):
 
        return self.bar_char
 

	
 
    def get_bar(self):
 
        char = self.get_bar_char()
 
        perc = self.get_percentage()
 
        length = int(self.width * perc / 100)
 
        bar = char * length
 
        bar = bar.ljust(self.width)
 
        return bar
 

	
 
    def get_elements(self):
 
        return self.elements
 

	
 
    def get_template(self):
 
        separator = self.get_separator()
 
        elements = self.get_elements()
 
        return string.Template(separator.join((('$%s' % e) for e in elements)))
 

	
 
    def get_total_time(self, current_time=None):
 
        if current_time is None:
 
            current_time = datetime.datetime.now()
 
        if not self.started:
 
            return datetime.timedelta()
 
        return current_time - self.started
 

	
 
    def get_rendered_total_time(self):
 
        delta = self.get_total_time()
 
        if not delta:
 
            ttime = '-'
 
        else:
 
            ttime = str(delta)
 
        return '%s %s' % (self.time_label, ttime)
 

	
 
    def get_eta(self, current_time=None):
 
        if current_time is None:
 
            current_time = datetime.datetime.now()
 
        if self.step == 0:
 
            return datetime.timedelta()
 
        total_seconds = self.get_total_time().total_seconds()
 
        eta_seconds = total_seconds * self.steps / self.step - total_seconds
 
        return datetime.timedelta(seconds=int(eta_seconds))
 

	
 
    def get_rendered_eta(self):
 
        eta = self.get_eta()
 
        if not eta:
 
            eta = '--:--:--'
 
        else:
 
            eta = str(eta).rjust(8)
 
        return '%s: %s' % (self.eta_label, eta)
 

	
 
    def get_percentage(self):
 
        return float(self.step) / self.steps * 100
 

	
 
    def get_rendered_percentage(self):
 
        perc = self.get_percentage()
 
        return ('%s%%' % (int(perc))).rjust(5)
 

	
 
    def get_rendered_steps(self):
 
        return '%s: %s/%s' % (self.steps_label, self.step, self.steps)
 

	
 
    def get_rendered_speed(self, step=None, total_seconds=None):
 
        if step is None:
 
            step = self.step
 
        if total_seconds is None:
 
            total_seconds = self.get_total_time().total_seconds()
 
        if step <= 0 or total_seconds <= 0:
 
            speed = '-'
 
        else:
 
            speed = filesizeformat(float(step) / total_seconds)
 
        return '%s: %s/s' % (self.speed_label, speed)
 

	
 
    def get_rendered_transfer(self, step=None, steps=None):
 
        if step is None:
 
            step = self.step
 
        if steps is None:
 
            steps = self.steps
 

	
 
        if steps <= 0:
 
            return '%s: -' % self.transfer_label
 
        total = filesizeformat(float(steps))
 
        if step <= 0:
 
            transferred = '-'
 
        else:
 
            transferred = filesizeformat(float(step))
 
        return '%s: %s / %s' % (self.transfer_label, transferred, total)
 

	
 
    def get_context(self):
 
        return {
 
            'percentage': self.get_rendered_percentage(),
 
            'bar': self.get_bar(),
 
            'steps': self.get_rendered_steps(),
 
            'time': self.get_rendered_total_time(),
 
            'eta': self.get_rendered_eta(),
 
            'speed': self.get_rendered_speed(),
 
            'transfer': self.get_rendered_transfer(),
 
        }
 

	
 
    def get_line(self):
 
        template = self.get_template()
 
        context = self.get_context()
 
        return template.safe_substitute(**context)
 

	
 
    def write(self, data):
 
        self.stream.write(data)
 

	
 
    def render(self, step):
 
        if not self.started:
 
            self.started = datetime.datetime.now()
 
        if self.finished:
 
            raise AlreadyFinishedError
 
        self.step = step
 
        self.write('\r%s' % self)
 
        if step == self.steps:
 
            self.finished = True
 
        if step == self.steps:
 
            self.write('\n')
 

	
 

	
 
"""
 
termcolors.py
 

	
 
Grabbed from Django (http://www.djangoproject.com)
 
"""
 

	
 
color_names = ('black', 'red', 'green', 'yellow', 'blue', 'magenta', 'cyan', 'white')
 
foreground = dict([(color_names[x], '3%s' % x) for x in range(8)])
 
background = dict([(color_names[x], '4%s' % x) for x in range(8)])
 

	
 
RESET = '0'
 
opt_dict = {'bold': '1', 'underscore': '4', 'blink': '5', 'reverse': '7', 'conceal': '8'}
 

	
 

	
 
def colorize(text='', opts=(), **kwargs):
 
    """
 
    Returns your text, enclosed in ANSI graphics codes.
 

	
 
    Depends on the keyword arguments 'fg' and 'bg', and the contents of
 
    the opts tuple/list.
 

	
 
    Returns the RESET code if no parameters are given.
 

	
 
    Valid colors:
 
        'black', 'red', 'green', 'yellow', 'blue', 'magenta', 'cyan', 'white'
 

	
 
    Valid options:
 
        'bold'
 
        'underscore'
 
        'blink'
 
        'reverse'
 
        'conceal'
 
        'noreset' - string will not be auto-terminated with the RESET code
 

	
 
    Examples:
 
        colorize('hello', fg='red', bg='blue', opts=('blink',))
 
        colorize()
 
        colorize('goodbye', opts=('underscore',))
 
        print colorize('first line', fg='red', opts=('noreset',))
 
        print 'this should be red too'
 
        print colorize('and so should this')
 
        print 'this should not be red'
 
    """
 
    code_list = []
 
    if text == '' and len(opts) == 1 and opts[0] == 'reset':
 
        return '\x1b[%sm' % RESET
 
    for k, v in kwargs.iteritems():
 
        if k == 'fg':
 
            code_list.append(foreground[v])
 
        elif k == 'bg':
 
            code_list.append(background[v])
 
    for o in opts:
 
        if o in opt_dict:
 
            code_list.append(opt_dict[o])
 
    if 'noreset' not in opts:
 
        text = text + '\x1b[%sm' % RESET
 
    return ('\x1b[%sm' % ';'.join(code_list)) + text
 

	
 

	
 
def make_style(opts=(), **kwargs):
 
    """
 
    Returns a function with default parameters for colorize()
 

	
 
    Example:
 
        bold_red = make_style(opts=('bold',), fg='red')
 
        print bold_red('hello')
 
        KEYWORD = make_style(fg='yellow')
 
        COMMENT = make_style(fg='blue', opts=('bold',))
 
    """
 
    return lambda text: colorize(text, opts, **kwargs)
 

	
 

	
 
NOCOLOR_PALETTE = 'nocolor'
 
DARK_PALETTE = 'dark'
 
LIGHT_PALETTE = 'light'
 

	
 
PALETTES = {
 
    NOCOLOR_PALETTE: {
 
        'ERROR':        {},
 
        'NOTICE':       {},
 
        'SQL_FIELD':    {},
 
        'SQL_COLTYPE':  {},
 
        'SQL_KEYWORD':  {},
 
        'SQL_TABLE':    {},
 
        'HTTP_INFO':         {},
 
        'HTTP_SUCCESS':      {},
 
        'HTTP_REDIRECT':     {},
 
        'HTTP_NOT_MODIFIED': {},
 
        'HTTP_BAD_REQUEST':  {},
 
        'HTTP_NOT_FOUND':    {},
 
        'HTTP_SERVER_ERROR': {},
 
    },
 
    DARK_PALETTE: {
 
        'ERROR':        { 'fg': 'red', 'opts': ('bold',) },
 
        'NOTICE':       { 'fg': 'red' },
 
        'SQL_FIELD':    { 'fg': 'green', 'opts': ('bold',) },
 
        'SQL_COLTYPE':  { 'fg': 'green' },
 
        'SQL_KEYWORD':  { 'fg': 'yellow' },
 
        'SQL_TABLE':    { 'opts': ('bold',) },
 
        'HTTP_INFO':         { 'opts': ('bold',) },
 
        'HTTP_SUCCESS':      { },
 
        'HTTP_REDIRECT':     { 'fg': 'green' },
 
        'HTTP_NOT_MODIFIED': { 'fg': 'cyan' },
 
        'HTTP_BAD_REQUEST':  { 'fg': 'red', 'opts': ('bold',) },
 
        'HTTP_NOT_FOUND':    { 'fg': 'yellow' },
 
        'HTTP_SERVER_ERROR': { 'fg': 'magenta', 'opts': ('bold',) },
 
    },
 
    LIGHT_PALETTE: {
 
        'ERROR':        { 'fg': 'red', 'opts': ('bold',) },
 
        'NOTICE':       { 'fg': 'red' },
 
        'SQL_FIELD':    { 'fg': 'green', 'opts': ('bold',) },
 
        'SQL_COLTYPE':  { 'fg': 'green' },
 
        'SQL_KEYWORD':  { 'fg': 'blue' },
 
        'SQL_TABLE':    { 'opts': ('bold',) },
 
        'HTTP_INFO':         { 'opts': ('bold',) },
 
        'HTTP_SUCCESS':      { },
 
        'HTTP_REDIRECT':     { 'fg': 'green', 'opts': ('bold',) },
 
        'HTTP_NOT_MODIFIED': { 'fg': 'green' },
 
        'HTTP_BAD_REQUEST':  { 'fg': 'red', 'opts': ('bold',) },
 
        'HTTP_NOT_FOUND':    { 'fg': 'red' },
 
        'HTTP_SERVER_ERROR': { 'fg': 'magenta', 'opts': ('bold',) },
 
    }
 
}
 
DEFAULT_PALETTE = DARK_PALETTE
 

	
 
# ---------------------------- #
 
# --- End of termcolors.py --- #
 
# ---------------------------- #
 

	
 

	
 
class ColoredProgressBar(ProgressBar):
 

	
 
    BAR_COLORS = (
 
        (10, 'red'),
 
        (30, 'magenta'),
 
        (50, 'yellow'),
 
        (99, 'green'),
 
        (100, 'blue'),
 
    )
 

	
 
    def get_line(self):
 
        line = super(ColoredProgressBar, self).get_line()
 
        perc = self.get_percentage()
 
        if perc > 100:
 
            color = 'blue'
 
        for max_perc, color in self.BAR_COLORS:
 
            if perc <= max_perc:
 
                break
 
        return colorize(line, fg=color)
 

	
 

	
 
class AnimatedProgressBar(ProgressBar):
 

	
 
    def get_bar_char(self):
 
        chars = '-/|\\'
 
        if self.step >= self.steps:
 
            return '='
 
        return chars[self.step % len(chars)]
 

	
 

	
 
class BarOnlyProgressBar(ProgressBar):
 

	
 
    default_elements = ['bar', 'steps']
 

	
 
    def get_bar(self):
 
        bar = super(BarOnlyProgressBar, self).get_bar()
 
        perc = self.get_percentage()
 
        perc_text = '%s%%' % int(perc)
 
        text = (' %s%% ' % (perc_text)).center(self.width, '=')
 
        L = text.find(' ')
 
        R = text.rfind(' ')
 
        bar = ' '.join((bar[:L], perc_text, bar[R:]))
 
        return bar
 

	
 

	
 
class AnimatedColoredProgressBar(AnimatedProgressBar,
 
                                 ColoredProgressBar):
 
    pass
 

	
 

	
 
class BarOnlyColoredProgressBar(ColoredProgressBar,
 
                                BarOnlyProgressBar):
 
    pass
 

	
 

	
 
def main():
 
    import time
 

	
 
    print "Standard progress bar..."
 
    bar = ProgressBar(30)
 
    for x in xrange(1, 31):
 
            bar.render(x)
 
            time.sleep(0.02)
 
        bar.render(x)
 
        time.sleep(0.02)
 
    bar.stream.write('\n')
 
    print
 

	
 
    print "Empty bar..."
 
    bar = ProgressBar(50)
 
    bar.render(0)
 
    print
 
    print
 

	
 
    print "Colored bar..."
 
    bar = ColoredProgressBar(20)
 
    for x in bar:
 
        time.sleep(0.01)
 
    print
 

	
 
    print "Animated char bar..."
 
    bar = AnimatedProgressBar(20)
 
    for x in bar:
 
        time.sleep(0.01)
 
    print
 

	
 
    print "Animated + colored char bar..."
 
    bar = AnimatedColoredProgressBar(20)
 
    for x in bar:
 
        time.sleep(0.01)
 
    print
 

	
 
    print "Bar only ..."
 
    bar = BarOnlyProgressBar(20)
 
    for x in bar:
 
        time.sleep(0.01)
 
    print
 

	
 
    print "Colored, longer bar-only, eta, total time ..."
 
    bar = BarOnlyColoredProgressBar(40)
 
    bar.width = 60
 
    bar.elements += ['time', 'eta']
 
    for x in bar:
 
        time.sleep(0.01)
 
    print
 
    print
 

	
 
    print "File transfer bar, breaks after 2 seconds ..."
 
    total_bytes = 1024 * 1024 * 2
 
    bar = ProgressBar(total_bytes)
 
    bar.width = 50
 
    bar.elements.remove('steps')
 
    bar.elements += ['transfer', 'time', 'eta', 'speed']
 
    for x in xrange(0, bar.steps, 1024):
 
        bar.render(x)
 
        time.sleep(0.01)
 
        now = datetime.datetime.now()
 
        if now - bar.started >= datetime.timedelta(seconds=2):
 
            break
 
    print
 
    print
 

	
 

	
 

	
 
if __name__ == '__main__':
 
    main()
kallithea/model/validators.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
Set of generic validators
 
"""
 

	
 
import logging
 
import os
 
import re
 
from collections import defaultdict
 

	
 
import formencode
 
import ipaddr
 
import sqlalchemy
 
from formencode.validators import CIDR, Bool, Email, FancyValidator, Int, IPAddress, NotEmpty, Number, OneOf, Regex, Set, String, StringBoolean, UnicodeString
 
from sqlalchemy import func
 
from tg.i18n import ugettext as _
 

	
 
from kallithea.config.routing import ADMIN_PREFIX
 
from kallithea.lib.auth import HasPermissionAny, HasRepoGroupPermissionLevel
 
from kallithea.lib.compat import OrderedSet
 
from kallithea.lib.exceptions import LdapImportError
 
from kallithea.lib.utils import is_valid_repo_uri
 
from kallithea.lib.utils2 import aslist, repo_name_slug, str2bool
 
from kallithea.model.db import RepoGroup, Repository, User, UserGroup
 

	
 

	
 
# silence warnings and pylint
 
UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set, \
 
    NotEmpty, IPAddress, CIDR, String, FancyValidator
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def UniqueListFromString():
 
    class _UniqueListFromString(formencode.FancyValidator):
 
        """
 
        Split value on ',' and make unique while preserving order
 
        """
 
        messages = dict(
 
            empty=_('Value cannot be an empty list'),
 
            missing_value=_('Value cannot be an empty list'),
 
        )
 

	
 
        def _convert_to_python(self, value, state):
 
            value = aslist(value, ',')
 
            seen = set()
 
            return [c for c in value if not (c in seen or seen.add(c))]
 

	
 
        def empty_value(self, value):
 
            return []
 

	
 
    return _UniqueListFromString
 

	
 

	
 
def ValidUsername(edit=False, old_data=None):
 
    old_data = old_data or {}
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'username_exists': _('Username "%(username)s" already exists'),
 
            'system_invalid_username':
 
                _('Username "%(username)s" cannot be used'),
 
            'invalid_username':
 
                _('Username may only contain alphanumeric characters '
 
                  'underscores, periods or dashes and must begin with an '
 
                  'alphanumeric character or underscore')
 
        }
 

	
 
        def _validate_python(self, value, state):
 
            if value in ['default', 'new_user']:
 
                msg = self.message('system_invalid_username', state, username=value)
 
                raise formencode.Invalid(msg, value, state)
 
            # check if user is unique
 
            old_un = None
 
            if edit:
 
                old_un = User.get(old_data.get('user_id')).username
 

	
 
            if old_un != value or not edit:
 
                if User.get_by_username(value, case_insensitive=True):
 
                    msg = self.message('username_exists', state, username=value)
 
                    raise formencode.Invalid(msg, value, state)
 

	
 
            if re.match(r'^[a-zA-Z0-9\_]{1}[a-zA-Z0-9\-\_\.]*$', value) is None:
 
                msg = self.message('invalid_username', state)
 
                raise formencode.Invalid(msg, value, state)
 
    return _validator
 

	
 

	
 
def ValidRegex(msg=None):
 
    class _validator(formencode.validators.Regex):
 
        messages = dict(invalid=msg or _('The input is not valid'))
 
    return _validator
 

	
 

	
 
def ValidRepoUser():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_username': _('Username %(username)s is not valid')
 
        }
 

	
 
        def _validate_python(self, value, state):
 
            try:
 
                User.query().filter(User.active == True) \
 
                    .filter(User.username == value).one()
 
            except sqlalchemy.exc.InvalidRequestError: # NoResultFound/MultipleResultsFound
 
                msg = self.message('invalid_username', state, username=value)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(username=msg)
 
                )
 

	
 
    return _validator
 

	
 

	
 
def ValidUserGroup(edit=False, old_data=None):
 
    old_data = old_data or {}
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_group': _('Invalid user group name'),
 
            'group_exist': _('User group "%(usergroup)s" already exists'),
 
            'invalid_usergroup_name':
 
                _('user group name may only contain alphanumeric '
 
                  'characters underscores, periods or dashes and must begin '
 
                  'with alphanumeric character')
 
        }
 

	
 
        def _validate_python(self, value, state):
 
            if value in ['default']:
 
                msg = self.message('invalid_group', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(users_group_name=msg)
 
                )
 
            # check if group is unique
 
            old_ugname = None
 
            if edit:
 
                old_id = old_data.get('users_group_id')
 
                old_ugname = UserGroup.get(old_id).users_group_name
 

	
 
            if old_ugname != value or not edit:
 
                is_existing_group = UserGroup.get_by_group_name(value,
 
                                                        case_insensitive=True)
 
                if is_existing_group:
 
                    msg = self.message('group_exist', state, usergroup=value)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(users_group_name=msg)
 
                    )
 

	
 
            if re.match(r'^[a-zA-Z0-9]{1}[a-zA-Z0-9\-\_\.]+$', value) is None:
 
                msg = self.message('invalid_usergroup_name', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(users_group_name=msg)
 
                )
 

	
 
    return _validator
 

	
 

	
 
def ValidRepoGroup(edit=False, old_data=None):
 
    old_data = old_data or {}
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'parent_group_id': _('Cannot assign this group as parent'),
 
            'group_exists': _('Group "%(group_name)s" already exists'),
 
            'repo_exists':
 
                _('Repository with name "%(group_name)s" already exists')
 
        }
 

	
 
        def _validate_python(self, value, state):
 
            # TODO WRITE VALIDATIONS
 
            group_name = value.get('group_name')
 
            parent_group_id = value.get('parent_group_id')
 

	
 
            # slugify repo group just in case :)
 
            slug = repo_name_slug(group_name)
 

	
 
            # check for parent of self
 
            parent_of_self = lambda: (
 
                old_data['group_id'] == parent_group_id
 
                if parent_group_id else False
 
            )
 
            if edit and parent_of_self():
 
                msg = self.message('parent_group_id', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(parent_group_id=msg)
 
                )
 

	
 
            old_gname = None
 
            if edit:
 
                old_gname = RepoGroup.get(old_data.get('group_id')).group_name
 

	
 
            if old_gname != group_name or not edit:
 

	
 
                # check group
 
                gr = RepoGroup.query() \
 
                      .filter(func.lower(RepoGroup.group_name) == func.lower(slug)) \
 
                      .filter(RepoGroup.parent_group_id == parent_group_id) \
 
                      .scalar()
 
                if gr is not None:
 
                    msg = self.message('group_exists', state, group_name=slug)
 
                    raise formencode.Invalid(msg, value, state,
 
                            error_dict=dict(group_name=msg)
 
                    )
 

	
 
                # check for same repo
 
                repo = Repository.query() \
 
                      .filter(func.lower(Repository.repo_name) == func.lower(slug)) \
 
                      .scalar()
 
                if repo is not None:
 
                    msg = self.message('repo_exists', state, group_name=slug)
 
                    raise formencode.Invalid(msg, value, state,
 
                            error_dict=dict(group_name=msg)
 
                    )
 

	
 
    return _validator
 

	
 

	
 
def ValidPassword():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_password':
 
                _('Invalid characters (non-ascii) in password')
 
        }
 

	
 
        def _validate_python(self, value, state):
 
            try:
 
                (value or '').decode('ascii')
 
            except UnicodeError:
 
                msg = self.message('invalid_password', state)
 
                raise formencode.Invalid(msg, value, state,)
 
    return _validator
 

	
 

	
 
def ValidOldPassword(username):
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_password': _('Invalid old password')
 
        }
 

	
 
        def _validate_python(self, value, state):
 
            from kallithea.lib import auth_modules
 
            if auth_modules.authenticate(username, value, '') is None:
 
                msg = self.message('invalid_password', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(current_password=msg)
 
                )
 
    return _validator
 

	
 

	
 
def ValidPasswordsMatch(password_field, password_confirmation_field):
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'password_mismatch': _('Passwords do not match'),
 
        }
 

	
 
        def _validate_python(self, value, state):
 
            if value.get(password_field) != value[password_confirmation_field]:
 
                msg = self.message('password_mismatch', state)
 
                raise formencode.Invalid(msg, value, state,
 
                     error_dict={password_field: msg, password_confirmation_field: msg}
 
                )
 
    return _validator
 

	
 

	
 
def ValidAuth():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_auth': _(u'Invalid username or password'),
 
        }
 

	
 
        def _validate_python(self, value, state):
 
            from kallithea.lib import auth_modules
 

	
 
            password = value['password']
 
            username = value['username']
 

	
 
            # authenticate returns unused dict but has called
 
            # plugin._authenticate which has create_or_update'ed the username user in db
 
            if auth_modules.authenticate(username, password) is None:
 
                user = User.get_by_username_or_email(username)
 
                if user and not user.active:
 
                    log.warning('user %s is disabled', username)
 
                    msg = self.message('invalid_auth', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(username=' ', password=msg)
 
                    )
 
                else:
 
                    log.warning('user %s failed to authenticate', username)
 
                    msg = self.message('invalid_auth', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(username=' ', password=msg)
 
                    )
 
    return _validator
 

	
 

	
 
def ValidRepoName(edit=False, old_data=None):
 
    old_data = old_data or {}
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_repo_name':
 
                _('Repository name %(repo)s is not allowed'),
 
            'repository_exists':
 
                _('Repository named %(repo)s already exists'),
 
            'repository_in_group_exists': _('Repository "%(repo)s" already '
 
                                            'exists in group "%(group)s"'),
 
            'same_group_exists': _('Repository group with name "%(repo)s" '
 
                                   'already exists')
 
        }
 

	
 
        def _convert_to_python(self, value, state):
 
            repo_name = repo_name_slug(value.get('repo_name', ''))
 
            repo_group = value.get('repo_group')
 
            if repo_group:
 
                gr = RepoGroup.get(repo_group)
 
                group_path = gr.full_path
 
                group_name = gr.group_name
 
                # value needs to be aware of group name in order to check
 
                # db key This is an actual just the name to store in the
 
                # database
 
                repo_name_full = group_path + RepoGroup.url_sep() + repo_name
 
            else:
 
                group_name = group_path = ''
 
                repo_name_full = repo_name
 

	
 
            value['repo_name'] = repo_name
 
            value['repo_name_full'] = repo_name_full
 
            value['group_path'] = group_path
 
            value['group_name'] = group_name
 
            return value
 

	
 
        def _validate_python(self, value, state):
 
            repo_name = value.get('repo_name')
 
            repo_name_full = value.get('repo_name_full')
 
            group_path = value.get('group_path')
 
            group_name = value.get('group_name')
 

	
 
            if repo_name in [ADMIN_PREFIX, '']:
 
                msg = self.message('invalid_repo_name', state, repo=repo_name)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(repo_name=msg)
 
                )
 

	
 
            rename = old_data.get('repo_name') != repo_name_full
 
            create = not edit
 
            if rename or create:
 
                repo = Repository.get_by_repo_name(repo_name_full, case_insensitive=True)
 
                repo_group = RepoGroup.get_by_group_name(repo_name_full, case_insensitive=True)
 
                if group_path != '':
 
                    if repo is not None:
 
                        msg = self.message('repository_in_group_exists', state,
 
                                repo=repo.repo_name, group=group_name)
 
                        raise formencode.Invalid(msg, value, state,
 
                            error_dict=dict(repo_name=msg)
 
                        )
 
                elif repo_group is not None:
 
                        msg = self.message('same_group_exists', state,
 
                                repo=repo_name)
 
                        raise formencode.Invalid(msg, value, state,
 
                            error_dict=dict(repo_name=msg)
 
                        )
 
                    msg = self.message('same_group_exists', state,
 
                            repo=repo_name)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(repo_name=msg)
 
                    )
 
                elif repo is not None:
 
                        msg = self.message('repository_exists', state,
 
                                repo=repo.repo_name)
 
                        raise formencode.Invalid(msg, value, state,
 
                            error_dict=dict(repo_name=msg)
 
                        )
 
                    msg = self.message('repository_exists', state,
 
                            repo=repo.repo_name)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(repo_name=msg)
 
                    )
 
            return value
 
    return _validator
 

	
 

	
 
def ValidForkName(*args, **kwargs):
 
    return ValidRepoName(*args, **kwargs)
 

	
 

	
 
def SlugifyName():
 
    class _validator(formencode.validators.FancyValidator):
 

	
 
        def _convert_to_python(self, value, state):
 
            return repo_name_slug(value)
 

	
 
        def _validate_python(self, value, state):
 
            pass
 

	
 
    return _validator
 

	
 

	
 
def ValidCloneUri():
 
    from kallithea.lib.utils import make_ui
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'clone_uri': _('Invalid repository URL'),
 
            'invalid_clone_uri': _('Invalid repository URL. It must be a '
 
                                   'valid http, https, ssh, svn+http or svn+https URL'),
 
        }
 

	
 
        def _validate_python(self, value, state):
 
            repo_type = value.get('repo_type')
 
            url = value.get('clone_uri')
 

	
 
            if url and url != value.get('clone_uri_hidden'):
 
                try:
 
                    is_valid_repo_uri(repo_type, url, make_ui(clear_session=False))
 
                except Exception:
 
                    log.exception('URL validation failed')
 
                    msg = self.message('clone_uri', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(clone_uri=msg)
 
                    )
 
    return _validator
 

	
 

	
 
def ValidForkType(old_data=None):
 
    old_data = old_data or {}
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_fork_type': _('Fork has to be the same type as parent')
 
        }
 

	
 
        def _validate_python(self, value, state):
 
            if old_data['repo_type'] != value:
 
                msg = self.message('invalid_fork_type', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(repo_type=msg)
 
                )
 
    return _validator
 

	
 

	
 
def CanWriteGroup(old_data=None):
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'permission_denied': _("You don't have permissions "
 
                                   "to create repository in this group"),
 
            'permission_denied_root': _("no permission to create repository "
 
                                        "in root location")
 
        }
 

	
 
        def _convert_to_python(self, value, state):
 
            # root location
 
            if value == -1:
 
                return None
 
            return value
 

	
 
        def _validate_python(self, value, state):
 
            gr = RepoGroup.get(value)
 
            gr_name = gr.group_name if gr is not None else None # None means ROOT location
 

	
 
            # create repositories with write permission on group is set to true
 
            create_on_write = HasPermissionAny('hg.create.write_on_repogroup.true')()
 
            group_admin = HasRepoGroupPermissionLevel('admin')(gr_name,
 
                                            'can write into group validator')
 
            group_write = HasRepoGroupPermissionLevel('write')(gr_name,
 
                                            'can write into group validator')
 
            forbidden = not (group_admin or (group_write and create_on_write))
 
            can_create_repos = HasPermissionAny('hg.admin', 'hg.create.repository')
 
            gid = (old_data['repo_group'].get('group_id')
 
                   if (old_data and 'repo_group' in old_data) else None)
 
            value_changed = gid != value
 
            new = not old_data
 
            # do check if we changed the value, there's a case that someone got
 
            # revoked write permissions to a repository, he still created, we
 
            # don't need to check permission if he didn't change the value of
 
            # groups in form box
 
            if value_changed or new:
 
                # parent group need to be existing
 
                if gr and forbidden:
 
                    msg = self.message('permission_denied', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(repo_type=msg)
 
                    )
 
                ## check if we can write to root location !
 
                elif gr is None and not can_create_repos():
 
                    msg = self.message('permission_denied_root', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(repo_type=msg)
 
                    )
 

	
 
    return _validator
 

	
 

	
 
def CanCreateGroup(can_create_in_root=False):
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'permission_denied': _("You don't have permissions "
 
                                   "to create a group in this location")
 
        }
 

	
 
        def to_python(self, value, state):
 
            # root location
 
            if value == -1:
 
                return None
 
            return value
 

	
 
        def _validate_python(self, value, state):
 
            gr = RepoGroup.get(value)
 
            gr_name = gr.group_name if gr is not None else None # None means ROOT location
 

	
 
            if can_create_in_root and gr is None:
 
                # we can create in root, we're fine no validations required
 
                return
 

	
 
            forbidden_in_root = gr is None and not can_create_in_root
 
            forbidden = not HasRepoGroupPermissionLevel('admin')(gr_name, 'can create group validator')
 
            if forbidden_in_root or forbidden:
 
                msg = self.message('permission_denied', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(parent_group_id=msg)
 
                )
 

	
 
    return _validator
 

	
 

	
 
def ValidPerms(type_='repo'):
 
    if type_ == 'repo_group':
 
        EMPTY_PERM = 'group.none'
 
    elif type_ == 'repo':
 
        EMPTY_PERM = 'repository.none'
 
    elif type_ == 'user_group':
 
        EMPTY_PERM = 'usergroup.none'
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'perm_new_member_name':
 
                _('This username or user group name is not valid')
 
        }
 

	
 
        def to_python(self, value, state):
 
            perms_update = OrderedSet()
 
            perms_new = OrderedSet()
 
            # build a list of permission to update and new permission to create
 

	
 
            # CLEAN OUT ORG VALUE FROM NEW MEMBERS, and group them using
 
            new_perms_group = defaultdict(dict)
 
            for k, v in value.copy().iteritems():
 
                if k.startswith('perm_new_member'):
 
                    del value[k]
 
                    _type, part = k.split('perm_new_member_')
 
                    args = part.split('_')
 
                    if len(args) == 1:
 
                        new_perms_group[args[0]]['perm'] = v
 
                    elif len(args) == 2:
 
                        _key, pos = args
 
                        new_perms_group[pos][_key] = v
 

	
 
            # fill new permissions in order of how they were added
 
            for k in sorted(map(int, new_perms_group.keys())):
 
                perm_dict = new_perms_group[str(k)]
 
                new_member = perm_dict.get('name')
 
                new_perm = perm_dict.get('perm')
 
                new_type = perm_dict.get('type')
 
                if new_member and new_perm and new_type:
 
                    perms_new.add((new_member, new_perm, new_type))
 

	
 
            for k, v in value.iteritems():
 
                if k.startswith('u_perm_') or k.startswith('g_perm_'):
 
                    member = k[7:]
 
                    t = {'u': 'user',
 
                         'g': 'users_group'
 
                    }[k[0]]
 
                    if member == User.DEFAULT_USER:
 
                        if str2bool(value.get('repo_private')):
 
                            # set none for default when updating to
 
                            # private repo protects against form manipulation
 
                            v = EMPTY_PERM
 
                    perms_update.add((member, v, t))
 

	
 
            value['perms_updates'] = list(perms_update)
 
            value['perms_new'] = list(perms_new)
 

	
 
            # update permissions
 
            for k, v, t in perms_new:
 
                try:
 
                    if t == 'user':
 
                        self.user_db = User.query() \
 
                            .filter(User.active == True) \
 
                            .filter(User.username == k).one()
 
                    if t == 'users_group':
 
                        self.user_db = UserGroup.query() \
 
                            .filter(UserGroup.users_group_active == True) \
 
                            .filter(UserGroup.users_group_name == k).one()
 

	
 
                except Exception:
 
                    log.exception('Updated permission failed')
 
                    msg = self.message('perm_new_member_type', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(perm_new_member_name=msg)
 
                    )
 
            return value
 
    return _validator
 

	
 

	
 
def ValidSettings():
 
    class _validator(formencode.validators.FancyValidator):
 
        def _convert_to_python(self, value, state):
 
            # settings  form for users that are not admin
 
            # can't edit certain parameters, it's extra backup if they mangle
 
            # with forms
 

	
 
            forbidden_params = [
 
                'user', 'repo_type',
 
                'repo_enable_downloads', 'repo_enable_statistics'
 
            ]
 

	
 
            for param in forbidden_params:
 
                if param in value:
 
                    del value[param]
 
            return value
 

	
 
        def _validate_python(self, value, state):
 
            pass
 
    return _validator
 

	
 

	
 
def ValidPath():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_path': _('This is not a valid path')
 
        }
 

	
 
        def _validate_python(self, value, state):
 
            if not os.path.isdir(value):
 
                msg = self.message('invalid_path', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(paths_root_path=msg)
 
                )
 
    return _validator
 

	
 

	
 
def UniqSystemEmail(old_data=None):
 
    old_data = old_data or {}
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'email_taken': _('This email address is already in use')
 
        }
 

	
 
        def _convert_to_python(self, value, state):
 
            return value.lower()
 

	
 
        def _validate_python(self, value, state):
 
            if (old_data.get('email') or '').lower() != value:
 
                user = User.get_by_email(value)
 
                if user is not None:
 
                    msg = self.message('email_taken', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(email=msg)
 
                    )
 
    return _validator
 

	
 

	
 
def ValidSystemEmail():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'non_existing_email': _('Email address "%(email)s" not found')
 
        }
 

	
 
        def _convert_to_python(self, value, state):
 
            return value.lower()
 

	
 
        def _validate_python(self, value, state):
 
            user = User.get_by_email(value)
 
            if user is None:
 
                msg = self.message('non_existing_email', state, email=value)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(email=msg)
 
                )
 

	
 
    return _validator
 

	
 

	
 
def LdapLibValidator():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 

	
 
        }
 

	
 
        def _validate_python(self, value, state):
 
            try:
 
                import ldap
 
                ldap  # pyflakes silence !
 
            except ImportError:
 
                raise LdapImportError()
 

	
 
    return _validator
 

	
 

	
 
def AttrLoginValidator():
 
    class _validator(formencode.validators.UnicodeString):
 
        messages = {
 
            'invalid_cn':
 
                  _('The LDAP Login attribute of the CN must be specified - '
 
                    'this is the name of the attribute that is equivalent '
 
                    'to "username"')
 
        }
 
        messages['empty'] = messages['invalid_cn']
 

	
 
    return _validator
 

	
 

	
 
def ValidIp():
 
    class _validator(CIDR):
 
        messages = dict(
 
            badFormat=_('Please enter a valid IPv4 or IPv6 address'),
 
            illegalBits=_('The network size (bits) must be within the range'
 
                ' of 0-32 (not %(bits)r)')
 
        )
 

	
 
        def to_python(self, value, state):
 
            v = super(_validator, self).to_python(value, state)
 
            v = v.strip()
 
            net = ipaddr.IPNetwork(address=v)
 
            if isinstance(net, ipaddr.IPv4Network):
 
                # if IPv4 doesn't end with a mask, add /32
 
                if '/' not in value:
 
                    v += '/32'
 
            if isinstance(net, ipaddr.IPv6Network):
 
                # if IPv6 doesn't end with a mask, add /128
 
                if '/' not in value:
 
                    v += '/128'
 
            return v
 

	
 
        def _validate_python(self, value, state):
 
            try:
 
                addr = value.strip()
 
                # this raises an ValueError if address is not IPv4 or IPv6
 
                ipaddr.IPNetwork(address=addr)
 
            except ValueError:
 
                raise formencode.Invalid(self.message('badFormat', state),
 
                                         value, state)
 

	
 
    return _validator
 

	
 

	
 
def FieldKey():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = dict(
 
            badFormat=_('Key name can only consist of letters, '
 
                        'underscore, dash or numbers')
 
        )
 

	
 
        def _validate_python(self, value, state):
 
            if not re.match('[a-zA-Z0-9_-]+$', value):
 
                raise formencode.Invalid(self.message('badFormat', state),
 
                                         value, state)
 
    return _validator
 

	
 

	
 
def BasePath():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = dict(
 
            badPath=_('Filename cannot be inside a directory')
 
        )
 

	
 
        def _convert_to_python(self, value, state):
 
            return value
 

	
 
        def _validate_python(self, value, state):
 
            if value != os.path.basename(value):
 
                raise formencode.Invalid(self.message('badPath', state),
 
                                         value, state)
 
    return _validator
 

	
 

	
 
def ValidAuthPlugins():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = dict(
 
            import_duplicate=_('Plugins %(loaded)s and %(next_to_load)s both export the same name')
 
        )
 

	
 
        def _convert_to_python(self, value, state):
 
            # filter empty values
 
            return filter(lambda s: s not in [None, ''], value)
 

	
 
        def _validate_python(self, value, state):
 
            from kallithea.lib import auth_modules
 
            module_list = value
 
            unique_names = {}
 
            try:
 
                for module in module_list:
 
                    plugin = auth_modules.loadplugin(module)
 
                    plugin_name = plugin.name
 
                    if plugin_name in unique_names:
 
                        msg = self.message('import_duplicate', state,
 
                                loaded=unique_names[plugin_name],
 
                                next_to_load=plugin_name)
 
                        raise formencode.Invalid(msg, value, state)
 
                    unique_names[plugin_name] = plugin
 
            except (ImportError, AttributeError, TypeError) as e:
 
                raise formencode.Invalid(str(e), value, state)
 

	
 
    return _validator
0 comments (0 inline, 0 general)