Changeset - 756e46bd926b
kallithea/bin/kallithea_cli_repo.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
This file was forked by the Kallithea project in July 2014 and later moved.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Feb 9, 2013
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 
import datetime
 
import os
 
import re
 
import shutil
 

	
 
import click
 

	
 
import kallithea.bin.kallithea_cli_base as cli_base
 
from kallithea.lib.utils import REMOVED_REPO_PAT, repo2db_mapper
 
from kallithea.lib.utils2 import ask_ok, safe_str, safe_unicode
 
from kallithea.model.db import Repository, Ui
 
from kallithea.model.meta import Session
 
from kallithea.model.scm import ScmModel
 

	
 

	
 
@cli_base.register_command(config_file_initialize_app=True)
 
@click.option('--remove-missing', is_flag=True,
 
        help='Remove missing repositories from the Kallithea database.')
 
def repo_scan(remove_missing):
 
    """Scan filesystem for repositories.
 

	
 
    Search the configured repository root for new repositories and add them
 
    into Kallithea.
 
    Additionally, report repositories that were previously known to Kallithea
 
    but are no longer present on the filesystem. If option --remove-missing is
 
    given, remove the missing repositories from the Kallithea database.
 
    """
 
    click.echo('Now scanning root location for new repos ...')
 
    added, removed = repo2db_mapper(ScmModel().repo_scan(),
 
                                    remove_obsolete=remove_missing)
 
    click.echo('Scan completed.')
 
    if added:
 
        click.echo('Added: %s' % ', '.join(added))
 
    if removed:
 
        click.echo('%s: %s' % ('Removed' if remove_missing else 'Missing',
 
                          ', '.join(removed)))
 

	
 
@cli_base.register_command(config_file_initialize_app=True)
 
@click.argument('repositories', nargs=-1)
 
def repo_update_metadata(repositories):
 
    """
 
    Update repository metadata in database from repository content.
 

	
 
    In normal operation, Kallithea will keep caches up-to-date
 
    automatically. However, if repositories are externally modified, e.g. by
 
    a direct push via the filesystem rather than via a Kallithea URL,
 
    Kallithea is not aware of it. In this case, you should manually run this
 
    command to update the repository cache.
 

	
 
    If no repositories are specified, the caches of all repositories are
 
    updated.
 
    """
 
    if not repositories:
 
        repo_list = Repository.query().all()
 
    else:
 
        repo_names = [safe_unicode(n.strip()) for n in repositories]
 
        repo_list = list(Repository.query()
 
                        .filter(Repository.repo_name.in_(repo_names)))
 

	
 
    for repo in repo_list:
 
        # update latest revision metadata in database
 
        repo.update_changeset_cache()
 
        # invalidate in-memory VCS object cache... will be repopulated on
 
        # first access
 
        repo.set_invalidate()
 

	
 
    Session().commit()
 

	
 
    click.echo('Updated database with information about latest change in the following %s repositories:' % (len(repo_list)))
 
    click.echo('\n'.join(repo.repo_name for repo in repo_list))
 

	
 
@cli_base.register_command(config_file_initialize_app=True)
 
@click.option('--ask/--no-ask', default=True, help='Ask for confirmation or not. Default is --ask.')
 
@click.option('--older-than',
 
        help="""Only purge repositories that have been removed at least the given time ago.
 
        For example, '--older-than=30d' purges repositories deleted 30 days ago or longer.
 
        Possible suffixes: d (days), h (hours), m (minutes), s (seconds).""")
 
def repo_purge_deleted(ask, older_than):
 
    """Purge backups of deleted repositories.
 

	
 
    When a repository is deleted via the Kallithea web interface, the actual
 
    data is still present on the filesystem but set aside using a special name.
 
    This command allows to delete these files permanently.
 
    """
 
    def _parse_older_than(val):
 
        regex = re.compile(r'((?P<days>\d+?)d)?((?P<hours>\d+?)h)?((?P<minutes>\d+?)m)?((?P<seconds>\d+?)s)?')
 
        parts = regex.match(val)
 
        if not parts:
 
            return
 
        parts = parts.groupdict()
 
        time_params = {}
 
        for (name, param) in parts.iteritems():
 
        for name, param in parts.items():
 
            if param:
 
                time_params[name] = int(param)
 
        return datetime.timedelta(**time_params)
 

	
 
    def _extract_date(name):
 
        """
 
        Extract the date part from rm__<date> pattern of removed repos,
 
        and convert it to datetime object
 

	
 
        :param name:
 
        """
 
        date_part = name[4:19]  # 4:19 since we don't parse milliseconds
 
        return datetime.datetime.strptime(date_part, '%Y%m%d_%H%M%S')
 

	
 
    repos_location = Ui.get_repos_location()
 
    to_remove = []
 
    for dn_, dirs, f in os.walk(safe_str(repos_location)):
 
        alldirs = list(dirs)
 
        del dirs[:]
 
        if ('.hg' in alldirs or
 
            '.git' in alldirs or
 
            '.svn' in alldirs or
 
            'objects' in alldirs and ('refs' in alldirs or 'packed-refs' in f)
 
        ):
 
            continue
 
        for loc in alldirs:
 
            if REMOVED_REPO_PAT.match(loc):
 
                to_remove.append([os.path.join(dn_, loc),
 
                                  _extract_date(loc)])
 
            else:
 
                dirs.append(loc)
 
        if dirs:
 
            click.echo('Scanning: %s' % dn_)
 

	
 
    if not to_remove:
 
        click.echo('There are no deleted repositories.')
 
        return
 

	
 
    # filter older than (if present)!
 
    if older_than:
 
        now = datetime.datetime.now()
 
        to_remove_filtered = []
 
        older_than_date = _parse_older_than(older_than)
 
        for name, date_ in to_remove:
 
            repo_age = now - date_
 
            if repo_age > older_than_date:
 
                to_remove_filtered.append([name, date_])
 

	
 
        to_remove = to_remove_filtered
 

	
 
        if not to_remove:
 
            click.echo('There are no deleted repositories older than %s (%s)'
 
                    % (older_than, older_than_date))
 
            return
 

	
 
        click.echo('Considering %s deleted repositories older than %s (%s).'
 
            % (len(to_remove), older_than, older_than_date))
 
    else:
 
        click.echo('Considering %s deleted repositories.' % len(to_remove))
 

	
 
    if not ask:
 
        remove = True
 
    else:
 
        remove = ask_ok('The following repositories will be removed completely:\n%s\n'
 
                'Do you want to proceed? [y/n] '
 
                % '\n'.join(['%s deleted on %s' % (safe_str(x[0]), safe_str(x[1]))
 
                                     for x in to_remove]))
 

	
 
    if remove:
 
        for path, date_ in to_remove:
 
            click.echo('Purging repository %s' % path)
 
            shutil.rmtree(path)
 
    else:
 
        click.echo('Nothing done, exiting...')
kallithea/controllers/admin/defaults.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.admin.defaults
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
default settings controller for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 27, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import traceback
 

	
 
import formencode
 
from formencode import htmlfill
 
from tg import request
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPFound
 

	
 
from kallithea.config.routing import url
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import HasPermissionAnyDecorator, LoginRequired
 
from kallithea.lib.base import BaseController, render
 
from kallithea.model.db import Setting
 
from kallithea.model.forms import DefaultsForm
 
from kallithea.model.meta import Session
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class DefaultsController(BaseController):
 

	
 
    @LoginRequired()
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def _before(self, *args, **kwargs):
 
        super(DefaultsController, self)._before(*args, **kwargs)
 

	
 
    def index(self, format='html'):
 
        defaults = Setting.get_default_repo_settings()
 

	
 
        return htmlfill.render(
 
            render('admin/defaults/defaults.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False
 
        )
 

	
 
    def update(self, id):
 
        _form = DefaultsForm()()
 

	
 
        try:
 
            form_result = _form.to_python(dict(request.POST))
 
            for k, v in form_result.iteritems():
 
            for k, v in form_result.items():
 
                setting = Setting.create_or_update(k, v)
 
            Session().commit()
 
            h.flash(_('Default settings updated successfully'),
 
                    category='success')
 

	
 
        except formencode.Invalid as errors:
 
            defaults = errors.value
 

	
 
            return htmlfill.render(
 
                render('admin/defaults/defaults.html'),
 
                defaults=defaults,
 
                errors=errors.error_dict or {},
 
                prefix_error=False,
 
                encoding="UTF-8",
 
                force_defaults=False)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('Error occurred during update of defaults'),
 
                    category='error')
 

	
 
        raise HTTPFound(location=url('defaults'))
kallithea/controllers/admin/settings.py
Show inline comments
 
