Changeset 620c13a373c5 (branch: default)
Mads Kiilerich <mads@kiilerich.com> - 2019-12-26 15:17:51
Grafted from: 3d7166c2b89e
py3: trivial renaming of unicode to str
12 files changed:
kallithea/controllers/api/__init__.py
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.api
 
~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
JSON RPC controller
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Aug 20, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import inspect
 
import itertools
 
import logging
 
import time
 
import traceback
 
import types
 

	
 
from tg import Response, TGController, request, response
 
from webob.exc import HTTPError, HTTPException
 

	
 
from kallithea.lib import ext_json
 
from kallithea.lib.auth import AuthUser
 
from kallithea.lib.base import _get_ip_addr as _get_ip
 
from kallithea.lib.base import get_path_info
 
from kallithea.lib.utils2 import ascii_bytes
 
from kallithea.model.db import User
 

	
 

	
 
log = logging.getLogger('JSONRPC')
 

	
 

	
 
class JSONRPCError(BaseException):
 

	
 
    def __init__(self, message):
 
        self.message = message
 
        super(JSONRPCError, self).__init__()
 

	
 
    def __str__(self):
 
        return self.message
 

	
 

	
 
class JSONRPCErrorResponse(Response, HTTPException):
 
    """
 
    Generate a Response object with a JSON-RPC error body
 
    """
 

	
 
    def __init__(self, message=None, retid=None, code=None):
 
        HTTPException.__init__(self, message, self)
 
        Response.__init__(self,
 
                          json_body=dict(id=retid, result=None, error=message),
 
                          status=code,
 
                          content_type='application/json')
 

	
 

	
 
class JSONRPCController(TGController):
 
    """
 
     A WSGI-speaking JSON-RPC controller class
 

	
 
     See the specification:
 
     <http://json-rpc.org/wiki/specification>.
 

	
 
     Valid controller return values should be json-serializable objects.
 

	
 
     Sub-classes should catch their exceptions and raise JSONRPCError
 
     if they want to pass meaningful errors to the client.
 

	
 
     """
 

	
 
    def _get_ip_addr(self, environ):
 
        return _get_ip(environ)
 

	
 
    def _get_method_args(self):
 
        """
 
        Return `self._rpc_args` to dispatched controller method
 
        chosen by __call__
 
        """
 
        return self._rpc_args
 

	
 
    def _dispatch(self, state, remainder=None):
 
        """
 
        Parse the request body as JSON, look up the method on the
 
        controller and if it exists, dispatch to it.
 
        """
 
        # Since we are here we should respond as JSON
 
        response.content_type = 'application/json'
 

	
 
        environ = state.request.environ
 
        start = time.time()
 
        ip_addr = self._get_ip_addr(environ)
 
        self._req_id = None
 
        if 'CONTENT_LENGTH' not in environ:
 
            log.debug("No Content-Length")
 
            raise JSONRPCErrorResponse(retid=self._req_id,
 
                                       message="No Content-Length in request")
 
        else:
 
            length = environ['CONTENT_LENGTH'] or 0
 
            length = int(environ['CONTENT_LENGTH'])
 
            log.debug('Content-Length: %s', length)
 

	
 
        if length == 0:
 
            raise JSONRPCErrorResponse(retid=self._req_id,
 
                                       message="Content-Length is 0")
 

	
 
        raw_body = environ['wsgi.input'].read(length)
 

	
 
        try:
 
            json_body = ext_json.loads(raw_body)
 
        except ValueError as e:
 
            # catch JSON errors Here
 
            raise JSONRPCErrorResponse(retid=self._req_id,
 
                                       message="JSON parse error ERR:%s RAW:%r"
 
                                                % (e, raw_body))
 

	
 
        # check AUTH based on API key
 
        try:
 
            self._req_api_key = json_body['api_key']
 
            self._req_id = json_body['id']
 
            self._req_method = json_body['method']
 
            self._request_params = json_body['args']
 
            if not isinstance(self._request_params, dict):
 
                self._request_params = {}
 

	
 
            log.debug('method: %s, params: %s',
 
                      self._req_method, self._request_params)
 
        except KeyError as e:
 
            raise JSONRPCErrorResponse(retid=self._req_id,
 
                                       message='Incorrect JSON query missing %s' % e)
 

	
 
        # check if we can find this session using api_key
 
        try:
 
            u = User.get_by_api_key(self._req_api_key)
 
            auth_user = AuthUser.make(dbuser=u, ip_addr=ip_addr)
 
            if auth_user is None:
 
                raise JSONRPCErrorResponse(retid=self._req_id,
 
                                           message='Invalid API key')
 
        except Exception as e:
 
            raise JSONRPCErrorResponse(retid=self._req_id,
 
                                       message='Invalid API key')
 

	
 
        request.authuser = auth_user
 
        request.ip_addr = ip_addr
 

	
 
        self._error = None
 
        try:
 
            self._func = self._find_method()
 
        except AttributeError as e:
 
            raise JSONRPCErrorResponse(retid=self._req_id,
 
                                       message=str(e))
 

	
 
        # now that we have a method, add self._req_params to
 
        # self.kargs and dispatch control to WGIController
 
        argspec = inspect.getfullargspec(self._func)
 
        arglist = argspec.args[1:]
 
        argtypes = [type(arg) for arg in argspec.defaults or []]
 
        default_empty = type(NotImplemented)
 

	
 
        # kw arguments required by this method
 
        func_kwargs = dict(itertools.zip_longest(reversed(arglist), reversed(argtypes),
 
                                                  fillvalue=default_empty))
 

	
 
        # This attribute will need to be first param of a method that uses
 
        # api_key, which is translated to instance of user at that name
 
        USER_SESSION_ATTR = 'apiuser'
 

	
 
        # get our arglist and check if we provided them as args
 
        for arg, default in func_kwargs.items():
 
            if arg == USER_SESSION_ATTR:
 
                # USER_SESSION_ATTR is something translated from API key and
 
                # this is checked before so we don't need validate it
 
                continue
 

	
 
            # skip the required param check if it's default value is
 
            # NotImplementedType (default_empty)
 
            if default == default_empty and arg not in self._request_params:
 
                raise JSONRPCErrorResponse(
 
                    retid=self._req_id,
 
                    message='Missing non optional `%s` arg in JSON DATA' % arg,
 
                )
 

	
 
        extra = set(self._request_params).difference(func_kwargs)
 
        if extra:
 
            raise JSONRPCErrorResponse(
 
                retid=self._req_id,
 
                message='Unknown %s arg in JSON DATA' %
 
                        ', '.join('`%s`' % arg for arg in extra),
 
            )
 

	
 
        self._rpc_args = {}
 
        self._rpc_args.update(self._request_params)
 
        self._rpc_args['action'] = self._req_method
 
        self._rpc_args['environ'] = environ
 

	
 
        log.info('IP: %s Request to %s time: %.3fs' % (
 
            self._get_ip_addr(environ),
 
            get_path_info(environ), time.time() - start)
 
        )
 

	
 
        state.set_action(self._rpc_call, [])
 
        state.set_params(self._rpc_args)
 
        return state
 

	
 
    def _rpc_call(self, action, environ, **rpc_args):
 
        """
 
        Call the specified RPC Method
 
        """
 
        raw_response = ''
 
        try:
 
            raw_response = getattr(self, action)(**rpc_args)
 
            if isinstance(raw_response, HTTPError):
 
                self._error = str(raw_response)
 
        except JSONRPCError as e:
 
-            self._error = unicode(e)
 
+            self._error = str(e)
 
        except Exception as e:
 
            log.error('Encountered unhandled exception: %s',
 
                      traceback.format_exc(),)
 
            json_exc = JSONRPCError('Internal server error')
 
-            self._error = unicode(json_exc)
 
+            self._error = str(json_exc)
 

	
 
        if self._error is not None:
 
            raw_response = None
 

	
 
        response = dict(id=self._req_id, result=raw_response, error=self._error)
 
        try:
 
            return ascii_bytes(ext_json.dumps(response))
 
        except TypeError as e:
 
            log.error('API FAILED. Error encoding response for %s %s: %s\n%s', action, rpc_args, e, traceback.format_exc())
 
            return ascii_bytes(ext_json.dumps(
 
                dict(
 
                    id=self._req_id,
 
                    result=None,
 
                    error="Error encoding response",
 
                )
 
            ))
 

	
 
    def _find_method(self):
 
        """
 
        Return method named by `self._req_method` in controller if able
 
        """
 
        log.debug('Trying to find JSON-RPC method: %s', self._req_method)
 
        if self._req_method.startswith('_'):
 
            raise AttributeError("Method not allowed")
 

	
 
        try:
 
            func = getattr(self, self._req_method, None)
 
        except UnicodeEncodeError:
 
            raise AttributeError("Problem decoding unicode in requested "
 
                                 "method name.")
 

	
 
        if isinstance(func, types.MethodType):
 
            return func
 
        else:
 
            raise AttributeError("No such method: %s" % (self._req_method,))
kallithea/lib/celerylib/__init__.py
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.celerylib
 
~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
celery libs for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Nov 27, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import logging
 
import os
 
from hashlib import md5
 

	
 
from decorator import decorator
 
from tg import config
 

	
 
from kallithea import CELERY_EAGER, CELERY_ON
 
from kallithea.lib.pidlock import DaemonLock, LockHeld
 
from kallithea.lib.utils2 import safe_bytes
 
from kallithea.model import meta
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class FakeTask(object):
 
    """Fake a sync result to make it look like a finished task"""
 

	
 
    def __init__(self, result):
 
        self.result = result
 

	
 
    def failed(self):
 
        return False
 

	
 
    traceback = None # if failed
 

	
 
    task_id = None
 

	
 

	
 
def task(f_org):
 
    """Wrapper of celery.task.task, running async if CELERY_ON
 
    """
 

	
 
    if CELERY_ON:
 
        def f_async(*args, **kwargs):
 
            log.info('executing %s task', f_org.__name__)
 
            try:
 
                f_org(*args, **kwargs)
 
            finally:
 
                log.info('executed %s task', f_org.__name__)
 
        f_async.__name__ = f_org.__name__
 
        from kallithea.lib import celerypylons
 
        runner = celerypylons.task(ignore_result=True)(f_async)
 

	
 
        def f_wrapped(*args, **kwargs):
 
            t = runner.apply_async(args=args, kwargs=kwargs)
 
            log.info('executing task %s in async mode - id %s', f_org, t.task_id)
 
            return t
 
    else:
 
        def f_wrapped(*args, **kwargs):
 
            log.info('executing task %s in sync', f_org.__name__)
 
            try:
 
                result = f_org(*args, **kwargs)
 
            except Exception as e:
 
                log.error('exception executing sync task %s in sync: %r', f_org.__name__, e)
 
                raise # TODO: return this in FakeTask as with async tasks?
 
            return FakeTask(result)
 

	
 
    return f_wrapped
 

	
 

	
 
def __get_lockkey(func, *fargs, **fkwargs):
 
    params = list(fargs)
 
    params.extend(['%s-%s' % ar for ar in fkwargs.items()])
 

	
 
    func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
 

	
 
    lockkey = 'task_%s.lock' % \
 
-        md5(safe_bytes(func_name + '-' + '-'.join(unicode(x) for x in params))).hexdigest()
 
+        md5(safe_bytes(func_name + '-' + '-'.join(str(x) for x in params))).hexdigest()
 
    return lockkey
 

	
 

	
 
def locked_task(func):
 
    def __wrapper(func, *fargs, **fkwargs):
 
        lockkey = __get_lockkey(func, *fargs, **fkwargs)
 
        lockkey_path = config.get('cache_dir') or config['app_conf']['cache_dir']  # Backward compatibility for TurboGears < 2.4
 

	
 
        log.info('running task with lockkey %s', lockkey)
 
        try:
 
            l = DaemonLock(os.path.join(lockkey_path, lockkey))
 
            ret = func(*fargs, **fkwargs)
 
            l.release()
 
            return ret
 
        except LockHeld:
 
            log.info('LockHeld')
 
            return 'Task with key %s already running' % lockkey
 

	
 
    return decorator(__wrapper, func)
 

	
 

	
 
def get_session():
 
    sa = meta.Session()
 
    return sa
 

	
 

	
 
def dbsession(func):
 
    def __wrapper(func, *fargs, **fkwargs):
 
        try:
 
            ret = func(*fargs, **fkwargs)
 
            return ret
 
        finally:
 
            if CELERY_ON and not CELERY_EAGER:
 
                meta.Session.remove()
 

	
 
    return decorator(__wrapper, func)
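
# For illustration, a hypothetical task module could combine the decorators
# above like this (the task name and body are made up for the example):
#
#     import logging
#
#     from kallithea.lib.celerylib import dbsession, get_session, task
#
#     log = logging.getLogger(__name__)
#
#     @task
#     @dbsession
#     def send_ping(user_id):
#         sa = get_session()  # database session; cleaned up by dbsession when running under Celery
#         log.info('pinging user %s', user_id)
#
#     send_ping(42)  # FakeTask result in sync mode, a Celery result when CELERY_ON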
kallithea/lib/helpers.py
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
Helper functions
 

	
 
Consists of functions to typically be used within templates, but also
 
available to Controllers. This module is available to both as 'h'.
 
"""
 
import hashlib
 
import json
 
import logging
 
import random
 
import re
 
import textwrap
 
import urllib.parse
 

	
 
from beaker.cache import cache_region
 
from pygments import highlight as code_highlight
 
from pygments.formatters.html import HtmlFormatter
 
from tg.i18n import ugettext as _
 
from webhelpers2.html import HTML, escape, literal
 
from webhelpers2.html.tags import NotGiven, Option, Options, _input, _make_safe_id_component, checkbox, end_form
 
from webhelpers2.html.tags import form as insecure_form
 
from webhelpers2.html.tags import hidden, link_to, password, radio
 
from webhelpers2.html.tags import select as webhelpers2_select
 
from webhelpers2.html.tags import submit, text, textarea
 
from webhelpers2.number import format_byte_size
 
from webhelpers2.text import chop_at, truncate, wrap_paragraphs
 

	
 
from kallithea.config.routing import url
 
from kallithea.lib.annotate import annotate_highlight
 
#==============================================================================
 
# PERMS
 
#==============================================================================
 
from kallithea.lib.auth import HasPermissionAny, HasRepoGroupPermissionLevel, HasRepoPermissionLevel
 
from kallithea.lib.markup_renderer import url_re
 
from kallithea.lib.pygmentsutils import get_custom_lexer
 
from kallithea.lib.utils2 import MENTIONS_REGEX, AttributeDict
 
from kallithea.lib.utils2 import age as _age
 
from kallithea.lib.utils2 import credentials_filter, safe_bytes, safe_int, safe_str, str2bool, time_to_datetime
 
from kallithea.lib.vcs.backends.base import BaseChangeset, EmptyChangeset
 
from kallithea.lib.vcs.exceptions import ChangesetDoesNotExistError
 
#==============================================================================
 
# SCM FILTERS available via h.
 
#==============================================================================
 
from kallithea.lib.vcs.utils import author_email, author_name
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def canonical_url(*args, **kargs):
 
    '''Like url(x, qualified=True), but returns url that not only is qualified
 
    but also canonical, as configured in canonical_url'''
 
    from kallithea import CONFIG
 
    try:
 
        parts = CONFIG.get('canonical_url', '').split('://', 1)
 
        kargs['host'] = parts[1]
 
        kargs['protocol'] = parts[0]
 
    except IndexError:
 
        kargs['qualified'] = True
 
    return url(*args, **kargs)
 

	
 

	
 
def canonical_hostname():
 
    '''Return canonical hostname of system'''
 
    from kallithea import CONFIG
 
    try:
 
        parts = CONFIG.get('canonical_url', '').split('://', 1)
 
        return parts[1].split('/', 1)[0]
 
    except IndexError:
 
        parts = url('home', qualified=True).split('://', 1)
 
        return parts[1].split('/', 1)[0]
 

	
 

	
 
def html_escape(s):
 
    """Return string with all html escaped.
 
    This is also safe for javascript in html but not necessarily correct.
 
    """
 
    return (s
 
        .replace('&', '&amp;')
 
        .replace(">", "&gt;")
 
        .replace("<", "&lt;")
 
        .replace('"', "&quot;")
 
        .replace("'", "&apos;") # Note: this is HTML5 not HTML4 and might not work in mails
 
        )
 

	
 
def js(value):
 
    """Convert Python value to the corresponding JavaScript representation.
 

	
 
    This is necessary to safely insert arbitrary values into HTML <script>
 
    sections e.g. using Mako template expression substitution.
 

	
 
    Note: Rather than using this function, it's preferable to avoid the
 
    insertion of values into HTML <script> sections altogether. Instead,
 
    data should (to the extent possible) be passed to JavaScript using
 
    data attributes or AJAX calls, eliminating the need for JS specific
 
    escaping.
 

	
 
    Note: This is not safe for use in attributes (e.g. onclick), because
 
    quotes are not escaped.
 

	
 
    Because the rules for parsing <script> varies between XHTML (where
 
    normal rules apply for any special characters) and HTML (where
 
    entities are not interpreted, but the literal string "</script>"
 
    is forbidden), the function ensures that the result never contains
 
    '&', '<' and '>', thus making it safe in both those contexts (but
 
    not in attributes).
 
    """
 
    return literal(
 
        ('(' + json.dumps(value) + ')')
 
        # In JSON, the following can only appear in string literals.
 
        .replace('&', r'\x26')
 
        .replace('<', r'\x3c')
 
        .replace('>', r'\x3e')
 
    )
 

	
 

	
 
def jshtml(val):
 
    """HTML escapes a string value, then converts the resulting string
 
    to its corresponding JavaScript representation (see `js`).
 

	
 
    This is used when a plain-text string (possibly containing special
 
    HTML characters) will be used by a script in an HTML context (e.g.
 
    element.innerHTML or jQuery's 'html' method).
 

	
 
    If in doubt, err on the side of using `jshtml` over `js`, since it's
 
    better to escape too much than too little.
 
    """
 
    return js(escape(val))
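
# For illustration, with the replacements above the two helpers produce output
# along these lines (inputs made up):
#
#     js('</script>')   # -> ("\x3c/script\x3e")
#     jshtml('<b>&')    # -> ("\x26lt;b\x26gt;\x26amp;")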
 

	
 

	
 
def shorter(s, size=20, firstline=False, postfix='...'):
 
    """Truncate s to size, including the postfix string if truncating.
 
    If firstline, truncate at newline.
 
    """
 
    if firstline:
 
        s = s.split('\n', 1)[0].rstrip()
 
    if len(s) > size:
 
        return s[:size - len(postfix)] + postfix
 
    return s
 

	
 

	
 
def reset(name, value, id=NotGiven, **attrs):
 
    """Create a reset button, similar to webhelpers2.html.tags.submit ."""
 
    return _input("reset", name, value, id, attrs)
 

	
 

	
 
def select(name, selected_values, options, id=NotGiven, **attrs):
 
    """Convenient wrapper of webhelpers2 to let it accept options as a tuple list"""
 
    if isinstance(options, list):
 
        option_list = options
 
        # Handle old value,label lists ... where value also can be value,label lists
 
        options = Options()
 
        for x in option_list:
 
            if isinstance(x, tuple) and len(x) == 2:
 
                value, label = x
 
            elif isinstance(x, str):
 
                value = label = x
 
            else:
 
                log.error('invalid select option %r', x)
 
                raise
 
            if isinstance(value, list):
 
                og = options.add_optgroup(label)
 
                for x in value:
 
                    if isinstance(x, tuple) and len(x) == 2:
 
                        group_value, group_label = x
 
                    elif isinstance(x, str):
 
                        group_value = group_label = x
 
                    else:
 
                        log.error('invalid select option %r', x)
 
                        raise
 
                    og.add_option(group_label, group_value)
 
            else:
 
                options.add_option(label, value)
 
    return webhelpers2_select(name, selected_values, options, id=id, **attrs)
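
# For illustration, the wrapper accepts plain strings, (value, label) tuples,
# and (option_list, group_label) tuples for option groups (values made up):
#
#     select('repo_type', 'hg', [('hg', 'Mercurial'), ('git', 'Git')])
#     select('lang', 'en', [([('en', 'English'), ('da', 'Danish')], 'Languages'), 'other'])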
 

	
 

	
 
safeid = _make_safe_id_component
 

	
 

	
 
def FID(raw_id, path):
 
    """
 
    Creates a unique ID for a filenode, based on its revision and a hash of its path;
 
    it's safe to use in URLs
 

	
 
    :param raw_id:
 
    :param path:
 
    """
 

	
 
    return 'C-%s-%s' % (short_id(raw_id), hashlib.md5(safe_bytes(path)).hexdigest()[:12])
 

	
 

	
 
class _FilesBreadCrumbs(object):
 

	
 
    def __call__(self, repo_name, rev, paths):
 
        url_l = [link_to(repo_name, url('files_home',
 
                                        repo_name=repo_name,
 
                                        revision=rev, f_path=''),
 
                         class_='ypjax-link')]
 
        paths_l = paths.split('/')
 
        for cnt, p in enumerate(paths_l):
 
            if p != '':
 
                url_l.append(link_to(p,
 
                                     url('files_home',
 
                                         repo_name=repo_name,
 
                                         revision=rev,
 
                                         f_path='/'.join(paths_l[:cnt + 1])
 
                                         ),
 
                                     class_='ypjax-link'
 
                                     )
 
                             )
 

	
 
        return literal('/'.join(url_l))
 

	
 

	
 
files_breadcrumbs = _FilesBreadCrumbs()
 

	
 

	
 
class CodeHtmlFormatter(HtmlFormatter):
 
    """
 
    My code Html Formatter for source codes
 
    """
 

	
 
    def wrap(self, source, outfile):
 
        return self._wrap_div(self._wrap_pre(self._wrap_code(source)))
 

	
 
    def _wrap_code(self, source):
 
        for cnt, it in enumerate(source):
 
            i, t = it
 
            t = '<span id="L%s">%s</span>' % (cnt + 1, t)
 
            yield i, t
 

	
 
    def _wrap_tablelinenos(self, inner):
 
        inner_lines = []
 
        lncount = 0
 
        for t, line in inner:
 
            if t:
 
                lncount += 1
 
            inner_lines.append(line)
 

	
 
        fl = self.linenostart
 
        mw = len(str(lncount + fl - 1))
 
        sp = self.linenospecial
 
        st = self.linenostep
 
        la = self.lineanchors
 
        aln = self.anchorlinenos
 
        nocls = self.noclasses
 
        if sp:
 
            lines = []
 

	
 
            for i in range(fl, fl + lncount):
 
                if i % st == 0:
 
                    if i % sp == 0:
 
                        if aln:
 
                            lines.append('<a href="#%s%d" class="special">%*d</a>' %
 
                                         (la, i, mw, i))
 
                        else:
 
                            lines.append('<span class="special">%*d</span>' % (mw, i))
 
                    else:
 
                        if aln:
 
                            lines.append('<a href="#%s%d">%*d</a>' % (la, i, mw, i))
 
                        else:
 
                            lines.append('%*d' % (mw, i))
 
                else:
 
                    lines.append('')
 
            ls = '\n'.join(lines)
 
        else:
 
            lines = []
 
            for i in range(fl, fl + lncount):
 
                if i % st == 0:
 
                    if aln:
 
                        lines.append('<a href="#%s%d">%*d</a>' % (la, i, mw, i))
 
                    else:
 
                        lines.append('%*d' % (mw, i))
 
                else:
 
                    lines.append('')
 
            ls = '\n'.join(lines)
 

	
 
        # in case you wonder about the seemingly redundant <div> here: since the
 
        # content in the other cell also is wrapped in a div, some browsers in
 
        # some configurations seem to mess up the formatting...
 
        if nocls:
 
            yield 0, ('<table class="%stable">' % self.cssclass +
 
                      '<tr><td><div class="linenodiv">'
 
                      '<pre>' + ls + '</pre></div></td>'
 
                      '<td id="hlcode" class="code">')
 
        else:
 
            yield 0, ('<table class="%stable">' % self.cssclass +
 
                      '<tr><td class="linenos"><div class="linenodiv">'
 
                      '<pre>' + ls + '</pre></div></td>'
 
                      '<td id="hlcode" class="code">')
 
        yield 0, ''.join(inner_lines)
 
        yield 0, '</td></tr></table>'
 

	
 

	
 
_whitespace_re = re.compile(r'(\t)|( )(?=\n|</div>)')
 

	
 

	
 
def _markup_whitespace(m):
 
    groups = m.groups()
 
    if groups[0]:
 
        return '<u>\t</u>'
 
    if groups[1]:
 
        return ' <i></i>'
 

	
 

	
 
def markup_whitespace(s):
 
    return _whitespace_re.sub(_markup_whitespace, s)
 

	
 

	
 
def pygmentize(filenode, **kwargs):
 
    """
 
    pygmentize function using pygments
 

	
 
    :param filenode:
 
    """
 
    lexer = get_custom_lexer(filenode.extension) or filenode.lexer
 
    return literal(markup_whitespace(
 
        code_highlight(safe_str(filenode.content), lexer, CodeHtmlFormatter(**kwargs))))
 

	
 

	
 
def hsv_to_rgb(h, s, v):
 
    if s == 0.0:
 
        return v, v, v
 
    i = int(h * 6.0)  # XXX assume int() truncates!
 
    f = (h * 6.0) - i
 
    p = v * (1.0 - s)
 
    q = v * (1.0 - s * f)
 
    t = v * (1.0 - s * (1.0 - f))
 
    i = i % 6
 
    if i == 0:
 
        return v, t, p
 
    if i == 1:
 
        return q, v, p
 
    if i == 2:
 
        return p, v, t
 
    if i == 3:
 
        return p, q, v
 
    if i == 4:
 
        return t, p, v
 
    if i == 5:
 
        return v, p, q
 

	
 

	
 
def gen_color(n=10000):
 
    """generator for getting n of evenly distributed colors using
 
    hsv color and golden ratio. It always returns the same order of colors
 

	
 
    :returns: RGB tuple
 
    """
 

	
 
    golden_ratio = 0.618033988749895
 
    h = 0.22717784590367374
 

	
 
    for _unused in range(n):
 
        h += golden_ratio
 
        h %= 1
 
        HSV_tuple = [h, 0.95, 0.95]
 
        RGB_tuple = hsv_to_rgb(*HSV_tuple)
 
        yield [str(int(x * 256)) for x in RGB_tuple]
 

	
 

	
 
def pygmentize_annotation(repo_name, filenode, **kwargs):
 
    """
 
    pygmentize function for annotation
 

	
 
    :param filenode:
 
    """
 
    cgenerator = gen_color()
 
    color_dict = {}
 

	
 
    def get_color_string(cs):
 
        if cs in color_dict:
 
            col = color_dict[cs]
 
        else:
 
            col = color_dict[cs] = next(cgenerator)
 
        return "color: rgb(%s)! important;" % (', '.join(col))
 

	
 
    def url_func(changeset):
 
        author = escape(changeset.author)
 
        date = changeset.date
 
        message = escape(changeset.message)
 
        tooltip_html = ("<b>Author:</b> %s<br/>"
 
                        "<b>Date:</b> %s</b><br/>"
 
                        "<b>Message:</b> %s") % (author, date, message)
 

	
 
        lnk_format = show_id(changeset)
 
        uri = link_to(
 
                lnk_format,
 
                url('changeset_home', repo_name=repo_name,
 
                    revision=changeset.raw_id),
 
                style=get_color_string(changeset.raw_id),
 
                **{'data-toggle': 'popover',
 
                   'data-content': tooltip_html}
 
              )
 

	
 
        uri += '\n'
 
        return uri
 

	
 
    return literal(markup_whitespace(annotate_highlight(filenode, url_func, **kwargs)))
 

	
 

	
 
class _Message(object):
 
    """A message returned by ``pop_flash_messages()``.
 

	
 
    Converting the message to a string returns the message text. Instances
 
    also have the following attributes:
 

	
 
    * ``category``: the category specified when the message was created.
 
    * ``message``: the html-safe message text.
 
    """
 

	
 
    def __init__(self, category, message):
 
        self.category = category
 
        self.message = message
 

	
 

	
 
def _session_flash_messages(append=None, clear=False):
 
    """Manage a message queue in tg.session: return the current message queue
 
    after appending the given message, and possibly clearing the queue."""
 
    key = 'flash'
 
    from tg import session
 
    if key in session:
 
        flash_messages = session[key]
 
    else:
 
        if append is None:  # common fast path - also used for clearing empty queue
 
            return []  # don't bother saving
 
        flash_messages = []
 
        session[key] = flash_messages
 
    if append is not None and append not in flash_messages:
 
        flash_messages.append(append)
 
    if clear:
 
        session.pop(key, None)
 
    session.save()
 
    return flash_messages
 

	
 

	
 
def flash(message, category, logf=None):
 
    """
 
    Show a message to the user _and_ log it through the specified function
 

	
 
    category: notice (default), warning, error, success
 
    logf: a custom log function - such as log.debug
 

	
 
    logf defaults to log.info, unless category equals 'success', in which
 
    case logf defaults to log.debug.
 
    """
 
    assert category in ('error', 'success', 'warning'), category
 
    if hasattr(message, '__html__'):
 
        # render to HTML for storing in cookie
 
-        safe_message = unicode(message)
 
+        safe_message = str(message)
 
    else:
 
        # Apply str - the message might be an exception with __str__
 
        # Escape, so we can trust the result without further escaping, without any risk of injection
 
-        safe_message = html_escape(unicode(message))
 
+        safe_message = html_escape(str(message))
 
    if logf is None:
 
        logf = log.info
 
        if category == 'success':
 
            logf = log.debug
 

	
 
    logf('Flash %s: %s', category, safe_message)
 

	
 
    _session_flash_messages(append=(category, safe_message))
 

	
 

	
 
def pop_flash_messages():
 
    """Return all accumulated messages and delete them from the session.
 

	
 
    The return value is a list of ``Message`` objects.
 
    """
 
    return [_Message(category, message) for category, message in _session_flash_messages(clear=True)]
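
# For illustration, a controller queues a message roughly like this (message
# text made up), assuming this module is imported as 'h'; templates later drain
# the queue through pop_flash_messages():
#
#     h.flash('Repository created', category='success')    # logged with log.debug
#     h.flash('Repository is locked', category='warning')  # logged with log.info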
 

	
 

	
 
age = lambda x, y=False: _age(x, y)
 
capitalize = lambda x: x.capitalize()
 
email = author_email
 
short_id = lambda x: x[:12]
 
hide_credentials = lambda x: ''.join(credentials_filter(x))
 

	
 

	
 
def show_id(cs):
 
    """
 
    Configurable function that shows ID
 
    by default it's r123:fffeeefffeee
 

	
 
    :param cs: changeset instance
 
    """
 
    from kallithea import CONFIG
 
    def_len = safe_int(CONFIG.get('show_sha_length', 12))
 
    show_rev = str2bool(CONFIG.get('show_revision_number', False))
 

	
 
    raw_id = cs.raw_id[:def_len]
 
    if show_rev:
 
        return 'r%s:%s' % (cs.revision, raw_id)
 
    else:
 
        return raw_id
 

	
 

	
 
def fmt_date(date):
 
    if date:
 
        return date.strftime("%Y-%m-%d %H:%M:%S")
 
    return ""
 

	
 

	
 
def is_git(repository):
 
    if hasattr(repository, 'alias'):
 
        _type = repository.alias
 
    elif hasattr(repository, 'repo_type'):
 
        _type = repository.repo_type
 
    else:
 
        _type = repository
 
    return _type == 'git'
 

	
 

	
 
def is_hg(repository):
 
    if hasattr(repository, 'alias'):
 
        _type = repository.alias
 
    elif hasattr(repository, 'repo_type'):
 
        _type = repository.repo_type
 
    else:
 
        _type = repository
 
    return _type == 'hg'
 

	
 

	
 
@cache_region('long_term', 'user_attr_or_none')
 
def user_attr_or_none(author, show_attr):
 
    """Try to match email part of VCS committer string with a local user and return show_attr
 
    - or return None if user not found"""
 
    email = author_email(author)
 
    if email:
 
        from kallithea.model.db import User
 
        user = User.get_by_email(email, cache=True) # cache will only use sql_cache_short
 
        if user is not None:
 
            return getattr(user, show_attr)
 
    return None
 

	
 

	
 
def email_or_none(author):
 
    """Try to match email part of VCS committer string with a local user.
 
    Return primary email of user, email part of the specified author name, or None."""
 
    if not author:
 
        return None
 
    email = user_attr_or_none(author, 'email')
 
    if email is not None:
 
        return email # always use user's main email address - not necessarily the one used to find user
 

	
 
    # extract email from the commit string
 
    email = author_email(author)
 
    if email:
 
        return email
 

	
 
    # No valid email, not a valid user in the system, none!
 
    return None
 

	
 

	
 
def person(author, show_attr="username"):
 
    """Find the user identified by 'author', return one of the users attributes,
 
    default to the username attribute, None if there is no user"""
 
    from kallithea.model.db import User
 
    # if author is already an instance use it for extraction
 
    if isinstance(author, User):
 
        return getattr(author, show_attr)
 

	
 
    value = user_attr_or_none(author, show_attr)
 
    if value is not None:
 
        return value
 

	
 
    # Still nothing?  Just pass back the author name if any, else the email
 
    return author_name(author) or email(author)
 

	
 

	
 
def person_by_id(id_, show_attr="username"):
 
    from kallithea.model.db import User
 
    # attr to return from fetched user
 
    person_getter = lambda usr: getattr(usr, show_attr)
 

	
 
    # maybe it's an ID ?
 
    if str(id_).isdigit() or isinstance(id_, int):
 
        id_ = int(id_)
 
        user = User.get(id_)
 
        if user is not None:
 
            return person_getter(user)
 
    return id_
 

	
 

	
 
def boolicon(value):
 
    """Returns boolean value of a value, represented as small html image of true/false
 
    icons
 

	
 
    :param value: value
 
    """
 

	
 
    if value:
 
        return HTML.tag('i', class_="icon-ok")
 
    else:
 
        return HTML.tag('i', class_="icon-minus-circled")
 

	
 

	
 
def action_parser(user_log, feed=False, parse_cs=False):
 
    """
 
    This helper will action_map the specified string action into translated
 
    fancy names with icons and links
 

	
 
    :param user_log: user log instance
 
    :param feed: use output for feeds (no html and fancy icons)
 
    :param parse_cs: parse Changesets into VCS instances
 
    """
 

	
 
    action = user_log.action
 
    action_params = ' '
 

	
 
    x = action.split(':')
 

	
 
    if len(x) > 1:
 
        action, action_params = x
 

	
 
    def get_cs_links():
 
        revs_limit = 3  # display this amount always
 
        revs_top_limit = 50  # show upto this amount of changesets hidden
 
        revs_ids = action_params.split(',')
 
        deleted = user_log.repository is None
 
        if deleted:
 
            return ','.join(revs_ids)
 

	
 
        repo_name = user_log.repository.repo_name
 

	
 
        def lnk(rev, repo_name):
 
            lazy_cs = False
 
            title_ = None
 
            url_ = '#'
 
            if isinstance(rev, BaseChangeset) or isinstance(rev, AttributeDict):
 
                if rev.op and rev.ref_name:
 
                    if rev.op == 'delete_branch':
 
                        lbl = _('Deleted branch: %s') % rev.ref_name
 
                    elif rev.op == 'tag':
 
                        lbl = _('Created tag: %s') % rev.ref_name
 
                    else:
 
                        lbl = 'Unknown operation %s' % rev.op
 
                else:
 
                    lazy_cs = True
 
                    lbl = rev.short_id[:8]
 
                    url_ = url('changeset_home', repo_name=repo_name,
 
                               revision=rev.raw_id)
 
            else:
 
                # changeset cannot be found - it might have been stripped or removed
 
                lbl = rev[:12]
 
                title_ = _('Changeset %s not found') % lbl
 
            if parse_cs:
 
                return link_to(lbl, url_, title=title_, **{'data-toggle': 'tooltip'})
 
            return link_to(lbl, url_, class_='lazy-cs' if lazy_cs else '',
 
                           **{'data-raw_id': rev.raw_id, 'data-repo_name': repo_name})
 

	
 
        def _get_op(rev_txt):
 
            _op = None
 
            _name = rev_txt
 
            if len(rev_txt.split('=>')) == 2:
 
                _op, _name = rev_txt.split('=>')
 
            return _op, _name
 

	
 
        revs = []
 
        if len([v for v in revs_ids if v != '']) > 0:
 
            repo = None
 
            for rev in revs_ids[:revs_top_limit]:
 
                _op, _name = _get_op(rev)
 

	
 
                # we want parsed changesets, or new log store format is bad
 
                if parse_cs:
 
                    try:
 
                        if repo is None:
 
                            repo = user_log.repository.scm_instance
 
                        _rev = repo.get_changeset(rev)
 
                        revs.append(_rev)
 
                    except ChangesetDoesNotExistError:
 
                        log.error('cannot find revision %s in this repo', rev)
 
                        revs.append(rev)
 
                else:
 
                    _rev = AttributeDict({
 
                        'short_id': rev[:12],
 
                        'raw_id': rev,
 
                        'message': '',
 
                        'op': _op,
 
                        'ref_name': _name
 
                    })
 
                    revs.append(_rev)
 
        cs_links = [" " + ', '.join(
 
            [lnk(rev, repo_name) for rev in revs[:revs_limit]]
 
        )]
 
        _op1, _name1 = _get_op(revs_ids[0])
 
        _op2, _name2 = _get_op(revs_ids[-1])
 

	
 
        _rev = '%s...%s' % (_name1, _name2)
 

	
 
        compare_view = (
 
            ' <div class="compare_view" data-toggle="tooltip" title="%s">'
 
            '<a href="%s">%s</a> </div>' % (
 
                _('Show all combined changesets %s->%s') % (
 
                    revs_ids[0][:12], revs_ids[-1][:12]
 
                ),
 
                url('changeset_home', repo_name=repo_name,
 
                    revision=_rev
 
                ),
 
                _('Compare view')
 
            )
 
        )
 

	
 
        # if we have exactly one more than normally displayed
 
        # just display it, takes less space than displaying
 
        # "and 1 more revisions"
 
        if len(revs_ids) == revs_limit + 1:
 
            cs_links.append(", " + lnk(revs[revs_limit], repo_name))
 

	
 
        # hidden-by-default ones
 
        if len(revs_ids) > revs_limit + 1:
 
            uniq_id = revs_ids[0]
 
            html_tmpl = (
 
                '<span> %s <a class="show_more" id="_%s" '
 
                'href="#more">%s</a> %s</span>'
 
            )
 
            if not feed:
 
                cs_links.append(html_tmpl % (
 
                      _('and'),
 
                      uniq_id, _('%s more') % (len(revs_ids) - revs_limit),
 
                      _('revisions')
 
                    )
 
                )
 

	
 
            if not feed:
 
                html_tmpl = '<span id="%s" style="display:none">, %s </span>'
 
            else:
 
                html_tmpl = '<span id="%s"> %s </span>'
 

	
 
            morelinks = ', '.join(
 
              [lnk(rev, repo_name) for rev in revs[revs_limit:]]
 
            )
 

	
 
            if len(revs_ids) > revs_top_limit:
 
                morelinks += ', ...'
 

	
 
            cs_links.append(html_tmpl % (uniq_id, morelinks))
 
        if len(revs) > 1:
 
            cs_links.append(compare_view)
 
        return ''.join(cs_links)
 

	
 
    def get_fork_name():
 
        repo_name = action_params
 
        url_ = url('summary_home', repo_name=repo_name)
 
        return _('Fork name %s') % link_to(action_params, url_)
 

	
 
    def get_user_name():
 
        user_name = action_params
 
        return user_name
 

	
 
    def get_users_group():
 
        group_name = action_params
 
        return group_name
 

	
 
    def get_pull_request():
 
        from kallithea.model.db import PullRequest
 
        pull_request_id = action_params
 
        nice_id = PullRequest.make_nice_id(pull_request_id)
 

	
 
        deleted = user_log.repository is None
 
        if deleted:
 
            repo_name = user_log.repository_name
 
        else:
 
            repo_name = user_log.repository.repo_name
 

	
 
        return link_to(_('Pull request %s') % nice_id,
 
                    url('pullrequest_show', repo_name=repo_name,
 
                    pull_request_id=pull_request_id))
 

	
 
    def get_archive_name():
 
        archive_name = action_params
 
        return archive_name
 

	
 
    # action : translated str, callback(extractor), icon
 
    action_map = {
 
        'user_deleted_repo':           (_('[deleted] repository'),
 
                                        None, 'icon-trashcan'),
 
        'user_created_repo':           (_('[created] repository'),
 
                                        None, 'icon-plus'),
 
        'user_created_fork':           (_('[created] repository as fork'),
 
                                        None, 'icon-fork'),
 
        'user_forked_repo':            (_('[forked] repository'),
 
                                        get_fork_name, 'icon-fork'),
 
        'user_updated_repo':           (_('[updated] repository'),
 
                                        None, 'icon-pencil'),
 
        'user_downloaded_archive':      (_('[downloaded] archive from repository'),
 
                                        get_archive_name, 'icon-download-cloud'),
 
        'admin_deleted_repo':          (_('[delete] repository'),
 
                                        None, 'icon-trashcan'),
 
        'admin_created_repo':          (_('[created] repository'),
 
                                        None, 'icon-plus'),
 
        'admin_forked_repo':           (_('[forked] repository'),
 
                                        None, 'icon-fork'),
 
        'admin_updated_repo':          (_('[updated] repository'),
 
                                        None, 'icon-pencil'),
 
        'admin_created_user':          (_('[created] user'),
 
                                        get_user_name, 'icon-user'),
 
        'admin_updated_user':          (_('[updated] user'),
 
                                        get_user_name, 'icon-user'),
 
        'admin_created_users_group':   (_('[created] user group'),
 
                                        get_users_group, 'icon-pencil'),
 
        'admin_updated_users_group':   (_('[updated] user group'),
 
                                        get_users_group, 'icon-pencil'),
 
        'user_commented_revision':     (_('[commented] on revision in repository'),
 
                                        get_cs_links, 'icon-comment'),
 
        'user_commented_pull_request': (_('[commented] on pull request for'),
 
                                        get_pull_request, 'icon-comment'),
 
        'user_closed_pull_request':    (_('[closed] pull request for'),
 
                                        get_pull_request, 'icon-ok'),
 
        'push':                        (_('[pushed] into'),
 
                                        get_cs_links, 'icon-move-up'),
 
        'push_local':                  (_('[committed via Kallithea] into repository'),
 
                                        get_cs_links, 'icon-pencil'),
 
        'push_remote':                 (_('[pulled from remote] into repository'),
 
                                        get_cs_links, 'icon-move-up'),
 
        'pull':                        (_('[pulled] from'),
 
                                        None, 'icon-move-down'),
 
        'started_following_repo':      (_('[started following] repository'),
 
                                        None, 'icon-heart'),
 
        'stopped_following_repo':      (_('[stopped following] repository'),
 
                                        None, 'icon-heart-empty'),
 
    }
 

	
 
    action_str = action_map.get(action, action)
 
    if feed:
 
        action = action_str[0].replace('[', '').replace(']', '')
 
    else:
 
        action = action_str[0] \
 
            .replace('[', '<b>') \
 
            .replace(']', '</b>')
 

	
 
    action_params_func = lambda: ""
 

	
 
    if callable(action_str[1]):
 
        action_params_func = action_str[1]
 

	
 
    def action_parser_icon():
 
        action = user_log.action
 
        action_params = None
 
        x = action.split(':')
 

	
 
        if len(x) > 1:
 
            action, action_params = x
 

	
 
        ico = action_map.get(action, ['', '', ''])[2]
 
        html = """<i class="%s"></i>""" % ico
 
        return literal(html)
 

	
 
    # returned callbacks we need to call to get
 
    return [lambda: literal(action), action_params_func, action_parser_icon]
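
# For illustration, callers unpack the three callbacks and invoke them when
# rendering an entry (user_log stands for a journal entry here):
#
#     action_str, params_func, icon_func = action_parser(user_log)
#     rendered = '%s %s %s' % (icon_func(), action_str(), params_func())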
 

	
 

	
 
#==============================================================================
 
# GRAVATAR URL
 
#==============================================================================
 
def gravatar_div(email_address, cls='', size=30, **div_attributes):
 
    """Return an html literal with a span around a gravatar if they are enabled.
 
    Extra keyword parameters starting with 'div_' will get the prefix removed
 
    and '_' changed to '-' and be used as attributes on the div. The default
 
    class is 'gravatar'.
 
    """
 
    from tg import tmpl_context as c
 
    if not c.visual.use_gravatar:
 
        return ''
 
    if 'div_class' not in div_attributes:
 
        div_attributes['div_class'] = "gravatar"
 
    attributes = []
 
    for k, v in sorted(div_attributes.items()):
 
        assert k.startswith('div_'), k
 
        attributes.append(' %s="%s"' % (k[4:].replace('_', '-'), escape(v)))
 
    return literal("""<span%s>%s</span>""" %
 
                   (''.join(attributes),
 
                    gravatar(email_address, cls=cls, size=size)))
 

	
 

	
 
def gravatar(email_address, cls='', size=30):
 
    """return html element of the gravatar
 

	
 
    This method will return an <img> with the resolution double the size (for
 
    retina screens) of the image. If the url returned from gravatar_url is
 
    empty then we fallback to using an icon.
 

	
 
    """
 
    from tg import tmpl_context as c
 
    if not c.visual.use_gravatar:
 
        return ''
 

	
 
    src = gravatar_url(email_address, size * 2)
 

	
 
    if src:
 
        # here it makes sense to use style="width: ..." (instead of, say, a
 
        # stylesheet) because we using this to generate a high-res (retina) size
 
        html = ('<i class="icon-gravatar {cls}"'
 
                ' style="font-size: {size}px;background-size: {size}px;background-image: url(\'{src}\')"'
 
                '></i>').format(cls=cls, size=size, src=src)
 

	
 
    else:
 
        # if src is empty then there was no gravatar, so we use a font icon
 
        html = ("""<i class="icon-user {cls}" style="font-size: {size}px;"></i>"""
 
            .format(cls=cls, size=size, src=src))
 

	
 
    return literal(html)
 

	
 

	
 
def gravatar_url(email_address, size=30, default=''):
 
    # doh, we need to re-import those to mock it later
 
    from kallithea.config.routing import url
 
    from kallithea.model.db import User
 
    from tg import tmpl_context as c
 
    if not c.visual.use_gravatar:
 
        return ""
 

	
 
    _def = 'anonymous@kallithea-scm.org'  # default gravatar
 
    email_address = email_address or _def
 

	
 
    if email_address == _def:
 
        return default
 

	
 
    parsed_url = urllib.parse.urlparse(url.current(qualified=True))
 
    url = (c.visual.gravatar_url or User.DEFAULT_GRAVATAR_URL) \
 
               .replace('{email}', email_address) \
 
               .replace('{md5email}', hashlib.md5(safe_bytes(email_address).lower()).hexdigest()) \
 
               .replace('{netloc}', parsed_url.netloc) \
 
               .replace('{scheme}', parsed_url.scheme) \
 
               .replace('{size}', str(size))
 
    return url
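
# For illustration, c.visual.gravatar_url (or the built-in default) is a
# template using the placeholders substituted above; a typical template might
# look like this (assumed example, not taken from this changeset):
#
#     https://secure.gravatar.com/avatar/{md5email}?d=identicon&s={size}
#
# so gravatar_url('jane@example.com', size=30) yields that URL with {md5email}
# replaced by the md5 hex digest of the lower-cased address and {size} by 30.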
 

	
 

	
 
def changed_tooltip(nodes):
 
    """
 
    Generates a html string for changed nodes in changeset page.
 
    It limits the output to 30 entries
 

	
 
    :param nodes: LazyNodesGenerator
 
    """
 
    if nodes:
 
        pref = ': <br/> '
 
        suf = ''
 
        if len(nodes) > 30:
 
            suf = '<br/>' + _(' and %s more') % (len(nodes) - 30)
 
        return literal(pref + '<br/> '.join([x.path
 
                                             for x in nodes[:30]]) + suf)
 
    else:
 
        return ': ' + _('No files')
 

	
 

	
 
def fancy_file_stats(stats):
 
    """
 
    Displays a fancy two colored bar for number of added/deleted
 
    lines of code on file
 

	
 
    :param stats: two element list of added/deleted lines of code
 
    """
 
    from kallithea.lib.diffs import NEW_FILENODE, DEL_FILENODE, \
 
        MOD_FILENODE, RENAMED_FILENODE, CHMOD_FILENODE, BIN_FILENODE
 

	
 
    a, d = stats['added'], stats['deleted']
 
    width = 100
 

	
 
    if stats['binary']:
 
        # binary mode
 
        lbl = ''
 
        bin_op = 1
 

	
 
        if BIN_FILENODE in stats['ops']:
 
            lbl = 'bin+'
 

	
 
        if NEW_FILENODE in stats['ops']:
 
            lbl += _('new file')
 
            bin_op = NEW_FILENODE
 
        elif MOD_FILENODE in stats['ops']:
 
            lbl += _('mod')
 
            bin_op = MOD_FILENODE
 
        elif DEL_FILENODE in stats['ops']:
 
            lbl += _('del')
 
            bin_op = DEL_FILENODE
 
        elif RENAMED_FILENODE in stats['ops']:
 
            lbl += _('rename')
 
            bin_op = RENAMED_FILENODE
 

	
 
        # chmod can go with other operations
 
        if CHMOD_FILENODE in stats['ops']:
 
            _org_lbl = _('chmod')
 
            lbl += _org_lbl if lbl.endswith('+') else '+%s' % _org_lbl
 

	
 
        #import ipdb;ipdb.set_trace()
 
        b_d = '<div class="bin bin%s progress-bar" style="width:100%%">%s</div>' % (bin_op, lbl)
 
        b_a = '<div class="bin bin1" style="width:0%"></div>'
 
        return literal('<div style="width:%spx" class="progress">%s%s</div>' % (width, b_a, b_d))
 

	
 
    t = stats['added'] + stats['deleted']
 
    unit = float(width) / (t or 1)
 

	
 
    # needs > 9% of width to be visible or 0 to be hidden
 
    a_p = max(9, unit * a) if a > 0 else 0
 
    d_p = max(9, unit * d) if d > 0 else 0
 
    p_sum = a_p + d_p
 

	
 
    if p_sum > width:
 
        # adjust the percentage to be == 100% since we adjusted to 9
 
        if a_p > d_p:
 
            a_p = a_p - (p_sum - width)
 
        else:
 
            d_p = d_p - (p_sum - width)
 

	
 
    a_v = a if a > 0 else ''
 
    d_v = d if d > 0 else ''
 

	
 
    d_a = '<div class="added progress-bar" style="width:%s%%">%s</div>' % (
 
        a_p, a_v
 
    )
 
    d_d = '<div class="deleted progress-bar" style="width:%s%%">%s</div>' % (
 
        d_p, d_v
 
    )
 
    return literal('<div class="progress" style="width:%spx">%s%s</div>' % (width, d_a, d_d))
 

	
 

	
 
_URLIFY_RE = re.compile(r'''
 
# URL markup
 
(?P<url>%s) |
 
# @mention markup
 
(?P<mention>%s) |
 
# Changeset hash markup
 
(?<!\w|[-_])
 
  (?P<hash>[0-9a-f]{12,40})
 
(?!\w|[-_]) |
 
# Markup of *bold text*
 
(?:
 
  (?:^|(?<=\s))
 
  (?P<bold> [*] (?!\s) [^*\n]* (?<!\s) [*] )
 
  (?![*\w])
 
) |
 
# "Stylize" markup
 
\[see\ \=&gt;\ *(?P<seen>[a-zA-Z0-9\/\=\?\&\ \:\/\.\-]*)\] |
 
\[license\ \=&gt;\ *(?P<license>[a-zA-Z0-9\/\=\?\&\ \:\/\.\-]*)\] |
 
\[(?P<tagtype>requires|recommends|conflicts|base)\ \=&gt;\ *(?P<tagvalue>[a-zA-Z0-9\-\/]*)\] |
 
\[(?:lang|language)\ \=&gt;\ *(?P<lang>[a-zA-Z\-\/\#\+]*)\] |
 
\[(?P<tag>[a-z]+)\]
 
''' % (url_re.pattern, MENTIONS_REGEX.pattern),
 
    re.VERBOSE | re.MULTILINE | re.IGNORECASE)
 

	
 

	
 
def urlify_text(s, repo_name=None, link_=None, truncate=None, stylize=False, truncatef=truncate):
 
    """
 
    Parses given text message and make literal html with markup.
 
    The text will be truncated to the specified length.
 
    Hashes are turned into changeset links to specified repository.
 
    URLs links to what they say.
 
    Issues are linked to given issue-server.
 
    If link_ is provided, all text not already linking somewhere will link there.
 
    """
 

	
 
    def _replace(match_obj):
 
        url = match_obj.group('url')
 
        if url is not None:
 
            return '<a href="%(url)s">%(url)s</a>' % {'url': url}
 
        mention = match_obj.group('mention')
 
        if mention is not None:
 
            return '<b>%s</b>' % mention
 
        hash_ = match_obj.group('hash')
 
        if hash_ is not None and repo_name is not None:
 
            from kallithea.config.routing import url  # doh, we need to re-import url to mock it later
 
            return '<a class="changeset_hash" href="%(url)s">%(hash)s</a>' % {
 
                 'url': url('changeset_home', repo_name=repo_name, revision=hash_),
 
                 'hash': hash_,
 
                }
 
        bold = match_obj.group('bold')
 
        if bold is not None:
 
            return '<b>*%s*</b>' % _urlify(bold[1:-1])
 
        if stylize:
 
            seen = match_obj.group('seen')
 
            if seen:
 
                return '<div class="label label-meta" data-tag="see">see =&gt; %s</div>' % seen
 
            license = match_obj.group('license')
 
            if license:
 
                return '<div class="label label-meta" data-tag="license"><a href="http://www.opensource.org/licenses/%s">%s</a></div>' % (license, license)
 
            tagtype = match_obj.group('tagtype')
 
            if tagtype:
 
                tagvalue = match_obj.group('tagvalue')
 
                return '<div class="label label-meta" data-tag="%s">%s =&gt; <a href="/%s">%s</a></div>' % (tagtype, tagtype, tagvalue, tagvalue)
 
            lang = match_obj.group('lang')
 
            if lang:
 
                return '<div class="label label-meta" data-tag="lang">%s</div>' % lang
 
            tag = match_obj.group('tag')
 
            if tag:
 
                return '<div class="label label-meta" data-tag="%s">%s</div>' % (tag, tag)
 
        return match_obj.group(0)
 

	
 
    def _urlify(s):
 
        """
 
        Extract urls from text and make html links out of them
 
        """
 
        return _URLIFY_RE.sub(_replace, s)
 

	
 
    if truncate is None:
 
        s = s.rstrip()
 
    else:
 
        s = truncatef(s, truncate, whole_word=True)
 
    s = html_escape(s)
 
    s = _urlify(s)
 
    if repo_name is not None:
 
        s = urlify_issues(s, repo_name)
 
    if link_ is not None:
 
        # make href around everything that isn't a href already
 
        s = linkify_others(s, link_)
 
    s = s.replace('\r\n', '<br/>').replace('\n', '<br/>')
 
    # Turn HTML5 into more valid HTML4 as required by some mail readers.
 
    # (This is not done in one step in html_escape, because character codes like
 
    # &#123; risk being seen as an issue reference due to the presence of '#'.)
 
    s = s.replace("&apos;", "&#39;")
 
    return literal(s)
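# Editor's note (not part of this changeset), a hedged usage sketch; with
# repo_name omitted no issue-tracker or database configuration is consulted:
#
#     urlify_text('see http://example.com and *this*')
#     # -> literal() HTML where the URL becomes an <a> link and *this* is
#     #    rendered as <b>*this*</b>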
 

	
 

	
 
def linkify_others(t, l):
 
    """Add a default link to html with links.
 
    HTML doesn't allow nesting of links, so the outer link must be broken up
 
    into pieces to give space for other links.
 
    """
 
    urls = re.compile(r'(\<a.*?\<\/a\>)',)
 
    links = []
 
    for e in urls.split(t):
 
        if e.strip() and not urls.match(e):
 
            links.append('<a class="message-link" href="%s">%s</a>' % (l, e))
 
        else:
 
            links.append(e)
 

	
 
    return ''.join(links)
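# Editor's note (not part of this changeset), a quick sketch: fragments that are
# not already <a>...</a> elements get wrapped in the default link:
#
#     linkify_others('fixed <a href="/c/abc">abc</a> for good', '/changeset/123')
#     # -> '<a class="message-link" href="/changeset/123">fixed </a>'
#     #    '<a href="/c/abc">abc</a>'
#     #    '<a class="message-link" href="/changeset/123"> for good</a>'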
 

	
 

	
 
# Global variable that will hold the actual urlify_issues function body.
 
# Will be set on first use when the global configuration has been read.
 
_urlify_issues_f = None
 

	
 

	
 
def urlify_issues(newtext, repo_name):
 
    """Urlify issue references according to .ini configuration"""
 
    global _urlify_issues_f
 
    if _urlify_issues_f is None:
 
        from kallithea import CONFIG
 
        from kallithea.model.db import URL_SEP
 
        assert CONFIG['sqlalchemy.url'] # make sure config has been loaded
 

	
 
        # Build chain of urlify functions, starting with not doing any transformation
 
        tmp_urlify_issues_f = lambda s: s
 

	
 
        issue_pat_re = re.compile(r'issue_pat(.*)')
 
        for k in CONFIG:
 
            # Find all issue_pat* settings that also have corresponding issue_server_link and issue_sub configuration
 
            m = issue_pat_re.match(k)
 
            if m is None:
 
                continue
 
            suffix = m.group(1)
 
            issue_pat = CONFIG.get(k)
 
            issue_server_link = CONFIG.get('issue_server_link%s' % suffix)
 
            issue_sub = CONFIG.get('issue_sub%s' % suffix)
 
            if not issue_pat or not issue_server_link or issue_sub is None: # issue_sub can be empty but should be present
 
                log.error('skipping incomplete issue pattern %r: %r -> %r %r', suffix, issue_pat, issue_server_link, issue_sub)
 
                continue
 

	
 
            # Wrap tmp_urlify_issues_f with substitution of this pattern, while making sure all loop variables (and compiled regexps) are bound
 
            try:
 
                issue_re = re.compile(issue_pat)
 
            except re.error as e:
 
                log.error('skipping invalid issue pattern %r: %r -> %r %r. Error: %s', suffix, issue_pat, issue_server_link, issue_sub, str(e))
 
                continue
 

	
 
            log.debug('issue pattern %r: %r -> %r %r', suffix, issue_pat, issue_server_link, issue_sub)
 

	
 
            def issues_replace(match_obj,
 
                               issue_server_link=issue_server_link, issue_sub=issue_sub):
 
                try:
 
                    issue_url = match_obj.expand(issue_server_link)
 
                except (IndexError, re.error) as e:
 
                    log.error('invalid issue_url setting %r -> %r %r. Error: %s', issue_pat, issue_server_link, issue_sub, str(e))
 
                    issue_url = issue_server_link
 
                issue_url = issue_url.replace('{repo}', repo_name)
 
                issue_url = issue_url.replace('{repo_name}', repo_name.split(URL_SEP)[-1])
 
                # if issue_sub is empty use the matched issue reference verbatim
 
                if not issue_sub:
 
                    issue_text = match_obj.group()
 
                else:
 
                    try:
 
                        issue_text = match_obj.expand(issue_sub)
 
                    except (IndexError, re.error) as e:
 
                        log.error('invalid issue_sub setting %r -> %r %r. Error: %s', issue_pat, issue_server_link, issue_sub, str(e))
 
                        issue_text = match_obj.group()
 

	
 
                return (
 
                    '<a class="issue-tracker-link" href="%(url)s">'
 
                    '%(text)s'
 
                    '</a>'
 
                    ) % {
 
                     'url': issue_url,
 
                     'text': issue_text,
 
                    }
 
            tmp_urlify_issues_f = (lambda s,
 
                                          issue_re=issue_re, issues_replace=issues_replace, chain_f=tmp_urlify_issues_f:
 
                                   issue_re.sub(issues_replace, chain_f(s)))
 

	
 
        # Set tmp function globally - atomically
 
        _urlify_issues_f = tmp_urlify_issues_f
 

	
 
    return _urlify_issues_f(newtext)
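# Editor's note (not part of this changeset): the settings read above live in the
# application .ini file; a hedged example with a made-up tracker URL:
#
#     issue_pat = #(\d+)
#     issue_server_link = https://issues.example.com/{repo}/issue/\1
#     issue_sub =
#
# An empty issue_sub keeps the matched reference (e.g. "#42") as the link text;
# further trackers can be added with suffixed keys like issue_pat_wiki.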
 

	
 

	
 
def render_w_mentions(source, repo_name=None):
 
    """
 
    Render plain text with revision hashes and issue references urlified
 
    and with @mention highlighting.
 
    """
 
    s = safe_str(source)
 
    s = urlify_text(s, repo_name=repo_name)
 
    return literal('<div class="formatted-fixed">%s</div>' % s)
 

	
 

	
 
def short_ref(ref_type, ref_name):
 
    if ref_type == 'rev':
 
        return short_id(ref_name)
 
    return ref_name
 

	
 

	
kallithea/lib/recaptcha.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
import json
 
import urllib.parse
 
import urllib.request
 

	
 

	
 
class RecaptchaResponse(object):
 
    def __init__(self, is_valid, error_code=None):
 
        self.is_valid = is_valid
 
        self.error_code = error_code
 

	
 
    def __repr__(self):
 
        return '<RecaptchaResponse:%s>' % (self.is_valid)
 

	
 

	
 
def submit(g_recaptcha_response, private_key, remoteip):
 
    """
 
    Submits a reCAPTCHA request for verification. Returns RecaptchaResponse for the request
 

	
 
    g_recaptcha_response -- The value of g_recaptcha_response from the form
 
    private_key -- your reCAPTCHA private key
 
    remoteip -- the user's IP address
 
    """
 

	
 
    if not (g_recaptcha_response and len(g_recaptcha_response)):
 
        return RecaptchaResponse(is_valid=False, error_code='incorrect-captcha-sol')
 

	
 
    def encode_if_necessary(s):
 
        if isinstance(s, unicode):
 
        if isinstance(s, str):
 
            return s.encode('utf-8')
 
        return s
 

	
 
    params = urllib.parse.urlencode({
 
        'secret': encode_if_necessary(private_key),
 
        'remoteip': encode_if_necessary(remoteip),
 
        'response': encode_if_necessary(g_recaptcha_response),
 
    }).encode('ascii')
 

	
 
    req = urllib.request.Request(
 
        url="https://www.google.com/recaptcha/api/siteverify",
 
        data=params,
 
        headers={
 
            "Content-type": "application/x-www-form-urlencoded",
 
            "User-agent": "reCAPTCHA Python"
 
        }
 
    )
 

	
 
    httpresp = urllib.request.urlopen(req)
 
    return_values = json.loads(httpresp.read())
 
    httpresp.close()
 

	
 
    if not (isinstance(return_values, dict)):
 
        return RecaptchaResponse(is_valid=False, error_code='incorrect-captcha-sol')
 
    elif (("success" in return_values) and ((return_values["success"] is True) or (return_values["success"] == "true"))):
 
        return RecaptchaResponse(is_valid=True)
 
    elif (("error-codes" in return_values) and isinstance(return_values["error-codes"], list) and (len(return_values["error-codes"]) > 0)):
 
        return RecaptchaResponse(is_valid=False, error_code=return_values["error-codes"][0])
 
    else:
 
        return RecaptchaResponse(is_valid=False, error_code='incorrect-captcha-sol')
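# Editor's sketch (not part of this changeset): a minimal, hedged example of
# driving submit(); the key and IP are placeholders and the call performs a
# live HTTP request to the reCAPTCHA verification endpoint.
if __name__ == '__main__':
    result = submit(g_recaptcha_response='form-response-token',
                    private_key='your-secret-key',
                    remoteip='127.0.0.1')
    print(result.is_valid, result.error_code)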
kallithea/lib/utils2.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.utils2
 
~~~~~~~~~~~~~~~~~~~~
 

	
 
Some simple helper functions.
 
Note: all these functions should be independent of Kallithea classes, i.e.
 
models, controllers, etc.  to prevent import cycles.
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jan 5, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from __future__ import print_function
 

	
 
import binascii
 
import datetime
 
import json
 
import os
 
import pwd
 
import re
 
import time
 
import urllib.parse
 

	
 
import urlobject
 
from tg.i18n import ugettext as _
 
from tg.i18n import ungettext
 
from webhelpers2.text import collapse, remove_formatting, strip_tags
 

	
 
from kallithea.lib.vcs.utils import ascii_bytes, ascii_str, safe_bytes, safe_str  # re-export
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 

	
 

	
 
def str2bool(_str):
 
    """
 
    returns True/False value from given string; it tries to translate the
 
    string into boolean
 

	
 
    :param _str: string value to translate into boolean
 
    :rtype: boolean
 
    :returns: boolean from given string
 
    """
 
    if _str is None:
 
        return False
 
    if _str in (True, False):
 
        return _str
 
    _str = str(_str).strip().lower()
 
    return _str in ('t', 'true', 'y', 'yes', 'on', '1')
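# Editor's note (not part of this changeset), typical values as a quick sketch:
#
#     str2bool('Yes')  -> True
#     str2bool('0')    -> False
#     str2bool(None)   -> False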
 

	
 

	
 
def aslist(obj, sep=None, strip=True):
 
    """
 
    Returns given string separated by sep as list
 

	
 
    :param obj:
 
    :param sep:
 
    :param strip:
 
    """
 
    if isinstance(obj, (str)):
 
        lst = obj.split(sep)
 
        if strip:
 
            lst = [v.strip() for v in lst]
 
        return lst
 
    elif isinstance(obj, (list, tuple)):
 
        return obj
 
    elif obj is None:
 
        return []
 
    else:
 
        return [obj]
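# Editor's note (not part of this changeset), a quick sketch:
#
#     aslist('hg, git, svn', ',')   -> ['hg', 'git', 'svn']
#     aslist(None)                  -> []
#     aslist(['already', 'a list']) -> ['already', 'a list']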
 

	
 

	
 
def convert_line_endings(line, mode):
 
    """
 
    Converts the "line end" of a given line according to the given mode
 

	
 
    Available modes are::
 
        0 - Unix
 
        1 - Mac
 
        2 - DOS
 

	
 
    :param line: given line to convert
 
    :param mode: mode to convert to
 
    :rtype: str
 
    :return: converted line according to mode
 
    """
 
    if mode == 0:
 
        line = line.replace('\r\n', '\n')
 
        line = line.replace('\r', '\n')
 
    elif mode == 1:
 
        line = line.replace('\r\n', '\r')
 
        line = line.replace('\n', '\r')
 
    elif mode == 2:
 
        line = re.sub("\r(?!\n)|(?<!\r)\n", "\r\n", line)
 
    return line
 

	
 

	
 
def detect_mode(line, default):
 
    """
 
    Detects line break for given line; if the line break couldn't be found,
 
    the given default value is returned
 

	
 
    :param line: str line
 
    :param default: default
 
    :rtype: int
 
    :return: value of line end: one of 0 - Unix, 1 - Mac, 2 - DOS
 
    """
 
    if line.endswith('\r\n'):
 
        return 2
 
    elif line.endswith('\n'):
 
        return 0
 
    elif line.endswith('\r'):
 
        return 1
 
    else:
 
        return default
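# Editor's note (not part of this changeset), a quick sketch combining the two
# helpers above:
#
#     detect_mode('foo\r\n', 0)               -> 2  (DOS)
#     convert_line_endings('a\r\nb\rc\n', 0)  -> 'a\nb\nc\n'  (all Unix)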
 

	
 

	
 
def generate_api_key():
 
    """
 
    Generates a random (presumably unique) API key.
 

	
 
    This value is used in URLs and "Bearer" HTTP Authorization headers,
 
    which in practice means it should only contain URL-safe characters
 
    (RFC 3986):
 

	
 
        unreserved = ALPHA / DIGIT / "-" / "." / "_" / "~"
 
    """
 
    # Hexadecimal certainly qualifies as URL-safe.
 
    return ascii_str(binascii.hexlify(os.urandom(20)))
 

	
 

	
 
def safe_int(val, default=None):
 
    """
 
    Returns int() of val; if val is not convertible to int, use default
 
    instead
 

	
 
    :param val:
 
    :param default:
 
    """
 
    try:
 
        val = int(val)
 
    except (ValueError, TypeError):
 
        val = default
 
    return val
 

	
 

	
 
def remove_suffix(s, suffix):
 
    if s.endswith(suffix):
 
        s = s[:-1 * len(suffix)]
 
    return s
 

	
 

	
 
def remove_prefix(s, prefix):
 
    if s.startswith(prefix):
 
        s = s[len(prefix):]
 
    return s
 

	
 

	
 
def age(prevdate, show_short_version=False, now=None):
 
    """
 
    turns a datetime into an age string.
 
    If show_short_version is True, then it will generate a less accurate but shorter string,
 
    example: 2 days ago, instead of 2 days and 23 hours ago.
 

	
 
    :param prevdate: datetime object
 
    :param show_short_version: if it should approximate the date and return a shorter string
 
    :rtype: unicode
 
    :returns: unicode words describing age
 
    :rtype: str
 
    :returns: str words describing age
 
    """
 
    now = now or datetime.datetime.now()
 
    order = ['year', 'month', 'day', 'hour', 'minute', 'second']
 
    deltas = {}
 
    future = False
 

	
 
    if prevdate > now:
 
        now, prevdate = prevdate, now
 
        future = True
 
    if future:
 
        prevdate = prevdate.replace(microsecond=0)
 
    # Get date parts deltas
 
    from dateutil import relativedelta
 
    for part in order:
 
        d = relativedelta.relativedelta(now, prevdate)
 
        deltas[part] = getattr(d, part + 's')
 

	
 
    # Fix negative offsets (there is 1 second between 10:59:59 and 11:00:00,
 
    # not 1 hour, -59 minutes and -59 seconds)
 
    for num, length in [(5, 60), (4, 60), (3, 24)]:  # seconds, minutes, hours
 
        part = order[num]
 
        carry_part = order[num - 1]
 

	
 
        if deltas[part] < 0:
 
            deltas[part] += length
 
            deltas[carry_part] -= 1
 

	
 
    # Same thing for days except that the increment depends on the (variable)
 
    # number of days in the month
 
    month_lengths = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
 
    if deltas['day'] < 0:
 
        if prevdate.month == 2 and (prevdate.year % 4 == 0 and
 
            (prevdate.year % 100 != 0 or prevdate.year % 400 == 0)
 
        ):
 
            deltas['day'] += 29
 
        else:
 
            deltas['day'] += month_lengths[prevdate.month - 1]
 

	
 
        deltas['month'] -= 1
 

	
 
    if deltas['month'] < 0:
 
        deltas['month'] += 12
 
        deltas['year'] -= 1
 

	
 
    # In short version, we want nicer handling of ages of more than a year
 
    if show_short_version:
 
        if deltas['year'] == 1:
 
            # ages between 1 and 2 years: show as months
 
            deltas['month'] += 12
 
            deltas['year'] = 0
 
        if deltas['year'] >= 2:
 
            # ages 2+ years: round
 
            if deltas['month'] > 6:
 
                deltas['year'] += 1
 
                deltas['month'] = 0
 

	
 
    # Format the result
 
    fmt_funcs = {
 
        'year': lambda d: ungettext(u'%d year', '%d years', d) % d,
 
        'month': lambda d: ungettext(u'%d month', '%d months', d) % d,
 
        'day': lambda d: ungettext(u'%d day', '%d days', d) % d,
 
        'hour': lambda d: ungettext(u'%d hour', '%d hours', d) % d,
 
        'minute': lambda d: ungettext(u'%d minute', '%d minutes', d) % d,
 
        'second': lambda d: ungettext(u'%d second', '%d seconds', d) % d,
 
    }
 

	
 
    for i, part in enumerate(order):
 
        value = deltas[part]
 
        if value == 0:
 
            continue
 

	
 
        if i < 5:
 
            sub_part = order[i + 1]
 
            sub_value = deltas[sub_part]
 
        else:
 
            sub_value = 0
 

	
 
        if sub_value == 0 or show_short_version:
 
            if future:
 
                return _('in %s') % fmt_funcs[part](value)
 
            else:
 
                return _('%s ago') % fmt_funcs[part](value)
 
        if future:
 
            return _('in %s and %s') % (fmt_funcs[part](value),
 
                fmt_funcs[sub_part](sub_value))
 
        else:
 
            return _('%s and %s ago') % (fmt_funcs[part](value),
 
                fmt_funcs[sub_part](sub_value))
 

	
 
    return _('just now')
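# Editor's note (not part of this changeset), a hedged sketch; exact wording
# comes from the translation catalogue:
#
#     age(datetime.datetime.now() - datetime.timedelta(days=2, hours=23))
#     # -> '2 days and 23 hours ago'
#     age(datetime.datetime.now() - datetime.timedelta(days=2, hours=23),
#         show_short_version=True)
#     # -> '2 days ago'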
 

	
 

	
 
def uri_filter(uri):
 
    """
 
    Removes user:password from given url string
 

	
 
    :param uri:
 
    :rtype: unicode
 
    :rtype: str
 
    :returns: filtered list of strings
 
    """
 
    if not uri:
 
        return []
 

	
 
    proto = ''
 

	
 
    for pat in ('https://', 'http://', 'git://'):
 
        if uri.startswith(pat):
 
            uri = uri[len(pat):]
 
            proto = pat
 
            break
 

	
 
    # remove passwords and username
 
    uri = uri[uri.find('@') + 1:]
 

	
 
    # get the port
 
    cred_pos = uri.find(':')
 
    if cred_pos == -1:
 
        host, port = uri, None
 
    else:
 
        host, port = uri[:cred_pos], uri[cred_pos + 1:]
 

	
 
    return [_f for _f in [proto, host, port] if _f]
 

	
 

	
 
def credentials_filter(uri):
 
    """
 
    Returns a url with removed credentials
 

	
 
    :param uri:
 
    """
 

	
 
    uri = uri_filter(uri)
 
    # check if we have port
 
    if len(uri) > 2 and uri[2]:
 
        uri[2] = ':' + uri[2]
 

	
 
    return ''.join(uri)
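# Editor's note (not part of this changeset), a quick sketch:
#
#     credentials_filter('https://user:secret@example.com:8080/repo')
#     # -> 'https://example.com:8080/repo'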
 

	
 

	
 
def get_clone_url(clone_uri_tmpl, prefix_url, repo_name, repo_id, username=None):
 
    parsed_url = urlobject.URLObject(prefix_url)
 
    prefix = urllib.parse.unquote(parsed_url.path.rstrip('/'))
 
    try:
 
        system_user = pwd.getpwuid(os.getuid()).pw_name
 
    except Exception: # TODO: support all systems - especially Windows
 
        system_user = 'kallithea' # hardcoded default value ...
 
    args = {
 
        'scheme': parsed_url.scheme,
 
        'user': urllib.parse.quote(username or ''),
 
        'netloc': parsed_url.netloc + prefix,  # like "hostname:port/prefix" (with optional ":port" and "/prefix")
 
        'prefix': prefix, # undocumented, empty or starting with /
 
        'repo': repo_name,
 
        'repoid': str(repo_id),
 
        'system_user': system_user,
 
        'hostname': parsed_url.hostname,
 
    }
 
    url = re.sub('{([^{}]+)}', lambda m: args.get(m.group(1), m.group(0)), clone_uri_tmpl)
 

	
 
    # remove leading @ sign if it's present. Case of empty user
 
    url_obj = urlobject.URLObject(url)
 
    if not url_obj.username:
 
        url_obj = url_obj.with_username(None)
 

	
 
    return str(url_obj)
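# Editor's note (not part of this changeset), a hedged sketch of template
# expansion with made-up values:
#
#     get_clone_url('{scheme}://{user}@{netloc}/{repo}',
#                   prefix_url='https://kallithea.example.com/',
#                   repo_name='group/repo', repo_id=7, username='john')
#     # -> 'https://john@kallithea.example.com/group/repo'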
 

	
 

	
 
def get_changeset_safe(repo, rev):
 
    """
 
    Safe version of get_changeset; if this changeset doesn't exist for a
 
    repo, it returns a Dummy one instead
 

	
 
    :param repo:
 
    :param rev:
 
    """
 
    from kallithea.lib.vcs.backends.base import BaseRepository
 
    from kallithea.lib.vcs.exceptions import RepositoryError
 
    from kallithea.lib.vcs.backends.base import EmptyChangeset
 
    if not isinstance(repo, BaseRepository):
 
        raise Exception('You must pass a Repository '
 
                        'object as first argument, got %s' % type(repo))
 

	
 
    try:
 
        cs = repo.get_changeset(rev)
 
    except (RepositoryError, LookupError):
 
        cs = EmptyChangeset(requested_revision=rev)
 
    return cs
 

	
 

	
 
def datetime_to_time(dt):
 
    if dt:
 
        return time.mktime(dt.timetuple())
 

	
 

	
 
def time_to_datetime(tm):
 
    if tm:
 
        if isinstance(tm, str):
 
            try:
 
                tm = float(tm)
 
            except ValueError:
 
                return
 
        return datetime.datetime.fromtimestamp(tm)
 

	
 

	
 
# Must match regexp in kallithea/public/js/base.js MentionsAutoComplete()
 
# Check char before @ - it must not look like we are in an email addresses.
 
# Matching is greedy so we don't have to look beyond the end.
 
MENTIONS_REGEX = re.compile(r'(?:^|(?<=[^a-zA-Z0-9]))@([a-zA-Z0-9][-_.a-zA-Z0-9]*[a-zA-Z0-9])')
 

	
 

	
 
def extract_mentioned_usernames(text):
 
    r"""
 
    Returns list of (possible) usernames @mentioned in given text.
 

	
 
    >>> extract_mentioned_usernames('@1-2.a_X,@1234 not@not @ddd@not @n @ee @ff @gg, @gg;@hh @n\n@zz,')
 
    ['1-2.a_X', '1234', 'ddd', 'ee', 'ff', 'gg', 'gg', 'hh', 'zz']
 
    """
 
    return MENTIONS_REGEX.findall(text)
 

	
 

	
 
def extract_mentioned_users(text):
 
    """ Returns set of actual database Users @mentioned in given text. """
 
    from kallithea.model.db import User
 
    result = set()
 
    for name in extract_mentioned_usernames(text):
 
        user = User.get_by_username(name, case_insensitive=True)
 
        if user is not None and not user.is_default_user:
 
            result.add(user)
 
    return result
 

	
 

	
 
class AttributeDict(dict):
 
    def __getattr__(self, attr):
 
        return self.get(attr, None)
 
    __setattr__ = dict.__setitem__
 
    __delattr__ = dict.__delitem__
 

	
 

	
 
def obfuscate_url_pw(engine):
 
    from sqlalchemy.engine import url as sa_url
 
    from sqlalchemy.exc import ArgumentError
 
    try:
 
        _url = sa_url.make_url(engine or '')
 
    except ArgumentError:
 
        return engine
 
    if _url.password:
 
        _url.password = 'XXXXX'
 
    return str(_url)
 

	
 

	
 
class HookEnvironmentError(Exception): pass
 

	
 

	
 
def get_hook_environment():
 
    """
 
    Get hook context by deserializing the global KALLITHEA_EXTRAS environment
 
    variable.
 

	
 
    Called early in Git out-of-process hooks to get .ini config path so the
 
    basic environment can be configured properly. Also used in all hooks to get
 
    information about the action that triggered it.
 
    """
 

	
 
    try:
 
        kallithea_extras = os.environ['KALLITHEA_EXTRAS']
 
    except KeyError:
 
        raise HookEnvironmentError("Environment variable KALLITHEA_EXTRAS not found")
 

	
 
    extras = json.loads(kallithea_extras)
 
    for k in ['username', 'repository', 'scm', 'action', 'ip', 'config']:
 
        try:
 
            extras[k]
 
        except KeyError:
 
            raise HookEnvironmentError('Missing key %s in KALLITHEA_EXTRAS %s' % (k, extras))
 

	
 
    return AttributeDict(extras)
 

	
 

	
 
def set_hook_environment(username, ip_addr, repo_name, repo_alias, action=None):
 
    """Prepare global context for running hooks by serializing data in the
 
    global KALLITHEA_EXTRAS environment variable.
 

	
 
    Most importantly, this allows Git hooks to do proper logging and updating of
 
    caches after pushes.
 

	
 
    Must always be called before anything with hooks is invoked.
 
    """
 
    from kallithea import CONFIG
 
    extras = {
 
        'ip': ip_addr, # used in log_push/pull_action action_logger
 
        'username': username,
 
        'action': action or 'push_local', # used in log_push_action_raw_ids action_logger
 
        'repository': repo_name,
 
        'scm': repo_alias, # used to pick hack in log_push_action_raw_ids
 
        'config': CONFIG['__file__'], # used by git hook to read config
 
    }
 
    os.environ['KALLITHEA_EXTRAS'] = json.dumps(extras)
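# Editor's note (not part of this changeset), a hedged sketch of the round trip
# through KALLITHEA_EXTRAS (requires a loaded application configuration; the
# values below are made up):
#
#     set_hook_environment('john', '127.0.0.1', 'group/repo', 'hg', action='push')
#     get_hook_environment().repository  -> 'group/repo'
#     get_hook_environment().action      -> 'push'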
 

	
 

	
 
def get_current_authuser():
 
    """
 
    Gets kallithea user from threadlocal tmpl_context variable if it's
 
    defined, else returns None.
 
    """
 
    from tg import tmpl_context
 
    try:
 
        return getattr(tmpl_context, 'authuser', None)
 
    except TypeError:  # No object (name: context) has been registered for this thread
 
        return None
 

	
 

	
 
class OptionalAttr(object):
 
    """
 
    Special Optional that refers to another attribute by name. Example::
 

	
 
        def test(apiuser, userid=Optional(OAttr('apiuser'))):
 
            user = Optional.extract(userid)
 
            # calls
 

	
 
    """
 

	
 
    def __init__(self, attr_name):
 
        self.attr_name = attr_name
 

	
 
    def __repr__(self):
 
        return '<OptionalAttr:%s>' % self.attr_name
 

	
 
    def __call__(self):
 
        return self
 

	
 

	
 
# alias
 
OAttr = OptionalAttr
 

	
 

	
 
class Optional(object):
 
    """
 
    Defines an optional parameter::
 

	
 
        param = param.getval() if isinstance(param, Optional) else param
 
        param = param() if isinstance(param, Optional) else param
 

	
 
    is equivalent to::
 

	
 
        param = Optional.extract(param)
 

	
 
    """
 

	
 
    def __init__(self, type_):
 
        self.type_ = type_
 

	
 
    def __repr__(self):
 
        return '<Optional:%s>' % self.type_.__repr__()
 

	
 
    def __call__(self):
 
        return self.getval()
 

	
 
    def getval(self):
 
        """
 
        returns value from this Optional instance
 
        """
 
        if isinstance(self.type_, OAttr):
 
            # use params name
 
            return self.type_.attr_name
 
        return self.type_
 

	
 
    @classmethod
 
    def extract(cls, val):
 
        """
 
        Extracts value from Optional() instance
 

	
 
        :param val:
 
        :return: original value if it's not Optional instance else
 
            value of instance
 
        """
 
        if isinstance(val, cls):
 
            return val.getval()
 
        return val
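# Editor's note (not part of this changeset), a quick sketch:
#
#     Optional.extract(Optional('default'))        -> 'default'
#     Optional.extract('plain value')              -> 'plain value'
#     Optional.extract(Optional(OAttr('apiuser'))) -> 'apiuser'  (the attribute name)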
 

	
 

	
 
def urlreadable(s, _cleanstringsub=re.compile('[^-a-zA-Z0-9./]+').sub):
 
    return _cleanstringsub('_', s).rstrip('_')
 

	
 

	
 
def recursive_replace(str_, replace=' '):
 
    """
 
    Recursively replaces multiple consecutive instances of the given character with just one
 

	
 
    :param str_: given string
 
    :param replace: char to find and replace multiple instances
 

	
 
    Examples::
 
    >>> recursive_replace("Mighty---Mighty-Bo--sstones",'-')
 
    'Mighty-Mighty-Bo-sstones'
 
    """
 

	
 
    if str_.find(replace * 2) == -1:
 
        return str_
 
    else:
 
        str_ = str_.replace(replace * 2, replace)
 
        return recursive_replace(str_, replace)
 

	
 

	
 
def repo_name_slug(value):
 
    """
 
    Return slug of repository name
 
    This function is called on each creation/modification
 
    of repository to prevent bad names in repo
 
    """
 

	
 
    slug = remove_formatting(value)
 
    slug = strip_tags(slug)
 

	
 
    for c in r"""`?=[]\;'"<>,/~!@#$%^&*()+{}|: """:
 
        slug = slug.replace(c, '-')
 
    slug = recursive_replace(slug, '-')
 
    slug = collapse(slug, '-')
 
    return slug
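# Editor's note (not part of this changeset), a rough sketch: punctuation and
# whitespace collapse into single dashes, e.g. repo_name_slug('my repo') -> 'my-repo'.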
 

	
 

	
 
def ask_ok(prompt, retries=4, complaint='Yes or no please!'):
 
    while True:
 
        ok = input(prompt)
 
        if ok in ('y', 'ye', 'yes'):
 
            return True
 
        if ok in ('n', 'no', 'nop', 'nope'):
 
            return False
 
        retries = retries - 1
 
        if retries < 0:
 
            raise IOError
 
        print(complaint)
kallithea/lib/vcs/backends/base.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
"""
 
    vcs.backends.base
 
    ~~~~~~~~~~~~~~~~~
 

	
 
    Base for all available scm backends
 

	
 
    :created_on: Apr 8, 2010
 
    :copyright: (c) 2010-2011 by Marcin Kuzminski, Lukasz Balcerzak.
 
"""
 

	
 
import datetime
 
import itertools
 

	
 
from kallithea.lib.vcs.conf import settings
 
from kallithea.lib.vcs.exceptions import (
 
    ChangesetError, EmptyRepositoryError, NodeAlreadyAddedError, NodeAlreadyChangedError, NodeAlreadyExistsError, NodeAlreadyRemovedError, NodeDoesNotExistError, NodeNotChangedError, RepositoryError)
 
from kallithea.lib.vcs.utils import author_email, author_name
 
from kallithea.lib.vcs.utils.helpers import get_dict_for_attrs
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 

	
 

	
 
class BaseRepository(object):
 
    """
 
    Base Repository for final backends
 

	
 
    **Attributes**
 

	
 
        ``DEFAULT_BRANCH_NAME``
 
            name of default branch (i.e. "trunk" for svn, "master" for git, etc.)
 

	
 
        ``scm``
 
            alias of scm, i.e. *git* or *hg*
 

	
 
        ``repo``
 
            object from external api
 

	
 
        ``revisions``
 
            list of all available revisions' ids, in ascending order
 

	
 
        ``changesets``
 
            storage dict caching returned changesets
 

	
 
        ``path``
 
            absolute path to the repository
 

	
 
        ``branches``
 
            branches as list of changesets
 

	
 
        ``tags``
 
            tags as list of changesets
 
    """
 
    scm = None
 
    DEFAULT_BRANCH_NAME = None
 
    EMPTY_CHANGESET = '0' * 40
 

	
 
    def __init__(self, repo_path, create=False, **kwargs):
 
        """
 
        Initializes repository. Raises RepositoryError if repository could
 
        not be found at the given ``repo_path``, or directory at ``repo_path``
 
        exists and ``create`` is set to True.
 

	
 
        :param repo_path: local path of the repository
 
        :param create=False: if set to True, would try to create repository.
 
        :param src_url=None: if set, should be proper url from which repository
 
          would be cloned; requires ``create`` parameter to be set to True -
 
          raises RepositoryError if src_url is set and create evaluates to
 
          False
 
        """
 
        raise NotImplementedError
 

	
 
    def __str__(self):
 
        return '<%s at %s>' % (self.__class__.__name__, self.path)
 

	
 
    def __repr__(self):
 
        return self.__str__()
 

	
 
    def __len__(self):
 
        return self.count()
 

	
 
    def __eq__(self, other):
 
        same_instance = isinstance(other, self.__class__)
 
        return same_instance and getattr(other, 'path', None) == self.path
 

	
 
    def __ne__(self, other):
 
        return not self.__eq__(other)
 

	
 
    @LazyProperty
 
    def alias(self):
 
        for k, v in settings.BACKENDS.items():
 
            if v.split('.')[-1] == str(self.__class__.__name__):
 
                return k
 

	
 
    @LazyProperty
 
    def name(self):
 
        """
 
        Return repository name (without group name)
 
        """
 
        raise NotImplementedError
 

	
 
    @LazyProperty
 
    def owner(self):
 
        raise NotImplementedError
 

	
 
    @LazyProperty
 
    def description(self):
 
        raise NotImplementedError
 

	
 
    @LazyProperty
 
    def size(self):
 
        """
 
        Returns combined size in bytes for all repository files
 
        """
 

	
 
        size = 0
 
        try:
 
            tip = self.get_changeset()
 
            for topnode, dirs, files in tip.walk('/'):
 
                for f in files:
 
                    size += tip.get_file_size(f.path)
 

	
 
        except RepositoryError as e:
 
            pass
 
        return size
 

	
 
    def is_valid(self):
 
        """
 
        Validates repository.
 
        """
 
        raise NotImplementedError
 

	
 
    def is_empty(self):
 
        return self._empty
 

	
 
    #==========================================================================
 
    # CHANGESETS
 
    #==========================================================================
 

	
 
    def get_changeset(self, revision=None):
 
        """
 
        Returns instance of ``Changeset`` class. If ``revision`` is None, most
 
        recent changeset is returned.
 

	
 
        :raises ``EmptyRepositoryError``: if there are no revisions
 
        """
 
        raise NotImplementedError
 

	
 
    def __iter__(self):
 
        """
 
        Allows Repository objects to be iterated.
 

	
 
        *Requires* implementation of ``__getitem__`` method.
 
        """
 
        for revision in self.revisions:
 
            yield self.get_changeset(revision)
 

	
 
    def get_changesets(self, start=None, end=None, start_date=None,
 
                       end_date=None, branch_name=None, reverse=False, max_revisions=None):
 
        """
 
        Returns iterator of ``BaseChangeset`` objects from start to end,
 
        both inclusive.
 

	
 
        :param start: None or str
 
        :param end: None or str
 
        :param start_date:
 
        :param end_date:
 
        :param branch_name:
 
        :param reversed:
 
        """
 
        raise NotImplementedError
 

	
 
    def __getitem__(self, key):
 
        if isinstance(key, slice):
 
            return (self.get_changeset(rev) for rev in self.revisions[key])
 
        return self.get_changeset(key)
 

	
 
    def count(self):
 
        return len(self.revisions)
 

	
 
    def tag(self, name, user, revision=None, message=None, date=None, **opts):
 
        """
 
        Creates and returns a tag for the given ``revision``.
 

	
 
        :param name: name for new tag
 
        :param user: full username, i.e.: "Joe Doe <joe.doe@example.com>"
 
        :param revision: changeset id for which new tag would be created
 
        :param message: message of the tag's commit
 
        :param date: date of tag's commit
 

	
 
        :raises TagAlreadyExistError: if tag with same name already exists
 
        """
 
        raise NotImplementedError
 

	
 
    def remove_tag(self, name, user, message=None, date=None):
 
        """
 
        Removes tag with the given ``name``.
 

	
 
        :param name: name of the tag to be removed
 
        :param user: full username, i.e.: "Joe Doe <joe.doe@example.com>"
 
        :param message: message of the tag's removal commit
 
        :param date: date of tag's removal commit
 

	
 
        :raises TagDoesNotExistError: if tag with given name does not exist
 
        """
 
        raise NotImplementedError
 

	
 
    def get_diff(self, rev1, rev2, path=None, ignore_whitespace=False,
 
            context=3):
 
        """
 
        Returns (git like) *diff*, as plain text. Shows changes introduced by
 
        ``rev2`` since ``rev1``.
 

	
 
        :param rev1: Entry point from which diff is shown. Can be
 
          ``self.EMPTY_CHANGESET`` - in this case, patch showing all
 
          the changes since empty state of the repository until ``rev2``
 
        :param rev2: Until which revision changes should be shown.
 
        :param ignore_whitespace: If set to ``True``, would not show whitespace
 
          changes. Defaults to ``False``.
 
        :param context: How many lines before/after changed lines should be
 
          shown. Defaults to ``3``.
 
        """
 
        raise NotImplementedError
 

	
 
    # ========== #
 
    # COMMIT API #
 
    # ========== #
 

	
 
    @LazyProperty
 
    def in_memory_changeset(self):
 
        """
 
        Returns ``InMemoryChangeset`` object for this repository.
 
        """
 
        raise NotImplementedError
 

	
 
    def add(self, filenode, **kwargs):
 
        """
 
        Commit api function that will add given ``FileNode`` into this
 
        repository.
 

	
 
        :raises ``NodeAlreadyExistsError``: if there is a file with same path
 
          already in repository
 
        :raises ``NodeAlreadyAddedError``: if given node is already marked as
 
          *added*
 
        """
 
        raise NotImplementedError
 

	
 
    def remove(self, filenode, **kwargs):
 
        """
 
        Commit api function that will remove given ``FileNode`` from this
 
        repository.
 

	
 
        :raises ``EmptyRepositoryError``: if there are no changesets yet
 
        :raises ``NodeDoesNotExistError``: if there is no file with given path
 
        """
 
        raise NotImplementedError
 

	
 
    def commit(self, message, **kwargs):
 
        """
 
        Persists current changes made on this repository and returns newly
 
        created changeset.
 

	
 
        :raises ``NothingChangedError``: if no changes have been made
 
        """
 
        raise NotImplementedError
 

	
 
    def get_state(self):
 
        """
 
        Returns dictionary with ``added``, ``changed`` and ``removed`` lists
 
        containing ``FileNode`` objects.
 
        """
 
        raise NotImplementedError
 

	
 
    def get_config_value(self, section, name, config_file=None):
 
        """
 
        Returns configuration value for a given [``section``] and ``name``.
 

	
 
        :param section: Section we want to retrieve value from
 
        :param name: Name of configuration we want to retrieve
 
        :param config_file: A path to file which should be used to retrieve
 
          configuration from (might also be a list of file paths)
 
        """
 
        raise NotImplementedError
 

	
 
    def get_user_name(self, config_file=None):
 
        """
 
        Returns user's name from global configuration file.
 

	
 
        :param config_file: A path to file which should be used to retrieve
 
          configuration from (might also be a list of file paths)
 
        """
 
        raise NotImplementedError
 

	
 
    def get_user_email(self, config_file=None):
 
        """
 
        Returns user's email from global configuration file.
 

	
 
        :param config_file: A path to file which should be used to retrieve
 
          configuration from (might also be a list of file paths)
 
        """
 
        raise NotImplementedError
 

	
 
    # =========== #
 
    # WORKDIR API #
 
    # =========== #
 

	
 
    @LazyProperty
 
    def workdir(self):
 
        """
 
        Returns ``Workdir`` instance for this repository.
 
        """
 
        raise NotImplementedError
 

	
 

	
 
class BaseChangeset(object):
 
    """
 
    Each backend should implement its changeset representation.
 

	
 
    **Attributes**
 

	
 
        ``repository``
 
            repository object within which changeset exists
 

	
 
        ``raw_id``
 
            raw changeset representation (i.e. full 40 length sha for git
 
            backend)
 

	
 
        ``short_id``
 
            shortened (if applicable) version of ``raw_id``; it would be a simple
 
            shortcut for ``raw_id[:12]`` for git/mercurial backends or same
 
            as ``raw_id`` for subversion
 

	
 
        ``revision``
 
            revision number as integer
 

	
 
        ``files``
 
            list of ``FileNode`` (``Node`` with NodeKind.FILE) objects
 

	
 
        ``dirs``
 
            list of ``DirNode`` (``Node`` with NodeKind.DIR) objects
 

	
 
        ``nodes``
 
            combined list of ``Node`` objects
 

	
 
        ``author``
 
            author of the changeset, as unicode
 
            author of the changeset, as str
 

	
 
        ``message``
 
            message of the changeset, as unicode
 
            message of the changeset, as str
 

	
 
        ``parents``
 
            list of parent changesets
 

	
 
        ``last``
 
            ``True`` if this is last changeset in repository, ``False``
 
            otherwise; trying to access this attribute while there is no
 
            changesets would raise ``EmptyRepositoryError``
 
    """
 
    def __str__(self):
 
        return '<%s at %s:%s>' % (self.__class__.__name__, self.revision,
 
            self.short_id)
 

	
 
    def __repr__(self):
 
        return self.__str__()
 

	
 
    def __eq__(self, other):
 
        if type(self) is not type(other):
 
            return False
 
        return self.raw_id == other.raw_id
 

	
 
    def __json__(self, with_file_list=False):
 
        if with_file_list:
 
            return dict(
 
                short_id=self.short_id,
 
                raw_id=self.raw_id,
 
                revision=self.revision,
 
                message=self.message,
 
                date=self.date,
 
                author=self.author,
 
                added=[el.path for el in self.added],
 
                changed=[el.path for el in self.changed],
 
                removed=[el.path for el in self.removed],
 
            )
 
        else:
 
            return dict(
 
                short_id=self.short_id,
 
                raw_id=self.raw_id,
 
                revision=self.revision,
 
                message=self.message,
 
                date=self.date,
 
                author=self.author,
 
            )
 

	
 
    @LazyProperty
 
    def last(self):
 
        if self.repository is None:
 
            raise ChangesetError("Cannot check if it's most recent revision")
 
        return self.raw_id == self.repository.revisions[-1]
 

	
 
    @LazyProperty
 
    def parents(self):
 
        """
 
        Returns list of parents changesets.
 
        """
 
        raise NotImplementedError
 

	
 
    @LazyProperty
 
    def children(self):
 
        """
 
        Returns list of children changesets.
 
        """
 
        raise NotImplementedError
 

	
 
    @LazyProperty
 
    def raw_id(self):
 
        """
 
        Returns raw string identifying this changeset.
 
        """
 
        raise NotImplementedError
 

	
 
    @LazyProperty
 
    def short_id(self):
 
        """
 
        Returns shortened version of ``raw_id`` attribute, as string,
 
        identifying this changeset, useful for web representation.
 
        """
 
        raise NotImplementedError
 

	
 
    @LazyProperty
 
    def revision(self):
 
        """
 
        Returns integer identifying this changeset.
 

	
 
        """
 
        raise NotImplementedError
 

	
 
    @LazyProperty
 
    def committer(self):
 
        """
 
        Returns Committer for given commit
 
        """
 

	
 
        raise NotImplementedError
 

	
 
    @LazyProperty
 
    def committer_name(self):
 
        """
 
        Returns Author name for given commit
 
        """
 

	
 
        return author_name(self.committer)
 

	
 
    @LazyProperty
 
    def committer_email(self):
 
        """
 
        Returns Author email address for given commit
 
        """
 

	
 
        return author_email(self.committer)
 

	
 
    @LazyProperty
 
    def author(self):
 
        """
 
        Returns Author for given commit
 
        """
 

	
 
        raise NotImplementedError
 

	
 
    @LazyProperty
 
    def author_name(self):
 
        """
 
        Returns Author name for given commit
 
        """
 

	
 
        return author_name(self.author)
 

	
 
    @LazyProperty
 
    def author_email(self):
 
        """
 
        Returns Author email address for given commit
 
        """
 

	
 
        return author_email(self.author)
 

	
 
    def get_file_mode(self, path):
 
        """
 
        Returns stat mode of the file at the given ``path``.
 
        """
 
        raise NotImplementedError
 

	
 
    def get_file_content(self, path):
 
        """
 
        Returns content of the file at the given ``path``.
 
        """
 
        raise NotImplementedError
 

	
 
    def get_file_size(self, path):
 
        """
 
        Returns size of the file at the given ``path``.
 
        """
 
        raise NotImplementedError
 

	
 
    def get_file_changeset(self, path):
 
        """
 
        Returns last commit of the file at the given ``path``.
 
        """
 
        raise NotImplementedError
 

	
 
    def get_file_history(self, path):
 
        """
 
        Returns history of file as reversed list of ``Changeset`` objects for
 
        which file at given ``path`` has been modified.
 
        """
 
        raise NotImplementedError
 

	
 
    def get_nodes(self, path):
 
        """
 
        Returns combined ``DirNode`` and ``FileNode`` objects list representing
 
        state of changeset at the given ``path``.
 

	
 
        :raises ``ChangesetError``: if node at the given ``path`` is not
 
          instance of ``DirNode``
 
        """
 
        raise NotImplementedError
 

	
 
    def get_node(self, path):
 
        """
 
        Returns ``Node`` object from the given ``path``.
 

	
 
        :raises ``NodeDoesNotExistError``: if there is no node at the given
 
          ``path``
 
        """
 
        raise NotImplementedError
 

	
 
    def fill_archive(self, stream=None, kind='tgz', prefix=None):
 
        """
 
        Fills up given stream.
 

	
 
        :param stream: file like object.
 
        :param kind: one of following: ``zip``, ``tar``, ``tgz``
 
            or ``tbz2``. Default: ``tgz``.
 
        :param prefix: name of root directory in archive.
 
            Default is repository name and changeset's raw_id joined with dash.
 

	
 
            repo-tip.<kind>
 
        """
 

	
 
        raise NotImplementedError
 

	
 
    def get_chunked_archive(self, **kwargs):
 
        """
 
        Returns iterable archive. Tiny wrapper around ``fill_archive`` method.
 

	
 
        :param chunk_size: extra parameter which controls size of returned
 
            chunks. Default: 8k.
 
        """
 

	
 
        chunk_size = kwargs.pop('chunk_size', 8192)
 
        stream = kwargs.get('stream')
 
        self.fill_archive(**kwargs)
 
        while True:
 
            data = stream.read(chunk_size)
 
            if not data:
 
                break
 
            yield data
 

	
 
    @LazyProperty
 
    def root(self):
 
        """
 
        Returns ``RootNode`` object for this changeset.
 
        """
 
        return self.get_node('')
 

	
 
    def next(self, branch=None):
 
        """
 
        Returns next changeset from current; if branch is given it will return
 
        next changeset belonging to this branch
 

	
 
        :param branch: show changesets within the given named branch
 
        """
 
        raise NotImplementedError
 

	
 
    def prev(self, branch=None):
 
        """
 
        Returns previous changeset from current; if branch is given it will
 
        return previous changeset belonging to this branch
 

	
 
        :param branch: show changesets within the given named branch
 
        """
 
        raise NotImplementedError
 

	
 
    @LazyProperty
 
    def added(self):
 
        """
 
        Returns list of added ``FileNode`` objects.
 
        """
 
        raise NotImplementedError
 

	
 
    @LazyProperty
 
    def changed(self):
 
        """
 
        Returns list of modified ``FileNode`` objects.
 
        """
 
        raise NotImplementedError
 

	
 
    @LazyProperty
 
    def removed(self):
 
        """
 
        Returns list of removed ``FileNode`` objects.
 
        """
 
        raise NotImplementedError
 

	
 
    @LazyProperty
 
    def size(self):
 
        """
 
        Returns total number of bytes from contents of all filenodes.
 
        """
 
        return sum((node.size for node in self.get_filenodes_generator()))
 

	
 
    def walk(self, topurl=''):
 
        """
 
        Similar to os.walk method. Instead of filesystem it walks through
 
        changeset starting at given ``topurl``.  Returns generator of tuples
 
        (topnode, dirnodes, filenodes).
 
        """
 
        topnode = self.get_node(topurl)
 
        yield (topnode, topnode.dirs, topnode.files)
 
        for dirnode in topnode.dirs:
 
            for tup in self.walk(dirnode.path):
 
                yield tup
 

	
 
    def get_filenodes_generator(self):
 
        """
 
        Returns generator that yields *all* file nodes.
 
        """
 
        for topnode, dirs, files in self.walk():
 
            for node in files:
 
                yield node
 

	
 
    def as_dict(self):
 
        """
 
        Returns dictionary with changeset's attributes and their values.
 
        """
 
        data = get_dict_for_attrs(self, ['raw_id', 'short_id',
 
            'revision', 'date', 'message'])
 
        data['author'] = {'name': self.author_name, 'email': self.author_email}
 
        data['added'] = [node.path for node in self.added]
 
        data['changed'] = [node.path for node in self.changed]
 
        data['removed'] = [node.path for node in self.removed]
 
        return data
 

	
 
    @LazyProperty
 
    def closesbranch(self):
 
        return False
 

	
 
    @LazyProperty
 
    def obsolete(self):
 
        return False
 

	
 
    @LazyProperty
 
    def bumped(self):
 
        return False
 

	
 
    @LazyProperty
 
    def divergent(self):
 
        return False
 

	
 
    @LazyProperty
 
    def extinct(self):
 
        return False
 

	
 
    @LazyProperty
 
    def unstable(self):
 
        return False
 

	
 
    @LazyProperty
 
    def phase(self):
 
        return ''
 

	
 

	
 
class BaseWorkdir(object):
 
    """
 
    Working directory representation of single repository.
 

	
 
    :attribute: repository: repository object of working directory
 
    """
 

	
 
    def __init__(self, repository):
 
        self.repository = repository
 

	
 
    def get_branch(self):
 
        """
 
        Returns name of current branch.
 
        """
 
        raise NotImplementedError
 

	
 
    def get_changeset(self):
 
        """
 
        Returns current changeset.
 
        """
 
        raise NotImplementedError
 

	
 
    def get_added(self):
 
        """
 
        Returns list of ``FileNode`` objects marked as *new* in working
 
        directory.
 
        """
 
        raise NotImplementedError
 

	
 
    def get_changed(self):
 
        """
 
        Returns list of ``FileNode`` objects *changed* in working directory.
 
        """
 
        raise NotImplementedError
 

	
 
    def get_removed(self):
 
        """
 
        Returns list of ``RemovedFileNode`` objects marked as *removed* in
 
        working directory.
 
        """
 
        raise NotImplementedError
 

	
 
    def get_untracked(self):
 
        """
 
        Returns list of ``FileNode`` objects which are present within working
 
        directory but are not tracked by the repository.
 
        """
 
        raise NotImplementedError
 

	
 
    def get_status(self):
 
        """
 
        Returns dict with ``added``, ``changed``, ``removed`` and ``untracked``
 
        lists.
 
        """
 
        raise NotImplementedError
 

	
 
    def commit(self, message, **kwargs):
 
        """
 
        Commits local (from working directory) changes and returns newly
 
        created
 
        ``Changeset``. Updates repository's ``revisions`` list.
 

	
 
        :raises ``CommitError``: if any error occurs while committing
 
        """
 
        raise NotImplementedError
 

	
 
    def update(self, revision=None):
 
        """
 
        Fetches content of the given revision and populates it within working
 
        directory.
 
        """
 
        raise NotImplementedError
 

	
 
    def checkout_branch(self, branch=None):
 
        """
 
        Checks out ``branch`` or the backend's default branch.
 

	
 
        Raises ``BranchDoesNotExistError`` if the branch does not exist.
 
        """
 
        raise NotImplementedError
 

	
 

	
 
class BaseInMemoryChangeset(object):
 
    """
 
    Represents differences between repository's state (most recent head) and
 
    changes made *in place*.
 

	
 
    **Attributes**
 

	
 
        ``repository``
 
            repository object for this in-memory-changeset
 

	
 
        ``added``
 
            list of ``FileNode`` objects marked as *added*
 

	
 
        ``changed``
 
            list of ``FileNode`` objects marked as *changed*
 

	
 
        ``removed``
 
            list of ``FileNode`` or ``RemovedFileNode`` objects marked to be
 
            *removed*
 

	
 
        ``parents``
 
            list of ``Changeset`` representing parents of in-memory changeset.
 
            Should always be 2-element sequence.
 

	
 
    """
 

	
 
    def __init__(self, repository):
 
        self.repository = repository
 
        self.added = []
 
        self.changed = []
 
        self.removed = []
 
        self.parents = []
 

	
 
    def add(self, *filenodes):
 
        """
 
        Marks given ``FileNode`` objects as *to be committed*.
 

	
 
        :raises ``NodeAlreadyExistsError``: if node with same path exists at
 
          latest changeset
 
        :raises ``NodeAlreadyAddedError``: if node with same path is already
 
          marked as *added*
 
        """
 
        # Check if not already marked as *added* first
 
        for node in filenodes:
 
            if node.path in (n.path for n in self.added):
 
                raise NodeAlreadyAddedError("Such FileNode %s is already "
 
                    "marked for addition" % node.path)
 
        for node in filenodes:
 
            self.added.append(node)
 

	
 
    def change(self, *filenodes):
 
        """
 
        Marks given ``FileNode`` objects to be *changed* in next commit.
 

	
 
        :raises ``EmptyRepositoryError``: if there are no changesets yet
 
        :raises ``NodeAlreadyExistsError``: if node with same path is already
 
          marked to be *changed*
 
        :raises ``NodeAlreadyRemovedError``: if node with same path is already
 
          marked to be *removed*
 
        :raises ``NodeDoesNotExistError``: if node doesn't exist in latest
 
          changeset
 
        :raises ``NodeNotChangedError``: if node hasn't really been changed
 
        """
 
        for node in filenodes:
 
            if node.path in (n.path for n in self.removed):
 
                raise NodeAlreadyRemovedError("Node at %s is already marked "
 
                    "as removed" % node.path)
 
        try:
 
            self.repository.get_changeset()
 
        except EmptyRepositoryError:
 
            raise EmptyRepositoryError("Nothing to change - try to *add* new "
 
                "nodes rather than changing them")
 
        for node in filenodes:
 
            if node.path in (n.path for n in self.changed):
 
                raise NodeAlreadyChangedError("Node at '%s' is already "
 
                    "marked as changed" % node.path)
 
            self.changed.append(node)
 

	
 
    def remove(self, *filenodes):
 
        """
 
        Marks given ``FileNode`` (or ``RemovedFileNode``) objects to be
 
        *removed* in next commit.
 

	
 
        :raises ``NodeAlreadyRemovedError``: if node has been already marked to
 
          be *removed*
 
        :raises ``NodeAlreadyChangedError``: if node has been already marked to
 
          be *changed*
 
        """
 
        for node in filenodes:
 
            if node.path in (n.path for n in self.removed):
 
                raise NodeAlreadyRemovedError("Node is already marked to "
 
                    "for removal at %s" % node.path)
 
            if node.path in (n.path for n in self.changed):
 
                raise NodeAlreadyChangedError("Node is already marked to "
 
                    "be changed at %s" % node.path)
 
            # We only mark node as *removed* - real removal is done by
 
            # commit method
 
            self.removed.append(node)
 

	
 
    def reset(self):
 
        """
 
        Resets this instance to initial state (cleans ``added``, ``changed``
 
        and ``removed`` lists).
 
        """
 
        self.added = []
 
        self.changed = []
 
        self.removed = []
 
        self.parents = []
 

	
 
    def get_ipaths(self):
 
        """
 
        Returns generator of paths from nodes marked as added, changed or
 
        removed.
 
        """
 
        for node in itertools.chain(self.added, self.changed, self.removed):
 
            yield node.path
 

	
 
    def get_paths(self):
 
        """
 
        Returns list of paths from nodes marked as added, changed or removed.
 
        """
 
        return list(self.get_ipaths())
 

	
 
    def check_integrity(self, parents=None):
 
        """
 
        Checks in-memory changeset's integrity. Also, sets parents if not
 
        already set.
 

	
 
        :raises CommitError: if any error occurs (i.e.
 
          ``NodeDoesNotExistError``).
 
        """
 
        if not self.parents:
 
            parents = parents or []
 
            if len(parents) == 0:
 
                try:
 
                    parents = [self.repository.get_changeset(), None]
 
                except EmptyRepositoryError:
 
                    parents = [None, None]
 
            elif len(parents) == 1:
 
                parents += [None]
 
            self.parents = parents
 

	
 
        # Local parents, only if not None
 
        parents = [p for p in self.parents if p]
 

	
 
        # Check nodes marked as added
 
        for p in parents:
 
            for node in self.added:
 
                try:
 
                    p.get_node(node.path)
 
                except NodeDoesNotExistError:
 
                    pass
 
                else:
 
                    raise NodeAlreadyExistsError("Node at %s already exists "
 
                        "at %s" % (node.path, p))
 

	
 
        # Check nodes marked as changed
 
        missing = set(node.path for node in self.changed)
 
        not_changed = set(node.path for node in self.changed)
 
        if self.changed and not parents:
 
            raise NodeDoesNotExistError(self.changed[0].path)
 
        for p in parents:
 
            for node in self.changed:
 
                try:
 
                    old = p.get_node(node.path)
 
                    missing.remove(node.path)
 
                    # if content actually changed, remove node from unchanged
 
                    if old.content != node.content:
 
                        not_changed.remove(node.path)
 
                except NodeDoesNotExistError:
 
                    pass
 
        if self.changed and missing:
 
            raise NodeDoesNotExistError("Node at %s is missing "
 
                "(parents: %s)" % (node.path, parents))
 

	
 
        if self.changed and not_changed:
 
            raise NodeNotChangedError("Node at %s wasn't actually changed "
 
                "since parents' changesets: %s" % (not_changed.pop(),
 
                    parents)
 
            )
 

	
 
        # Check nodes marked as removed
 
        if self.removed and not parents:
 
            raise NodeDoesNotExistError("Cannot remove node at %s as there "
 
                "were no parents specified" % self.removed[0].path)
 
        really_removed = set()
 
        for p in parents:
 
            for node in self.removed:
 
                try:
 
                    p.get_node(node.path)
 
                    really_removed.add(node.path)
 
                except ChangesetError:
 
                    pass
 
        not_removed = list(set(node.path for node in self.removed) - really_removed)
 
        if not_removed:
 
            raise NodeDoesNotExistError("Cannot remove node at %s from "
 
                "following parents: %s" % (not_removed[0], parents))
 

	
 
    def commit(self, message, author, parents=None, branch=None, date=None,
 
            **kwargs):
 
        """
 
        Performs in-memory commit (doesn't check workdir in any way) and
 
        returns newly created ``Changeset``. Updates repository's
 
        ``revisions``.
 

	
 
        .. note::
 
            While overriding this method, each backend should call
 
            ``self.check_integrity(parents)`` in the first place.
 

	
 
        :param message: message of the commit
 
        :param author: full username, i.e. "Joe Doe <joe.doe@example.com>"
 
        :param parents: single parent or sequence of parents from which commit
 
          would be derived
 
        :param date: ``datetime.datetime`` instance. Defaults to
 
          ``datetime.datetime.now()``.
 
        :param branch: branch name, as string. If none given, the backend's default
 
          branch would be used.
 

	
 
        :raises ``CommitError``: if any error occurs while committing
 
        """
 
        raise NotImplementedError
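
# Editorial note: a minimal usage sketch of the in-memory commit API documented
# above; it is not part of this changeset. The repository path, file name and
# content are assumptions, and add() is assumed to exist as implied by the
# ``added`` list used above.
from kallithea.lib.vcs.backends.hg import MercurialRepository
from kallithea.lib.vcs.nodes import FileNode

repo = MercurialRepository('/tmp/imc-demo', create=True)
imc = repo.in_memory_changeset                  # backend-specific in-memory changeset
imc.add(FileNode('README', content=b'hello'))   # mark a new node as *added*
cs = imc.commit(message='Add README',
                author='Joe Doe <joe.doe@example.com>')
print(cs.raw_id)                                # id of the newly created changeset
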
 

	
 

	
 
class EmptyChangeset(BaseChangeset):
 
    """
 
    A dummy empty changeset. It's possible to pass a hash when creating
 
    an EmptyChangeset
 
    """
 

	
 
    def __init__(self, cs='0' * 40, repo=None, requested_revision=None,
 
                 alias=None, revision=-1, message='', author='', date=None):
 
        self._empty_cs = cs
 
        self.revision = revision
 
        self.message = message
 
        self.author = author
 
        self.date = date or datetime.datetime.fromtimestamp(0)
 
        self.repository = repo
 
        self.requested_revision = requested_revision
 
        self.alias = alias
 

	
 
    @LazyProperty
 
    def raw_id(self):
 
        """
 
        Returns raw string identifying this changeset, useful for web
 
        representation.
 
        """
 

	
 
        return self._empty_cs
 

	
 
    @LazyProperty
 
    def branch(self):
 
        from kallithea.lib.vcs.backends import get_backend
 
        return get_backend(self.alias).DEFAULT_BRANCH_NAME
 

	
 
    @LazyProperty
 
    def branches(self):
 
        from kallithea.lib.vcs.backends import get_backend
 
        return [get_backend(self.alias).DEFAULT_BRANCH_NAME]
 

	
 
    @LazyProperty
 
    def short_id(self):
 
        return self.raw_id[:12]
 

	
 
    def get_file_changeset(self, path):
 
        return self
 

	
 
    def get_file_content(self, path):
 
        return u''
 

	
 
    def get_file_size(self, path):
 
        return 0
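
# Editorial note: a hedged illustration (not from this changeset) of how
# EmptyChangeset behaves as a null-object placeholder; the alias is an assumption.
cs = EmptyChangeset(alias='hg')
assert cs.raw_id == '0' * 40                  # the all-zero hash by default
assert cs.branch == 'default'                 # the backend's default branch name
assert cs.get_file_content('any/path') == ''  # empty content for every path
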
 

	
 

	
 
class CollectionGenerator(object):
 

	
 
    def __init__(self, repo, revs):
 
        self.repo = repo
 
        self.revs = revs
 

	
 
    def __len__(self):
 
        return len(self.revs)
 

	
 
    def __iter__(self):
 
        for rev in self.revs:
 
            yield self.repo.get_changeset(rev)
 

	
 
    def __getitem__(self, what):
 
        """Return either a single element by index, or a sliced collection."""
 
        if isinstance(what, slice):
 
            return CollectionGenerator(self.repo, self.revs[what])
 
        else:
 
            # single item
 
            return self.repo.get_changeset(self.revs[what])
 

	
 
    def __repr__(self):
 
        return '<CollectionGenerator[len:%s]>' % (len(self))
kallithea/lib/vcs/backends/hg/repository.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
"""
 
    vcs.backends.hg.repository
 
    ~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
    Mercurial repository implementation.
 

	
 
    :created_on: Apr 8, 2010
 
    :copyright: (c) 2010-2011 by Marcin Kuzminski, Lukasz Balcerzak.
 
"""
 

	
 
import datetime
 
import logging
 
import os
 
import time
 
import urllib.error
 
import urllib.parse
 
import urllib.request
 
from collections import OrderedDict
 

	
 
import mercurial.commands
 
import mercurial.error
 
import mercurial.exchange
 
import mercurial.hg
 
import mercurial.hgweb
 
import mercurial.httppeer
 
import mercurial.localrepo
 
import mercurial.match
 
import mercurial.mdiff
 
import mercurial.node
 
import mercurial.patch
 
import mercurial.scmutil
 
import mercurial.sshpeer
 
import mercurial.tags
 
import mercurial.ui
 
import mercurial.url
 
import mercurial.util
 

	
 
from kallithea.lib.vcs.backends.base import BaseRepository, CollectionGenerator
 
from kallithea.lib.vcs.exceptions import (
 
    BranchDoesNotExistError, ChangesetDoesNotExistError, EmptyRepositoryError, RepositoryError, TagAlreadyExistError, TagDoesNotExistError, VCSError)
 
from kallithea.lib.vcs.utils import ascii_str, author_email, author_name, date_fromtimestamp, makedate, safe_bytes, safe_str
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.lib.vcs.utils.paths import abspath
 

	
 
from .changeset import MercurialChangeset
 
from .inmemory import MercurialInMemoryChangeset
 
from .workdir import MercurialWorkdir
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class MercurialRepository(BaseRepository):
 
    """
 
    Mercurial repository backend
 
    """
 
    DEFAULT_BRANCH_NAME = 'default'
 
    scm = 'hg'
 

	
 
    def __init__(self, repo_path, create=False, baseui=None, src_url=None,
 
                 update_after_clone=False):
 
        """
 
        Raises RepositoryError if repository could not be found at the given
 
        ``repo_path``.
 

	
 
        :param repo_path: local path of the repository
 
        :param create=False: if set to True, would try to create repository if
 
           it does not exist rather than raising exception
 
        :param baseui=None: mercurial ``ui`` instance to use
 
        :param src_url=None: would try to clone repository from given location
 
        :param update_after_clone=False: sets update of working copy after
 
          making a clone
 
        """
 

	
 
        if not isinstance(repo_path, str):
 
            raise VCSError('Mercurial backend requires repository path to '
 
                           'be instance of <str> got %s instead' %
 
                           type(repo_path))
 
        self.path = abspath(repo_path)
 
        self.baseui = baseui or mercurial.ui.ui()
 
        # We've set path and ui, now we can set _repo itself
 
        self._repo = self._get_repo(create, src_url, update_after_clone)
 

	
 
    @property
 
    def _empty(self):
 
        """
 
        Checks if repository is empty, i.e. without any changesets
 
        """
 
        # TODO: Following raises errors when using InMemoryChangeset...
 
        # return len(self._repo.changelog) == 0
 
        return len(self.revisions) == 0
 

	
 
    @LazyProperty
 
    def revisions(self):
 
        """
 
        Returns list of revisions' ids, in ascending order.  Being a lazy
 
        attribute allows external tools to inject shas from cache.
 
        """
 
        return self._get_all_revisions()
 

	
 
    @LazyProperty
 
    def name(self):
 
        return os.path.basename(self.path)
 

	
 
    @LazyProperty
 
    def branches(self):
 
        return self._get_branches()
 

	
 
    @LazyProperty
 
    def closed_branches(self):
 
        return self._get_branches(normal=False, closed=True)
 

	
 
    @LazyProperty
 
    def allbranches(self):
 
        """
 
        List all branches, including closed branches.
 
        """
 
        return self._get_branches(closed=True)
 

	
 
    def _get_branches(self, normal=True, closed=False):
 
        """
 
        Gets branches for this repository
 
        Returns only not closed branches by default
 

	
 
        :param closed: return also closed branches for mercurial
 
        :param normal: return also normal branches
 
        """
 

	
 
        if self._empty:
 
            return {}
 

	
 
        bt = OrderedDict()
 
        for bn, _heads, node, isclosed in sorted(self._repo.branchmap().iterbranches()):
 
            if isclosed:
 
                if closed:
 
                    bt[safe_str(bn)] = ascii_str(mercurial.node.hex(node))
 
            else:
 
                if normal:
 
                    bt[safe_str(bn)] = ascii_str(mercurial.node.hex(node))
 
        return bt
 

	
 
    @LazyProperty
 
    def tags(self):
 
        """
 
        Gets tags for this repository
 
        """
 
        return self._get_tags()
 

	
 
    def _get_tags(self):
 
        if self._empty:
 
            return {}
 

	
 
        return OrderedDict(sorted(
 
            ((safe_str(n), ascii_str(mercurial.node.hex(h))) for n, h in self._repo.tags().items()),
 
            reverse=True,
 
            key=lambda x: x[0],  # sort by name
 
        ))
 

	
 
    def tag(self, name, user, revision=None, message=None, date=None,
 
            **kwargs):
 
        """
 
        Creates and returns a tag for the given ``revision``.
 

	
 
        :param name: name for new tag
 
        :param user: full username, i.e.: "Joe Doe <joe.doe@example.com>"
 
        :param revision: changeset id for which new tag would be created
 
        :param message: message of the tag's commit
 
        :param date: date of tag's commit
 

	
 
        :raises TagAlreadyExistError: if tag with same name already exists
 
        """
 
        if name in self.tags:
 
            raise TagAlreadyExistError("Tag %s already exists" % name)
 
        changeset = self.get_changeset(revision)
 
        local = kwargs.setdefault('local', False)
 

	
 
        if message is None:
 
            message = "Added tag %s for changeset %s" % (name,
 
                changeset.short_id)
 

	
 
        if date is None:
 
            date = safe_bytes(datetime.datetime.now().strftime('%a, %d %b %Y %H:%M:%S'))
 

	
 
        try:
 
            mercurial.tags.tag(self._repo, safe_bytes(name), changeset._ctx.node(), safe_bytes(message), local, safe_bytes(user), date)
 
        except mercurial.error.Abort as e:
 
            raise RepositoryError(e.args[0])
 

	
 
        # Reinitialize tags
 
        self.tags = self._get_tags()
 
        tag_id = self.tags[name]
 

	
 
        return self.get_changeset(revision=tag_id)
 

	
 
    def remove_tag(self, name, user, message=None, date=None):
 
        """
 
        Removes tag with the given ``name``.
 

	
 
        :param name: name of the tag to be removed
 
        :param user: full username, i.e.: "Joe Doe <joe.doe@example.com>"
 
        :param message: message of the tag's removal commit
 
        :param date: date of tag's removal commit
 

	
 
        :raises TagDoesNotExistError: if tag with given name does not exist
 
        """
 
        if name not in self.tags:
 
            raise TagDoesNotExistError("Tag %s does not exist" % name)
 
        if message is None:
 
            message = "Removed tag %s" % name
 
        if date is None:
 
            date = safe_bytes(datetime.datetime.now().strftime('%a, %d %b %Y %H:%M:%S'))
 
        local = False
 

	
 
        try:
 
            mercurial.tags.tag(self._repo, safe_bytes(name), mercurial.commands.nullid, safe_bytes(message), local, safe_bytes(user), date)
 
            self.tags = self._get_tags()
 
        except mercurial.error.Abort as e:
 
            raise RepositoryError(e.args[0])
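
# Editorial note: a hedged usage sketch of the tagging API above; the tag name,
# user string and revision are assumptions for illustration.
tip = repo.get_changeset()                                  # tip changeset
repo.tag('v1.0', 'Joe Doe <joe.doe@example.com>', revision=tip.raw_id)
assert 'v1.0' in repo.tags                                  # tags dict was reinitialized
repo.remove_tag('v1.0', 'Joe Doe <joe.doe@example.com>')
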
 

	
 
    @LazyProperty
 
    def bookmarks(self):
 
        """
 
        Gets bookmarks for this repository
 
        """
 
        return self._get_bookmarks()
 

	
 
    def _get_bookmarks(self):
 
        if self._empty:
 
            return {}
 

	
 
        return OrderedDict(sorted(
 
            ((safe_str(n), ascii_str(h)) for n, h in self._repo._bookmarks.items()),
 
            reverse=True,
 
            key=lambda x: x[0],  # sort by name
 
        ))
 

	
 
    def _get_all_revisions(self):
 
        return [ascii_str(self._repo[x].hex()) for x in self._repo.filtered(b'visible').changelog.revs()]
 

	
 
    def get_diff(self, rev1, rev2, path='', ignore_whitespace=False,
 
                  context=3):
 
        """
 
        Returns (git like) *diff*, as plain text. Shows changes introduced by
 
        ``rev2`` since ``rev1``.
 

	
 
        :param rev1: Entry point from which diff is shown. Can be
 
          ``self.EMPTY_CHANGESET`` - in this case, patch showing all
 
          the changes since empty state of the repository until ``rev2``
 
        :param rev2: Until which revision changes should be shown.
 
        :param ignore_whitespace: If set to ``True``, would not show whitespace
 
          changes. Defaults to ``False``.
 
        :param context: How many lines before/after changed lines should be
 
          shown. Defaults to ``3``. If negative value is passed-in, it will be
 
          set to ``0`` instead.
 
        """
 

	
 
        # Negative context values make no sense, and will result in
 
        # errors. Ensure this does not happen.
 
        if context < 0:
 
            context = 0
 

	
 
        if hasattr(rev1, 'raw_id'):
 
            rev1 = getattr(rev1, 'raw_id')
 

	
 
        if hasattr(rev2, 'raw_id'):
 
            rev2 = getattr(rev2, 'raw_id')
 

	
 
        # Check if given revisions are present at repository (may raise
 
        # ChangesetDoesNotExistError)
 
        if rev1 != self.EMPTY_CHANGESET:
 
            self.get_changeset(rev1)
 
        self.get_changeset(rev2)
 
        if path:
 
            file_filter = mercurial.match.exact(path)
 
        else:
 
            file_filter = None
 

	
 
        return b''.join(mercurial.patch.diff(self._repo, rev1, rev2, match=file_filter,
 
                          opts=mercurial.mdiff.diffopts(git=True,
 
                                        showfunc=True,
 
                                        ignorews=ignore_whitespace,
 
                                        context=context)))
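
# Editorial note: a hedged sketch of calling get_diff() as documented above;
# the revisions are assumptions, and the result is raw bytes in git-style format.
raw = repo.get_diff(repo.EMPTY_CHANGESET, 'tip',
                    ignore_whitespace=True, context=5)
print(raw.decode('utf-8', 'replace'))    # all changes from empty state up to tip
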
 

	
 
    @classmethod
 
    def _check_url(cls, url, repoui=None):
 
        """
 
        Function will check given url and try to verify if it's a valid
 
        link. Sometimes it may happen that mercurial will issue a basic
 
        auth request that can cause whole API to hang when used from python
 
        or other external calls.
 

	
 
        On failures it'll raise urllib.error.URLError; an exception is also raised
 
        when the return code is not 200
 
        """
 
        # check first if it's not a local url
 
        if os.path.isdir(url) or url.startswith(b'file:'):
 
            return True
 

	
 
        if url.startswith(b'ssh:'):
 
            # in case of invalid uri or authentication issues, sshpeer will
 
            # throw an exception.
 
            mercurial.sshpeer.instance(repoui or mercurial.ui.ui(), url, False).lookup(b'tip')
 
            return True
 

	
 
        url_prefix = None
 
        if b'+' in url[:url.find(b'://')]:
 
            url_prefix, url = url.split(b'+', 1)
 

	
 
        handlers = []
 
        url_obj = mercurial.util.url(url)
 
        test_uri, authinfo = url_obj.authinfo()
 
        url_obj.passwd = b'*****'
 
        cleaned_uri = str(url_obj)
 

	
 
        if authinfo:
 
            # create a password manager
 
            passmgr = urllib.request.HTTPPasswordMgrWithDefaultRealm()
 
            passmgr.add_password(*authinfo)
 

	
 
            handlers.extend((mercurial.url.httpbasicauthhandler(passmgr),
 
                             mercurial.url.httpdigestauthhandler(passmgr)))
 

	
 
        o = urllib.request.build_opener(*handlers)
 
        o.addheaders = [('Content-Type', 'application/mercurial-0.1'),
 
                        ('Accept', 'application/mercurial-0.1')]
 

	
 
        req = urllib.request.Request(
 
            "%s?%s" % (
 
                test_uri,
 
                urllib.parse.urlencode({
 
                    'cmd': 'between',
 
                    'pairs': "%s-%s" % ('0' * 40, '0' * 40),
 
                })
 
            ))
 

	
 
        try:
 
            resp = o.open(req)
 
            if resp.code != 200:
 
                raise Exception('Return Code is not 200')
 
        except Exception as e:
 
            # means it cannot be cloned
 
            raise urllib.error.URLError("[%s] org_exc: %s" % (cleaned_uri, e))
 

	
 
        if not url_prefix: # skip svn+http://... (and git+... too)
 
            # now check if it's a proper hg repo
 
            try:
 
                mercurial.httppeer.instance(repoui or mercurial.ui.ui(), url, False).lookup(b'tip')
 
            except Exception as e:
 
                raise urllib.error.URLError(
 
                    "url [%s] does not look like an hg repo org_exc: %s"
 
                    % (cleaned_uri, e))
 

	
 
        return True
 

	
 
    def _get_repo(self, create, src_url=None, update_after_clone=False):
 
        """
 
        Function will check for mercurial repository in given path and return
 
        a localrepo object. If there is no repository in that path it will
 
        raise an exception unless ``create`` parameter is set to True - in
 
        that case repository would be created and returned.
 
        If ``src_url`` is given, would try to clone repository from the
 
        location given by ``src_url``. Additionally it'll update the
 
        working copy according to the ``update_after_clone`` flag.
 
        """
 
        try:
 
            if src_url:
 
                url = safe_bytes(self._get_url(src_url))
 
                opts = {}
 
                if not update_after_clone:
 
                    opts.update({'noupdate': True})
 
                MercurialRepository._check_url(url, self.baseui)
 
                mercurial.commands.clone(self.baseui, url, safe_bytes(self.path), **opts)
 

	
 
                # Don't try to create if we've already cloned repo
 
                create = False
 
            return mercurial.localrepo.instance(self.baseui, safe_bytes(self.path), create=create)
 
        except (mercurial.error.Abort, mercurial.error.RepoError) as err:
 
            if create:
 
                msg = "Cannot create repository at %s. Original error was %s" \
 
                    % (self.name, err)
 
            else:
 
                msg = "Not valid repository at %s. Original error was %s" \
 
                    % (self.name, err)
 
            raise RepositoryError(msg)
 

	
 
    @LazyProperty
 
    def in_memory_changeset(self):
 
        return MercurialInMemoryChangeset(self)
 

	
 
    @LazyProperty
 
    def description(self):
 
        _desc = self._repo.ui.config(b'web', b'description', None, untrusted=True)
 
        return safe_str(_desc or b'unknown')
 

	
 
    @LazyProperty
 
    def contact(self):
 
        return safe_str(mercurial.hgweb.common.get_contact(self._repo.ui.config)
 
                            or b'Unknown')
 

	
 
    @LazyProperty
 
    def last_change(self):
 
        """
 
        Returns last change made on this repository as datetime object
 
        """
 
        return date_fromtimestamp(self._get_mtime(), makedate()[1])
 

	
 
    def _get_mtime(self):
 
        try:
 
            return time.mktime(self.get_changeset().date.timetuple())
 
        except RepositoryError:
 
            # fallback to filesystem
 
            cl_path = os.path.join(self.path, '.hg', "00changelog.i")
 
            st_path = os.path.join(self.path, '.hg', "store")
 
            if os.path.exists(cl_path):
 
                return os.stat(cl_path).st_mtime
 
            else:
 
                return os.stat(st_path).st_mtime
 

	
 
    def _get_revision(self, revision):
 
        """
 
        Given any revision identifier, returns a 40 char string with revision hash.
 

	
 
        :param revision: str or int or None
 
        """
 
        if self._empty:
 
            raise EmptyRepositoryError("There are no changesets yet")
 

	
 
        if revision in [-1, None]:
 
            revision = b'tip'
 
        elif isinstance(revision, unicode):
 
        elif isinstance(revision, str):
 
            revision = safe_bytes(revision)
 

	
 
        try:
 
            if isinstance(revision, int):
 
                return ascii_str(self._repo[revision].hex())
 
            return ascii_str(mercurial.scmutil.revsymbol(self._repo, revision).hex())
 
        except (IndexError, ValueError, mercurial.error.RepoLookupError, TypeError):
 
            msg = "Revision %r does not exist for %s" % (safe_str(revision), self.name)
 
            raise ChangesetDoesNotExistError(msg)
 
        except (LookupError, ):
 
            msg = "Ambiguous identifier `%s` for %s" % (safe_str(revision), self.name)
 
            raise ChangesetDoesNotExistError(msg)
 

	
 
    def get_ref_revision(self, ref_type, ref_name):
 
        """
 
        Returns revision number for the given reference.
 
        """
 
        if ref_type == 'rev' and not ref_name.strip('0'):
 
            return self.EMPTY_CHANGESET
 
        # lookup up the exact node id
 
        _revset_predicates = {
 
                'branch': 'branch',
 
                'book': 'bookmark',
 
                'tag': 'tag',
 
                'rev': 'id',
 
            }
 
        # avoid expensive branch(x) iteration over whole repo
 
        rev_spec = "%%s & %s(%%s)" % _revset_predicates[ref_type]
 
        try:
 
            revs = self._repo.revs(rev_spec, ref_name, ref_name)
 
        except LookupError:
 
            msg = "Ambiguous identifier %s:%s for %s" % (ref_type, ref_name, self.name)
 
            raise ChangesetDoesNotExistError(msg)
 
        except mercurial.error.RepoLookupError:
 
            msg = "Revision %s:%s does not exist for %s" % (ref_type, ref_name, self.name)
 
            raise ChangesetDoesNotExistError(msg)
 
        if revs:
 
            revision = revs.last()
 
        else:
 
            # TODO: just report 'not found'?
 
            revision = ref_name
 

	
 
        return self._get_revision(revision)
 

	
 
    def _get_archives(self, archive_name='tip'):
 
        allowed = self.baseui.configlist(b"web", b"allow_archive",
 
                                         untrusted=True)
 
        for name, ext in [(b'zip', '.zip'), (b'gz', '.tar.gz'), (b'bz2', '.tar.bz2')]:
 
            if name in allowed or self._repo.ui.configbool(b"web",
 
                                                           b"allow" + name,
 
                                                           untrusted=True):
 
                yield {"type": safe_str(name), "extension": ext, "node": archive_name}
 

	
 
    def _get_url(self, url):
 
        """
 
        Returns normalized url. If scheme is not given, fall back to
 
        filesystem (``file:///``) scheme.
 
        """
 
        if url != 'default' and '://' not in url:
 
            url = "file:" + urllib.request.pathname2url(url)
 
        return url
 

	
 
    def get_changeset(self, revision=None):
 
        """
 
        Returns ``MercurialChangeset`` object representing repository's
 
        changeset at the given ``revision``.
 
        """
 
        return MercurialChangeset(repository=self, revision=self._get_revision(revision))
 

	
 
    def get_changesets(self, start=None, end=None, start_date=None,
 
                       end_date=None, branch_name=None, reverse=False, max_revisions=None):
 
        """
 
        Returns iterator of ``MercurialChangeset`` objects from start to end
 
        (both are inclusive)
 

	
 
        :param start: None, str, int or mercurial lookup format
 
        :param end:  None, str, int or mercurial lookup format
 
        :param start_date:
 
        :param end_date:
 
        :param branch_name:
 
        :param reverse: return changesets in reversed order
 
        """
 
        start_raw_id = self._get_revision(start)
 
        start_pos = None if start is None else self.revisions.index(start_raw_id)
 
        end_raw_id = self._get_revision(end)
 
        end_pos = None if end is None else self.revisions.index(end_raw_id)
 

	
 
        if start_pos is not None and end_pos is not None and start_pos > end_pos:
 
            raise RepositoryError("Start revision '%s' cannot be "
 
                                  "after end revision '%s'" % (start, end))
 

	
 
        if branch_name and branch_name not in self.allbranches:
 
            msg = "Branch %r not found in %s" % (branch_name, self.name)
 
            raise BranchDoesNotExistError(msg)
 
        if end_pos is not None:
 
            end_pos += 1
 
        # filter branches
 
        filter_ = []
 
        if branch_name:
 
            filter_.append(b'branch("%s")' % safe_bytes(branch_name))
 
        if start_date:
 
            filter_.append(b'date(">%s")' % safe_bytes(str(start_date)))
 
        if end_date:
 
            filter_.append(b'date("<%s")' % safe_bytes(str(end_date)))
 
        if filter_ or max_revisions:
 
            if filter_:
 
                revspec = b' and '.join(filter_)
 
            else:
 
                revspec = b'all()'
 
            if max_revisions:
 
                revspec = b'limit(%s, %d)' % (revspec, max_revisions)
 
            revisions = mercurial.scmutil.revrange(self._repo, [revspec])
 
        else:
 
            revisions = self.revisions
 

	
 
        # this is very much a hack to turn this into a list; a better solution
 
        # would be to get rid of this function entirely and use revsets
 
        revs = list(revisions)[start_pos:end_pos]
 
        if reverse:
 
            revs.reverse()
 

	
 
        return CollectionGenerator(self, revs)
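
# Editorial note: a hedged usage sketch of get_changesets() with the filtering
# parameters described above; the branch name and limit are assumptions.
for cs in repo.get_changesets(branch_name='default', reverse=True,
                              max_revisions=10):
    print(cs.raw_id, cs.date, cs.author)
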
 

	
 
    def pull(self, url):
 
        """
 
        Tries to pull changes from external location.
 
        """
 
        other = mercurial.hg.peer(self._repo, {}, safe_bytes(self._get_url(url)))
 
        try:
 
            mercurial.exchange.pull(self._repo, other, heads=None, force=None)
 
        except mercurial.error.Abort as err:
 
            # Propagate error but with vcs's type
 
            raise RepositoryError(str(err))
 

	
 
    @LazyProperty
 
    def workdir(self):
 
        """
 
        Returns ``Workdir`` instance for this repository.
 
        """
 
        return MercurialWorkdir(self)
 

	
 
    def get_config_value(self, section, name=None, config_file=None):
 
        """
 
        Returns configuration value for a given [``section``] and ``name``.
 

	
 
        :param section: Section we want to retrieve value from
 
        :param name: Name of configuration we want to retrieve
 
        :param config_file: A path to file which should be used to retrieve
 
          configuration from (might also be a list of file paths)
 
        """
 
        if config_file is None:
 
            config_file = []
 
        elif isinstance(config_file, str):
 
            config_file = [config_file]
 

	
 
        config = self._repo.ui
 
        if config_file:
 
            config = mercurial.ui.ui()
 
            for path in config_file:
 
                config.readconfig(safe_bytes(path))
 
        value = config.config(safe_bytes(section), safe_bytes(name))
 
        return value if value is None else safe_str(value)
 

	
 
    def get_user_name(self, config_file=None):
 
        """
 
        Returns user's name from global configuration file.
 

	
 
        :param config_file: A path to file which should be used to retrieve
 
          configuration from (might also be a list of file paths)
 
        """
 
        username = self.get_config_value('ui', 'username', config_file=config_file)
 
        if username:
 
            return author_name(username)
 
        return None
 

	
 
    def get_user_email(self, config_file=None):
 
        """
 
        Returns user's email from global configuration file.
 

	
 
        :param config_file: A path to file which should be used to retrieve
 
          configuration from (might also be a list of file paths)
 
        """
 
        username = self.get_config_value('ui', 'username', config_file=config_file)
 
        if username:
 
            return author_email(username)
 
        return None
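
# Editorial note: a hedged sketch of the configuration helpers above; the hgrc
# path and the [web] push_ssl key are assumptions. Omitting config_file falls
# back to the repository's own ui configuration.
name = repo.get_user_name(config_file='/etc/mercurial/hgrc')
email = repo.get_user_email(config_file='/etc/mercurial/hgrc')
push_ssl = repo.get_config_value('web', 'push_ssl')   # None if unset
print(name, email, push_ssl)
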
kallithea/lib/vcs/subprocessio.py
Show inline comments
 
"""
 
Module provides a class that wraps communication over subprocess.Popen
 
input, output, error streams into a meaningful, non-blocking, concurrent
 
stream processor, exposing the output data as an iterator suitable to be a
 
return value passed by a WSGI application to a WSGI server per PEP 3333.
 

	
 
Copyright (c) 2011  Daniel Dotsenko <dotsa[at]hotmail.com>
 

	
 
This file is part of git_http_backend.py Project.
 

	
 
git_http_backend.py Project is free software: you can redistribute it and/or
 
modify it under the terms of the GNU Lesser General Public License as
 
published by the Free Software Foundation, either version 2.1 of the License,
 
or (at your option) any later version.
 

	
 
git_http_backend.py Project is distributed in the hope that it will be useful,
 
but WITHOUT ANY WARRANTY; without even the implied warranty of
 
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 
GNU Lesser General Public License for more details.
 

	
 
You should have received a copy of the GNU Lesser General Public License
 
along with git_http_backend.py Project.
 
If not, see <http://www.gnu.org/licenses/>.
 
"""
 
import collections
 
import os
 
import subprocess
 
import threading
 

	
 

	
 
class StreamFeeder(threading.Thread):
 
    """
 
    Normal writing into pipe-like is blocking once the buffer is filled.
 
    This thread feeds data from a file-like object into a pipe
 
    without blocking the main thread.
 
    We close inpipe once the end of the source stream is reached.
 
    """
 

	
 
    def __init__(self, source):
 
        super(StreamFeeder, self).__init__()
 
        self.daemon = True
 
        filelike = False
 
        self.bytes = bytes()
 
        if type(source) in (type(''), bytes, bytearray):  # string-like
 
            self.bytes = bytes(source)
 
        else:  # can be either file pointer or file-like
 
            if isinstance(source, int):  # file pointer it is
 
                # converting file descriptor (int) stdin into file-like
 
                source = os.fdopen(source, 'rb', 16384)
 
            # let's see if source is file-like by now
 
            filelike = hasattr(source, 'read')
 
        if not filelike and not self.bytes:
 
            raise TypeError("StreamFeeder's source object must be a readable "
 
                            "file-like, a file descriptor, or a string-like.")
 
        self.source = source
 
        self.readiface, self.writeiface = os.pipe()
 

	
 
    def run(self):
 
        t = self.writeiface
 
        if self.bytes:
 
            os.write(t, self.bytes)
 
        else:
 
            s = self.source
 
            b = s.read(4096)
 
            while b:
 
                os.write(t, b)
 
                b = s.read(4096)
 
        os.close(t)
 

	
 
    @property
 
    def output(self):
 
        return self.readiface
 

	
 

	
 
class InputStreamChunker(threading.Thread):
 
    def __init__(self, source, target, buffer_size, chunk_size):
 

	
 
        super(InputStreamChunker, self).__init__()
 

	
 
        self.daemon = True  # die die die.
 

	
 
        self.source = source
 
        self.target = target
 
        self.chunk_count_max = int(buffer_size / chunk_size) + 1
 
        self.chunk_size = chunk_size
 

	
 
        self.data_added = threading.Event()
 
        self.data_added.clear()
 

	
 
        self.keep_reading = threading.Event()
 
        self.keep_reading.set()
 

	
 
        self.EOF = threading.Event()
 
        self.EOF.clear()
 

	
 
        self.go = threading.Event()
 
        self.go.set()
 

	
 
    def stop(self):
 
        self.go.clear()
 
        self.EOF.set()
 
        try:
 
            # this is not proper, but is done to force the reader thread let
 
            # go of the input because, if successful, .close() will send EOF
 
            # down the pipe.
 
            self.source.close()
 
        except:
 
            pass
 

	
 
    def run(self):
 
        s = self.source
 
        t = self.target
 
        cs = self.chunk_size
 
        ccm = self.chunk_count_max
 
        kr = self.keep_reading
 
        da = self.data_added
 
        go = self.go
 

	
 
        try:
 
            b = s.read(cs)
 
        except ValueError:
 
            b = ''
 

	
 
        while b and go.is_set():
 
            if len(t) > ccm:
 
                kr.clear()
 
                kr.wait(2)
 
                # # this only works on 2.7.x and up
 
                # if not kr.wait(10):
 
                #     raise Exception("Timed out while waiting for input to be read.")
 
                # instead we'll use this
 
                if len(t) > ccm + 3:
 
                    raise IOError(
 
                        "Timed out while waiting for input from subprocess.")
 
            t.append(b)
 
            da.set()
 
            try:
 
                b = s.read(cs)
 
            except ValueError: # probably "I/O operation on closed file"
 
                b = ''
 

	
 
        self.EOF.set()
 
        da.set()  # for cases when done but there was no input.
 

	
 

	
 
class BufferedGenerator(object):
 
    """
 
    Class behaves as a non-blocking, buffered pipe reader.
 
    Reads chunks of data (through a thread)
 
    from a blocking pipe, and attaches these to an array (Deque) of chunks.
 
    Reading is halted in the thread when max chunks is internally buffered.
 
    The .next() may operate in blocking or non-blocking fashion by yielding
 
    '' if no data is ready
 
    to be sent or by not returning until there is some data to send
 
    When we get EOF from underlying source pipe we raise the marker to raise
 
    StopIteration after the last chunk of data is yielded.
 
    """
 

	
 
    def __init__(self, source, buffer_size=65536, chunk_size=4096,
 
                 starting_values=None, bottomless=False):
 
        starting_values = starting_values or []
 
        if bottomless:
 
            maxlen = int(buffer_size / chunk_size)
 
        else:
 
            maxlen = None
 

	
 
        self.data = collections.deque(starting_values, maxlen)
 
        self.worker = InputStreamChunker(source, self.data, buffer_size,
 
                                         chunk_size)
 
        if starting_values:
 
            self.worker.data_added.set()
 
        self.worker.start()
 

	
 
    ####################
 
    # Generator's methods
 
    ####################
 

	
 
    def __iter__(self):
 
        return self
 

	
 
    def __next__(self):
 
        while not len(self.data) and not self.worker.EOF.is_set():
 
            self.worker.data_added.clear()
 
            self.worker.data_added.wait(0.2)
 
        if len(self.data):
 
            self.worker.keep_reading.set()
 
            return bytes(self.data.popleft())
 
        elif self.worker.EOF.is_set():
 
            raise StopIteration
 

	
 
    def throw(self, type, value=None, traceback=None):
 
        if not self.worker.EOF.is_set():
 
            raise type(value)
 

	
 
    def start(self):
 
        self.worker.start()
 

	
 
    def stop(self):
 
        self.worker.stop()
 

	
 
    def close(self):
 
        try:
 
            self.worker.stop()
 
            self.throw(GeneratorExit)
 
        except (GeneratorExit, StopIteration):
 
            pass
 

	
 
    ####################
 
    # Threaded reader's infrastructure.
 
    ####################
 
    @property
 
    def input(self):
 
        return self.worker.w
 

	
 
    @property
 
    def data_added_event(self):
 
        return self.worker.data_added
 

	
 
    @property
 
    def data_added(self):
 
        return self.worker.data_added.is_set()
 

	
 
    @property
 
    def reading_paused(self):
 
        return not self.worker.keep_reading.is_set()
 

	
 
    @property
 
    def done_reading_event(self):
 
        """
 
        Done_reading does not mean that the iterator's buffer is empty.
 
        Iterator might have done reading from underlying source, but the read
 
        chunks might still be available for serving through .next() method.
 

	
 
        :returns: A threading.Event class instance.
 
        """
 
        return self.worker.EOF
 

	
 
    @property
 
    def done_reading(self):
 
        """
 
        Done_reading does not mean that the iterator's buffer is empty.
 
        Iterator might have done reading from underlying source, but the read
 
        chunks might still be available for serving through .next() method.
 

	
 
        :returns: A bool value.
 
        """
 
        return self.worker.EOF.is_set()
 

	
 
    @property
 
    def length(self):
 
        """
 
        returns int.
 

	
 
        This is the length of the queue of chunks, not the length of
 
        the combined contents in those chunks.
 

	
 
        __len__() cannot be meaningfully implemented because this
 
        reader is just flying through a bottomless pit of content and
 
        can only know the length of what it already saw.
 

	
 
        If __len__() returns a value, a WSGI server (per PEP 3333) will set
 
        the response's length to that value. In order not to
 
        confuse WSGI PEP3333 servers, we will not implement __len__
 
        at all.
 
        """
 
        return len(self.data)
 

	
 
    def prepend(self, x):
 
        self.data.appendleft(x)
 

	
 
    def append(self, x):
 
        self.data.append(x)
 

	
 
    def extend(self, o):
 
        self.data.extend(o)
 

	
 
    def __getitem__(self, i):
 
        return self.data[i]
 

	
 

	
 
class SubprocessIOChunker(object):
 
    """
 
    Processor class wrapping handling of subprocess IO.
 

	
 
    In a way, this is a "communicate()" replacement with a twist.
 

	
 
    - We are multithreaded. Writing in and reading out/err happen in separate threads.
 
    - We support concurrent (in and out) stream processing.
 
    - The output is not a stream. It's a queue of read string (bytes, not unicode)
 
    - The output is not a stream. It's a queue of read string (bytes, not str)
 
      chunks. The object behaves as an iterable: you can iterate it with "for chunk in obj:".
 
    - We are non-blocking in more respects than communicate()
 
      (reading from subprocess out pauses when internal buffer is full, but
 
       does not block the parent calling code. On the flip side, reading from
 
       slow-yielding subprocess may block the iteration until data shows up. This
 
       does not block the inpipe reading occurring in the parallel thread.)
 

	
 
    The purpose of the object is to allow us to wrap subprocess interactions into
 
    an iterable that can be passed to a WSGI server as the application's return
 
    value. Because of stream-processing-ability, WSGI does not have to read ALL
 
    of the subprocess's output and buffer it, before handing it to WSGI server for
 
    HTTP response. Instead, the class initializer reads just a bit of the stream
 
    to figure out if an error occurred or is likely to occur and, if not, hands the
 
    further iteration over subprocess output to the server for completion of HTTP
 
    response.
 

	
 
    The real or perceived subprocess error is trapped and raised as one of
 
    the EnvironmentError family of exceptions.
 

	
 
    Example usage:
 
    #    try:
 
    #        answer = SubprocessIOChunker(
 
    #            cmd,
 
    #            input,
 
    #            buffer_size = 65536,
 
    #            chunk_size = 4096
 
    #            )
 
    #    except (EnvironmentError) as e:
 
    #        print(str(e))
 
    #        raise e
 
    #
 
    #    return answer
 

	
 

	
 
    """
 

	
 
    def __init__(self, cmd, inputstream=None, buffer_size=65536,
 
                 chunk_size=4096, starting_values=None, **kwargs):
 
        """
 
        Initializes SubprocessIOChunker
 

	
 
        :param cmd: A subprocess.Popen style "cmd". Must be a list of strings.
 
        :param inputstream: (Default: None) A file-like, string, or file pointer.
 
        :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
 
        :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
 
        :param starting_values: (Default: []) An array of strings to put in front of the output queue.
 
        """
 
        starting_values = starting_values or []
 
        if inputstream:
 
            input_streamer = StreamFeeder(inputstream)
 
            input_streamer.start()
 
            inputstream = input_streamer.output
 

	
 
        # Note: fragile cmd mangling has been removed for use in Kallithea
 
        assert isinstance(cmd, list), cmd
 

	
 
        _p = subprocess.Popen(cmd, bufsize=-1,
 
                              stdin=inputstream,
 
                              stdout=subprocess.PIPE,
 
                              stderr=subprocess.PIPE,
 
                              **kwargs)
 

	
 
        bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size,
 
                                   starting_values)
 
        bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)
 

	
 
        while not bg_out.done_reading and not bg_out.reading_paused:
 
            # doing this until we reach either end of file, or end of buffer.
 
            bg_out.data_added_event.wait(1)
 
            bg_out.data_added_event.clear()
 

	
 
        # at this point it's still ambiguous if we are done reading or just full buffer.
 
        # Either way, if error (returned by ended process, or implied based on
 
        # presence of stuff in stderr output) we error out.
 
        # Else, we are happy.
 
        returncode = _p.poll()
 
        if (returncode is not None # process has terminated
 
            and returncode != 0
 
        ): # and it failed
 
            bg_out.stop()
 
            out = b''.join(bg_out)
 
            bg_err.stop()
 
            err = b''.join(bg_err)
 
            if (err.strip() == b'fatal: The remote end hung up unexpectedly' and
 
                out.startswith(b'0034shallow ')
 
            ):
 
                # hack inspired by https://github.com/schacon/grack/pull/7
 
                bg_out = iter([out])
 
                _p = None
 
            elif err:
 
                raise EnvironmentError("Subprocess exited due to an error: %s" % err)
 
            else:
 
                raise EnvironmentError(
 
                    "Subprocess exited with non 0 ret code: %s" % returncode)
 
        self.process = _p
 
        self.output = bg_out
 
        self.error = bg_err
 
        self.inputstream = inputstream
 

	
 
    def __iter__(self):
 
        return self
 

	
 
    def __next__(self):
 
        if self.process:
 
            returncode = self.process.poll()
 
            if (returncode is not None # process has terminated
 
                and returncode != 0
 
            ): # and it failed
 
                self.output.stop()
 
                self.error.stop()
 
                err = ''.join(self.error)
 
                raise EnvironmentError("Subprocess exited due to an error:\n" + err)
 
        return next(self.output)
 

	
 
    def throw(self, type, value=None, traceback=None):
 
        if self.output.length or not self.output.done_reading:
 
            raise type(value)
 

	
 
    def close(self):
 
        try:
 
            self.process.terminate()
 
        except:
 
            pass
 
        try:
 
            self.output.close()
 
        except:
 
            pass
 
        try:
 
            self.error.close()
 
        except:
 
            pass
 
        try:
 
            os.close(self.inputstream)
 
        except:
 
            pass
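
# Editorial note: a hedged sketch of driving SubprocessIOChunker outside WSGI;
# the git command is only an illustration and assumes the current directory is
# a git repository.
import sys

chunker = SubprocessIOChunker(['git', 'rev-list', '--all'])
try:
    for chunk in chunker:                 # yields bytes chunks as they arrive
        sys.stdout.buffer.write(chunk)
finally:
    chunker.close()                       # terminates the subprocess and pipes
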
kallithea/model/db.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.db
 
~~~~~~~~~~~~~~~~~~
 

	
 
Database Models for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 08, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import base64
 
import collections
 
import datetime
 
import functools
 
import hashlib
 
import logging
 
import os
 
import time
 
import traceback
 

	
 
import ipaddr
 
import sqlalchemy
 
from beaker.cache import cache_region, region_invalidate
 
from sqlalchemy import Boolean, Column, DateTime, Float, ForeignKey, Index, Integer, LargeBinary, String, Unicode, UnicodeText, UniqueConstraint
 
from sqlalchemy.ext.hybrid import hybrid_property
 
from sqlalchemy.orm import class_mapper, joinedload, relationship, validates
 
from tg.i18n import lazy_ugettext as _
 
from webob.exc import HTTPNotFound
 

	
 
import kallithea
 
from kallithea.lib import ext_json
 
from kallithea.lib.caching_query import FromCache
 
from kallithea.lib.exceptions import DefaultUserException
 
from kallithea.lib.utils2 import (
 
    Optional, ascii_bytes, aslist, get_changeset_safe, get_clone_url, remove_prefix, safe_bytes, safe_int, safe_str, str2bool, urlreadable)
 
from kallithea.lib.vcs import get_backend
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.vcs.utils.helpers import get_scm
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.model.meta import Base, Session
 

	
 

	
 
URL_SEP = '/'
 
log = logging.getLogger(__name__)
 

	
 
#==============================================================================
 
# BASE CLASSES
 
#==============================================================================
 

	
 
def _hash_key(k):
 
    return hashlib.md5(safe_bytes(k)).hexdigest()
 

	
 

	
 
class BaseDbModel(object):
 
    """
 
    Base Model for all classes
 
    """
 

	
 
    @classmethod
 
    def _get_keys(cls):
 
        """return column names for this model """
 
        # Note: not a normal dict - iterator gives "users.firstname", but keys gives "firstname"
 
        return class_mapper(cls).c.keys()
 

	
 
    def get_dict(self):
 
        """
 
        return dict with keys and values corresponding
 
        to this model data """
 

	
 
        d = {}
 
        for k in self._get_keys():
 
            d[k] = getattr(self, k)
 

	
 
        # also use __json__() if present to get additional fields
 
        _json_attr = getattr(self, '__json__', None)
 
        if _json_attr:
 
            # update with attributes from __json__
 
            if callable(_json_attr):
 
                _json_attr = _json_attr()
 
            for k, val in _json_attr.items():
 
                d[k] = val
 
        return d
 

	
 
    def get_appstruct(self):
 
        """return list with keys and values tuples corresponding
 
        to this model data """
 

	
 
        return [
 
            (k, getattr(self, k))
 
            for k in self._get_keys()
 
        ]
 

	
 
    def populate_obj(self, populate_dict):
 
        """populate model with data from given populate_dict"""
 

	
 
        for k in self._get_keys():
 
            if k in populate_dict:
 
                setattr(self, k, populate_dict[k])
 

	
 
    @classmethod
 
    def query(cls):
 
        return Session().query(cls)
 

	
 
    @classmethod
 
    def get(cls, id_):
 
        if id_:
 
            return cls.query().get(id_)
 

	
 
    @classmethod
 
    def guess_instance(cls, value, callback=None):
 
        """Haphazardly attempt to convert `value` to a `cls` instance.
 

	
 
        If `value` is None or already a `cls` instance, return it. If `value`
 
        is a number (or looks like one if you squint just right), assume it's
 
        a database primary key and let SQLAlchemy sort things out. Otherwise,
 
        fall back to resolving it using `callback` (if specified); this could
 
        e.g. be a function that looks up instances by name (though that won't
 
        work if the name begins with a digit). Otherwise, raise Exception.
 
        """
 

	
 
        if value is None:
 
            return None
 
        if isinstance(value, cls):
 
            return value
 
        if isinstance(value, int):
 
            return cls.get(value)
 
        if isinstance(value, str) and value.isdigit():
 
            return cls.get(int(value))
 
        if callback is not None:
 
            return callback(value)
 

	
 
        raise Exception(
 
            'given object must be int, long or Instance of %s '
 
            'got %s, no callback provided' % (cls, type(value))
 
        )
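
# Editorial note: a hedged illustration of guess_instance(); all three calls
# resolve the same row, and the callback variant assumes a name-based lookup
# such as User.get_by_username.
user = User.guess_instance(42)                            # primary key as int
user = User.guess_instance('42')                          # digits-only string
user = User.guess_instance('joe', User.get_by_username)   # resolve by name
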
 

	
 
    @classmethod
 
    def get_or_404(cls, id_):
 
        try:
 
            id_ = int(id_)
 
        except (TypeError, ValueError):
 
            raise HTTPNotFound
 

	
 
        res = cls.query().get(id_)
 
        if res is None:
 
            raise HTTPNotFound
 
        return res
 

	
 
    @classmethod
 
    def delete(cls, id_):
 
        obj = cls.query().get(id_)
 
        Session().delete(obj)
 

	
 
    def __repr__(self):
 
        return '<DB:%s>' % (self.__class__.__name__)
 

	
 

	
 
_table_args_default_dict = {'extend_existing': True,
 
                            'mysql_engine': 'InnoDB',
 
                            'mysql_charset': 'utf8',
 
                            'sqlite_autoincrement': True,
 
                           }
 

	
 
class Setting(Base, BaseDbModel):
 
    __tablename__ = 'settings'
 
    __table_args__ = (
 
        _table_args_default_dict,
 
    )
 

	
 
    SETTINGS_TYPES = {
 
        'str': safe_bytes,
 
        'int': safe_int,
 
        'unicode': safe_str,
 
        'bool': str2bool,
 
        'list': functools.partial(aslist, sep=',')
 
    }
 
    DEFAULT_UPDATE_URL = ''
 

	
 
    app_settings_id = Column(Integer(), primary_key=True)
 
    app_settings_name = Column(String(255), nullable=False, unique=True)
 
    _app_settings_value = Column("app_settings_value", Unicode(4096), nullable=False)
 
    _app_settings_type = Column("app_settings_type", String(255), nullable=True) # FIXME: not nullable?
 

	
 
    def __init__(self, key='', val='', type='unicode'):
 
        self.app_settings_name = key
 
        self.app_settings_value = val
 
        self.app_settings_type = type
 

	
 
    @validates('_app_settings_value')
 
    def validate_settings_value(self, key, val):
 
        assert isinstance(val, unicode)
 
        assert isinstance(val, str)
 
        return val
 

	
 
    @hybrid_property
 
    def app_settings_value(self):
 
        v = self._app_settings_value
 
        _type = self.app_settings_type
 
        converter = self.SETTINGS_TYPES.get(_type) or self.SETTINGS_TYPES['unicode']
 
        return converter(v)
 

	
 
    @app_settings_value.setter
 
    def app_settings_value(self, val):
 
        """
 
        Setter that will always make sure we use str in app_settings_value
 
        """
 
        self._app_settings_value = safe_str(val)
 

	
 
    @hybrid_property
 
    def app_settings_type(self):
 
        return self._app_settings_type
 

	
 
    @app_settings_type.setter
 
    def app_settings_type(self, val):
 
        if val not in self.SETTINGS_TYPES:
 
            raise Exception('type must be one of %s got %s'
 
                            % (list(self.SETTINGS_TYPES), val))
 
        self._app_settings_type = val
 

	
 
    def __repr__(self):
 
        return "<%s %s.%s=%r>" % (
 
            self.__class__.__name__,
 
            self.app_settings_name, self.app_settings_type, self.app_settings_value
 
        )
 

	
 
    @classmethod
 
    def get_by_name(cls, key):
 
        return cls.query() \
 
            .filter(cls.app_settings_name == key).scalar()
 

	
 
    @classmethod
 
    def get_by_name_or_create(cls, key, val='', type='unicode'):
 
        res = cls.get_by_name(key)
 
        if res is None:
 
            res = cls(key, val, type)
 
        return res
 

	
 
    @classmethod
 
    def create_or_update(cls, key, val=Optional(''), type=Optional('unicode')):
 
        """
 
        Creates or updates Kallithea setting. If updates are triggered, it will only
 
        update parameters that are explicitly set; Optional instances will be skipped.
 

	
 
        :param key:
 
        :param val:
 
        :param type:
 
        :return:
 
        """
 
        res = cls.get_by_name(key)
 
        if res is None:
 
            val = Optional.extract(val)
 
            type = Optional.extract(type)
 
            res = cls(key, val, type)
 
            Session().add(res)
 
        else:
 
            res.app_settings_name = key
 
            if not isinstance(val, Optional):
 
                # update if set
 
                res.app_settings_value = val
 
            if not isinstance(type, Optional):
 
                # update if set
 
                res.app_settings_type = type
 
        return res
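
# Editorial note: a hedged sketch of create_or_update(); it assumes a configured
# database session, and the setting name and values are illustrative.
Setting.create_or_update('title', 'My Kallithea', type='unicode')
Session().commit()
# Later: update only the value; the stored type is left untouched because the
# `type` argument stays an Optional default.
Setting.create_or_update('title', 'Renamed instance')
Session().commit()
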
 

	
 
    @classmethod
 
    def get_app_settings(cls, cache=False):
 

	
 
        ret = cls.query()
 

	
 
        if cache:
 
            ret = ret.options(FromCache("sql_cache_short", "get_hg_settings"))
 

	
 
        if ret is None:
 
            raise Exception('Could not get application settings !')
 
        settings = {}
 
        for each in ret:
 
            settings[each.app_settings_name] = \
 
                each.app_settings_value
 

	
 
        return settings
 

	
 
    @classmethod
 
    def get_auth_settings(cls, cache=False):
 
        ret = cls.query() \
 
                .filter(cls.app_settings_name.startswith('auth_')).all()
 
        fd = {}
 
        for row in ret:
 
            fd[row.app_settings_name] = row.app_settings_value
 
        return fd
 

	
 
    @classmethod
 
    def get_default_repo_settings(cls, cache=False, strip_prefix=False):
 
        ret = cls.query() \
 
                .filter(cls.app_settings_name.startswith('default_')).all()
 
        fd = {}
 
        for row in ret:
 
            key = row.app_settings_name
 
            if strip_prefix:
 
                key = remove_prefix(key, prefix='default_')
 
            fd.update({key: row.app_settings_value})
 

	
 
        return fd
 

	
 
    @classmethod
 
    def get_server_info(cls):
 
        import pkg_resources
 
        import platform
 
        from kallithea.lib.utils import check_git_version
 
        mods = [(p.project_name, p.version) for p in pkg_resources.working_set]
 
        info = {
 
            'modules': sorted(mods, key=lambda k: k[0].lower()),
 
            'py_version': platform.python_version(),
 
            'platform': platform.platform(),
 
            'kallithea_version': kallithea.__version__,
 
            'git_version': str(check_git_version()),
 
            'git_path': kallithea.CONFIG.get('git_path')
 
        }
 
        return info
 

	
 

	
 
class Ui(Base, BaseDbModel):
 
    __tablename__ = 'ui'
 
    __table_args__ = (
 
        # FIXME: ui_key as key is wrong and should be removed when the corresponding
 
        # Ui.get_by_key has been replaced by the composite key
 
        UniqueConstraint('ui_key'),
 
        UniqueConstraint('ui_section', 'ui_key'),
 
        _table_args_default_dict,
 
    )
 

	
 
    HOOK_UPDATE = 'changegroup.update'
 
    HOOK_REPO_SIZE = 'changegroup.repo_size'
 

	
 
    ui_id = Column(Integer(), primary_key=True)
 
    ui_section = Column(String(255), nullable=False)
 
    ui_key = Column(String(255), nullable=False)
 
    ui_value = Column(String(255), nullable=True) # FIXME: not nullable?
 
    ui_active = Column(Boolean(), nullable=False, default=True)
 

	
 
    @classmethod
 
    def get_by_key(cls, section, key):
 
        """ Return specified Ui object, or None if not found. """
 
        return cls.query().filter_by(ui_section=section, ui_key=key).scalar()
 

	
 
    @classmethod
 
    def get_or_create(cls, section, key):
 
        """ Return specified Ui object, creating it if necessary. """
 
        setting = cls.get_by_key(section, key)
 
        if setting is None:
 
            setting = cls(ui_section=section, ui_key=key)
 
            Session().add(setting)
 
        return setting
 

	
 
    @classmethod
 
    def get_builtin_hooks(cls):
 
        q = cls.query()
 
        q = q.filter(cls.ui_key.in_([cls.HOOK_UPDATE, cls.HOOK_REPO_SIZE]))
 
        q = q.filter(cls.ui_section == 'hooks')
 
        return q.all()
 

	
 
    @classmethod
 
    def get_custom_hooks(cls):
 
        q = cls.query()
 
        q = q.filter(~cls.ui_key.in_([cls.HOOK_UPDATE, cls.HOOK_REPO_SIZE]))
 
        q = q.filter(cls.ui_section == 'hooks')
 
        return q.all()
 

	
 
    @classmethod
 
    def get_repos_location(cls):
 
        return cls.get_by_key('paths', '/').ui_value
 

	
 
    @classmethod
 
    def create_or_update_hook(cls, key, val):
 
        new_ui = cls.get_or_create('hooks', key)
 
        new_ui.ui_active = True
 
        new_ui.ui_value = val
 

	
 
    def __repr__(self):
 
        return '<%s %s.%s=%r>' % (
 
            self.__class__.__name__,
 
            self.ui_section, self.ui_key, self.ui_value)
 

	
 

	
 
class User(Base, BaseDbModel):
 
    __tablename__ = 'users'
 
    __table_args__ = (
 
        Index('u_username_idx', 'username'),
 
        Index('u_email_idx', 'email'),
 
        _table_args_default_dict,
 
    )
 

	
 
    DEFAULT_USER = 'default'
 
    DEFAULT_GRAVATAR_URL = 'https://secure.gravatar.com/avatar/{md5email}?d=identicon&s={size}'
 
    # The name of the default auth type in extern_type, 'internal' lives in auth_internal.py
 
    DEFAULT_AUTH_TYPE = 'internal'
 

	
 
    user_id = Column(Integer(), primary_key=True)
 
    username = Column(String(255), nullable=False, unique=True)
 
    password = Column(String(255), nullable=False)
 
    active = Column(Boolean(), nullable=False, default=True)
 
    admin = Column(Boolean(), nullable=False, default=False)
 
    name = Column("firstname", Unicode(255), nullable=False)
 
    lastname = Column(Unicode(255), nullable=False)
 
    _email = Column("email", String(255), nullable=True, unique=True) # FIXME: not nullable?
 
    last_login = Column(DateTime(timezone=False), nullable=True)
 
    extern_type = Column(String(255), nullable=True) # FIXME: not nullable?
 
    extern_name = Column(String(255), nullable=True) # FIXME: not nullable?
 
    api_key = Column(String(255), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    _user_data = Column("user_data", LargeBinary(), nullable=True)  # JSON data # FIXME: not nullable?
 

	
 
    user_log = relationship('UserLog')
 
    user_perms = relationship('UserToPerm', primaryjoin="User.user_id==UserToPerm.user_id", cascade='all')
 

	
 
    repositories = relationship('Repository')
 
    repo_groups = relationship('RepoGroup')
 
    user_groups = relationship('UserGroup')
 
    user_followers = relationship('UserFollowing', primaryjoin='UserFollowing.follows_user_id==User.user_id', cascade='all')
 
    followings = relationship('UserFollowing', primaryjoin='UserFollowing.user_id==User.user_id', cascade='all')
 

	
 
    repo_to_perm = relationship('UserRepoToPerm', primaryjoin='UserRepoToPerm.user_id==User.user_id', cascade='all')
 
    repo_group_to_perm = relationship('UserRepoGroupToPerm', primaryjoin='UserRepoGroupToPerm.user_id==User.user_id', cascade='all')
 

	
 
    group_member = relationship('UserGroupMember', cascade='all')
 

	
 
    # comments created by this user
 
    user_comments = relationship('ChangesetComment', cascade='all')
 
    # extra emails for this user
 
    user_emails = relationship('UserEmailMap', cascade='all')
 
    # extra API keys
 
    user_api_keys = relationship('UserApiKeys', cascade='all')
 
    ssh_keys = relationship('UserSshKeys', cascade='all')
 

	
 
    @hybrid_property
 
    def email(self):
 
        return self._email
 

	
 
    @email.setter
 
    def email(self, val):
 
        self._email = val.lower() if val else None
 

	
 
    @property
 
    def firstname(self):
 
        # alias for future
 
        return self.name
 

	
 
    @property
 
    def emails(self):
 
        other = UserEmailMap.query().filter(UserEmailMap.user == self).all()
 
        return [self.email] + [x.email for x in other]
 

	
 
    @property
 
    def api_keys(self):
 
        other = UserApiKeys.query().filter(UserApiKeys.user == self).all()
 
        return [self.api_key] + [x.api_key for x in other]
 

	
 
    @property
 
    def ip_addresses(self):
 
        ret = UserIpMap.query().filter(UserIpMap.user == self).all()
 
        return [x.ip_addr for x in ret]
 

	
 
    @property
 
    def full_name(self):
 
        return '%s %s' % (self.firstname, self.lastname)
 

	
 
    @property
 
    def full_name_or_username(self):
 
        """
 
        Show full name.
 
        If full name is not set, fall back to username.
 
        """
 
        return ('%s %s' % (self.firstname, self.lastname)
 
                if (self.firstname and self.lastname) else self.username)
 

	
 
    @property
 
    def full_name_and_username(self):
 
        """
 
        Show full name and username as 'Firstname Lastname (username)'.
 
        If full name is not set, fall back to username.
 
        """
 
        return ('%s %s (%s)' % (self.firstname, self.lastname, self.username)
 
                if (self.firstname and self.lastname) else self.username)
 

	
 
    @property
 
    def full_contact(self):
 
        return '%s %s <%s>' % (self.firstname, self.lastname, self.email)
 

	
 
    @property
 
    def short_contact(self):
 
        return '%s %s' % (self.firstname, self.lastname)
 

	
 
    @property
 
    def is_admin(self):
 
        return self.admin
 

	
 
    @hybrid_property
 
    def is_default_user(self):
 
        return self.username == User.DEFAULT_USER
 

	
 
    @hybrid_property
 
    def user_data(self):
 
        if not self._user_data:
 
            return {}
 

	
 
        try:
 
            return ext_json.loads(self._user_data)
 
        except TypeError:
 
            return {}
 

	
 
    @user_data.setter
 
    def user_data(self, val):
 
        try:
 
            self._user_data = ascii_bytes(ext_json.dumps(val))
 
        except Exception:
 
            log.error(traceback.format_exc())
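
A rough round-trip sketch (not part of this changeset) of how user_data is handled above: the dict is serialized to JSON, stored as ASCII bytes in the LargeBinary column, and decoded lazily on access, falling back to {} for empty or invalid values. The stdlib json module is used here as a stand-in for ext_json/ascii_bytes.

import json

def dump_user_data(val):
    return json.dumps(val).encode('ascii')   # stand-in for ascii_bytes(ext_json.dumps(val))

def load_user_data(raw):
    if not raw:
        return {}
    try:
        return json.loads(raw)
    except (TypeError, ValueError):
        return {}

assert load_user_data(dump_user_data({'lang': 'en'})) == {'lang': 'en'}
assert load_user_data(None) == {}
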
 

	
 
    def __repr__(self):
 
        return "<%s %s: %r')>" % (self.__class__.__name__, self.user_id, self.username)
 

	
 
    @classmethod
 
    def guess_instance(cls, value):
 
        return super(User, cls).guess_instance(value, User.get_by_username)
 

	
 
    @classmethod
 
    def get_or_404(cls, id_, allow_default=True):
 
        '''
 
        Overridden version of BaseDbModel.get_or_404, with an extra check on
 
        the default user.
 
        '''
 
        user = super(User, cls).get_or_404(id_)
 
        if not allow_default and user.is_default_user:
 
            raise DefaultUserException()
 
        return user
 

	
 
    @classmethod
 
    def get_by_username_or_email(cls, username_or_email, case_insensitive=False, cache=False):
 
        """
 
        For anything that looks like an email address, look up by the email address (matching
 
        case insensitively).
 
        For anything else, try to look up by the user name.
 

	
 
        This assumes no normal username can contain an '@' symbol.
 
        """
 
        if '@' in username_or_email:
 
            return User.get_by_email(username_or_email, cache=cache)
 
        else:
 
            return User.get_by_username(username_or_email, case_insensitive=case_insensitive, cache=cache)
 

	
 
    @classmethod
 
    def get_by_username(cls, username, case_insensitive=False, cache=False):
 
        if case_insensitive:
 
            q = cls.query().filter(sqlalchemy.func.lower(cls.username) == sqlalchemy.func.lower(username))
 
        else:
 
            q = cls.query().filter(cls.username == username)
 

	
 
        if cache:
 
            q = q.options(FromCache(
 
                            "sql_cache_short",
 
                            "get_user_%s" % _hash_key(username)
 
                          )
 
            )
 
        return q.scalar()
 

	
 
    @classmethod
 
    def get_by_api_key(cls, api_key, cache=False, fallback=True):
 
        if len(api_key) != 40 or not api_key.isalnum():
 
            return None
 

	
 
        q = cls.query().filter(cls.api_key == api_key)
 

	
 
        if cache:
 
            q = q.options(FromCache("sql_cache_short",
 
                                    "get_api_key_%s" % api_key))
 
        res = q.scalar()
 

	
 
        if fallback and not res:
 
            # fallback to additional keys
 
            _res = UserApiKeys.query().filter_by(api_key=api_key, is_expired=False).first()
 
            if _res:
 
                res = _res.user
 
        if res is None or not res.active or res.is_default_user:
 
            return None
 
        return res
 

	
 
    @classmethod
 
    def get_by_email(cls, email, cache=False):
 
        q = cls.query().filter(sqlalchemy.func.lower(cls.email) == sqlalchemy.func.lower(email))
 

	
 
        if cache:
 
            q = q.options(FromCache("sql_cache_short",
 
                                    "get_email_key_%s" % email))
 

	
 
        ret = q.scalar()
 
        if ret is None:
 
            q = UserEmailMap.query()
 
            # try fetching in alternate email map
 
            q = q.filter(sqlalchemy.func.lower(UserEmailMap.email) == sqlalchemy.func.lower(email))
 
            q = q.options(joinedload(UserEmailMap.user))
 
            if cache:
 
                q = q.options(FromCache("sql_cache_short",
 
                                        "get_email_map_key_%s" % email))
 
            ret = getattr(q.scalar(), 'user', None)
 

	
 
        return ret
 

	
 
    @classmethod
 
    def get_from_cs_author(cls, author):
 
        """
 
        Tries to get User objects out of commit author string
 

	
 
        :param author:
 
        """
 
        from kallithea.lib.helpers import email, author_name
 
        # If the passed value contains a valid email, see if that user is in the system
 
        _email = email(author)
 
        if _email:
 
            user = cls.get_by_email(_email)
 
            if user is not None:
 
                return user
 
        # Maybe we can match by username?
 
        _author = author_name(author)
 
        user = cls.get_by_username(_author, case_insensitive=True)
 
        if user is not None:
 
            return user
 

	
 
    def update_lastlogin(self):
 
        """Update user lastlogin"""
 
        self.last_login = datetime.datetime.now()
 
        log.debug('updated user %s lastlogin', self.username)
 

	
 
    @classmethod
 
    def get_first_admin(cls):
 
        user = User.query().filter(User.admin == True).first()
 
        if user is None:
 
            raise Exception('Missing administrative account!')
 
        return user
 

	
 
    @classmethod
 
    def get_default_user(cls, cache=False):
 
        user = User.get_by_username(User.DEFAULT_USER, cache=cache)
 
        if user is None:
 
            raise Exception('Missing default account!')
 
        return user
 

	
 
    def get_api_data(self, details=False):
 
        """
 
        Common function for generating user related data for API
 
        """
 
        user = self
 
        data = dict(
 
            user_id=user.user_id,
 
            username=user.username,
 
            firstname=user.name,
 
            lastname=user.lastname,
 
            email=user.email,
 
            emails=user.emails,
 
            active=user.active,
 
            admin=user.admin,
 
        )
 
        if details:
 
            data.update(dict(
 
                extern_type=user.extern_type,
 
                extern_name=user.extern_name,
 
                api_key=user.api_key,
 
                api_keys=user.api_keys,
 
                last_login=user.last_login,
 
                ip_addresses=user.ip_addresses
 
                ))
 
        return data
 

	
 
    def __json__(self):
 
        data = dict(
 
            full_name=self.full_name,
 
            full_name_or_username=self.full_name_or_username,
 
            short_contact=self.short_contact,
 
            full_contact=self.full_contact
 
        )
 
        data.update(self.get_api_data())
 
        return data
 

	
 

	
 
class UserApiKeys(Base, BaseDbModel):
 
    __tablename__ = 'user_api_keys'
 
    __table_args__ = (
 
        Index('uak_api_key_idx', 'api_key'),
 
        Index('uak_api_key_expires_idx', 'api_key', 'expires'),
 
        _table_args_default_dict,
 
    )
 

	
 
    user_api_key_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    api_key = Column(String(255), nullable=False, unique=True)
 
    description = Column(UnicodeText(), nullable=False)
 
    expires = Column(Float(53), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 

	
 
    user = relationship('User')
 

	
 
    @hybrid_property
 
    def is_expired(self):
 
        return (self.expires != -1) & (time.time() > self.expires)
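
A brief aside (not part of this changeset): the bitwise & above, rather than `and`, keeps the expression usable both on instances (plain Python bools) and at class level, where SQLAlchemy turns it into SQL AND -- which is what lets the filter_by(..., is_expired=False) lookup in User.get_by_api_key work. A minimal plain-Python sketch of the same rule:

import time

def is_expired(expires):
    # -1 means "never expires"; anything else is an absolute unix timestamp
    return (expires != -1) & (time.time() > expires)

assert is_expired(-1) == False
assert is_expired(time.time() - 60) == True
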
 

	
 

	
 
class UserEmailMap(Base, BaseDbModel):
 
    __tablename__ = 'user_email_map'
 
    __table_args__ = (
 
        Index('uem_email_idx', 'email'),
 
        _table_args_default_dict,
 
    )
 

	
 
    email_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    _email = Column("email", String(255), nullable=False, unique=True)
 
    user = relationship('User')
 

	
 
    @validates('_email')
 
    def validate_email(self, key, email):
 
        # check that this email is not already the main one
 
        main_email = Session().query(User).filter(User.email == email).scalar()
 
        if main_email is not None:
 
            raise AttributeError('email %s is already present in the user table' % email)
 
        return email
 

	
 
    @hybrid_property
 
    def email(self):
 
        return self._email
 

	
 
    @email.setter
 
    def email(self, val):
 
        self._email = val.lower() if val else None
 

	
 

	
 
class UserIpMap(Base, BaseDbModel):
 
    __tablename__ = 'user_ip_map'
 
    __table_args__ = (
 
        UniqueConstraint('user_id', 'ip_addr'),
 
        _table_args_default_dict,
 
    )
 

	
 
    ip_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    ip_addr = Column(String(255), nullable=False)
 
    active = Column(Boolean(), nullable=False, default=True)
 
    user = relationship('User')
 

	
 
    @classmethod
 
    def _get_ip_range(cls, ip_addr):
 
        net = ipaddr.IPNetwork(address=ip_addr)
 
        return [str(net.network), str(net.broadcast)]
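
For illustration only (not part of this changeset), roughly the same range computation with the standard library ipaddress module instead of the legacy ipaddr package; strict=False tolerates host bits being set in the given address.

import ipaddress

def get_ip_range(ip_addr):
    net = ipaddress.ip_network(ip_addr, strict=False)
    return [str(net.network_address), str(net.broadcast_address)]

assert get_ip_range('192.168.1.0/24') == ['192.168.1.0', '192.168.1.255']
assert get_ip_range('10.0.0.1') == ['10.0.0.1', '10.0.0.1']   # a single host is a /32
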
 

	
 
    def __json__(self):
 
        return dict(
 
          ip_addr=self.ip_addr,
 
          ip_range=self._get_ip_range(self.ip_addr)
 
        )
 

	
 
    def __repr__(self):
 
        return "<%s %s: %s>" % (self.__class__.__name__, self.user_id, self.ip_addr)
 

	
 

	
 
class UserLog(Base, BaseDbModel):
 
    __tablename__ = 'user_logs'
 
    __table_args__ = (
 
        _table_args_default_dict,
 
    )
 

	
 
    user_log_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=True)
 
    username = Column(String(255), nullable=False)
 
    repository_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=True)
 
    repository_name = Column(Unicode(255), nullable=False)
 
    user_ip = Column(String(255), nullable=True)
 
    action = Column(UnicodeText(), nullable=False)
 
    action_date = Column(DateTime(timezone=False), nullable=False)
 

	
 
    def __repr__(self):
 
        return "<%s %r: %r')>" % (self.__class__.__name__,
 
                                  self.repository_name,
 
                                  self.action)
 

	
 
    @property
 
    def action_as_day(self):
 
        return datetime.date(*self.action_date.timetuple()[:3])
 

	
 
    user = relationship('User')
 
    repository = relationship('Repository', cascade='')
 

	
 

	
 
class UserGroup(Base, BaseDbModel):
 
    __tablename__ = 'users_groups'
 
    __table_args__ = (
 
        _table_args_default_dict,
 
    )
 

	
 
    users_group_id = Column(Integer(), primary_key=True)
 
    users_group_name = Column(Unicode(255), nullable=False, unique=True)
 
    user_group_description = Column(Unicode(10000), nullable=True) # FIXME: not nullable?
 
    users_group_active = Column(Boolean(), nullable=False)
 
    owner_id = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    _group_data = Column("group_data", LargeBinary(), nullable=True)  # JSON data # FIXME: not nullable?
 

	
 
    members = relationship('UserGroupMember', cascade="all, delete-orphan")
 
    users_group_to_perm = relationship('UserGroupToPerm', cascade='all')
 
    users_group_repo_to_perm = relationship('UserGroupRepoToPerm', cascade='all')
 
    users_group_repo_group_to_perm = relationship('UserGroupRepoGroupToPerm', cascade='all')
 
    user_user_group_to_perm = relationship('UserUserGroupToPerm ', cascade='all')
 
    user_group_user_group_to_perm = relationship('UserGroupUserGroupToPerm ', primaryjoin="UserGroupUserGroupToPerm.target_user_group_id==UserGroup.users_group_id", cascade='all')
 

	
 
    owner = relationship('User')
 

	
 
    @hybrid_property
 
    def group_data(self):
 
        if not self._group_data:
 
            return {}
 

	
 
        try:
 
            return ext_json.loads(self._group_data)
 
        except TypeError:
 
            return {}
 

	
 
    @group_data.setter
 
    def group_data(self, val):
 
        try:
 
            self._group_data = ascii_bytes(ext_json.dumps(val))
 
        except Exception:
 
            log.error(traceback.format_exc())
 

	
 
    def __repr__(self):
 
        return "<%s %s: %r')>" % (self.__class__.__name__,
 
                                  self.users_group_id,
 
                                  self.users_group_name)
 

	
 
    @classmethod
 
    def guess_instance(cls, value):
 
        return super(UserGroup, cls).guess_instance(value, UserGroup.get_by_group_name)
 

	
 
    @classmethod
 
    def get_by_group_name(cls, group_name, cache=False,
 
                          case_insensitive=False):
 
        if case_insensitive:
 
            q = cls.query().filter(sqlalchemy.func.lower(cls.users_group_name) == sqlalchemy.func.lower(group_name))
 
        else:
 
            q = cls.query().filter(cls.users_group_name == group_name)
 
        if cache:
 
            q = q.options(FromCache(
 
                            "sql_cache_short",
 
                            "get_group_%s" % _hash_key(group_name)
 
                          )
 
            )
 
        return q.scalar()
 

	
 
    @classmethod
 
    def get(cls, user_group_id, cache=False):
 
        user_group = cls.query()
 
        if cache:
 
            user_group = user_group.options(FromCache("sql_cache_short",
 
                                    "get_users_group_%s" % user_group_id))
 
        return user_group.get(user_group_id)
 

	
 
    def get_api_data(self, with_members=True):
 
        user_group = self
 

	
 
        data = dict(
 
            users_group_id=user_group.users_group_id,
 
            group_name=user_group.users_group_name,
 
            group_description=user_group.user_group_description,
 
            active=user_group.users_group_active,
 
            owner=user_group.owner.username,
 
        )
 
        if with_members:
 
            data['members'] = [
 
                ugm.user.get_api_data()
 
                for ugm in user_group.members
 
            ]
 

	
 
        return data
 

	
 

	
 
class UserGroupMember(Base, BaseDbModel):
 
    __tablename__ = 'users_groups_members'
 
    __table_args__ = (
 
        _table_args_default_dict,
 
    )
 

	
 
    users_group_member_id = Column(Integer(), primary_key=True)
 
    users_group_id = Column(Integer(), ForeignKey('users_groups.users_group_id'), nullable=False)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 

	
 
    user = relationship('User')
 
    users_group = relationship('UserGroup')
 

	
 
    def __init__(self, gr_id='', u_id=''):
 
        self.users_group_id = gr_id
 
        self.user_id = u_id
 

	
 

	
 
class RepositoryField(Base, BaseDbModel):
 
    __tablename__ = 'repositories_fields'
 
    __table_args__ = (
 
        UniqueConstraint('repository_id', 'field_key'),  # no-multi field
 
        _table_args_default_dict,
 
    )
 

	
 
    PREFIX = 'ex_'  # prefix used in form to not conflict with already existing fields
 

	
 
    repo_field_id = Column(Integer(), primary_key=True)
 
    repository_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
 
    field_key = Column(String(250), nullable=False)
 
    field_label = Column(String(1024), nullable=False)
 
    field_value = Column(String(10000), nullable=False)
 
    field_desc = Column(String(1024), nullable=False)
 
    field_type = Column(String(255), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 

	
 
    repository = relationship('Repository')
 

	
 
    @property
 
    def field_key_prefixed(self):
 
        return 'ex_%s' % self.field_key
 

	
 
    @classmethod
 
    def un_prefix_key(cls, key):
 
        if key.startswith(cls.PREFIX):
 
            return key[len(cls.PREFIX):]
 
        return key
 

	
 
    @classmethod
 
    def get_by_key_name(cls, key, repo):
 
        row = cls.query() \
 
                .filter(cls.repository == repo) \
 
                .filter(cls.field_key == key).scalar()
 
        return row
 

	
 

	
 
class Repository(Base, BaseDbModel):
 
    __tablename__ = 'repositories'
 
    __table_args__ = (
 
        Index('r_repo_name_idx', 'repo_name'),
 
        _table_args_default_dict,
 
    )
 

	
 
    DEFAULT_CLONE_URI = '{scheme}://{user}@{netloc}/{repo}'
 
    DEFAULT_CLONE_SSH = 'ssh://{system_user}@{hostname}/{repo}'
 

	
 
    STATE_CREATED = u'repo_state_created'
 
    STATE_PENDING = u'repo_state_pending'
 
    STATE_ERROR = u'repo_state_error'
 

	
 
    repo_id = Column(Integer(), primary_key=True)
 
    repo_name = Column(Unicode(255), nullable=False, unique=True)
 
    repo_state = Column(String(255), nullable=False)
 

	
 
    clone_uri = Column(String(255), nullable=True) # FIXME: not nullable?
 
    repo_type = Column(String(255), nullable=False) # 'hg' or 'git'
 
    owner_id = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
 
    private = Column(Boolean(), nullable=False)
 
    enable_statistics = Column("statistics", Boolean(), nullable=False, default=True)
 
    enable_downloads = Column("downloads", Boolean(), nullable=False, default=True)
 
    description = Column(Unicode(10000), nullable=False)
kallithea/model/gist.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.gist
 
~~~~~~~~~~~~~~~~~~~~
 

	
 
gist model for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: May 9, 2013
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import os
 
import random
 
import shutil
 
import time
 
import traceback
 

	
 
from kallithea.lib import ext_json
 
from kallithea.lib.utils2 import AttributeDict, ascii_bytes, safe_int, time_to_datetime
 
from kallithea.model.db import Gist, Session, User
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.scm import ScmModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 
GIST_STORE_LOC = '.rc_gist_store'
 
GIST_METADATA_FILE = '.rc_gist_metadata'
 

	
 

	
 
def make_gist_access_id():
 
    """Generate a random, URL safe, almost certainly unique gist identifier."""
 
    rnd = random.SystemRandom() # use cryptographically secure system PRNG
 
    alphabet = '23456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghjklmnpqrstuvwxyz'
 
    length = 20
 
    return u''.join(rnd.choice(alphabet) for _ in range(length))
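
A back-of-the-envelope check (illustration, not part of this changeset) of why collisions are "almost certainly" not a concern: with this 56-character alphabet and length 20 the identifier space has on the order of 10**35 values.

import math

alphabet = '23456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghjklmnpqrstuvwxyz'
keyspace = len(alphabet) ** 20
print(len(alphabet))                 # 56
print(round(math.log10(keyspace)))   # 35 -- i.e. roughly 10**35 possible identifiers
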
 

	
 

	
 
class GistModel(object):
 

	
 
    def __delete_gist(self, gist):
 
        """
 
        removes gist from filesystem
 

	
 
        :param gist: gist object
 
        """
 
        root_path = RepoModel().repos_path
 
        rm_path = os.path.join(root_path, GIST_STORE_LOC, gist.gist_access_id)
 
        log.info("Removing %s", rm_path)
 
        shutil.rmtree(rm_path)
 

	
 
    def _store_metadata(self, repo, gist_id, gist_access_id, user_id, gist_type,
 
                        gist_expires):
 
        """
 
        Store metadata inside the gist; it can later be used for imports
 
        or for gist identification.
 
        """
 
        metadata = {
 
            'metadata_version': '1',
 
            'gist_db_id': gist_id,
 
            'gist_access_id': gist_access_id,
 
            'gist_owner_id': user_id,
 
            'gist_type': gist_type,
 
            'gist_expires': gist_expires,
 
            'gist_updated': time.time(),
 
        }
 
        with open(os.path.join(repo.path, '.hg', GIST_METADATA_FILE), 'wb') as f:
 
            f.write(ascii_bytes(ext_json.dumps(metadata)))
 

	
 
    def get_gist(self, gist):
 
        return Gist.guess_instance(gist)
 

	
 
    def get_gist_files(self, gist_access_id, revision=None):
 
        """
 
        Get files for given gist
 

	
 
        :param gist_access_id:
 
        """
 
        repo = Gist.get_by_access_id(gist_access_id)
 
        cs = repo.scm_instance.get_changeset(revision)
 
        return cs, [n for n in cs.get_node('/')]
 

	
 
    def create(self, description, owner, ip_addr, gist_mapping,
 
               gist_type=Gist.GIST_PUBLIC, lifetime=-1):
 
        """
 

	
 
        :param description: description of the gist
 
        :param owner: user who created this gist
 
        :param gist_mapping: mapping {filename:{'content':content},...}
 
        :param gist_type: type of gist private/public
 
        :param lifetime: in minutes, -1 == forever
 
        """
 
        owner = User.guess_instance(owner)
 
        gist_access_id = make_gist_access_id()
 
        lifetime = safe_int(lifetime, -1)
 
        gist_expires = time.time() + (lifetime * 60) if lifetime != -1 else -1
 
        log.debug('set GIST expiration date to: %s',
 
                  time_to_datetime(gist_expires)
 
                   if gist_expires != -1 else 'forever')
 
        # create the Database version
 
        gist = Gist()
 
        gist.gist_description = description
 
        gist.gist_access_id = gist_access_id
 
        gist.owner_id = owner.user_id
 
        gist.gist_expires = gist_expires
 
        gist.gist_type = gist_type
 
        Session().add(gist)
 
        Session().flush() # make database assign gist.gist_id
 
        if gist_type == Gist.GIST_PUBLIC:
 
            # use DB ID for easy to use GIST ID
 
            gist.gist_access_id = unicode(gist.gist_id)
 
            gist.gist_access_id = str(gist.gist_id)
 

	
 
        log.debug('Creating new %s GIST repo %s', gist_type, gist.gist_access_id)
 
        repo = RepoModel()._create_filesystem_repo(
 
            repo_name=gist.gist_access_id, repo_type='hg', repo_group=GIST_STORE_LOC)
 

	
 
        processed_mapping = {}
 
        for filename in gist_mapping:
 
            if filename != os.path.basename(filename):
 
                raise Exception('Filename cannot be inside a directory')
 

	
 
            content = gist_mapping[filename]['content']
 
            # TODO: expand support for setting explicit lexers
 
#             if lexer is None:
 
#                 try:
 
#                     guess_lexer = pygments.lexers.guess_lexer_for_filename
 
#                     lexer = guess_lexer(filename,content)
 
#                 except pygments.util.ClassNotFound:
 
#                     lexer = 'text'
 
            processed_mapping[filename] = {'content': content}
 

	
 
        # now create single multifile commit
 
        message = 'added file'
 
        message += 's: ' if len(processed_mapping) > 1 else ': '
 
        message += ', '.join([x for x in processed_mapping])
 

	
 
        # fake Kallithea Repository object
 
        fake_repo = AttributeDict(dict(
 
            repo_name=os.path.join(GIST_STORE_LOC, gist.gist_access_id),
 
            scm_instance_no_cache=lambda: repo,
 
        ))
 
        ScmModel().create_nodes(
 
            user=owner.user_id,
 
            ip_addr=ip_addr,
 
            repo=fake_repo,
 
            message=message,
 
            nodes=processed_mapping,
 
            trigger_push_hook=False
 
        )
 

	
 
        self._store_metadata(repo, gist.gist_id, gist.gist_access_id,
 
                             owner.user_id, gist.gist_type, gist.gist_expires)
 
        return gist
 

	
 
    def delete(self, gist, fs_remove=True):
 
        gist = Gist.guess_instance(gist)
 
        try:
 
            Session().delete(gist)
 
            if fs_remove:
 
                self.__delete_gist(gist)
 
            else:
 
                log.debug('skipping removal from filesystem')
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def update(self, gist, description, owner, ip_addr, gist_mapping, gist_type,
 
               lifetime):
 
        gist = Gist.guess_instance(gist)
 
        gist_repo = gist.scm_instance
 

	
 
        lifetime = safe_int(lifetime, -1)
 
        if lifetime == 0:  # preserve old value
 
            gist_expires = gist.gist_expires
 
        else:
 
            gist_expires = time.time() + (lifetime * 60) if lifetime != -1 else -1
 

	
 
        # calculate operation type based on given data
 
        gist_mapping_op = {}
 
        for k, v in gist_mapping.items():
 
            # add, mod, del
 
            if not v['org_filename'] and v['filename']:
 
                op = 'add'
 
            elif v['org_filename'] and not v['filename']:
 
                op = 'del'
 
            else:
 
                op = 'mod'
 

	
 
            v['op'] = op
 
            gist_mapping_op[k] = v
 

	
 
        gist.gist_description = description
 
        gist.gist_expires = gist_expires
 
        gist.owner = owner
 
        gist.gist_type = gist_type
 

	
 
        message = 'updated file'
 
        message += 's: ' if len(gist_mapping) > 1 else ': '
 
        message += ', '.join([x for x in gist_mapping])
 

	
 
        # fake Kallithea Repository object
 
        fake_repo = AttributeDict(dict(
 
            repo_name=os.path.join(GIST_STORE_LOC, gist.gist_access_id),
 
            scm_instance_no_cache=lambda: gist_repo,
 
        ))
 

	
 
        self._store_metadata(gist_repo, gist.gist_id, gist.gist_access_id,
 
                             owner.user_id, gist.gist_type, gist.gist_expires)
 

	
 
        ScmModel().update_nodes(
 
            user=owner.user_id,
 
            ip_addr=ip_addr,
 
            repo=fake_repo,
 
            message=message,
 
            nodes=gist_mapping_op,
 
            trigger_push_hook=False
 
        )
 

	
 
        return gist
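
A small sketch (not part of this changeset) of the expiry rule shared by create() and update() above: lifetime is given in minutes, -1 means the gist never expires, and in update() a lifetime of 0 preserves the previous expiry. The helper below is purely illustrative.

import time

def compute_gist_expires(lifetime_minutes, old_expires=None):
    lifetime = int(lifetime_minutes)
    if old_expires is not None and lifetime == 0:
        return old_expires                 # update(): keep the previous value
    if lifetime == -1:
        return -1                          # never expires
    return time.time() + lifetime * 60     # absolute unix timestamp

assert compute_gist_expires(-1) == -1
assert compute_gist_expires(0, old_expires=123.0) == 123.0
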
kallithea/model/notification.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.notification
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Model for notifications
 

	
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Nov 20, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import datetime
 
import logging
 

	
 
from tg import app_globals
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 

	
 
import kallithea
 
from kallithea.lib import helpers as h
 
from kallithea.model.db import User
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class NotificationModel(object):
 

	
 
    TYPE_CHANGESET_COMMENT = u'cs_comment'
 
    TYPE_MESSAGE = u'message'
 
    TYPE_MENTION = u'mention' # not used
 
    TYPE_REGISTRATION = u'registration'
 
    TYPE_PULL_REQUEST = u'pull_request'
 
    TYPE_PULL_REQUEST_COMMENT = u'pull_request_comment'
 

	
 
    def create(self, created_by, subject, body, recipients=None,
 
               type_=TYPE_MESSAGE, with_email=True,
 
               email_kwargs=None, repo_name=None):
 
        """
 

	
 
        Creates notification of given type
 

	
 
        :param created_by: int, str or User instance. User who created this
 
            notification
 
        :param subject:
 
        :param body:
 
        :param recipients: list of int, str or User objects, when None
 
            is given, send to all admins
 
        :param type_: type of notification
 
        :param with_email: send email with this notification
 
        :param email_kwargs: additional dict to pass as args to email template
 
        """
 
        from kallithea.lib.celerylib import tasks
 
        email_kwargs = email_kwargs or {}
 
        if recipients and not getattr(recipients, '__iter__', False):
 
            raise Exception('recipients must be a list or iterable')
 

	
 
        created_by_obj = User.guess_instance(created_by)
 

	
 
        recipients_objs = set()
 
        if recipients:
 
            for u in recipients:
 
                obj = User.guess_instance(u)
 
                if obj is not None:
 
                    recipients_objs.add(obj)
 
                else:
 
                    # TODO: inform user that requested operation couldn't be completed
 
                    log.error('cannot email unknown user %r', u)
 
            log.debug('sending notifications %s to %s',
 
                type_, recipients_objs
 
            )
 
        elif recipients is None:
 
            # empty recipients means to all admins
 
            recipients_objs = User.query().filter(User.admin == True).all()
 
            log.debug('sending notifications %s to admins: %s',
 
                type_, recipients_objs
 
            )
 
        #else: silently skip notification mails?
 

	
 
        if not with_email:
 
            return
 

	
 
        headers = {}
 
        headers['X-Kallithea-Notification-Type'] = type_
 
        if 'threading' in email_kwargs:
 
            headers['References'] = ' '.join('<%s>' % x for x in email_kwargs['threading'])
 

	
 
        # this is passed into template
 
        created_on = h.fmt_date(datetime.datetime.now())
 
        html_kwargs = {
 
                  'subject': subject,
 
                  'body': h.render_w_mentions(body, repo_name),
 
                  'when': created_on,
 
                  'user': created_by_obj.username,
 
                  }
 

	
 
        txt_kwargs = {
 
                  'subject': subject,
 
                  'body': body,
 
                  'when': created_on,
 
                  'user': created_by_obj.username,
 
                  }
 

	
 
        html_kwargs.update(email_kwargs)
 
        txt_kwargs.update(email_kwargs)
 
        email_subject = EmailNotificationModel() \
 
                            .get_email_description(type_, **txt_kwargs)
 
        email_txt_body = EmailNotificationModel() \
 
                            .get_email_tmpl(type_, 'txt', **txt_kwargs)
 
        email_html_body = EmailNotificationModel() \
 
                            .get_email_tmpl(type_, 'html', **html_kwargs)
 

	
 
        # don't send email to person who created this comment
 
        rec_objs = set(recipients_objs).difference(set([created_by_obj]))
 

	
 
        # send email with notification to all other participants
 
        for rec in rec_objs:
 
            tasks.send_email([rec.email], email_subject, email_txt_body,
 
                     email_html_body, headers, author=created_by_obj)
 

	
 

	
 
class EmailNotificationModel(object):
 

	
 
    TYPE_CHANGESET_COMMENT = NotificationModel.TYPE_CHANGESET_COMMENT
 
    TYPE_MESSAGE = NotificationModel.TYPE_MESSAGE # only used for testing
 
    # NotificationModel.TYPE_MENTION is not used
 
    TYPE_PASSWORD_RESET = 'password_link'
 
    TYPE_REGISTRATION = NotificationModel.TYPE_REGISTRATION
 
    TYPE_PULL_REQUEST = NotificationModel.TYPE_PULL_REQUEST
 
    TYPE_PULL_REQUEST_COMMENT = NotificationModel.TYPE_PULL_REQUEST_COMMENT
 
    TYPE_DEFAULT = 'default'
 

	
 
    def __init__(self):
 
        super(EmailNotificationModel, self).__init__()
 
        self._template_root = kallithea.CONFIG['paths']['templates'][0]
 
        self._tmpl_lookup = app_globals.mako_lookup
 
        self.email_types = {
 
            self.TYPE_CHANGESET_COMMENT: 'changeset_comment',
 
            self.TYPE_PASSWORD_RESET: 'password_reset',
 
            self.TYPE_REGISTRATION: 'registration',
 
            self.TYPE_DEFAULT: 'default',
 
            self.TYPE_PULL_REQUEST: 'pull_request',
 
            self.TYPE_PULL_REQUEST_COMMENT: 'pull_request_comment',
 
        }
 
        self._subj_map = {
 
            self.TYPE_CHANGESET_COMMENT: _('[Comment] %(repo_name)s changeset %(short_id)s "%(message_short)s" on %(branch)s'),
 
            self.TYPE_MESSAGE: 'Test Message',
 
            # self.TYPE_PASSWORD_RESET
 
            self.TYPE_REGISTRATION: _('New user %(new_username)s registered'),
 
            # self.TYPE_DEFAULT
 
            self.TYPE_PULL_REQUEST: _('[Review] %(repo_name)s PR %(pr_nice_id)s "%(pr_title_short)s" from %(pr_source_branch)s by %(pr_owner_username)s'),
 
            self.TYPE_PULL_REQUEST_COMMENT: _('[Comment] %(repo_name)s PR %(pr_nice_id)s "%(pr_title_short)s" from %(pr_source_branch)s by %(pr_owner_username)s'),
 
        }
 

	
 
    def get_email_description(self, type_, **kwargs):
 
        """
 
        return subject for email based on given type
 
        """
 
        tmpl = self._subj_map[type_]
 
        try:
 
            subj = tmpl % kwargs
 
        except KeyError as e:
 
            log.error('error generating email subject for %r from %s: %s', type_, ', '.join(self._subj_map), e)
 
            raise
 
        # gmail doesn't do proper threading but will ignore leading square
 
        # bracket content ... so that is where we put status info
 
        bracket_tags = []
 
        status_change = kwargs.get('status_change')
 
        if status_change:
 
            bracket_tags.append(unicode(status_change))  # apply unicode to evaluate LazyString before .join
 
            bracket_tags.append(str(status_change))  # apply str to evaluate LazyString before .join
 
        if kwargs.get('closing_pr'):
 
            bracket_tags.append(_('Closing'))
 
        if bracket_tags:
 
            if subj.startswith('['):
 
                subj = '[' + ', '.join(bracket_tags) + ': ' + subj[1:]
 
            else:
 
                subj = '[' + ', '.join(bracket_tags) + '] ' + subj
 
        return subj
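
A standalone sketch (not part of this changeset) of the bracket-tag behaviour above: extra status tags are merged into an existing leading "[...]" group, or prepended as a new one, since Gmail ignores leading square-bracket content when threading and the rest of the subject stays stable.

def prefix_subject(subj, bracket_tags):
    if not bracket_tags:
        return subj
    if subj.startswith('['):
        return '[' + ', '.join(bracket_tags) + ': ' + subj[1:]
    return '[' + ', '.join(bracket_tags) + '] ' + subj

assert prefix_subject('[Review] repo PR #1 "title"', ['Approved']) == '[Approved: Review] repo PR #1 "title"'
assert prefix_subject('Test Message', ['Closing']) == '[Closing] Test Message'
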
 

	
 
    def get_email_tmpl(self, type_, content_type, **kwargs):
 
        """
 
        return generated template for email based on given type
 
        """
 

	
 
        base = 'email_templates/' + self.email_types.get(type_, self.email_types[self.TYPE_DEFAULT]) + '.' + content_type
 
        email_template = self._tmpl_lookup.get_template(base)
 
        # translator and helpers inject
 
        _kwargs = {'_': _,
 
                   'h': h,
 
                   'c': c}
 
        _kwargs.update(kwargs)
 
        if content_type == 'html':
 
            _kwargs.update({
 
                "color_text": "#202020",
 
                "color_emph": "#395fa0",
 
                "color_link": "#395fa0",
 
                "color_border": "#ddd",
 
                "color_background_grey": "#f9f9f9",
 
                "color_button": "#395fa0",
 
                "monospace_style": "font-family:Lucida Console,Consolas,Monaco,Inconsolata,Liberation Mono,monospace",
 
                "sans_style": "font-family:Helvetica,Arial,sans-serif",
 
                })
 
            _kwargs.update({
 
                "default_style": "%(sans_style)s;font-weight:200;font-size:14px;line-height:17px;color:%(color_text)s" % _kwargs,
 
                "comment_style": "%(monospace_style)s;white-space:pre-wrap" % _kwargs,
 
                "data_style": "border:%(color_border)s 1px solid;background:%(color_background_grey)s" % _kwargs,
 
                "emph_style": "font-weight:600;color:%(color_emph)s" % _kwargs,
 
                "link_style": "color:%(color_link)s;text-decoration:none" % _kwargs,
 
                "link_text_style": "color:%(color_text)s;text-decoration:none;border:%(color_border)s 1px solid;background:%(color_background_grey)s" % _kwargs,
 
                })
 

	
 
        log.debug('rendering tmpl %s with kwargs %s', base, _kwargs)
 
        return email_template.render_unicode(**_kwargs)
kallithea/tests/vcs/test_git.py
Show inline comments
 
import datetime
 
import os
 
import sys
 
import urllib.error
 

	
 
import mock
 
import pytest
 

	
 
from kallithea.lib.vcs.backends.git import GitChangeset, GitRepository
 
from kallithea.lib.vcs.exceptions import NodeDoesNotExistError, RepositoryError, VCSError
 
from kallithea.lib.vcs.nodes import DirNode, FileNode, NodeKind, NodeState
 
from kallithea.model.scm import ScmModel
 
from kallithea.tests.vcs.base import _BackendTestMixin
 
from kallithea.tests.vcs.conf import TEST_GIT_REPO, TEST_GIT_REPO_CLONE, TESTS_TMP_PATH, get_new_dir
 

	
 

	
 
class TestGitRepository(object):
 

	
 
    def __check_for_existing_repo(self):
 
        if os.path.exists(TEST_GIT_REPO_CLONE):
 
            pytest.fail('Cannot test git clone repo as location %s already '
 
                      'exists. You should manually remove it first.'
 
                      % TEST_GIT_REPO_CLONE)
 

	
 
    def setup_method(self):
 
        self.repo = GitRepository(TEST_GIT_REPO)
 

	
 
    def test_wrong_repo_path(self):
 
        wrong_repo_path = os.path.join(TESTS_TMP_PATH, 'errorrepo')
 
        with pytest.raises(RepositoryError):
 
            GitRepository(wrong_repo_path)
 

	
 
    def test_git_cmd_injection(self):
 
        repo_inject_path = TEST_GIT_REPO + '; echo "Cake";'
 
        with pytest.raises(urllib.error.URLError):
 
            # Should fail because URL will contain the parts after ; too
 
            GitRepository(get_new_dir('injection-repo'), src_url=repo_inject_path, update_after_clone=True, create=True)
 

	
 
        with pytest.raises(RepositoryError):
 
            # Should fail on direct clone call, which as of this writing does not happen outside of class
 
            clone_fail_repo = GitRepository(get_new_dir('injection-repo'), create=True)
 
            clone_fail_repo.clone(repo_inject_path, update_after_clone=True,)
 

	
 
        # Verify correct quoting of evil characters that should work on posix file systems
 
        if sys.platform == 'win32':
 
            # windows does not allow '"' in dir names
 
            # and some versions of the git client don't like ` and '
 
            tricky_path = get_new_dir("tricky-path-repo-$")
 
        else:
 
            tricky_path = get_new_dir("tricky-path-repo-$'\"`")
 
        successfully_cloned = GitRepository(tricky_path, src_url=TEST_GIT_REPO, update_after_clone=True, create=True)
 
        # Repo should have been created
 
        assert not successfully_cloned._repo.bare
 

	
 
        if sys.platform == 'win32':
 
            # windows does not allow '"' in dir names
 
            # and some versions of the git client don't like ` and '
 
            tricky_path_2 = get_new_dir("tricky-path-2-repo-$")
 
        else:
 
            tricky_path_2 = get_new_dir("tricky-path-2-repo-$'\"`")
 
        successfully_cloned2 = GitRepository(tricky_path_2, src_url=tricky_path, bare=True, create=True)
 
        # Repo should have been created and thus used correct quoting for clone
 
        assert successfully_cloned2._repo.bare
 

	
 
        # Should pass because URL has been properly quoted
 
        successfully_cloned.pull(tricky_path_2)
 
        successfully_cloned2.fetch(tricky_path)
 

	
 
    def test_repo_create_with_spaces_in_path(self):
 
        repo_path = get_new_dir("path with spaces")
 
        repo = GitRepository(repo_path, src_url=None, bare=True, create=True)
 
        # Repo should have been created
 
        assert repo._repo.bare
 

	
 
    def test_repo_clone(self):
 
        self.__check_for_existing_repo()
 
        repo = GitRepository(TEST_GIT_REPO)
 
        repo_clone = GitRepository(TEST_GIT_REPO_CLONE,
 
            src_url=TEST_GIT_REPO, create=True, update_after_clone=True)
 
        assert len(repo.revisions) == len(repo_clone.revisions)
 
        # Checking hashes of changesets should be enough
 
        for changeset in repo.get_changesets():
 
            raw_id = changeset.raw_id
 
            assert raw_id == repo_clone.get_changeset(raw_id).raw_id
 

	
 
    def test_repo_clone_with_spaces_in_path(self):
 
        repo_path = get_new_dir("path with spaces")
 
        successfully_cloned = GitRepository(repo_path, src_url=TEST_GIT_REPO, update_after_clone=True, create=True)
 
        # Repo should have been created
 
        assert not successfully_cloned._repo.bare
 

	
 
        successfully_cloned.pull(TEST_GIT_REPO)
 
        self.repo.fetch(repo_path)
 

	
 
    def test_repo_clone_without_create(self):
 
        with pytest.raises(RepositoryError):
 
            GitRepository(TEST_GIT_REPO_CLONE + '_wo_create', src_url=TEST_GIT_REPO)
 

	
 
    def test_repo_clone_with_update(self):
 
        repo = GitRepository(TEST_GIT_REPO)
 
        clone_path = TEST_GIT_REPO_CLONE + '_with_update'
 
        repo_clone = GitRepository(clone_path,
 
            create=True, src_url=TEST_GIT_REPO, update_after_clone=True)
 
        assert len(repo.revisions) == len(repo_clone.revisions)
 

	
 
        # check if current workdir was updated
 
        fpath = os.path.join(clone_path, 'MANIFEST.in')
 
        assert os.path.isfile(fpath) == True, 'Repo was cloned and updated but file %s could not be found' % fpath
 

	
 
    def test_repo_clone_without_update(self):
 
        repo = GitRepository(TEST_GIT_REPO)
 
        clone_path = TEST_GIT_REPO_CLONE + '_without_update'
 
        repo_clone = GitRepository(clone_path,
 
            create=True, src_url=TEST_GIT_REPO, update_after_clone=False)
 
        assert len(repo.revisions) == len(repo_clone.revisions)
 
        # check if current workdir was *NOT* updated
 
        fpath = os.path.join(clone_path, 'MANIFEST.in')
 
        # Make sure it's not bare repo
 
        assert not repo_clone._repo.bare
 
        assert os.path.isfile(fpath) == False, 'Repo was cloned without update but file %s was found' % fpath
 

	
 
    def test_repo_clone_into_bare_repo(self):
 
        repo = GitRepository(TEST_GIT_REPO)
 
        clone_path = TEST_GIT_REPO_CLONE + '_bare.git'
 
        repo_clone = GitRepository(clone_path, create=True,
 
            src_url=repo.path, bare=True)
 
        assert repo_clone._repo.bare
 

	
 
    def test_create_repo_is_not_bare_by_default(self):
 
        repo = GitRepository(get_new_dir('not-bare-by-default'), create=True)
 
        assert not repo._repo.bare
 

	
 
    def test_create_bare_repo(self):
 
        repo = GitRepository(get_new_dir('bare-repo'), create=True, bare=True)
 
        assert repo._repo.bare
 

	
 
    def test_revisions(self):
 
        # there are 112 revisions (by now)
 
        # so we can assume they would be available from now on
 
        subset = set([
 
            'c1214f7e79e02fc37156ff215cd71275450cffc3',
 
            '38b5fe81f109cb111f549bfe9bb6b267e10bc557',
 
            'fa6600f6848800641328adbf7811fd2372c02ab2',
 
            '102607b09cdd60e2793929c4f90478be29f85a17',
 
            '49d3fd156b6f7db46313fac355dca1a0b94a0017',
 
            '2d1028c054665b962fa3d307adfc923ddd528038',
 
            'd7e0d30fbcae12c90680eb095a4f5f02505ce501',
 
            'ff7ca51e58c505fec0dd2491de52c622bb7a806b',
 
            'dd80b0f6cf5052f17cc738c2951c4f2070200d7f',
 
            '8430a588b43b5d6da365400117c89400326e7992',
 
            'd955cd312c17b02143c04fa1099a352b04368118',
 
            'f67b87e5c629c2ee0ba58f85197e423ff28d735b',
 
            'add63e382e4aabc9e1afdc4bdc24506c269b7618',
 
            'f298fe1189f1b69779a4423f40b48edf92a703fc',
 
            'bd9b619eb41994cac43d67cf4ccc8399c1125808',
 
            '6e125e7c890379446e98980d8ed60fba87d0f6d1',
 
            'd4a54db9f745dfeba6933bf5b1e79e15d0af20bd',
 
            '0b05e4ed56c802098dfc813cbe779b2f49e92500',
 
            '191caa5b2c81ed17c0794bf7bb9958f4dcb0b87e',
 
            '45223f8f114c64bf4d6f853e3c35a369a6305520',
 
            'ca1eb7957a54bce53b12d1a51b13452f95bc7c7e',
 
            'f5ea29fc42ef67a2a5a7aecff10e1566699acd68',
 
            '27d48942240f5b91dfda77accd2caac94708cc7d',
 
            '622f0eb0bafd619d2560c26f80f09e3b0b0d78af',
 
            'e686b958768ee96af8029fe19c6050b1a8dd3b2b'])
 
        assert subset.issubset(set(self.repo.revisions))
 

	
 
    def test_slicing(self):
 
        # 4 1 5 10 95
 
        for sfrom, sto, size in [(0, 4, 4), (1, 2, 1), (10, 15, 5),
 
                                 (10, 20, 10), (5, 100, 95)]:
 
            revs = list(self.repo[sfrom:sto])
 
            assert len(revs) == size
 
            assert revs[0] == self.repo.get_changeset(sfrom)
 
            assert revs[-1] == self.repo.get_changeset(sto - 1)
 

	
 
    def test_branches(self):
 
        # TODO: Need more tests here
 
        # Removed (those are 'remotes' branches for cloned repo)
 
        #assert 'master' in self.repo.branches
 
        #assert 'gittree' in self.repo.branches
 
        #assert 'web-branch' in self.repo.branches
 
        for name, id in self.repo.branches.items():
 
            assert isinstance(self.repo.get_changeset(id), GitChangeset)
 

	
 
    def test_tags(self):
 
        # TODO: Need more tests here
 
        assert 'v0.1.1' in self.repo.tags
 
        assert 'v0.1.2' in self.repo.tags
 
        for name, id in self.repo.tags.items():
 
            assert isinstance(self.repo.get_changeset(id), GitChangeset)
 

	
 
    def _test_single_changeset_cache(self, revision):
 
        chset = self.repo.get_changeset(revision)
 
        assert revision in self.repo.changesets
 
        assert chset is self.repo.changesets[revision]
 

	
 
    def test_initial_changeset(self):
 
        id = self.repo.revisions[0]
 
        init_chset = self.repo.get_changeset(id)
 
        assert init_chset.message == 'initial import\n'
 
        assert init_chset.author == 'Marcin Kuzminski <marcin@python-blog.com>'
 
        for path in ('vcs/__init__.py',
 
                     'vcs/backends/BaseRepository.py',
 
                     'vcs/backends/__init__.py'):
 
            assert isinstance(init_chset.get_node(path), FileNode)
 
        for path in ('', 'vcs', 'vcs/backends'):
 
            assert isinstance(init_chset.get_node(path), DirNode)
 

	
 
        with pytest.raises(NodeDoesNotExistError):
 
            init_chset.get_node(path='foobar')
 

	
 
        node = init_chset.get_node('vcs/')
 
        assert hasattr(node, 'kind')
 
        assert node.kind == NodeKind.DIR
 

	
 
        node = init_chset.get_node('vcs')
 
        assert hasattr(node, 'kind')
 
        assert node.kind == NodeKind.DIR
 

	
 
        node = init_chset.get_node('vcs/__init__.py')
 
        assert hasattr(node, 'kind')
 
        assert node.kind == NodeKind.FILE
 

	
 
    def test_not_existing_changeset(self):
 
        with pytest.raises(RepositoryError):
 
            self.repo.get_changeset('f' * 40)
 

	
 
    def test_changeset10(self):
 

	
 
        chset10 = self.repo.get_changeset(self.repo.revisions[9])
 
        readme = b"""===
 
VCS
 
===
 

	
 
Various Version Control System management abstraction layer for Python.
 

	
 
Introduction
 
------------
 

	
 
TODO: To be written...
 

	
 
"""
 
        node = chset10.get_node('README.rst')
 
        assert node.kind == NodeKind.FILE
 
        assert node.content == readme
 

	
 

	
 
class TestGitChangeset(object):
 

	
 
    def setup_method(self):
 
        self.repo = GitRepository(TEST_GIT_REPO)
 

	
 
    def test_default_changeset(self):
 
        tip = self.repo.get_changeset()
 
        assert tip == self.repo.get_changeset(None)
 
        assert tip == self.repo.get_changeset('tip')
 

	
 
    def test_root_node(self):
 
        tip = self.repo.get_changeset()
 
        assert tip.root is tip.get_node('')
 

	
 
    def test_lazy_fetch(self):
 
        """
 
        Test that a changeset's nodes expand and are cached as we walk through
 
        the revision. This test is somewhat hard to write, as the order of
 
        operations is key here. Written by running command after command in a shell.
 
        """
 
        commit_id = '2a13f185e4525f9d4b59882791a2d397b90d5ddc'
 
        assert commit_id in self.repo.revisions
 
        chset = self.repo.get_changeset(commit_id)
 
        assert len(chset.nodes) == 0
 
        root = chset.root
 
        assert len(chset.nodes) == 1
 
        assert len(root.nodes) == 8
 
        # accessing root.nodes updates chset.nodes
 
        assert len(chset.nodes) == 9
 

	
 
        docs = root.get_node('docs')
 
        # we haven't yet accessed anything new as docs dir was already cached
 
        assert len(chset.nodes) == 9
 
        assert len(docs.nodes) == 8
 
        # accessing docs.nodes updates chset.nodes
 
        assert len(chset.nodes) == 17
 

	
 
        assert docs is chset.get_node('docs')
 
        assert docs is root.nodes[0]
 
        assert docs is root.dirs[0]
 
        assert docs is chset.get_node('docs')
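
The lazy-fetch behaviour exercised above - child nodes are only materialised when a directory is first accessed, and the results are cached on the changeset so repeated lookups return the identical object - can be illustrated with a minimal standalone sketch. The LazyTree class below is a hypothetical stand-in for illustration, not Kallithea's actual changeset/node implementation:

class LazyTree:
    """Minimal sketch of per-changeset node caching (illustrative only)."""

    def __init__(self, listing):
        # listing: dict mapping a directory path to its child paths
        self._listing = listing
        self.nodes = {}  # path -> node, filled in lazily

    def get_node(self, path):
        # repeated lookups return the identical cached object
        if path not in self.nodes:
            self.nodes[path] = {'path': path, 'children': self._listing.get(path, [])}
        return self.nodes[path]

tree = LazyTree({'': ['docs'], 'docs': ['docs/api']})
assert len(tree.nodes) == 0                             # nothing fetched yet
root = tree.get_node('')
assert len(tree.nodes) == 1                             # only the root is cached
assert tree.get_node('docs') is tree.get_node('docs')   # cached identity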
 

	
 
    def test_nodes_with_changeset(self):
        commit_id = '2a13f185e4525f9d4b59882791a2d397b90d5ddc'
        chset = self.repo.get_changeset(commit_id)
        root = chset.root
        docs = root.get_node('docs')
        assert docs is chset.get_node('docs')
        api = docs.get_node('api')
        assert api is chset.get_node('docs/api')
        index = api.get_node('index.rst')
        assert index is chset.get_node('docs/api/index.rst')
        assert index is chset.get_node('docs') \
                             .get_node('api') \
                             .get_node('index.rst')

    def test_branch_and_tags(self):
        # These tests seem to show wrong results:
        # in Git, only heads have a branch - most changesets don't
        rev0 = self.repo.revisions[0]
        chset0 = self.repo.get_changeset(rev0)
        assert chset0.branch is None  # should be 'master'?
        assert chset0.branches == []  # should be 'master'?
        assert chset0.tags == []

        rev10 = self.repo.revisions[10]
        chset10 = self.repo.get_changeset(rev10)
        assert chset10.branch is None  # should be 'master'?
        assert chset10.branches == []  # should be 'master'?
        assert chset10.tags == []

        rev44 = self.repo.revisions[44]
        chset44 = self.repo.get_changeset(rev44)
        assert chset44.branch is None  # should be 'web-branch'?
        assert chset44.branches == []  # should be 'web-branch'?

        tip = self.repo.get_changeset('tip')
        assert 'tip' not in tip.tags  # should it be?
        assert not tip.tags  # this is how it actually is!
 

	
 
    def _test_slices(self, limit, offset):
        count = self.repo.count()
        changesets = self.repo.get_changesets(limit=limit, offset=offset)
        idx = 0
        for changeset in changesets:
            rev = offset + idx
            idx += 1
            rev_id = self.repo.revisions[rev]
            if idx > limit:
                pytest.fail("Exceeded limit already (getting revision %s, "
                    "there are %s total revisions, offset=%s, limit=%s)"
                    % (rev_id, count, offset, limit))
            assert changeset == self.repo.get_changeset(rev_id)
        result = list(self.repo.get_changesets(limit=limit, offset=offset))
        start = offset
        end = limit and offset + limit or None
        sliced = list(self.repo[start:end])
        assert result == sliced, (
            "Comparison failed for limit=%s, offset=%s "
            "(get_changesets returned: %s and slicing returned: %s)"
            % (limit, offset, result, sliced))
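
The `end = limit and offset + limit or None` idiom above maps a (limit, offset) pair onto Python slice bounds, falling back to an open-ended slice when limit is 0. A tiny standalone check of that mapping (plain Python, independent of the repository classes; the helper name is made up for illustration):

def slice_bounds(limit, offset):
    # mirrors the idiom used in _test_slices above
    return offset, (limit and offset + limit or None)

revisions = list(range(10))
assert revisions[slice(*slice_bounds(3, 2))] == [2, 3, 4]      # limit=3, offset=2
assert revisions[slice(*slice_bounds(0, 2))] == revisions[2:]  # limit=0 -> no upper bound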
 

	
 
    def _test_file_size(self, revision, path, size):
        node = self.repo.get_changeset(revision).get_node(path)
        assert node.is_file()
        assert node.size == size

    def test_file_size(self):
        to_check = (
            ('c1214f7e79e02fc37156ff215cd71275450cffc3',
                'vcs/backends/BaseRepository.py', 502),
            ('d7e0d30fbcae12c90680eb095a4f5f02505ce501',
                'vcs/backends/hg.py', 854),
            ('6e125e7c890379446e98980d8ed60fba87d0f6d1',
                'setup.py', 1068),
            ('d955cd312c17b02143c04fa1099a352b04368118',
                'vcs/backends/base.py', 2921),
            ('ca1eb7957a54bce53b12d1a51b13452f95bc7c7e',
                'vcs/backends/base.py', 3936),
            ('f50f42baeed5af6518ef4b0cb2f1423f3851a941',
                'vcs/backends/base.py', 6189),
        )
        for revision, path, size in to_check:
            self._test_file_size(revision, path, size)

    def _test_dir_size(self, revision, path, size):
        node = self.repo.get_changeset(revision).get_node(path)
        assert node.size == size

    def test_dir_size(self):
        to_check = (
            ('5f2c6ee195929b0be80749243c18121c9864a3b3', '/', 674076),
            ('7ab37bc680b4aa72c34d07b230c866c28e9fc204', '/', 674049),
            ('6892503fb8f2a552cef5f4d4cc2cdbd13ae1cd2f', '/', 671830),
        )
        for revision, path, size in to_check:
            self._test_dir_size(revision, path, size)

    def test_repo_size(self):
        assert self.repo.size == 674076
 

	
 
    def test_file_history(self):
        # we can only check whether those revisions are present in the history,
        # as we cannot update this test every time the file is changed
        files = {
            'setup.py': [
                '54386793436c938cff89326944d4c2702340037d',
                '51d254f0ecf5df2ce50c0b115741f4cf13985dab',
                '998ed409c795fec2012b1c0ca054d99888b22090',
                '5e0eb4c47f56564395f76333f319d26c79e2fb09',
                '0115510b70c7229dbc5dc49036b32e7d91d23acd',
                '7cb3fd1b6d8c20ba89e2264f1c8baebc8a52d36e',
                '2a13f185e4525f9d4b59882791a2d397b90d5ddc',
                '191caa5b2c81ed17c0794bf7bb9958f4dcb0b87e',
                'ff7ca51e58c505fec0dd2491de52c622bb7a806b',
            ],
            'vcs/nodes.py': [
                '33fa3223355104431402a888fa77a4e9956feb3e',
                'fa014c12c26d10ba682fadb78f2a11c24c8118e1',
                'e686b958768ee96af8029fe19c6050b1a8dd3b2b',
                'ab5721ca0a081f26bf43d9051e615af2cc99952f',
                'c877b68d18e792a66b7f4c529ea02c8f80801542',
                '4313566d2e417cb382948f8d9d7c765330356054',
                '6c2303a793671e807d1cfc70134c9ca0767d98c2',
                '54386793436c938cff89326944d4c2702340037d',
                '54000345d2e78b03a99d561399e8e548de3f3203',
                '1c6b3677b37ea064cb4b51714d8f7498f93f4b2b',
                '2d03ca750a44440fb5ea8b751176d1f36f8e8f46',
                '2a08b128c206db48c2f0b8f70df060e6db0ae4f8',
                '30c26513ff1eb8e5ce0e1c6b477ee5dc50e2f34b',
                'ac71e9503c2ca95542839af0ce7b64011b72ea7c',
                '12669288fd13adba2a9b7dd5b870cc23ffab92d2',
                '5a0c84f3e6fe3473e4c8427199d5a6fc71a9b382',
                '12f2f5e2b38e6ff3fbdb5d722efed9aa72ecb0d5',
                '5eab1222a7cd4bfcbabc218ca6d04276d4e27378',
                'f50f42baeed5af6518ef4b0cb2f1423f3851a941',
                'd7e390a45f6aa96f04f5e7f583ad4f867431aa25',
                'f15c21f97864b4f071cddfbf2750ec2e23859414',
                'e906ef056cf539a4e4e5fc8003eaf7cf14dd8ade',
                'ea2b108b48aa8f8c9c4a941f66c1a03315ca1c3b',
                '84dec09632a4458f79f50ddbbd155506c460b4f9',
                '0115510b70c7229dbc5dc49036b32e7d91d23acd',
                '2a13f185e4525f9d4b59882791a2d397b90d5ddc',
                '3bf1c5868e570e39569d094f922d33ced2fa3b2b',
                'b8d04012574729d2c29886e53b1a43ef16dd00a1',
                '6970b057cffe4aab0a792aa634c89f4bebf01441',
                'dd80b0f6cf5052f17cc738c2951c4f2070200d7f',
                'ff7ca51e58c505fec0dd2491de52c622bb7a806b',
            ],
            'vcs/backends/git.py': [
                '4cf116ad5a457530381135e2f4c453e68a1b0105',
                '9a751d84d8e9408e736329767387f41b36935153',
                'cb681fb539c3faaedbcdf5ca71ca413425c18f01',
                '428f81bb652bcba8d631bce926e8834ff49bdcc6',
                '180ab15aebf26f98f714d8c68715e0f05fa6e1c7',
                '2b8e07312a2e89e92b90426ab97f349f4bce2a3a',
                '50e08c506174d8645a4bb517dd122ac946a0f3bf',
                '54000345d2e78b03a99d561399e8e548de3f3203',
            ],
        }
        for path, revs in files.items():
            node = self.repo.get_changeset(revs[0]).get_node(path)
            node_revs = [chset.raw_id for chset in node.history]
            assert set(revs).issubset(set(node_revs)), "We assumed that %s is a subset of the revisions for which file %s " \
                "has been changed, but the history of that node returned: %s" \
                % (revs, path, node_revs)
 

	
 
    def test_file_annotate(self):
        files = {
            'vcs/backends/__init__.py': {
                'c1214f7e79e02fc37156ff215cd71275450cffc3': {
                    'lines_no': 1,
                    'changesets': [
                        'c1214f7e79e02fc37156ff215cd71275450cffc3',
                    ],
                },
                '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647': {
                    'lines_no': 21,
                    'changesets': [
                        '49d3fd156b6f7db46313fac355dca1a0b94a0017',
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
                    ],
                },
                'e29b67bd158580fc90fc5e9111240b90e6e86064': {
                    'lines_no': 32,
                    'changesets': [
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
                        '5eab1222a7cd4bfcbabc218ca6d04276d4e27378',
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
                        '54000345d2e78b03a99d561399e8e548de3f3203',
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
                        '78c3f0c23b7ee935ec276acb8b8212444c33c396',
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
                        '2a13f185e4525f9d4b59882791a2d397b90d5ddc',
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
                        '78c3f0c23b7ee935ec276acb8b8212444c33c396',
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
                    ],
                },
            },
        }

        for fname, revision_dict in files.items():
            for rev, data in revision_dict.items():
                cs = self.repo.get_changeset(rev)

                l1_1 = [x[1] for x in cs.get_file_annotate(fname)]
                l1_2 = [x[2]().raw_id for x in cs.get_file_annotate(fname)]
                assert l1_1 == l1_2
                l1 = l1_1
                l2 = files[fname][rev]['changesets']
                assert l1 == l2, "The lists of revisions for %s@rev %s " \
                    "from the annotation list should match each other, " \
                    "got \n%s \nvs \n%s " % (fname, rev, l1, l2)
 

	
 
    def test_files_state(self):
        """
        Tests state of FileNodes.
        """
        node = self.repo \
            .get_changeset('e6ea6d16e2f26250124a1f4b4fe37a912f9d86a0') \
            .get_node('vcs/utils/diffs.py')
        assert node.state == NodeState.ADDED
        assert node.added
        assert not node.changed
        assert not node.not_changed
        assert not node.removed

        node = self.repo \
            .get_changeset('33fa3223355104431402a888fa77a4e9956feb3e') \
            .get_node('.hgignore')
        assert node.state == NodeState.CHANGED
        assert not node.added
        assert node.changed
        assert not node.not_changed
        assert not node.removed

        node = self.repo \
            .get_changeset('e29b67bd158580fc90fc5e9111240b90e6e86064') \
            .get_node('setup.py')
        assert node.state == NodeState.NOT_CHANGED
        assert not node.added
        assert not node.changed
        assert node.not_changed
        assert not node.removed

        # If a node is in REMOVED state, trying to fetch it raises a
        # ChangesetError exception
        chset = self.repo.get_changeset(
            'fa6600f6848800641328adbf7811fd2372c02ab2')
        path = 'vcs/backends/BaseRepository.py'
        with pytest.raises(NodeDoesNotExistError):
            chset.get_node(path)
        # but it is listed in the changeset's ``removed`` attribute
        assert path in [rf.path for rf in chset.removed]

        chset = self.repo.get_changeset(
            '54386793436c938cff89326944d4c2702340037d')
        changed = ['setup.py', 'tests/test_nodes.py', 'vcs/backends/hg.py',
            'vcs/nodes.py']
        assert set(changed) == set([f.path for f in chset.changed])
 

	
 
    def test_commit_message_is_unicode(self):
    def test_commit_message_is_str(self):
        for cs in self.repo:
            assert isinstance(cs.message, unicode)
            assert isinstance(cs.message, str)

    def test_changeset_author_is_unicode(self):
    def test_changeset_author_is_str(self):
        for cs in self.repo:
            assert isinstance(cs.author, unicode)
            assert isinstance(cs.author, str)
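
The paired lines above are the removed (Python 2) and added (Python 3) variants of these two tests, which is the whole point of this changeset: under Python 3 the `unicode` builtin is gone, text values are plain `str`, and raw file content stays `bytes`. A small standalone illustration of the distinction the renamed tests now assert (plain Python 3, not part of the diff itself):

message = 'initial import\n'   # text metadata is str under Python 3
content = b'=== VCS ==='       # raw file content stays bytes

assert isinstance(message, str)
assert isinstance(content, bytes)
assert not isinstance(content, str)   # bytes and str no longer mix implicitly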
 

	
 
    def test_repo_files_content_is_bytes(self):
        changeset = self.repo.get_changeset()
        for node in changeset.get_node('/'):
            if node.is_file():
                assert isinstance(node.content, bytes)

    def test_wrong_path(self):
        # There is a 'setup.py' in the root dir, but not at this path:
        path = 'foo/bar/setup.py'
        tip = self.repo.get_changeset()
        with pytest.raises(VCSError):
            tip.get_node(path)

    def test_author_email(self):
        assert 'marcin@python-blog.com' == self.repo.get_changeset('c1214f7e79e02fc37156ff215cd71275450cffc3').author_email
        assert 'lukasz.balcerzak@python-center.pl' == self.repo.get_changeset('ff7ca51e58c505fec0dd2491de52c622bb7a806b').author_email
        assert '' == self.repo.get_changeset('8430a588b43b5d6da365400117c89400326e7992').author_email

    def test_author_username(self):
        assert 'Marcin Kuzminski' == self.repo.get_changeset('c1214f7e79e02fc37156ff215cd71275450cffc3').author_name
        assert 'Lukasz Balcerzak' == self.repo.get_changeset('ff7ca51e58c505fec0dd2491de52c622bb7a806b').author_name
        assert 'marcink none@none' == self.repo.get_changeset('8430a588b43b5d6da365400117c89400326e7992').author_name
 

	
 

	
 
class TestGitSpecificWithRepo(_BackendTestMixin):
    backend_alias = 'git'

    @classmethod
    def _get_commits(cls):
        return [
            {
                'message': 'Initial',
                'author': 'Joe Doe <joe.doe@example.com>',
                'date': datetime.datetime(2010, 1, 1, 20),
                'added': [
                    FileNode('foobar/static/js/admin/base.js', content='base'),
                    FileNode('foobar/static/admin', content='admin',
                        mode=0o120000), # this is a symlink
                    FileNode('foo', content='foo'),
                ],
            },
            {
                'message': 'Second',
                'author': 'Joe Doe <joe.doe@example.com>',
                'date': datetime.datetime(2010, 1, 1, 22),
                'added': [
                    FileNode('foo2', content='foo2'),
                ],
            },
        ]

    def test_paths_slow_traversing(self):
        cs = self.repo.get_changeset()
        assert cs.get_node('foobar').get_node('static').get_node('js').get_node('admin').get_node('base.js').content == b'base'

    def test_paths_fast_traversing(self):
        cs = self.repo.get_changeset()
        assert cs.get_node('foobar/static/js/admin/base.js').content == b'base'

    def test_workdir_get_branch(self):
        self.repo.run_git_command(['checkout', '-b', 'production'])
        # Regression test: one of the following would fail if we don't check
        # the .git/HEAD file
        self.repo.run_git_command(['checkout', 'production'])
        assert self.repo.workdir.get_branch() == 'production'
        self.repo.run_git_command(['checkout', 'master'])
        assert self.repo.workdir.get_branch() == 'master'
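
The regression guarded above concerns how the current branch of a non-bare working directory is determined. A minimal, standalone sketch of the underlying idea - reading the symbolic ref out of .git/HEAD - is shown below; this illustrates the mechanism only and is not Kallithea's actual workdir.get_branch() implementation:

import os

def branch_from_head(repo_path):
    # .git/HEAD normally contains e.g. "ref: refs/heads/production\n";
    # a detached HEAD contains a bare commit hash instead.
    head_file = os.path.join(repo_path, '.git', 'HEAD')
    with open(head_file) as f:
        head = f.read().strip()
    if head.startswith('ref: refs/heads/'):
        return head[len('ref: refs/heads/'):]
    return None  # detached HEAD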
 

	
 
    def test_get_diff_runs_git_command_with_hashes(self):
        self.repo._run_git_command = mock.Mock(return_value=(b'', b''))
        self.repo.get_diff(0, 1)
        self.repo._run_git_command.assert_called_once_with(
            ['diff', '-U3', '--full-index', '--binary', '-p', '-M', '--abbrev=40',
             self.repo._get_revision(0), self.repo._get_revision(1)], cwd=self.repo.path)

    def test_get_diff_runs_git_command_with_str_hashes(self):
        self.repo._run_git_command = mock.Mock(return_value=(b'', b''))
        self.repo.get_diff(self.repo.EMPTY_CHANGESET, 1)
        self.repo._run_git_command.assert_called_once_with(
            ['show', '-U3', '--full-index', '--binary', '-p', '-M', '--abbrev=40',
             self.repo._get_revision(1)], cwd=self.repo.path)

    def test_get_diff_runs_git_command_with_path_if_its_given(self):
        self.repo._run_git_command = mock.Mock(return_value=(b'', b''))
        self.repo.get_diff(0, 1, 'foo')
        self.repo._run_git_command.assert_called_once_with(
            ['diff', '-U3', '--full-index', '--binary', '-p', '-M', '--abbrev=40',
             self.repo._get_revision(0), self.repo._get_revision(1), '--', 'foo'], cwd=self.repo.path)

    def test_get_diff_does_not_sanitize_valid_context(self):
        almost_overflowed_long_int = 2**31-1

        self.repo._run_git_command = mock.Mock(return_value=(b'', b''))
        self.repo.get_diff(0, 1, 'foo', context=almost_overflowed_long_int)
        self.repo._run_git_command.assert_called_once_with(
            ['diff', '-U' + str(almost_overflowed_long_int), '--full-index', '--binary', '-p', '-M', '--abbrev=40',
             self.repo._get_revision(0), self.repo._get_revision(1), '--', 'foo'], cwd=self.repo.path)

    def test_get_diff_sanitizes_overflowing_context(self):
        overflowed_long_int = 2**31
        sanitized_overflowed_long_int = overflowed_long_int-1

        self.repo._run_git_command = mock.Mock(return_value=(b'', b''))
        self.repo.get_diff(0, 1, 'foo', context=overflowed_long_int)

        self.repo._run_git_command.assert_called_once_with(
            ['diff', '-U' + str(sanitized_overflowed_long_int), '--full-index', '--binary', '-p', '-M', '--abbrev=40',
             self.repo._get_revision(0), self.repo._get_revision(1), '--', 'foo'], cwd=self.repo.path)

    def test_get_diff_does_not_sanitize_zero_context(self):
        zero_context = 0

        self.repo._run_git_command = mock.Mock(return_value=(b'', b''))
        self.repo.get_diff(0, 1, 'foo', context=zero_context)

        self.repo._run_git_command.assert_called_once_with(
            ['diff', '-U' + str(zero_context), '--full-index', '--binary', '-p', '-M', '--abbrev=40',
             self.repo._get_revision(0), self.repo._get_revision(1), '--', 'foo'], cwd=self.repo.path)

    def test_get_diff_sanitizes_negative_context(self):
        negative_context = -10

        self.repo._run_git_command = mock.Mock(return_value=(b'', b''))
        self.repo.get_diff(0, 1, 'foo', context=negative_context)

        self.repo._run_git_command.assert_called_once_with(
            ['diff', '-U0', '--full-index', '--binary', '-p', '-M', '--abbrev=40',
             self.repo._get_revision(0), self.repo._get_revision(1), '--', 'foo'], cwd=self.repo.path)
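
The four context tests above pin down the expected sanitisation of the -U<context> argument: values above 2**31-1 are clamped down, negative values are clamped up to 0, and everything in between passes through unchanged. A minimal clamp helper with that behaviour (an illustration of the expectation encoded by the tests, not the actual get_diff code) could look like:

GIT_MAX_CONTEXT = 2**31 - 1  # largest context value the tests allow through

def sanitize_context(context):
    # clamp into the range [0, 2**31 - 1] as the tests above expect
    return max(0, min(context, GIT_MAX_CONTEXT))

assert sanitize_context(3) == 3
assert sanitize_context(2**31) == 2**31 - 1
assert sanitize_context(-10) == 0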
 

	
 

	
 
class TestGitRegression(_BackendTestMixin):
    backend_alias = 'git'

    @classmethod
    def _get_commits(cls):
        return [
            {
                'message': 'Initial',
                'author': 'Joe Doe <joe.doe@example.com>',
                'date': datetime.datetime(2010, 1, 1, 20),
                'added': [
                    FileNode('bot/__init__.py', content='base'),
                    FileNode('bot/templates/404.html', content='base'),
                    FileNode('bot/templates/500.html', content='base'),
                ],
            },
            {
                'message': 'Second',
                'author': 'Joe Doe <joe.doe@example.com>',
                'date': datetime.datetime(2010, 1, 1, 22),
                'added': [
                    FileNode('bot/build/migrations/1.py', content='foo2'),
                    FileNode('bot/build/migrations/2.py', content='foo2'),
                    FileNode('bot/build/static/templates/f.html', content='foo2'),
                    FileNode('bot/build/static/templates/f1.html', content='foo2'),
                    FileNode('bot/build/templates/err.html', content='foo2'),
                    FileNode('bot/build/templates/err2.html', content='foo2'),
                ],
            },
        ]

    def test_similar_paths(self):
        cs = self.repo.get_changeset()
        paths = lambda *n: [x.path for x in n]
        assert paths(*cs.get_nodes('bot')) == ['bot/build', 'bot/templates', 'bot/__init__.py']
        assert paths(*cs.get_nodes('bot/build')) == ['bot/build/migrations', 'bot/build/static', 'bot/build/templates']
        assert paths(*cs.get_nodes('bot/build/static')) == ['bot/build/static/templates']
        # this get_nodes call below causes trouble!
        assert paths(*cs.get_nodes('bot/build/static/templates')) == ['bot/build/static/templates/f.html', 'bot/build/static/templates/f1.html']
        assert paths(*cs.get_nodes('bot/build/templates')) == ['bot/build/templates/err.html', 'bot/build/templates/err2.html']
        assert paths(*cs.get_nodes('bot/templates/')) == ['bot/templates/404.html', 'bot/templates/500.html']
 

	
 

	
 
class TestGitHooks(object):
    """
    Tests related to hook functionality of Git repositories.
    """

    def setup_method(self):
        # For each run we want a fresh repo.
        self.repo_directory = get_new_dir("githookrepo")
        self.repo = GitRepository(self.repo_directory, create=True)

        # Create a dictionary where keys are hook names, and values are paths to
        # them in the non-bare repo. Deduplicates code in tests a bit.
        self.kallithea_hooks = {
            "pre-receive": os.path.join(self.repo.path, '.git', 'hooks', "pre-receive"),
            "post-receive": os.path.join(self.repo.path, '.git', 'hooks', "post-receive"),
        }

    def test_hooks_created_if_missing(self):
        """
        Tests that hooks are installed in the repository if they are missing.
        """

        for hook, hook_path in self.kallithea_hooks.items():
            if os.path.exists(hook_path):
                os.remove(hook_path)

        ScmModel().install_git_hooks(repo=self.repo)

        for hook, hook_path in self.kallithea_hooks.items():
            assert os.path.exists(hook_path)

    def test_kallithea_hooks_updated(self):
        """
        Tests that existing Kallithea hooks are updated.
        """

        for hook, hook_path in self.kallithea_hooks.items():
            with open(hook_path, "w") as f:
                f.write("KALLITHEA_HOOK_VER=0.0.0\nJUST_BOGUS")

        ScmModel().install_git_hooks(repo=self.repo)

        for hook, hook_path in self.kallithea_hooks.items():
            with open(hook_path) as f:
                assert "JUST_BOGUS" not in f.read()

    def test_custom_hooks_untouched(self):
        """
        Tests that hooks are left untouched if they are not Kallithea hooks.
        """

        for hook, hook_path in self.kallithea_hooks.items():
            with open(hook_path, "w") as f:
                f.write("#!/bin/bash\n#CUSTOM_HOOK")

        ScmModel().install_git_hooks(repo=self.repo)

        for hook, hook_path in self.kallithea_hooks.items():
            with open(hook_path) as f:
                assert "CUSTOM_HOOK" in f.read()

    def test_custom_hooks_forced_update(self):
        """
        Tests that hooks are forcefully updated even though they are custom hooks.
        """

        for hook, hook_path in self.kallithea_hooks.items():
            with open(hook_path, "w") as f:
                f.write("#!/bin/bash\n#CUSTOM_HOOK")

        ScmModel().install_git_hooks(repo=self.repo, force_create=True)

        for hook, hook_path in self.kallithea_hooks.items():
            with open(hook_path) as f:
                assert "KALLITHEA_HOOK_VER" in f.read()

Changeset was too big and was cut off...

0 comments (0 inline, 0 general)