@@ -234,200 +234,200 @@ class SettingsController(BaseController)
 
                          'application settings'),
 
                          category='error')
 

	
 
            raise HTTPFound(location=url('admin_settings_global'))
 

	
 
        defaults = Setting.get_app_settings()
 
        defaults.update(self._get_hg_ui_settings())
 

	
 
        return htmlfill.render(
 
            render('admin/settings/settings.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def settings_visual(self):
 
        c.active = 'visual'
 
        if request.POST:
 
            application_form = ApplicationVisualisationForm()()
 
            try:
 
                form_result = application_form.to_python(dict(request.POST))
 
            except formencode.Invalid as errors:
 
                return htmlfill.render(
 
                    render('admin/settings/settings.html'),
 
                    defaults=errors.value,
 
                    errors=errors.error_dict or {},
 
                    prefix_error=False,
 
                    encoding="UTF-8",
 
                    force_defaults=False)
 

	
 
            try:
 
                settings = [
 
                    ('show_public_icon', 'show_public_icon', 'bool'),
 
                    ('show_private_icon', 'show_private_icon', 'bool'),
 
                    ('stylify_metalabels', 'stylify_metalabels', 'bool'),
 
                    ('repository_fields', 'repository_fields', 'bool'),
 
                    ('dashboard_items', 'dashboard_items', 'int'),
 
                    ('admin_grid_items', 'admin_grid_items', 'int'),
 
                    ('show_version', 'show_version', 'bool'),
 
                    ('use_gravatar', 'use_gravatar', 'bool'),
 
                    ('gravatar_url', 'gravatar_url', 'unicode'),
 
                    ('clone_uri_tmpl', 'clone_uri_tmpl', 'unicode'),
 
                    ('clone_ssh_tmpl', 'clone_ssh_tmpl', 'unicode'),
 
                ]
 
                for setting, form_key, type_ in settings:
 
                    Setting.create_or_update(setting, form_result[form_key], type_)
 

	
 
                Session().commit()
 
                set_app_settings(config)
 
                h.flash(_('Updated visualisation settings'),
 
                        category='success')
 

	
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                h.flash(_('Error occurred during updating '
 
                          'visualisation settings'),
 
                        category='error')
 

	
 
            raise HTTPFound(location=url('admin_settings_visual'))
 

	
 
        defaults = Setting.get_app_settings()
 
        defaults.update(self._get_hg_ui_settings())
 

	
 
        return htmlfill.render(
 
            render('admin/settings/settings.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def settings_email(self):
 
        c.active = 'email'
 
        if request.POST:
 
            test_email = request.POST.get('test_email')
 
            test_email_subj = 'Kallithea test email'
 
            test_body = ('Kallithea Email test, '
 
                               'Kallithea version: %s' % c.kallithea_version)
 
            if not test_email:
 
                h.flash(_('Please enter email address'), category='error')
 
                raise HTTPFound(location=url('admin_settings_email'))
 

	
 
            test_email_txt_body = EmailNotificationModel() \
 
                .get_email_tmpl(EmailNotificationModel.TYPE_DEFAULT,
 
                                'txt', body=test_body)
 
            test_email_html_body = EmailNotificationModel() \
 
                .get_email_tmpl(EmailNotificationModel.TYPE_DEFAULT,
 
                                'html', body=test_body)
 

	
 
            recipients = [test_email] if test_email else None
 

	
 
            tasks.send_email(recipients, test_email_subj,
 
                             test_email_txt_body, test_email_html_body)
 

	
 
            h.flash(_('Send email task created'), category='success')
 
            raise HTTPFound(location=url('admin_settings_email'))
 

	
 
        defaults = Setting.get_app_settings()
 
        defaults.update(self._get_hg_ui_settings())
 

	
 
        import kallithea
 
        c.ini = kallithea.CONFIG
 

	
 
        return htmlfill.render(
 
            render('admin/settings/settings.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def settings_hooks(self):
 
        c.active = 'hooks'
 
        if request.POST:
 
            if c.visual.allow_custom_hooks_settings:
 
                ui_key = request.POST.get('new_hook_ui_key')
 
                ui_value = request.POST.get('new_hook_ui_value')
 

	
 
                hook_id = request.POST.get('hook_id')
 

	
 
                try:
 
                    ui_key = ui_key and ui_key.strip()
 
                    if ui_key in (x.ui_key for x in Ui.get_custom_hooks()):
 
                        h.flash(_('Hook already exists'), category='error')
 
                    elif ui_key in (x.ui_key for x in Ui.get_builtin_hooks()):
 
                        h.flash(_('Builtin hooks are read-only. Please use another hook name.'), category='error')
 
                    elif ui_value and ui_key:
 
                        Ui.create_or_update_hook(ui_key, ui_value)
 
                        h.flash(_('Added new hook'), category='success')
 
                    elif hook_id:
 
                        Ui.delete(hook_id)
 
                        Session().commit()
 

	
 
                    # check for edits
 
                    update = False
 
                    _d = request.POST.dict_of_lists()
 
                    for k, v, ov in zip(_d.get('hook_ui_key', []),
 
                                        _d.get('hook_ui_value_new', []),
 
                                        _d.get('hook_ui_value', [])):
 
                        if v != ov:
 
                            Ui.create_or_update_hook(k, v)
 
                            update = True
 

	
 
                    if update:
 
                        h.flash(_('Updated hooks'), category='success')
 
                    Session().commit()
 
                except Exception:
 
                    log.error(traceback.format_exc())
 
                    h.flash(_('Error occurred during hook creation'),
 
                            category='error')
 

	
 
                raise HTTPFound(location=url('admin_settings_hooks'))
 

	
 
        defaults = Setting.get_app_settings()
 
        defaults.update(self._get_hg_ui_settings())
 

	
 
        c.hooks = Ui.get_builtin_hooks()
 
        c.custom_hooks = Ui.get_custom_hooks()
 

	
 
        return htmlfill.render(
 
            render('admin/settings/settings.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def settings_search(self):
 
        c.active = 'search'
 
        if request.POST:
 
            repo_location = self._get_hg_ui_settings()['paths_root_path']
 
            full_index = request.POST.get('full_index', False)
 
            tasks.whoosh_index(repo_location, full_index)
 
            h.flash(_('Whoosh reindex task scheduled'), category='success')
 
            raise HTTPFound(location=url('admin_settings_search'))
 

	
 
        defaults = Setting.get_app_settings()
 
        defaults.update(self._get_hg_ui_settings())
 

	
 
        return htmlfill.render(
 
            render('admin/settings/settings.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def settings_system(self):
 
        c.active = 'system'
 

	
 
        defaults = Setting.get_app_settings()
 
        defaults.update(self._get_hg_ui_settings())
 

	
 
        import kallithea
 
        c.ini = kallithea.CONFIG
 
        server_info = Setting.get_server_info()
 
        for key, val in server_info.iteritems():
 
        for key, val in server_info.items():
 
            setattr(c, key, val)
 

	
 
        return htmlfill.render(
 
            render('admin/settings/settings.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
kallithea/controllers/api/__init__.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.api
 
~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
JSON RPC controller
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Aug 20, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import inspect
 
import itertools
 
import logging
 
import time
 
import traceback
 
import types
 

	
 
from tg import Response, TGController, request, response
 
from webob.exc import HTTPError, HTTPException
 

	
 
from kallithea.lib import ext_json
 
from kallithea.lib.auth import AuthUser
 
from kallithea.lib.base import _get_ip_addr as _get_ip
 
from kallithea.lib.base import get_path_info
 
from kallithea.lib.utils2 import ascii_bytes, safe_str
 
from kallithea.model.db import User
 

	
 

	
 
log = logging.getLogger('JSONRPC')
 

	
 

	
 
class JSONRPCError(BaseException):
 

	
 
    def __init__(self, message):
 
        self.message = message
 
        super(JSONRPCError, self).__init__()
 

	
 
    def __str__(self):
 
        return safe_str(self.message)
 

	
 

	
 
class JSONRPCErrorResponse(Response, HTTPException):
 
    """
 
    Generate a Response object with a JSON-RPC error body
 
    """
 

	
 
    def __init__(self, message=None, retid=None, code=None):
 
        HTTPException.__init__(self, message, self)
 
        Response.__init__(self,
 
                          json_body=dict(id=retid, result=None, error=message),
 
                          status=code,
 
                          content_type='application/json')
 

	
 

	
 
class JSONRPCController(TGController):
 
    """
 
     A WSGI-speaking JSON-RPC controller class
 

	
 
     See the specification:
 
     <http://json-rpc.org/wiki/specification>`.
 

	
 
     Valid controller return values should be json-serializable objects.
 

	
 
     Sub-classes should catch their exceptions and raise JSONRPCError
 
     if they want to pass meaningful errors to the client.
 

	
 
     """
 

	
 
    def _get_ip_addr(self, environ):
 
        return _get_ip(environ)
 

	
 
    def _get_method_args(self):
 
        """
 
        Return `self._rpc_args` to dispatched controller method
 
        chosen by __call__
 
        """
 
        return self._rpc_args
 

	
 
    def _dispatch(self, state, remainder=None):
 
        """
 
        Parse the request body as JSON, look up the method on the
 
        controller and if it exists, dispatch to it.
 
        """
 
        # Since we are here we should respond as JSON
 
        response.content_type = 'application/json'
 

	
 
        environ = state.request.environ
 
        start = time.time()
 
        ip_addr = self._get_ip_addr(environ)
 
        self._req_id = None
 
        if 'CONTENT_LENGTH' not in environ:
 
            log.debug("No Content-Length")
 
            raise JSONRPCErrorResponse(retid=self._req_id,
 
                                       message="No Content-Length in request")
 
        else:
 
            length = environ['CONTENT_LENGTH'] or 0
 
            length = int(environ['CONTENT_LENGTH'])
 
            log.debug('Content-Length: %s', length)
 

	
 
        if length == 0:
 
            raise JSONRPCErrorResponse(retid=self._req_id,
 
                                       message="Content-Length is 0")
 

	
 
        raw_body = environ['wsgi.input'].read(length)
 

	
 
        try:
 
            json_body = ext_json.loads(raw_body)
 
        except ValueError as e:
 
            # catch JSON errors Here
 
            raise JSONRPCErrorResponse(retid=self._req_id,
 
                                       message="JSON parse error ERR:%s RAW:%r"
 
                                                % (e, raw_body))
 

	
 
        # check AUTH based on API key
 
        try:
 
            self._req_api_key = json_body['api_key']
 
            self._req_id = json_body['id']
 
            self._req_method = json_body['method']
 
            self._request_params = json_body['args']
 
            if not isinstance(self._request_params, dict):
 
                self._request_params = {}
 

	
 
            log.debug('method: %s, params: %s',
 
                      self._req_method, self._request_params)
 
        except KeyError as e:
 
            raise JSONRPCErrorResponse(retid=self._req_id,
 
                                       message='Incorrect JSON query missing %s' % e)
 

	
 
        # check if we can find this session using api_key
 
        try:
 
            u = User.get_by_api_key(self._req_api_key)
 
            auth_user = AuthUser.make(dbuser=u, ip_addr=ip_addr)
 
            if auth_user is None:
 
                raise JSONRPCErrorResponse(retid=self._req_id,
 
                                           message='Invalid API key')
 
        except Exception as e:
 
            raise JSONRPCErrorResponse(retid=self._req_id,
 
                                       message='Invalid API key')
 

	
 
        request.authuser = auth_user
 
        request.ip_addr = ip_addr
 

	
 
        self._error = None
 
        try:
 
            self._func = self._find_method()
 
        except AttributeError as e:
 
            raise JSONRPCErrorResponse(retid=self._req_id,
 
                                       message=str(e))
 

	
 
        # now that we have a method, add self._req_params to
 
        # self.kargs and dispatch control to WGIController
 
        argspec = inspect.getargspec(self._func)
 
        arglist = argspec[0][1:]
 
        defaults = [type(arg) for arg in argspec[3] or []]
 
        default_empty = type(NotImplemented)
 

	
 
        # kw arguments required by this method
 
        func_kwargs = dict(itertools.izip_longest(reversed(arglist), reversed(defaults),
 
                                                  fillvalue=default_empty))
 

	
 
        # This attribute will need to be first param of a method that uses
 
        # api_key, which is translated to instance of user at that name
 
        USER_SESSION_ATTR = 'apiuser'
 

	
 
        # get our arglist and check if we provided them as args
 
        for arg, default in func_kwargs.iteritems():
 
        for arg, default in func_kwargs.items():
 
            if arg == USER_SESSION_ATTR:
 
                # USER_SESSION_ATTR is something translated from API key and
 
                # this is checked before so we don't need validate it
 
                continue
 

	
 
            # skip the required param check if it's default value is
 
            # NotImplementedType (default_empty)
 
            if default == default_empty and arg not in self._request_params:
 
                raise JSONRPCErrorResponse(
 
                    retid=self._req_id,
 
                    message='Missing non optional `%s` arg in JSON DATA' % arg,
 
                )
 

	
 
        extra = set(self._request_params).difference(func_kwargs)
 
        if extra:
 
            raise JSONRPCErrorResponse(
 
                retid=self._req_id,
 
                message='Unknown %s arg in JSON DATA' %
 
                        ', '.join('`%s`' % arg for arg in extra),
 
            )
 

	
 
        self._rpc_args = {}
 
        self._rpc_args.update(self._request_params)
 
        self._rpc_args['action'] = self._req_method
 
        self._rpc_args['environ'] = environ
 

	
 
        log.info('IP: %s Request to %s time: %.3fs' % (
 
            self._get_ip_addr(environ),
 
            get_path_info(environ), time.time() - start)
 
        )
 

	
 
        state.set_action(self._rpc_call, [])
 
        state.set_params(self._rpc_args)
 
        return state
 

	
 
    def _rpc_call(self, action, environ, **rpc_args):
 
        """
 
        Call the specified RPC Method
 
        """
 
        raw_response = ''
 
        try:
 
            raw_response = getattr(self, action)(**rpc_args)
 
            if isinstance(raw_response, HTTPError):
 
                self._error = str(raw_response)
 
        except JSONRPCError as e:
 
            self._error = unicode(e)
 
        except Exception as e:
 
            log.error('Encountered unhandled exception: %s',
 
                      traceback.format_exc(),)
 
            json_exc = JSONRPCError('Internal server error')
 
            self._error = unicode(json_exc)
 

	
 
        if self._error is not None:
 
            raw_response = None
 

	
 
        response = dict(id=self._req_id, result=raw_response, error=self._error)
 
        try:
 
            return ascii_bytes(ext_json.dumps(response))
 
        except TypeError as e:
 
            log.error('API FAILED. Error encoding response for %s %s: %s\n%s', action, rpc_args, e, traceback.format_exc())
 
            return ascii_bytes(ext_json.dumps(
 
                dict(
 
                    id=self._req_id,
 
                    result=None,
 
                    error="Error encoding response",
 
                )
 
            ))
 

	
 
    def _find_method(self):
 
        """
 
        Return method named by `self._req_method` in controller if able
 
        """
 
        log.debug('Trying to find JSON-RPC method: %s', self._req_method)
 
        if self._req_method.startswith('_'):
 
            raise AttributeError("Method not allowed")
 

	
 
        try:
 
            func = getattr(self, self._req_method, None)
 
        except UnicodeEncodeError:
 
            raise AttributeError("Problem decoding unicode in requested "
 
                                 "method name.")
 

	
 
        if isinstance(func, types.MethodType):
 
            return func
 
        else:
 
            raise AttributeError("No such method: %s" % (self._req_method,))
kallithea/controllers/pullrequests.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.pullrequests
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
pull requests controller for Kallithea for initializing pull requests
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: May 7, 2012
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import traceback
 

	
 
import formencode
 
import mercurial.unionrepo
 
from tg import request
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPBadRequest, HTTPForbidden, HTTPFound, HTTPNotFound
 

	
 
from kallithea.config.routing import url
 
from kallithea.controllers.changeset import _context_url, _ignorews_url, create_cs_pr_comment, delete_cs_pr_comment
 
from kallithea.lib import diffs
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import HasRepoPermissionLevelDecorator, LoginRequired
 
from kallithea.lib.base import BaseRepoController, jsonify, render
 
from kallithea.lib.graphmod import graph_data
 
from kallithea.lib.page import Page
 
from kallithea.lib.utils2 import ascii_bytes, safe_bytes, safe_int, safe_str
 
from kallithea.lib.vcs.exceptions import ChangesetDoesNotExistError, EmptyRepositoryError
 
from kallithea.model.changeset_status import ChangesetStatusModel
 
from kallithea.model.comment import ChangesetCommentsModel
 
from kallithea.model.db import ChangesetStatus, PullRequest, PullRequestReviewer, Repository, User
 
from kallithea.model.forms import PullRequestForm, PullRequestPostForm
 
from kallithea.model.meta import Session
 
from kallithea.model.pull_request import CreatePullRequestAction, CreatePullRequestIterationAction, PullRequestModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def _get_reviewer(user_id):
 
    """Look up user by ID and validate it as a potential reviewer."""
 
    try:
 
        user = User.get(int(user_id))
 
    except ValueError:
 
        user = None
 

	
 
    if user is None or user.is_default_user:
 
        h.flash(_('Invalid reviewer "%s" specified') % user_id, category='error')
 
        raise HTTPBadRequest()
 

	
 
    return user
 

	
 

	
 
class PullrequestsController(BaseRepoController):
 

	
 
    def _get_repo_refs(self, repo, rev=None, branch=None, branch_rev=None):
 
        """return a structure with repo's interesting changesets, suitable for
 
        the selectors in pullrequest.html
 

	
 
        rev: a revision that must be in the list somehow and selected by default
 
        branch: a branch that must be in the list and selected by default - even if closed
 
        branch_rev: a revision of which peers should be preferred and available."""
 
        # list named branches that has been merged to this named branch - it should probably merge back
 
        peers = []
 

	
 
        if rev:
 
            rev = safe_str(rev)
 

	
 
        if branch:
 
            branch = safe_str(branch)
 

	
 
        if branch_rev:
 
            # a revset not restricting to merge() would be better
 
            # (especially because it would get the branch point)
 
            # ... but is currently too expensive
 
            # including branches of children could be nice too
 
            peerbranches = set()
 
            for i in repo._repo.revs(
 
                b"sort(parents(branch(id(%s)) and merge()) - branch(id(%s)), -rev)",
 
                ascii_bytes(branch_rev), ascii_bytes(branch_rev),
 
            ):
 
                for abranch in repo.get_changeset(i).branches:
 
                    if abranch not in peerbranches:
 
                        n = 'branch:%s:%s' % (abranch, repo.get_changeset(abranch).raw_id)
 
                        peers.append((n, abranch))
 
                        peerbranches.add(abranch)
 

	
 
        selected = None
 
        tiprev = repo.tags.get('tip')
 
        tipbranch = None
 

	
 
        branches = []
 
        for abranch, branchrev in repo.branches.iteritems():
 
        for abranch, branchrev in repo.branches.items():
 
            n = 'branch:%s:%s' % (abranch, branchrev)
 
            desc = abranch
 
            if branchrev == tiprev:
 
                tipbranch = abranch
 
                desc = '%s (current tip)' % desc
 
            branches.append((n, desc))
 
            if rev == branchrev:
 
                selected = n
 
            if branch == abranch:
 
                if not rev:
 
                    selected = n
 
                branch = None
 
        if branch:  # branch not in list - it is probably closed
 
            branchrev = repo.closed_branches.get(branch)
 
            if branchrev:
 
                n = 'branch:%s:%s' % (branch, branchrev)
 
                branches.append((n, _('%s (closed)') % branch))
 
                selected = n
 
                branch = None
 
            if branch:
 
                log.debug('branch %r not found in %s', branch, repo)
 

	
 
        bookmarks = []
 
        for bookmark, bookmarkrev in repo.bookmarks.iteritems():
 
        for bookmark, bookmarkrev in repo.bookmarks.items():
 
            n = 'book:%s:%s' % (bookmark, bookmarkrev)
 
            bookmarks.append((n, bookmark))
 
            if rev == bookmarkrev:
 
                selected = n
 

	
 
        tags = []
 
        for tag, tagrev in repo.tags.iteritems():
 
        for tag, tagrev in repo.tags.items():
 
            if tag == 'tip':
 
                continue
 
            n = 'tag:%s:%s' % (tag, tagrev)
 
            tags.append((n, tag))
 
            # note: even if rev == tagrev, don't select the static tag - it must be chosen explicitly
 

	
 
        # prio 1: rev was selected as existing entry above
 

	
 
        # prio 2: create special entry for rev; rev _must_ be used
 
        specials = []
 
        if rev and selected is None:
 
            selected = 'rev:%s:%s' % (rev, rev)
 
            specials = [(selected, '%s: %s' % (_("Changeset"), rev[:12]))]
 

	
 
        # prio 3: most recent peer branch
 
        if peers and not selected:
 
            selected = peers[0][0]
 

	
 
        # prio 4: tip revision
 
        if not selected:
 
            if h.is_hg(repo):
 
                if tipbranch:
 
                    selected = 'branch:%s:%s' % (tipbranch, tiprev)
 
                else:
 
                    selected = 'tag:null:' + repo.EMPTY_CHANGESET
 
                    tags.append((selected, 'null'))
 
            else:
 
                if 'master' in repo.branches:
 
                    selected = 'branch:master:%s' % repo.branches['master']
 
                else:
 
                    k, v = list(repo.branches.items())[0]
 
                    selected = 'branch:%s:%s' % (k, v)
 

	
 
        groups = [(specials, _("Special")),
 
                  (peers, _("Peer branches")),
 
                  (bookmarks, _("Bookmarks")),
 
                  (branches, _("Branches")),
 
                  (tags, _("Tags")),
 
                  ]
 
        return [g for g in groups if g[0]], selected
 

	
 
    def _is_allowed_to_change_status(self, pull_request):
 
        if pull_request.is_closed():
 
            return False
 

	
 
        owner = request.authuser.user_id == pull_request.owner_id
 
        reviewer = PullRequestReviewer.query() \
 
            .filter(PullRequestReviewer.pull_request == pull_request) \
 
            .filter(PullRequestReviewer.user_id == request.authuser.user_id) \
 
            .count() != 0
 

	
 
        return request.authuser.admin or owner or reviewer
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def show_all(self, repo_name):
 
        c.from_ = request.GET.get('from_') or ''
 
        c.closed = request.GET.get('closed') or ''
 
        url_params = {}
 
        if c.from_:
 
            url_params['from_'] = 1
 
        if c.closed:
 
            url_params['closed'] = 1
 
        p = safe_int(request.GET.get('page'), 1)
 

	
 
        q = PullRequest.query(include_closed=c.closed, sorted=True)
 
        if c.from_:
 
            q = q.filter_by(org_repo=c.db_repo)
 
        else:
 
            q = q.filter_by(other_repo=c.db_repo)
 
        c.pull_requests = q.all()
 

	
 
        c.pullrequests_pager = Page(c.pull_requests, page=p, items_per_page=100, **url_params)
 

	
 
        return render('/pullrequests/pullrequest_show_all.html')
 

	
 
    @LoginRequired()
 
    def show_my(self):
 
        c.closed = request.GET.get('closed') or ''
 

	
 
        c.my_pull_requests = PullRequest.query(
 
            include_closed=c.closed,
 
            sorted=True,
 
        ).filter_by(owner_id=request.authuser.user_id).all()
 

	
 
        c.participate_in_pull_requests = []
 
        c.participate_in_pull_requests_todo = []
 
        done_status = set([ChangesetStatus.STATUS_APPROVED, ChangesetStatus.STATUS_REJECTED])
 
        for pr in PullRequest.query(
 
            include_closed=c.closed,
 
            reviewer_id=request.authuser.user_id,
 
            sorted=True,
 
        ):
 
            status = pr.user_review_status(request.authuser.user_id) # very inefficient!!!
 
            if status in done_status:
 
                c.participate_in_pull_requests.append(pr)
 
            else:
 
                c.participate_in_pull_requests_todo.append(pr)
 

	
 
        return render('/pullrequests/pullrequest_show_my.html')
 

	
 
    @LoginRequired()
 
    @HasRepoPermissionLevelDecorator('read')
 
    def index(self):
 
        org_repo = c.db_repo
 
        org_scm_instance = org_repo.scm_instance
 
        try:
 
            org_scm_instance.get_changeset()
 
        except EmptyRepositoryError as e:
 
            h.flash(_('There are no changesets yet'),
 
                    category='warning')
 
            raise HTTPFound(location=url('summary_home', repo_name=org_repo.repo_name))
 

	
 
        org_rev = request.GET.get('rev_end')
 
        # rev_start is not directly useful - its parent could however be used
 
        # as default for other and thus give a simple compare view
 
        rev_start = request.GET.get('rev_start')
 
        other_rev = None
 
        if rev_start:
 
            starters = org_repo.get_changeset(rev_start).parents
 
            if starters:
 
                other_rev = starters[0].raw_id
 
            else:
 
                other_rev = org_repo.scm_instance.EMPTY_CHANGESET
 
        branch = request.GET.get('branch')
 

	
 
        c.cs_repos = [(org_repo.repo_name, org_repo.repo_name)]
 
        c.default_cs_repo = org_repo.repo_name
 
        c.cs_refs, c.default_cs_ref = self._get_repo_refs(org_scm_instance, rev=org_rev, branch=branch)
 

	
 
        default_cs_ref_type, default_cs_branch, default_cs_rev = c.default_cs_ref.split(':')
 
        if default_cs_ref_type != 'branch':
 
            default_cs_branch = org_repo.get_changeset(default_cs_rev).branch
 

	
 
        # add org repo to other so we can open pull request against peer branches on itself
 
        c.a_repos = [(org_repo.repo_name, '%s (self)' % org_repo.repo_name)]
 

	
 
        if org_repo.parent:
 
            # add parent of this fork also and select it.
 
            # use the same branch on destination as on source, if available.
 
            c.a_repos.append((org_repo.parent.repo_name, '%s (parent)' % org_repo.parent.repo_name))
 
            c.a_repo = org_repo.parent
 
            c.a_refs, c.default_a_ref = self._get_repo_refs(
 
                    org_repo.parent.scm_instance, branch=default_cs_branch, rev=other_rev)
 

	
 
        else:
 
            c.a_repo = org_repo
 
            c.a_refs, c.default_a_ref = self._get_repo_refs(org_scm_instance, rev=other_rev)
 

	
 
        # gather forks and add to this list ... even though it is rare to
 
        # request forks to pull from their parent
 
        for fork in org_repo.forks:
 
            c.a_repos.append((fork.repo_name, fork.repo_name))
 

	
 
        return render('/pullrequests/pullrequest.html')
 

	
 
    @LoginRequired()
 
    @HasRepoPermissionLevelDecorator('read')
 
    @jsonify
 
    def repo_info(self, repo_name):
 
        repo = c.db_repo
 
        refs, selected_ref = self._get_repo_refs(repo.scm_instance)
 
        return {
 
            'description': repo.description.split('\n', 1)[0],
 
            'selected_ref': selected_ref,
 
            'refs': refs,
 
            }
 

	
 
    @LoginRequired()
 
    @HasRepoPermissionLevelDecorator('read')
 
    def create(self, repo_name):
 
        repo = c.db_repo
 
        try:
 
            _form = PullRequestForm(repo.repo_id)().to_python(request.POST)
 
        except formencode.Invalid as errors:
 
            log.error(traceback.format_exc())
 
            log.error(str(errors))
 
            msg = _('Error creating pull request: %s') % errors.msg
 
            h.flash(msg, 'error')
 
            raise HTTPBadRequest
 

	
 
        # heads up: org and other might seem backward here ...
 
        org_ref = _form['org_ref'] # will have merge_rev as rev but symbolic name
 
        org_repo = Repository.guess_instance(_form['org_repo'])
 

	
 
        other_ref = _form['other_ref'] # will have symbolic name and head revision
 
        other_repo = Repository.guess_instance(_form['other_repo'])
 

	
 
        reviewers = []
 

	
 
        title = _form['pullrequest_title']
 
        description = _form['pullrequest_desc'].strip()
kallithea/lib/auth.py
Show inline comments
 
@@ -253,484 +253,484 @@ def _cached_perms_data(user_id, user_is_
 
    for perm in user_perms:
 
        permissions[GLOBAL].add(perm.permission.permission_name)
 

	
 
    # for each kind of global permissions, only keep the one with heighest weight
 
    kind_max_perm = {}
 
    for perm in sorted(permissions[GLOBAL], key=lambda n: PERM_WEIGHTS[n]):
 
        kind = perm.rsplit('.', 1)[0]
 
        kind_max_perm[kind] = perm
 
    permissions[GLOBAL] = set(kind_max_perm.values())
 
    ## END GLOBAL PERMISSIONS
 

	
 
    #======================================================================
 
    # !! PERMISSIONS FOR REPOSITORIES !!
 
    #======================================================================
 
    #======================================================================
 
    # check if user is part of user groups for this repository and
 
    # fill in his permission from it.
 
    #======================================================================
 

	
 
    # user group for repositories permissions
 
    user_repo_perms_from_users_groups = \
 
     Session().query(UserGroupRepoToPerm, Permission, Repository,) \
 
        .join((Repository, UserGroupRepoToPerm.repository_id ==
 
               Repository.repo_id)) \
 
        .join((Permission, UserGroupRepoToPerm.permission_id ==
 
               Permission.permission_id)) \
 
        .join((UserGroup, UserGroupRepoToPerm.users_group_id ==
 
               UserGroup.users_group_id)) \
 
        .filter(UserGroup.users_group_active == True) \
 
        .join((UserGroupMember, UserGroupRepoToPerm.users_group_id ==
 
               UserGroupMember.users_group_id)) \
 
        .filter(UserGroupMember.user_id == user_id) \
 
        .all()
 

	
 
    for perm in user_repo_perms_from_users_groups:
 
        bump_permission(RK,
 
            perm.UserGroupRepoToPerm.repository.repo_name,
 
            perm.Permission.permission_name)
 

	
 
    # user permissions for repositories
 
    user_repo_perms = Permission.get_default_perms(user_id)
 
    for perm in user_repo_perms:
 
        bump_permission(RK,
 
            perm.UserRepoToPerm.repository.repo_name,
 
            perm.Permission.permission_name)
 

	
 
    #======================================================================
 
    # !! PERMISSIONS FOR REPOSITORY GROUPS !!
 
    #======================================================================
 
    #======================================================================
 
    # check if user is part of user groups for this repository groups and
 
    # fill in his permission from it.
 
    #======================================================================
 
    # user group for repo groups permissions
 
    user_repo_group_perms_from_users_groups = \
 
     Session().query(UserGroupRepoGroupToPerm, Permission, RepoGroup) \
 
     .join((RepoGroup, UserGroupRepoGroupToPerm.group_id == RepoGroup.group_id)) \
 
     .join((Permission, UserGroupRepoGroupToPerm.permission_id
 
            == Permission.permission_id)) \
 
     .join((UserGroup, UserGroupRepoGroupToPerm.users_group_id ==
 
            UserGroup.users_group_id)) \
 
     .filter(UserGroup.users_group_active == True) \
 
     .join((UserGroupMember, UserGroupRepoGroupToPerm.users_group_id
 
            == UserGroupMember.users_group_id)) \
 
     .filter(UserGroupMember.user_id == user_id) \
 
     .all()
 

	
 
    for perm in user_repo_group_perms_from_users_groups:
 
        bump_permission(GK,
 
            perm.UserGroupRepoGroupToPerm.group.group_name,
 
            perm.Permission.permission_name)
 

	
 
    # user explicit permissions for repository groups
 
    user_repo_groups_perms = Permission.get_default_group_perms(user_id)
 
    for perm in user_repo_groups_perms:
 
        bump_permission(GK,
 
            perm.UserRepoGroupToPerm.group.group_name,
 
            perm.Permission.permission_name)
 

	
 
    #======================================================================
 
    # !! PERMISSIONS FOR USER GROUPS !!
 
    #======================================================================
 
    # user group for user group permissions
 
    user_group_user_groups_perms = \
 
     Session().query(UserGroupUserGroupToPerm, Permission, UserGroup) \
 
     .join((UserGroup, UserGroupUserGroupToPerm.target_user_group_id
 
            == UserGroup.users_group_id)) \
 
     .join((Permission, UserGroupUserGroupToPerm.permission_id
 
            == Permission.permission_id)) \
 
     .join((UserGroupMember, UserGroupUserGroupToPerm.user_group_id
 
            == UserGroupMember.users_group_id)) \
 
     .filter(UserGroupMember.user_id == user_id) \
 
     .join((UserGroup, UserGroupMember.users_group_id ==
 
            UserGroup.users_group_id), aliased=True, from_joinpoint=True) \
 
     .filter(UserGroup.users_group_active == True) \
 
     .all()
 

	
 
    for perm in user_group_user_groups_perms:
 
        bump_permission(UK,
 
            perm.UserGroupUserGroupToPerm.target_user_group.users_group_name,
 
            perm.Permission.permission_name)
 

	
 
    # user explicit permission for user groups
 
    user_user_groups_perms = Permission.get_default_user_group_perms(user_id)
 
    for perm in user_user_groups_perms:
 
        bump_permission(UK,
 
            perm.UserUserGroupToPerm.user_group.users_group_name,
 
            perm.Permission.permission_name)
 

	
 
    return permissions
 

	
 

	
 
class AuthUser(object):
 
    """
 
    Represents a Kallithea user, including various authentication and
 
    authorization information. Typically used to store the current user,
 
    but is also used as a generic user information data structure in
 
    parts of the code, e.g. user management.
 

	
 
    Constructed from a database `User` object, a user ID or cookie dict,
 
    it looks up the user (if needed) and copies all attributes to itself,
 
    adding various non-persistent data. If lookup fails but anonymous
 
    access to Kallithea is enabled, the default user is loaded instead.
 

	
 
    `AuthUser` does not by itself authenticate users. It's up to other parts of
 
    the code to check e.g. if a supplied password is correct, and if so, trust
 
    the AuthUser object as an authenticated user.
 

	
 
    However, `AuthUser` does refuse to load a user that is not `active`.
 

	
 
    Note that Kallithea distinguishes between the default user (an actual
 
    user in the database with username "default") and "no user" (no actual
 
    User object, AuthUser filled with blank values and username "None").
 

	
 
    If the default user is active, that will always be used instead of
 
    "no user". On the other hand, if the default user is disabled (and
 
    there is no login information), we instead get "no user"; this should
 
    only happen on the login page (as all other requests are redirected).
 

	
 
    `is_default_user` specifically checks if the AuthUser is the user named
 
    "default". Use `is_anonymous` to check for both "default" and "no user".
 
    """
 

	
 
    @classmethod
 
    def make(cls, dbuser=None, is_external_auth=False, ip_addr=None):
 
        """Create an AuthUser to be authenticated ... or return None if user for some reason can't be authenticated.
 
        Checks that a non-None dbuser is provided, is active, and that the IP address is ok.
 
        """
 
        assert ip_addr is not None
 
        if dbuser is None:
 
            log.info('No db user for authentication')
 
            return None
 
        if not dbuser.active:
 
            log.info('Db user %s not active', dbuser.username)
 
            return None
 
        allowed_ips = AuthUser.get_allowed_ips(dbuser.user_id, cache=True)
 
        if not check_ip_access(source_ip=ip_addr, allowed_ips=allowed_ips):
 
            log.info('Access for %s from %s forbidden - not in %s', dbuser.username, ip_addr, allowed_ips)
 
            return None
 
        return cls(dbuser=dbuser, is_external_auth=is_external_auth)
 

	
 
    def __init__(self, user_id=None, dbuser=None, is_external_auth=False):
 
        self.is_external_auth = is_external_auth # container auth - don't show logout option
 

	
 
        # These attributes will be overridden by fill_data, below, unless the
 
        # requested user cannot be found and the default anonymous user is
 
        # not enabled.
 
        self.user_id = None
 
        self.username = None
 
        self.api_key = None
 
        self.name = ''
 
        self.lastname = ''
 
        self.email = ''
 
        self.admin = False
 

	
 
        # Look up database user, if necessary.
 
        if user_id is not None:
 
            assert dbuser is None
 
            log.debug('Auth User lookup by USER ID %s', user_id)
 
            dbuser = UserModel().get(user_id)
 
            assert dbuser is not None
 
        else:
 
            assert dbuser is not None
 
            log.debug('Auth User lookup by database user %s', dbuser)
 

	
 
        log.debug('filling %s data', dbuser)
 
        self.is_anonymous = dbuser.is_default_user
 
        if dbuser.is_default_user and not dbuser.active:
 
            self.username = 'None'
 
            self.is_default_user = False
 
        else:
 
            # copy non-confidential database fields from a `db.User` to this `AuthUser`.
 
            for k, v in dbuser.get_dict().iteritems():
 
            for k, v in dbuser.get_dict().items():
 
                assert k not in ['api_keys', 'permissions']
 
                setattr(self, k, v)
 
            self.is_default_user = dbuser.is_default_user
 
        log.debug('Auth User is now %s', self)
 

	
 
    @LazyProperty
 
    def permissions(self):
 
        return self.__get_perms(user=self, cache=False)
 

	
 
    def has_repository_permission_level(self, repo_name, level, purpose=None):
 
        required_perms = {
 
            'read': ['repository.read', 'repository.write', 'repository.admin'],
 
            'write': ['repository.write', 'repository.admin'],
 
            'admin': ['repository.admin'],
 
        }[level]
 
        actual_perm = self.permissions['repositories'].get(repo_name)
 
        ok = actual_perm in required_perms
 
        log.debug('Checking if user %r can %r repo %r (%s): %s (has %r)',
 
            self.username, level, repo_name, purpose, ok, actual_perm)
 
        return ok
 

	
 
    def has_repository_group_permission_level(self, repo_group_name, level, purpose=None):
 
        required_perms = {
 
            'read': ['group.read', 'group.write', 'group.admin'],
 
            'write': ['group.write', 'group.admin'],
 
            'admin': ['group.admin'],
 
        }[level]
 
        actual_perm = self.permissions['repositories_groups'].get(repo_group_name)
 
        ok = actual_perm in required_perms
 
        log.debug('Checking if user %r can %r repo group %r (%s): %s (has %r)',
 
            self.username, level, repo_group_name, purpose, ok, actual_perm)
 
        return ok
 

	
 
    def has_user_group_permission_level(self, user_group_name, level, purpose=None):
 
        required_perms = {
 
            'read': ['usergroup.read', 'usergroup.write', 'usergroup.admin'],
 
            'write': ['usergroup.write', 'usergroup.admin'],
 
            'admin': ['usergroup.admin'],
 
        }[level]
 
        actual_perm = self.permissions['user_groups'].get(user_group_name)
 
        ok = actual_perm in required_perms
 
        log.debug('Checking if user %r can %r user group %r (%s): %s (has %r)',
 
            self.username, level, user_group_name, purpose, ok, actual_perm)
 
        return ok
 

	
 
    @property
 
    def api_keys(self):
 
        return self._get_api_keys()
 

	
 
    def __get_perms(self, user, cache=False):
 
        """
 
        Fills user permission attribute with permissions taken from database
 
        works for permissions given for repositories, and for permissions that
 
        are granted to groups
 

	
 
        :param user: `AuthUser` instance
 
        """
 
        user_id = user.user_id
 
        user_is_admin = user.is_admin
 

	
 
        log.debug('Getting PERMISSION tree')
 
        compute = conditional_cache('short_term', 'cache_desc',
 
                                    condition=cache, func=_cached_perms_data)
 
        return compute(user_id, user_is_admin)
 

	
 
    def _get_api_keys(self):
 
        api_keys = [self.api_key]
 
        for api_key in UserApiKeys.query() \
 
                .filter_by(user_id=self.user_id, is_expired=False):
 
            api_keys.append(api_key.api_key)
 

	
 
        return api_keys
 

	
 
    @property
 
    def is_admin(self):
 
        return self.admin
 

	
 
    @property
 
    def repositories_admin(self):
 
        """
 
        Returns list of repositories you're an admin of
 
        """
 
        return [x[0] for x in self.permissions['repositories'].iteritems()
 
        return [x[0] for x in self.permissions['repositories'].items()
 
                if x[1] == 'repository.admin']
 

	
 
    @property
 
    def repository_groups_admin(self):
 
        """
 
        Returns list of repository groups you're an admin of
 
        """
 
        return [x[0] for x in self.permissions['repositories_groups'].iteritems()
 
        return [x[0] for x in self.permissions['repositories_groups'].items()
 
                if x[1] == 'group.admin']
 

	
 
    @property
 
    def user_groups_admin(self):
 
        """
 
        Returns list of user groups you're an admin of
 
        """
 
        return [x[0] for x in self.permissions['user_groups'].iteritems()
 
        return [x[0] for x in self.permissions['user_groups'].items()
 
                if x[1] == 'usergroup.admin']
 

	
 
    def __repr__(self):
 
        return "<%s %s: %r>" % (self.__class__.__name__, self.user_id, self.username)
 

	
 
    def to_cookie(self):
 
        """ Serializes this login session to a cookie `dict`. """
 
        return {
 
            'user_id': self.user_id,
 
            'is_external_auth': self.is_external_auth,
 
        }
 

	
 
    @staticmethod
 
    def from_cookie(cookie, ip_addr):
 
        """
 
        Deserializes an `AuthUser` from a cookie `dict` ... or return None.
 
        """
 
        return AuthUser.make(
 
            dbuser=UserModel().get(cookie.get('user_id')),
 
            is_external_auth=cookie.get('is_external_auth', False),
 
            ip_addr=ip_addr,
 
        )
 

	
 
    @classmethod
 
    def get_allowed_ips(cls, user_id, cache=False):
 
        _set = set()
 

	
 
        default_ips = UserIpMap.query().filter(UserIpMap.user_id ==
 
                                        User.get_default_user(cache=True).user_id)
 
        if cache:
 
            default_ips = default_ips.options(FromCache("sql_cache_short",
 
                                              "get_user_ips_default"))
 
        for ip in default_ips:
 
            try:
 
                _set.add(ip.ip_addr)
 
            except ObjectDeletedError:
 
                # since we use heavy caching sometimes it happens that we get
 
                # deleted objects here, we just skip them
 
                pass
 

	
 
        user_ips = UserIpMap.query().filter(UserIpMap.user_id == user_id)
 
        if cache:
 
            user_ips = user_ips.options(FromCache("sql_cache_short",
 
                                                  "get_user_ips_%s" % user_id))
 
        for ip in user_ips:
 
            try:
 
                _set.add(ip.ip_addr)
 
            except ObjectDeletedError:
 
                # since we use heavy caching sometimes it happens that we get
 
                # deleted objects here, we just skip them
 
                pass
 
        return _set or set(['0.0.0.0/0', '::/0'])
 

	
 

	
 
#==============================================================================
 
# CHECK DECORATORS
 
#==============================================================================
 

	
 
def _redirect_to_login(message=None):
 
    """Return an exception that must be raised. It will redirect to the login
 
    page which will redirect back to the current URL after authentication.
 
    The optional message will be shown in a flash message."""
 
    from kallithea.lib import helpers as h
 
    if message:
 
        h.flash(message, category='warning')
 
    p = request.path_qs
 
    log.debug('Redirecting to login page, origin: %s', p)
 
    return HTTPFound(location=url('login_home', came_from=p))
 

	
 

	
 
# Use as decorator
 
class LoginRequired(object):
 
    """Client must be logged in as a valid User, or we'll redirect to the login
 
    page.
 

	
 
    If the "default" user is enabled and allow_default_user is true, that is
 
    considered valid too.
 

	
 
    Also checks that IP address is allowed.
 
    """
 

	
 
    def __init__(self, allow_default_user=False):
 
        self.allow_default_user = allow_default_user
 

	
 
    def __call__(self, func):
 
        return decorator(self.__wrapper, func)
 

	
 
    def __wrapper(self, func, *fargs, **fkwargs):
 
        controller = fargs[0]
 
        user = request.authuser
 
        loc = "%s:%s" % (controller.__class__.__name__, func.__name__)
 
        log.debug('Checking access for user %s @ %s', user, loc)
 

	
 
        # regular user authentication
 
        if user.is_default_user:
 
            if self.allow_default_user:
 
                log.info('default user @ %s', loc)
 
                return func(*fargs, **fkwargs)
 
            log.info('default user is not accepted here @ %s', loc)
 
        elif user.is_anonymous: # default user is disabled and no proper authentication
 
            log.warning('user is anonymous and NOT authenticated with regular auth @ %s', loc)
 
        else: # regular authentication
 
            log.info('user %s authenticated with regular auth @ %s', user, loc)
 
            return func(*fargs, **fkwargs)
 
        raise _redirect_to_login()
 

	
 

	
 
# Use as decorator
 
class NotAnonymous(object):
 
    """Ensures that client is not logged in as the "default" user, and
 
    redirects to the login page otherwise. Must be used together with
 
    LoginRequired."""
 

	
 
    def __call__(self, func):
 
        return decorator(self.__wrapper, func)
 

	
 
    def __wrapper(self, func, *fargs, **fkwargs):
 
        cls = fargs[0]
 
        user = request.authuser
 

	
 
        log.debug('Checking that user %s is not anonymous @%s', user.username, cls)
 

	
 
        if user.is_default_user:
 
            raise _redirect_to_login(_('You need to be a registered user to '
 
                                       'perform this action'))
 
        else:
 
            return func(*fargs, **fkwargs)
 

	
 

	
 
class _PermsDecorator(object):
 
    """Base class for controller decorators with multiple permissions"""
 

	
 
    def __init__(self, *required_perms):
 
        self.required_perms = required_perms # usually very short - a list is thus fine
 

	
 
    def __call__(self, func):
 
        return decorator(self.__wrapper, func)
 

	
 
    def __wrapper(self, func, *fargs, **fkwargs):
 
        cls = fargs[0]
 
        user = request.authuser
 
        log.debug('checking %s permissions %s for %s %s',
 
          self.__class__.__name__, self.required_perms, cls, user)
 

	
 
        if self.check_permissions(user):
 
            log.debug('Permission granted for %s %s', cls, user)
 
            return func(*fargs, **fkwargs)
 

	
 
        else:
 
            log.info('Permission denied for %s %s', cls, user)
 
            if user.is_default_user:
 
                raise _redirect_to_login(_('You need to be signed in to view this page'))
 
            else:
 
                raise HTTPForbidden()
 

	
 
    def check_permissions(self, user):
 
        raise NotImplementedError()
 

	
 

	
 
class HasPermissionAnyDecorator(_PermsDecorator):
 
    """
 
    Checks the user has any of the given global permissions.
 
    """
 

	
 
    def check_permissions(self, user):
 
        global_permissions = user.permissions['global'] # usually very short
 
        return any(p in global_permissions for p in self.required_perms)
 

	
 

	
 
class _PermDecorator(_PermsDecorator):
 
    """Base class for controller decorators with a single permission"""
 

	
 
    def __init__(self, required_perm):
 
        _PermsDecorator.__init__(self, [required_perm])
 
        self.required_perm = required_perm
 

	
 

	
 
class HasRepoPermissionLevelDecorator(_PermDecorator):
 
    """
 
    Checks the user has at least the specified permission level for the requested repository.
 
    """
 

	
 
    def check_permissions(self, user):
 
        repo_name = get_repo_slug(request)
 
        return user.has_repository_permission_level(repo_name, self.required_perm)
 

	
 

	
 
class HasRepoGroupPermissionLevelDecorator(_PermDecorator):
 
    """
 
    Checks the user has any of given permissions for the requested repository group.
 
    """
 

	
kallithea/lib/diffs.py
Show inline comments
 
@@ -208,385 +208,385 @@ def wrapped_diff(filenode_old, filenode_
 
            a_path = f['old_filename']
 

	
 
        html_diff = as_html(parsed_lines=diff_processor.parsed, enable_comments=enable_comments)
 
        stats = diff_processor.stat()
 

	
 
    else:
 
        html_diff = wrap_to_table(_('Changeset was too big and was cut off, use '
 
                               'diff menu to display this diff'))
 
        stats = (0, 0)
 

	
 
    if not html_diff:
 
        submodules = [o for o in [filenode_new, filenode_old] if isinstance(o, SubModuleNode)]
 
        if submodules:
 
            html_diff = wrap_to_table(h.escape('Submodule %r' % submodules[0]))
 
        else:
 
            html_diff = wrap_to_table(_('No changes detected'))
 

	
 
    cs1 = filenode_old.changeset.raw_id
 
    cs2 = filenode_new.changeset.raw_id
 

	
 
    return cs1, cs2, a_path, html_diff, stats, op
 

	
 

	
 
def get_gitdiff(filenode_old, filenode_new, ignore_whitespace=True, context=3):
 
    """
 
    Returns git style diff between given ``filenode_old`` and ``filenode_new``.
 
    """
 
    # make sure we pass in default context
 
    context = context or 3
 
    submodules = [o for o in [filenode_new, filenode_old] if isinstance(o, SubModuleNode)]
 
    if submodules:
 
        return b''
 

	
 
    for filenode in (filenode_old, filenode_new):
 
        if not isinstance(filenode, FileNode):
 
            raise VCSError("Given object should be FileNode object, not %s"
 
                % filenode.__class__)
 

	
 
    repo = filenode_new.changeset.repository
 
    old_raw_id = getattr(filenode_old.changeset, 'raw_id', repo.EMPTY_CHANGESET)
 
    new_raw_id = getattr(filenode_new.changeset, 'raw_id', repo.EMPTY_CHANGESET)
 

	
 
    vcs_gitdiff = get_diff(repo, old_raw_id, new_raw_id, filenode_new.path,
 
                           ignore_whitespace, context)
 
    return vcs_gitdiff
 

	
 

	
 
def get_diff(scm_instance, rev1, rev2, path=None, ignore_whitespace=False, context=3):
 
    """
 
    A thin wrapper around vcs lib get_diff.
 
    """
 
    try:
 
        return scm_instance.get_diff(rev1, rev2, path=path,
 
                                     ignore_whitespace=ignore_whitespace, context=context)
 
    except MemoryError:
 
        h.flash('MemoryError: Diff is too big', category='error')
 
        return b''
 

	
 

	
 
NEW_FILENODE = 1
 
DEL_FILENODE = 2
 
MOD_FILENODE = 3
 
RENAMED_FILENODE = 4
 
COPIED_FILENODE = 5
 
CHMOD_FILENODE = 6
 
BIN_FILENODE = 7
 

	
 

	
 
class DiffProcessor(object):
 
    """
 
    Give it a unified or git diff and it returns a list of the files that were
 
    mentioned in the diff together with a dict of meta information that
 
    can be used to render it in a HTML template.
 
    """
 
    _diff_git_re = re.compile(b'^diff --git', re.MULTILINE)
 

	
 
    def __init__(self, diff, vcs='hg', diff_limit=None, inline_diff=True):
 
        """
 
        :param diff:   a text in diff format
 
        :param vcs: type of version control hg or git
 
        :param diff_limit: define the size of diff that is considered "big"
 
            based on that parameter cut off will be triggered, set to None
 
            to show full diff
 
        """
 
        if not isinstance(diff, bytes):
 
            raise Exception('Diff must be bytes - got %s' % type(diff))
 

	
 
        self._diff = memoryview(diff)
 
        self.adds = 0
 
        self.removes = 0
 
        self.diff_limit = diff_limit
 
        self.limited_diff = False
 
        self.vcs = vcs
 
        self.parsed = self._parse_gitdiff(inline_diff=inline_diff)
 

	
 
    def _parse_gitdiff(self, inline_diff):
 
        """Parse self._diff and return a list of dicts with meta info and chunks for each file.
 
        Might set limited_diff.
 
        Optionally, do an extra pass and to extra markup of one-liner changes.
 
        """
 
        _files = [] # list of dicts with meta info and chunks
 

	
 
        starts = [m.start() for m in self._diff_git_re.finditer(self._diff)]
 
        starts.append(len(self._diff))
 

	
 
        for start, end in zip(starts, starts[1:]):
 
            if self.diff_limit and end > self.diff_limit:
 
                self.limited_diff = True
 
                continue
 

	
 
            head, diff_lines = _get_header(self.vcs, self._diff[start:end])
 

	
 
            op = None
 
            stats = {
 
                'added': 0,
 
                'deleted': 0,
 
                'binary': False,
 
                'ops': {},
 
            }
 

	
 
            if head['deleted_file_mode']:
 
                op = 'removed'
 
                stats['binary'] = True
 
                stats['ops'][DEL_FILENODE] = 'deleted file'
 

	
 
            elif head['new_file_mode']:
 
                op = 'added'
 
                stats['binary'] = True
 
                stats['ops'][NEW_FILENODE] = 'new file %s' % head['new_file_mode']
 
            else:  # modify operation, can be cp, rename, chmod
 
                # CHMOD
 
                if head['new_mode'] and head['old_mode']:
 
                    op = 'modified'
 
                    stats['binary'] = True
 
                    stats['ops'][CHMOD_FILENODE] = ('modified file chmod %s => %s'
 
                                        % (head['old_mode'], head['new_mode']))
 
                # RENAME
 
                if (head['rename_from'] and head['rename_to']
 
                      and head['rename_from'] != head['rename_to']):
 
                    op = 'renamed'
 
                    stats['binary'] = True
 
                    stats['ops'][RENAMED_FILENODE] = ('file renamed from %s to %s'
 
                                    % (head['rename_from'], head['rename_to']))
 
                # COPY
 
                if head.get('copy_from') and head.get('copy_to'):
 
                    op = 'modified'
 
                    stats['binary'] = True
 
                    stats['ops'][COPIED_FILENODE] = ('file copied from %s to %s'
 
                                        % (head['copy_from'], head['copy_to']))
 
                # FALL BACK: detect missed old style add or remove
 
                if op is None:
 
                    if not head['a_file'] and head['b_file']:
 
                        op = 'added'
 
                        stats['binary'] = True
 
                        stats['ops'][NEW_FILENODE] = 'new file'
 

	
 
                    elif head['a_file'] and not head['b_file']:
 
                        op = 'removed'
 
                        stats['binary'] = True
 
                        stats['ops'][DEL_FILENODE] = 'deleted file'
 

	
 
                # it's not ADD not DELETE
 
                if op is None:
 
                    op = 'modified'
 
                    stats['binary'] = True
 
                    stats['ops'][MOD_FILENODE] = 'modified file'
 

	
 
            # a real non-binary diff
 
            if head['a_file'] or head['b_file']:
 
                chunks, added, deleted = _parse_lines(diff_lines)
 
                stats['binary'] = False
 
                stats['added'] = added
 
                stats['deleted'] = deleted
 
                # explicit mark that it's a modified file
 
                if op == 'modified':
 
                    stats['ops'][MOD_FILENODE] = 'modified file'
 
            else:  # Git binary patch (or empty diff)
 
                # Git binary patch
 
                if head['bin_patch']:
 
                    stats['ops'][BIN_FILENODE] = 'binary diff not shown'
 
                chunks = []
 

	
 
            if op == 'removed' and chunks:
 
                # a way of seeing deleted content could perhaps be nice - but
 
                # not with the current UI
 
                chunks = []
 

	
 
            chunks.insert(0, [{
 
                'old_lineno': '',
 
                'new_lineno': '',
 
                'action':     'context',
 
                'line':       msg,
 
                } for _op, msg in stats['ops'].iteritems()
 
                } for _op, msg in stats['ops'].items()
 
                  if _op not in [MOD_FILENODE]])
 

	
 
            _files.append({
 
                'old_filename':     head['a_path'],
 
                'filename':         head['b_path'],
 
                'old_revision':     head['a_blob_id'],
 
                'new_revision':     head['b_blob_id'],
 
                'chunks':           chunks,
 
                'operation':        op,
 
                'stats':            stats,
 
            })
 

	
 
        if not inline_diff:
 
            return _files
 

	
 
        # highlight inline changes when one del is followed by one add
 
        for diff_data in _files:
 
            for chunk in diff_data['chunks']:
 
                lineiter = iter(chunk)
 
                try:
 
                    peekline = next(lineiter)
 
                    while True:
 
                        # find a first del line
 
                        while peekline['action'] != 'del':
 
                            peekline = next(lineiter)
 
                        delline = peekline
 
                        peekline = next(lineiter)
 
                        # if not followed by add, eat all following del lines
 
                        if peekline['action'] != 'add':
 
                            while peekline['action'] == 'del':
 
                                peekline = next(lineiter)
 
                            continue
 
                        # found an add - make sure it is the only one
 
                        addline = peekline
 
                        try:
 
                            peekline = next(lineiter)
 
                        except StopIteration:
 
                            # add was last line - ok
 
                            _highlight_inline_diff(delline, addline)
 
                            raise
 
                        if peekline['action'] != 'add':
 
                            # there was only one add line - ok
 
                            _highlight_inline_diff(delline, addline)
 
                except StopIteration:
 
                    pass
 

	
 
        return _files
 

	
 
    def stat(self):
 
        """
 
        Returns tuple of added, and removed lines for this instance
 
        """
 
        return self.adds, self.removes
 

	
 

	
 
_escape_re = re.compile(r'(&)|(<)|(>)|(\t)|(\r)|(?<=.)( \n| $)')
 

	
 

	
 
def _escaper(string):
 
    """
 
    Do HTML escaping/markup
 
    """
 

	
 
    def substitute(m):
 
        groups = m.groups()
 
        if groups[0]:
 
            return '&amp;'
 
        if groups[1]:
 
            return '&lt;'
 
        if groups[2]:
 
            return '&gt;'
 
        if groups[3]:
 
            return '<u>\t</u>'
 
        if groups[4]:
 
            return '<u class="cr"></u>'
 
        if groups[5]:
 
            return ' <i></i>'
 
        assert False
 

	
 
    return _escape_re.sub(substitute, safe_unicode(string))
 

	
 

	
 
_git_header_re = re.compile(br"""
 
    ^diff[ ]--git[ ]a/(?P<a_path>.+?)[ ]b/(?P<b_path>.+?)\n
 
    (?:^old[ ]mode[ ](?P<old_mode>\d+)\n
 
       ^new[ ]mode[ ](?P<new_mode>\d+)(?:\n|$))?
 
    (?:^similarity[ ]index[ ](?P<similarity_index>\d+)%\n
 
       ^rename[ ]from[ ](?P<rename_from>.+)\n
 
       ^rename[ ]to[ ](?P<rename_to>.+)(?:\n|$))?
 
    (?:^new[ ]file[ ]mode[ ](?P<new_file_mode>.+)(?:\n|$))?
 
    (?:^deleted[ ]file[ ]mode[ ](?P<deleted_file_mode>.+)(?:\n|$))?
 
    (?:^index[ ](?P<a_blob_id>[0-9A-Fa-f]+)
 
        \.\.(?P<b_blob_id>[0-9A-Fa-f]+)[ ]?(?P<b_mode>.+)?(?:\n|$))?
 
    (?:^(?P<bin_patch>GIT[ ]binary[ ]patch)(?:\n|$))?
 
    (?:^---[ ](a/(?P<a_file>.+?)|/dev/null)\t?(?:\n|$))?
 
    (?:^\+\+\+[ ](b/(?P<b_file>.+?)|/dev/null)\t?(?:\n|$))?
 
""", re.VERBOSE | re.MULTILINE)
 

	
 

	
 
_hg_header_re = re.compile(br"""
 
    ^diff[ ]--git[ ]a/(?P<a_path>.+?)[ ]b/(?P<b_path>.+?)\n
 
    (?:^old[ ]mode[ ](?P<old_mode>\d+)\n
 
       ^new[ ]mode[ ](?P<new_mode>\d+)(?:\n|$))?
 
    (?:^similarity[ ]index[ ](?P<similarity_index>\d+)%(?:\n|$))?
 
    (?:^rename[ ]from[ ](?P<rename_from>.+)\n
 
       ^rename[ ]to[ ](?P<rename_to>.+)(?:\n|$))?
 
    (?:^copy[ ]from[ ](?P<copy_from>.+)\n
 
       ^copy[ ]to[ ](?P<copy_to>.+)(?:\n|$))?
 
    (?:^new[ ]file[ ]mode[ ](?P<new_file_mode>.+)(?:\n|$))?
 
    (?:^deleted[ ]file[ ]mode[ ](?P<deleted_file_mode>.+)(?:\n|$))?
 
    (?:^index[ ](?P<a_blob_id>[0-9A-Fa-f]+)
 
        \.\.(?P<b_blob_id>[0-9A-Fa-f]+)[ ]?(?P<b_mode>.+)?(?:\n|$))?
 
    (?:^(?P<bin_patch>GIT[ ]binary[ ]patch)(?:\n|$))?
 
    (?:^---[ ](a/(?P<a_file>.+?)|/dev/null)\t?(?:\n|$))?
 
    (?:^\+\+\+[ ](b/(?P<b_file>.+?)|/dev/null)\t?(?:\n|$))?
 
""", re.VERBOSE | re.MULTILINE)
 

	
 

	
 
_header_next_check = re.compile(br'''(?!@)(?!literal )(?!delta )''')
 

	
 

	
 
def _get_header(vcs, diff_chunk):
 
    """
 
    Parses a Git diff for a single file (header and chunks) and returns a tuple with:
 

	
 
    1. A dict with meta info:
 

	
 
        a_path, b_path, similarity_index, rename_from, rename_to,
 
        old_mode, new_mode, new_file_mode, deleted_file_mode,
 
        a_blob_id, b_blob_id, b_mode, a_file, b_file
 

	
 
    2. An iterator yielding lines with simple HTML markup.
 
    """
 
    match = None
 
    if vcs == 'git':
 
        match = _git_header_re.match(diff_chunk)
 
    elif vcs == 'hg':
 
        match = _hg_header_re.match(diff_chunk)
 
    if match is None:
 
        raise Exception('diff not recognized as valid %s diff' % vcs)
 
    meta_info = match.groupdict()
 
    rest = diff_chunk[match.end():]
 
    if rest and _header_next_check.match(rest):
 
        raise Exception('cannot parse %s diff header: %r followed by %r' % (vcs, diff_chunk[:match.end()], rest[:1000]))
 
    diff_lines = (_escaper(m.group(0)) for m in re.finditer(br'.*\n|.+$', rest)) # don't split on \r as str.splitlines do
 
    return meta_info, diff_lines
 

	
 

	
 
_chunk_re = re.compile(r'^@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@(.*)')
 
_newline_marker = re.compile(r'^\\ No newline at end of file')
 

	
 

	
 
def _parse_lines(diff_lines):
 
    """
 
    Given an iterator of diff body lines, parse them and return a dict per
 
    line and added/removed totals.
 
    """
 
    added = deleted = 0
 
    old_line = old_end = new_line = new_end = None
 

	
 
    chunks = []
 
    try:
 
        line = next(diff_lines)
 

	
 
        while True:
 
            lines = []
 
            chunks.append(lines)
 

	
 
            match = _chunk_re.match(line)
 

	
 
            if not match:
 
                raise Exception('error parsing diff @@ line %r' % line)
 

	
 
            gr = match.groups()
 
            (old_line, old_end,
 
             new_line, new_end) = [int(x or 1) for x in gr[:-1]]
 
            old_line -= 1
 
            new_line -= 1
 

	
 
            context = len(gr) == 5
 
            old_end += old_line
 
            new_end += new_line
 

	
 
            if context:
 
                # skip context only if it's first line
 
                if int(gr[0]) > 1:
 
                    lines.append({
 
                        'old_lineno': '...',
 
                        'new_lineno': '...',
 
                        'action':     'context',
 
                        'line':       line,
 
                    })
kallithea/lib/markup_renderer.py
Show inline comments
 
@@ -30,219 +30,219 @@ import logging
 
import re
 
import traceback
 

	
 
import bleach
 
import markdown as markdown_mod
 

	
 
from kallithea.lib.utils2 import MENTIONS_REGEX, safe_unicode
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
url_re = re.compile(r'''\bhttps?://(?:[\da-zA-Z0-9@:.-]+)'''
 
                    r'''(?:[/a-zA-Z0-9_=@#~&+%.,:;?!*()-]*[/a-zA-Z0-9_=@#~])?''')
 

	
 

	
 
class MarkupRenderer(object):
 
    RESTRUCTUREDTEXT_DISALLOWED_DIRECTIVES = ['include', 'meta', 'raw']
 

	
 
    MARKDOWN_PAT = re.compile(r'md|mkdn?|mdown|markdown', re.IGNORECASE)
 
    RST_PAT = re.compile(r're?st', re.IGNORECASE)
 
    PLAIN_PAT = re.compile(r'readme', re.IGNORECASE)
 

	
 
    @classmethod
 
    def _detect_renderer(cls, source, filename):
 
        """
 
        runs detection of what renderer should be used for generating html
 
        from a markup language
 

	
 
        filename can be also explicitly a renderer name
 
        """
 

	
 
        if cls.MARKDOWN_PAT.findall(filename):
 
            return cls.markdown
 
        elif cls.RST_PAT.findall(filename):
 
            return cls.rst
 
        elif cls.PLAIN_PAT.findall(filename):
 
            return cls.rst
 
        return cls.plain
 

	
 
    @classmethod
 
    def _flavored_markdown(cls, text):
 
        """
 
        Github style flavored markdown
 

	
 
        :param text:
 
        """
 
        from hashlib import md5
 

	
 
        # Extract pre blocks.
 
        extractions = {}
 

	
 
        def pre_extraction_callback(matchobj):
 
            digest = md5(matchobj.group(0)).hexdigest()
 
            extractions[digest] = matchobj.group(0)
 
            return "{gfm-extraction-%s}" % digest
 
        pattern = re.compile(r'<pre>.*?</pre>', re.MULTILINE | re.DOTALL)
 
        text = re.sub(pattern, pre_extraction_callback, text)
 

	
 
        # Prevent foo_bar_baz from ending up with an italic word in the middle.
 
        def italic_callback(matchobj):
 
            s = matchobj.group(0)
 
            if list(s).count('_') >= 2:
 
                return s.replace('_', r'\_')
 
            return s
 
        text = re.sub(r'^(?! {4}|\t)\w+_\w+_\w[\w_]*', italic_callback, text)
 

	
 
        # In very clear cases, let newlines become <br /> tags.
 
        def newline_callback(matchobj):
 
            if len(matchobj.group(1)) == 1:
 
                return matchobj.group(0).rstrip() + '  \n'
 
            else:
 
                return matchobj.group(0)
 
        pattern = re.compile(r'^[\w\<][^\n]*(\n+)', re.MULTILINE)
 
        text = re.sub(pattern, newline_callback, text)
 

	
 
        # Insert pre block extractions.
 
        def pre_insert_callback(matchobj):
 
            return '\n\n' + extractions[matchobj.group(1)]
 
        text = re.sub(r'{gfm-extraction-([0-9a-f]{32})\}',
 
                      pre_insert_callback, text)
 

	
 
        return text
 

	
 
    @classmethod
 
    def render(cls, source, filename=None):
 
        """
 
        Renders a given filename using detected renderer
 
        it detects renderers based on file extension or mimetype.
 
        At last it will just do a simple html replacing new lines with <br/>
 

	
 
        >>> MarkupRenderer.render('''<img id="a" style="margin-top:-1000px;color:red" src="http://example.com/test.jpg">''', '.md')
 
        u'<p><img id="a" src="http://example.com/test.jpg" style="color: red;"></p>'
 
        >>> MarkupRenderer.render('''<img class="c d" src="file://localhost/test.jpg">''', 'b.mkd')
 
        u'<p><img class="c d"></p>'
 
        >>> MarkupRenderer.render('''<a href="foo">foo</a>''', 'c.mkdn')
 
        u'<p><a href="foo">foo</a></p>'
 
        >>> MarkupRenderer.render('''<script>alert(1)</script>''', 'd.mdown')
 
        u'&lt;script&gt;alert(1)&lt;/script&gt;'
 
        >>> MarkupRenderer.render('''<div onclick="alert(2)">yo</div>''', 'markdown')
 
        u'<div>yo</div>'
 
        >>> MarkupRenderer.render('''<a href="javascript:alert(3)">yo</a>''', 'md')
 
        u'<p><a>yo</a></p>'
 
        """
 

	
 
        renderer = cls._detect_renderer(source, filename)
 
        readme_data = renderer(source)
 
        # Allow most HTML, while preventing XSS issues:
 
        # no <script> tags, no onclick attributes, no javascript
 
        # "protocol", and also limit styling to prevent defacing.
 
        return bleach.clean(readme_data,
 
            tags=['a', 'abbr', 'b', 'blockquote', 'br', 'code', 'dd',
 
                  'div', 'dl', 'dt', 'em', 'h1', 'h2', 'h3', 'h4', 'h5',
 
                  'h6', 'hr', 'i', 'img', 'li', 'ol', 'p', 'pre', 'span',
 
                  'strong', 'sub', 'sup', 'table', 'tbody', 'td', 'th',
 
                  'thead', 'tr', 'ul'],
 
            attributes=['class', 'id', 'style', 'label', 'title', 'alt', 'href', 'src'],
 
            styles=['color'],
 
            protocols=['http', 'https', 'mailto'],
 
            )
 

	
 
    @classmethod
 
    def plain(cls, source, universal_newline=True):
 
        source = safe_unicode(source)
 
        if universal_newline:
 
            newline = '\n'
 
            source = newline.join(source.splitlines())
 

	
 
        def url_func(match_obj):
 
            url_full = match_obj.group(0)
 
            return '<a href="%(url)s">%(url)s</a>' % ({'url': url_full})
 
        source = url_re.sub(url_func, source)
 
        return '<br />' + source.replace("\n", '<br />')
 

	
 
    @classmethod
 
    def markdown(cls, source, safe=True, flavored=False):
 
        """
 
        Convert Markdown (possibly GitHub Flavored) to INSECURE HTML, possibly
 
        with "safe" fall-back to plaintext. Output from this method should be sanitized before use.
 

	
 
        >>> MarkupRenderer.markdown('''<img id="a" style="margin-top:-1000px;color:red" src="http://example.com/test.jpg">''')
 
        u'<p><img id="a" style="margin-top:-1000px;color:red" src="http://example.com/test.jpg"></p>'
 
        >>> MarkupRenderer.markdown('''<img class="c d" src="file://localhost/test.jpg">''')
 
        u'<p><img class="c d" src="file://localhost/test.jpg"></p>'
 
        >>> MarkupRenderer.markdown('''<a href="foo">foo</a>''')
 
        u'<p><a href="foo">foo</a></p>'
 
        >>> MarkupRenderer.markdown('''<script>alert(1)</script>''')
 
        u'<script>alert(1)</script>'
 
        >>> MarkupRenderer.markdown('''<div onclick="alert(2)">yo</div>''')
 
        u'<div onclick="alert(2)">yo</div>'
 
        >>> MarkupRenderer.markdown('''<a href="javascript:alert(3)">yo</a>''')
 
        u'<p><a href="javascript:alert(3)">yo</a></p>'
 
        >>> MarkupRenderer.markdown('''## Foo''')
 
        u'<h2>Foo</h2>'
 
        >>> print MarkupRenderer.markdown('''
 
        ...     #!/bin/bash
 
        ...     echo "hello"
 
        ... ''')
 
        <table class="code-highlighttable"><tr><td class="linenos"><div class="linenodiv"><pre>1
 
        2</pre></div></td><td class="code"><div class="code-highlight"><pre><span></span><span class="ch">#!/bin/bash</span>
 
        <span class="nb">echo</span> <span class="s2">&quot;hello&quot;</span>
 
        </pre></div>
 
        </td></tr></table>
 
        """
 
        source = safe_unicode(source)
 
        try:
 
            if flavored:
 
                source = cls._flavored_markdown(source)
 
            return markdown_mod.markdown(
 
                source,
 
                extensions=['markdown.extensions.codehilite', 'markdown.extensions.extra'],
 
                extension_configs={'markdown.extensions.codehilite': {'css_class': 'code-highlight'}})
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            if safe:
 
                log.debug('Falling back to render in plain mode')
 
                return cls.plain(source)
 
            else:
 
                raise
 

	
 
    @classmethod
 
    def rst(cls, source, safe=True):
 
        source = safe_unicode(source)
 
        try:
 
            from docutils.core import publish_parts
 
            from docutils.parsers.rst import directives
 
            docutils_settings = dict([(alias, None) for alias in
 
                                cls.RESTRUCTUREDTEXT_DISALLOWED_DIRECTIVES])
 

	
 
            docutils_settings.update({'input_encoding': 'unicode',
 
                                      'report_level': 4})
 

	
 
            for k, v in docutils_settings.iteritems():
 
            for k, v in docutils_settings.items():
 
                directives.register_directive(k, v)
 

	
 
            parts = publish_parts(source=source,
 
                                  writer_name="html4css1",
 
                                  settings_overrides=docutils_settings)
 

	
 
            return parts['html_title'] + parts["fragment"]
 
        except ImportError:
 
            log.warning('Install docutils to use this function')
 
            return cls.plain(source)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            if safe:
 
                log.debug('Falling back to render in plain mode')
 
                return cls.plain(source)
 
            else:
 
                raise
 

	
 
    @classmethod
 
    def rst_with_mentions(cls, source):
 

	
 
        def wrapp(match_obj):
 
            uname = match_obj.groups()[0]
 
            return r'\ **@%(uname)s**\ ' % {'uname': uname}
 
        mention_hl = MENTIONS_REGEX.sub(wrapp, source).strip()
 
        return cls.rst(mention_hl)
kallithea/lib/vcs/backends/git/changeset.py
Show inline comments
 
import re
 
from io import BytesIO
 
from itertools import chain
 
from subprocess import PIPE, Popen
 

	
 
from dulwich import objects
 
from dulwich.config import ConfigFile
 

	
 
from kallithea.lib.vcs.backends.base import BaseChangeset, EmptyChangeset
 
from kallithea.lib.vcs.conf import settings
 
from kallithea.lib.vcs.exceptions import ChangesetDoesNotExistError, ChangesetError, ImproperArchiveTypeError, NodeDoesNotExistError, RepositoryError, VCSError
 
from kallithea.lib.vcs.nodes import (
 
    AddedFileNodesGenerator, ChangedFileNodesGenerator, DirNode, FileNode, NodeKind, RemovedFileNodesGenerator, RootNode, SubModuleNode)
 
from kallithea.lib.vcs.utils import ascii_bytes, ascii_str, date_fromtimestamp, safe_int, safe_str, safe_unicode
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 

	
 

	
 
class GitChangeset(BaseChangeset):
 
    """
 
    Represents state of the repository at a revision.
 
    """
 

	
 
    def __init__(self, repository, revision):
 
        self._stat_modes = {}
 
        self.repository = repository
 
        revision = safe_str(revision)
 
        try:
 
            commit = self.repository._repo[ascii_bytes(revision)]
 
            if isinstance(commit, objects.Tag):
 
                revision = safe_str(commit.object[1])
 
                commit = self.repository._repo.get_object(commit.object[1])
 
        except KeyError:
 
            raise RepositoryError("Cannot get object with id %s" % revision)
 
        self.raw_id = ascii_str(commit.id)
 
        self.short_id = self.raw_id[:12]
 
        self._commit = commit  # a Dulwich Commmit with .id
 
        self._tree_id = commit.tree
 
        self._committer_property = 'committer'
 
        self._author_property = 'author'
 
        self._date_property = 'commit_time'
 
        self._date_tz_property = 'commit_timezone'
 
        self.revision = repository.revisions.index(self.raw_id)
 

	
 
        self.nodes = {}
 
        self._paths = {}
 

	
 
    @LazyProperty
 
    def bookmarks(self):
 
        return ()
 

	
 
    @LazyProperty
 
    def message(self):
 
        return safe_unicode(self._commit.message)
 

	
 
    @LazyProperty
 
    def committer(self):
 
        return safe_unicode(getattr(self._commit, self._committer_property))
 

	
 
    @LazyProperty
 
    def author(self):
 
        return safe_unicode(getattr(self._commit, self._author_property))
 

	
 
    @LazyProperty
 
    def date(self):
 
        return date_fromtimestamp(getattr(self._commit, self._date_property),
 
                                  getattr(self._commit, self._date_tz_property))
 

	
 
    @LazyProperty
 
    def _timestamp(self):
 
        return getattr(self._commit, self._date_property)
 

	
 
    @LazyProperty
 
    def status(self):
 
        """
 
        Returns modified, added, removed, deleted files for current changeset
 
        """
 
        return self.changed, self.added, self.removed
 

	
 
    @LazyProperty
 
    def tags(self):
 
        _tags = []
 
        for tname, tsha in self.repository.tags.iteritems():
 
        for tname, tsha in self.repository.tags.items():
 
            if tsha == self.raw_id:
 
                _tags.append(tname)
 
        return _tags
 

	
 
    @LazyProperty
 
    def branch(self):
 
        # Note: This function will return one branch name for the changeset -
 
        # that might not make sense in Git where branches() is a better match
 
        # for the basic model
 
        heads = self.repository._heads(reverse=False)
 
        ref = heads.get(self._commit.id)
 
        if ref:
 
            return safe_unicode(ref)
 

	
 
    @LazyProperty
 
    def branches(self):
 
        heads = self.repository._heads(reverse=True)
 
        return [b for b in heads if heads[b] == self._commit.id] # FIXME: Inefficient ... and returning None!
 

	
 
    def _fix_path(self, path):
 
        """
 
        Paths are stored without trailing slash so we need to get rid off it if
 
        needed.
 
        """
 
        if path.endswith('/'):
 
            path = path.rstrip('/')
 
        return path
 

	
 
    def _get_id_for_path(self, path):
 
        path = safe_str(path)
 
        # FIXME: Please, spare a couple of minutes and make those codes cleaner;
 
        if path not in self._paths:
 
            path = path.strip('/')
 
            # set root tree
 
            tree = self.repository._repo[self._tree_id]
 
            if path == '':
 
                self._paths[''] = tree.id
 
                return tree.id
 
            splitted = path.split('/')
 
            dirs, name = splitted[:-1], splitted[-1]
 
            curdir = ''
 

	
 
            # initially extract things from root dir
 
            for item, stat, id in tree.iteritems():
 
            for item, stat, id in tree.items():
 
                if curdir:
 
                    name = '/'.join((curdir, item))
 
                else:
 
                    name = item
 
                self._paths[name] = id
 
                self._stat_modes[name] = stat
 

	
 
            for dir in dirs:
 
                if curdir:
 
                    curdir = '/'.join((curdir, dir))
 
                else:
 
                    curdir = dir
 
                dir_id = None
 
                for item, stat, id in tree.iteritems():
 
                for item, stat, id in tree.items():
 
                    if dir == item:
 
                        dir_id = id
 
                if dir_id:
 
                    # Update tree
 
                    tree = self.repository._repo[dir_id]
 
                    if not isinstance(tree, objects.Tree):
 
                        raise ChangesetError('%s is not a directory' % curdir)
 
                else:
 
                    raise ChangesetError('%s have not been found' % curdir)
 

	
 
                # cache all items from the given traversed tree
 
                for item, stat, id in tree.iteritems():
 
                for item, stat, id in tree.items():
 
                    if curdir:
 
                        name = '/'.join((curdir, item))
 
                    else:
 
                        name = item
 
                    self._paths[name] = id
 
                    self._stat_modes[name] = stat
 
            if path not in self._paths:
 
                raise NodeDoesNotExistError("There is no file nor directory "
 
                    "at the given path '%s' at revision %s"
 
                    % (path, safe_str(self.short_id)))
 
        return self._paths[path]
 

	
 
    def _get_kind(self, path):
 
        obj = self.repository._repo[self._get_id_for_path(path)]
 
        if isinstance(obj, objects.Blob):
 
            return NodeKind.FILE
 
        elif isinstance(obj, objects.Tree):
 
            return NodeKind.DIR
 

	
 
    def _get_filectx(self, path):
 
        path = self._fix_path(path)
 
        if self._get_kind(path) != NodeKind.FILE:
 
            raise ChangesetError("File does not exist for revision %s at "
 
                " '%s'" % (self.raw_id, path))
 
        return path
 

	
 
    def _get_file_nodes(self):
 
        return chain(*(t[2] for t in self.walk()))
 

	
 
    @LazyProperty
 
    def parents(self):
 
        """
 
        Returns list of parents changesets.
 
        """
 
        return [self.repository.get_changeset(ascii_str(parent_id))
 
                for parent_id in self._commit.parents]
 

	
 
    @LazyProperty
 
    def children(self):
 
        """
 
        Returns list of children changesets.
 
        """
 
        rev_filter = settings.GIT_REV_FILTER
 
        so = self.repository.run_git_command(
 
            ['rev-list', rev_filter, '--children']
 
        )
 
        return [
 
            self.repository.get_changeset(cs)
 
            for parts in (l.split(' ') for l in so.splitlines())
 
            if parts[0] == self.raw_id
 
            for cs in parts[1:]
 
        ]
 

	
 
    def next(self, branch=None):
 
        if branch and self.branch != branch:
 
            raise VCSError('Branch option used on changeset not belonging '
 
                           'to that branch')
 

	
 
        cs = self
 
        while True:
 
            try:
 
                next_ = cs.revision + 1
 
                next_rev = cs.repository.revisions[next_]
 
            except IndexError:
 
                raise ChangesetDoesNotExistError
 
            cs = cs.repository.get_changeset(next_rev)
 

	
 
            if not branch or branch == cs.branch:
 
                return cs
 

	
 
    def prev(self, branch=None):
 
        if branch and self.branch != branch:
 
            raise VCSError('Branch option used on changeset not belonging '
 
                           'to that branch')
 

	
 
        cs = self
 
        while True:
 
            try:
 
                prev_ = cs.revision - 1
 
                if prev_ < 0:
 
                    raise IndexError
 
                prev_rev = cs.repository.revisions[prev_]
 
            except IndexError:
 
                raise ChangesetDoesNotExistError
 
            cs = cs.repository.get_changeset(prev_rev)
 

	
 
            if not branch or branch == cs.branch:
 
                return cs
 

	
 
    def diff(self, ignore_whitespace=True, context=3):
 
        # Only used to feed diffstat
 
        rev1 = self.parents[0] if self.parents else self.repository.EMPTY_CHANGESET
 
        rev2 = self
 
        return b''.join(self.repository.get_diff(rev1, rev2,
 
                                    ignore_whitespace=ignore_whitespace,
 
                                    context=context))
 

	
 
    def get_file_mode(self, path):
 
        """
 
        Returns stat mode of the file at the given ``path``.
 
        """
 
        # ensure path is traversed
 
        path = safe_str(path)
 
        self._get_id_for_path(path)
 
        return self._stat_modes[path]
 

	
 
    def get_file_content(self, path):
 
        """
 
        Returns content of the file at given ``path``.
 
        """
 
        id = self._get_id_for_path(path)
 
        blob = self.repository._repo[id]
 
        return blob.as_pretty_string()
 

	
 
    def get_file_size(self, path):
 
        """
 
        Returns size of the file at given ``path``.
 
        """
 
        id = self._get_id_for_path(path)
 
        blob = self.repository._repo[id]
 
        return blob.raw_length()
 

	
 
    def get_file_changeset(self, path):
 
        """
 
        Returns last commit of the file at the given ``path``.
 
        """
 
        return self.get_file_history(path, limit=1)[0]
 

	
 
    def get_file_history(self, path, limit=None):
 
        """
 
        Returns history of file as reversed list of ``Changeset`` objects for
 
        which file at given ``path`` has been modified.
 

	
 
        TODO: This function now uses os underlying 'git' and 'grep' commands
 
        which is generally not good. Should be replaced with algorithm
 
        iterating commits.
 
        """
 
        self._get_filectx(path)
 
        f_path = safe_str(path)
 

	
 
        if limit is not None:
 
            cmd = ['log', '-n', str(safe_int(limit, 0)),
 
                   '--pretty=format:%H', '-s', self.raw_id, '--', f_path]
 

	
 
        else:
 
            cmd = ['log',
 
                   '--pretty=format:%H', '-s', self.raw_id, '--', f_path]
 
        so = self.repository.run_git_command(cmd)
 
        ids = re.findall(r'[0-9a-fA-F]{40}', so)
 
        return [self.repository.get_changeset(sha) for sha in ids]
 

	
 
    def get_file_history_2(self, path):
 
        """
 
        Returns history of file as reversed list of ``Changeset`` objects for
 
        which file at given ``path`` has been modified.
 

	
 
        """
 
        self._get_filectx(path)
 
        from dulwich.walk import Walker
 
        include = [self.raw_id]
 
        walker = Walker(self.repository._repo.object_store, include,
 
                        paths=[path], max_entries=1)
 
        return [self.repository.get_changeset(ascii_str(x.commit.id.decode))
 
                for x in walker]
 

	
 
    def get_file_annotate(self, path):
 
        """
 
        Returns a generator of four element tuples with
 
            lineno, sha, changeset lazy loader and line
 
        """
 
        # TODO: This function now uses os underlying 'git' command which is
 
        # generally not good. Should be replaced with algorithm iterating
 
        # commits.
 
        cmd = ['blame', '-l', '--root', '-r', self.raw_id, '--', path]
 
        # -l     ==> outputs long shas (and we need all 40 characters)
 
        # --root ==> doesn't put '^' character for boundaries
 
        # -r sha ==> blames for the given revision
 
        so = self.repository.run_git_command(cmd)
 

	
 
        for i, blame_line in enumerate(so.split('\n')[:-1]):
 
            sha, line = re.split(r' ', blame_line, 1)
 
            yield (i + 1, sha, lambda sha=sha: self.repository.get_changeset(sha), line)
 

	
 
    def fill_archive(self, stream=None, kind='tgz', prefix=None,
 
                     subrepos=False):
 
        """
 
        Fills up given stream.
 

	
 
        :param stream: file like object.
 
        :param kind: one of following: ``zip``, ``tgz`` or ``tbz2``.
 
            Default: ``tgz``.
 
        :param prefix: name of root directory in archive.
 
            Default is repository name and changeset's raw_id joined with dash
 
            (``repo-tip.<KIND>``).
 
        :param subrepos: include subrepos in this archive.
 

	
 
        :raise ImproperArchiveTypeError: If given kind is wrong.
 
        :raise VcsError: If given stream is None
 
        """
 
        allowed_kinds = settings.ARCHIVE_SPECS
 
        if kind not in allowed_kinds:
 
            raise ImproperArchiveTypeError('Archive kind not supported use one'
 
                'of %s' % ' '.join(allowed_kinds))
 

	
 
        if stream is None:
 
            raise VCSError('You need to pass in a valid stream for filling'
 
                           ' with archival data')
 

	
 
        if prefix is None:
 
            prefix = '%s-%s' % (self.repository.name, self.short_id)
 
        elif prefix.startswith('/'):
 
            raise VCSError("Prefix cannot start with leading slash")
 
        elif prefix.strip() == '':
 
            raise VCSError("Prefix cannot be empty")
 

	
 
        if kind == 'zip':
 
            frmt = 'zip'
 
        else:
 
            frmt = 'tar'
 
        _git_path = settings.GIT_EXECUTABLE_PATH
 
        cmd = '%s archive --format=%s --prefix=%s/ %s' % (_git_path,
 
                                                frmt, prefix, self.raw_id)
 
        if kind == 'tgz':
 
            cmd += ' | gzip -9'
 
        elif kind == 'tbz2':
 
            cmd += ' | bzip2 -9'
 

	
 
        if stream is None:
 
            raise VCSError('You need to pass in a valid stream for filling'
 
                           ' with archival data')
 
        popen = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True,
 
                      cwd=self.repository.path)
 

	
 
        buffer_size = 1024 * 8
 
        chunk = popen.stdout.read(buffer_size)
 
        while chunk:
 
            stream.write(chunk)
 
            chunk = popen.stdout.read(buffer_size)
 
        # Make sure all descriptors would be read
 
        popen.communicate()
 

	
 
    def get_nodes(self, path):
 
        """
 
        Returns combined ``DirNode`` and ``FileNode`` objects list representing
 
        state of changeset at the given ``path``. If node at the given ``path``
 
        is not instance of ``DirNode``, ChangesetError would be raised.
 
        """
 

	
 
        if self._get_kind(path) != NodeKind.DIR:
 
            raise ChangesetError("Directory does not exist for revision %s at "
 
                " '%s'" % (self.revision, path))
 
        path = self._fix_path(path)
 
        id = self._get_id_for_path(path)
 
        tree = self.repository._repo[id]
 
        dirnodes = []
 
        filenodes = []
 
        als = self.repository.alias
 
        for name, stat, id in tree.iteritems():
 
        for name, stat, id in tree.items():
 
            if path != '':
 
                obj_path = '/'.join((path, name))
 
            else:
 
                obj_path = name
 
            if objects.S_ISGITLINK(stat):
 
                root_tree = self.repository._repo[self._tree_id]
 
                cf = ConfigFile.from_file(BytesIO(self.repository._repo.get_object(root_tree[b'.gitmodules'][1]).data))
 
                url = ascii_str(cf.get(('submodule', obj_path), 'url'))
 
                dirnodes.append(SubModuleNode(obj_path, url=url, changeset=ascii_str(id),
 
                                              alias=als))
 
                continue
 

	
 
            obj = self.repository._repo.get_object(id)
 
            if obj_path not in self._stat_modes:
 
                self._stat_modes[obj_path] = stat
 
            if isinstance(obj, objects.Tree):
 
                dirnodes.append(DirNode(obj_path, changeset=self))
 
            elif isinstance(obj, objects.Blob):
 
                filenodes.append(FileNode(obj_path, changeset=self, mode=stat))
 
            else:
 
                raise ChangesetError("Requested object should be Tree "
 
                                     "or Blob, is %r" % type(obj))
 
        nodes = dirnodes + filenodes
 
        for node in nodes:
 
            if node.path not in self.nodes:
 
                self.nodes[node.path] = node
 
        nodes.sort()
 
        return nodes
 

	
 
    def get_node(self, path):
 
        """
 
        Returns ``Node`` object from the given ``path``. If there is no node at
 
        the given ``path``, ``ChangesetError`` would be raised.
 
        """
 
        path = self._fix_path(path)
 
        if path not in self.nodes:
 
            try:
 
                id_ = self._get_id_for_path(path)
 
            except ChangesetError:
 
                raise NodeDoesNotExistError("Cannot find one of parents' "
 
                    "directories for a given path: %s" % path)
 

	
 
            _GL = lambda m: m and objects.S_ISGITLINK(m)
 
            if _GL(self._stat_modes.get(path)):
 
                tree = self.repository._repo[self._tree_id]
 
                cf = ConfigFile.from_file(BytesIO(self.repository._repo.get_object(tree[b'.gitmodules'][1]).data))
 
                url = ascii_str(cf.get(('submodule', path), 'url'))
 
                node = SubModuleNode(path, url=url, changeset=ascii_str(id_),
 
                                     alias=self.repository.alias)
 
            else:
 
                obj = self.repository._repo.get_object(id_)
 

	
 
                if isinstance(obj, objects.Tree):
 
                    if path == '':
 
                        node = RootNode(changeset=self)
 
                    else:
 
                        node = DirNode(path, changeset=self)
 
                    node._tree = obj
 
                elif isinstance(obj, objects.Blob):
 
                    node = FileNode(path, changeset=self)
 
                    node._blob = obj
 
                else:
 
                    raise NodeDoesNotExistError("There is no file nor directory "
 
                        "at the given path: '%s' at revision %s"
 
                        % (path, self.short_id))
 
            # cache node
 
            self.nodes[path] = node
 
        return self.nodes[path]
 

	
 
    @LazyProperty
 
    def affected_files(self):
 
        """
 
        Gets a fast accessible file changes for given changeset
 
        """
 
        added, modified, deleted = self._changes_cache
 
        return list(added.union(modified).union(deleted))
 

	
 
    @LazyProperty
 
    def _changes_cache(self):
 
        added = set()
 
        modified = set()
 
        deleted = set()
 
        _r = self.repository._repo
 

	
 
        parents = self.parents
 
        if not self.parents:
 
            parents = [EmptyChangeset()]
 
        for parent in parents:
 
            if isinstance(parent, EmptyChangeset):
 
                oid = None
 
            else:
 
                oid = _r[parent._commit.id].tree
 
            changes = _r.object_store.tree_changes(oid, _r[self._commit.id].tree)
 
            for (oldpath, newpath), (_, _), (_, _) in changes:
 
                if newpath and oldpath:
 
                    modified.add(newpath)
 
                elif newpath and not oldpath:
 
                    added.add(newpath)
 
                elif not newpath and oldpath:
 
                    deleted.add(oldpath)
 
        return added, modified, deleted
 

	
 
    def _get_paths_for_status(self, status):
 
        """
 
        Returns sorted list of paths for given ``status``.
 

	
 
        :param status: one of: *added*, *modified* or *deleted*
 
        """
 
        added, modified, deleted = self._changes_cache
 
        return sorted({
 
            'added': list(added),
 
            'modified': list(modified),
 
            'deleted': list(deleted)}[status]
 
        )
 

	
 
    @LazyProperty
 
    def added(self):
 
        """
 
        Returns list of added ``FileNode`` objects.
 
        """
 
        if not self.parents:
 
            return list(self._get_file_nodes())
 
        return AddedFileNodesGenerator([n for n in
 
                                self._get_paths_for_status('added')], self)
 

	
 
    @LazyProperty
 
    def changed(self):
 
        """
 
        Returns list of modified ``FileNode`` objects.
 
        """
 
        if not self.parents:
 
            return []
 
        return ChangedFileNodesGenerator([n for n in
 
                                self._get_paths_for_status('modified')], self)
 

	
 
    @LazyProperty
 
    def removed(self):
 
        """
 
        Returns list of removed ``FileNode`` objects.
 
        """
 
        if not self.parents:
 
            return []
 
        return RemovedFileNodesGenerator([n for n in
 
                                self._get_paths_for_status('deleted')], self)
 

	
 
    extra = {}
kallithea/lib/vcs/backends/git/inmemory.py
Show inline comments
 
import datetime
 
import posixpath
 
import stat
 
import time
 

	
 
from dulwich import objects
 

	
 
from kallithea.lib.vcs.backends.base import BaseInMemoryChangeset
 
from kallithea.lib.vcs.exceptions import RepositoryError
 
from kallithea.lib.vcs.utils import ascii_str, safe_bytes
 

	
 

	
 
class GitInMemoryChangeset(BaseInMemoryChangeset):
 

	
 
    def commit(self, message, author, parents=None, branch=None, date=None,
 
               **kwargs):
 
        """
 
        Performs in-memory commit (doesn't check workdir in any way) and
 
        returns newly created ``Changeset``. Updates repository's
 
        ``revisions``.
 

	
 
        :param message: message of the commit
 
        :param author: full username, i.e. "Joe Doe <joe.doe@example.com>"
 
        :param parents: single parent or sequence of parents from which commit
 
          would be derived
 
        :param date: ``datetime.datetime`` instance. Defaults to
 
          ``datetime.datetime.now()``.
 
        :param branch: branch name, as string. If none given, default backend's
 
          branch would be used.
 

	
 
        :raises ``CommitError``: if any error occurs while committing
 
        """
 
        self.check_integrity(parents)
 

	
 
        from .repository import GitRepository
 
        if branch is None:
 
            branch = GitRepository.DEFAULT_BRANCH_NAME
 

	
 
        repo = self.repository._repo
 
        object_store = repo.object_store
 

	
 
        ENCODING = b"UTF-8"  # TODO: should probably be kept in sync with safe_unicode/safe_bytes and vcs/conf/settings.py DEFAULT_ENCODINGS
 

	
 
        # Create tree and populates it with blobs
 
        commit_tree = self.parents[0] and repo[self.parents[0]._commit.tree] or \
 
            objects.Tree()
 
        for node in self.added + self.changed:
 
            # Compute subdirs if needed
 
            dirpath, nodename = posixpath.split(node.path)
 
            dirnames = safe_bytes(dirpath).split(b'/') if dirpath else []
 
            parent = commit_tree
 
            ancestors = [('', parent)]
 

	
 
            # Tries to dig for the deepest existing tree
 
            while dirnames:
 
                curdir = dirnames.pop(0)
 
                try:
 
                    dir_id = parent[curdir][1]
 
                except KeyError:
 
                    # put curdir back into dirnames and stops
 
                    dirnames.insert(0, curdir)
 
                    break
 
                else:
 
                    # If found, updates parent
 
                    parent = self.repository._repo[dir_id]
 
                    ancestors.append((curdir, parent))
 
            # Now parent is deepest existing tree and we need to create subtrees
 
            # for dirnames (in reverse order) [this only applies for nodes from added]
 
            new_trees = []
 

	
 
            blob = objects.Blob.from_string(node.content)
 

	
 
            node_path = safe_bytes(node.name)
 
            if dirnames:
 
                # If there are trees which should be created we need to build
 
                # them now (in reverse order)
 
                reversed_dirnames = list(reversed(dirnames))
 
                curtree = objects.Tree()
 
                curtree[node_path] = node.mode, blob.id
 
                new_trees.append(curtree)
 
                for dirname in reversed_dirnames[:-1]:
 
                    newtree = objects.Tree()
 
                    #newtree.add(stat.S_IFDIR, dirname, curtree.id)
 
                    newtree[dirname] = stat.S_IFDIR, curtree.id
 
                    new_trees.append(newtree)
 
                    curtree = newtree
 
                parent[reversed_dirnames[-1]] = stat.S_IFDIR, curtree.id
 
            else:
 
                parent.add(name=node_path, mode=node.mode, hexsha=blob.id)
 

	
 
            new_trees.append(parent)
 
            # Update ancestors
 
            for parent, tree, path in reversed([(a[1], b[1], b[0]) for a, b in
 
                zip(ancestors, ancestors[1:])]
 
            ):
 
                parent[path] = stat.S_IFDIR, tree.id
 
                object_store.add_object(tree)
 

	
 
            object_store.add_object(blob)
 
            for tree in new_trees:
 
                object_store.add_object(tree)
 
        for node in self.removed:
 
            paths = safe_bytes(node.path).split(b'/')
 
            tree = commit_tree
 
            trees = [tree]
 
            # Traverse deep into the forest...
 
            for path in paths:
 
                try:
 
                    obj = self.repository._repo[tree[path][1]]
 
                    if isinstance(obj, objects.Tree):
 
                        trees.append(obj)
 
                        tree = obj
 
                except KeyError:
 
                    break
 
            # Cut down the blob and all rotten trees on the way back...
 
            for path, tree in reversed(zip(paths, trees)):
 
                del tree[path]
 
                if tree:
 
                    # This tree still has elements - don't remove it or any
 
                    # of it's parents
 
                    break
 

	
 
        object_store.add_object(commit_tree)
 

	
 
        # Create commit
 
        commit = objects.Commit()
 
        commit.tree = commit_tree.id
 
        commit.parents = [p._commit.id for p in self.parents if p]
 
        commit.author = commit.committer = safe_bytes(author)
 
        commit.encoding = ENCODING
 
        commit.message = safe_bytes(message)
 

	
 
        # Compute date
 
        if date is None:
 
            date = time.time()
 
        elif isinstance(date, datetime.datetime):
 
            date = time.mktime(date.timetuple())
 

	
 
        author_time = kwargs.pop('author_time', date)
 
        commit.commit_time = int(date)
 
        commit.author_time = int(author_time)
 
        tz = time.timezone
 
        author_tz = kwargs.pop('author_timezone', tz)
 
        commit.commit_timezone = tz
 
        commit.author_timezone = author_tz
 

	
 
        object_store.add_object(commit)
 

	
 
        # Update vcs repository object & recreate dulwich repo
 
        ref = b'refs/heads/%s' % safe_bytes(branch)
 
        repo.refs[ref] = commit.id
 
        self.repository.revisions.append(ascii_str(commit.id))
 
        # invalidate parsed refs after commit
 
        self.repository._parsed_refs = self.repository._get_parsed_refs()
 
        tip = self.repository.get_changeset()
 
        self.reset()
 
        return tip
 

	
 
    def _get_missing_trees(self, path, root_tree):
 
        """
 
        Creates missing ``Tree`` objects for the given path.
 

	
 
        :param path: path given as a string. It may be a path to a file node
 
          (i.e. ``foo/bar/baz.txt``) or directory path - in that case it must
 
          end with slash (i.e. ``foo/bar/``).
 
        :param root_tree: ``dulwich.objects.Tree`` object from which we start
 
          traversing (should be commit's root tree)
 
        """
 
        dirpath = posixpath.split(path)[0]
 
        dirs = dirpath.split('/')
 
        if not dirs or dirs == ['']:
 
            return []
 

	
 
        def get_tree_for_dir(tree, dirname):
 
            for name, mode, id in tree.iteritems():
 
            for name, mode, id in tree.items():
 
                if name == dirname:
 
                    obj = self.repository._repo[id]
 
                    if isinstance(obj, objects.Tree):
 
                        return obj
 
                    else:
 
                        raise RepositoryError("Cannot create directory %s "
 
                            "at tree %s as path is occupied and is not a "
 
                            "Tree" % (dirname, tree))
 
            return None
 

	
 
        trees = []
 
        parent = root_tree
 
        for dirname in dirs:
 
            tree = get_tree_for_dir(parent, dirname)
 
            if tree is None:
 
                tree = objects.Tree()
 
                parent.add(stat.S_IFDIR, dirname, tree.id)
 
                parent = tree
 
            # Always append tree
 
            trees.append(tree)
 
        return trees
kallithea/lib/vcs/backends/git/repository.py
Show inline comments
 
@@ -63,603 +63,603 @@ class GitRepository(BaseRepository):
 
            self.bare and abspath(self.path, 'config')
 
                      or abspath(self.path, '.git', 'config'),
 
             abspath(get_user_home(), '.gitconfig'),
 
         ]
 

	
 
    @property
 
    def _repo(self):
 
        return self.repo
 

	
 
    @property
 
    def head(self):
 
        try:
 
            return self._repo.head()
 
        except KeyError:
 
            return None
 

	
 
    @property
 
    def _empty(self):
 
        """
 
        Checks if repository is empty ie. without any changesets
 
        """
 

	
 
        try:
 
            self.revisions[0]
 
        except (KeyError, IndexError):
 
            return True
 
        return False
 

	
 
    @LazyProperty
 
    def revisions(self):
 
        """
 
        Returns list of revisions' ids, in ascending order.  Being lazy
 
        attribute allows external tools to inject shas from cache.
 
        """
 
        return self._get_all_revisions()
 

	
 
    @classmethod
 
    def _run_git_command(cls, cmd, cwd=None):
 
        """
 
        Runs given ``cmd`` as git command and returns output bytes in a tuple
 
        (stdout, stderr) ... or raise RepositoryError.
 

	
 
        :param cmd: git command to be executed
 
        :param cwd: passed directly to subprocess
 
        """
 
        # need to clean fix GIT_DIR !
 
        gitenv = dict(os.environ)
 
        gitenv.pop('GIT_DIR', None)
 
        gitenv['GIT_CONFIG_NOGLOBAL'] = '1'
 

	
 
        assert isinstance(cmd, list), cmd
 
        cmd = [settings.GIT_EXECUTABLE_PATH, '-c', 'core.quotepath=false'] + cmd
 
        try:
 
            p = subprocessio.SubprocessIOChunker(cmd, cwd=cwd, env=gitenv, shell=False)
 
        except (EnvironmentError, OSError) as err:
 
            # output from the failing process is in str(EnvironmentError)
 
            msg = ("Couldn't run git command %s.\n"
 
                   "Subprocess failed with '%s': %s\n" %
 
                   (cmd, type(err).__name__, err)
 
            ).strip()
 
            log.error(msg)
 
            raise RepositoryError(msg)
 

	
 
        try:
 
            stdout = b''.join(p.output)
 
            stderr = b''.join(p.error)
 
        finally:
 
            p.close()
 
        # TODO: introduce option to make commands fail if they have any stderr output?
 
        if stderr:
 
            log.debug('stderr from %s:\n%s', cmd, stderr)
 
        else:
 
            log.debug('stderr from %s: None', cmd)
 
        return stdout, stderr
 

	
 
    def run_git_command(self, cmd):
 
        """
 
        Runs given ``cmd`` as git command with cwd set to current repo.
 
        Returns stdout as unicode str ... or raise RepositoryError.
 
        """
 
        cwd = None
 
        if os.path.isdir(self.path):
 
            cwd = self.path
 
        stdout, _stderr = self._run_git_command(cmd, cwd=cwd)
 
        return safe_unicode(stdout)
 

	
 
    @classmethod
 
    def _check_url(cls, url):
 
        """
 
        Function will check given url and try to verify if it's a valid
 
        link. Sometimes it may happened that git will issue basic
 
        auth request that can cause whole API to hang when used from python
 
        or other external calls.
 

	
 
        On failures it'll raise urllib2.HTTPError, exception is also thrown
 
        when the return code is non 200
 
        """
 
        # check first if it's not an local url
 
        if os.path.isdir(url) or url.startswith('file:'):
 
            return True
 

	
 
        if url.startswith('git://'):
 
            return True
 

	
 
        if '+' in url[:url.find('://')]:
 
            url = url[url.find('+') + 1:]
 

	
 
        handlers = []
 
        url_obj = mercurial.util.url(safe_bytes(url))
 
        test_uri, authinfo = url_obj.authinfo()
 
        if not test_uri.endswith('info/refs'):
 
            test_uri = test_uri.rstrip('/') + '/info/refs'
 

	
 
        url_obj.passwd = b'*****'
 
        cleaned_uri = str(url_obj)
 

	
 
        if authinfo:
 
            # create a password manager
 
            passmgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
 
            passmgr.add_password(*authinfo)
 

	
 
            handlers.extend((mercurial.url.httpbasicauthhandler(passmgr),
 
                             mercurial.url.httpdigestauthhandler(passmgr)))
 

	
 
        o = urllib2.build_opener(*handlers)
 
        o.addheaders = [('User-Agent', 'git/1.7.8.0')]  # fake some git
 

	
 
        req = urllib2.Request(
 
            "%s?%s" % (
 
                test_uri,
 
                urllib.urlencode({"service": 'git-upload-pack'})
 
            ))
 

	
 
        try:
 
            resp = o.open(req)
 
            if resp.code != 200:
 
                raise Exception('Return Code is not 200')
 
        except Exception as e:
 
            # means it cannot be cloned
 
            raise urllib2.URLError("[%s] org_exc: %s" % (cleaned_uri, e))
 

	
 
        # now detect if it's proper git repo
 
        gitdata = resp.read()
 
        if 'service=git-upload-pack' not in gitdata:
 
            raise urllib2.URLError(
 
                "url [%s] does not look like an git" % cleaned_uri)
 

	
 
        return True
 

	
 
    def _get_repo(self, create, src_url=None, update_after_clone=False,
 
                  bare=False):
 
        if create and os.path.exists(self.path):
 
            raise RepositoryError("Location already exist")
 
        if src_url and not create:
 
            raise RepositoryError("Create should be set to True if src_url is "
 
                                  "given (clone operation creates repository)")
 
        try:
 
            if create and src_url:
 
                GitRepository._check_url(src_url)
 
                self.clone(src_url, update_after_clone, bare)
 
                return Repo(self.path)
 
            elif create:
 
                os.makedirs(self.path)
 
                if bare:
 
                    return Repo.init_bare(self.path)
 
                else:
 
                    return Repo.init(self.path)
 
            else:
 
                return Repo(self.path)
 
        except (NotGitRepository, OSError) as err:
 
            raise RepositoryError(err)
 

	
 
    def _get_all_revisions(self):
 
        # we must check if this repo is not empty, since later command
 
        # fails if it is. And it's cheaper to ask than throw the subprocess
 
        # errors
 
        try:
 
            self._repo.head()
 
        except KeyError:
 
            return []
 

	
 
        rev_filter = settings.GIT_REV_FILTER
 
        cmd = ['rev-list', rev_filter, '--reverse', '--date-order']
 
        try:
 
            so = self.run_git_command(cmd)
 
        except RepositoryError:
 
            # Can be raised for empty repositories
 
            return []
 
        return so.splitlines()
 

	
 
    def _get_all_revisions2(self):
 
        # alternate implementation using dulwich
 
        includes = [ascii_str(sha) for key, (sha, type_) in self._parsed_refs.iteritems()
 
        includes = [ascii_str(sha) for key, (sha, type_) in self._parsed_refs.items()
 
                    if type_ != b'T']
 
        return [c.commit.id for c in self._repo.get_walker(include=includes)]
 

	
 
    def _get_revision(self, revision):
 
        """
 
        Given any revision identifier, returns a 40 char string with revision hash.
 
        """
 
        if self._empty:
 
            raise EmptyRepositoryError("There are no changesets yet")
 

	
 
        if revision in (None, '', 'tip', 'HEAD', 'head', -1):
 
            revision = -1
 

	
 
        if isinstance(revision, int):
 
            try:
 
                return self.revisions[revision]
 
            except IndexError:
 
                msg = "Revision %r does not exist for %s" % (revision, self.name)
 
                raise ChangesetDoesNotExistError(msg)
 

	
 
        if isinstance(revision, (str, unicode)):
 
            if revision.isdigit() and (len(revision) < 12 or len(revision) == revision.count('0')):
 
                try:
 
                    return self.revisions[int(revision)]
 
                except IndexError:
 
                    msg = "Revision %r does not exist for %s" % (revision, self)
 
                    raise ChangesetDoesNotExistError(msg)
 

	
 
            # get by branch/tag name
 
            _ref_revision = self._parsed_refs.get(safe_bytes(revision))
 
            if _ref_revision:  # and _ref_revision[1] in [b'H', b'RH', b'T']:
 
                return ascii_str(_ref_revision[0])
 

	
 
            if revision in self.revisions:
 
                return revision
 

	
 
            # maybe it's a tag ? we don't have them in self.revisions
 
            if revision in self.tags.values():
 
                return revision
 

	
 
            if SHA_PATTERN.match(revision):
 
                msg = "Revision %r does not exist for %s" % (revision, self.name)
 
                raise ChangesetDoesNotExistError(msg)
 

	
 
        raise ChangesetDoesNotExistError("Given revision %r not recognized" % revision)
 

	
 
    def get_ref_revision(self, ref_type, ref_name):
 
        """
 
        Returns ``GitChangeset`` object representing repository's
 
        changeset at the given ``revision``.
 
        """
 
        return self._get_revision(ref_name)
 

	
 
    def _get_archives(self, archive_name='tip'):
 

	
 
        for i in [('zip', '.zip'), ('gz', '.tar.gz'), ('bz2', '.tar.bz2')]:
 
            yield {"type": i[0], "extension": i[1], "node": archive_name}
 

	
 
    def _get_url(self, url):
 
        """
 
        Returns normalized url. If schema is not given, would fall to
 
        filesystem (``file:///``) schema.
 
        """
 
        url = safe_str(url)
 
        if url != 'default' and '://' not in url:
 
            url = ':///'.join(('file', url))
 
        return url
 

	
 
    @LazyProperty
 
    def name(self):
 
        return os.path.basename(self.path)
 

	
 
    @LazyProperty
 
    def last_change(self):
 
        """
 
        Returns last change made on this repository as datetime object
 
        """
 
        return date_fromtimestamp(self._get_mtime(), makedate()[1])
 

	
 
    def _get_mtime(self):
 
        try:
 
            return time.mktime(self.get_changeset().date.timetuple())
 
        except RepositoryError:
 
            idx_loc = '' if self.bare else '.git'
 
            # fallback to filesystem
 
            in_path = os.path.join(self.path, idx_loc, "index")
 
            he_path = os.path.join(self.path, idx_loc, "HEAD")
 
            if os.path.exists(in_path):
 
                return os.stat(in_path).st_mtime
 
            else:
 
                return os.stat(he_path).st_mtime
 

	
 
    @LazyProperty
 
    def description(self):
 
        return safe_unicode(self._repo.get_description() or b'unknown')
 

	
 
    @LazyProperty
 
    def contact(self):
 
        undefined_contact = u'Unknown'
 
        return undefined_contact
 

	
 
    @property
 
    def branches(self):
 
        if not self.revisions:
 
            return {}
 
        sortkey = lambda ctx: ctx[0]
 
        _branches = [(key, ascii_str(sha))
 
                     for key, (sha, type_) in self._parsed_refs.iteritems() if type_ == b'H']
 
                     for key, (sha, type_) in self._parsed_refs.items() if type_ == b'H']
 
        return OrderedDict(sorted(_branches, key=sortkey, reverse=False))
 

	
 
    @LazyProperty
 
    def closed_branches(self):
 
        return {}
 

	
 
    @LazyProperty
 
    def tags(self):
 
        return self._get_tags()
 

	
 
    def _get_tags(self):
 
        if not self.revisions:
 
            return {}
 

	
 
        sortkey = lambda ctx: ctx[0]
 
        _tags = [(key, ascii_str(sha))
 
                 for key, (sha, type_) in self._parsed_refs.iteritems() if type_ == b'T']
 
                 for key, (sha, type_) in self._parsed_refs.items() if type_ == b'T']
 
        return OrderedDict(sorted(_tags, key=sortkey, reverse=True))
 

	
 
    def tag(self, name, user, revision=None, message=None, date=None,
 
            **kwargs):
 
        """
 
        Creates and returns a tag for the given ``revision``.
 

	
 
        :param name: name for new tag
 
        :param user: full username, i.e.: "Joe Doe <joe.doe@example.com>"
 
        :param revision: changeset id for which new tag would be created
 
        :param message: message of the tag's commit
 
        :param date: date of tag's commit
 

	
 
        :raises TagAlreadyExistError: if tag with same name already exists
 
        """
 
        if name in self.tags:
 
            raise TagAlreadyExistError("Tag %s already exists" % name)
 
        changeset = self.get_changeset(revision)
 
        message = message or "Added tag %s for commit %s" % (name,
 
            changeset.raw_id)
 
        self._repo.refs[b"refs/tags/%s" % safe_bytes(name)] = changeset._commit.id
 

	
 
        self._parsed_refs = self._get_parsed_refs()
 
        self.tags = self._get_tags()
 
        return changeset
 

	
 
    def remove_tag(self, name, user, message=None, date=None):
 
        """
 
        Removes tag with the given ``name``.
 

	
 
        :param name: name of the tag to be removed
 
        :param user: full username, i.e.: "Joe Doe <joe.doe@example.com>"
 
        :param message: message of the tag's removal commit
 
        :param date: date of tag's removal commit
 

	
 
        :raises TagDoesNotExistError: if tag with given name does not exists
 
        """
 
        if name not in self.tags:
 
            raise TagDoesNotExistError("Tag %s does not exist" % name)
 
        # self._repo.refs is a DiskRefsContainer, and .path gives the full absolute path of '.git'
 
        tagpath = os.path.join(self._repo.refs.path, 'refs', 'tags', name)
 
        try:
 
            os.remove(tagpath)
 
            self._parsed_refs = self._get_parsed_refs()
 
            self.tags = self._get_tags()
 
        except OSError as e:
 
            raise RepositoryError(e.strerror)
 

	
 
    @LazyProperty
 
    def bookmarks(self):
 
        """
 
        Gets bookmarks for this repository
 
        """
 
        return {}
 

	
 
    @LazyProperty
 
    def _parsed_refs(self):
 
        return self._get_parsed_refs()
 

	
 
    def _get_parsed_refs(self):
 
        """Return refs as a dict, like:
 
        { b'v0.2.0': [b'599ba911aa24d2981225f3966eb659dfae9e9f30', b'T'] }
 
        """
 
        _repo = self._repo
 
        refs = _repo.get_refs()
 
        keys = [(b'refs/heads/', b'H'),
 
                (b'refs/remotes/origin/', b'RH'),
 
                (b'refs/tags/', b'T')]
 
        _refs = {}
 
        for ref, sha in refs.iteritems():
 
        for ref, sha in refs.items():
 
            for k, type_ in keys:
 
                if ref.startswith(k):
 
                    _key = ref[len(k):]
 
                    if type_ == b'T':
 
                        obj = _repo.get_object(sha)
 
                        if isinstance(obj, Tag):
 
                            sha = _repo.get_object(sha).object[1]
 
                    _refs[_key] = [sha, type_]
 
                    break
 
        return _refs
 

	
 
    def _heads(self, reverse=False):
 
        refs = self._repo.get_refs()
 
        heads = {}
 

	
 
        for key, val in refs.items():
 
            for ref_key in [b'refs/heads/', b'refs/remotes/origin/']:
 
                if key.startswith(ref_key):
 
                    n = key[len(ref_key):]
 
                    if n not in [b'HEAD']:
 
                        heads[n] = val
 

	
 
        return heads if reverse else dict((y, x) for x, y in heads.iteritems())
 
        return heads if reverse else dict((y, x) for x, y in heads.items())
 

	
 
    def get_changeset(self, revision=None):
 
        """
 
        Returns ``GitChangeset`` object representing commit from git repository
 
        at the given revision or head (most recent commit) if None given.
 
        """
 
        if isinstance(revision, GitChangeset):
 
            return revision
 
        return GitChangeset(repository=self, revision=self._get_revision(revision))
 

	
 
    def get_changesets(self, start=None, end=None, start_date=None,
 
           end_date=None, branch_name=None, reverse=False, max_revisions=None):
 
        """
 
        Returns iterator of ``GitChangeset`` objects from start to end (both
 
        are inclusive), in ascending date order (unless ``reverse`` is set).
 

	
 
        :param start: changeset ID, as str; first returned changeset
 
        :param end: changeset ID, as str; last returned changeset
 
        :param start_date: if specified, changesets with commit date less than
 
          ``start_date`` would be filtered out from returned set
 
        :param end_date: if specified, changesets with commit date greater than
 
          ``end_date`` would be filtered out from returned set
 
        :param branch_name: if specified, changesets not reachable from given
 
          branch would be filtered out from returned set
 
        :param reverse: if ``True``, returned generator would be reversed
 
          (meaning that returned changesets would have descending date order)
 

	
 
        :raise BranchDoesNotExistError: If given ``branch_name`` does not
 
            exist.
 
        :raise ChangesetDoesNotExistError: If changeset for given ``start`` or
 
          ``end`` could not be found.
 

	
 
        """
 
        if branch_name and branch_name not in self.branches:
 
            raise BranchDoesNotExistError("Branch '%s' not found"
 
                                          % branch_name)
 
        # actually we should check now if it's not an empty repo to not spaw
 
        # subprocess commands
 
        if self._empty:
 
            raise EmptyRepositoryError("There are no changesets yet")
 

	
 
        # %H at format means (full) commit hash, initial hashes are retrieved
 
        # in ascending date order
 
        cmd = ['log', '--date-order', '--reverse', '--pretty=format:%H']
 
        if max_revisions:
 
            cmd += ['--max-count=%s' % max_revisions]
 
        if start_date:
 
            cmd += ['--since', start_date.strftime('%m/%d/%y %H:%M:%S')]
 
        if end_date:
 
            cmd += ['--until', end_date.strftime('%m/%d/%y %H:%M:%S')]
 
        if branch_name:
 
            cmd.append(branch_name)
 
        else:
 
            cmd.append(settings.GIT_REV_FILTER)
 

	
 
        revs = self.run_git_command(cmd).splitlines()
 
        start_pos = 0
 
        end_pos = len(revs)
 
        if start:
 
            _start = self._get_revision(start)
 
            try:
 
                start_pos = revs.index(_start)
 
            except ValueError:
 
                pass
 

	
 
        if end is not None:
 
            _end = self._get_revision(end)
 
            try:
 
                end_pos = revs.index(_end)
 
            except ValueError:
 
                pass
 

	
 
        if None not in [start, end] and start_pos > end_pos:
 
            raise RepositoryError('start cannot be after end')
 

	
 
        if end_pos is not None:
 
            end_pos += 1
 

	
 
        revs = revs[start_pos:end_pos]
 
        if reverse:
 
            revs.reverse()
 

	
 
        return CollectionGenerator(self, revs)
 

	
 
    def get_diff(self, rev1, rev2, path=None, ignore_whitespace=False,
 
                 context=3):
 
        """
 
        Returns (git like) *diff*, as plain bytes text. Shows changes
 
        introduced by ``rev2`` since ``rev1``.
 

	
 
        :param rev1: Entry point from which diff is shown. Can be
 
          ``self.EMPTY_CHANGESET`` - in this case, patch showing all
 
          the changes since empty state of the repository until ``rev2``
 
        :param rev2: Until which revision changes should be shown.
 
        :param ignore_whitespace: If set to ``True``, would not show whitespace
 
          changes. Defaults to ``False``.
 
        :param context: How many lines before/after changed lines should be
 
          shown. Defaults to ``3``. Due to limitations in Git, if
 
          value passed-in is greater than ``2**31-1``
 
          (``2147483647``), it will be set to ``2147483647``
 
          instead. If negative value is passed-in, it will be set to
 
          ``0`` instead.
 
        """
 

	
 
        # Git internally uses a signed long int for storing context
 
        # size (number of lines to show before and after the
 
        # differences). This can result in integer overflow, so we
 
        # ensure the requested context is smaller by one than the
 
        # number that would cause the overflow. It is highly unlikely
 
        # that a single file will contain that many lines, so this
 
        # kind of change should not cause any realistic consequences.
 
        overflowed_long_int = 2**31
 

	
 
        if context >= overflowed_long_int:
 
            context = overflowed_long_int - 1
 

	
 
        # Negative context values make no sense, and will result in
 
        # errors. Ensure this does not happen.
 
        if context < 0:
 
            context = 0
 

	
 
        flags = ['-U%s' % context, '--full-index', '--binary', '-p', '-M', '--abbrev=40']
 
        if ignore_whitespace:
 
            flags.append('-w')
 

	
 
        if hasattr(rev1, 'raw_id'):
 
            rev1 = getattr(rev1, 'raw_id')
 

	
 
        if hasattr(rev2, 'raw_id'):
 
            rev2 = getattr(rev2, 'raw_id')
 

	
 
        if rev1 == self.EMPTY_CHANGESET:
 
            rev2 = self.get_changeset(rev2).raw_id
 
            cmd = ['show'] + flags + [rev2]
 
        else:
 
            rev1 = self.get_changeset(rev1).raw_id
 
            rev2 = self.get_changeset(rev2).raw_id
 
            cmd = ['diff'] + flags + [rev1, rev2]
 

	
 
        if path:
 
            cmd += ['--', path]
 

	
 
        stdout, stderr = self._run_git_command(cmd, cwd=self.path)
 
        # If we used 'show' command, strip first few lines (until actual diff
 
        # starts)
 
        if rev1 == self.EMPTY_CHANGESET:
 
            parts = stdout.split(b'\ndiff ', 1)
 
            if len(parts) > 1:
 
                stdout = b'diff ' + parts[1]
 
        return stdout
 

	
 
    @LazyProperty
 
    def in_memory_changeset(self):
 
        """
 
        Returns ``GitInMemoryChangeset`` object for this repository.
 
        """
 
        return GitInMemoryChangeset(self)
 

	
 
    def clone(self, url, update_after_clone=True, bare=False):
 
        """
 
        Tries to clone changes from external location.
 

	
 
        :param update_after_clone: If set to ``False``, git won't checkout
 
          working directory
 
        :param bare: If set to ``True``, repository would be cloned into
 
          *bare* git repository (no working directory at all).
 
        """
 
        url = self._get_url(url)
 
        cmd = ['clone', '-q']
 
        if bare:
 
            cmd.append('--bare')
 
        elif not update_after_clone:
 
            cmd.append('--no-checkout')
 
        cmd += ['--', url, self.path]
 
        # If error occurs run_git_command raises RepositoryError already
 
        self.run_git_command(cmd)
 

	
 
    def pull(self, url):
 
        """
 
        Tries to pull changes from external location.
 
        """
 
        url = self._get_url(url)
 
        cmd = ['pull', '--ff-only', url]
 
        # If error occurs run_git_command raises RepositoryError already
 
        self.run_git_command(cmd)
 

	
 
    def fetch(self, url):
 
        """
 
        Tries to pull changes from external location.
 
        """
 
        url = self._get_url(url)
 
        so = self.run_git_command(['ls-remote', '-h', url])
kallithea/lib/vcs/backends/hg/changeset.py
Show inline comments
 
@@ -151,257 +151,257 @@ class MercurialChangeset(BaseChangeset):
 
        Returns list of children changesets.
 
        """
 
        return [self.repository.get_changeset(child.rev())
 
                for child in self._ctx.children() if child.rev() >= 0]
 

	
 
    def next(self, branch=None):
 
        if branch and self.branch != branch:
 
            raise VCSError('Branch option used on changeset not belonging '
 
                           'to that branch')
 

	
 
        cs = self
 
        while True:
 
            try:
 
                next_ = cs.repository.revisions.index(cs.raw_id) + 1
 
                next_rev = cs.repository.revisions[next_]
 
            except IndexError:
 
                raise ChangesetDoesNotExistError
 
            cs = cs.repository.get_changeset(next_rev)
 

	
 
            if not branch or branch == cs.branch:
 
                return cs
 

	
 
    def prev(self, branch=None):
 
        if branch and self.branch != branch:
 
            raise VCSError('Branch option used on changeset not belonging '
 
                           'to that branch')
 

	
 
        cs = self
 
        while True:
 
            try:
 
                prev_ = cs.repository.revisions.index(cs.raw_id) - 1
 
                if prev_ < 0:
 
                    raise IndexError
 
                prev_rev = cs.repository.revisions[prev_]
 
            except IndexError:
 
                raise ChangesetDoesNotExistError
 
            cs = cs.repository.get_changeset(prev_rev)
 

	
 
            if not branch or branch == cs.branch:
 
                return cs
 

	
 
    def diff(self):
 
        # Only used to feed diffstat
 
        return b''.join(self._ctx.diff())
 

	
 
    def _fix_path(self, path):
 
        """
 
        Paths are stored without trailing slash so we need to get rid off it if
 
        needed. Also mercurial keeps filenodes as str so we need to decode
 
        from unicode to str
 
        """
 
        if path.endswith('/'):
 
            path = path.rstrip('/')
 

	
 
        return safe_str(path)
 

	
 
    def _get_kind(self, path):
 
        path = self._fix_path(path)
 
        if path in self._file_paths:
 
            return NodeKind.FILE
 
        elif path in self._dir_paths:
 
            return NodeKind.DIR
 
        else:
 
            raise ChangesetError("Node does not exist at the given path '%s'"
 
                % (path))
 

	
 
    def _get_filectx(self, path):
 
        path = self._fix_path(path)
 
        if self._get_kind(path) != NodeKind.FILE:
 
            raise ChangesetError("File does not exist for revision %s at "
 
                " '%s'" % (self.raw_id, path))
 
        return self._ctx.filectx(safe_bytes(path))
 

	
 
    def _extract_submodules(self):
 
        """
 
        returns a dictionary with submodule information from substate file
 
        of hg repository
 
        """
 
        return self._ctx.substate
 

	
 
    def get_file_mode(self, path):
 
        """
 
        Returns stat mode of the file at the given ``path``.
 
        """
 
        fctx = self._get_filectx(path)
 
        if b'x' in fctx.flags():
 
            return 0o100755
 
        else:
 
            return 0o100644
 

	
 
    def get_file_content(self, path):
 
        """
 
        Returns content of the file at given ``path``.
 
        """
 
        fctx = self._get_filectx(path)
 
        return fctx.data()
 

	
 
    def get_file_size(self, path):
 
        """
 
        Returns size of the file at given ``path``.
 
        """
 
        fctx = self._get_filectx(path)
 
        return fctx.size()
 

	
 
    def get_file_changeset(self, path):
 
        """
 
        Returns last commit of the file at the given ``path``.
 
        """
 
        return self.get_file_history(path, limit=1)[0]
 

	
 
    def get_file_history(self, path, limit=None):
 
        """
 
        Returns history of file as reversed list of ``Changeset`` objects for
 
        which file at given ``path`` has been modified.
 
        """
 
        fctx = self._get_filectx(path)
 
        hist = []
 
        cnt = 0
 
        for cs in reversed([x for x in fctx.filelog()]):
 
            cnt += 1
 
            hist.append(mercurial.node.hex(fctx.filectx(cs).node()))
 
            if limit is not None and cnt == limit:
 
                break
 

	
 
        return [self.repository.get_changeset(node) for node in hist]
 

	
 
    def get_file_annotate(self, path):
 
        """
 
        Returns a generator of four element tuples with
 
            lineno, sha, changeset lazy loader and line
 
        """
 
        annotations = self._get_filectx(path).annotate()
 
        annotation_lines = [(annotateline.fctx, annotateline.text) for annotateline in annotations]
 
        for i, (fctx, line) in enumerate(annotation_lines):
 
            sha = ascii_str(fctx.hex())
 
            yield (i + 1, sha, lambda sha=sha: self.repository.get_changeset(sha), line)
 

	
 
    def fill_archive(self, stream=None, kind='tgz', prefix=None,
 
                     subrepos=False):
 
        """
 
        Fills up given stream.
 

	
 
        :param stream: file like object.
 
        :param kind: one of following: ``zip``, ``tgz`` or ``tbz2``.
 
            Default: ``tgz``.
 
        :param prefix: name of root directory in archive.
 
            Default is repository name and changeset's raw_id joined with dash
 
            (``repo-tip.<KIND>``).
 
        :param subrepos: include subrepos in this archive.
 

	
 
        :raise ImproperArchiveTypeError: If given kind is wrong.
 
        :raise VcsError: If given stream is None
 
        """
 
        allowed_kinds = settings.ARCHIVE_SPECS
 
        if kind not in allowed_kinds:
 
            raise ImproperArchiveTypeError('Archive kind not supported use one'
 
                'of %s' % ' '.join(allowed_kinds))
 

	
 
        if stream is None:
 
            raise VCSError('You need to pass in a valid stream for filling'
 
                           ' with archival data')
 

	
 
        if prefix is None:
 
            prefix = '%s-%s' % (self.repository.name, self.short_id)
 
        elif prefix.startswith('/'):
 
            raise VCSError("Prefix cannot start with leading slash")
 
        elif prefix.strip() == '':
 
            raise VCSError("Prefix cannot be empty")
 

	
 
        mercurial.archival.archive(self.repository._repo, stream, ascii_bytes(self.raw_id),
 
                         safe_bytes(kind), prefix=safe_bytes(prefix), subrepos=subrepos)
 

	
 
    def get_nodes(self, path):
 
        """
 
        Returns combined ``DirNode`` and ``FileNode`` objects list representing
 
        state of changeset at the given ``path``. If node at the given ``path``
 
        is not instance of ``DirNode``, ChangesetError would be raised.
 
        """
 

	
 
        if self._get_kind(path) != NodeKind.DIR:
 
            raise ChangesetError("Directory does not exist for revision %s at "
 
                " '%s'" % (self.revision, path))
 
        path = self._fix_path(path)
 

	
 
        filenodes = [FileNode(f, changeset=self) for f in self._file_paths
 
            if os.path.dirname(f) == path]
 
        dirs = path == '' and '' or [d for d in self._dir_paths
 
            if d and posixpath.dirname(d) == path]
 
        dirnodes = [DirNode(d, changeset=self) for d in dirs
 
            if os.path.dirname(d) == path]
 

	
 
        als = self.repository.alias
 
        for k, vals in self._extract_submodules().iteritems():
 
        for k, vals in self._extract_submodules().items():
 
            #vals = url,rev,type
 
            loc = vals[0]
 
            cs = vals[1]
 
            dirnodes.append(SubModuleNode(k, url=loc, changeset=cs,
 
                                          alias=als))
 
        nodes = dirnodes + filenodes
 
        for node in nodes:
 
            self.nodes[node.path] = node
 
        nodes.sort()
 
        return nodes
 

	
 
    def get_node(self, path):
 
        """
 
        Returns ``Node`` object from the given ``path``. If there is no node at
 
        the given ``path``, ``ChangesetError`` would be raised.
 
        """
 
        path = self._fix_path(path)
 
        if path not in self.nodes:
 
            if path in self._file_paths:
 
                node = FileNode(path, changeset=self)
 
            elif path in self._dir_paths or path in self._dir_paths:
 
                if path == '':
 
                    node = RootNode(changeset=self)
 
                else:
 
                    node = DirNode(path, changeset=self)
 
            else:
 
                raise NodeDoesNotExistError("There is no file nor directory "
 
                    "at the given path: '%s' at revision %s"
 
                    % (path, self.short_id))
 
            # cache node
 
            self.nodes[path] = node
 
        return self.nodes[path]
 

	
 
    @LazyProperty
 
    def affected_files(self):
 
        """
 
        Gets a fast accessible file changes for given changeset
 
        """
 
        return self._ctx.files()
 

	
 
    @property
 
    def added(self):
 
        """
 
        Returns list of added ``FileNode`` objects.
 
        """
 
        return AddedFileNodesGenerator([n for n in self.status.added], self)
 

	
 
    @property
 
    def changed(self):
 
        """
 
        Returns list of modified ``FileNode`` objects.
 
        """
 
        return ChangedFileNodesGenerator([n for n in self.status.modified], self)
 

	
 
    @property
 
    def removed(self):
 
        """
 
        Returns list of removed ``FileNode`` objects.
 
        """
 
        return RemovedFileNodesGenerator([n for n in self.status.removed], self)
 

	
 
    @LazyProperty
 
    def extra(self):
 
        return self._ctx.extra()
kallithea/lib/vcs/utils/progressbar.py
Show inline comments
 
@@ -26,385 +26,385 @@ class ProgressBar(object):
 
        self.steps = steps
 
        self.stream = stream or sys.stderr
 
        self.bar_char = '='
 
        self.width = 50
 
        self.separator = ' | '
 
        self.elements = elements or self.default_elements
 
        self.started = None
 
        self.finished = False
 
        self.steps_label = 'Step'
 
        self.time_label = 'Time'
 
        self.eta_label = 'ETA'
 
        self.speed_label = 'Speed'
 
        self.transfer_label = 'Transfer'
 

	
 
    def __str__(self):
 
        return self.get_line()
 

	
 
    def __iter__(self):
 
        start = self.step
 
        end = self.steps + 1
 
        for x in xrange(start, end):
 
            self.render(x)
 
            yield x
 

	
 
    def get_separator(self):
 
        return self.separator
 

	
 
    def get_bar_char(self):
 
        return self.bar_char
 

	
 
    def get_bar(self):
 
        char = self.get_bar_char()
 
        perc = self.get_percentage()
 
        length = int(self.width * perc / 100)
 
        bar = char * length
 
        bar = bar.ljust(self.width)
 
        return bar
 

	
 
    def get_elements(self):
 
        return self.elements
 

	
 
    def get_template(self):
 
        separator = self.get_separator()
 
        elements = self.get_elements()
 
        return string.Template(separator.join((('$%s' % e) for e in elements)))
 

	
 
    def get_total_time(self, current_time=None):
 
        if current_time is None:
 
            current_time = datetime.datetime.now()
 
        if not self.started:
 
            return datetime.timedelta()
 
        return current_time - self.started
 

	
 
    def get_rendered_total_time(self):
 
        delta = self.get_total_time()
 
        if not delta:
 
            ttime = '-'
 
        else:
 
            ttime = str(delta)
 
        return '%s %s' % (self.time_label, ttime)
 

	
 
    def get_eta(self, current_time=None):
 
        if current_time is None:
 
            current_time = datetime.datetime.now()
 
        if self.step == 0:
 
            return datetime.timedelta()
 
        total_seconds = self.get_total_time().total_seconds()
 
        eta_seconds = total_seconds * self.steps / self.step - total_seconds
 
        return datetime.timedelta(seconds=int(eta_seconds))
 

	
 
    def get_rendered_eta(self):
 
        eta = self.get_eta()
 
        if not eta:
 
            eta = '--:--:--'
 
        else:
 
            eta = str(eta).rjust(8)
 
        return '%s: %s' % (self.eta_label, eta)
 

	
 
    def get_percentage(self):
 
        return float(self.step) / self.steps * 100
 

	
 
    def get_rendered_percentage(self):
 
        perc = self.get_percentage()
 
        return ('%s%%' % (int(perc))).rjust(5)
 

	
 
    def get_rendered_steps(self):
 
        return '%s: %s/%s' % (self.steps_label, self.step, self.steps)
 

	
 
    def get_rendered_speed(self, step=None, total_seconds=None):
 
        if step is None:
 
            step = self.step
 
        if total_seconds is None:
 
            total_seconds = self.get_total_time().total_seconds()
 
        if step <= 0 or total_seconds <= 0:
 
            speed = '-'
 
        else:
 
            speed = filesizeformat(float(step) / total_seconds)
 
        return '%s: %s/s' % (self.speed_label, speed)
 

	
 
    def get_rendered_transfer(self, step=None, steps=None):
 
        if step is None:
 
            step = self.step
 
        if steps is None:
 
            steps = self.steps
 

	
 
        if steps <= 0:
 
            return '%s: -' % self.transfer_label
 
        total = filesizeformat(float(steps))
 
        if step <= 0:
 
            transferred = '-'
 
        else:
 
            transferred = filesizeformat(float(step))
 
        return '%s: %s / %s' % (self.transfer_label, transferred, total)
 

	
 
    def get_context(self):
 
        return {
 
            'percentage': self.get_rendered_percentage(),
 
            'bar': self.get_bar(),
 
            'steps': self.get_rendered_steps(),
 
            'time': self.get_rendered_total_time(),
 
            'eta': self.get_rendered_eta(),
 
            'speed': self.get_rendered_speed(),
 
            'transfer': self.get_rendered_transfer(),
 
        }
 

	
 
    def get_line(self):
 
        template = self.get_template()
 
        context = self.get_context()
 
        return template.safe_substitute(**context)
 

	
 
    def write(self, data):
 
        self.stream.write(data)
 

	
 
    def render(self, step):
 
        if not self.started:
 
            self.started = datetime.datetime.now()
 
        if self.finished:
 
            raise AlreadyFinishedError
 
        self.step = step
 
        self.write('\r%s' % self)
 
        if step == self.steps:
 
            self.finished = True
 
        if step == self.steps:
 
            self.write('\n')
 

	
 

	
 
"""
 
termcolors.py
 

	
 
Grabbed from Django (http://www.djangoproject.com)
 
"""
 

	
 
color_names = ('black', 'red', 'green', 'yellow', 'blue', 'magenta', 'cyan', 'white')
 
foreground = dict([(color_names[x], '3%s' % x) for x in range(8)])
 
background = dict([(color_names[x], '4%s' % x) for x in range(8)])
 

	
 
RESET = '0'
 
opt_dict = {'bold': '1', 'underscore': '4', 'blink': '5', 'reverse': '7', 'conceal': '8'}
 

	
 

	
 
def colorize(text='', opts=(), **kwargs):
 
    """
 
    Returns your text, enclosed in ANSI graphics codes.
 

	
 
    Depends on the keyword arguments 'fg' and 'bg', and the contents of
 
    the opts tuple/list.
 

	
 
    Returns the RESET code if no parameters are given.
 

	
 
    Valid colors:
 
        'black', 'red', 'green', 'yellow', 'blue', 'magenta', 'cyan', 'white'
 

	
 
    Valid options:
 
        'bold'
 
        'underscore'
 
        'blink'
 
        'reverse'
 
        'conceal'
 
        'noreset' - string will not be auto-terminated with the RESET code
 

	
 
    Examples:
 
        colorize('hello', fg='red', bg='blue', opts=('blink',))
 
        colorize()
 
        colorize('goodbye', opts=('underscore',))
 
        print colorize('first line', fg='red', opts=('noreset',))
 
        print 'this should be red too'
 
        print colorize('and so should this')
 
        print 'this should not be red'
 
    """
 
    code_list = []
 
    if text == '' and len(opts) == 1 and opts[0] == 'reset':
 
        return '\x1b[%sm' % RESET
 
    for k, v in kwargs.iteritems():
 
    for k, v in kwargs.items():
 
        if k == 'fg':
 
            code_list.append(foreground[v])
 
        elif k == 'bg':
 
            code_list.append(background[v])
 
    for o in opts:
 
        if o in opt_dict:
 
            code_list.append(opt_dict[o])
 
    if 'noreset' not in opts:
 
        text = text + '\x1b[%sm' % RESET
 
    return ('\x1b[%sm' % ';'.join(code_list)) + text
 

	
 

	
 
def make_style(opts=(), **kwargs):
 
    """
 
    Returns a function with default parameters for colorize()
 

	
 
    Example:
 
        bold_red = make_style(opts=('bold',), fg='red')
 
        print bold_red('hello')
 
        KEYWORD = make_style(fg='yellow')
 
        COMMENT = make_style(fg='blue', opts=('bold',))
 
    """
 
    return lambda text: colorize(text, opts, **kwargs)
 

	
 

	
 
NOCOLOR_PALETTE = 'nocolor'
 
DARK_PALETTE = 'dark'
 
LIGHT_PALETTE = 'light'
 

	
 
PALETTES = {
 
    NOCOLOR_PALETTE: {
 
        'ERROR':        {},
 
        'NOTICE':       {},
 
        'SQL_FIELD':    {},
 
        'SQL_COLTYPE':  {},
 
        'SQL_KEYWORD':  {},
 
        'SQL_TABLE':    {},
 
        'HTTP_INFO':         {},
 
        'HTTP_SUCCESS':      {},
 
        'HTTP_REDIRECT':     {},
 
        'HTTP_NOT_MODIFIED': {},
 
        'HTTP_BAD_REQUEST':  {},
 
        'HTTP_NOT_FOUND':    {},
 
        'HTTP_SERVER_ERROR': {},
 
    },
 
    DARK_PALETTE: {
 
        'ERROR':        { 'fg': 'red', 'opts': ('bold',) },
 
        'NOTICE':       { 'fg': 'red' },
 
        'SQL_FIELD':    { 'fg': 'green', 'opts': ('bold',) },
 
        'SQL_COLTYPE':  { 'fg': 'green' },
 
        'SQL_KEYWORD':  { 'fg': 'yellow' },
 
        'SQL_TABLE':    { 'opts': ('bold',) },
 
        'HTTP_INFO':         { 'opts': ('bold',) },
 
        'HTTP_SUCCESS':      { },
 
        'HTTP_REDIRECT':     { 'fg': 'green' },
 
        'HTTP_NOT_MODIFIED': { 'fg': 'cyan' },
 
        'HTTP_BAD_REQUEST':  { 'fg': 'red', 'opts': ('bold',) },
 
        'HTTP_NOT_FOUND':    { 'fg': 'yellow' },
 
        'HTTP_SERVER_ERROR': { 'fg': 'magenta', 'opts': ('bold',) },
 
    },
 
    LIGHT_PALETTE: {
 
        'ERROR':        { 'fg': 'red', 'opts': ('bold',) },
 
        'NOTICE':       { 'fg': 'red' },
 
        'SQL_FIELD':    { 'fg': 'green', 'opts': ('bold',) },
 
        'SQL_COLTYPE':  { 'fg': 'green' },
 
        'SQL_KEYWORD':  { 'fg': 'blue' },
 
        'SQL_TABLE':    { 'opts': ('bold',) },
 
        'HTTP_INFO':         { 'opts': ('bold',) },
 
        'HTTP_SUCCESS':      { },
 
        'HTTP_REDIRECT':     { 'fg': 'green', 'opts': ('bold',) },
 
        'HTTP_NOT_MODIFIED': { 'fg': 'green' },
 
        'HTTP_BAD_REQUEST':  { 'fg': 'red', 'opts': ('bold',) },
 
        'HTTP_NOT_FOUND':    { 'fg': 'red' },
 
        'HTTP_SERVER_ERROR': { 'fg': 'magenta', 'opts': ('bold',) },
 
    }
 
}
 
DEFAULT_PALETTE = DARK_PALETTE
 

	
 
# ---------------------------- #
 
# --- End of termcolors.py --- #
 
# ---------------------------- #
 

	
 

	
 
class ColoredProgressBar(ProgressBar):
 

	
 
    BAR_COLORS = (
 
        (10, 'red'),
 
        (30, 'magenta'),
 
        (50, 'yellow'),
 
        (99, 'green'),
 
        (100, 'blue'),
 
    )
 

	
 
    def get_line(self):
 
        line = super(ColoredProgressBar, self).get_line()
 
        perc = self.get_percentage()
 
        if perc > 100:
 
            color = 'blue'
 
        for max_perc, color in self.BAR_COLORS:
 
            if perc <= max_perc:
 
                break
 
        return colorize(line, fg=color)
 

	
 

	
 
class AnimatedProgressBar(ProgressBar):
 

	
 
    def get_bar_char(self):
 
        chars = '-/|\\'
 
        if self.step >= self.steps:
 
            return '='
 
        return chars[self.step % len(chars)]
 

	
 

	
 
class BarOnlyProgressBar(ProgressBar):
 

	
 
    default_elements = ['bar', 'steps']
 

	
 
    def get_bar(self):
 
        bar = super(BarOnlyProgressBar, self).get_bar()
 
        perc = self.get_percentage()
 
        perc_text = '%s%%' % int(perc)
 
        text = (' %s%% ' % (perc_text)).center(self.width, '=')
 
        L = text.find(' ')
 
        R = text.rfind(' ')
 
        bar = ' '.join((bar[:L], perc_text, bar[R:]))
 
        return bar
 

	
 

	
 
class AnimatedColoredProgressBar(AnimatedProgressBar,
 
                                 ColoredProgressBar):
 
    pass
 

	
 

	
 
class BarOnlyColoredProgressBar(ColoredProgressBar,
 
                                BarOnlyProgressBar):
 
    pass
 

	
 

	
 
def main():
 
    import time
 

	
 
    print("Standard progress bar...")
 
    bar = ProgressBar(30)
 
    for x in xrange(1, 31):
 
        bar.render(x)
 
        time.sleep(0.02)
 
    bar.stream.write('\n')
 
    print()
 

	
 
    print("Empty bar...")
 
    bar = ProgressBar(50)
 
    bar.render(0)
 
    print()
 
    print()
 

	
 
    print("Colored bar...")
 
    bar = ColoredProgressBar(20)
 
    for x in bar:
 
        time.sleep(0.01)
 
    print()
 

	
 
    print("Animated char bar...")
 
    bar = AnimatedProgressBar(20)
 
    for x in bar:
 
        time.sleep(0.01)
 
    print()
 

	
 
    print("Animated + colored char bar...")
 
    bar = AnimatedColoredProgressBar(20)
 
    for x in bar:
 
        time.sleep(0.01)
 
    print()
 

	
 
    print("Bar only ...")
 
    bar = BarOnlyProgressBar(20)
 
    for x in bar:
 
        time.sleep(0.01)
 
    print()
 

	
 
    print("Colored, longer bar-only, eta, total time ...")
 
    bar = BarOnlyColoredProgressBar(40)
 
    bar.width = 60
 
    bar.elements += ['time', 'eta']
 
    for x in bar:
 
        time.sleep(0.01)
 
    print()
 
    print()
 

	
 
    print("File transfer bar, breaks after 2 seconds ...")
 
    total_bytes = 1024 * 1024 * 2
 
    bar = ProgressBar(total_bytes)
 
    bar.width = 50
kallithea/lib/vcs/utils/termcolors.py
Show inline comments
 
"""
 
termcolors.py
 

	
 
Grabbed from Django (http://www.djangoproject.com)
 
"""
 

	
 
color_names = ('black', 'red', 'green', 'yellow', 'blue', 'magenta', 'cyan', 'white')
 
foreground = dict([(color_names[x], '3%s' % x) for x in range(8)])
 
background = dict([(color_names[x], '4%s' % x) for x in range(8)])
 

	
 
RESET = '0'
 
opt_dict = {'bold': '1', 'underscore': '4', 'blink': '5', 'reverse': '7', 'conceal': '8'}
 

	
 

	
 
def colorize(text='', opts=(), **kwargs):
 
    """
 
    Returns your text, enclosed in ANSI graphics codes.
 

	
 
    Depends on the keyword arguments 'fg' and 'bg', and the contents of
 
    the opts tuple/list.
 

	
 
    Returns the RESET code if no parameters are given.
 

	
 
    Valid colors:
 
        'black', 'red', 'green', 'yellow', 'blue', 'magenta', 'cyan', 'white'
 

	
 
    Valid options:
 
        'bold'
 
        'underscore'
 
        'blink'
 
        'reverse'
 
        'conceal'
 
        'noreset' - string will not be auto-terminated with the RESET code
 

	
 
    Examples:
 
        colorize('hello', fg='red', bg='blue', opts=('blink',))
 
        colorize()
 
        colorize('goodbye', opts=('underscore',))
 
        print colorize('first line', fg='red', opts=('noreset',))
 
        print 'this should be red too'
 
        print colorize('and so should this')
 
        print 'this should not be red'
 
    """
 
    code_list = []
 
    if text == '' and len(opts) == 1 and opts[0] == 'reset':
 
        return '\x1b[%sm' % RESET
 
    for k, v in kwargs.iteritems():
 
    for k, v in kwargs.items():
 
        if k == 'fg':
 
            code_list.append(foreground[v])
 
        elif k == 'bg':
 
            code_list.append(background[v])
 
    for o in opts:
 
        if o in opt_dict:
 
            code_list.append(opt_dict[o])
 
    if 'noreset' not in opts:
 
        text = text + '\x1b[%sm' % RESET
 
    return ('\x1b[%sm' % ';'.join(code_list)) + text
 

	
 

	
 
def make_style(opts=(), **kwargs):
 
    """
 
    Returns a function with default parameters for colorize()
 

	
 
    Example:
 
        bold_red = make_style(opts=('bold',), fg='red')
 
        print bold_red('hello')
 
        KEYWORD = make_style(fg='yellow')
 
        COMMENT = make_style(fg='blue', opts=('bold',))
 
    """
 
    return lambda text: colorize(text, opts, **kwargs)
 

	
 

	
 
NOCOLOR_PALETTE = 'nocolor'
 
DARK_PALETTE = 'dark'
 
LIGHT_PALETTE = 'light'
 

	
 
PALETTES = {
 
    NOCOLOR_PALETTE: {
 
        'ERROR':        {},
 
        'NOTICE':       {},
 
        'SQL_FIELD':    {},
 
        'SQL_COLTYPE':  {},
 
        'SQL_KEYWORD':  {},
 
        'SQL_TABLE':    {},
 
        'HTTP_INFO':         {},
 
        'HTTP_SUCCESS':      {},
 
        'HTTP_REDIRECT':     {},
 
        'HTTP_NOT_MODIFIED': {},
 
        'HTTP_BAD_REQUEST':  {},
 
        'HTTP_NOT_FOUND':    {},
 
        'HTTP_SERVER_ERROR': {},
 
    },
 
    DARK_PALETTE: {
 
        'ERROR':        { 'fg': 'red', 'opts': ('bold',) },
 
        'NOTICE':       { 'fg': 'red' },
 
        'SQL_FIELD':    { 'fg': 'green', 'opts': ('bold',) },
 
        'SQL_COLTYPE':  { 'fg': 'green' },
 
        'SQL_KEYWORD':  { 'fg': 'yellow' },
 
        'SQL_TABLE':    { 'opts': ('bold',) },
 
        'HTTP_INFO':         { 'opts': ('bold',) },
 
        'HTTP_SUCCESS':      { },
 
        'HTTP_REDIRECT':     { 'fg': 'green' },
 
        'HTTP_NOT_MODIFIED': { 'fg': 'cyan' },
 
        'HTTP_BAD_REQUEST':  { 'fg': 'red', 'opts': ('bold',) },
 
        'HTTP_NOT_FOUND':    { 'fg': 'yellow' },
 
        'HTTP_SERVER_ERROR': { 'fg': 'magenta', 'opts': ('bold',) },
 
    },
 
    LIGHT_PALETTE: {
 
        'ERROR':        { 'fg': 'red', 'opts': ('bold',) },
 
        'NOTICE':       { 'fg': 'red' },
 
        'SQL_FIELD':    { 'fg': 'green', 'opts': ('bold',) },
 
        'SQL_COLTYPE':  { 'fg': 'green' },
 
        'SQL_KEYWORD':  { 'fg': 'blue' },
 
        'SQL_TABLE':    { 'opts': ('bold',) },
 
        'HTTP_INFO':         { 'opts': ('bold',) },
 
        'HTTP_SUCCESS':      { },
 
        'HTTP_REDIRECT':     { 'fg': 'green', 'opts': ('bold',) },
 
        'HTTP_NOT_MODIFIED': { 'fg': 'green' },
 
        'HTTP_BAD_REQUEST':  { 'fg': 'red', 'opts': ('bold',) },
 
        'HTTP_NOT_FOUND':    { 'fg': 'red' },
 
        'HTTP_SERVER_ERROR': { 'fg': 'magenta', 'opts': ('bold',) },
 
    }
 
}
 
DEFAULT_PALETTE = DARK_PALETTE
 

	
 

	
 
def parse_color_setting(config_string):
 
    """Parse a DJANGO_COLORS environment variable to produce the system palette
 

	
 
    The general form of a palette definition is:
 

	
 
        "palette;role=fg;role=fg/bg;role=fg,option,option;role=fg/bg,option,option"
 

	
 
    where:
 
        palette is a named palette; one of 'light', 'dark', or 'nocolor'.
 
        role is a named style used by Django
 
        fg is a background color.
 
        bg is a background color.
 
        option is a display options.
 

	
 
    Specifying a named palette is the same as manually specifying the individual
 
    definitions for each role. Any individual definitions following the palette
 
    definition will augment the base palette definition.
 

	
 
    Valid roles:
 
        'error', 'notice', 'sql_field', 'sql_coltype', 'sql_keyword', 'sql_table',
 
        'http_info', 'http_success', 'http_redirect', 'http_bad_request',
 
        'http_not_found', 'http_server_error'
 

	
 
    Valid colors:
 
        'black', 'red', 'green', 'yellow', 'blue', 'magenta', 'cyan', 'white'
 

	
 
    Valid options:
 
        'bold', 'underscore', 'blink', 'reverse', 'conceal'
 

	
 
    """
 
    if not config_string:
 
        return PALETTES[DEFAULT_PALETTE]
 

	
 
    # Split the color configuration into parts
 
    parts = config_string.lower().split(';')
 
    palette = PALETTES[NOCOLOR_PALETTE].copy()
 
    for part in parts:
 
        if part in PALETTES:
 
            # A default palette has been specified
 
            palette.update(PALETTES[part])
 
        elif '=' in part:
 
            # Process a palette defining string
 
            definition = {}
 

	
 
            # Break the definition into the role,
 
            # plus the list of specific instructions.
 
            # The role must be in upper case
 
            role, instructions = part.split('=')
 
            role = role.upper()
 

	
 
            styles = instructions.split(',')
 
            styles.reverse()
 

	
 
            # The first instruction can contain a slash
 
            # to break apart fg/bg.
 
            colors = styles.pop().split('/')
 
            colors.reverse()
 
            fg = colors.pop()
 
            if fg in color_names:
 
                definition['fg'] = fg
 
            if colors and colors[-1] in color_names:
 
                definition['bg'] = colors[-1]
 

	
 
            # All remaining instructions are options
 
            opts = tuple(s for s in styles if s in opt_dict)
 
            if opts:
 
                definition['opts'] = opts
 

	
 
            # The nocolor palette has all available roles.
 
            # Use that palette as the basis for determining
 
            # if the role is valid.
 
            if role in PALETTES[NOCOLOR_PALETTE] and definition:
 
                palette[role] = definition
 

	
 
    # If there are no colors specified, return the empty palette.
 
    if palette == PALETTES[NOCOLOR_PALETTE]:
 
        return None
 
    return palette
kallithea/model/db.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.db
 
~~~~~~~~~~~~~~~~~~
 

	
 
Database Models for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 08, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import base64
 
import collections
 
import datetime
 
import functools
 
import hashlib
 
import logging
 
import os
 
import time
 
import traceback
 

	
 
import ipaddr
 
import sqlalchemy
 
from beaker.cache import cache_region, region_invalidate
 
from sqlalchemy import Boolean, Column, DateTime, Float, ForeignKey, Index, Integer, LargeBinary, String, Unicode, UnicodeText, UniqueConstraint
 
from sqlalchemy.ext.hybrid import hybrid_property
 
from sqlalchemy.orm import class_mapper, joinedload, relationship, validates
 
from tg.i18n import lazy_ugettext as _
 
from webob.exc import HTTPNotFound
 

	
 
import kallithea
 
from kallithea.lib import ext_json
 
from kallithea.lib.caching_query import FromCache
 
from kallithea.lib.exceptions import DefaultUserException
 
from kallithea.lib.utils2 import (
 
    Optional, ascii_bytes, aslist, get_changeset_safe, get_clone_url, remove_prefix, safe_bytes, safe_int, safe_str, safe_unicode, str2bool, urlreadable)
 
from kallithea.lib.vcs import get_backend
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.vcs.utils.helpers import get_scm
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.model.meta import Base, Session
 

	
 

	
 
URL_SEP = '/'
 
log = logging.getLogger(__name__)
 

	
 
#==============================================================================
 
# BASE CLASSES
 
#==============================================================================
 

	
 
def _hash_key(k):
 
    return hashlib.md5(safe_bytes(k)).hexdigest()
 

	
 

	
 
class BaseDbModel(object):
 
    """
 
    Base Model for all classes
 
    """
 

	
 
    @classmethod
 
    def _get_keys(cls):
 
        """return column names for this model """
 
        # Note: not a normal dict - iterator gives "users.firstname", but keys gives "firstname"
 
        return class_mapper(cls).c.keys()
 

	
 
    def get_dict(self):
 
        """
 
        return dict with keys and values corresponding
 
        to this model data """
 

	
 
        d = {}
 
        for k in self._get_keys():
 
            d[k] = getattr(self, k)
 

	
 
        # also use __json__() if present to get additional fields
 
        _json_attr = getattr(self, '__json__', None)
 
        if _json_attr:
 
            # update with attributes from __json__
 
            if callable(_json_attr):
 
                _json_attr = _json_attr()
 
            for k, val in _json_attr.iteritems():
 
            for k, val in _json_attr.items():
 
                d[k] = val
 
        return d
 

	
 
    def get_appstruct(self):
 
        """return list with keys and values tuples corresponding
 
        to this model data """
 

	
 
        return [
 
            (k, getattr(self, k))
 
            for k in self._get_keys()
 
        ]
 

	
 
    def populate_obj(self, populate_dict):
 
        """populate model with data from given populate_dict"""
 

	
 
        for k in self._get_keys():
 
            if k in populate_dict:
 
                setattr(self, k, populate_dict[k])
 

	
 
    @classmethod
 
    def query(cls):
 
        return Session().query(cls)
 

	
 
    @classmethod
 
    def get(cls, id_):
 
        if id_:
 
            return cls.query().get(id_)
 

	
 
    @classmethod
 
    def guess_instance(cls, value, callback=None):
 
        """Haphazardly attempt to convert `value` to a `cls` instance.
 

	
 
        If `value` is None or already a `cls` instance, return it. If `value`
 
        is a number (or looks like one if you squint just right), assume it's
 
        a database primary key and let SQLAlchemy sort things out. Otherwise,
 
        fall back to resolving it using `callback` (if specified); this could
 
        e.g. be a function that looks up instances by name (though that won't
 
        work if the name begins with a digit). Otherwise, raise Exception.
 
        """
 

	
 
        if value is None:
 
            return None
 
        if isinstance(value, cls):
 
            return value
 
        if isinstance(value, int):
 
            return cls.get(value)
 
        if isinstance(value, basestring) and value.isdigit():
 
            return cls.get(int(value))
 
        if callback is not None:
 
            return callback(value)
 

	
 
        raise Exception(
 
            'given object must be int, long or Instance of %s '
 
            'got %s, no callback provided' % (cls, type(value))
 
        )
 

	
 
    @classmethod
 
    def get_or_404(cls, id_):
 
        try:
 
            id_ = int(id_)
 
        except (TypeError, ValueError):
 
            raise HTTPNotFound
 

	
 
        res = cls.query().get(id_)
 
        if res is None:
 
            raise HTTPNotFound
 
        return res
 

	
 
    @classmethod
 
    def delete(cls, id_):
 
        obj = cls.query().get(id_)
 
        Session().delete(obj)
 

	
 
    def __repr__(self):
 
        return '<DB:%s>' % (self.__class__.__name__)
 

	
 

	
 
_table_args_default_dict = {'extend_existing': True,
 
                            'mysql_engine': 'InnoDB',
 
                            'mysql_charset': 'utf8',
 
                            'sqlite_autoincrement': True,
 
                           }
 

	
 
class Setting(Base, BaseDbModel):
 
    __tablename__ = 'settings'
 
    __table_args__ = (
 
        _table_args_default_dict,
 
    )
 

	
 
    SETTINGS_TYPES = {
 
        'str': safe_bytes,
 
        'int': safe_int,
 
        'unicode': safe_unicode,
 
        'bool': str2bool,
 
        'list': functools.partial(aslist, sep=',')
 
    }
 
    DEFAULT_UPDATE_URL = ''
 

	
 
    app_settings_id = Column(Integer(), primary_key=True)
 
    app_settings_name = Column(String(255), nullable=False, unique=True)
 
    _app_settings_value = Column("app_settings_value", Unicode(4096), nullable=False)
 
    _app_settings_type = Column("app_settings_type", String(255), nullable=True) # FIXME: not nullable?
 

	
 
    def __init__(self, key='', val='', type='unicode'):
 
        self.app_settings_name = key
 
        self.app_settings_value = val
 
        self.app_settings_type = type
 

	
 
    @validates('_app_settings_value')
 
    def validate_settings_value(self, key, val):
 
        assert isinstance(val, unicode)
 
        return val
 

	
 
    @hybrid_property
 
    def app_settings_value(self):
 
        v = self._app_settings_value
 
        _type = self.app_settings_type
 
        converter = self.SETTINGS_TYPES.get(_type) or self.SETTINGS_TYPES['unicode']
 
        return converter(v)
 

	
 
    @app_settings_value.setter
 
    def app_settings_value(self, val):
 
        """
 
        Setter that will always make sure we use unicode in app_settings_value
 

	
 
        :param val:
 
        """
 
        self._app_settings_value = safe_unicode(val)
 

	
 
    @hybrid_property
 
    def app_settings_type(self):
 
        return self._app_settings_type
 

	
 
    @app_settings_type.setter
 
    def app_settings_type(self, val):
 
        if val not in self.SETTINGS_TYPES:
 
            raise Exception('type must be one of %s got %s'
 
                            % (list(self.SETTINGS_TYPES), val))
 
        self._app_settings_type = val
 

	
 
    def __repr__(self):
 
        return "<%s %s.%s=%r>" % (
 
            self.__class__.__name__,
 
            self.app_settings_name, self.app_settings_type, self.app_settings_value
 
        )
 

	
 
    @classmethod
 
    def get_by_name(cls, key):
 
        return cls.query() \
 
            .filter(cls.app_settings_name == key).scalar()
 

	
 
    @classmethod
 
    def get_by_name_or_create(cls, key, val='', type='unicode'):
 
        res = cls.get_by_name(key)
 
        if res is None:
 
            res = cls(key, val, type)
 
        return res
 

	
 
    @classmethod
 
    def create_or_update(cls, key, val=Optional(''), type=Optional('unicode')):
 
        """
 
        Creates or updates Kallithea setting. If updates are triggered, it will only
 
        update parameters that are explicitly set. Optional instance will be skipped.
 

	
 
        :param key:
 
        :param val:
 
        :param type:
 
        :return:
 
        """
 
        res = cls.get_by_name(key)
 
        if res is None:
 
            val = Optional.extract(val)
 
            type = Optional.extract(type)
 
            res = cls(key, val, type)
 
            Session().add(res)
 
        else:
 
            res.app_settings_name = key
 
            if not isinstance(val, Optional):
 
                # update if set
 
                res.app_settings_value = val
 
            if not isinstance(type, Optional):
 
                # update if set
 
                res.app_settings_type = type
 
        return res
 

	
 
    @classmethod
 
    def get_app_settings(cls, cache=False):
 

	
 
        ret = cls.query()
 

	
 
        if cache:
 
            ret = ret.options(FromCache("sql_cache_short", "get_hg_settings"))
kallithea/model/scm.py
Show inline comments
 
@@ -482,297 +482,297 @@ class ScmModel(object):
 
        for f_path in nodes:
 
            content = nodes[f_path]['content']
 
            f_path = self._sanitize_path(f_path)
 
            f_path = safe_str(f_path)
 
            # decoding here will force that we have proper encoded values
 
            # in any other case this will throw exceptions and deny commit
 
            if isinstance(content, (basestring,)):
 
                content = safe_str(content)
 
            else:
 
                content = content.read()
 
            processed_nodes.append((f_path, content))
 

	
 
        message = safe_unicode(message)
 
        committer = user.full_contact
 
        author = safe_unicode(author) if author else committer
 

	
 
        IMC = self._get_IMC_module(scm_instance.alias)
 
        imc = IMC(scm_instance)
 

	
 
        if not parent_cs:
 
            parent_cs = EmptyChangeset(alias=scm_instance.alias)
 

	
 
        if isinstance(parent_cs, EmptyChangeset):
 
            # EmptyChangeset means we we're editing empty repository
 
            parents = None
 
        else:
 
            parents = [parent_cs]
 
        # add multiple nodes
 
        for path, content in processed_nodes:
 
            imc.add(FileNode(path, content=content))
 

	
 
        tip = imc.commit(message=message,
 
                         author=author,
 
                         parents=parents,
 
                         branch=parent_cs.branch)
 

	
 
        if trigger_push_hook:
 
            self._handle_push(scm_instance,
 
                              username=user.username,
 
                              ip_addr=ip_addr,
 
                              action='push_local',
 
                              repo_name=repo.repo_name,
 
                              revisions=[tip.raw_id])
 
        else:
 
            self.mark_for_invalidation(repo.repo_name)
 
        return tip
 

	
 
    def update_nodes(self, user, ip_addr, repo, message, nodes, parent_cs=None,
 
                     author=None, trigger_push_hook=True):
 
        """
 
        Commits specified nodes to repo. Again.
 
        """
 
        user = User.guess_instance(user)
 
        scm_instance = repo.scm_instance_no_cache()
 

	
 
        message = safe_unicode(message)
 
        committer = user.full_contact
 
        author = safe_unicode(author) if author else committer
 

	
 
        imc_class = self._get_IMC_module(scm_instance.alias)
 
        imc = imc_class(scm_instance)
 

	
 
        if not parent_cs:
 
            parent_cs = EmptyChangeset(alias=scm_instance.alias)
 

	
 
        if isinstance(parent_cs, EmptyChangeset):
 
            # EmptyChangeset means we we're editing empty repository
 
            parents = None
 
        else:
 
            parents = [parent_cs]
 

	
 
        # add multiple nodes
 
        for _filename, data in nodes.items():
 
            # new filename, can be renamed from the old one
 
            filename = self._sanitize_path(data['filename'])
 
            old_filename = self._sanitize_path(_filename)
 
            content = data['content']
 

	
 
            filenode = FileNode(old_filename, content=content)
 
            op = data['op']
 
            if op == 'add':
 
                imc.add(filenode)
 
            elif op == 'del':
 
                imc.remove(filenode)
 
            elif op == 'mod':
 
                if filename != old_filename:
 
                    # TODO: handle renames, needs vcs lib changes
 
                    imc.remove(filenode)
 
                    imc.add(FileNode(filename, content=content))
 
                else:
 
                    imc.change(filenode)
 

	
 
        # commit changes
 
        tip = imc.commit(message=message,
 
                         author=author,
 
                         parents=parents,
 
                         branch=parent_cs.branch)
 

	
 
        if trigger_push_hook:
 
            self._handle_push(scm_instance,
 
                              username=user.username,
 
                              ip_addr=ip_addr,
 
                              action='push_local',
 
                              repo_name=repo.repo_name,
 
                              revisions=[tip.raw_id])
 
        else:
 
            self.mark_for_invalidation(repo.repo_name)
 

	
 
    def delete_nodes(self, user, ip_addr, repo, message, nodes, parent_cs=None,
 
                     author=None, trigger_push_hook=True):
 
        """
 
        Deletes specified nodes from repo.
 

	
 
        :param user: Kallithea User object or user_id, the committer
 
        :param repo: Kallithea Repository object
 
        :param message: commit message
 
        :param nodes: mapping {filename:{'content':content},...}
 
        :param parent_cs: parent changeset, can be empty than it's initial commit
 
        :param author: author of commit, cna be different that committer only for git
 
        :param trigger_push_hook: trigger push hooks
 

	
 
        :returns: new committed changeset after deletion
 
        """
 

	
 
        user = User.guess_instance(user)
 
        scm_instance = repo.scm_instance_no_cache()
 

	
 
        processed_nodes = []
 
        for f_path in nodes:
 
            f_path = self._sanitize_path(f_path)
 
            # content can be empty but for compatibility it allows same dicts
 
            # structure as add_nodes
 
            content = nodes[f_path].get('content')
 
            processed_nodes.append((f_path, content))
 

	
 
        message = safe_unicode(message)
 
        committer = user.full_contact
 
        author = safe_unicode(author) if author else committer
 

	
 
        IMC = self._get_IMC_module(scm_instance.alias)
 
        imc = IMC(scm_instance)
 

	
 
        if not parent_cs:
 
            parent_cs = EmptyChangeset(alias=scm_instance.alias)
 

	
 
        if isinstance(parent_cs, EmptyChangeset):
 
            # EmptyChangeset means we we're editing empty repository
 
            parents = None
 
        else:
 
            parents = [parent_cs]
 
        # add multiple nodes
 
        for path, content in processed_nodes:
 
            imc.remove(FileNode(path, content=content))
 

	
 
        tip = imc.commit(message=message,
 
                         author=author,
 
                         parents=parents,
 
                         branch=parent_cs.branch)
 

	
 
        if trigger_push_hook:
 
            self._handle_push(scm_instance,
 
                              username=user.username,
 
                              ip_addr=ip_addr,
 
                              action='push_local',
 
                              repo_name=repo.repo_name,
 
                              revisions=[tip.raw_id])
 
        else:
 
            self.mark_for_invalidation(repo.repo_name)
 
        return tip
 

	
 
    def get_unread_journal(self):
 
        return UserLog.query().count()
 

	
 
    def get_repo_landing_revs(self, repo=None):
 
        """
 
        Generates select option with tags branches and bookmarks (for hg only)
 
        grouped by type
 

	
 
        :param repo:
 
        """
 

	
 
        hist_l = []
 
        choices = []
 
        repo = self.__get_repo(repo)
 
        hist_l.append(('rev:tip', _('latest tip')))
 
        choices.append('rev:tip')
 
        if repo is None:
 
            return choices, hist_l
 

	
 
        repo = repo.scm_instance
 

	
 
        branches_group = ([(u'branch:%s' % k, k) for k, v in
 
                           repo.branches.iteritems()], _("Branches"))
 
                           repo.branches.items()], _("Branches"))
 
        hist_l.append(branches_group)
 
        choices.extend([x[0] for x in branches_group[0]])
 

	
 
        if repo.alias == 'hg':
 
            bookmarks_group = ([(u'book:%s' % k, k) for k, v in
 
                                repo.bookmarks.iteritems()], _("Bookmarks"))
 
                                repo.bookmarks.items()], _("Bookmarks"))
 
            hist_l.append(bookmarks_group)
 
            choices.extend([x[0] for x in bookmarks_group[0]])
 

	
 
        tags_group = ([(u'tag:%s' % k, k) for k, v in
 
                       repo.tags.iteritems()], _("Tags"))
 
                       repo.tags.items()], _("Tags"))
 
        hist_l.append(tags_group)
 
        choices.extend([x[0] for x in tags_group[0]])
 

	
 
        return choices, hist_l
 

	
 
    def _get_git_hook_interpreter(self):
 
        """Return a suitable interpreter for Git hooks.
 

	
 
        Return a suitable string to be written in the POSIX #! shebang line for
 
        Git hook scripts so they invoke Kallithea code with the right Python
 
        interpreter and in the right environment.
 
        """
 
        # Note: sys.executable might not point at a usable Python interpreter. For
 
        # example, when using uwsgi, it will point at the uwsgi program itself.
 
        # FIXME This may not work on Windows and may need a shell wrapper script.
 
        return (kallithea.CONFIG.get('git_hook_interpreter')
 
                or sys.executable
 
                or '/usr/bin/env python3')
 

	
 
    def install_git_hooks(self, repo, force_create=False):
 
        """
 
        Creates a kallithea hook inside a git repository
 

	
 
        :param repo: Instance of VCS repo
 
        :param force_create: Create even if same name hook exists
 
        """
 

	
 
        loc = os.path.join(repo.path, 'hooks')
 
        if not repo.bare:
 
            loc = os.path.join(repo.path, '.git', 'hooks')
 
        if not os.path.isdir(loc):
 
            os.makedirs(loc)
 

	
 
        tmpl_post = b"#!%s\n" % safe_bytes(self._get_git_hook_interpreter())
 
        tmpl_post += pkg_resources.resource_string(
 
            'kallithea', os.path.join('config', 'post_receive_tmpl.py')
 
        )
 
        tmpl_pre = b"#!%s\n" % safe_bytes(self._get_git_hook_interpreter())
 
        tmpl_pre += pkg_resources.resource_string(
 
            'kallithea', os.path.join('config', 'pre_receive_tmpl.py')
 
        )
 

	
 
        for h_type, tmpl in [('pre', tmpl_pre), ('post', tmpl_post)]:
 
            _hook_file = os.path.join(loc, '%s-receive' % h_type)
 
            has_hook = False
 
            log.debug('Installing git hook in repo %s', repo)
 
            if os.path.exists(_hook_file):
 
                # let's take a look at this hook, maybe it's kallithea ?
 
                log.debug('hook exists, checking if it is from kallithea')
 
                with open(_hook_file, 'rb') as f:
 
                    data = f.read()
 
                    matches = re.search(br'^KALLITHEA_HOOK_VER\s*=\s*(.*)$', data, flags=re.MULTILINE)
 
                    if matches:
 
                        try:
 
                            ver = matches.groups()[0]
 
                            log.debug('Found Kallithea hook - it has KALLITHEA_HOOK_VER %r', ver)
 
                            has_hook = True
 
                        except Exception:
 
                            log.error(traceback.format_exc())
 
            else:
 
                # there is no hook in this dir, so we want to create one
 
                has_hook = True
 

	
 
            if has_hook or force_create:
 
                log.debug('writing %s hook file !', h_type)
 
                try:
 
                    with open(_hook_file, 'wb') as f:
 
                        tmpl = tmpl.replace(b'_TMPL_', safe_bytes(kallithea.__version__))
 
                        f.write(tmpl)
 
                    os.chmod(_hook_file, 0o755)
 
                except IOError as e:
 
                    log.error('error writing %s: %s', _hook_file, e)
 
            else:
 
                log.debug('skipping writing hook file')
 

	
 

	
 
def AvailableRepoGroupChoices(top_perms, repo_group_perm_level, extras=()):
 
    """Return group_id,string tuples with choices for all the repo groups where
 
    the user has the necessary permissions.
 

	
 
    Top level is -1.
 
    """
 
    groups = RepoGroup.query().all()
 
    if HasPermissionAny('hg.admin')('available repo groups'):
 
        groups.append(None)
 
    else:
 
        groups = list(RepoGroupList(groups, perm_level=repo_group_perm_level))
 
        if top_perms and HasPermissionAny(*top_perms)('available repo groups'):
 
            groups.append(None)
 
        for extra in extras:
 
            if not any(rg == extra for rg in groups):
 
                groups.append(extra)
 
    return RepoGroup.groups_choices(groups=groups)
kallithea/model/validators.py
Show inline comments
 
@@ -355,405 +355,405 @@ def ValidRepoName(edit=False, old_data=N
 
            rename = old_data.get('repo_name') != repo_name_full
 
            create = not edit
 
            if rename or create:
 
                repo = Repository.get_by_repo_name(repo_name_full, case_insensitive=True)
 
                repo_group = RepoGroup.get_by_group_name(repo_name_full, case_insensitive=True)
 
                if group_path != '':
 
                    if repo is not None:
 
                        msg = self.message('repository_in_group_exists', state,
 
                                repo=repo.repo_name, group=group_name)
 
                        raise formencode.Invalid(msg, value, state,
 
                            error_dict=dict(repo_name=msg)
 
                        )
 
                elif repo_group is not None:
 
                    msg = self.message('same_group_exists', state,
 
                            repo=repo_name)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(repo_name=msg)
 
                    )
 
                elif repo is not None:
 
                    msg = self.message('repository_exists', state,
 
                            repo=repo.repo_name)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(repo_name=msg)
 
                    )
 
            return value
 
    return _validator
 

	
 

	
 
def ValidForkName(*args, **kwargs):
 
    return ValidRepoName(*args, **kwargs)
 

	
 

	
 
def SlugifyName():
 
    class _validator(formencode.validators.FancyValidator):
 

	
 
        def _convert_to_python(self, value, state):
 
            return repo_name_slug(value)
 

	
 
        def _validate_python(self, value, state):
 
            pass
 

	
 
    return _validator
 

	
 

	
 
def ValidCloneUri():
 
    from kallithea.lib.utils import make_ui
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'clone_uri': _('Invalid repository URL'),
 
            'invalid_clone_uri': _('Invalid repository URL. It must be a '
 
                                   'valid http, https, ssh, svn+http or svn+https URL'),
 
        }
 

	
 
        def _validate_python(self, value, state):
 
            repo_type = value.get('repo_type')
 
            url = value.get('clone_uri')
 

	
 
            if url and url != value.get('clone_uri_hidden'):
 
                try:
 
                    is_valid_repo_uri(repo_type, url, make_ui())
 
                except Exception:
 
                    log.exception('URL validation failed')
 
                    msg = self.message('clone_uri', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(clone_uri=msg)
 
                    )
 
    return _validator
 

	
 

	
 
def ValidForkType(old_data=None):
 
    old_data = old_data or {}
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_fork_type': _('Fork has to be the same type as parent')
 
        }
 

	
 
        def _validate_python(self, value, state):
 
            if old_data['repo_type'] != value:
 
                msg = self.message('invalid_fork_type', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(repo_type=msg)
 
                )
 
    return _validator
 

	
 

	
 
def CanWriteGroup(old_data=None):
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'permission_denied': _("You don't have permissions "
 
                                   "to create repository in this group"),
 
            'permission_denied_root': _("no permission to create repository "
 
                                        "in root location")
 
        }
 

	
 
        def _convert_to_python(self, value, state):
 
            # root location
 
            if value == -1:
 
                return None
 
            return value
 

	
 
        def _validate_python(self, value, state):
 
            gr = RepoGroup.get(value)
 
            gr_name = gr.group_name if gr is not None else None # None means ROOT location
 

	
 
            # create repositories with write permission on group is set to true
 
            create_on_write = HasPermissionAny('hg.create.write_on_repogroup.true')()
 
            group_admin = HasRepoGroupPermissionLevel('admin')(gr_name,
 
                                            'can write into group validator')
 
            group_write = HasRepoGroupPermissionLevel('write')(gr_name,
 
                                            'can write into group validator')
 
            forbidden = not (group_admin or (group_write and create_on_write))
 
            can_create_repos = HasPermissionAny('hg.admin', 'hg.create.repository')
 
            gid = (old_data['repo_group'].get('group_id')
 
                   if (old_data and 'repo_group' in old_data) else None)
 
            value_changed = gid != value
 
            new = not old_data
 
            # do check if we changed the value, there's a case that someone got
 
            # revoked write permissions to a repository, he still created, we
 
            # don't need to check permission if he didn't change the value of
 
            # groups in form box
 
            if value_changed or new:
 
                # parent group need to be existing
 
                if gr and forbidden:
 
                    msg = self.message('permission_denied', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(repo_type=msg)
 
                    )
 
                ## check if we can write to root location !
 
                elif gr is None and not can_create_repos():
 
                    msg = self.message('permission_denied_root', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(repo_type=msg)
 
                    )
 

	
 
    return _validator
 

	
 

	
 
def CanCreateGroup(can_create_in_root=False):
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'permission_denied': _("You don't have permissions "
 
                                   "to create a group in this location")
 
        }
 

	
 
        def to_python(self, value, state):
 
            # root location
 
            if value == -1:
 
                return None
 
            return value
 

	
 
        def _validate_python(self, value, state):
 
            gr = RepoGroup.get(value)
 
            gr_name = gr.group_name if gr is not None else None # None means ROOT location
 

	
 
            if can_create_in_root and gr is None:
 
                # we can create in root, we're fine no validations required
 
                return
 

	
 
            forbidden_in_root = gr is None and not can_create_in_root
 
            forbidden = not HasRepoGroupPermissionLevel('admin')(gr_name, 'can create group validator')
 
            if forbidden_in_root or forbidden:
 
                msg = self.message('permission_denied', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(parent_group_id=msg)
 
                )
 

	
 
    return _validator
 

	
 

	
 
def ValidPerms(type_='repo'):
 
    if type_ == 'repo_group':
 
        EMPTY_PERM = 'group.none'
 
    elif type_ == 'repo':
 
        EMPTY_PERM = 'repository.none'
 
    elif type_ == 'user_group':
 
        EMPTY_PERM = 'usergroup.none'
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'perm_new_member_name':
 
                _('This username or user group name is not valid')
 
        }
 

	
 
        def to_python(self, value, state):
 
            perms_update = OrderedSet()
 
            perms_new = OrderedSet()
 
            # build a list of permission to update and new permission to create
 

	
 
            # CLEAN OUT ORG VALUE FROM NEW MEMBERS, and group them using
 
            new_perms_group = defaultdict(dict)
 
            for k, v in value.copy().iteritems():
 
            for k, v in value.copy().items():
 
                if k.startswith('perm_new_member'):
 
                    del value[k]
 
                    _type, part = k.split('perm_new_member_')
 
                    args = part.split('_')
 
                    if len(args) == 1:
 
                        new_perms_group[args[0]]['perm'] = v
 
                    elif len(args) == 2:
 
                        _key, pos = args
 
                        new_perms_group[pos][_key] = v
 

	
 
            # fill new permissions in order of how they were added
 
            for k in sorted(new_perms_group, key=lambda k: int(k)):
 
                perm_dict = new_perms_group[k]
 
                new_member = perm_dict.get('name')
 
                new_perm = perm_dict.get('perm')
 
                new_type = perm_dict.get('type')
 
                if new_member and new_perm and new_type:
 
                    perms_new.add((new_member, new_perm, new_type))
 

	
 
            for k, v in value.iteritems():
 
            for k, v in value.items():
 
                if k.startswith('u_perm_') or k.startswith('g_perm_'):
 
                    member = k[7:]
 
                    t = {'u': 'user',
 
                         'g': 'users_group'
 
                    }[k[0]]
 
                    if member == User.DEFAULT_USER:
 
                        if str2bool(value.get('repo_private')):
 
                            # set none for default when updating to
 
                            # private repo protects against form manipulation
 
                            v = EMPTY_PERM
 
                    perms_update.add((member, v, t))
 

	
 
            value['perms_updates'] = list(perms_update)
 
            value['perms_new'] = list(perms_new)
 

	
 
            # update permissions
 
            for k, v, t in perms_new:
 
                try:
 
                    if t == 'user':
 
                        self.user_db = User.query() \
 
                            .filter(User.active == True) \
 
                            .filter(User.username == k).one()
 
                    if t == 'users_group':
 
                        self.user_db = UserGroup.query() \
 
                            .filter(UserGroup.users_group_active == True) \
 
                            .filter(UserGroup.users_group_name == k).one()
 

	
 
                except Exception:
 
                    log.exception('Updated permission failed')
 
                    msg = self.message('perm_new_member_type', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(perm_new_member_name=msg)
 
                    )
 
            return value
 
    return _validator
 

	
 

	
 
def ValidSettings():
 
    class _validator(formencode.validators.FancyValidator):
 
        def _convert_to_python(self, value, state):
 
            # settings  form for users that are not admin
 
            # can't edit certain parameters, it's extra backup if they mangle
 
            # with forms
 

	
 
            forbidden_params = [
 
                'user', 'repo_type',
 
                'repo_enable_downloads', 'repo_enable_statistics'
 
            ]
 

	
 
            for param in forbidden_params:
 
                if param in value:
 
                    del value[param]
 
            return value
 

	
 
        def _validate_python(self, value, state):
 
            pass
 
    return _validator
 

	
 

	
 
def ValidPath():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_path': _('This is not a valid path')
 
        }
 

	
 
        def _validate_python(self, value, state):
 
            if not os.path.isdir(value):
 
                msg = self.message('invalid_path', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(paths_root_path=msg)
 
                )
 
    return _validator
 

	
 

	
 
def UniqSystemEmail(old_data=None):
 
    old_data = old_data or {}
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'email_taken': _('This email address is already in use')
 
        }
 

	
 
        def _convert_to_python(self, value, state):
 
            return value.lower()
 

	
 
        def _validate_python(self, value, state):
 
            if (old_data.get('email') or '').lower() != value:
 
                user = User.get_by_email(value)
 
                if user is not None:
 
                    msg = self.message('email_taken', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(email=msg)
 
                    )
 
    return _validator
 

	
 

	
 
def ValidSystemEmail():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'non_existing_email': _('Email address "%(email)s" not found')
 
        }
 

	
 
        def _convert_to_python(self, value, state):
 
            return value.lower()
 

	
 
        def _validate_python(self, value, state):
 
            user = User.get_by_email(value)
 
            if user is None:
 
                msg = self.message('non_existing_email', state, email=value)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(email=msg)
 
                )
 

	
 
    return _validator
 

	
 

	
 
def LdapLibValidator():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 

	
 
        }
 

	
 
        def _validate_python(self, value, state):
 
            try:
 
                import ldap
 
                ldap  # pyflakes silence !
 
            except ImportError:
 
                raise LdapImportError()
 

	
 
    return _validator
 

	
 

	
 
def AttrLoginValidator():
 
    class _validator(formencode.validators.UnicodeString):
 
        messages = {
 
            'invalid_cn':
 
                  _('The LDAP Login attribute of the CN must be specified - '
 
                    'this is the name of the attribute that is equivalent '
 
                    'to "username"')
 
        }
 
        messages['empty'] = messages['invalid_cn']
 

	
 
    return _validator
 

	
 

	
 
def ValidIp():
 
    class _validator(CIDR):
 
        messages = dict(
 
            badFormat=_('Please enter a valid IPv4 or IPv6 address'),
 
            illegalBits=_('The network size (bits) must be within the range'
 
                ' of 0-32 (not %(bits)r)')
 
        )
 

	
 
        def to_python(self, value, state):
 
            v = super(_validator, self).to_python(value, state)
 
            v = v.strip()
 
            net = ipaddr.IPNetwork(address=v)
 
            if isinstance(net, ipaddr.IPv4Network):
 
                # if IPv4 doesn't end with a mask, add /32
 
                if '/' not in value:
 
                    v += '/32'
 
            if isinstance(net, ipaddr.IPv6Network):
 
                # if IPv6 doesn't end with a mask, add /128
 
                if '/' not in value:
 
                    v += '/128'
 
            return v
 

	
 
        def _validate_python(self, value, state):
 
            try:
 
                addr = value.strip()
 
                # this raises an ValueError if address is not IPv4 or IPv6
 
                ipaddr.IPNetwork(address=addr)
 
            except ValueError:
 
                raise formencode.Invalid(self.message('badFormat', state),
 
                                         value, state)
 

	
 
    return _validator
 

	
 

	
 
def FieldKey():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = dict(
 
            badFormat=_('Key name can only consist of letters, '
 
                        'underscore, dash or numbers')
 
        )
 

	
 
        def _validate_python(self, value, state):
 
            if not re.match('[a-zA-Z0-9_-]+$', value):
 
                raise formencode.Invalid(self.message('badFormat', state),
 
                                         value, state)
 
    return _validator
 

	
kallithea/templates/changeset/changeset_file_comment.html
Show inline comments
 
## -*- coding: utf-8 -*-
 
## usage:
 
## <%namespace name="comment" file="/changeset/changeset_file_comment.html"/>
 
## ${comment.comment_block(co)}
 
##
 
<%def name="comment_block(co)">
 
  <div class="comment" id="comment-${co.comment_id}">
 
    <div class="comment-prev-next-links"></div>
 
    <div class="panel panel-default">
 
      <div class="panel-heading">
 
          ${h.gravatar_div(co.author.email, size=20)}
 
          <span class="user">
 
              ${co.author.full_name_and_username}
 
          </span>
 

	
 
          <span data-toggle="tooltip" title="${h.fmt_date(co.modified_at)}">
 
              ${h.age(co.modified_at)}
 
          </span>
 
              %if co.pull_request:
 
                <a href="${co.url()}">${_("comment")}</a>
 
                ${_('on pull request')}
 
                <a href="${co.pull_request.url()}">"${co.pull_request.title or _("No title")}"</a>
 
              %else:
 
                ${_('on this changeset')}
 
              %endif
 
              <a class="permalink" href="${co.url()}">&para;</a>
 

	
 
          %if co.author_id == request.authuser.user_id or h.HasRepoPermissionLevel('admin')(c.repo_name):
 
            %if co.deletable():
 
              <button type="button" onClick="confirm('${_('Delete comment?')}') && deleteComment(${co.comment_id})" class="pull-right buttons delete-comment btn btn-default btn-xs">${_('Delete')}</button>
 
            %endif
 
          %endif
 
      </div>
 
      <div class="panel-body">
 
        %if co.status_change:
 
           <div class="automatic-comment">
 
             <p>
 
               ${_("Status change")}: <span class="comment-status-label">${co.status_change[0].status_lbl}</span>
 
               <i class="icon-circle changeset-status-${co.status_change[0].status}"></i>
 
             </p>
 
           </div>
 
        %endif
 
        <div class="comment-text">
 
        %if co.text:
 
          ${h.render_w_mentions(co.text, c.repo_name)|n}
 
        %endif
 
        </div>
 
      </div>
 
    </div>
 
  </div>
 
</%def>
 

	
 

	
 
<%def name="comment_inline_form()">
 
<div id='comment-inline-form-template' style="display: none;">
 
  <div class="comment comment-preview submitting" style="display: none;">
 
    <div class="panel panel-default">
 
      <div class="panel-heading">
 
          ${h.gravatar_div(request.authuser.email, size=20)}
 
          <span class="user">
 
              ${request.authuser.full_name_or_username}
 
          </span>
 

	
 
          <span class="comment-submission-status">
 
              ${_('Submitting ...')}
 
          </span>
 
      </div>
 
      <div class="panel-body">
 
           <div class="automatic-comment" style="display: none;">
 
             <p>
 
               ${_("Status change")}: <span class="comment-status-label"></span>
 
               <i class="icon-circle"></i>
 
             </p>
 
           </div>
 
           <div class="comment-text">
 
             <div class="formatted-fixed">
 
             </div>
 
           </div>
 
      </div>
 
    </div>
 
  </div>
 
  <div class="ac">
 
  %if request.authuser.username != 'default':
 
    ${h.form('#', class_='inline-form')}
 
      <div class="well well-sm clearfix comment-inline-well">
 
        <div class="comment-help">
 
          <span class="text-muted">${_('Comments are in plain text. Use @username to notify another user.')|n}</span>
 
        </div>
 
        <textarea name="text" class="form-control"></textarea>
 

	
 
        <div id="status_block_container" class="status-block general-only hidden">
 
                %if c.pull_request is None:
 
                  ${_('Set changeset status')}:
 
                %else:
 
                  ${_('Vote for pull request status')}:
 
                %endif
 
                <span class="general-only cs-only">
 
                </span>
 
                <label class="radio-inline">
 
                    <input type="radio" class="status_change_radio" name="changeset_status" id="changeset_status_unchanged" value="" checked="checked" />
 
                    ${_('No change')}
 
                </label>
 
                %for status, lbl in c.changeset_statuses:
 
                    <label class="radio-inline">
 
                        <input type="radio" class="status_change_radio" name="changeset_status" id="${status}" value="${status}">
 
                        ${lbl}<i class="icon-circle changeset-status-${status}"></i>
 
                    </label>
 
                %endfor
 

	
 
                %if c.pull_request is not None and ( \
 
                    h.HasPermissionAny('hg.admin')() or h.HasRepoPermissionLevel('admin')(c.repo_name) \
 
                    or c.pull_request.owner_id == request.authuser.user_id):
 
                <div>
 
                  ${_('Finish pull request')}:
 
                  <label class="checkbox-inline">
 
                    <input id="save_close" type="checkbox" name="save_close" class="status_change_checkbox">
 
                    ${_("Close")}
 
                  </label>
 
                  <label class="checkbox-inline">
 
                    <input id="save_delete" type="checkbox" name="save_delete" value="delete" class="status_change_checkbox">
 
                    ${_("Delete")}
 
                  </label>
 
                </div>
 
                %endif
 
        </div>
 

	
 
      </div>
 
      <div class="comment-button">
 
        ${h.submit('save', _('Comment'), class_='btn btn-default btn-sm save-inline-form')}
 
        ${h.reset('hide-inline-form', _('Cancel'), class_='btn btn-default btn-sm hide-inline-form')}
 
      </div>
 
    ${h.end_form()}
 
  %else:
 
      ${h.form('')}
 
      <div class="clearfix">
 
          <div class="comment-help">
 
            ${_('You need to be logged in to comment.')} <a href="${h.url('login_home', came_from=request.path_qs)}">${_('Login now')}</a>
 
          </div>
 
      </div>
 
      <div class="comment-button">
 
      ${h.reset('hide-inline-form', _('Hide'), class_='btn btn-default btn-sm hide-inline-form')}
 
      </div>
 
      ${h.end_form()}
 
  %endif
 
  </div>
 
</div>
 
</%def>
 

	
 

	
 
## show comment count as "x comments (y inline, z general)"
 
<%def name="comment_count(inline_cnt, general_cnt)">
 
    ${'%s (%s, %s)' % (
 
        ungettext("%d comment", "%d comments", inline_cnt + general_cnt) % (inline_cnt + general_cnt),
 
        ungettext("%d inline", "%d inline", inline_cnt) % inline_cnt,
 
        ungettext("%d general", "%d general", general_cnt) % general_cnt
 
    )}
 
    <span class="firstlink"></span>
 
</%def>
 

	
 

	
 
## generate inline comments and the main ones
 
<%def name="generate_comments()">
 
## original location of comments ... but the ones outside diff context remains here
 
<div class="comments inline-comments">
 
  %for f_path, lines in c.inline_comments:
 
    %for line_no, comments in lines.iteritems():
 
    %for line_no, comments in lines.items():
 
      <div class="comments-list-chunk" data-f_path="${f_path}" data-line_no="${line_no}" data-target-id="${h.safeid(h.safe_unicode(f_path))}_${line_no}">
 
        %for co in comments:
 
            ${comment_block(co)}
 
        %endfor
 
      </div>
 
    %endfor
 
  %endfor
 

	
 
      <div class="comments-list-chunk" data-f_path="" data-line_no="" data-target-id="general-comments">
 
        %for co in c.comments:
 
            ${comment_block(co)}
 
        %endfor
 
      </div>
 
</div>
 
<div class="comments-number">
 
    ${comment_count(c.inline_cnt, len(c.comments))}
 
</div>
 
</%def>
 

	
 
## MAIN COMMENT FORM
 
<%def name="comments(change_status=True)">
 
<div class="inline-comments inline-comments-general
 
            ${'show-general-status' if change_status else ''}">
 
  <div id="comments-general-comments" class="">
 
  ## comment_div for general comments
 
  </div>
 
</div>
 

	
 
<script>
 

	
 
$(document).ready(function () {
 

	
 
   $(window).on('beforeunload', function(){
 
      var $textareas = $('.comment-inline-form textarea[name=text]');
 
      if($textareas.length > 1 ||
 
         $textareas.val()) {
 
         // this message will not be displayed on all browsers
 
         // (e.g. some versions of Firefox), but the user will still be warned
 
         return 'There are uncommitted comments.';
 
      }
 
   });
 

	
 
});
 
</script>
 
</%def>
kallithea/tests/functional/test_admin.py
Show inline comments
 
import csv
 
import datetime
 
import os
 
from os.path import dirname
 

	
 
from kallithea.lib.utils2 import safe_unicode
 
from kallithea.model.db import UserLog
 
from kallithea.model.meta import Session
 
from kallithea.tests import base
 

	
 

	
 
FIXTURES = os.path.join(dirname(dirname(os.path.abspath(__file__))), 'fixtures')
 

	
 

	
 
class TestAdminController(base.TestController):
 

	
 
    @classmethod
 
    def setup_class(cls):
 
        UserLog.query().delete()
 
        Session().commit()
 

	
 
        def strptime(val):
 
            fmt = '%Y-%m-%d %H:%M:%S'
 
            if '.' not in val:
 
                return datetime.datetime.strptime(val, fmt)
 

	
 
            nofrag, frag = val.split(".")
 
            date = datetime.datetime.strptime(nofrag, fmt)
 

	
 
            frag = frag[:6]  # truncate to microseconds
 
            frag += (6 - len(frag)) * '0'  # add 0s
 
            return date.replace(microsecond=int(frag))
 

	
 
        with open(os.path.join(FIXTURES, 'journal_dump.csv')) as f:
 
            for row in csv.DictReader(f):
 
                ul = UserLog()
 
                for k, v in row.iteritems():
 
                for k, v in row.items():
 
                    v = safe_unicode(v)
 
                    if k == 'action_date':
 
                        v = strptime(v)
 
                    if k in ['user_id', 'repository_id']:
 
                        # nullable due to FK problems
 
                        v = None
 
                    setattr(ul, k, v)
 
                Session().add(ul)
 
            Session().commit()
 

	
 
    @classmethod
 
    def teardown_class(cls):
 
        UserLog.query().delete()
 
        Session().commit()
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(base.url(controller='admin/admin', action='index'))
 
        response.mustcontain('Admin Journal')
 

	
 
    def test_filter_all_entries(self):
 
        self.log_user()
 
        response = self.app.get(base.url(controller='admin/admin', action='index',))
 
        response.mustcontain(' 2036 Entries')
 

	
 
    def test_filter_journal_filter_exact_match_on_repository(self):
 
        self.log_user()
 
        response = self.app.get(base.url(controller='admin/admin', action='index',
 
                                    filter='repository:xxx'))
 
        response.mustcontain(' 3 Entries')
 

	
 
    def test_filter_journal_filter_exact_match_on_repository_CamelCase(self):
 
        self.log_user()
 
        response = self.app.get(base.url(controller='admin/admin', action='index',
 
                                    filter='repository:XxX'))
 
        response.mustcontain(' 3 Entries')
 

	
 
    def test_filter_journal_filter_wildcard_on_repository(self):
 
        self.log_user()
 
        response = self.app.get(base.url(controller='admin/admin', action='index',
 
                                    filter='repository:*test*'))
 
        response.mustcontain(' 862 Entries')
 

	
 
    def test_filter_journal_filter_prefix_on_repository(self):
 
        self.log_user()
 
        response = self.app.get(base.url(controller='admin/admin', action='index',
 
                                    filter='repository:test*'))
 
        response.mustcontain(' 257 Entries')
 

	
 
    def test_filter_journal_filter_prefix_on_repository_CamelCase(self):
 
        self.log_user()
 
        response = self.app.get(base.url(controller='admin/admin', action='index',
 
                                    filter='repository:Test*'))
 
        response.mustcontain(' 257 Entries')
 

	
 
    def test_filter_journal_filter_prefix_on_repository_and_user(self):
 
        self.log_user()
 
        response = self.app.get(base.url(controller='admin/admin', action='index',
 
                                    filter='repository:test* AND username:demo'))
 
        response.mustcontain(' 130 Entries')
 

	
 
    def test_filter_journal_filter_prefix_on_repository_or_other_repo(self):
 
        self.log_user()
 
        response = self.app.get(base.url(controller='admin/admin', action='index',
 
                                    filter='repository:test* OR repository:xxx'))
 
        response.mustcontain(' 260 Entries')  # 257 + 3
 

	
 
    def test_filter_journal_filter_exact_match_on_username(self):
 
        self.log_user()
 
        response = self.app.get(base.url(controller='admin/admin', action='index',
 
                                    filter='username:demo'))
 
        response.mustcontain(' 1087 Entries')
 

	
 
    def test_filter_journal_filter_exact_match_on_username_camelCase(self):
 
        self.log_user()
 
        response = self.app.get(base.url(controller='admin/admin', action='index',
 
                                    filter='username:DemO'))
 
        response.mustcontain(' 1087 Entries')
 

	
 
    def test_filter_journal_filter_wildcard_on_username(self):
 
        self.log_user()
 
        response = self.app.get(base.url(controller='admin/admin', action='index',
 
                                    filter='username:*test*'))
 
        response.mustcontain(' 100 Entries')
 

	
 
    def test_filter_journal_filter_prefix_on_username(self):
 
        self.log_user()
 
        response = self.app.get(base.url(controller='admin/admin', action='index',
 
                                    filter='username:demo*'))
 
        response.mustcontain(' 1101 Entries')
 

	
 
    def test_filter_journal_filter_prefix_on_user_or_other_user(self):
 
        self.log_user()
 
        response = self.app.get(base.url(controller='admin/admin', action='index',
 
                                    filter='username:demo OR username:volcan'))
 
        response.mustcontain(' 1095 Entries')  # 1087 + 8
 

	
 
    def test_filter_journal_filter_wildcard_on_action(self):
 
        self.log_user()
 
        response = self.app.get(base.url(controller='admin/admin', action='index',
 
                                    filter='action:*pull_request*'))
 
        response.mustcontain(' 187 Entries')
 

	
 
    def test_filter_journal_filter_on_date(self):
 
        self.log_user()
 
        response = self.app.get(base.url(controller='admin/admin', action='index',
 
                                    filter='date:20121010'))
 
        response.mustcontain(' 47 Entries')
 

	
 
    def test_filter_journal_filter_on_date_2(self):
 
        self.log_user()
 
        response = self.app.get(base.url(controller='admin/admin', action='index',
 
                                    filter='date:20121020'))
 
        response.mustcontain(' 17 Entries')
 

	
 
    @base.parametrize('filter,hit', [
 
        #### "repository:" filtering
 
        # "/" is used for grouping
 
        ('repository:group/test', 4),
 
        # "-" is often used for "-fork"
 
        ('repository:fork-test1', 5),
 
        # using "stop words"
 
        ('repository:this', 1),
 
        ('repository:this/is-it', 1),
 

	
 
        ## additional tests to quickly find out regression in the future
 
        ## (and check case-insensitive search, too)
 
        # non-ascii character "." and "-"
 
        ('repository:TESTIES1.2.3', 4),
 
        ('repository:test_git_repo', 2),
 
        # combination with wildcard "*"
 
        ('repository:GROUP/*', 182),
 
        ('repository:*/test', 7),
 
        ('repository:fork-*', 273),
 
        ('repository:*-Test1', 5),
 

	
 
        #### "username:" filtering
 
        # "-" is valid character
 
        ('username:peso-xxx', 4),
 
        # using "stop words"
 
        ('username:this-is-it', 2),
 

	
 
        ## additional tests to quickly find out regression in the future
 
        ## (and check case-insensitive search, too)
 
        # non-ascii character "." and "-"
 
        ('username:ADMIN_xanroot', 6),
 
        ('username:robert.Zaremba', 3),
 
        # combination with wildcard "*"
 
        ('username:THIS-*', 2),
 
        ('username:*-IT', 2),
 
    ])
 
    def test_filter_journal_filter_tokenization(self, filter, hit):
 
        self.log_user()
 

	
 
        response = self.app.get(base.url(controller='admin/admin', action='index',
 
                                    filter=filter))
 
        if hit != 1:
 
            response.mustcontain(' %s Entries' % hit)
 
        else:
 
            response.mustcontain(' 1 Entry')
kallithea/tests/vcs/test_changesets.py
Show inline comments
 
# encoding: utf-8
 

	
 
import datetime
 

	
 
import pytest
 

	
 
from kallithea.lib import vcs
 
from kallithea.lib.vcs.backends.base import BaseChangeset
 
from kallithea.lib.vcs.exceptions import BranchDoesNotExistError, ChangesetDoesNotExistError, EmptyRepositoryError, RepositoryError
 
from kallithea.lib.vcs.nodes import AddedFileNodesGenerator, ChangedFileNodesGenerator, FileNode, RemovedFileNodesGenerator
 
from kallithea.tests.vcs.base import _BackendTestMixin
 

	
 

	
 
class TestBaseChangeset(object):
 

	
 
    def test_as_dict(self):
 
        changeset = BaseChangeset()
 
        changeset.raw_id = 'RAW_ID'
 
        changeset.short_id = 'SHORT_ID'
 
        changeset.revision = 1009
 
        changeset.date = datetime.datetime(2011, 1, 30, 1, 45)
 
        changeset.message = 'Message of a commit'
 
        changeset.author = 'Joe Doe <joe.doe@example.com>'
 
        changeset.added = [FileNode('foo/bar/baz'), FileNode(u'foobar'), FileNode(u'blåbærgrød')]
 
        changeset.changed = []
 
        changeset.removed = []
 
        assert changeset.as_dict() == {
 
            'raw_id': 'RAW_ID',
 
            'short_id': 'SHORT_ID',
 
            'revision': 1009,
 
            'date': datetime.datetime(2011, 1, 30, 1, 45),
 
            'message': 'Message of a commit',
 
            'author': {
 
                'name': 'Joe Doe',
 
                'email': 'joe.doe@example.com',
 
            },
 
            'added': ['foo/bar/baz', 'foobar', u'bl\xe5b\xe6rgr\xf8d'],
 
            'changed': [],
 
            'removed': [],
 
        }
 

	
 

	
 
class _ChangesetsWithCommitsTestCaseixin(_BackendTestMixin):
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        start_date = datetime.datetime(2010, 1, 1, 20)
 
        for x in xrange(5):
 
            yield {
 
                'message': 'Commit %d' % x,
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': start_date + datetime.timedelta(hours=12 * x),
 
                'added': [
 
                    FileNode('file_%d.txt' % x, content='Foobar %d' % x),
 
                ],
 
            }
 

	
 
    def test_new_branch(self):
 
        self.imc.add(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\n'))
 
        foobar_tip = self.imc.commit(
 
            message=u'New branch: foobar',
 
            author=u'joe',
 
            branch='foobar',
 
        )
 
        assert 'foobar' in self.repo.branches
 
        assert foobar_tip.branch == 'foobar'
 
        assert foobar_tip.branches == ['foobar']
 
        # 'foobar' should be the only branch that contains the new commit
 
        branch_tips = list(self.repo.branches.values())
 
        assert branch_tips.count(str(foobar_tip.raw_id)) == 1
 

	
 
    def test_new_head_in_default_branch(self):
 
        tip = self.repo.get_changeset()
 
        self.imc.add(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\n'))
 
        foobar_tip = self.imc.commit(
 
            message=u'New branch: foobar',
 
            author=u'joe',
 
            branch='foobar',
 
            parents=[tip],
 
        )
 
        self.imc.change(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\nand more...\n'))
 
        newtip = self.imc.commit(
 
            message=u'At default branch',
 
            author=u'joe',
 
            branch=foobar_tip.branch,
 
            parents=[foobar_tip],
 
        )
 

	
 
        newest_tip = self.imc.commit(
 
            message=u'Merged with %s' % foobar_tip.raw_id,
 
            author=u'joe',
 
            branch=self.backend_class.DEFAULT_BRANCH_NAME,
 
            parents=[newtip, foobar_tip],
 
        )
 

	
 
        assert newest_tip.branch == self.backend_class.DEFAULT_BRANCH_NAME
 
        assert newest_tip.branches == [self.backend_class.DEFAULT_BRANCH_NAME]
 

	
 
    def test_get_changesets_respects_branch_name(self):
 
        tip = self.repo.get_changeset()
 
        self.imc.add(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\n'))
 
        doc_changeset = self.imc.commit(
 
            message=u'New branch: docs',
 
            author=u'joe',
 
            branch='docs',
 
        )
 
        self.imc.add(vcs.nodes.FileNode('newfile', content=''))
 
        self.imc.commit(
 
            message=u'Back in default branch',
 
            author=u'joe',
 
            parents=[tip],
 
        )
 
        default_branch_changesets = self.repo.get_changesets(
 
            branch_name=self.repo.DEFAULT_BRANCH_NAME)
 
        assert doc_changeset not in default_branch_changesets
 

	
 
    def test_get_changeset_by_branch(self):
 
        for branch, sha in self.repo.branches.iteritems():
 
        for branch, sha in self.repo.branches.items():
 
            assert sha == self.repo.get_changeset(branch).raw_id
 

	
 
    def test_get_changeset_by_tag(self):
 
        for tag, sha in self.repo.tags.iteritems():
 
        for tag, sha in self.repo.tags.items():
 
            assert sha == self.repo.get_changeset(tag).raw_id
 

	
 
    def test_get_changeset_parents(self):
 
        for test_rev in [1, 2, 3]:
 
            sha = self.repo.get_changeset(test_rev-1)
 
            assert [sha] == self.repo.get_changeset(test_rev).parents
 

	
 
    def test_get_changeset_children(self):
 
        for test_rev in [1, 2, 3]:
 
            sha = self.repo.get_changeset(test_rev+1)
 
            assert [sha] == self.repo.get_changeset(test_rev).children
 

	
 

	
 
class _ChangesetsTestCaseMixin(_BackendTestMixin):
 
    recreate_repo_per_test = False
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        start_date = datetime.datetime(2010, 1, 1, 20)
 
        for x in xrange(5):
 
            yield {
 
                'message': u'Commit %d' % x,
 
                'author': u'Joe Doe <joe.doe@example.com>',
 
                'date': start_date + datetime.timedelta(hours=12 * x),
 
                'added': [
 
                    FileNode('file_%d.txt' % x, content='Foobar %d' % x),
 
                ],
 
            }
 

	
 
    def test_simple(self):
 
        tip = self.repo.get_changeset()
 
        assert tip.date == datetime.datetime(2010, 1, 3, 20)
 

	
 
    def test_get_changesets_is_ordered_by_date(self):
 
        changesets = list(self.repo.get_changesets())
 
        ordered_by_date = sorted(changesets,
 
            key=lambda cs: cs.date)
 

	
 
        assert changesets == ordered_by_date
 

	
 
    def test_get_changesets_respects_start(self):
 
        second_id = self.repo.revisions[1]
 
        changesets = list(self.repo.get_changesets(start=second_id))
 
        assert len(changesets) == 4
 

	
 
    def test_get_changesets_numerical_id_respects_start(self):
 
        second_id = 1
 
        changesets = list(self.repo.get_changesets(start=second_id))
 
        assert len(changesets) == 4
 

	
 
    def test_get_changesets_includes_start_changeset(self):
 
        second_id = self.repo.revisions[1]
 
        changesets = list(self.repo.get_changesets(start=second_id))
 
        assert changesets[0].raw_id == second_id
 

	
 
    def test_get_changesets_respects_end(self):
 
        second_id = self.repo.revisions[1]
 
        changesets = list(self.repo.get_changesets(end=second_id))
 
        assert changesets[-1].raw_id == second_id
 
        assert len(changesets) == 2
 

	
 
    def test_get_changesets_numerical_id_respects_end(self):
 
        second_id = 1
 
        changesets = list(self.repo.get_changesets(end=second_id))
 
        assert changesets.index(changesets[-1]) == second_id
 
        assert len(changesets) == 2
 

	
 
    def test_get_changesets_respects_both_start_and_end(self):
 
        second_id = self.repo.revisions[1]
 
        third_id = self.repo.revisions[2]
 
        changesets = list(self.repo.get_changesets(start=second_id,
 
            end=third_id))
 
        assert len(changesets) == 2
 

	
 
    def test_get_changesets_numerical_id_respects_both_start_and_end(self):
 
        changesets = list(self.repo.get_changesets(start=2, end=3))
 
        assert len(changesets) == 2
 

	
 
    def test_get_changesets_on_empty_repo_raises_EmptyRepository_error(self):
 
        repo = self.setup_empty_repo(self.backend_class)
 
        with pytest.raises(EmptyRepositoryError):
 
            list(repo.get_changesets(start='foobar'))
 

	
 
    def test_get_changesets_includes_end_changeset(self):
 
        second_id = self.repo.revisions[1]
 
        changesets = list(self.repo.get_changesets(end=second_id))
 
        assert changesets[-1].raw_id == second_id
 

	
 
    def test_get_changesets_respects_start_date(self):
 
        start_date = datetime.datetime(2010, 2, 1)
 
        for cs in self.repo.get_changesets(start_date=start_date):
 
            assert cs.date >= start_date
 

	
 
    def test_get_changesets_respects_end_date(self):
 
        start_date = datetime.datetime(2010, 1, 1)
 
        end_date = datetime.datetime(2010, 2, 1)
 
        for cs in self.repo.get_changesets(start_date=start_date,
 
                                           end_date=end_date):
 
            assert cs.date >= start_date
 
            assert cs.date <= end_date
 

	
 
    def test_get_changesets_respects_start_date_and_end_date(self):
 
        end_date = datetime.datetime(2010, 2, 1)
 
        for cs in self.repo.get_changesets(end_date=end_date):
 
            assert cs.date <= end_date
 

	
 
    def test_get_changesets_respects_reverse(self):
 
        changesets_id_list = [cs.raw_id for cs in
 
            self.repo.get_changesets(reverse=True)]
 
        assert changesets_id_list == list(reversed(self.repo.revisions))
 

	
 
    def test_get_filenodes_generator(self):
 
        tip = self.repo.get_changeset()
 
        filepaths = [node.path for node in tip.get_filenodes_generator()]
 
        assert filepaths == ['file_%d.txt' % x for x in xrange(5)]
 

	
 
    def test_size(self):
 
        tip = self.repo.get_changeset()
 
        size = 5 * len('Foobar N') # Size of 5 files
 
        assert tip.size == size
 

	
 
    def test_author(self):
 
        tip = self.repo.get_changeset()
 
        assert tip.author == u'Joe Doe <joe.doe@example.com>'
 

	
 
    def test_author_name(self):
 
        tip = self.repo.get_changeset()
 
        assert tip.author_name == u'Joe Doe'
 

	
 
    def test_author_email(self):
 
        tip = self.repo.get_changeset()
 
        assert tip.author_email == u'joe.doe@example.com'
 

	
 
    def test_get_changesets_raise_changesetdoesnotexist_for_wrong_start(self):
 
        with pytest.raises(ChangesetDoesNotExistError):
 
            list(self.repo.get_changesets(start='foobar'))
 

	
 
    def test_get_changesets_raise_changesetdoesnotexist_for_wrong_end(self):
 
        with pytest.raises(ChangesetDoesNotExistError):
 
            list(self.repo.get_changesets(end='foobar'))
 

	
 
    def test_get_changesets_raise_branchdoesnotexist_for_wrong_branch_name(self):
 
        with pytest.raises(BranchDoesNotExistError):
 
            list(self.repo.get_changesets(branch_name='foobar'))
 

	
 
    def test_get_changesets_raise_repositoryerror_for_wrong_start_end(self):
 
        start = self.repo.revisions[-1]
 
        end = self.repo.revisions[0]
 
        with pytest.raises(RepositoryError):
 
            list(self.repo.get_changesets(start=start, end=end))
 

	
 
    def test_get_changesets_numerical_id_reversed(self):
 
        with pytest.raises(RepositoryError):
 
            [x for x in self.repo.get_changesets(start=3, end=2)]
 

	
 
    def test_get_changesets_numerical_id_respects_both_start_and_end_last(self):
 
        with pytest.raises(RepositoryError):
 
            last = len(self.repo.revisions)
 
            list(self.repo.get_changesets(start=last-1, end=last-2))
 

	
 
    def test_get_changesets_numerical_id_last_zero_error(self):
 
        with pytest.raises(RepositoryError):
 
            last = len(self.repo.revisions)
 
            list(self.repo.get_changesets(start=last-1, end=0))
 

	
 

	
 
class _ChangesetsChangesTestCaseMixin(_BackendTestMixin):
 
    recreate_repo_per_test = False
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        return [
 
            {
 
                'message': u'Initial',
 
                'author': u'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 20),
 
                'added': [
 
                    FileNode('foo/bar', content='foo'),
 
                    FileNode('foo/bał', content='foo'),
 
                    FileNode('foobar', content='foo'),
 
                    FileNode('qwe', content='foo'),
 
                ],
 
            },
 
            {
 
                'message': u'Massive changes',
 
                'author': u'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 22),
 
                'added': [FileNode('fallout', content='War never changes')],
 
                'changed': [
 
                    FileNode('foo/bar', content='baz'),
 
                    FileNode('foobar', content='baz'),
 
                ],
kallithea/tests/vcs/test_git.py
Show inline comments
 
@@ -602,245 +602,245 @@ class TestGitChangeset(object):
 
            if node.is_file():
 
                assert isinstance(node.content, bytes)
 

	
 
    def test_wrong_path(self):
 
        # There is 'setup.py' in the root dir but not there:
 
        path = 'foo/bar/setup.py'
 
        tip = self.repo.get_changeset()
 
        with pytest.raises(VCSError):
 
            tip.get_node(path)
 

	
 
    def test_author_email(self):
 
        assert 'marcin@python-blog.com' == self.repo.get_changeset('c1214f7e79e02fc37156ff215cd71275450cffc3').author_email
 
        assert 'lukasz.balcerzak@python-center.pl' == self.repo.get_changeset('ff7ca51e58c505fec0dd2491de52c622bb7a806b').author_email
 
        assert '' == self.repo.get_changeset('8430a588b43b5d6da365400117c89400326e7992').author_email
 

	
 
    def test_author_username(self):
 
        assert 'Marcin Kuzminski' == self.repo.get_changeset('c1214f7e79e02fc37156ff215cd71275450cffc3').author_name
 
        assert 'Lukasz Balcerzak' == self.repo.get_changeset('ff7ca51e58c505fec0dd2491de52c622bb7a806b').author_name
 
        assert 'marcink none@none' == self.repo.get_changeset('8430a588b43b5d6da365400117c89400326e7992').author_name
 

	
 

	
 
class TestGitSpecificWithRepo(_BackendTestMixin):
 
    backend_alias = 'git'
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        return [
 
            {
 
                'message': 'Initial',
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 20),
 
                'added': [
 
                    FileNode('foobar/static/js/admin/base.js', content='base'),
 
                    FileNode('foobar/static/admin', content='admin',
 
                        mode=0o120000), # this is a link
 
                    FileNode('foo', content='foo'),
 
                ],
 
            },
 
            {
 
                'message': 'Second',
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 22),
 
                'added': [
 
                    FileNode('foo2', content='foo2'),
 
                ],
 
            },
 
        ]
 

	
 
    def test_paths_slow_traversing(self):
 
        cs = self.repo.get_changeset()
 
        assert cs.get_node('foobar').get_node('static').get_node('js').get_node('admin').get_node('base.js').content == b'base'
 

	
 
    def test_paths_fast_traversing(self):
 
        cs = self.repo.get_changeset()
 
        assert cs.get_node('foobar/static/js/admin/base.js').content == b'base'
 

	
 
    def test_workdir_get_branch(self):
 
        self.repo.run_git_command(['checkout', '-b', 'production'])
 
        # Regression test: one of following would fail if we don't check
 
        # .git/HEAD file
 
        self.repo.run_git_command(['checkout', 'production'])
 
        assert self.repo.workdir.get_branch() == 'production'
 
        self.repo.run_git_command(['checkout', 'master'])
 
        assert self.repo.workdir.get_branch() == 'master'
 

	
 
    def test_get_diff_runs_git_command_with_hashes(self):
 
        self.repo._run_git_command = mock.Mock(return_value=(b'', b''))
 
        self.repo.get_diff(0, 1)
 
        self.repo._run_git_command.assert_called_once_with(
 
            ['diff', '-U3', '--full-index', '--binary', '-p', '-M', '--abbrev=40',
 
             self.repo._get_revision(0), self.repo._get_revision(1)], cwd=self.repo.path)
 

	
 
    def test_get_diff_runs_git_command_with_str_hashes(self):
 
        self.repo._run_git_command = mock.Mock(return_value=(b'', b''))
 
        self.repo.get_diff(self.repo.EMPTY_CHANGESET, 1)
 
        self.repo._run_git_command.assert_called_once_with(
 
            ['show', '-U3', '--full-index', '--binary', '-p', '-M', '--abbrev=40',
 
             self.repo._get_revision(1)], cwd=self.repo.path)
 

	
 
    def test_get_diff_runs_git_command_with_path_if_its_given(self):
 
        self.repo._run_git_command = mock.Mock(return_value=(b'', b''))
 
        self.repo.get_diff(0, 1, 'foo')
 
        self.repo._run_git_command.assert_called_once_with(
 
            ['diff', '-U3', '--full-index', '--binary', '-p', '-M', '--abbrev=40',
 
             self.repo._get_revision(0), self.repo._get_revision(1), '--', 'foo'], cwd=self.repo.path)
 

	
 
    def test_get_diff_does_not_sanitize_valid_context(self):
 
        almost_overflowed_long_int = 2**31-1
 

	
 
        self.repo._run_git_command = mock.Mock(return_value=(b'', b''))
 
        self.repo.get_diff(0, 1, 'foo', context=almost_overflowed_long_int)
 
        self.repo._run_git_command.assert_called_once_with(
 
            ['diff', '-U' + str(almost_overflowed_long_int), '--full-index', '--binary', '-p', '-M', '--abbrev=40',
 
             self.repo._get_revision(0), self.repo._get_revision(1), '--', 'foo'], cwd=self.repo.path)
 

	
 
    def test_get_diff_sanitizes_overflowing_context(self):
 
        overflowed_long_int = 2**31
 
        sanitized_overflowed_long_int = overflowed_long_int-1
 

	
 
        self.repo._run_git_command = mock.Mock(return_value=(b'', b''))
 
        self.repo.get_diff(0, 1, 'foo', context=overflowed_long_int)
 

	
 
        self.repo._run_git_command.assert_called_once_with(
 
            ['diff', '-U' + str(sanitized_overflowed_long_int), '--full-index', '--binary', '-p', '-M', '--abbrev=40',
 
             self.repo._get_revision(0), self.repo._get_revision(1), '--', 'foo'], cwd=self.repo.path)
 

	
 
    def test_get_diff_does_not_sanitize_zero_context(self):
 
        zero_context = 0
 

	
 
        self.repo._run_git_command = mock.Mock(return_value=(b'', b''))
 
        self.repo.get_diff(0, 1, 'foo', context=zero_context)
 

	
 
        self.repo._run_git_command.assert_called_once_with(
 
            ['diff', '-U' + str(zero_context), '--full-index', '--binary', '-p', '-M', '--abbrev=40',
 
             self.repo._get_revision(0), self.repo._get_revision(1), '--', 'foo'], cwd=self.repo.path)
 

	
 
    def test_get_diff_sanitizes_negative_context(self):
 
        negative_context = -10
 

	
 
        self.repo._run_git_command = mock.Mock(return_value=(b'', b''))
 
        self.repo.get_diff(0, 1, 'foo', context=negative_context)
 

	
 
        self.repo._run_git_command.assert_called_once_with(
 
            ['diff', '-U0', '--full-index', '--binary', '-p', '-M', '--abbrev=40',
 
             self.repo._get_revision(0), self.repo._get_revision(1), '--', 'foo'], cwd=self.repo.path)
 

	
 

	
 
class TestGitRegression(_BackendTestMixin):
 
    backend_alias = 'git'
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        return [
 
            {
 
                'message': 'Initial',
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 20),
 
                'added': [
 
                    FileNode('bot/__init__.py', content='base'),
 
                    FileNode('bot/templates/404.html', content='base'),
 
                    FileNode('bot/templates/500.html', content='base'),
 
                ],
 
            },
 
            {
 
                'message': 'Second',
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 22),
 
                'added': [
 
                    FileNode('bot/build/migrations/1.py', content='foo2'),
 
                    FileNode('bot/build/migrations/2.py', content='foo2'),
 
                    FileNode('bot/build/static/templates/f.html', content='foo2'),
 
                    FileNode('bot/build/static/templates/f1.html', content='foo2'),
 
                    FileNode('bot/build/templates/err.html', content='foo2'),
 
                    FileNode('bot/build/templates/err2.html', content='foo2'),
 
                ],
 
            },
 
        ]
 

	
 
    def test_similar_paths(self):
 
        cs = self.repo.get_changeset()
 
        paths = lambda *n: [x.path for x in n]
 
        assert paths(*cs.get_nodes('bot')) == ['bot/build', 'bot/templates', 'bot/__init__.py']
 
        assert paths(*cs.get_nodes('bot/build')) == ['bot/build/migrations', 'bot/build/static', 'bot/build/templates']
 
        assert paths(*cs.get_nodes('bot/build/static')) == ['bot/build/static/templates']
 
        # this get_nodes below causes troubles !
 
        assert paths(*cs.get_nodes('bot/build/static/templates')) == ['bot/build/static/templates/f.html', 'bot/build/static/templates/f1.html']
 
        assert paths(*cs.get_nodes('bot/build/templates')) == ['bot/build/templates/err.html', 'bot/build/templates/err2.html']
 
        assert paths(*cs.get_nodes('bot/templates/')) == ['bot/templates/404.html', 'bot/templates/500.html']
 

	
 

	
 
class TestGitHooks(object):
 
    """
 
    Tests related to hook functionality of Git repositories.
 
    """
 

	
 
    def setup_method(self):
 
        # For each run we want a fresh repo.
 
        self.repo_directory = get_new_dir("githookrepo")
 
        self.repo = GitRepository(self.repo_directory, create=True)
 

	
 
        # Create a dictionary where keys are hook names, and values are paths to
 
        # them in the non-bare repo. Deduplicates code in tests a bit.
 
        self.kallithea_hooks = {
 
            "pre-receive": os.path.join(self.repo.path, '.git', 'hooks', "pre-receive"),
 
            "post-receive": os.path.join(self.repo.path, '.git', 'hooks', "post-receive"),
 
        }
 

	
 
    def test_hooks_created_if_missing(self):
 
        """
 
        Tests if hooks are installed in repository if they are missing.
 
        """
 

	
 
        for hook, hook_path in self.kallithea_hooks.iteritems():
 
        for hook, hook_path in self.kallithea_hooks.items():
 
            if os.path.exists(hook_path):
 
                os.remove(hook_path)
 

	
 
        ScmModel().install_git_hooks(repo=self.repo)
 

	
 
        for hook, hook_path in self.kallithea_hooks.iteritems():
 
        for hook, hook_path in self.kallithea_hooks.items():
 
            assert os.path.exists(hook_path)
 

	
 
    def test_kallithea_hooks_updated(self):
 
        """
 
        Tests if hooks are updated if they are Kallithea hooks already.
 
        """
 

	
 
        for hook, hook_path in self.kallithea_hooks.iteritems():
 
        for hook, hook_path in self.kallithea_hooks.items():
 
            with open(hook_path, "w") as f:
 
                f.write("KALLITHEA_HOOK_VER=0.0.0\nJUST_BOGUS")
 

	
 
        ScmModel().install_git_hooks(repo=self.repo)
 

	
 
        for hook, hook_path in self.kallithea_hooks.iteritems():
 
        for hook, hook_path in self.kallithea_hooks.items():
 
            with open(hook_path) as f:
 
                assert "JUST_BOGUS" not in f.read()
 

	
 
    def test_custom_hooks_untouched(self):
 
        """
 
        Tests if hooks are left untouched if they are not Kallithea hooks.
 
        """
 

	
 
        for hook, hook_path in self.kallithea_hooks.iteritems():
 
        for hook, hook_path in self.kallithea_hooks.items():
 
            with open(hook_path, "w") as f:
 
                f.write("#!/bin/bash\n#CUSTOM_HOOK")
 

	
 
        ScmModel().install_git_hooks(repo=self.repo)
 

	
 
        for hook, hook_path in self.kallithea_hooks.iteritems():
 
        for hook, hook_path in self.kallithea_hooks.items():
 
            with open(hook_path) as f:
 
                assert "CUSTOM_HOOK" in f.read()
 

	
 
    def test_custom_hooks_forced_update(self):
 
        """
 
        Tests if hooks are forcefully updated even though they are custom hooks.
 
        """
 

	
 
        for hook, hook_path in self.kallithea_hooks.iteritems():
 
        for hook, hook_path in self.kallithea_hooks.items():
 
            with open(hook_path, "w") as f:
 
                f.write("#!/bin/bash\n#CUSTOM_HOOK")
 

	
 
        ScmModel().install_git_hooks(repo=self.repo, force_create=True)
 

	
 
        for hook, hook_path in self.kallithea_hooks.iteritems():
 
        for hook, hook_path in self.kallithea_hooks.items():
 
            with open(hook_path) as f:
 
                assert "KALLITHEA_HOOK_VER" in f.read()
0 comments (0 inline, 0 general)