Changeset - 45bfab30d433
[Not reviewed]
default
0 28 0
Mads Kiilerich - 6 years ago 2019-12-28 20:25:59
mads@kiilerich.com
Grafted from: afa8fc203bd0
py3: add b'' annotations in some places where they will be needed later

Mostly entirely trivial adding of b prefix that is a ignored for py2 ... and
also a bit of related trivial reformatting/refactorings.
20 files changed:
0 comments (0 inline, 0 general)
kallithea/config/app_cfg.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
Global configuration file for TurboGears2 specific settings in Kallithea.
 

	
 
This file complements the .ini file.
 
"""
 

	
 
import logging
 
import os
 
import platform
 
import sys
 

	
 
import alembic.config
 
import mercurial
 
import tg
 
from alembic.migration import MigrationContext
 
from alembic.script.base import ScriptDirectory
 
from sqlalchemy import create_engine
 
from tg import hooks
 
from tg.configuration import AppConfig
 
from tg.support.converters import asbool
 

	
 
import kallithea.lib.locale
 
import kallithea.model.base
 
import kallithea.model.meta
 
from kallithea.lib.middleware.https_fixup import HttpsFixup
 
from kallithea.lib.middleware.permanent_repo_url import PermanentRepoUrl
 
from kallithea.lib.middleware.simplegit import SimpleGit
 
from kallithea.lib.middleware.simplehg import SimpleHg
 
from kallithea.lib.middleware.wrapper import RequestWrapper
 
from kallithea.lib.utils import check_git_version, load_rcextensions, make_ui, set_app_settings, set_indexer_config, set_vcs_config
 
from kallithea.lib.utils2 import str2bool
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class KallitheaAppConfig(AppConfig):
 
    # Note: AppConfig has a misleading name, as it's not the application
 
    # configuration, but the application configurator. The AppConfig values are
 
    # used as a template to create the actual configuration, which might
 
    # overwrite or extend the one provided by the configurator template.
 

	
 
    # To make it clear, AppConfig creates the config and sets into it the same
 
    # values that AppConfig itself has. Then the values from the config file and
 
    # gearbox options are loaded and merged into the configuration. Then an
 
    # after_init_config(conf) method of AppConfig is called for any change that
 
    # might depend on options provided by configuration files.
 

	
 
    def __init__(self):
 
        super(KallitheaAppConfig, self).__init__()
 

	
 
        self['package'] = kallithea
 

	
 
        self['prefer_toscawidgets2'] = False
 
        self['use_toscawidgets'] = False
 

	
 
        self['renderers'] = []
 

	
 
        # Enable json in expose
 
        self['renderers'].append('json')
 

	
 
        # Configure template rendering
 
        self['renderers'].append('mako')
 
        self['default_renderer'] = 'mako'
 
        self['use_dotted_templatenames'] = False
 

	
 
        # Configure Sessions, store data as JSON to avoid pickle security issues
 
        self['session.enabled'] = True
 
        self['session.data_serializer'] = 'json'
 

	
 
        # Configure the base SQLALchemy Setup
 
        self['use_sqlalchemy'] = True
 
        self['model'] = kallithea.model.base
 
        self['DBSession'] = kallithea.model.meta.Session
 

	
 
        # Configure App without an authentication backend.
 
        self['auth_backend'] = None
 

	
 
        # Use custom error page for these errors. By default, Turbogears2 does not add
 
        # 400 in this list.
 
        # Explicitly listing all is considered more robust than appending to defaults,
 
        # in light of possible future framework changes.
 
        self['errorpage.status_codes'] = [400, 401, 403, 404]
 

	
 
        # Disable transaction manager -- currently Kallithea takes care of transactions itself
 
        self['tm.enabled'] = False
 

	
 

	
 
base_config = KallitheaAppConfig()
 

	
 
# TODO still needed as long as we use pylonslib
 
sys.modules['pylons'] = tg
 

	
 
# DebugBar, a debug toolbar for TurboGears2.
 
# (https://github.com/TurboGears/tgext.debugbar)
 
# To enable it, install 'tgext.debugbar' and 'kajiki', and run Kallithea with
 
# 'debug = true' (not in production!)
 
# See the Kallithea documentation for more information.
 
try:
 
    from tgext.debugbar import enable_debugbar
 
    import kajiki # only to check its existence
 
except ImportError:
 
    pass
 
else:
 
    base_config['renderers'].append('kajiki')
 
    enable_debugbar(base_config)
 

	
 

	
 
def setup_configuration(app):
 
    config = app.config
 

	
 
    if not kallithea.lib.locale.current_locale_is_valid():
 
        log.error("Terminating ...")
 
        sys.exit(1)
 

	
 
    # Mercurial sets encoding at module import time, so we have to monkey patch it
 
    hgencoding = config.get('hgencoding')
 
    if hgencoding:
 
        mercurial.encoding.encoding = hgencoding
 

	
 
    if config.get('ignore_alembic_revision', False):
 
        log.warn('database alembic revision checking is disabled')
 
    else:
 
        dbconf = config['sqlalchemy.url']
 
        alembic_cfg = alembic.config.Config()
 
        alembic_cfg.set_main_option('script_location', 'kallithea:alembic')
 
        alembic_cfg.set_main_option('sqlalchemy.url', dbconf)
 
        script_dir = ScriptDirectory.from_config(alembic_cfg)
 
        available_heads = sorted(script_dir.get_heads())
 

	
 
        engine = create_engine(dbconf)
 
        with engine.connect() as conn:
 
            context = MigrationContext.configure(conn)
 
            current_heads = sorted(str(s) for s in context.get_current_heads())
 
        if current_heads != available_heads:
 
            log.error('Failed to run Kallithea:\n\n'
 
                      'The database version does not match the Kallithea version.\n'
 
                      'Please read the documentation on how to upgrade or downgrade the database.\n'
 
                      'Current database version id(s): %s\n'
 
                      'Expected database version id(s): %s\n'
 
                      'If you are a developer and you know what you are doing, you can add `ignore_alembic_revision = True` '
 
                      'to your .ini file to skip the check.\n' % (' '.join(current_heads), ' '.join(available_heads)))
 
            sys.exit(1)
 

	
 
    # store some globals into kallithea
 
    kallithea.CELERY_ON = str2bool(config.get('use_celery'))
 
    kallithea.CELERY_EAGER = str2bool(config.get('celery.always.eager'))
 
    kallithea.CONFIG = config
 

	
 
    load_rcextensions(root_path=config['here'])
 

	
 
    repos_path = make_ui().configitems('paths')[0][1]
 
    repos_path = make_ui().configitems(b'paths')[0][1]
 
    config['base_path'] = repos_path
 
    set_app_settings(config)
 

	
 
    instance_id = kallithea.CONFIG.get('instance_id', '*')
 
    if instance_id == '*':
 
        instance_id = '%s-%s' % (platform.uname()[1], os.getpid())
 
        kallithea.CONFIG['instance_id'] = instance_id
 

	
 
    # update kallithea.CONFIG with the meanwhile changed 'config'
 
    kallithea.CONFIG.update(config)
 

	
 
    # configure vcs and indexer libraries (they are supposed to be independent
 
    # as much as possible and thus avoid importing tg.config or
 
    # kallithea.CONFIG).
 
    set_vcs_config(kallithea.CONFIG)
 
    set_indexer_config(kallithea.CONFIG)
 

	
 
    check_git_version()
 

	
 
    kallithea.model.meta.Session.remove()
 

	
 

	
 
hooks.register('configure_new_app', setup_configuration)
 

	
 

	
 
def setup_application(app):
 
    config = app.config
 

	
 
    # we want our low level middleware to get to the request ASAP. We don't
 
    # need any stack middleware in them - especially no StatusCodeRedirect buffering
 
    app = SimpleHg(app, config)
 
    app = SimpleGit(app, config)
 

	
 
    # Enable https redirects based on HTTP_X_URL_SCHEME set by proxy
 
    if any(asbool(config.get(x)) for x in ['https_fixup', 'force_https', 'use_htsts']):
 
        app = HttpsFixup(app, config)
 

	
 
    app = PermanentRepoUrl(app, config)
 

	
 
    # Optional and undocumented wrapper - gives more verbose request/response logging, but has a slight overhead
 
    if str2bool(config.get('use_wsgi_wrapper')):
 
        app = RequestWrapper(app, config)
 

	
 
    return app
 

	
 

	
 
hooks.register('before_config', setup_application)
kallithea/controllers/compare.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.compare
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
compare controller showing differences between two
 
repos, branches, bookmarks or tips
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: May 6, 2012
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import logging
 
import re
 

	
 
from tg import request
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPBadRequest, HTTPFound, HTTPNotFound
 

	
 
from kallithea.config.routing import url
 
from kallithea.controllers.changeset import _context_url, _ignorews_url
 
from kallithea.lib import diffs
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import HasRepoPermissionLevelDecorator, LoginRequired
 
from kallithea.lib.base import BaseRepoController, render
 
from kallithea.lib.graphmod import graph_data
 
from kallithea.lib.utils2 import safe_int, safe_str
 
from kallithea.lib.vcs.utils.hgcompat import unionrepo
 
from kallithea.model.db import Repository
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class CompareController(BaseRepoController):
 

	
 
    def _before(self, *args, **kwargs):
 
        super(CompareController, self)._before(*args, **kwargs)
 

	
 
        # The base repository has already been retrieved.
 
        c.a_repo = c.db_repo
 

	
 
        # Retrieve the "changeset" repository (default: same as base).
 
        other_repo = request.GET.get('other_repo', None)
 
        if other_repo is None:
 
            c.cs_repo = c.a_repo
 
        else:
 
            c.cs_repo = Repository.get_by_repo_name(other_repo)
 
            if c.cs_repo is None:
 
                msg = _('Could not find other repository %s') % other_repo
 
                h.flash(msg, category='error')
 
                raise HTTPFound(location=url('compare_home', repo_name=c.a_repo.repo_name))
 

	
 
        # Verify that it's even possible to compare these two repositories.
 
        if c.a_repo.scm_instance.alias != c.cs_repo.scm_instance.alias:
 
            msg = _('Cannot compare repositories of different types')
 
            h.flash(msg, category='error')
 
            raise HTTPFound(location=url('compare_home', repo_name=c.a_repo.repo_name))
 

	
 
    @staticmethod
 
    def _get_changesets(alias, org_repo, org_rev, other_repo, other_rev):
 
        """
 
        Returns lists of changesets that can be merged from org_repo@org_rev
 
        to other_repo@other_rev
 
        ... and the other way
 
        ... and the ancestors that would be used for merge
 

	
 
        :param org_repo: repo object, that is most likely the original repo we forked from
 
        :param org_rev: the revision we want our compare to be made
 
        :param other_repo: repo object, most likely the fork of org_repo. It has
 
            all changesets that we need to obtain
 
        :param other_rev: revision we want out compare to be made on other_repo
 
        """
 
        ancestors = None
 
        if org_rev == other_rev:
 
            org_changesets = []
 
            other_changesets = []
 

	
 
        elif alias == 'hg':
 
            # case two independent repos
 
            if org_repo != other_repo:
 
                hgrepo = unionrepo.makeunionrepository(other_repo.baseui,
 
                                                       other_repo.path,
 
                                                       org_repo.path)
 
                # all ancestors of other_rev will be in other_repo and
 
                # rev numbers from hgrepo can be used in other_repo - org_rev ancestors cannot
 

	
 
            # no remote compare do it on the same repository
 
            else:
 
                hgrepo = other_repo._repo
 

	
 
            ancestors = [hgrepo[ancestor].hex() for ancestor in
 
                         hgrepo.revs("id(%s) & ::id(%s)", other_rev, org_rev)]
 
                         hgrepo.revs(b"id(%s) & ::id(%s)", other_rev, org_rev)]
 
            if ancestors:
 
                log.debug("shortcut found: %s is already an ancestor of %s", other_rev, org_rev)
 
            else:
 
                log.debug("no shortcut found: %s is not an ancestor of %s", other_rev, org_rev)
 
                ancestors = [hgrepo[ancestor].hex() for ancestor in
 
                             hgrepo.revs("heads(::id(%s) & ::id(%s))", org_rev, other_rev)] # FIXME: expensive!
 
                             hgrepo.revs(b"heads(::id(%s) & ::id(%s))", org_rev, other_rev)] # FIXME: expensive!
 

	
 
            other_revs = hgrepo.revs("ancestors(id(%s)) and not ancestors(id(%s)) and not id(%s)",
 
                                     other_rev, org_rev, org_rev)
 
            other_changesets = [other_repo.get_changeset(rev) for rev in other_revs]
 
            org_revs = hgrepo.revs("ancestors(id(%s)) and not ancestors(id(%s)) and not id(%s)",
 
                                   org_rev, other_rev, other_rev)
 
            org_changesets = [org_repo.get_changeset(hgrepo[rev].hex()) for rev in org_revs]
 
            other_changesets = [
 
                other_repo.get_changeset(rev)
 
                for rev in hgrepo.revs(
 
                    b"ancestors(id(%s)) and not ancestors(id(%s)) and not id(%s)",
 
                     other_rev, org_rev, org_rev)
 
            ]
 
            org_changesets = [
 
                org_repo.get_changeset(hgrepo[rev].hex())
 
                for rev in hgrepo.revs(
 
                    b"ancestors(id(%s)) and not ancestors(id(%s)) and not id(%s)",
 
                    org_rev, other_rev, other_rev)
 
            ]
 

	
 
        elif alias == 'git':
 
            if org_repo != other_repo:
 
                from dulwich.repo import Repo
 
                from dulwich.client import SubprocessGitClient
 

	
 
                gitrepo = Repo(org_repo.path)
 
                SubprocessGitClient(thin_packs=False).fetch(safe_str(other_repo.path), gitrepo)
 

	
 
                gitrepo_remote = Repo(other_repo.path)
 
                SubprocessGitClient(thin_packs=False).fetch(safe_str(org_repo.path), gitrepo_remote)
 

	
 
                revs = [
 
                    x.commit.id
 
                    for x in gitrepo_remote.get_walker(include=[other_rev],
 
                                                       exclude=[org_rev])
 
                ]
 
                other_changesets = [other_repo.get_changeset(rev) for rev in reversed(revs)]
 
                if other_changesets:
 
                    ancestors = [other_changesets[0].parents[0].raw_id]
 
                else:
 
                    # no changesets from other repo, ancestor is the other_rev
 
                    ancestors = [other_rev]
 

	
 
                gitrepo.close()
 
                gitrepo_remote.close()
 

	
 
            else:
 
                so = org_repo.run_git_command(
 
                    ['log', '--reverse', '--pretty=format:%H',
 
                     '-s', '%s..%s' % (org_rev, other_rev)]
 
                )
 
                other_changesets = [org_repo.get_changeset(cs)
 
                              for cs in re.findall(r'[0-9a-fA-F]{40}', so)]
 
                so = org_repo.run_git_command(
 
                    ['merge-base', org_rev, other_rev]
 
                )
 
                ancestors = [re.findall(r'[0-9a-fA-F]{40}', so)[0]]
 
            org_changesets = []
 

	
 
        else:
 
            raise Exception('Bad alias only git and hg is allowed')
 

	
 
        return other_changesets, org_changesets, ancestors
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def index(self, repo_name):
 
        c.compare_home = True
 
        c.a_ref_name = c.cs_ref_name = None
 
        return render('compare/compare_diff.html')
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def compare(self, repo_name, org_ref_type, org_ref_name, other_ref_type, other_ref_name):
 
        org_ref_name = org_ref_name.strip()
 
        other_ref_name = other_ref_name.strip()
 

	
 
        # If merge is True:
 
        #   Show what org would get if merged with other:
 
        #   List changesets that are ancestors of other but not of org.
 
        #   New changesets in org is thus ignored.
 
        #   Diff will be from common ancestor, and merges of org to other will thus be ignored.
 
        # If merge is False:
 
        #   Make a raw diff from org to other, no matter if related or not.
 
        #   Changesets in one and not in the other will be ignored
 
        merge = bool(request.GET.get('merge'))
 
        # fulldiff disables cut_off_limit
 
        fulldiff = request.GET.get('fulldiff')
 
        # partial uses compare_cs.html template directly
 
        partial = request.environ.get('HTTP_X_PARTIAL_XHR')
 
        # is_ajax_preview puts hidden input field with changeset revisions
 
        c.is_ajax_preview = partial and request.GET.get('is_ajax_preview')
 
        # swap url for compare_diff page - never partial and never is_ajax_preview
 
        c.swap_url = h.url('compare_url',
 
            repo_name=c.cs_repo.repo_name,
 
            org_ref_type=other_ref_type, org_ref_name=other_ref_name,
 
            other_repo=c.a_repo.repo_name,
 
            other_ref_type=org_ref_type, other_ref_name=org_ref_name,
 
            merge=merge or '')
 

	
 
        # set callbacks for generating markup for icons
 
        c.ignorews_url = _ignorews_url
 
        c.context_url = _context_url
 
        ignore_whitespace = request.GET.get('ignorews') == '1'
 
        line_context = safe_int(request.GET.get('context'), 3)
 

	
 
        c.a_rev = self._get_ref_rev(c.a_repo, org_ref_type, org_ref_name,
 
            returnempty=True)
 
        c.cs_rev = self._get_ref_rev(c.cs_repo, other_ref_type, other_ref_name)
 

	
 
        c.compare_home = False
 
        c.a_ref_name = org_ref_name
 
        c.a_ref_type = org_ref_type
 
        c.cs_ref_name = other_ref_name
 
        c.cs_ref_type = other_ref_type
 

	
 
        c.cs_ranges, c.cs_ranges_org, c.ancestors = self._get_changesets(
 
            c.a_repo.scm_instance.alias, c.a_repo.scm_instance, c.a_rev,
 
            c.cs_repo.scm_instance, c.cs_rev)
 
        raw_ids = [x.raw_id for x in c.cs_ranges]
 
        c.cs_comments = c.cs_repo.get_comments(raw_ids)
 
        c.cs_statuses = c.cs_repo.statuses(raw_ids)
 

	
 
        revs = [ctx.revision for ctx in reversed(c.cs_ranges)]
 
        c.jsdata = graph_data(c.cs_repo.scm_instance, revs)
 

	
 
        if partial:
 
            return render('compare/compare_cs.html')
 

	
 
        org_repo = c.a_repo
 
        other_repo = c.cs_repo
 

	
 
        if merge:
 
            rev1 = msg = None
 
            if not c.cs_ranges:
 
                msg = _('Cannot show empty diff')
 
            elif not c.ancestors:
 
                msg = _('No ancestor found for merge diff')
 
            elif len(c.ancestors) == 1:
 
                rev1 = c.ancestors[0]
 
            else:
 
                msg = _('Multiple merge ancestors found for merge compare')
 
            if rev1 is None:
 
                h.flash(msg, category='error')
 
                log.error(msg)
 
                raise HTTPNotFound
 

	
 
            # case we want a simple diff without incoming changesets,
 
            # previewing what will be merged.
 
            # Make the diff on the other repo (which is known to have other_rev)
 
            log.debug('Using ancestor %s as rev1 instead of %s',
 
                      rev1, c.a_rev)
 
            org_repo = other_repo
 
        else: # comparing tips, not necessarily linearly related
 
            if org_repo != other_repo:
 
                # TODO: we could do this by using hg unionrepo
 
                log.error('cannot compare across repos %s and %s', org_repo, other_repo)
 
                h.flash(_('Cannot compare repositories without using common ancestor'), category='error')
 
                raise HTTPBadRequest
 
            rev1 = c.a_rev
 

	
 
        diff_limit = None if fulldiff else self.cut_off_limit
 

	
 
        log.debug('running diff between %s and %s in %s',
 
                  rev1, c.cs_rev, org_repo.scm_instance.path)
 
        raw_diff = diffs.get_diff(org_repo.scm_instance, rev1=rev1, rev2=c.cs_rev,
 
                                      ignore_whitespace=ignore_whitespace,
 
                                      context=line_context)
 

	
 
        diff_processor = diffs.DiffProcessor(raw_diff, diff_limit=diff_limit)
 
        c.limited_diff = diff_processor.limited_diff
 
        c.file_diff_data = []
 
        c.lines_added = 0
 
        c.lines_deleted = 0
 
        for f in diff_processor.parsed:
 
            st = f['stats']
 
            c.lines_added += st['added']
 
            c.lines_deleted += st['deleted']
 
            filename = f['filename']
 
            fid = h.FID('', filename)
 
            html_diff = diffs.as_html(enable_comments=False, parsed_lines=[f])
 
            c.file_diff_data.append((fid, None, f['operation'], f['old_filename'], filename, html_diff, st))
 

	
 
        return render('compare/compare_diff.html')
kallithea/controllers/pullrequests.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.pullrequests
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
pull requests controller for Kallithea for initializing pull requests
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: May 7, 2012
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import traceback
 

	
 
import formencode
 
from tg import request
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPBadRequest, HTTPForbidden, HTTPFound, HTTPNotFound
 

	
 
from kallithea.config.routing import url
 
from kallithea.controllers.changeset import _context_url, _ignorews_url, create_cs_pr_comment, delete_cs_pr_comment
 
from kallithea.lib import diffs
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import HasRepoPermissionLevelDecorator, LoginRequired
 
from kallithea.lib.base import BaseRepoController, jsonify, render
 
from kallithea.lib.graphmod import graph_data
 
from kallithea.lib.page import Page
 
from kallithea.lib.utils2 import safe_int
 
from kallithea.lib.vcs.exceptions import ChangesetDoesNotExistError, EmptyRepositoryError
 
from kallithea.lib.vcs.utils import safe_str
 
from kallithea.lib.vcs.utils.hgcompat import unionrepo
 
from kallithea.model.changeset_status import ChangesetStatusModel
 
from kallithea.model.comment import ChangesetCommentsModel
 
from kallithea.model.db import ChangesetStatus, PullRequest, PullRequestReviewer, Repository, User
 
from kallithea.model.forms import PullRequestForm, PullRequestPostForm
 
from kallithea.model.meta import Session
 
from kallithea.model.pull_request import CreatePullRequestAction, CreatePullRequestIterationAction, PullRequestModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def _get_reviewer(user_id):
 
    """Look up user by ID and validate it as a potential reviewer."""
 
    try:
 
        user = User.get(int(user_id))
 
    except ValueError:
 
        user = None
 

	
 
    if user is None or user.is_default_user:
 
        h.flash(_('Invalid reviewer "%s" specified') % user_id, category='error')
 
        raise HTTPBadRequest()
 

	
 
    return user
 

	
 

	
 
class PullrequestsController(BaseRepoController):
 

	
 
    def _get_repo_refs(self, repo, rev=None, branch=None, branch_rev=None):
 
        """return a structure with repo's interesting changesets, suitable for
 
        the selectors in pullrequest.html
 

	
 
        rev: a revision that must be in the list somehow and selected by default
 
        branch: a branch that must be in the list and selected by default - even if closed
 
        branch_rev: a revision of which peers should be preferred and available."""
 
        # list named branches that has been merged to this named branch - it should probably merge back
 
        peers = []
 

	
 
        if rev:
 
            rev = safe_str(rev)
 

	
 
        if branch:
 
            branch = safe_str(branch)
 

	
 
        if branch_rev:
 
            branch_rev = safe_str(branch_rev)
 
            # a revset not restricting to merge() would be better
 
            # (especially because it would get the branch point)
 
            # ... but is currently too expensive
 
            # including branches of children could be nice too
 
            peerbranches = set()
 
            for i in repo._repo.revs(
 
                "sort(parents(branch(id(%s)) and merge()) - branch(id(%s)), -rev)",
 
                b"sort(parents(branch(id(%s)) and merge()) - branch(id(%s)), -rev)",
 
                branch_rev, branch_rev
 
            ):
 
                for abranch in repo.get_changeset(i).branches:
 
                    if abranch not in peerbranches:
 
                        n = 'branch:%s:%s' % (abranch, repo.get_changeset(abranch).raw_id)
 
                        peers.append((n, abranch))
 
                        peerbranches.add(abranch)
 

	
 
        selected = None
 
        tiprev = repo.tags.get('tip')
 
        tipbranch = None
 

	
 
        branches = []
 
        for abranch, branchrev in repo.branches.iteritems():
 
            n = 'branch:%s:%s' % (abranch, branchrev)
 
            desc = abranch
 
            if branchrev == tiprev:
 
                tipbranch = abranch
 
                desc = '%s (current tip)' % desc
 
            branches.append((n, desc))
 
            if rev == branchrev:
 
                selected = n
 
            if branch == abranch:
 
                if not rev:
 
                    selected = n
 
                branch = None
 
        if branch:  # branch not in list - it is probably closed
 
            branchrev = repo.closed_branches.get(branch)
 
            if branchrev:
 
                n = 'branch:%s:%s' % (branch, branchrev)
 
                branches.append((n, _('%s (closed)') % branch))
 
                selected = n
 
                branch = None
 
            if branch:
 
                log.debug('branch %r not found in %s', branch, repo)
 

	
 
        bookmarks = []
 
        for bookmark, bookmarkrev in repo.bookmarks.iteritems():
 
            n = 'book:%s:%s' % (bookmark, bookmarkrev)
 
            bookmarks.append((n, bookmark))
 
            if rev == bookmarkrev:
 
                selected = n
 

	
 
        tags = []
 
        for tag, tagrev in repo.tags.iteritems():
 
            if tag == 'tip':
 
                continue
 
            n = 'tag:%s:%s' % (tag, tagrev)
 
            tags.append((n, tag))
 
            # note: even if rev == tagrev, don't select the static tag - it must be chosen explicitly
 

	
 
        # prio 1: rev was selected as existing entry above
 

	
 
        # prio 2: create special entry for rev; rev _must_ be used
 
        specials = []
 
        if rev and selected is None:
 
            selected = 'rev:%s:%s' % (rev, rev)
 
            specials = [(selected, '%s: %s' % (_("Changeset"), rev[:12]))]
 

	
 
        # prio 3: most recent peer branch
 
        if peers and not selected:
 
            selected = peers[0][0]
 

	
 
        # prio 4: tip revision
 
        if not selected:
 
            if h.is_hg(repo):
 
                if tipbranch:
 
                    selected = 'branch:%s:%s' % (tipbranch, tiprev)
 
                else:
 
                    selected = 'tag:null:' + repo.EMPTY_CHANGESET
 
                    tags.append((selected, 'null'))
 
            else:
 
                if 'master' in repo.branches:
 
                    selected = 'branch:master:%s' % repo.branches['master']
 
                else:
 
                    k, v = list(repo.branches.items())[0]
 
                    selected = 'branch:%s:%s' % (k, v)
 

	
 
        groups = [(specials, _("Special")),
 
                  (peers, _("Peer branches")),
 
                  (bookmarks, _("Bookmarks")),
 
                  (branches, _("Branches")),
 
                  (tags, _("Tags")),
 
                  ]
 
        return [g for g in groups if g[0]], selected
 

	
 
    def _is_allowed_to_change_status(self, pull_request):
 
        if pull_request.is_closed():
 
            return False
 

	
 
        owner = request.authuser.user_id == pull_request.owner_id
 
        reviewer = PullRequestReviewer.query() \
 
            .filter(PullRequestReviewer.pull_request == pull_request) \
 
            .filter(PullRequestReviewer.user_id == request.authuser.user_id) \
 
            .count() != 0
 

	
 
        return request.authuser.admin or owner or reviewer
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def show_all(self, repo_name):
 
        c.from_ = request.GET.get('from_') or ''
 
        c.closed = request.GET.get('closed') or ''
 
        url_params = {}
 
        if c.from_:
 
            url_params['from_'] = 1
 
        if c.closed:
 
            url_params['closed'] = 1
 
        p = safe_int(request.GET.get('page'), 1)
 

	
 
        q = PullRequest.query(include_closed=c.closed, sorted=True)
 
        if c.from_:
 
            q = q.filter_by(org_repo=c.db_repo)
 
        else:
 
            q = q.filter_by(other_repo=c.db_repo)
 
        c.pull_requests = q.all()
 

	
 
        c.pullrequests_pager = Page(c.pull_requests, page=p, items_per_page=100, **url_params)
 

	
 
        return render('/pullrequests/pullrequest_show_all.html')
 

	
 
    @LoginRequired()
 
    def show_my(self):
 
        c.closed = request.GET.get('closed') or ''
 

	
 
        c.my_pull_requests = PullRequest.query(
 
            include_closed=c.closed,
 
            sorted=True,
 
        ).filter_by(owner_id=request.authuser.user_id).all()
 

	
 
        c.participate_in_pull_requests = []
 
        c.participate_in_pull_requests_todo = []
 
        done_status = set([ChangesetStatus.STATUS_APPROVED, ChangesetStatus.STATUS_REJECTED])
 
        for pr in PullRequest.query(
 
            include_closed=c.closed,
 
            reviewer_id=request.authuser.user_id,
 
            sorted=True,
 
        ):
 
            status = pr.user_review_status(request.authuser.user_id) # very inefficient!!!
 
            if status in done_status:
 
                c.participate_in_pull_requests.append(pr)
 
            else:
 
                c.participate_in_pull_requests_todo.append(pr)
 

	
 
        return render('/pullrequests/pullrequest_show_my.html')
 

	
 
    @LoginRequired()
 
    @HasRepoPermissionLevelDecorator('read')
 
    def index(self):
 
        org_repo = c.db_repo
 
        org_scm_instance = org_repo.scm_instance
 
        try:
 
            org_scm_instance.get_changeset()
 
        except EmptyRepositoryError as e:
 
            h.flash(_('There are no changesets yet'),
 
                    category='warning')
 
            raise HTTPFound(location=url('summary_home', repo_name=org_repo.repo_name))
 

	
 
        org_rev = request.GET.get('rev_end')
 
        # rev_start is not directly useful - its parent could however be used
 
        # as default for other and thus give a simple compare view
 
        rev_start = request.GET.get('rev_start')
 
        other_rev = None
 
        if rev_start:
 
            starters = org_repo.get_changeset(rev_start).parents
 
            if starters:
 
                other_rev = starters[0].raw_id
 
            else:
 
                other_rev = org_repo.scm_instance.EMPTY_CHANGESET
 
        branch = request.GET.get('branch')
 

	
 
        c.cs_repos = [(org_repo.repo_name, org_repo.repo_name)]
 
        c.default_cs_repo = org_repo.repo_name
 
        c.cs_refs, c.default_cs_ref = self._get_repo_refs(org_scm_instance, rev=org_rev, branch=branch)
 

	
 
        default_cs_ref_type, default_cs_branch, default_cs_rev = c.default_cs_ref.split(':')
 
        if default_cs_ref_type != 'branch':
 
            default_cs_branch = org_repo.get_changeset(default_cs_rev).branch
 

	
 
        # add org repo to other so we can open pull request against peer branches on itself
 
        c.a_repos = [(org_repo.repo_name, '%s (self)' % org_repo.repo_name)]
 

	
 
        if org_repo.parent:
 
            # add parent of this fork also and select it.
 
            # use the same branch on destination as on source, if available.
 
            c.a_repos.append((org_repo.parent.repo_name, '%s (parent)' % org_repo.parent.repo_name))
 
            c.a_repo = org_repo.parent
 
            c.a_refs, c.default_a_ref = self._get_repo_refs(
 
                    org_repo.parent.scm_instance, branch=default_cs_branch, rev=other_rev)
 

	
 
        else:
 
            c.a_repo = org_repo
kallithea/lib/diffs.py
Show inline comments
 
@@ -47,633 +47,633 @@ def _safe_id(idstring):
 
    The HTML spec says that id attributes 'must begin with
 
    a letter ([A-Za-z]) and may be followed by any number
 
    of letters, digits ([0-9]), hyphens ("-"), underscores
 
    ("_"), colons (":"), and periods (".")'. These regexps
 
    are slightly over-zealous, in that they remove colons
 
    and periods unnecessarily.
 

	
 
    Whitespace is transformed into underscores, and then
 
    anything which is not a hyphen or a character that
 
    matches \w (alphanumerics and underscore) is removed.
 

	
 
    """
 
    # Transform all whitespace to underscore
 
    idstring = re.sub(r'\s', "_", idstring)
 
    # Remove everything that is not a hyphen or a member of \w
 
    idstring = re.sub(r'(?!-)\W', "", idstring).lower()
 
    return idstring
 

	
 

	
 
def as_html(table_class='code-difftable', line_class='line',
 
            old_lineno_class='lineno old', new_lineno_class='lineno new',
 
            no_lineno_class='lineno',
 
            code_class='code', enable_comments=False, parsed_lines=None):
 
    """
 
    Return given diff as html table with customized css classes
 
    """
 
    def _link_to_if(condition, label, url):
 
        """
 
        Generates a link if condition is meet or just the label if not.
 
        """
 

	
 
        if condition:
 
            return '''<a href="%(url)s" data-pseudo-content="%(label)s"></a>''' % {
 
                'url': url,
 
                'label': label
 
            }
 
        else:
 
            return label
 

	
 
    _html_empty = True
 
    _html = []
 
    _html.append('''<table class="%(table_class)s">\n''' % {
 
        'table_class': table_class
 
    })
 

	
 
    for diff in parsed_lines:
 
        for line in diff['chunks']:
 
            _html_empty = False
 
            for change in line:
 
                _html.append('''<tr class="%(lc)s %(action)s">\n''' % {
 
                    'lc': line_class,
 
                    'action': change['action']
 
                })
 
                anchor_old_id = ''
 
                anchor_new_id = ''
 
                anchor_old = "%(filename)s_o%(oldline_no)s" % {
 
                    'filename': _safe_id(diff['filename']),
 
                    'oldline_no': change['old_lineno']
 
                }
 
                anchor_new = "%(filename)s_n%(oldline_no)s" % {
 
                    'filename': _safe_id(diff['filename']),
 
                    'oldline_no': change['new_lineno']
 
                }
 
                cond_old = (change['old_lineno'] != '...' and
 
                            change['old_lineno'])
 
                cond_new = (change['new_lineno'] != '...' and
 
                            change['new_lineno'])
 
                no_lineno = (change['old_lineno'] == '...' and
 
                             change['new_lineno'] == '...')
 
                if cond_old:
 
                    anchor_old_id = 'id="%s"' % anchor_old
 
                if cond_new:
 
                    anchor_new_id = 'id="%s"' % anchor_new
 
                ###########################################################
 
                # OLD LINE NUMBER
 
                ###########################################################
 
                _html.append('''\t<td %(a_id)s class="%(olc)s" %(colspan)s>''' % {
 
                    'a_id': anchor_old_id,
 
                    'olc': no_lineno_class if no_lineno else old_lineno_class,
 
                    'colspan': 'colspan="2"' if no_lineno else ''
 
                })
 

	
 
                _html.append('''%(link)s''' % {
 
                    'link': _link_to_if(not no_lineno, change['old_lineno'],
 
                                        '#%s' % anchor_old)
 
                })
 
                _html.append('''</td>\n''')
 
                ###########################################################
 
                # NEW LINE NUMBER
 
                ###########################################################
 

	
 
                if not no_lineno:
 
                    _html.append('''\t<td %(a_id)s class="%(nlc)s">''' % {
 
                        'a_id': anchor_new_id,
 
                        'nlc': new_lineno_class
 
                    })
 

	
 
                    _html.append('''%(link)s''' % {
 
                        'link': _link_to_if(True, change['new_lineno'],
 
                                            '#%s' % anchor_new)
 
                    })
 
                    _html.append('''</td>\n''')
 
                ###########################################################
 
                # CODE
 
                ###########################################################
 
                comments = '' if enable_comments else 'no-comment'
 
                _html.append('''\t<td class="%(cc)s %(inc)s">''' % {
 
                    'cc': code_class,
 
                    'inc': comments
 
                })
 
                _html.append('''\n\t\t<div class="add-bubble"><div>&nbsp;</div></div><pre>%(code)s</pre>\n''' % {
 
                    'code': change['line']
 
                })
 

	
 
                _html.append('''\t</td>''')
 
                _html.append('''\n</tr>\n''')
 
    _html.append('''</table>''')
 
    if _html_empty:
 
        return None
 
    return ''.join(_html)
 

	
 

	
 
def wrap_to_table(html):
 
    """Given a string with html, return it wrapped in a table, similar to what
 
    DiffProcessor returns."""
 
    return '''\
 
              <table class="code-difftable">
 
                <tr class="line no-comment">
 
                <td class="lineno new"></td>
 
                <td class="code no-comment"><pre>%s</pre></td>
 
                </tr>
 
              </table>''' % html
 

	
 

	
 
def wrapped_diff(filenode_old, filenode_new, diff_limit=None,
 
                ignore_whitespace=True, line_context=3,
 
                enable_comments=False):
 
    """
 
    Returns a file diff wrapped into a table.
 
    Checks for diff_limit and presents a message if the diff is too big.
 
    """
 
    if filenode_old is None:
 
        filenode_old = FileNode(filenode_new.path, '', EmptyChangeset())
 

	
 
    op = None
 
    a_path = filenode_old.path # default, might be overriden by actual rename in diff
 
    if filenode_old.is_binary or filenode_new.is_binary:
 
        html_diff = wrap_to_table(_('Binary file'))
 
        stats = (0, 0)
 

	
 
    elif diff_limit != -1 and (
 
            diff_limit is None or
 
            (filenode_old.size < diff_limit and filenode_new.size < diff_limit)):
 

	
 
        raw_diff = get_gitdiff(filenode_old, filenode_new,
 
                                ignore_whitespace=ignore_whitespace,
 
                                context=line_context)
 
        diff_processor = DiffProcessor(raw_diff)
 
        if diff_processor.parsed: # there should be exactly one element, for the specified file
 
            f = diff_processor.parsed[0]
 
            op = f['operation']
 
            a_path = f['old_filename']
 

	
 
        html_diff = as_html(parsed_lines=diff_processor.parsed, enable_comments=enable_comments)
 
        stats = diff_processor.stat()
 

	
 
    else:
 
        html_diff = wrap_to_table(_('Changeset was too big and was cut off, use '
 
                               'diff menu to display this diff'))
 
        stats = (0, 0)
 

	
 
    if not html_diff:
 
        submodules = [o for o in [filenode_new, filenode_old] if isinstance(o, SubModuleNode)]
 
        if submodules:
 
            html_diff = wrap_to_table(h.escape('Submodule %r' % submodules[0]))
 
        else:
 
            html_diff = wrap_to_table(_('No changes detected'))
 

	
 
    cs1 = filenode_old.changeset.raw_id
 
    cs2 = filenode_new.changeset.raw_id
 

	
 
    return cs1, cs2, a_path, html_diff, stats, op
 

	
 

	
 
def get_gitdiff(filenode_old, filenode_new, ignore_whitespace=True, context=3):
 
    """
 
    Returns git style diff between given ``filenode_old`` and ``filenode_new``.
 
    """
 
    # make sure we pass in default context
 
    context = context or 3
 
    submodules = [o for o in [filenode_new, filenode_old] if isinstance(o, SubModuleNode)]
 
    if submodules:
 
        return ''
 
        return b''
 

	
 
    for filenode in (filenode_old, filenode_new):
 
        if not isinstance(filenode, FileNode):
 
            raise VCSError("Given object should be FileNode object, not %s"
 
                % filenode.__class__)
 

	
 
    repo = filenode_new.changeset.repository
 
    old_raw_id = getattr(filenode_old.changeset, 'raw_id', repo.EMPTY_CHANGESET)
 
    new_raw_id = getattr(filenode_new.changeset, 'raw_id', repo.EMPTY_CHANGESET)
 

	
 
    vcs_gitdiff = get_diff(repo, old_raw_id, new_raw_id, filenode_new.path,
 
                           ignore_whitespace, context)
 
    return vcs_gitdiff
 

	
 

	
 
def get_diff(scm_instance, rev1, rev2, path=None, ignore_whitespace=False, context=3):
 
    """
 
    A thin wrapper around vcs lib get_diff.
 
    """
 
    try:
 
        return scm_instance.get_diff(rev1, rev2, path=path,
 
                                     ignore_whitespace=ignore_whitespace, context=context)
 
    except MemoryError:
 
        h.flash('MemoryError: Diff is too big', category='error')
 
        return ''
 
        return b''
 

	
 

	
 
NEW_FILENODE = 1
 
DEL_FILENODE = 2
 
MOD_FILENODE = 3
 
RENAMED_FILENODE = 4
 
COPIED_FILENODE = 5
 
CHMOD_FILENODE = 6
 
BIN_FILENODE = 7
 

	
 

	
 
class DiffProcessor(object):
 
    """
 
    Give it a unified or git diff and it returns a list of the files that were
 
    mentioned in the diff together with a dict of meta information that
 
    can be used to render it in a HTML template.
 
    """
 
    _diff_git_re = re.compile('^diff --git', re.MULTILINE)
 
    _diff_git_re = re.compile(b'^diff --git', re.MULTILINE)
 

	
 
    def __init__(self, diff, vcs='hg', diff_limit=None, inline_diff=True):
 
        """
 
        :param diff:   a text in diff format
 
        :param vcs: type of version control hg or git
 
        :param diff_limit: define the size of diff that is considered "big"
 
            based on that parameter cut off will be triggered, set to None
 
            to show full diff
 
        """
 
        if not isinstance(diff, bytes):
 
            raise Exception('Diff must be bytes - got %s' % type(diff))
 

	
 
        self._diff = diff
 
        self.adds = 0
 
        self.removes = 0
 
        self.diff_limit = diff_limit
 
        self.limited_diff = False
 
        self.vcs = vcs
 
        self.parsed = self._parse_gitdiff(inline_diff=inline_diff)
 

	
 
    def _parse_gitdiff(self, inline_diff):
 
        """Parse self._diff and return a list of dicts with meta info and chunks for each file.
 
        Might set limited_diff.
 
        Optionally, do an extra pass and to extra markup of one-liner changes.
 
        """
 
        _files = [] # list of dicts with meta info and chunks
 

	
 
        starts = [m.start() for m in self._diff_git_re.finditer(self._diff)]
 
        starts.append(len(self._diff))
 

	
 
        for start, end in zip(starts, starts[1:]):
 
            if self.diff_limit and end > self.diff_limit:
 
                self.limited_diff = True
 
                continue
 

	
 
            head, diff_lines = _get_header(self.vcs, buffer(self._diff, start, end - start))
 

	
 
            op = None
 
            stats = {
 
                'added': 0,
 
                'deleted': 0,
 
                'binary': False,
 
                'ops': {},
 
            }
 

	
 
            if head['deleted_file_mode']:
 
                op = 'removed'
 
                stats['binary'] = True
 
                stats['ops'][DEL_FILENODE] = 'deleted file'
 

	
 
            elif head['new_file_mode']:
 
                op = 'added'
 
                stats['binary'] = True
 
                stats['ops'][NEW_FILENODE] = 'new file %s' % head['new_file_mode']
 
            else:  # modify operation, can be cp, rename, chmod
 
                # CHMOD
 
                if head['new_mode'] and head['old_mode']:
 
                    op = 'modified'
 
                    stats['binary'] = True
 
                    stats['ops'][CHMOD_FILENODE] = ('modified file chmod %s => %s'
 
                                        % (head['old_mode'], head['new_mode']))
 
                # RENAME
 
                if (head['rename_from'] and head['rename_to']
 
                      and head['rename_from'] != head['rename_to']):
 
                    op = 'renamed'
 
                    stats['binary'] = True
 
                    stats['ops'][RENAMED_FILENODE] = ('file renamed from %s to %s'
 
                                    % (head['rename_from'], head['rename_to']))
 
                # COPY
 
                if head.get('copy_from') and head.get('copy_to'):
 
                    op = 'modified'
 
                    stats['binary'] = True
 
                    stats['ops'][COPIED_FILENODE] = ('file copied from %s to %s'
 
                                        % (head['copy_from'], head['copy_to']))
 
                # FALL BACK: detect missed old style add or remove
 
                if op is None:
 
                    if not head['a_file'] and head['b_file']:
 
                        op = 'added'
 
                        stats['binary'] = True
 
                        stats['ops'][NEW_FILENODE] = 'new file'
 

	
 
                    elif head['a_file'] and not head['b_file']:
 
                        op = 'removed'
 
                        stats['binary'] = True
 
                        stats['ops'][DEL_FILENODE] = 'deleted file'
 

	
 
                # it's not ADD not DELETE
 
                if op is None:
 
                    op = 'modified'
 
                    stats['binary'] = True
 
                    stats['ops'][MOD_FILENODE] = 'modified file'
 

	
 
            # a real non-binary diff
 
            if head['a_file'] or head['b_file']:
 
                chunks, added, deleted = _parse_lines(diff_lines)
 
                stats['binary'] = False
 
                stats['added'] = added
 
                stats['deleted'] = deleted
 
                # explicit mark that it's a modified file
 
                if op == 'modified':
 
                    stats['ops'][MOD_FILENODE] = 'modified file'
 
            else:  # Git binary patch (or empty diff)
 
                # Git binary patch
 
                if head['bin_patch']:
 
                    stats['ops'][BIN_FILENODE] = 'binary diff not shown'
 
                chunks = []
 

	
 
            if op == 'removed' and chunks:
 
                # a way of seeing deleted content could perhaps be nice - but
 
                # not with the current UI
 
                chunks = []
 

	
 
            chunks.insert(0, [{
 
                'old_lineno': '',
 
                'new_lineno': '',
 
                'action':     'context',
 
                'line':       msg,
 
                } for _op, msg in stats['ops'].iteritems()
 
                  if _op not in [MOD_FILENODE]])
 

	
 
            _files.append({
 
                'old_filename':     head['a_path'],
 
                'filename':         head['b_path'],
 
                'old_revision':     head['a_blob_id'],
 
                'new_revision':     head['b_blob_id'],
 
                'chunks':           chunks,
 
                'operation':        op,
 
                'stats':            stats,
 
            })
 

	
 
        if not inline_diff:
 
            return _files
 

	
 
        # highlight inline changes when one del is followed by one add
 
        for diff_data in _files:
 
            for chunk in diff_data['chunks']:
 
                lineiter = iter(chunk)
 
                try:
 
                    peekline = lineiter.next()
 
                    while True:
 
                        # find a first del line
 
                        while peekline['action'] != 'del':
 
                            peekline = lineiter.next()
 
                        delline = peekline
 
                        peekline = lineiter.next()
 
                        # if not followed by add, eat all following del lines
 
                        if peekline['action'] != 'add':
 
                            while peekline['action'] == 'del':
 
                                peekline = lineiter.next()
 
                            continue
 
                        # found an add - make sure it is the only one
 
                        addline = peekline
 
                        try:
 
                            peekline = lineiter.next()
 
                        except StopIteration:
 
                            # add was last line - ok
 
                            _highlight_inline_diff(delline, addline)
 
                            raise
 
                        if peekline['action'] != 'add':
 
                            # there was only one add line - ok
 
                            _highlight_inline_diff(delline, addline)
 
                except StopIteration:
 
                    pass
 

	
 
        return _files
 

	
 
    def stat(self):
 
        """
 
        Returns tuple of added, and removed lines for this instance
 
        """
 
        return self.adds, self.removes
 

	
 

	
 
_escape_re = re.compile(r'(&)|(<)|(>)|(\t)|(\r)|(?<=.)( \n| $)')
 

	
 

	
 
def _escaper(string):
 
    """
 
    Do HTML escaping/markup
 
    """
 

	
 
    def substitute(m):
 
        groups = m.groups()
 
        if groups[0]:
 
            return '&amp;'
 
        if groups[1]:
 
            return '&lt;'
 
        if groups[2]:
 
            return '&gt;'
 
        if groups[3]:
 
            return '<u>\t</u>'
 
        if groups[4]:
 
            return '<u class="cr"></u>'
 
        if groups[5]:
 
            return ' <i></i>'
 
        assert False
 

	
 
    return _escape_re.sub(substitute, safe_unicode(string))
 

	
 

	
 
_git_header_re = re.compile(r"""
 
_git_header_re = re.compile(br"""
 
    ^diff[ ]--git[ ]a/(?P<a_path>.+?)[ ]b/(?P<b_path>.+?)\n
 
    (?:^old[ ]mode[ ](?P<old_mode>\d+)\n
 
       ^new[ ]mode[ ](?P<new_mode>\d+)(?:\n|$))?
 
    (?:^similarity[ ]index[ ](?P<similarity_index>\d+)%\n
 
       ^rename[ ]from[ ](?P<rename_from>.+)\n
 
       ^rename[ ]to[ ](?P<rename_to>.+)(?:\n|$))?
 
    (?:^new[ ]file[ ]mode[ ](?P<new_file_mode>.+)(?:\n|$))?
 
    (?:^deleted[ ]file[ ]mode[ ](?P<deleted_file_mode>.+)(?:\n|$))?
 
    (?:^index[ ](?P<a_blob_id>[0-9A-Fa-f]+)
 
        \.\.(?P<b_blob_id>[0-9A-Fa-f]+)[ ]?(?P<b_mode>.+)?(?:\n|$))?
 
    (?:^(?P<bin_patch>GIT[ ]binary[ ]patch)(?:\n|$))?
 
    (?:^---[ ](a/(?P<a_file>.+?)|/dev/null)\t?(?:\n|$))?
 
    (?:^\+\+\+[ ](b/(?P<b_file>.+?)|/dev/null)\t?(?:\n|$))?
 
""", re.VERBOSE | re.MULTILINE)
 

	
 

	
 
_hg_header_re = re.compile(r"""
 
_hg_header_re = re.compile(br"""
 
    ^diff[ ]--git[ ]a/(?P<a_path>.+?)[ ]b/(?P<b_path>.+?)\n
 
    (?:^old[ ]mode[ ](?P<old_mode>\d+)\n
 
       ^new[ ]mode[ ](?P<new_mode>\d+)(?:\n|$))?
 
    (?:^similarity[ ]index[ ](?P<similarity_index>\d+)%(?:\n|$))?
 
    (?:^rename[ ]from[ ](?P<rename_from>.+)\n
 
       ^rename[ ]to[ ](?P<rename_to>.+)(?:\n|$))?
 
    (?:^copy[ ]from[ ](?P<copy_from>.+)\n
 
       ^copy[ ]to[ ](?P<copy_to>.+)(?:\n|$))?
 
    (?:^new[ ]file[ ]mode[ ](?P<new_file_mode>.+)(?:\n|$))?
 
    (?:^deleted[ ]file[ ]mode[ ](?P<deleted_file_mode>.+)(?:\n|$))?
 
    (?:^index[ ](?P<a_blob_id>[0-9A-Fa-f]+)
 
        \.\.(?P<b_blob_id>[0-9A-Fa-f]+)[ ]?(?P<b_mode>.+)?(?:\n|$))?
 
    (?:^(?P<bin_patch>GIT[ ]binary[ ]patch)(?:\n|$))?
 
    (?:^---[ ](a/(?P<a_file>.+?)|/dev/null)\t?(?:\n|$))?
 
    (?:^\+\+\+[ ](b/(?P<b_file>.+?)|/dev/null)\t?(?:\n|$))?
 
""", re.VERBOSE | re.MULTILINE)
 

	
 

	
 
_header_next_check = re.compile(br'''(?!@)(?!literal )(?!delta )''')
 

	
 

	
 
def _get_header(vcs, diff_chunk):
 
    """
 
    Parses a Git diff for a single file (header and chunks) and returns a tuple with:
 

	
 
    1. A dict with meta info:
 

	
 
        a_path, b_path, similarity_index, rename_from, rename_to,
 
        old_mode, new_mode, new_file_mode, deleted_file_mode,
 
        a_blob_id, b_blob_id, b_mode, a_file, b_file
 

	
 
    2. An iterator yielding lines with simple HTML markup.
 
    """
 
    match = None
 
    if vcs == 'git':
 
        match = _git_header_re.match(diff_chunk)
 
    elif vcs == 'hg':
 
        match = _hg_header_re.match(diff_chunk)
 
    if match is None:
 
        raise Exception('diff not recognized as valid %s diff' % vcs)
 
    meta_info = match.groupdict()
 
    rest = diff_chunk[match.end():]
 
    if rest and _header_next_check.match(rest):
 
        raise Exception('cannot parse %s diff header: %r followed by %r' % (vcs, diff_chunk[:match.end()], rest[:1000]))
 
    diff_lines = (_escaper(m.group(0)) for m in re.finditer(r'.*\n|.+$', rest)) # don't split on \r as str.splitlines do
 
    diff_lines = (_escaper(m.group(0)) for m in re.finditer(br'.*\n|.+$', rest)) # don't split on \r as str.splitlines do
 
    return meta_info, diff_lines
 

	
 

	
 
_chunk_re = re.compile(r'^@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@(.*)')
 
_newline_marker = re.compile(r'^\\ No newline at end of file')
 

	
 

	
 
def _parse_lines(diff_lines):
 
    """
 
    Given an iterator of diff body lines, parse them and return a dict per
 
    line and added/removed totals.
 
    """
 
    added = deleted = 0
 
    old_line = old_end = new_line = new_end = None
 

	
 
    try:
 
        chunks = []
 
        line = diff_lines.next()
 

	
 
        while True:
 
            lines = []
 
            chunks.append(lines)
 

	
 
            match = _chunk_re.match(line)
 

	
 
            if not match:
 
                raise Exception('error parsing diff @@ line %r' % line)
 

	
 
            gr = match.groups()
 
            (old_line, old_end,
 
             new_line, new_end) = [int(x or 1) for x in gr[:-1]]
 
            old_line -= 1
 
            new_line -= 1
 

	
 
            context = len(gr) == 5
 
            old_end += old_line
 
            new_end += new_line
 

	
 
            if context:
 
                # skip context only if it's first line
 
                if int(gr[0]) > 1:
 
                    lines.append({
 
                        'old_lineno': '...',
 
                        'new_lineno': '...',
 
                        'action':     'context',
 
                        'line':       line,
 
                    })
 

	
 
            line = diff_lines.next()
 

	
 
            while old_line < old_end or new_line < new_end:
 
                if not line:
 
                    raise Exception('error parsing diff - empty line at -%s+%s' % (old_line, new_line))
 

	
 
                affects_old = affects_new = False
 

	
 
                command = line[0]
 
                if command == '+':
 
                    affects_new = True
 
                    action = 'add'
 
                    added += 1
 
                elif command == '-':
 
                    affects_old = True
 
                    action = 'del'
 
                    deleted += 1
 
                elif command == ' ':
 
                    affects_old = affects_new = True
 
                    action = 'unmod'
 
                else:
 
                    raise Exception('error parsing diff - unknown command in line %r at -%s+%s' % (line, old_line, new_line))
 

	
 
                if not _newline_marker.match(line):
 
                    old_line += affects_old
 
                    new_line += affects_new
 
                    lines.append({
 
                        'old_lineno':   affects_old and old_line or '',
 
                        'new_lineno':   affects_new and new_line or '',
 
                        'action':       action,
 
                        'line':         line[1:],
 
                    })
 

	
 
                line = diff_lines.next()
 

	
 
                if _newline_marker.match(line):
 
                    # we need to append to lines, since this is not
 
                    # counted in the line specs of diff
 
                    lines.append({
 
                        'old_lineno':   '...',
 
                        'new_lineno':   '...',
 
                        'action':       'context',
 
                        'line':         line,
 
                    })
 
                    line = diff_lines.next()
 
            if old_line > old_end:
 
                raise Exception('error parsing diff - more than %s "-" lines at -%s+%s' % (old_end, old_line, new_line))
 
            if new_line > new_end:
 
                raise Exception('error parsing diff - more than %s "+" lines at -%s+%s' % (new_end, old_line, new_line))
 
    except StopIteration:
 
        pass
 
    if old_line != old_end or new_line != new_end:
 
        raise Exception('diff processing broken when old %s<>%s or new %s<>%s line %r' % (old_line, old_end, new_line, new_end, line))
 

	
 
    return chunks, added, deleted
 

	
 
# Used for inline highlighter word split, must match the substitutions in _escaper
 
_token_re = re.compile(r'()(&amp;|&lt;|&gt;|<u>\t</u>|<u class="cr"></u>| <i></i>|\W+?)')
 

	
 

	
 
def _highlight_inline_diff(old, new):
 
    """
 
    Highlight simple add/remove in two lines given as info dicts. They are
 
    modified in place and given markup with <del>/<ins>.
 
    """
 
    assert old['action'] == 'del'
 
    assert new['action'] == 'add'
 

	
 
    oldwords = _token_re.split(old['line'])
 
    newwords = _token_re.split(new['line'])
 
    sequence = difflib.SequenceMatcher(None, oldwords, newwords)
 

	
 
    oldfragments, newfragments = [], []
 
    for tag, i1, i2, j1, j2 in sequence.get_opcodes():
 
        oldfrag = ''.join(oldwords[i1:i2])
 
        newfrag = ''.join(newwords[j1:j2])
 
        if tag != 'equal':
 
            if oldfrag:
 
                oldfrag = '<del>%s</del>' % oldfrag
 
            if newfrag:
 
                newfrag = '<ins>%s</ins>' % newfrag
 
        oldfragments.append(oldfrag)
 
        newfragments.append(newfrag)
 

	
 
    old['line'] = "".join(oldfragments)
 
    new['line'] = "".join(newfragments)
kallithea/lib/hooks.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.hooks
 
~~~~~~~~~~~~~~~~~~~
 

	
 
Hooks run by Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Aug 6, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import binascii
 
import os
 
import time
 

	
 
from kallithea.lib import helpers as h
 
from kallithea.lib.exceptions import UserCreationError
 
from kallithea.lib.utils import action_logger, make_ui, setup_cache_regions
 
from kallithea.lib.utils2 import get_hook_environment, safe_str, safe_unicode
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.vcs.utils.hgcompat import revrange
 
from kallithea.model.db import Repository, User
 

	
 

	
 
def _get_scm_size(alias, root_path):
 
    if not alias.startswith('.'):
 
        alias += '.'
 

	
 
    size_scm, size_root = 0, 0
 
    for path, dirs, files in os.walk(safe_str(root_path)):
 
        if path.find(alias) != -1:
 
            for f in files:
 
                try:
 
                    size_scm += os.path.getsize(os.path.join(path, f))
 
                except OSError:
 
                    pass
 
        else:
 
            for f in files:
 
                try:
 
                    size_root += os.path.getsize(os.path.join(path, f))
 
                except OSError:
 
                    pass
 

	
 
    size_scm_f = h.format_byte_size(size_scm)
 
    size_root_f = h.format_byte_size(size_root)
 
    size_total_f = h.format_byte_size(size_root + size_scm)
 

	
 
    return size_scm_f, size_root_f, size_total_f
 

	
 

	
 
def repo_size(ui, repo, hooktype=None, **kwargs):
 
    """Presents size of repository after push"""
 
    size_hg_f, size_root_f, size_total_f = _get_scm_size('.hg', repo.root)
 

	
 
    last_cs = repo[len(repo) - 1]
 

	
 
    msg = ('Repository size .hg: %s Checkout: %s Total: %s\n'
 
           'Last revision is now r%s:%s\n') % (
 
        size_hg_f, size_root_f, size_total_f, last_cs.rev(), last_cs.hex()[:12]
 
    )
 
    ui.status(msg)
 

	
 

	
 
def log_pull_action(ui, repo, **kwargs):
 
    """Logs user last pull action
 

	
 
    Called as Mercurial hook outgoing.pull_logger or from Kallithea before invoking Git.
 

	
 
    Does *not* use the action from the hook environment but is always 'pull'.
 
    """
 
    ex = get_hook_environment()
 

	
 
    user = User.get_by_username(ex.username)
 
    action = 'pull'
 
    action_logger(user, action, ex.repository, ex.ip, commit=True)
 
    # extension hook call
 
    from kallithea import EXTENSIONS
 
    callback = getattr(EXTENSIONS, 'PULL_HOOK', None)
 
    if callable(callback):
 
        kw = {}
 
        kw.update(ex)
 
        callback(**kw)
 

	
 
    return 0
 

	
 

	
 
def log_push_action(ui, repo, node, node_last, **kwargs):
 
    """
 
    Entry point for Mercurial hook changegroup.push_logger.
 

	
 
    The pushed changesets is given by the revset 'node:node_last'.
 

	
 
    Note: This hook is not only logging, but also the side effect invalidating
 
    cahes! The function should perhaps be renamed.
 
    """
 
    _h = binascii.hexlify
 
    revs = [_h(repo[r].node()) for r in revrange(repo, [node + ':' + node_last])]
 
    revs = [_h(repo[r].node()) for r in revrange(repo, [b'%s:%s' % (node, node_last)])]
 
    process_pushed_raw_ids(revs)
 
    return 0
 

	
 

	
 
def process_pushed_raw_ids(revs):
 
    """
 
    Register that changes have been added to the repo - log the action *and* invalidate caches.
 

	
 
    Called from  Mercurial changegroup.push_logger calling hook log_push_action,
 
    or from the Git post-receive hook calling handle_git_post_receive ...
 
    or from scm _handle_push.
 
    """
 
    ex = get_hook_environment()
 

	
 
    action = '%s:%s' % (ex.action, ','.join(revs))
 
    action_logger(ex.username, action, ex.repository, ex.ip, commit=True)
 

	
 
    from kallithea.model.scm import ScmModel
 
    ScmModel().mark_for_invalidation(ex.repository)
 

	
 
    # extension hook call
 
    from kallithea import EXTENSIONS
 
    callback = getattr(EXTENSIONS, 'PUSH_HOOK', None)
 
    if callable(callback):
 
        kw = {'pushed_revs': revs}
 
        kw.update(ex)
 
        callback(**kw)
 

	
 

	
 
def log_create_repository(repository_dict, created_by, **kwargs):
 
    """
 
    Post create repository Hook.
 

	
 
    :param repository: dict dump of repository object
 
    :param created_by: username who created repository
 

	
 
    available keys of repository_dict:
 

	
 
     'repo_type',
 
     'description',
 
     'private',
 
     'created_on',
 
     'enable_downloads',
 
     'repo_id',
 
     'owner_id',
 
     'enable_statistics',
 
     'clone_uri',
 
     'fork_id',
 
     'group_id',
 
     'repo_name'
 

	
 
    """
 
    from kallithea import EXTENSIONS
 
    callback = getattr(EXTENSIONS, 'CREATE_REPO_HOOK', None)
 
    if callable(callback):
 
        kw = {}
 
        kw.update(repository_dict)
 
        kw.update({'created_by': created_by})
 
        kw.update(kwargs)
 
        return callback(**kw)
 

	
 
    return 0
 

	
 

	
 
def check_allowed_create_user(user_dict, created_by, **kwargs):
 
    # pre create hooks
 
    from kallithea import EXTENSIONS
 
    callback = getattr(EXTENSIONS, 'PRE_CREATE_USER_HOOK', None)
 
    if callable(callback):
 
        allowed, reason = callback(created_by=created_by, **user_dict)
 
        if not allowed:
 
            raise UserCreationError(reason)
 

	
 

	
 
def log_create_user(user_dict, created_by, **kwargs):
 
    """
 
    Post create user Hook.
 

	
 
    :param user_dict: dict dump of user object
 

	
 
    available keys for user_dict:
 

	
 
     'username',
 
     'full_name_or_username',
 
     'full_contact',
 
     'user_id',
 
     'name',
 
     'firstname',
 
     'short_contact',
 
     'admin',
 
     'lastname',
 
     'ip_addresses',
 
     'ldap_dn',
 
     'email',
 
     'api_key',
 
     'last_login',
 
     'full_name',
 
     'active',
 
     'password',
 
     'emails',
 

	
 
    """
 
    from kallithea import EXTENSIONS
 
    callback = getattr(EXTENSIONS, 'CREATE_USER_HOOK', None)
 
    if callable(callback):
 
        return callback(created_by=created_by, **user_dict)
 

	
 
    return 0
 

	
 

	
 
def log_delete_repository(repository_dict, deleted_by, **kwargs):
 
    """
 
    Post delete repository Hook.
 

	
 
    :param repository: dict dump of repository object
 
    :param deleted_by: username who deleted the repository
 

	
 
    available keys of repository_dict:
 

	
 
     'repo_type',
 
     'description',
 
     'private',
 
     'created_on',
 
     'enable_downloads',
 
     'repo_id',
 
     'owner_id',
 
     'enable_statistics',
 
     'clone_uri',
 
     'fork_id',
 
     'group_id',
 
     'repo_name'
 

	
 
    """
 
    from kallithea import EXTENSIONS
 
    callback = getattr(EXTENSIONS, 'DELETE_REPO_HOOK', None)
 
    if callable(callback):
 
        kw = {}
 
        kw.update(repository_dict)
 
        kw.update({'deleted_by': deleted_by,
 
                   'deleted_on': time.time()})
 
        kw.update(kwargs)
 
        return callback(**kw)
 

	
 
    return 0
 

	
 

	
 
def log_delete_user(user_dict, deleted_by, **kwargs):
 
    """
 
    Post delete user Hook.
 

	
 
    :param user_dict: dict dump of user object
 

	
 
    available keys for user_dict:
 

	
 
     'username',
 
     'full_name_or_username',
 
     'full_contact',
 
     'user_id',
 
     'name',
 
     'firstname',
 
     'short_contact',
 
     'admin',
 
     'lastname',
 
     'ip_addresses',
 
     'ldap_dn',
 
     'email',
 
     'api_key',
 
     'last_login',
 
     'full_name',
 
     'active',
 
     'password',
 
     'emails',
 

	
 
    """
 
    from kallithea import EXTENSIONS
 
    callback = getattr(EXTENSIONS, 'DELETE_USER_HOOK', None)
 
    if callable(callback):
 
        return callback(deleted_by=deleted_by, **user_dict)
 

	
 
    return 0
 

	
 

	
 
def _hook_environment(repo_path):
 
    """
 
    Create a light-weight environment for stand-alone scripts and return an UI and the
 
    db repository.
 

	
 
    Git hooks are executed as subprocess of Git while Kallithea is waiting, and
 
    they thus need enough info to be able to create an app environment and
 
    connect to the database.
 
    """
 
    from paste.deploy import appconfig
 
    from sqlalchemy import engine_from_config
 
    from kallithea.config.environment import load_environment
 
    from kallithea.model.base import init_model
 

	
 
    extras = get_hook_environment()
 
    ini_file_path = extras['config']
 
    #logging.config.fileConfig(ini_file_path) # Note: we are in a different process - don't use configured logging
 
    app_conf = appconfig('config:%s' % ini_file_path)
 
    conf = load_environment(app_conf.global_conf, app_conf.local_conf)
 

	
 
    setup_cache_regions(conf)
 

	
 
    engine = engine_from_config(conf, 'sqlalchemy.')
 
    init_model(engine)
 

	
 
    repo_path = safe_unicode(repo_path)
 
    # fix if it's not a bare repo
 
    if repo_path.endswith(os.sep + '.git'):
 
        repo_path = repo_path[:-5]
 

	
 
    repo = Repository.get_by_full_path(repo_path)
 
    if not repo:
 
        raise OSError('Repository %s not found in database'
 
                      % (safe_str(repo_path)))
 

	
 
    baseui = make_ui()
 
    return baseui, repo
 

	
 

	
 
def handle_git_pre_receive(repo_path, git_stdin_lines):
 
    """Called from Git pre-receive hook"""
 
    # Currently unused. TODO: remove?
 
    return 0
 

	
 

	
 
def handle_git_post_receive(repo_path, git_stdin_lines):
 
    """Called from Git post-receive hook"""
 
    baseui, repo = _hook_environment(repo_path)
 

	
 
    # the post push hook should never use the cached instance
 
    scm_repo = repo.scm_instance_no_cache()
 

	
 
    rev_data = []
 
    for l in git_stdin_lines:
 
        old_rev, new_rev, ref = l.strip().split(' ')
 
        _ref_data = ref.split('/')
 
        if _ref_data[1] in ['tags', 'heads']:
 
            rev_data.append({'old_rev': old_rev,
 
                             'new_rev': new_rev,
 
                             'ref': ref,
 
                             'type': _ref_data[1],
 
                             'name': '/'.join(_ref_data[2:])})
 

	
 
    git_revs = []
 
    for push_ref in rev_data:
 
        _type = push_ref['type']
 
        if _type == 'heads':
 
            if push_ref['old_rev'] == EmptyChangeset().raw_id:
 
                # update the symbolic ref if we push new repo
 
                if scm_repo.is_empty():
 
                    scm_repo._repo.refs.set_symbolic_ref('HEAD',
 
                                        'refs/heads/%s' % push_ref['name'])
 
                    scm_repo._repo.refs.set_symbolic_ref(
 
                        b'HEAD',
 
                        b'refs/heads/%s' % push_ref['name'])
 

	
 
                # build exclude list without the ref
 
                cmd = ['for-each-ref', '--format=%(refname)', 'refs/heads/*']
 
                stdout = scm_repo.run_git_command(cmd)
 
                ref = push_ref['ref']
 
                heads = [head for head in stdout.splitlines() if head != ref]
 
                # now list the git revs while excluding from the list
 
                cmd = ['log', push_ref['new_rev'], '--reverse', '--pretty=format:%H']
 
                cmd.append('--not')
 
                cmd.extend(heads) # empty list is ok
 
                stdout = scm_repo.run_git_command(cmd)
 
                git_revs += stdout.splitlines()
 

	
 
            elif push_ref['new_rev'] == EmptyChangeset().raw_id:
 
                # delete branch case
 
                git_revs += ['delete_branch=>%s' % push_ref['name']]
 
            else:
 
                cmd = ['log', '%(old_rev)s..%(new_rev)s' % push_ref,
 
                       '--reverse', '--pretty=format:%H']
 
                stdout = scm_repo.run_git_command(cmd)
 
                git_revs += stdout.splitlines()
 

	
 
        elif _type == 'tags':
 
            git_revs += ['tag=>%s' % push_ref['name']]
 

	
 
    process_pushed_raw_ids(git_revs)
 

	
 
    return 0
 

	
 

	
 
# Almost exactly like Mercurial contrib/hg-ssh:
 
def rejectpush(ui, **kwargs):
 
    """Mercurial hook to be installed as pretxnopen and prepushkey for read-only repos"""
 
    ex = get_hook_environment()
 
    ui.warn((b"Push access to %r denied\n") % safe_str(ex.repository))
 
    return 1
kallithea/lib/ssh.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
"""
 
    kallithea.lib.ssh
 
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
    :created_on: Dec 10, 2012
 
    :author: ir4y
 
    :copyright: (C) 2012 Ilya Beda <ir4y.ix@gmail.com>
 
    :license: GPLv3, see COPYING for more details.
 
"""
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import base64
 
import logging
 
import re
 

	
 
from tg.i18n import ugettext as _
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class SshKeyParseError(Exception):
 
    """Exception raised by parse_pub_key"""
 

	
 

	
 
def parse_pub_key(ssh_key):
 
    r"""Parse SSH public key string, raise SshKeyParseError or return decoded keytype, data and comment
 

	
 
    >>> getfixture('doctest_mock_ugettext')
 
    >>> parse_pub_key('')
 
    Traceback (most recent call last):
 
    ...
 
    SshKeyParseError: SSH key is missing
 
    >>> parse_pub_key('''AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ''')
 
    Traceback (most recent call last):
 
    ...
 
    SshKeyParseError: Incorrect SSH key - it must have both a key type and a base64 part
 
    >>> parse_pub_key('''abc AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ''')
 
    Traceback (most recent call last):
 
    ...
 
    SshKeyParseError: Incorrect SSH key - it must start with 'ssh-(rsa|dss|ed25519)'
 
    >>> parse_pub_key('''ssh-rsa  AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ''')
 
    Traceback (most recent call last):
 
    ...
 
    SshKeyParseError: Incorrect SSH key - failed to decode base64 part 'AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ'
 
    >>> parse_pub_key('''ssh-rsa  AAAAB2NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ==''')
 
    Traceback (most recent call last):
 
    ...
 
    SshKeyParseError: Incorrect SSH key - base64 part is not 'ssh-rsa' as claimed but 'csh-rsa'
 
    >>> parse_pub_key('''ssh-rsa  AAAAB3NzaC1yc2EAAAA'LVGhpcyBpcyBmYWtlIQ''')
 
    Traceback (most recent call last):
 
    ...
 
    SshKeyParseError: Incorrect SSH key - unexpected characters in base64 part "AAAAB3NzaC1yc2EAAAA'LVGhpcyBpcyBmYWtlIQ"
 
    >>> parse_pub_key(''' ssh-rsa  AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ== and a comment
 
    ... ''')
 
    ('ssh-rsa', '\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x0bThis is fake!', 'and a comment\n')
 
    >>> parse_pub_key('''ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIP1NA2kBQIKe74afUXmIWD9ByDYQJqUwW44Y4gJOBRuo''')
 
    ('ssh-ed25519', '\x00\x00\x00\x0bssh-ed25519\x00\x00\x00 \xfdM\x03i\x01@\x82\x9e\xef\x86\x9fQy\x88X?A\xc86\x10&\xa50[\x8e\x18\xe2\x02N\x05\x1b\xa8', '')
 
    """
 
    if not ssh_key:
 
        raise SshKeyParseError(_("SSH key is missing"))
 

	
 
    parts = ssh_key.split(None, 2)
 
    if len(parts) < 2:
 
        raise SshKeyParseError(_("Incorrect SSH key - it must have both a key type and a base64 part"))
 

	
 
    keytype, keyvalue, comment = (parts + [''])[:3]
 
    if keytype not in ('ssh-rsa', 'ssh-dss', 'ssh-ed25519'):
 
        raise SshKeyParseError(_("Incorrect SSH key - it must start with 'ssh-(rsa|dss|ed25519)'"))
 

	
 
    if re.search(r'[^a-zA-Z0-9+/=]', keyvalue):
 
        raise SshKeyParseError(_("Incorrect SSH key - unexpected characters in base64 part %r") % keyvalue)
 

	
 
    try:
 
        decoded = base64.b64decode(keyvalue)
 
    except TypeError:
 
        raise SshKeyParseError(_("Incorrect SSH key - failed to decode base64 part %r") % keyvalue)
 

	
 
    if not decoded.startswith('\x00\x00\x00' + chr(len(keytype)) + str(keytype) + '\x00'):
 
        raise SshKeyParseError(_("Incorrect SSH key - base64 part is not %r as claimed but %r") % (str(keytype), str(decoded[4:].split('\0', 1)[0])))
 
    if not decoded.startswith(b'\x00\x00\x00' + chr(len(keytype)) + str(keytype) + b'\x00'):
 
        raise SshKeyParseError(_("Incorrect SSH key - base64 part is not %r as claimed but %r") % (str(keytype), str(decoded[4:].split(b'\0', 1)[0])))
 

	
 
    return keytype, decoded, comment
 

	
 

	
 
SSH_OPTIONS = 'no-pty,no-port-forwarding,no-X11-forwarding,no-agent-forwarding'
 

	
 

	
 
def authorized_keys_line(kallithea_cli_path, config_file, key):
 
    """
 
    Return a line as it would appear in .authorized_keys
 

	
 
    >>> from kallithea.model.db import UserSshKeys, User
 
    >>> user = User(user_id=7, username='uu')
 
    >>> key = UserSshKeys(user_ssh_key_id=17, user=user, description='test key')
 
    >>> key.public_key='''ssh-rsa  AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ== and a comment'''
 
    >>> authorized_keys_line('/srv/kallithea/venv/bin/kallithea-cli', '/srv/kallithea/my.ini', key)
 
    'no-pty,no-port-forwarding,no-X11-forwarding,no-agent-forwarding,command="/srv/kallithea/venv/bin/kallithea-cli ssh-serve -c /srv/kallithea/my.ini 7 17" ssh-rsa AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ==\\n'
 
    """
 
    try:
 
        keytype, key_bytes, comment = parse_pub_key(key.public_key)
 
    except SshKeyParseError:
 
        return '# Invalid Kallithea SSH key: %s %s\n' % (key.user.user_id, key.user_ssh_key_id)
 
    base64_key = base64.b64encode(key_bytes)
 
    assert '\n' not in base64_key
 
    return '%s,command="%s ssh-serve -c %s %s %s" %s %s\n' % (
 
        SSH_OPTIONS, kallithea_cli_path, config_file,
 
        key.user.user_id, key.user_ssh_key_id,
 
        keytype, base64_key)
kallithea/lib/utils.py
Show inline comments
 
@@ -148,399 +148,399 @@ def action_logger(user, action, repo, ip
 
        repo_name = u''
 

	
 
    user_log = UserLog()
 
    user_log.user_id = user_obj.user_id
 
    user_log.username = user_obj.username
 
    user_log.action = safe_unicode(action)
 

	
 
    user_log.repository = repo_obj
 
    user_log.repository_name = repo_name
 

	
 
    user_log.action_date = datetime.datetime.now()
 
    user_log.user_ip = ipaddr
 
    meta.Session().add(user_log)
 

	
 
    log.info('Logging action:%s on %s by user:%s ip:%s',
 
             action, safe_unicode(repo), user_obj, ipaddr)
 
    if commit:
 
        meta.Session().commit()
 

	
 

	
 
def get_filesystem_repos(path):
 
    """
 
    Scans given path for repos and return (name,(type,path)) tuple
 

	
 
    :param path: path to scan for repositories
 
    :param recursive: recursive search and return names with subdirs in front
 
    """
 

	
 
    # remove ending slash for better results
 
    path = safe_str(path.rstrip(os.sep))
 
    log.debug('now scanning in %s', path)
 

	
 
    def isdir(*n):
 
        return os.path.isdir(os.path.join(*n))
 

	
 
    for root, dirs, _files in os.walk(path):
 
        recurse_dirs = []
 
        for subdir in dirs:
 
            # skip removed repos
 
            if REMOVED_REPO_PAT.match(subdir):
 
                continue
 

	
 
            # skip .<something> dirs TODO: rly? then we should prevent creating them ...
 
            if subdir.startswith('.'):
 
                continue
 

	
 
            cur_path = os.path.join(root, subdir)
 
            if isdir(cur_path, '.git'):
 
                log.warning('ignoring non-bare Git repo: %s', cur_path)
 
                continue
 

	
 
            if (isdir(cur_path, '.hg') or
 
                isdir(cur_path, '.svn') or
 
                isdir(cur_path, 'objects') and (isdir(cur_path, 'refs') or
 
                                                os.path.isfile(os.path.join(cur_path, 'packed-refs')))):
 

	
 
                if not os.access(cur_path, os.R_OK) or not os.access(cur_path, os.X_OK):
 
                    log.warning('ignoring repo path without access: %s', cur_path)
 
                    continue
 

	
 
                if not os.access(cur_path, os.W_OK):
 
                    log.warning('repo path without write access: %s', cur_path)
 

	
 
                try:
 
                    scm_info = get_scm(cur_path)
 
                    assert cur_path.startswith(path)
 
                    repo_path = cur_path[len(path) + 1:]
 
                    yield repo_path, scm_info
 
                    continue # no recursion
 
                except VCSError:
 
                    # We should perhaps ignore such broken repos, but especially
 
                    # the bare git detection is unreliable so we dive into it
 
                    pass
 

	
 
            recurse_dirs.append(subdir)
 

	
 
        dirs[:] = recurse_dirs
 

	
 

	
 
def is_valid_repo_uri(repo_type, url, ui):
 
    """Check if the url seems like a valid remote repo location - raise an Exception if any problems"""
 
    if repo_type == 'hg':
 
        if url.startswith('http') or url.startswith('ssh'):
 
            # initially check if it's at least the proper URL
 
            # or does it pass basic auth
 
            MercurialRepository._check_url(url, ui)
 
        elif url.startswith('svn+http'):
 
            try:
 
                from hgsubversion.svnrepo import svnremoterepo
 
            except ImportError:
 
                raise HgsubversionImportError(_('Unable to activate hgsubversion support. '
 
                                                'The "hgsubversion" library is missing'))
 
            svnremoterepo(ui, url).svn.uuid
 
        elif url.startswith('git+http'):
 
            raise NotImplementedError()
 
        else:
 
            raise Exception('URI %s not allowed' % (url,))
 

	
 
    elif repo_type == 'git':
 
        if url.startswith('http') or url.startswith('git'):
 
            # initially check if it's at least the proper URL
 
            # or does it pass basic auth
 
            GitRepository._check_url(url)
 
        elif url.startswith('svn+http'):
 
            raise NotImplementedError()
 
        elif url.startswith('hg+http'):
 
            raise NotImplementedError()
 
        else:
 
            raise Exception('URI %s not allowed' % (url))
 

	
 

	
 
def is_valid_repo(repo_name, base_path, scm=None):
 
    """
 
    Returns True if given path is a valid repository False otherwise.
 
    If scm param is given also compare if given scm is the same as expected
 
    from scm parameter
 

	
 
    :param repo_name:
 
    :param base_path:
 
    :param scm:
 

	
 
    :return True: if given path is a valid repository
 
    """
 
    # TODO: paranoid security checks?
 
    full_path = os.path.join(safe_str(base_path), safe_str(repo_name))
 

	
 
    try:
 
        scm_ = get_scm(full_path)
 
        if scm:
 
            return scm_[0] == scm
 
        return True
 
    except VCSError:
 
        return False
 

	
 

	
 
def is_valid_repo_group(repo_group_name, base_path, skip_path_check=False):
 
    """
 
    Returns True if given path is a repository group False otherwise
 

	
 
    :param repo_name:
 
    :param base_path:
 
    """
 
    full_path = os.path.join(safe_str(base_path), safe_str(repo_group_name))
 

	
 
    # check if it's not a repo
 
    if is_valid_repo(repo_group_name, base_path):
 
        return False
 

	
 
    try:
 
        # we need to check bare git repos at higher level
 
        # since we might match branches/hooks/info/objects or possible
 
        # other things inside bare git repo
 
        get_scm(os.path.dirname(full_path))
 
        return False
 
    except VCSError:
 
        pass
 

	
 
    # check if it's a valid path
 
    if skip_path_check or os.path.isdir(full_path):
 
        return True
 

	
 
    return False
 

	
 

	
 
# propagated from mercurial documentation
 
ui_sections = ['alias', 'auth',
 
                'decode/encode', 'defaults',
 
                'diff', 'email',
 
                'extensions', 'format',
 
                'merge-patterns', 'merge-tools',
 
                'hooks', 'http_proxy',
 
                'smtp', 'patch',
 
                'paths', 'profiling',
 
                'server', 'trusted',
 
                'ui', 'web', ]
 

	
 

	
 
def make_ui(repo_path=None):
 
    """
 
    Create an Mercurial 'ui' object based on database Ui settings, possibly
 
    augmenting with content from a hgrc file.
 
    """
 
    baseui = ui.ui()
 

	
 
    # clean the baseui object
 
    baseui._ocfg = config.config()
 
    baseui._ucfg = config.config()
 
    baseui._tcfg = config.config()
 

	
 
    sa = meta.Session()
 
    for ui_ in sa.query(Ui).all():
 
        if ui_.ui_active:
 
            ui_val = '' if ui_.ui_value is None else safe_str(ui_.ui_value)
 
            ui_val = b'' if ui_.ui_value is None else safe_str(ui_.ui_value)
 
            log.debug('config from db: [%s] %s=%r', ui_.ui_section,
 
                      ui_.ui_key, ui_val)
 
            baseui.setconfig(safe_str(ui_.ui_section), safe_str(ui_.ui_key),
 
                             ui_val)
 

	
 
    # force set push_ssl requirement to False, Kallithea handles that
 
    baseui.setconfig('web', 'push_ssl', False)
 
    baseui.setconfig('web', 'allow_push', '*')
 
    baseui.setconfig(b'web', b'push_ssl', False)
 
    baseui.setconfig(b'web', b'allow_push', b'*')
 
    # prevent interactive questions for ssh password / passphrase
 
    ssh = baseui.config('ui', 'ssh', default='ssh')
 
    baseui.setconfig('ui', 'ssh', '%s -oBatchMode=yes -oIdentitiesOnly=yes' % ssh)
 
    ssh = baseui.config(b'ui', b'ssh', default=b'ssh')
 
    baseui.setconfig(b'ui', b'ssh', b'%s -oBatchMode=yes -oIdentitiesOnly=yes' % ssh)
 
    # push / pull hooks
 
    baseui.setconfig('hooks', 'changegroup.kallithea_log_push_action', 'python:kallithea.lib.hooks.log_push_action')
 
    baseui.setconfig('hooks', 'outgoing.kallithea_log_pull_action', 'python:kallithea.lib.hooks.log_pull_action')
 
    baseui.setconfig(b'hooks', b'changegroup.kallithea_log_push_action', b'python:kallithea.lib.hooks.log_push_action')
 
    baseui.setconfig(b'hooks', b'outgoing.kallithea_log_pull_action', b'python:kallithea.lib.hooks.log_pull_action')
 

	
 
    if repo_path is not None:
 
        hgrc_path = os.path.join(repo_path, '.hg', 'hgrc')
 
        if os.path.isfile(hgrc_path):
 
            log.debug('reading hgrc from %s', hgrc_path)
 
            cfg = config.config()
 
            cfg.read(hgrc_path)
 
            for section in ui_sections:
 
                for k, v in cfg.items(section):
 
                    log.debug('config from file: [%s] %s=%s', section, k, v)
 
                    baseui.setconfig(safe_str(section), safe_str(k), safe_str(v))
 
        else:
 
            log.debug('hgrc file is not present at %s, skipping...', hgrc_path)
 

	
 
    return baseui
 

	
 

	
 
def set_app_settings(config):
 
    """
 
    Updates app config with new settings from database
 

	
 
    :param config:
 
    """
 
    hgsettings = Setting.get_app_settings()
 
    for k, v in hgsettings.items():
 
        config[k] = v
 

	
 

	
 
def set_vcs_config(config):
 
    """
 
    Patch VCS config with some Kallithea specific stuff
 

	
 
    :param config: kallithea.CONFIG
 
    """
 
    settings.BACKENDS = {
 
        'hg': 'kallithea.lib.vcs.backends.hg.MercurialRepository',
 
        'git': 'kallithea.lib.vcs.backends.git.GitRepository',
 
    }
 

	
 
    settings.GIT_EXECUTABLE_PATH = config.get('git_path', 'git')
 
    settings.GIT_REV_FILTER = config.get('git_rev_filter', '--all').strip()
 
    settings.DEFAULT_ENCODINGS = aslist(config.get('default_encoding',
 
                                                        'utf-8'), sep=',')
 

	
 

	
 
def set_indexer_config(config):
 
    """
 
    Update Whoosh index mapping
 

	
 
    :param config: kallithea.CONFIG
 
    """
 
    log.debug('adding extra into INDEX_EXTENSIONS')
 
    kallithea.config.conf.INDEX_EXTENSIONS.extend(re.split(r'\s+', config.get('index.extensions', '')))
 

	
 
    log.debug('adding extra into INDEX_FILENAMES')
 
    kallithea.config.conf.INDEX_FILENAMES.extend(re.split(r'\s+', config.get('index.filenames', '')))
 

	
 

	
 
def map_groups(path):
 
    """
 
    Given a full path to a repository, create all nested groups that this
 
    repo is inside. This function creates parent-child relationships between
 
    groups and creates default perms for all new groups.
 

	
 
    :param paths: full path to repository
 
    """
 
    from kallithea.model.repo_group import RepoGroupModel
 
    sa = meta.Session()
 
    groups = path.split(Repository.url_sep())
 
    parent = None
 
    group = None
 

	
 
    # last element is repo in nested groups structure
 
    groups = groups[:-1]
 
    rgm = RepoGroupModel()
 
    owner = User.get_first_admin()
 
    for lvl, group_name in enumerate(groups):
 
        group_name = u'/'.join(groups[:lvl] + [group_name])
 
        group = RepoGroup.get_by_group_name(group_name)
 
        desc = '%s group' % group_name
 

	
 
        # skip folders that are now removed repos
 
        if REMOVED_REPO_PAT.match(group_name):
 
            break
 

	
 
        if group is None:
 
            log.debug('creating group level: %s group_name: %s',
 
                      lvl, group_name)
 
            group = RepoGroup(group_name, parent)
 
            group.group_description = desc
 
            group.owner = owner
 
            sa.add(group)
 
            rgm._create_default_perms(group)
 
            sa.flush()
 

	
 
        parent = group
 
    return group
 

	
 

	
 
def repo2db_mapper(initial_repo_dict, remove_obsolete=False,
 
                   install_git_hooks=False, user=None, overwrite_git_hooks=False):
 
    """
 
    maps all repos given in initial_repo_dict, non existing repositories
 
    are created, if remove_obsolete is True it also check for db entries
 
    that are not in initial_repo_dict and removes them.
 

	
 
    :param initial_repo_dict: mapping with repositories found by scanning methods
 
    :param remove_obsolete: check for obsolete entries in database
 
    :param install_git_hooks: if this is True, also check and install git hook
 
        for a repo if missing
 
    :param overwrite_git_hooks: if this is True, overwrite any existing git hooks
 
        that may be encountered (even if user-deployed)
 
    """
 
    from kallithea.model.repo import RepoModel
 
    from kallithea.model.scm import ScmModel
 
    sa = meta.Session()
 
    repo_model = RepoModel()
 
    if user is None:
 
        user = User.get_first_admin()
 
    added = []
 

	
 
    # creation defaults
 
    defs = Setting.get_default_repo_settings(strip_prefix=True)
 
    enable_statistics = defs.get('repo_enable_statistics')
 
    enable_downloads = defs.get('repo_enable_downloads')
 
    private = defs.get('repo_private')
 

	
 
    for name, repo in initial_repo_dict.items():
 
        group = map_groups(name)
 
        unicode_name = safe_unicode(name)
 
        db_repo = repo_model.get_by_repo_name(unicode_name)
 
        # found repo that is on filesystem not in Kallithea database
 
        if not db_repo:
 
            log.info('repository %s not found, creating now', name)
 
            added.append(name)
 
            desc = (repo.description
 
                    if repo.description != 'unknown'
 
                    else '%s repository' % name)
 

	
 
            new_repo = repo_model._create_repo(
 
                repo_name=name,
 
                repo_type=repo.alias,
 
                description=desc,
 
                repo_group=getattr(group, 'group_id', None),
 
                owner=user,
 
                enable_downloads=enable_downloads,
 
                enable_statistics=enable_statistics,
 
                private=private,
 
                state=Repository.STATE_CREATED
 
            )
 
            sa.commit()
 
            # we added that repo just now, and make sure it has githook
 
            # installed, and updated server info
 
            if new_repo.repo_type == 'git':
 
                git_repo = new_repo.scm_instance
 
                ScmModel().install_git_hooks(git_repo)
 
                # update repository server-info
 
                log.debug('Running update server info')
 
                git_repo._update_server_info()
 
            new_repo.update_changeset_cache()
 
        elif install_git_hooks:
 
            if db_repo.repo_type == 'git':
 
                ScmModel().install_git_hooks(db_repo.scm_instance, force_create=overwrite_git_hooks)
 

	
 
    removed = []
 
    # remove from database those repositories that are not in the filesystem
 
    unicode_initial_repo_names = set(safe_unicode(name) for name in initial_repo_dict)
 
    for repo in sa.query(Repository).all():
 
        if repo.repo_name not in unicode_initial_repo_names:
 
            if remove_obsolete:
 
                log.debug("Removing non-existing repository found in db `%s`",
 
                          repo.repo_name)
 
                try:
 
                    RepoModel().delete(repo, forks='detach', fs_remove=False)
 
                    sa.commit()
 
                except Exception:
 
                    #don't hold further removals on error
 
                    log.error(traceback.format_exc())
 
                    sa.rollback()
 
            removed.append(repo.repo_name)
 
    return added, removed
 

	
 

	
 
def load_rcextensions(root_path):
 
    path = os.path.join(root_path, 'rcextensions', '__init__.py')
 
    if os.path.isfile(path):
 
        rcext = create_module('rc', path)
 
        EXT = kallithea.EXTENSIONS = rcext
 
        log.debug('Found rcextensions now loading %s...', rcext)
 

	
 
        # Additional mappings that are not present in the pygments lexers
 
        kallithea.config.conf.LANGUAGES_EXTENSIONS_MAP.update(getattr(EXT, 'EXTRA_MAPPINGS', {}))
kallithea/lib/vcs/backends/git/changeset.py
Show inline comments
 
@@ -55,506 +55,506 @@ class GitChangeset(BaseChangeset):
 

	
 
    @LazyProperty
 
    def committer(self):
 
        return safe_unicode(getattr(self._commit, self._committer_property))
 

	
 
    @LazyProperty
 
    def author(self):
 
        return safe_unicode(getattr(self._commit, self._author_property))
 

	
 
    @LazyProperty
 
    def date(self):
 
        return date_fromtimestamp(getattr(self._commit, self._date_property),
 
                                  getattr(self._commit, self._date_tz_property))
 

	
 
    @LazyProperty
 
    def _timestamp(self):
 
        return getattr(self._commit, self._date_property)
 

	
 
    @LazyProperty
 
    def status(self):
 
        """
 
        Returns modified, added, removed, deleted files for current changeset
 
        """
 
        return self.changed, self.added, self.removed
 

	
 
    @LazyProperty
 
    def tags(self):
 
        _tags = []
 
        for tname, tsha in self.repository.tags.iteritems():
 
            if tsha == self.raw_id:
 
                _tags.append(tname)
 
        return _tags
 

	
 
    @LazyProperty
 
    def branch(self):
 
        # Note: This function will return one branch name for the changeset -
 
        # that might not make sense in Git where branches() is a better match
 
        # for the basic model
 
        heads = self.repository._heads(reverse=False)
 
        ref = heads.get(self._commit.id)
 
        if ref:
 
            return safe_unicode(ref)
 

	
 
    @LazyProperty
 
    def branches(self):
 
        heads = self.repository._heads(reverse=True)
 
        return [b for b in heads if heads[b] == self.raw_id] # FIXME: Inefficient ... and returning None!
 

	
 
    def _fix_path(self, path):
 
        """
 
        Paths are stored without trailing slash so we need to get rid off it if
 
        needed.
 
        """
 
        if path.endswith('/'):
 
            path = path.rstrip('/')
 
        return path
 

	
 
    def _get_id_for_path(self, path):
 
        path = safe_str(path)
 
        # FIXME: Please, spare a couple of minutes and make those codes cleaner;
 
        if path not in self._paths:
 
            path = path.strip('/')
 
            # set root tree
 
            tree = self.repository._repo[self._tree_id]
 
            if path == '':
 
                self._paths[''] = tree.id
 
                return tree.id
 
            splitted = path.split('/')
 
            dirs, name = splitted[:-1], splitted[-1]
 
            curdir = ''
 

	
 
            # initially extract things from root dir
 
            for item, stat, id in tree.iteritems():
 
                if curdir:
 
                    name = '/'.join((curdir, item))
 
                else:
 
                    name = item
 
                self._paths[name] = id
 
                self._stat_modes[name] = stat
 

	
 
            for dir in dirs:
 
                if curdir:
 
                    curdir = '/'.join((curdir, dir))
 
                else:
 
                    curdir = dir
 
                dir_id = None
 
                for item, stat, id in tree.iteritems():
 
                    if dir == item:
 
                        dir_id = id
 
                if dir_id:
 
                    # Update tree
 
                    tree = self.repository._repo[dir_id]
 
                    if not isinstance(tree, objects.Tree):
 
                        raise ChangesetError('%s is not a directory' % curdir)
 
                else:
 
                    raise ChangesetError('%s have not been found' % curdir)
 

	
 
                # cache all items from the given traversed tree
 
                for item, stat, id in tree.iteritems():
 
                    if curdir:
 
                        name = '/'.join((curdir, item))
 
                    else:
 
                        name = item
 
                    self._paths[name] = id
 
                    self._stat_modes[name] = stat
 
            if path not in self._paths:
 
                raise NodeDoesNotExistError("There is no file nor directory "
 
                    "at the given path '%s' at revision %s"
 
                    % (path, safe_str(self.short_id)))
 
        return self._paths[path]
 

	
 
    def _get_kind(self, path):
 
        obj = self.repository._repo[self._get_id_for_path(path)]
 
        if isinstance(obj, objects.Blob):
 
            return NodeKind.FILE
 
        elif isinstance(obj, objects.Tree):
 
            return NodeKind.DIR
 

	
 
    def _get_filectx(self, path):
 
        path = self._fix_path(path)
 
        if self._get_kind(path) != NodeKind.FILE:
 
            raise ChangesetError("File does not exist for revision %s at "
 
                " '%s'" % (self.raw_id, path))
 
        return path
 

	
 
    def _get_file_nodes(self):
 
        return chain(*(t[2] for t in self.walk()))
 

	
 
    @LazyProperty
 
    def parents(self):
 
        """
 
        Returns list of parents changesets.
 
        """
 
        return [self.repository.get_changeset(parent)
 
                for parent in self._commit.parents]
 

	
 
    @LazyProperty
 
    def children(self):
 
        """
 
        Returns list of children changesets.
 
        """
 
        rev_filter = settings.GIT_REV_FILTER
 
        so = self.repository.run_git_command(
 
            ['rev-list', rev_filter, '--children']
 
        )
 
        return [
 
            self.repository.get_changeset(cs)
 
            for parts in (l.split(' ') for l in so.splitlines())
 
            if parts[0] == self.raw_id
 
            for cs in parts[1:]
 
        ]
 

	
 
    def next(self, branch=None):
 
        if branch and self.branch != branch:
 
            raise VCSError('Branch option used on changeset not belonging '
 
                           'to that branch')
 

	
 
        cs = self
 
        while True:
 
            try:
 
                next_ = cs.revision + 1
 
                next_rev = cs.repository.revisions[next_]
 
            except IndexError:
 
                raise ChangesetDoesNotExistError
 
            cs = cs.repository.get_changeset(next_rev)
 

	
 
            if not branch or branch == cs.branch:
 
                return cs
 

	
 
    def prev(self, branch=None):
 
        if branch and self.branch != branch:
 
            raise VCSError('Branch option used on changeset not belonging '
 
                           'to that branch')
 

	
 
        cs = self
 
        while True:
 
            try:
 
                prev_ = cs.revision - 1
 
                if prev_ < 0:
 
                    raise IndexError
 
                prev_rev = cs.repository.revisions[prev_]
 
            except IndexError:
 
                raise ChangesetDoesNotExistError
 
            cs = cs.repository.get_changeset(prev_rev)
 

	
 
            if not branch or branch == cs.branch:
 
                return cs
 

	
 
    def diff(self, ignore_whitespace=True, context=3):
 
        # Only used to feed diffstat
 
        rev1 = self.parents[0] if self.parents else self.repository.EMPTY_CHANGESET
 
        rev2 = self
 
        return ''.join(self.repository.get_diff(rev1, rev2,
 
        return b''.join(self.repository.get_diff(rev1, rev2,
 
                                    ignore_whitespace=ignore_whitespace,
 
                                    context=context))
 

	
 
    def get_file_mode(self, path):
 
        """
 
        Returns stat mode of the file at the given ``path``.
 
        """
 
        # ensure path is traversed
 
        path = safe_str(path)
 
        self._get_id_for_path(path)
 
        return self._stat_modes[path]
 

	
 
    def get_file_content(self, path):
 
        """
 
        Returns content of the file at given ``path``.
 
        """
 
        id = self._get_id_for_path(path)
 
        blob = self.repository._repo[id]
 
        return blob.as_pretty_string()
 

	
 
    def get_file_size(self, path):
 
        """
 
        Returns size of the file at given ``path``.
 
        """
 
        id = self._get_id_for_path(path)
 
        blob = self.repository._repo[id]
 
        return blob.raw_length()
 

	
 
    def get_file_changeset(self, path):
 
        """
 
        Returns last commit of the file at the given ``path``.
 
        """
 
        return self.get_file_history(path, limit=1)[0]
 

	
 
    def get_file_history(self, path, limit=None):
 
        """
 
        Returns history of file as reversed list of ``Changeset`` objects for
 
        which file at given ``path`` has been modified.
 

	
 
        TODO: This function now uses os underlying 'git' and 'grep' commands
 
        which is generally not good. Should be replaced with algorithm
 
        iterating commits.
 
        """
 
        self._get_filectx(path)
 
        cs_id = safe_str(self.id)
 
        f_path = safe_str(path)
 

	
 
        if limit is not None:
 
            cmd = ['log', '-n', str(safe_int(limit, 0)),
 
                   '--pretty=format:%H', '-s', cs_id, '--', f_path]
 

	
 
        else:
 
            cmd = ['log',
 
                   '--pretty=format:%H', '-s', cs_id, '--', f_path]
 
        so = self.repository.run_git_command(cmd)
 
        ids = re.findall(r'[0-9a-fA-F]{40}', so)
 
        return [self.repository.get_changeset(sha) for sha in ids]
 

	
 
    def get_file_history_2(self, path):
 
        """
 
        Returns history of file as reversed list of ``Changeset`` objects for
 
        which file at given ``path`` has been modified.
 

	
 
        """
 
        self._get_filectx(path)
 
        from dulwich.walk import Walker
 
        include = [self.id]
 
        walker = Walker(self.repository._repo.object_store, include,
 
                        paths=[path], max_entries=1)
 
        return [self.repository.get_changeset(sha)
 
                for sha in (x.commit.id for x in walker)]
 

	
 
    def get_file_annotate(self, path):
 
        """
 
        Returns a generator of four element tuples with
 
            lineno, sha, changeset lazy loader and line
 

	
 
        TODO: This function now uses os underlying 'git' command which is
 
        generally not good. Should be replaced with algorithm iterating
 
        commits.
 
        """
 
        cmd = ['blame', '-l', '--root', '-r', self.id, '--', path]
 
        # -l     ==> outputs long shas (and we need all 40 characters)
 
        # --root ==> doesn't put '^' character for boundaries
 
        # -r sha ==> blames for the given revision
 
        so = self.repository.run_git_command(cmd)
 

	
 
        for i, blame_line in enumerate(so.split('\n')[:-1]):
 
            ln_no = i + 1
 
            sha, line = re.split(r' ', blame_line, 1)
 
            yield (ln_no, sha, lambda: self.repository.get_changeset(sha), line)
 

	
 
    def fill_archive(self, stream=None, kind='tgz', prefix=None,
 
                     subrepos=False):
 
        """
 
        Fills up given stream.
 

	
 
        :param stream: file like object.
 
        :param kind: one of following: ``zip``, ``tgz`` or ``tbz2``.
 
            Default: ``tgz``.
 
        :param prefix: name of root directory in archive.
 
            Default is repository name and changeset's raw_id joined with dash
 
            (``repo-tip.<KIND>``).
 
        :param subrepos: include subrepos in this archive.
 

	
 
        :raise ImproperArchiveTypeError: If given kind is wrong.
 
        :raise VcsError: If given stream is None
 
        """
 
        allowed_kinds = settings.ARCHIVE_SPECS
 
        if kind not in allowed_kinds:
 
            raise ImproperArchiveTypeError('Archive kind not supported use one'
 
                'of %s' % ' '.join(allowed_kinds))
 

	
 
        if stream is None:
 
            raise VCSError('You need to pass in a valid stream for filling'
 
                           ' with archival data')
 

	
 
        if prefix is None:
 
            prefix = '%s-%s' % (self.repository.name, self.short_id)
 
        elif prefix.startswith('/'):
 
            raise VCSError("Prefix cannot start with leading slash")
 
        elif prefix.strip() == '':
 
            raise VCSError("Prefix cannot be empty")
 

	
 
        if kind == 'zip':
 
            frmt = 'zip'
 
        else:
 
            frmt = 'tar'
 
        _git_path = settings.GIT_EXECUTABLE_PATH
 
        cmd = '%s archive --format=%s --prefix=%s/ %s' % (_git_path,
 
                                                frmt, prefix, self.raw_id)
 
        if kind == 'tgz':
 
            cmd += ' | gzip -9'
 
        elif kind == 'tbz2':
 
            cmd += ' | bzip2 -9'
 

	
 
        if stream is None:
 
            raise VCSError('You need to pass in a valid stream for filling'
 
                           ' with archival data')
 
        popen = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True,
 
                      cwd=self.repository.path)
 

	
 
        buffer_size = 1024 * 8
 
        chunk = popen.stdout.read(buffer_size)
 
        while chunk:
 
            stream.write(chunk)
 
            chunk = popen.stdout.read(buffer_size)
 
        # Make sure all descriptors would be read
 
        popen.communicate()
 

	
 
    def get_nodes(self, path):
 
        """
 
        Returns combined ``DirNode`` and ``FileNode`` objects list representing
 
        state of changeset at the given ``path``. If node at the given ``path``
 
        is not instance of ``DirNode``, ChangesetError would be raised.
 
        """
 

	
 
        if self._get_kind(path) != NodeKind.DIR:
 
            raise ChangesetError("Directory does not exist for revision %s at "
 
                " '%s'" % (self.revision, path))
 
        path = self._fix_path(path)
 
        id = self._get_id_for_path(path)
 
        tree = self.repository._repo[id]
 
        dirnodes = []
 
        filenodes = []
 
        als = self.repository.alias
 
        for name, stat, id in tree.iteritems():
 
            if path != '':
 
                obj_path = '/'.join((path, name))
 
            else:
 
                obj_path = name
 
            if objects.S_ISGITLINK(stat):
 
                root_tree = self.repository._repo[self._tree_id]
 
                cf = ConfigFile.from_file(BytesIO(self.repository._repo.get_object(root_tree['.gitmodules'][1]).data))
 
                cf = ConfigFile.from_file(BytesIO(self.repository._repo.get_object(root_tree[b'.gitmodules'][1]).data))
 
                url = cf.get(('submodule', obj_path), 'url')
 
                dirnodes.append(SubModuleNode(obj_path, url=url, changeset=id,
 
                                              alias=als))
 
                continue
 

	
 
            obj = self.repository._repo.get_object(id)
 
            if obj_path not in self._stat_modes:
 
                self._stat_modes[obj_path] = stat
 
            if isinstance(obj, objects.Tree):
 
                dirnodes.append(DirNode(obj_path, changeset=self))
 
            elif isinstance(obj, objects.Blob):
 
                filenodes.append(FileNode(obj_path, changeset=self, mode=stat))
 
            else:
 
                raise ChangesetError("Requested object should be Tree "
 
                                     "or Blob, is %r" % type(obj))
 
        nodes = dirnodes + filenodes
 
        for node in nodes:
 
            if node.path not in self.nodes:
 
                self.nodes[node.path] = node
 
        nodes.sort()
 
        return nodes
 

	
 
    def get_node(self, path):
 
        """
 
        Returns ``Node`` object from the given ``path``. If there is no node at
 
        the given ``path``, ``ChangesetError`` would be raised.
 
        """
 
        path = self._fix_path(path)
 
        if path not in self.nodes:
 
            try:
 
                id_ = self._get_id_for_path(path)
 
            except ChangesetError:
 
                raise NodeDoesNotExistError("Cannot find one of parents' "
 
                    "directories for a given path: %s" % path)
 

	
 
            _GL = lambda m: m and objects.S_ISGITLINK(m)
 
            if _GL(self._stat_modes.get(path)):
 
                tree = self.repository._repo[self._tree_id]
 
                cf = ConfigFile.from_file(BytesIO(self.repository._repo.get_object(tree['.gitmodules'][1]).data))
 
                cf = ConfigFile.from_file(BytesIO(self.repository._repo.get_object(tree[b'.gitmodules'][1]).data))
 
                url = cf.get(('submodule', path), 'url')
 
                node = SubModuleNode(path, url=url, changeset=id_,
 
                                     alias=self.repository.alias)
 
            else:
 
                obj = self.repository._repo.get_object(id_)
 

	
 
                if isinstance(obj, objects.Tree):
 
                    if path == '':
 
                        node = RootNode(changeset=self)
 
                    else:
 
                        node = DirNode(path, changeset=self)
 
                    node._tree = obj
 
                elif isinstance(obj, objects.Blob):
 
                    node = FileNode(path, changeset=self)
 
                    node._blob = obj
 
                else:
 
                    raise NodeDoesNotExistError("There is no file nor directory "
 
                        "at the given path: '%s' at revision %s"
 
                        % (path, self.short_id))
 
            # cache node
 
            self.nodes[path] = node
 
        return self.nodes[path]
 

	
 
    @LazyProperty
 
    def affected_files(self):
 
        """
 
        Gets a fast accessible file changes for given changeset
 
        """
 
        added, modified, deleted = self._changes_cache
 
        return list(added.union(modified).union(deleted))
 

	
 
    @LazyProperty
 
    def _changes_cache(self):
 
        added = set()
 
        modified = set()
 
        deleted = set()
 
        _r = self.repository._repo
 

	
 
        parents = self.parents
 
        if not self.parents:
 
            parents = [EmptyChangeset()]
 
        for parent in parents:
 
            if isinstance(parent, EmptyChangeset):
 
                oid = None
 
            else:
 
                oid = _r[parent._commit.id].tree
 
            changes = _r.object_store.tree_changes(oid, _r[self._commit.id].tree)
 
            for (oldpath, newpath), (_, _), (_, _) in changes:
 
                if newpath and oldpath:
 
                    modified.add(newpath)
 
                elif newpath and not oldpath:
 
                    added.add(newpath)
 
                elif not newpath and oldpath:
 
                    deleted.add(oldpath)
 
        return added, modified, deleted
 

	
 
    def _get_paths_for_status(self, status):
 
        """
 
        Returns sorted list of paths for given ``status``.
 

	
 
        :param status: one of: *added*, *modified* or *deleted*
 
        """
 
        added, modified, deleted = self._changes_cache
 
        return sorted({
 
            'added': list(added),
 
            'modified': list(modified),
 
            'deleted': list(deleted)}[status]
 
        )
 

	
 
    @LazyProperty
 
    def added(self):
 
        """
 
        Returns list of added ``FileNode`` objects.
 
        """
 
        if not self.parents:
 
            return list(self._get_file_nodes())
 
        return AddedFileNodesGenerator([n for n in
 
                                self._get_paths_for_status('added')], self)
 

	
 
    @LazyProperty
 
    def changed(self):
 
        """
 
        Returns list of modified ``FileNode`` objects.
 
        """
 
        if not self.parents:
 
            return []
 
        return ChangedFileNodesGenerator([n for n in
 
                                self._get_paths_for_status('modified')], self)
 

	
 
    @LazyProperty
 
    def removed(self):
 
        """
 
        Returns list of removed ``FileNode`` objects.
 
        """
 
        if not self.parents:
 
            return []
 
        return RemovedFileNodesGenerator([n for n in
 
                                self._get_paths_for_status('deleted')], self)
 

	
 
    extra = {}
kallithea/lib/vcs/backends/git/inmemory.py
Show inline comments
 
import datetime
 
import posixpath
 
import stat
 
import time
 

	
 
from dulwich import objects
 

	
 
from kallithea.lib.vcs.backends.base import BaseInMemoryChangeset
 
from kallithea.lib.vcs.exceptions import RepositoryError
 
from kallithea.lib.vcs.utils import safe_bytes, safe_str
 

	
 

	
 
class GitInMemoryChangeset(BaseInMemoryChangeset):
 

	
 
    def commit(self, message, author, parents=None, branch=None, date=None,
 
               **kwargs):
 
        """
 
        Performs in-memory commit (doesn't check workdir in any way) and
 
        returns newly created ``Changeset``. Updates repository's
 
        ``revisions``.
 

	
 
        :param message: message of the commit
 
        :param author: full username, i.e. "Joe Doe <joe.doe@example.com>"
 
        :param parents: single parent or sequence of parents from which commit
 
          would be derived
 
        :param date: ``datetime.datetime`` instance. Defaults to
 
          ``datetime.datetime.now()``.
 
        :param branch: branch name, as string. If none given, default backend's
 
          branch would be used.
 

	
 
        :raises ``CommitError``: if any error occurs while committing
 
        """
 
        self.check_integrity(parents)
 

	
 
        from .repository import GitRepository
 
        if branch is None:
 
            branch = GitRepository.DEFAULT_BRANCH_NAME
 

	
 
        repo = self.repository._repo
 
        object_store = repo.object_store
 

	
 
        ENCODING = "UTF-8"  # TODO: should probably be kept in sync with safe_unicode/safe_bytes and vcs/conf/settings.py DEFAULT_ENCODINGS
 
        ENCODING = b"UTF-8"  # TODO: should probably be kept in sync with safe_unicode/safe_bytes and vcs/conf/settings.py DEFAULT_ENCODINGS
 

	
 
        # Create tree and populates it with blobs
 
        commit_tree = self.parents[0] and repo[self.parents[0]._commit.tree] or \
 
            objects.Tree()
 
        for node in self.added + self.changed:
 
            # Compute subdirs if needed
 
            dirpath, nodename = posixpath.split(node.path)
 
            dirnames = safe_str(dirpath).split('/') if dirpath else []
 
            dirnames = safe_str(dirpath).split(b'/') if dirpath else []
 
            parent = commit_tree
 
            ancestors = [('', parent)]
 

	
 
            # Tries to dig for the deepest existing tree
 
            while dirnames:
 
                curdir = dirnames.pop(0)
 
                try:
 
                    dir_id = parent[curdir][1]
 
                except KeyError:
 
                    # put curdir back into dirnames and stops
 
                    dirnames.insert(0, curdir)
 
                    break
 
                else:
 
                    # If found, updates parent
 
                    parent = self.repository._repo[dir_id]
 
                    ancestors.append((curdir, parent))
 
            # Now parent is deepest existing tree and we need to create subtrees
 
            # for dirnames (in reverse order) [this only applies for nodes from added]
 
            new_trees = []
 

	
 
            blob = objects.Blob.from_string(node.content)
 

	
 
            node_path = safe_bytes(node.name)
 
            if dirnames:
 
                # If there are trees which should be created we need to build
 
                # them now (in reverse order)
 
                reversed_dirnames = list(reversed(dirnames))
 
                curtree = objects.Tree()
 
                curtree[node_path] = node.mode, blob.id
 
                new_trees.append(curtree)
 
                for dirname in reversed_dirnames[:-1]:
 
                    newtree = objects.Tree()
 
                    #newtree.add(stat.S_IFDIR, dirname, curtree.id)
 
                    newtree[dirname] = stat.S_IFDIR, curtree.id
 
                    new_trees.append(newtree)
 
                    curtree = newtree
 
                parent[reversed_dirnames[-1]] = stat.S_IFDIR, curtree.id
 
            else:
 
                parent.add(name=node_path, mode=node.mode, hexsha=blob.id)
 

	
 
            new_trees.append(parent)
 
            # Update ancestors
 
            for parent, tree, path in reversed([(a[1], b[1], b[0]) for a, b in
 
                zip(ancestors, ancestors[1:])]
 
            ):
 
                parent[path] = stat.S_IFDIR, tree.id
 
                object_store.add_object(tree)
 

	
 
            object_store.add_object(blob)
 
            for tree in new_trees:
 
                object_store.add_object(tree)
 
        for node in self.removed:
 
            paths = node.path.split('/')
 
            paths = node.path.split(b'/')
 
            tree = commit_tree
 
            trees = [tree]
 
            # Traverse deep into the forest...
 
            for path in paths:
 
                try:
 
                    obj = self.repository._repo[tree[path][1]]
 
                    if isinstance(obj, objects.Tree):
 
                        trees.append(obj)
 
                        tree = obj
 
                except KeyError:
 
                    break
 
            # Cut down the blob and all rotten trees on the way back...
 
            for path, tree in reversed(zip(paths, trees)):
 
                del tree[path]
 
                if tree:
 
                    # This tree still has elements - don't remove it or any
 
                    # of it's parents
 
                    break
 

	
 
        object_store.add_object(commit_tree)
 

	
 
        # Create commit
 
        commit = objects.Commit()
 
        commit.tree = commit_tree.id
 
        commit.parents = [p._commit.id for p in self.parents if p]
 
        commit.author = commit.committer = safe_str(author)
 
        commit.encoding = ENCODING
 
        commit.message = safe_str(message)
 

	
 
        # Compute date
 
        if date is None:
 
            date = time.time()
 
        elif isinstance(date, datetime.datetime):
 
            date = time.mktime(date.timetuple())
 

	
 
        author_time = kwargs.pop('author_time', date)
 
        commit.commit_time = int(date)
 
        commit.author_time = int(author_time)
 
        tz = time.timezone
 
        author_tz = kwargs.pop('author_timezone', tz)
 
        commit.commit_timezone = tz
 
        commit.author_timezone = author_tz
 

	
 
        object_store.add_object(commit)
 

	
 
        ref = 'refs/heads/%s' % branch
 
        # Update vcs repository object & recreate dulwich repo
 
        ref = b'refs/heads/%s' % branch
 
        repo.refs[ref] = commit.id
 

	
 
        # Update vcs repository object & recreate dulwich repo
 
        self.repository.revisions.append(commit.id)
 
        # invalidate parsed refs after commit
 
        self.repository._parsed_refs = self.repository._get_parsed_refs()
 
        tip = self.repository.get_changeset()
 
        self.reset()
 
        return tip
 

	
 
    def _get_missing_trees(self, path, root_tree):
 
        """
 
        Creates missing ``Tree`` objects for the given path.
 

	
 
        :param path: path given as a string. It may be a path to a file node
 
          (i.e. ``foo/bar/baz.txt``) or directory path - in that case it must
 
          end with slash (i.e. ``foo/bar/``).
 
        :param root_tree: ``dulwich.objects.Tree`` object from which we start
 
          traversing (should be commit's root tree)
 
        """
 
        dirpath = posixpath.split(path)[0]
 
        dirs = dirpath.split('/')
 
        if not dirs or dirs == ['']:
 
            return []
 

	
 
        def get_tree_for_dir(tree, dirname):
 
            for name, mode, id in tree.iteritems():
 
                if name == dirname:
 
                    obj = self.repository._repo[id]
 
                    if isinstance(obj, objects.Tree):
 
                        return obj
 
                    else:
 
                        raise RepositoryError("Cannot create directory %s "
 
                            "at tree %s as path is occupied and is not a "
 
                            "Tree" % (dirname, tree))
 
            return None
 

	
 
        trees = []
 
        parent = root_tree
 
        for dirname in dirs:
 
            tree = get_tree_for_dir(parent, dirname)
 
            if tree is None:
 
                tree = objects.Tree()
 
                parent.add(stat.S_IFDIR, dirname, tree.id)
 
                parent = tree
 
            # Always append tree
 
            trees.append(tree)
 
        return trees
kallithea/lib/vcs/backends/git/repository.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
"""
 
    vcs.backends.git.repository
 
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
    Git repository implementation.
 

	
 
    :created_on: Apr 8, 2010
 
    :copyright: (c) 2010-2011 by Marcin Kuzminski, Lukasz Balcerzak.
 
"""
 

	
 
import errno
 
import logging
 
import os
 
import re
 
import time
 
import urllib
 
import urllib2
 
from collections import OrderedDict
 

	
 
from dulwich.config import ConfigFile
 
from dulwich.objects import Tag
 
from dulwich.repo import NotGitRepository, Repo
 

	
 
from kallithea.lib.vcs import subprocessio
 
from kallithea.lib.vcs.backends.base import BaseRepository, CollectionGenerator
 
from kallithea.lib.vcs.conf import settings
 
from kallithea.lib.vcs.exceptions import (
 
    BranchDoesNotExistError, ChangesetDoesNotExistError, EmptyRepositoryError, RepositoryError, TagAlreadyExistError, TagDoesNotExistError)
 
from kallithea.lib.vcs.utils import date_fromtimestamp, makedate, safe_str, safe_unicode
 
from kallithea.lib.vcs.utils.hgcompat import hg_url, httpbasicauthhandler, httpdigestauthhandler
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.lib.vcs.utils.paths import abspath, get_user_home
 

	
 
from .changeset import GitChangeset
 
from .inmemory import GitInMemoryChangeset
 
from .workdir import GitWorkdir
 

	
 

	
 
SHA_PATTERN = re.compile(r'^([0-9a-fA-F]{12}|[0-9a-fA-F]{40})$')
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class GitRepository(BaseRepository):
 
    """
 
    Git repository backend.
 
    """
 
    DEFAULT_BRANCH_NAME = 'master'
 
    scm = 'git'
 

	
 
    def __init__(self, repo_path, create=False, src_url=None,
 
                 update_after_clone=False, bare=False):
 

	
 
        self.path = safe_unicode(abspath(repo_path))
 
        self.repo = self._get_repo(create, src_url, update_after_clone, bare)
 
        self.bare = self.repo.bare
 

	
 
    @property
 
    def _config_files(self):
 
        return [
 
            self.bare and abspath(self.path, 'config')
 
                      or abspath(self.path, '.git', 'config'),
 
             abspath(get_user_home(), '.gitconfig'),
 
         ]
 

	
 
    @property
 
    def _repo(self):
 
        return self.repo
 

	
 
    @property
 
    def head(self):
 
        try:
 
            return self._repo.head()
 
        except KeyError:
 
            return None
 

	
 
    @property
 
    def _empty(self):
 
        """
 
        Checks if repository is empty ie. without any changesets
 
        """
 

	
 
        try:
 
            self.revisions[0]
 
        except (KeyError, IndexError):
 
            return True
 
        return False
 

	
 
    @LazyProperty
 
    def revisions(self):
 
        """
 
        Returns list of revisions' ids, in ascending order.  Being lazy
 
        attribute allows external tools to inject shas from cache.
 
        """
 
        return self._get_all_revisions()
 

	
 
    @classmethod
 
    def _run_git_command(cls, cmd, cwd=None):
 
        """
 
        Runs given ``cmd`` as git command and returns output bytes in a tuple
 
        (stdout, stderr) ... or raise RepositoryError.
 

	
 
        :param cmd: git command to be executed
 
        :param cwd: passed directly to subprocess
 
        """
 
        # need to clean fix GIT_DIR !
 
        gitenv = dict(os.environ)
 
        gitenv.pop('GIT_DIR', None)
 
        gitenv['GIT_CONFIG_NOGLOBAL'] = '1'
 

	
 
        assert isinstance(cmd, list), cmd
 
        cmd = [settings.GIT_EXECUTABLE_PATH, '-c', 'core.quotepath=false'] + cmd
 
        try:
 
            p = subprocessio.SubprocessIOChunker(cmd, cwd=cwd, env=gitenv, shell=False)
 
        except (EnvironmentError, OSError) as err:
 
            # output from the failing process is in str(EnvironmentError)
 
            msg = ("Couldn't run git command %s.\n"
 
                   "Subprocess failed with '%s': %s\n" %
 
                   (cmd, type(err).__name__, err)
 
            ).strip()
 
            log.error(msg)
 
            raise RepositoryError(msg)
 

	
 
        try:
 
            stdout = ''.join(p.output)
 
            stderr = ''.join(p.error)
 
            stdout = b''.join(p.output)
 
            stderr = b''.join(p.error)
 
        finally:
 
            p.close()
 
        # TODO: introduce option to make commands fail if they have any stderr output?
 
        if stderr:
 
            log.debug('stderr from %s:\n%s', cmd, stderr)
 
        else:
 
            log.debug('stderr from %s: None', cmd)
 
        return stdout, stderr
 

	
 
    def run_git_command(self, cmd):
 
        """
 
        Runs given ``cmd`` as git command with cwd set to current repo.
 
        Returns stdout as unicode str ... or raise RepositoryError.
 
        """
 
        cwd = None
 
        if os.path.isdir(self.path):
 
            cwd = self.path
 
        stdout, _stderr = self._run_git_command(cmd, cwd=cwd)
 
        return safe_unicode(stdout)
 

	
 
    @classmethod
 
    def _check_url(cls, url):
 
        """
 
        Function will check given url and try to verify if it's a valid
 
        link. Sometimes it may happened that git will issue basic
 
        auth request that can cause whole API to hang when used from python
 
        or other external calls.
 

	
 
        On failures it'll raise urllib2.HTTPError, exception is also thrown
 
        when the return code is non 200
 
        """
 

	
 
        # check first if it's not an local url
 
        if os.path.isdir(url) or url.startswith('file:'):
 
            return True
 

	
 
        if url.startswith('git://'):
 
            return True
 

	
 
        if '+' in url[:url.find('://')]:
 
            url = url[url.find('+') + 1:]
 

	
 
        handlers = []
 
        url_obj = hg_url(url)
 
        test_uri, authinfo = url_obj.authinfo()
 
        url_obj.passwd = '*****'
 
        cleaned_uri = str(url_obj)
 

	
 
        if not test_uri.endswith('info/refs'):
 
            test_uri = test_uri.rstrip('/') + '/info/refs'
 

	
 
        url_obj.passwd = b'*****'
 
        cleaned_uri = str(url_obj)
 

	
 
        if authinfo:
 
            # create a password manager
 
            passmgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
 
            passmgr.add_password(*authinfo)
 

	
 
            handlers.extend((httpbasicauthhandler(passmgr),
 
                             httpdigestauthhandler(passmgr)))
 

	
 
        o = urllib2.build_opener(*handlers)
 
        o.addheaders = [('User-Agent', 'git/1.7.8.0')]  # fake some git
 

	
 
        q = {"service": 'git-upload-pack'}
 
        qs = '?%s' % urllib.urlencode(q)
 
        cu = "%s%s" % (test_uri, qs)
 
        req = urllib2.Request(cu, None, {})
 

	
 
        try:
 
            resp = o.open(req)
 
            if resp.code != 200:
 
                raise Exception('Return Code is not 200')
 
        except Exception as e:
 
            # means it cannot be cloned
 
            raise urllib2.URLError("[%s] org_exc: %s" % (cleaned_uri, e))
 

	
 
        # now detect if it's proper git repo
 
        gitdata = resp.read()
 
        if 'service=git-upload-pack' not in gitdata:
 
            raise urllib2.URLError(
 
                "url [%s] does not look like an git" % cleaned_uri)
 

	
 
        return True
 

	
 
    def _get_repo(self, create, src_url=None, update_after_clone=False,
 
                  bare=False):
 
        if create and os.path.exists(self.path):
 
            raise RepositoryError("Location already exist")
 
        if src_url and not create:
 
            raise RepositoryError("Create should be set to True if src_url is "
 
                                  "given (clone operation creates repository)")
 
        try:
 
            if create and src_url:
 
                GitRepository._check_url(src_url)
 
                self.clone(src_url, update_after_clone, bare)
 
                return Repo(self.path)
 
            elif create:
 
                os.makedirs(self.path)
 
                if bare:
 
                    return Repo.init_bare(self.path)
 
                else:
 
                    return Repo.init(self.path)
 
            else:
 
                return Repo(self.path)
 
        except (NotGitRepository, OSError) as err:
 
            raise RepositoryError(err)
 

	
 
    def _get_all_revisions(self):
 
        # we must check if this repo is not empty, since later command
 
        # fails if it is. And it's cheaper to ask than throw the subprocess
 
        # errors
 
        try:
 
            self._repo.head()
 
        except KeyError:
 
            return []
 

	
 
        rev_filter = settings.GIT_REV_FILTER
 
        cmd = ['rev-list', rev_filter, '--reverse', '--date-order']
 
        try:
 
            so = self.run_git_command(cmd)
 
        except RepositoryError:
 
            # Can be raised for empty repositories
 
            return []
 
        return so.splitlines()
 

	
 
    def _get_all_revisions2(self):
 
        # alternate implementation using dulwich
 
        includes = [x[1][0] for x in self._parsed_refs.iteritems()
 
                    if x[1][1] != 'T']
 
                    if x[1][1] != b'T']
 
        return [c.commit.id for c in self._repo.get_walker(include=includes)]
 

	
 
    def _get_revision(self, revision):
 
        """
 
        Given any revision identifier, returns a 40 char string with revision hash.
 
        """
 
        if self._empty:
 
            raise EmptyRepositoryError("There are no changesets yet")
 

	
 
        if revision in (None, '', 'tip', 'HEAD', 'head', -1):
 
            revision = -1
 

	
 
        if isinstance(revision, int):
 
            try:
 
                return self.revisions[revision]
 
            except IndexError:
 
                msg = ("Revision %s does not exist for %s" % (revision, self))
 
                raise ChangesetDoesNotExistError(msg)
 

	
 
        if isinstance(revision, (str, unicode)):
 
            if revision.isdigit() and (len(revision) < 12 or len(revision) == revision.count('0')):
 
                try:
 
                    return self.revisions[int(revision)]
 
                except IndexError:
 
                    msg = "Revision %r does not exist for %s" % (revision, self)
 
                    raise ChangesetDoesNotExistError(msg)
 

	
 
            # get by branch/tag name
 
            _ref_revision = self._parsed_refs.get(revision)
 
            if _ref_revision:  # and _ref_revision[1] in ['H', 'RH', 'T']:
 
            if _ref_revision:  # and _ref_revision[1] in [b'H', b'RH', b'T']:
 
                return _ref_revision[0]
 

	
 
            if revision in self.revisions:
 
                return revision
 

	
 
            # maybe it's a tag ? we don't have them in self.revisions
 
            if revision in self.tags.values():
 
                return revision
 

	
 
            if SHA_PATTERN.match(revision):
 
                msg = ("Revision %s does not exist for %s" % (revision, self))
 
                raise ChangesetDoesNotExistError(msg)
 

	
 
        raise ChangesetDoesNotExistError("Given revision %r not recognized" % revision)
 

	
 
    def get_ref_revision(self, ref_type, ref_name):
 
        """
 
        Returns ``GitChangeset`` object representing repository's
 
        changeset at the given ``revision``.
 
        """
 
        return self._get_revision(ref_name)
 

	
 
    def _get_archives(self, archive_name='tip'):
 

	
 
        for i in [('zip', '.zip'), ('gz', '.tar.gz'), ('bz2', '.tar.bz2')]:
 
            yield {"type": i[0], "extension": i[1], "node": archive_name}
 

	
 
    def _get_url(self, url):
 
        """
 
        Returns normalized url. If schema is not given, would fall to
 
        filesystem (``file:///``) schema.
 
        """
 
        url = safe_str(url)
 
        if url != 'default' and '://' not in url:
 
            url = ':///'.join(('file', url))
 
        return url
 

	
 
    def get_hook_location(self):
 
        """
 
        returns absolute path to location where hooks are stored
 
        """
 
        loc = os.path.join(self.path, 'hooks')
 
        if not self.bare:
 
            loc = os.path.join(self.path, '.git', 'hooks')
 
        return loc
 

	
 
    @LazyProperty
 
    def name(self):
 
        return os.path.basename(self.path)
 

	
 
    @LazyProperty
 
    def last_change(self):
 
        """
 
        Returns last change made on this repository as datetime object
 
        """
 
        return date_fromtimestamp(self._get_mtime(), makedate()[1])
 

	
 
    def _get_mtime(self):
 
        try:
 
            return time.mktime(self.get_changeset().date.timetuple())
 
        except RepositoryError:
 
            idx_loc = '' if self.bare else '.git'
 
            # fallback to filesystem
 
            in_path = os.path.join(self.path, idx_loc, "index")
 
            he_path = os.path.join(self.path, idx_loc, "HEAD")
 
            if os.path.exists(in_path):
 
                return os.stat(in_path).st_mtime
 
            else:
 
                return os.stat(he_path).st_mtime
 

	
 
    @LazyProperty
 
    def description(self):
 
        undefined_description = u'unknown'
 
        _desc = self._repo.get_description()
 
        return safe_unicode(_desc or undefined_description)
 
        return safe_unicode(self._repo.get_description() or b'unknown')
 

	
 
    @LazyProperty
 
    def contact(self):
 
        undefined_contact = u'Unknown'
 
        return undefined_contact
 

	
 
    @property
 
    def branches(self):
 
        if not self.revisions:
 
            return {}
 
        sortkey = lambda ctx: ctx[0]
 
        _branches = [(x[0], x[1][0])
 
                     for x in self._parsed_refs.iteritems() if x[1][1] == 'H']
 
                     for x in self._parsed_refs.iteritems() if x[1][1] == b'H']
 
        return OrderedDict(sorted(_branches, key=sortkey, reverse=False))
 

	
 
    @LazyProperty
 
    def closed_branches(self):
 
        return {}
 

	
 
    @LazyProperty
 
    def tags(self):
 
        return self._get_tags()
 

	
 
    def _get_tags(self):
 
        if not self.revisions:
 
            return {}
 

	
 
        sortkey = lambda ctx: ctx[0]
 
        _tags = [(x[0], x[1][0])
 
                 for x in self._parsed_refs.iteritems() if x[1][1] == 'T']
 
                 for x in self._parsed_refs.iteritems() if x[1][1] == b'T']
 
        return OrderedDict(sorted(_tags, key=sortkey, reverse=True))
 

	
 
    def tag(self, name, user, revision=None, message=None, date=None,
 
            **kwargs):
 
        """
 
        Creates and returns a tag for the given ``revision``.
 

	
 
        :param name: name for new tag
 
        :param user: full username, i.e.: "Joe Doe <joe.doe@example.com>"
 
        :param revision: changeset id for which new tag would be created
 
        :param message: message of the tag's commit
 
        :param date: date of tag's commit
 

	
 
        :raises TagAlreadyExistError: if tag with same name already exists
 
        """
 
        if name in self.tags:
 
            raise TagAlreadyExistError("Tag %s already exists" % name)
 
        changeset = self.get_changeset(revision)
 
        message = message or "Added tag %s for commit %s" % (name,
 
            changeset.raw_id)
 
        self._repo.refs["refs/tags/%s" % name] = changeset._commit.id
 
        self._repo.refs[b"refs/tags/%s" % name] = changeset._commit.id
 

	
 
        self._parsed_refs = self._get_parsed_refs()
 
        self.tags = self._get_tags()
 
        return changeset
 

	
 
    def remove_tag(self, name, user, message=None, date=None):
 
        """
 
        Removes tag with the given ``name``.
 

	
 
        :param name: name of the tag to be removed
 
        :param user: full username, i.e.: "Joe Doe <joe.doe@example.com>"
 
        :param message: message of the tag's removal commit
 
        :param date: date of tag's removal commit
 

	
 
        :raises TagDoesNotExistError: if tag with given name does not exists
 
        """
 
        if name not in self.tags:
 
            raise TagDoesNotExistError("Tag %s does not exist" % name)
 
        # self._repo.refs is a DiskRefsContainer, and .path gives the full absolute path of '.git'
 
        tagpath = os.path.join(self._repo.refs.path, 'refs', 'tags', name)
 
        try:
 
            os.remove(tagpath)
 
            self._parsed_refs = self._get_parsed_refs()
 
            self.tags = self._get_tags()
 
        except OSError as e:
 
            raise RepositoryError(e.strerror)
 

	
 
    @LazyProperty
 
    def bookmarks(self):
 
        """
 
        Gets bookmarks for this repository
 
        """
 
        return {}
 

	
 
    @LazyProperty
 
    def _parsed_refs(self):
 
        return self._get_parsed_refs()
 

	
 
    def _get_parsed_refs(self):
 
        # cache the property
 
        _repo = self._repo
 
        refs = _repo.get_refs()
 
        keys = [('refs/heads/', 'H'),
 
                ('refs/remotes/origin/', 'RH'),
 
                ('refs/tags/', 'T')]
 
        keys = [(b'refs/heads/', b'H'),
 
                (b'refs/remotes/origin/', b'RH'),
 
                (b'refs/tags/', b'T')]
 
        _refs = {}
 
        for ref, sha in refs.iteritems():
 
            for k, type_ in keys:
 
                if ref.startswith(k):
 
                    _key = ref[len(k):]
 
                    if type_ == 'T':
 
                    if type_ == b'T':
 
                        obj = _repo.get_object(sha)
 
                        if isinstance(obj, Tag):
 
                            sha = _repo.get_object(sha).object[1]
 
                    _refs[_key] = [sha, type_]
 
                    break
 
        return _refs
 

	
 
    def _heads(self, reverse=False):
 
        refs = self._repo.get_refs()
 
        heads = {}
 

	
 
        for key, val in refs.items():
 
            for ref_key in ['refs/heads/', 'refs/remotes/origin/']:
 
            for ref_key in [b'refs/heads/', b'refs/remotes/origin/']:
 
                if key.startswith(ref_key):
 
                    n = key[len(ref_key):]
 
                    if n not in ['HEAD']:
 
                    if n not in [b'HEAD']:
 
                        heads[n] = val
 

	
 
        return heads if reverse else dict((y, x) for x, y in heads.iteritems())
 

	
 
    def get_changeset(self, revision=None):
 
        """
 
        Returns ``GitChangeset`` object representing commit from git repository
 
        at the given revision or head (most recent commit) if None given.
 
        """
 
        if isinstance(revision, GitChangeset):
 
            return revision
 
        revision = self._get_revision(revision)
 
        changeset = GitChangeset(repository=self, revision=revision)
 
        return changeset
 

	
 
    def get_changesets(self, start=None, end=None, start_date=None,
 
           end_date=None, branch_name=None, reverse=False, max_revisions=None):
 
        """
 
        Returns iterator of ``GitChangeset`` objects from start to end (both
 
        are inclusive), in ascending date order (unless ``reverse`` is set).
 

	
 
        :param start: changeset ID, as str; first returned changeset
 
        :param end: changeset ID, as str; last returned changeset
 
        :param start_date: if specified, changesets with commit date less than
 
          ``start_date`` would be filtered out from returned set
 
        :param end_date: if specified, changesets with commit date greater than
 
          ``end_date`` would be filtered out from returned set
 
        :param branch_name: if specified, changesets not reachable from given
 
          branch would be filtered out from returned set
 
        :param reverse: if ``True``, returned generator would be reversed
 
          (meaning that returned changesets would have descending date order)
 

	
 
        :raise BranchDoesNotExistError: If given ``branch_name`` does not
 
            exist.
 
        :raise ChangesetDoesNotExistError: If changeset for given ``start`` or
 
          ``end`` could not be found.
 

	
 
        """
 
        if branch_name and branch_name not in self.branches:
 
            raise BranchDoesNotExistError("Branch '%s' not found"
 
                                          % branch_name)
 
        # actually we should check now if it's not an empty repo to not spaw
 
        # subprocess commands
 
        if self._empty:
 
            raise EmptyRepositoryError("There are no changesets yet")
 

	
 
        # %H at format means (full) commit hash, initial hashes are retrieved
 
        # in ascending date order
 
        cmd = ['log', '--date-order', '--reverse', '--pretty=format:%H']
 
        if max_revisions:
 
            cmd += ['--max-count=%s' % max_revisions]
 
        if start_date:
 
            cmd += ['--since', start_date.strftime('%m/%d/%y %H:%M:%S')]
 
        if end_date:
 
            cmd += ['--until', end_date.strftime('%m/%d/%y %H:%M:%S')]
 
        if branch_name:
 
            cmd.append(branch_name)
 
        else:
 
            cmd.append(settings.GIT_REV_FILTER)
 

	
 
        revs = self.run_git_command(cmd).splitlines()
 
        start_pos = 0
 
        end_pos = len(revs)
 
        if start:
 
            _start = self._get_revision(start)
 
            try:
 
                start_pos = revs.index(_start)
 
            except ValueError:
 
                pass
 

	
 
        if end is not None:
 
            _end = self._get_revision(end)
 
            try:
 
                end_pos = revs.index(_end)
 
            except ValueError:
 
                pass
 

	
 
        if None not in [start, end] and start_pos > end_pos:
 
            raise RepositoryError('start cannot be after end')
 

	
 
        if end_pos is not None:
 
            end_pos += 1
 

	
 
        revs = revs[start_pos:end_pos]
 
        if reverse:
 
            revs.reverse()
 

	
 
        return CollectionGenerator(self, revs)
 

	
 
    def get_diff(self, rev1, rev2, path=None, ignore_whitespace=False,
 
                 context=3):
 
        """
 
        Returns (git like) *diff*, as plain bytes text. Shows changes
 
        introduced by ``rev2`` since ``rev1``.
 

	
 
        :param rev1: Entry point from which diff is shown. Can be
 
          ``self.EMPTY_CHANGESET`` - in this case, patch showing all
 
          the changes since empty state of the repository until ``rev2``
 
        :param rev2: Until which revision changes should be shown.
 
        :param ignore_whitespace: If set to ``True``, would not show whitespace
 
          changes. Defaults to ``False``.
 
        :param context: How many lines before/after changed lines should be
 
          shown. Defaults to ``3``. Due to limitations in Git, if
 
          value passed-in is greater than ``2**31-1``
 
          (``2147483647``), it will be set to ``2147483647``
 
          instead. If negative value is passed-in, it will be set to
 
          ``0`` instead.
 
        """
 

	
 
        # Git internally uses a signed long int for storing context
 
        # size (number of lines to show before and after the
 
        # differences). This can result in integer overflow, so we
 
        # ensure the requested context is smaller by one than the
 
        # number that would cause the overflow. It is highly unlikely
 
        # that a single file will contain that many lines, so this
 
        # kind of change should not cause any realistic consequences.
 
        overflowed_long_int = 2**31
 

	
 
        if context >= overflowed_long_int:
 
            context = overflowed_long_int - 1
 

	
 
        # Negative context values make no sense, and will result in
 
        # errors. Ensure this does not happen.
 
        if context < 0:
 
            context = 0
 

	
 
        flags = ['-U%s' % context, '--full-index', '--binary', '-p', '-M', '--abbrev=40']
 
        if ignore_whitespace:
 
            flags.append('-w')
 

	
 
        if hasattr(rev1, 'raw_id'):
 
            rev1 = getattr(rev1, 'raw_id')
 

	
 
        if hasattr(rev2, 'raw_id'):
 
            rev2 = getattr(rev2, 'raw_id')
 

	
 
        if rev1 == self.EMPTY_CHANGESET:
 
            rev2 = self.get_changeset(rev2).raw_id
 
            cmd = ['show'] + flags + [rev2]
 
        else:
 
            rev1 = self.get_changeset(rev1).raw_id
 
            rev2 = self.get_changeset(rev2).raw_id
 
            cmd = ['diff'] + flags + [rev1, rev2]
 

	
 
        if path:
 
            cmd += ['--', path]
 

	
 
        stdout, stderr = self._run_git_command(cmd, cwd=self.path)
 
        # If we used 'show' command, strip first few lines (until actual diff
 
        # starts)
 
        if rev1 == self.EMPTY_CHANGESET:
 
            parts = stdout.split('\ndiff ', 1)
 
            parts = stdout.split(b'\ndiff ', 1)
 
            if len(parts) > 1:
 
                stdout = 'diff ' + parts[1]
 
                stdout = b'diff ' + parts[1]
 
        return stdout
 

	
 
    @LazyProperty
 
    def in_memory_changeset(self):
 
        """
 
        Returns ``GitInMemoryChangeset`` object for this repository.
 
        """
 
        return GitInMemoryChangeset(self)
 

	
 
    def clone(self, url, update_after_clone=True, bare=False):
 
        """
 
        Tries to clone changes from external location.
 

	
 
        :param update_after_clone: If set to ``False``, git won't checkout
 
          working directory
 
        :param bare: If set to ``True``, repository would be cloned into
 
          *bare* git repository (no working directory at all).
 
        """
 
        url = self._get_url(url)
 
        cmd = ['clone', '-q']
 
        if bare:
 
            cmd.append('--bare')
 
        elif not update_after_clone:
 
            cmd.append('--no-checkout')
 
        cmd += ['--', url, self.path]
 
        # If error occurs run_git_command raises RepositoryError already
 
        self.run_git_command(cmd)
 

	
 
    def pull(self, url):
 
        """
 
        Tries to pull changes from external location.
 
        """
 
        url = self._get_url(url)
 
        cmd = ['pull', '--ff-only', url]
 
        # If error occurs run_git_command raises RepositoryError already
 
        self.run_git_command(cmd)
 

	
 
    def fetch(self, url):
 
        """
 
        Tries to pull changes from external location.
 
        """
 
        url = self._get_url(url)
 
        so = self.run_git_command(['ls-remote', '-h', url])
 
        cmd = ['fetch', url, '--']
 
        for line in (x for x in so.splitlines()):
 
            sha, ref = line.split('\t')
 
            cmd.append('+%s:%s' % (ref, ref))
 
        self.run_git_command(cmd)
 

	
 
    def _update_server_info(self):
 
        """
 
        runs gits update-server-info command in this repo instance
 
        """
 
        from dulwich.server import update_server_info
 
        try:
 
            update_server_info(self._repo)
 
        except OSError as e:
 
            if e.errno not in [errno.ENOENT, errno.EROFS]:
 
                raise
 
            # Workaround for dulwich crashing on for example its own dulwich/tests/data/repos/simple_merge.git/info/refs.lock
 
            log.error('Ignoring %s running update-server-info: %s', type(e).__name__, e)
 

	
 
    @LazyProperty
 
    def workdir(self):
 
        """
 
        Returns ``Workdir`` instance for this repository.
 
        """
 
        return GitWorkdir(self)
 

	
 
    def get_config_value(self, section, name, config_file=None):
 
        """
 
        Returns configuration value for a given [``section``] and ``name``.
 

	
 
        :param section: Section we want to retrieve value from
 
        :param name: Name of configuration we want to retrieve
 
        :param config_file: A path to file which should be used to retrieve
 
          configuration from (might also be a list of file paths)
 
        """
 
        if config_file is None:
 
            config_file = []
 
        elif isinstance(config_file, basestring):
 
            config_file = [config_file]
 

	
 
        def gen_configs():
 
            for path in config_file + self._config_files:
 
                try:
 
                    yield ConfigFile.from_path(path)
 
                except (IOError, OSError, ValueError):
 
                    continue
 

	
 
        for config in gen_configs():
 
            try:
 
                return config.get(section, name)
 
            except KeyError:
 
                continue
 
        return None
 

	
 
    def get_user_name(self, config_file=None):
 
        """
 
        Returns user's name from global configuration file.
 

	
 
        :param config_file: A path to file which should be used to retrieve
 
          configuration from (might also be a list of file paths)
 
        """
 
        return self.get_config_value('user', 'name', config_file)
 

	
 
    def get_user_email(self, config_file=None):
 
        """
 
        Returns user's email from global configuration file.
 

	
 
        :param config_file: A path to file which should be used to retrieve
 
          configuration from (might also be a list of file paths)
 
        """
 
        return self.get_config_value('user', 'email', config_file)
kallithea/lib/vcs/backends/git/workdir.py
Show inline comments
 
import re
 

	
 
from kallithea.lib.vcs.backends.base import BaseWorkdir
 
from kallithea.lib.vcs.exceptions import BranchDoesNotExistError, RepositoryError
 

	
 

	
 
class GitWorkdir(BaseWorkdir):
 

	
 
    def get_branch(self):
 
        headpath = self.repository._repo.refs.refpath('HEAD')
 
        headpath = self.repository._repo.refs.refpath(b'HEAD')
 
        try:
 
            content = open(headpath).read()
 
            match = re.match(r'^ref: refs/heads/(?P<branch>.+)\n$', content)
 
            if match:
 
                return match.groupdict()['branch']
 
            else:
 
                raise RepositoryError("Couldn't compute workdir's branch")
 
        except IOError:
 
            # Try naive way...
 
            raise RepositoryError("Couldn't compute workdir's branch")
 

	
 
    def get_changeset(self):
 
        wk_dir_id = self.repository._repo.refs.as_dict().get('HEAD')
 
        wk_dir_id = self.repository._repo.refs.as_dict().get(b'HEAD')
 
        return self.repository.get_changeset(wk_dir_id)
 

	
 
    def checkout_branch(self, branch=None):
 
        if branch is None:
 
            branch = self.repository.DEFAULT_BRANCH_NAME
 
        if branch not in self.repository.branches:
 
            raise BranchDoesNotExistError
 
        self.repository.run_git_command(['checkout', branch])
kallithea/lib/vcs/backends/hg/changeset.py
Show inline comments
 
@@ -5,406 +5,406 @@ from kallithea.lib.vcs.backends.base imp
 
from kallithea.lib.vcs.conf import settings
 
from kallithea.lib.vcs.exceptions import ChangesetDoesNotExistError, ChangesetError, ImproperArchiveTypeError, NodeDoesNotExistError, VCSError
 
from kallithea.lib.vcs.nodes import (
 
    AddedFileNodesGenerator, ChangedFileNodesGenerator, DirNode, FileNode, NodeKind, RemovedFileNodesGenerator, RootNode, SubModuleNode)
 
from kallithea.lib.vcs.utils import date_fromtimestamp, safe_str, safe_unicode
 
from kallithea.lib.vcs.utils.hgcompat import archival, hex, obsutil
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.lib.vcs.utils.paths import get_dirs_for_path
 

	
 

	
 
class MercurialChangeset(BaseChangeset):
 
    """
 
    Represents state of the repository at a revision.
 
    """
 

	
 
    def __init__(self, repository, revision):
 
        self.repository = repository
 
        assert isinstance(revision, basestring), repr(revision)
 
        self.raw_id = revision
 
        self._ctx = repository._repo[revision]
 
        self.revision = self._ctx._rev
 
        self.nodes = {}
 

	
 
    @LazyProperty
 
    def tags(self):
 
        return [safe_unicode(tag) for tag in self._ctx.tags()]
 

	
 
    @LazyProperty
 
    def branch(self):
 
        return safe_unicode(self._ctx.branch())
 

	
 
    @LazyProperty
 
    def branches(self):
 
        return [safe_unicode(self._ctx.branch())]
 

	
 
    @LazyProperty
 
    def closesbranch(self):
 
        return self._ctx.closesbranch()
 

	
 
    @LazyProperty
 
    def obsolete(self):
 
        return self._ctx.obsolete()
 

	
 
    @LazyProperty
 
    def bumped(self):
 
        return self._ctx.phasedivergent()
 

	
 
    @LazyProperty
 
    def divergent(self):
 
        return self._ctx.contentdivergent()
 

	
 
    @LazyProperty
 
    def extinct(self):
 
        return self._ctx.extinct()
 

	
 
    @LazyProperty
 
    def unstable(self):
 
        return self._ctx.orphan()
 

	
 
    @LazyProperty
 
    def phase(self):
 
        if(self._ctx.phase() == 1):
 
            return 'Draft'
 
        elif(self._ctx.phase() == 2):
 
            return 'Secret'
 
        else:
 
            return ''
 

	
 
    @LazyProperty
 
    def successors(self):
 
        successors = obsutil.successorssets(self._ctx._repo, self._ctx.node(), closest=True)
 
        if successors:
 
            # flatten the list here handles both divergent (len > 1)
 
            # and the usual case (len = 1)
 
            successors = [hex(n)[:12] for sub in successors for n in sub if n != self._ctx.node()]
 

	
 
        return successors
 

	
 
    @LazyProperty
 
    def predecessors(self):
 
        return [hex(n)[:12] for n in obsutil.closestpredecessors(self._ctx._repo, self._ctx.node())]
 

	
 
    @LazyProperty
 
    def bookmarks(self):
 
        return [safe_unicode(bookmark) for bookmark in self._ctx.bookmarks()]
 

	
 
    @LazyProperty
 
    def message(self):
 
        return safe_unicode(self._ctx.description())
 

	
 
    @LazyProperty
 
    def committer(self):
 
        return safe_unicode(self.author)
 

	
 
    @LazyProperty
 
    def author(self):
 
        return safe_unicode(self._ctx.user())
 

	
 
    @LazyProperty
 
    def date(self):
 
        return date_fromtimestamp(*self._ctx.date())
 

	
 
    @LazyProperty
 
    def _timestamp(self):
 
        return self._ctx.date()[0]
 

	
 
    @LazyProperty
 
    def status(self):
 
        """
 
        Returns modified, added, removed, deleted files for current changeset
 
        """
 
        return self.repository._repo.status(self._ctx.p1().node(),
 
                                            self._ctx.node())
 

	
 
    @LazyProperty
 
    def _file_paths(self):
 
        return list(self._ctx)
 

	
 
    @LazyProperty
 
    def _dir_paths(self):
 
        p = list(set(get_dirs_for_path(*self._file_paths)))
 
        p.insert(0, '')
 
        return p
 

	
 
    @LazyProperty
 
    def _paths(self):
 
        return self._dir_paths + self._file_paths
 

	
 
    @LazyProperty
 
    def id(self):
 
        if self.last:
 
            return u'tip'
 
        return self.short_id
 

	
 
    @LazyProperty
 
    def short_id(self):
 
        return self.raw_id[:12]
 

	
 
    @LazyProperty
 
    def parents(self):
 
        """
 
        Returns list of parents changesets.
 
        """
 
        return [self.repository.get_changeset(parent.rev())
 
                for parent in self._ctx.parents() if parent.rev() >= 0]
 

	
 
    @LazyProperty
 
    def children(self):
 
        """
 
        Returns list of children changesets.
 
        """
 
        return [self.repository.get_changeset(child.rev())
 
                for child in self._ctx.children() if child.rev() >= 0]
 

	
 
    def next(self, branch=None):
 
        if branch and self.branch != branch:
 
            raise VCSError('Branch option used on changeset not belonging '
 
                           'to that branch')
 

	
 
        cs = self
 
        while True:
 
            try:
 
                next_ = cs.repository.revisions.index(cs.raw_id) + 1
 
                next_rev = cs.repository.revisions[next_]
 
            except IndexError:
 
                raise ChangesetDoesNotExistError
 
            cs = cs.repository.get_changeset(next_rev)
 

	
 
            if not branch or branch == cs.branch:
 
                return cs
 

	
 
    def prev(self, branch=None):
 
        if branch and self.branch != branch:
 
            raise VCSError('Branch option used on changeset not belonging '
 
                           'to that branch')
 

	
 
        cs = self
 
        while True:
 
            try:
 
                prev_ = cs.repository.revisions.index(cs.raw_id) - 1
 
                if prev_ < 0:
 
                    raise IndexError
 
                prev_rev = cs.repository.revisions[prev_]
 
            except IndexError:
 
                raise ChangesetDoesNotExistError
 
            cs = cs.repository.get_changeset(prev_rev)
 

	
 
            if not branch or branch == cs.branch:
 
                return cs
 

	
 
    def diff(self):
 
        # Only used to feed diffstat
 
        return ''.join(self._ctx.diff())
 
        return b''.join(self._ctx.diff())
 

	
 
    def _fix_path(self, path):
 
        """
 
        Paths are stored without trailing slash so we need to get rid off it if
 
        needed. Also mercurial keeps filenodes as str so we need to decode
 
        from unicode to str
 
        """
 
        if path.endswith('/'):
 
            path = path.rstrip('/')
 

	
 
        return safe_str(path)
 

	
 
    def _get_kind(self, path):
 
        path = self._fix_path(path)
 
        if path in self._file_paths:
 
            return NodeKind.FILE
 
        elif path in self._dir_paths:
 
            return NodeKind.DIR
 
        else:
 
            raise ChangesetError("Node does not exist at the given path '%s'"
 
                % (path))
 

	
 
    def _get_filectx(self, path):
 
        path = self._fix_path(path)
 
        if self._get_kind(path) != NodeKind.FILE:
 
            raise ChangesetError("File does not exist for revision %s at "
 
                " '%s'" % (self.raw_id, path))
 
        return self._ctx.filectx(path)
 

	
 
    def _extract_submodules(self):
 
        """
 
        returns a dictionary with submodule information from substate file
 
        of hg repository
 
        """
 
        return self._ctx.substate
 

	
 
    def get_file_mode(self, path):
 
        """
 
        Returns stat mode of the file at the given ``path``.
 
        """
 
        fctx = self._get_filectx(path)
 
        if 'x' in fctx.flags():
 
        if b'x' in fctx.flags():
 
            return 0o100755
 
        else:
 
            return 0o100644
 

	
 
    def get_file_content(self, path):
 
        """
 
        Returns content of the file at given ``path``.
 
        """
 
        fctx = self._get_filectx(path)
 
        return fctx.data()
 

	
 
    def get_file_size(self, path):
 
        """
 
        Returns size of the file at given ``path``.
 
        """
 
        fctx = self._get_filectx(path)
 
        return fctx.size()
 

	
 
    def get_file_changeset(self, path):
 
        """
 
        Returns last commit of the file at the given ``path``.
 
        """
 
        return self.get_file_history(path, limit=1)[0]
 

	
 
    def get_file_history(self, path, limit=None):
 
        """
 
        Returns history of file as reversed list of ``Changeset`` objects for
 
        which file at given ``path`` has been modified.
 
        """
 
        fctx = self._get_filectx(path)
 
        hist = []
 
        cnt = 0
 
        for cs in reversed([x for x in fctx.filelog()]):
 
            cnt += 1
 
            hist.append(hex(fctx.filectx(cs).node()))
 
            if limit is not None and cnt == limit:
 
                break
 

	
 
        return [self.repository.get_changeset(node) for node in hist]
 

	
 
    def get_file_annotate(self, path):
 
        """
 
        Returns a generator of four element tuples with
 
            lineno, sha, changeset lazy loader and line
 
        """
 
        annotations = self._get_filectx(path).annotate()
 
        annotation_lines = [(annotateline.fctx, annotateline.text) for annotateline in annotations]
 
        for i, (fctx, l) in enumerate(annotation_lines):
 
            sha = fctx.hex()
 
            yield (i + 1, sha, lambda sha=sha, l=l: self.repository.get_changeset(sha), l)
 

	
 
    def fill_archive(self, stream=None, kind='tgz', prefix=None,
 
                     subrepos=False):
 
        """
 
        Fills up given stream.
 

	
 
        :param stream: file like object.
 
        :param kind: one of following: ``zip``, ``tgz`` or ``tbz2``.
 
            Default: ``tgz``.
 
        :param prefix: name of root directory in archive.
 
            Default is repository name and changeset's raw_id joined with dash
 
            (``repo-tip.<KIND>``).
 
        :param subrepos: include subrepos in this archive.
 

	
 
        :raise ImproperArchiveTypeError: If given kind is wrong.
 
        :raise VcsError: If given stream is None
 
        """
 
        allowed_kinds = settings.ARCHIVE_SPECS
 
        if kind not in allowed_kinds:
 
            raise ImproperArchiveTypeError('Archive kind not supported use one'
 
                'of %s' % ' '.join(allowed_kinds))
 

	
 
        if stream is None:
 
            raise VCSError('You need to pass in a valid stream for filling'
 
                           ' with archival data')
 

	
 
        if prefix is None:
 
            prefix = '%s-%s' % (self.repository.name, self.short_id)
 
        elif prefix.startswith('/'):
 
            raise VCSError("Prefix cannot start with leading slash")
 
        elif prefix.strip() == '':
 
            raise VCSError("Prefix cannot be empty")
 

	
 
        archival.archive(self.repository._repo, stream, self.raw_id,
 
                         kind, prefix=prefix, subrepos=subrepos)
 

	
 
    def get_nodes(self, path):
 
        """
 
        Returns combined ``DirNode`` and ``FileNode`` objects list representing
 
        state of changeset at the given ``path``. If node at the given ``path``
 
        is not instance of ``DirNode``, ChangesetError would be raised.
 
        """
 

	
 
        if self._get_kind(path) != NodeKind.DIR:
 
            raise ChangesetError("Directory does not exist for revision %s at "
 
                " '%s'" % (self.revision, path))
 
        path = self._fix_path(path)
 

	
 
        filenodes = [FileNode(f, changeset=self) for f in self._file_paths
 
            if os.path.dirname(f) == path]
 
        dirs = path == '' and '' or [d for d in self._dir_paths
 
            if d and posixpath.dirname(d) == path]
 
        dirnodes = [DirNode(d, changeset=self) for d in dirs
 
            if os.path.dirname(d) == path]
 

	
 
        als = self.repository.alias
 
        for k, vals in self._extract_submodules().iteritems():
 
            #vals = url,rev,type
 
            loc = vals[0]
 
            cs = vals[1]
 
            dirnodes.append(SubModuleNode(k, url=loc, changeset=cs,
 
                                          alias=als))
 
        nodes = dirnodes + filenodes
 
        for node in nodes:
 
            self.nodes[node.path] = node
 
        nodes.sort()
 
        return nodes
 

	
 
    def get_node(self, path):
 
        """
 
        Returns ``Node`` object from the given ``path``. If there is no node at
 
        the given ``path``, ``ChangesetError`` would be raised.
 
        """
 
        path = self._fix_path(path)
 
        if path not in self.nodes:
 
            if path in self._file_paths:
 
                node = FileNode(path, changeset=self)
 
            elif path in self._dir_paths or path in self._dir_paths:
 
                if path == '':
 
                    node = RootNode(changeset=self)
 
                else:
 
                    node = DirNode(path, changeset=self)
 
            else:
 
                raise NodeDoesNotExistError("There is no file nor directory "
 
                    "at the given path: '%s' at revision %s"
 
                    % (path, self.short_id))
 
            # cache node
 
            self.nodes[path] = node
 
        return self.nodes[path]
 

	
 
    @LazyProperty
 
    def affected_files(self):
 
        """
 
        Gets a fast accessible file changes for given changeset
 
        """
 
        return self._ctx.files()
 

	
 
    @property
 
    def added(self):
 
        """
 
        Returns list of added ``FileNode`` objects.
 
        """
 
        return AddedFileNodesGenerator([n for n in self.status[1]], self)
 

	
 
    @property
 
    def changed(self):
 
        """
 
        Returns list of modified ``FileNode`` objects.
 
        """
 
        return ChangedFileNodesGenerator([n for n in self.status[0]], self)
 

	
 
    @property
 
    def removed(self):
 
        """
 
        Returns list of removed ``FileNode`` objects.
 
        """
 
        return RemovedFileNodesGenerator([n for n in self.status[2]], self)
 

	
 
    @LazyProperty
 
    def extra(self):
 
        return self._ctx.extra()
kallithea/lib/vcs/backends/hg/inmemory.py
Show inline comments
 
import datetime
 

	
 
from kallithea.lib.vcs.backends.base import BaseInMemoryChangeset
 
from kallithea.lib.vcs.exceptions import RepositoryError
 
from kallithea.lib.vcs.utils import safe_bytes
 
from kallithea.lib.vcs.utils.hgcompat import hex, memctx, memfilectx
 

	
 

	
 
class MercurialInMemoryChangeset(BaseInMemoryChangeset):
 

	
 
    def commit(self, message, author, parents=None, branch=None, date=None,
 
            **kwargs):
 
        """
 
        Performs in-memory commit (doesn't check workdir in any way) and
 
        returns newly created ``Changeset``. Updates repository's
 
        ``revisions``.
 

	
 
        :param message: message of the commit
 
        :param author: full username, i.e. "Joe Doe <joe.doe@example.com>"
 
        :param parents: single parent or sequence of parents from which commit
 
          would be derived
 
        :param date: ``datetime.datetime`` instance. Defaults to
 
          ``datetime.datetime.now()``.
 
        :param branch: branch name, as string. If none given, default backend's
 
          branch would be used.
 

	
 
        :raises ``CommitError``: if any error occurs while committing
 
        """
 
        self.check_integrity(parents)
 

	
 
        from .repository import MercurialRepository
 
        if not isinstance(message, unicode) or not isinstance(author, unicode):
 
            raise RepositoryError('Given message and author needs to be '
 
                                  'an <unicode> instance got %r & %r instead'
 
                                  % (type(message), type(author)))
 

	
 
        if branch is None:
 
            branch = MercurialRepository.DEFAULT_BRANCH_NAME
 
        kwargs['branch'] = branch
 
        kwargs[b'branch'] = branch
 

	
 
        def filectxfn(_repo, memctx, path):
 
            """
 
            Marks given path as added/changed/removed in a given _repo. This is
 
            for internal mercurial commit function.
 
            """
 

	
 
            # check if this path is removed
 
            if path in (node.path for node in self.removed):
 
                return None
 

	
 
            # check if this path is added
 
            for node in self.added:
 
                if node.path == path:
 
                    return memfilectx(_repo, memctx, path=node.path,
 
                        data=node.content,
 
                        islink=False,
 
                        isexec=node.is_executable,
 
                        copysource=False)
 

	
 
            # or changed
 
            for node in self.changed:
 
                if node.path == path:
 
                    return memfilectx(_repo, memctx, path=node.path,
 
                        data=node.content,
 
                        islink=False,
 
                        isexec=node.is_executable,
 
                        copysource=False)
 

	
 
            raise RepositoryError("Given path haven't been marked as added,"
 
                                  "changed or removed (%s)" % path)
 

	
 
        parents = [None, None]
 
        for i, parent in enumerate(self.parents):
 
            if parent is not None:
 
                parents[i] = parent._ctx.node()
 

	
 
        if date and isinstance(date, datetime.datetime):
 
            date = date.strftime('%a, %d %b %Y %H:%M:%S')
 

	
 
        commit_ctx = memctx(repo=self.repository._repo,
 
            parents=parents,
 
            text='',
 
            text=b'',
 
            files=self.get_paths(),
 
            filectxfn=filectxfn,
 
            user=author,
 
            date=date,
 
            extra=kwargs)
 

	
 
        # injecting given _repo params
 
        commit_ctx._text = safe_bytes(message)
 
        commit_ctx._user = safe_bytes(author)
 
        commit_ctx._date = date
 

	
 
        # TODO: Catch exceptions!
 
        n = self.repository._repo.commitctx(commit_ctx)
 
        # Returns mercurial node
 
        self._commit_ctx = commit_ctx  # For reference
 
        # Update vcs repository object & recreate mercurial _repo
 
        # new_ctx = self.repository._repo[node]
 
        # new_tip = self.repository.get_changeset(new_ctx.hex())
 
        new_id = hex(n)
 
        self.repository.revisions.append(new_id)
 
        self._repo = self.repository._get_repo(create=False)
 
        self.repository.branches = self.repository._get_branches()
 
        tip = self.repository.get_changeset()
 
        self.reset()
 
        return tip
kallithea/lib/vcs/backends/hg/repository.py
Show inline comments
 
@@ -32,584 +32,581 @@ from .workdir import MercurialWorkdir
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class MercurialRepository(BaseRepository):
 
    """
 
    Mercurial repository backend
 
    """
 
    DEFAULT_BRANCH_NAME = 'default'
 
    scm = 'hg'
 

	
 
    def __init__(self, repo_path, create=False, baseui=None, src_url=None,
 
                 update_after_clone=False):
 
        """
 
        Raises RepositoryError if repository could not be find at the given
 
        ``repo_path``.
 

	
 
        :param repo_path: local path of the repository
 
        :param create=False: if set to True, would try to create repository if
 
           it does not exist rather than raising exception
 
        :param baseui=None: user data
 
        :param src_url=None: would try to clone repository from given location
 
        :param update_after_clone=False: sets update of working copy after
 
          making a clone
 
        """
 

	
 
        if not isinstance(repo_path, str):
 
            raise VCSError('Mercurial backend requires repository path to '
 
                           'be instance of <str> got %s instead' %
 
                           type(repo_path))
 

	
 
        self.path = abspath(repo_path)
 
        self.baseui = baseui or ui.ui()
 
        # We've set path and ui, now we can set _repo itself
 
        self._repo = self._get_repo(create, src_url, update_after_clone)
 

	
 
    @property
 
    def _empty(self):
 
        """
 
        Checks if repository is empty ie. without any changesets
 
        """
 
        # TODO: Following raises errors when using InMemoryChangeset...
 
        # return len(self._repo.changelog) == 0
 
        return len(self.revisions) == 0
 

	
 
    @LazyProperty
 
    def revisions(self):
 
        """
 
        Returns list of revisions' ids, in ascending order.  Being lazy
 
        attribute allows external tools to inject shas from cache.
 
        """
 
        return self._get_all_revisions()
 

	
 
    @LazyProperty
 
    def name(self):
 
        return os.path.basename(self.path)
 

	
 
    @LazyProperty
 
    def branches(self):
 
        return self._get_branches()
 

	
 
    @LazyProperty
 
    def closed_branches(self):
 
        return self._get_branches(normal=False, closed=True)
 

	
 
    @LazyProperty
 
    def allbranches(self):
 
        """
 
        List all branches, including closed branches.
 
        """
 
        return self._get_branches(closed=True)
 

	
 
    def _get_branches(self, normal=True, closed=False):
 
        """
 
        Gets branches for this repository
 
        Returns only not closed branches by default
 

	
 
        :param closed: return also closed branches for mercurial
 
        :param normal: return also normal branches
 
        """
 

	
 
        if self._empty:
 
            return {}
 

	
 
        bt = OrderedDict()
 
        for bn, _heads, tip, isclosed in sorted(self._repo.branchmap().iterbranches()):
 
            if isclosed:
 
                if closed:
 
                    bt[safe_unicode(bn)] = hex(tip)
 
            else:
 
                if normal:
 
                    bt[safe_unicode(bn)] = hex(tip)
 

	
 
        return bt
 

	
 
    @LazyProperty
 
    def tags(self):
 
        """
 
        Gets tags for this repository
 
        """
 
        return self._get_tags()
 

	
 
    def _get_tags(self):
 
        if self._empty:
 
            return {}
 

	
 
        return OrderedDict(sorted(
 
            ((safe_unicode(n), hex(h)) for n, h in self._repo.tags().items()),
 
            reverse=True,
 
            key=lambda x: x[0],  # sort by name
 
        ))
 

	
 
    def tag(self, name, user, revision=None, message=None, date=None,
 
            **kwargs):
 
        """
 
        Creates and returns a tag for the given ``revision``.
 

	
 
        :param name: name for new tag
 
        :param user: full username, i.e.: "Joe Doe <joe.doe@example.com>"
 
        :param revision: changeset id for which new tag would be created
 
        :param message: message of the tag's commit
 
        :param date: date of tag's commit
 

	
 
        :raises TagAlreadyExistError: if tag with same name already exists
 
        """
 
        if name in self.tags:
 
            raise TagAlreadyExistError("Tag %s already exists" % name)
 
        changeset = self.get_changeset(revision)
 
        local = kwargs.setdefault('local', False)
 

	
 
        if message is None:
 
            message = "Added tag %s for changeset %s" % (name,
 
                changeset.short_id)
 

	
 
        if date is None:
 
            date = datetime.datetime.now().strftime('%a, %d %b %Y %H:%M:%S')
 

	
 
        try:
 
            tag(self._repo, name, changeset._ctx.node(), message, local, user, date)
 
        except Abort as e:
 
            raise RepositoryError(e.message)
 

	
 
        # Reinitialize tags
 
        self.tags = self._get_tags()
 
        tag_id = self.tags[name]
 

	
 
        return self.get_changeset(revision=tag_id)
 

	
 
    def remove_tag(self, name, user, message=None, date=None):
 
        """
 
        Removes tag with the given ``name``.
 

	
 
        :param name: name of the tag to be removed
 
        :param user: full username, i.e.: "Joe Doe <joe.doe@example.com>"
 
        :param message: message of the tag's removal commit
 
        :param date: date of tag's removal commit
 

	
 
        :raises TagDoesNotExistError: if tag with given name does not exists
 
        """
 
        if name not in self.tags:
 
            raise TagDoesNotExistError("Tag %s does not exist" % name)
 
        if message is None:
 
            message = "Removed tag %s" % name
 
        if date is None:
 
            date = datetime.datetime.now().strftime('%a, %d %b %Y %H:%M:%S')
 
        local = False
 

	
 
        try:
 
            tag(self._repo, name, nullid, message, local, user, date)
 
            self.tags = self._get_tags()
 
        except Abort as e:
 
            raise RepositoryError(e.message)
 

	
 
    @LazyProperty
 
    def bookmarks(self):
 
        """
 
        Gets bookmarks for this repository
 
        """
 
        return self._get_bookmarks()
 

	
 
    def _get_bookmarks(self):
 
        if self._empty:
 
            return {}
 

	
 
        return OrderedDict(sorted(
 
            ((safe_unicode(n), hex(h)) for n, h in self._repo._bookmarks.items()),
 
            reverse=True,
 
            key=lambda x: x[0],  # sort by name
 
        ))
 

	
 
    def _get_all_revisions(self):
 

	
 
        return [self._repo[x].hex() for x in self._repo.filtered('visible').changelog.revs()]
 
        return [self._repo[x].hex() for x in self._repo.filtered(b'visible').changelog.revs()]
 

	
 
    def get_diff(self, rev1, rev2, path='', ignore_whitespace=False,
 
                  context=3):
 
        """
 
        Returns (git like) *diff*, as plain text. Shows changes introduced by
 
        ``rev2`` since ``rev1``.
 

	
 
        :param rev1: Entry point from which diff is shown. Can be
 
          ``self.EMPTY_CHANGESET`` - in this case, patch showing all
 
          the changes since empty state of the repository until ``rev2``
 
        :param rev2: Until which revision changes should be shown.
 
        :param ignore_whitespace: If set to ``True``, would not show whitespace
 
          changes. Defaults to ``False``.
 
        :param context: How many lines before/after changed lines should be
 
          shown. Defaults to ``3``. If negative value is passed-in, it will be
 
          set to ``0`` instead.
 
        """
 

	
 
        # Negative context values make no sense, and will result in
 
        # errors. Ensure this does not happen.
 
        if context < 0:
 
            context = 0
 

	
 
        if hasattr(rev1, 'raw_id'):
 
            rev1 = getattr(rev1, 'raw_id')
 

	
 
        if hasattr(rev2, 'raw_id'):
 
            rev2 = getattr(rev2, 'raw_id')
 

	
 
        # Check if given revisions are present at repository (may raise
 
        # ChangesetDoesNotExistError)
 
        if rev1 != self.EMPTY_CHANGESET:
 
            self.get_changeset(rev1)
 
        self.get_changeset(rev2)
 
        if path:
 
            file_filter = match_exact(path)
 
        else:
 
            file_filter = None
 

	
 
        return ''.join(patch.diff(self._repo, rev1, rev2, match=file_filter,
 
        return b''.join(patch.diff(self._repo, rev1, rev2, match=file_filter,
 
                          opts=diffopts(git=True,
 
                                        showfunc=True,
 
                                        ignorews=ignore_whitespace,
 
                                        context=context)))
 

	
 
    @classmethod
 
    def _check_url(cls, url, repoui=None):
 
        """
 
        Function will check given url and try to verify if it's a valid
 
        link. Sometimes it may happened that mercurial will issue basic
 
        auth request that can cause whole API to hang when used from python
 
        or other external calls.
 

	
 
        On failures it'll raise urllib2.HTTPError, exception is also thrown
 
        when the return code is non 200
 
        """
 
        # check first if it's not an local url
 
        if os.path.isdir(url) or url.startswith('file:'):
 
        if os.path.isdir(url) or url.startswith(b'file:'):
 
            return True
 

	
 
        if url.startswith('ssh:'):
 
        if url.startswith(b'ssh:'):
 
            # in case of invalid uri or authentication issues, sshpeer will
 
            # throw an exception.
 
            sshpeer.instance(repoui or ui.ui(), url, False).lookup('tip')
 
            sshpeer.instance(repoui or ui.ui(), url, False).lookup(b'tip')
 
            return True
 

	
 
        url_prefix = None
 
        if '+' in url[:url.find('://')]:
 
            url_prefix, url = url.split('+', 1)
 
        if b'+' in url[:url.find(b'://')]:
 
            url_prefix, url = url.split(b'+', 1)
 

	
 
        handlers = []
 
        url_obj = hg_url(url)
 
        test_uri, authinfo = url_obj.authinfo()
 
        url_obj.passwd = '*****'
 
        url_obj.passwd = b'*****'
 
        cleaned_uri = str(url_obj)
 

	
 
        if authinfo:
 
            # create a password manager
 
            passmgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
 
            passmgr.add_password(*authinfo)
 

	
 
            handlers.extend((httpbasicauthhandler(passmgr),
 
                             httpdigestauthhandler(passmgr)))
 

	
 
        o = urllib2.build_opener(*handlers)
 
        o.addheaders = [('Content-Type', 'application/mercurial-0.1'),
 
                        ('Accept', 'application/mercurial-0.1')]
 

	
 
        q = {"cmd": 'between'}
 
        q.update({'pairs': "%s-%s" % ('0' * 40, '0' * 40)})
 
        qs = '?%s' % urllib.urlencode(q)
 
        cu = "%s%s" % (test_uri, qs)
 
        req = urllib2.Request(cu, None, {})
 

	
 
        try:
 
            resp = o.open(req)
 
            if resp.code != 200:
 
                raise Exception('Return Code is not 200')
 
        except Exception as e:
 
            # means it cannot be cloned
 
            raise urllib2.URLError("[%s] org_exc: %s" % (cleaned_uri, e))
 

	
 
        if not url_prefix: # skip svn+http://... (and git+... too)
 
            # now check if it's a proper hg repo
 
            try:
 
                httppeer.instance(repoui or ui.ui(), url, False).lookup('tip')
 
                httppeer.instance(repoui or ui.ui(), url, False).lookup(b'tip')
 
            except Exception as e:
 
                raise urllib2.URLError(
 
                    "url [%s] does not look like an hg repo org_exc: %s"
 
                    % (cleaned_uri, e))
 

	
 
        return True
 

	
 
    def _get_repo(self, create, src_url=None, update_after_clone=False):
 
        """
 
        Function will check for mercurial repository in given path and return
 
        a localrepo object. If there is no repository in that path it will
 
        raise an exception unless ``create`` parameter is set to True - in
 
        that case repository would be created and returned.
 
        If ``src_url`` is given, would try to clone repository from the
 
        location at given clone_point. Additionally it'll make update to
 
        working copy accordingly to ``update_after_clone`` flag
 
        """
 

	
 
        try:
 
            if src_url:
 
                url = safe_str(self._get_url(src_url))
 
                opts = {}
 
                if not update_after_clone:
 
                    opts.update({'noupdate': True})
 
                MercurialRepository._check_url(url, self.baseui)
 
                clone(self.baseui, url, self.path, **opts)
 

	
 
                # Don't try to create if we've already cloned repo
 
                create = False
 
            return localrepo.instance(self.baseui, self.path, create=create)
 
        except (Abort, RepoError) as err:
 
            if create:
 
                msg = "Cannot create repository at %s. Original error was %s" \
 
                    % (self.path, err)
 
            else:
 
                msg = "Not valid repository at %s. Original error was %s" \
 
                    % (self.path, err)
 
            raise RepositoryError(msg)
 

	
 
    @LazyProperty
 
    def in_memory_changeset(self):
 
        return MercurialInMemoryChangeset(self)
 

	
 
    @LazyProperty
 
    def description(self):
 
        undefined_description = u'unknown'
 
        _desc = self._repo.ui.config('web', 'description', None, untrusted=True)
 
        return safe_unicode(_desc or undefined_description)
 
        _desc = self._repo.ui.config(b'web', b'description', None, untrusted=True)
 
        return safe_unicode(_desc or b'unknown')
 

	
 
    @LazyProperty
 
    def contact(self):
 
        undefined_contact = u'Unknown'
 
        return safe_unicode(get_contact(self._repo.ui.config)
 
                            or undefined_contact)
 
                            or b'Unknown')
 

	
 
    @LazyProperty
 
    def last_change(self):
 
        """
 
        Returns last change made on this repository as datetime object
 
        """
 
        return date_fromtimestamp(self._get_mtime(), makedate()[1])
 

	
 
    def _get_mtime(self):
 
        try:
 
            return time.mktime(self.get_changeset().date.timetuple())
 
        except RepositoryError:
 
            # fallback to filesystem
 
            cl_path = os.path.join(self.path, '.hg', "00changelog.i")
 
            st_path = os.path.join(self.path, '.hg', "store")
 
            if os.path.exists(cl_path):
 
                return os.stat(cl_path).st_mtime
 
            else:
 
                return os.stat(st_path).st_mtime
 

	
 
    def _get_revision(self, revision):
 
        """
 
        Given any revision identifier, returns a 40 char string with revision hash.
 

	
 
        :param revision: str or int or None
 
        """
 
        if self._empty:
 
            raise EmptyRepositoryError("There are no changesets yet")
 

	
 
        if revision in [-1, None]:
 
            revision = 'tip'
 
            revision = b'tip'
 
        elif isinstance(revision, unicode):
 
            revision = safe_bytes(revision)
 

	
 
        try:
 
            if isinstance(revision, int):
 
                return self._repo[revision].hex()
 
            return scmutil.revsymbol(self._repo, revision).hex()
 
        except (IndexError, ValueError, RepoLookupError, TypeError):
 
            msg = ("Revision %s does not exist for %s" % (revision, self))
 
            raise ChangesetDoesNotExistError(msg)
 
        except (LookupError, ):
 
            msg = ("Ambiguous identifier `%s` for %s" % (revision, self))
 
            raise ChangesetDoesNotExistError(msg)
 

	
 
    def get_ref_revision(self, ref_type, ref_name):
 
        """
 
        Returns revision number for the given reference.
 
        """
 
        ref_name = safe_str(ref_name)
 
        if ref_type == 'rev' and not ref_name.strip('0'):
 
            return self.EMPTY_CHANGESET
 
        # lookup up the exact node id
 
        _revset_predicates = {
 
                'branch': 'branch',
 
                'book': 'bookmark',
 
                'tag': 'tag',
 
                'rev': 'id',
 
            }
 
        # avoid expensive branch(x) iteration over whole repo
 
        rev_spec = "%%s & %s(%%s)" % _revset_predicates[ref_type]
 
        try:
 
            revs = self._repo.revs(rev_spec, ref_name, ref_name)
 
        except LookupError:
 
            msg = ("Ambiguous identifier %s:%s for %s" % (ref_type, ref_name, self.name))
 
            raise ChangesetDoesNotExistError(msg)
 
        except RepoLookupError:
 
            msg = ("Revision %s:%s does not exist for %s" % (ref_type, ref_name, self.name))
 
            raise ChangesetDoesNotExistError(msg)
 
        if revs:
 
            revision = revs.last()
 
        else:
 
            # TODO: just report 'not found'?
 
            revision = ref_name
 

	
 
        return self._get_revision(revision)
 

	
 
    def _get_archives(self, archive_name='tip'):
 
        allowed = self.baseui.configlist("web", "allow_archive",
 
        allowed = self.baseui.configlist(b"web", b"allow_archive",
 
                                         untrusted=True)
 
        for i in [('zip', '.zip'), ('gz', '.tar.gz'), ('bz2', '.tar.bz2')]:
 
            if i[0] in allowed or self._repo.ui.configbool("web",
 
                                                           "allow" + i[0],
 
        for name, ext in [(b'zip', '.zip'), (b'gz', '.tar.gz'), (b'bz2', '.tar.bz2')]:
 
            if name in allowed or self._repo.ui.configbool(b"web",
 
                                                           b"allow" + name,
 
                                                           untrusted=True):
 
                yield {"type": i[0], "extension": i[1], "node": archive_name}
 
                yield {"type": name, "extension": ext, "node": archive_name}
 

	
 
    def _get_url(self, url):
 
        """
 
        Returns normalized url. If schema is not given, would fall
 
        to filesystem
 
        (``file:///``) schema.
 
        """
 
        url = safe_str(url)
 
        if url != 'default' and '://' not in url:
 
            url = "file:" + urllib.pathname2url(url)
 
        return url
 

	
 
    def get_hook_location(self):
 
        """
 
        returns absolute path to location where hooks are stored
 
        """
 
        return os.path.join(self.path, '.hg', '.hgrc')
 

	
 
    def get_changeset(self, revision=None):
 
        """
 
        Returns ``MercurialChangeset`` object representing repository's
 
        changeset at the given ``revision``.
 
        """
 
        revision = self._get_revision(revision)
 
        changeset = MercurialChangeset(repository=self, revision=revision)
 
        return changeset
 

	
 
    def get_changesets(self, start=None, end=None, start_date=None,
 
                       end_date=None, branch_name=None, reverse=False, max_revisions=None):
 
        """
 
        Returns iterator of ``MercurialChangeset`` objects from start to end
 
        (both are inclusive)
 

	
 
        :param start: None, str, int or mercurial lookup format
 
        :param end:  None, str, int or mercurial lookup format
 
        :param start_date:
 
        :param end_date:
 
        :param branch_name:
 
        :param reversed: return changesets in reversed order
 
        """
 
        start_raw_id = self._get_revision(start)
 
        start_pos = None if start is None else self.revisions.index(start_raw_id)
 
        end_raw_id = self._get_revision(end)
 
        end_pos = None if end is None else self.revisions.index(end_raw_id)
 

	
 
        if start_pos is not None and end_pos is not None and start_pos > end_pos:
 
            raise RepositoryError("Start revision '%s' cannot be "
 
                                  "after end revision '%s'" % (start, end))
 

	
 
        if branch_name and branch_name not in self.allbranches:
 
            msg = ("Branch %s not found in %s" % (branch_name, self))
 
            raise BranchDoesNotExistError(msg)
 
        if end_pos is not None:
 
            end_pos += 1
 
        # filter branches
 
        filter_ = []
 
        if branch_name:
 
            filter_.append('branch("%s")' % safe_str(branch_name))
 
            filter_.append(b'branch("%s")' % safe_str(branch_name))
 
        if start_date:
 
            filter_.append('date(">%s")' % start_date)
 
            filter_.append(b'date(">%s")' % start_date)
 
        if end_date:
 
            filter_.append('date("<%s")' % end_date)
 
            filter_.append(b'date("<%s")' % end_date)
 
        if filter_ or max_revisions:
 
            if filter_:
 
                revspec = ' and '.join(filter_)
 
                revspec = b' and '.join(filter_)
 
            else:
 
                revspec = 'all()'
 
                revspec = b'all()'
 
            if max_revisions:
 
                revspec = 'limit(%s, %s)' % (revspec, max_revisions)
 
                revspec = b'limit(%s, %d)' % (revspec, max_revisions)
 
            revisions = scmutil.revrange(self._repo, [revspec])
 
        else:
 
            revisions = self.revisions
 

	
 
        # this is very much a hack to turn this into a list; a better solution
 
        # would be to get rid of this function entirely and use revsets
 
        revs = list(revisions)[start_pos:end_pos]
 
        if reverse:
 
            revs.reverse()
 

	
 
        return CollectionGenerator(self, revs)
 

	
 
    def pull(self, url):
 
        """
 
        Tries to pull changes from external location.
 
        """
 
        url = self._get_url(url)
 
        other = peer(self._repo, {}, url)
 
        try:
 
            from mercurial import exchange
 
            exchange.pull(self._repo, other, heads=None, force=None)
 
        except Abort as err:
 
            # Propagate error but with vcs's type
 
            raise RepositoryError(str(err))
 

	
 
    @LazyProperty
 
    def workdir(self):
 
        """
 
        Returns ``Workdir`` instance for this repository.
 
        """
 
        return MercurialWorkdir(self)
 

	
 
    def get_config_value(self, section, name=None, config_file=None):
 
        """
 
        Returns configuration value for a given [``section``] and ``name``.
 

	
 
        :param section: Section we want to retrieve value from
 
        :param name: Name of configuration we want to retrieve
 
        :param config_file: A path to file which should be used to retrieve
 
          configuration from (might also be a list of file paths)
 
        """
 
        if config_file is None:
 
            config_file = []
 
        elif isinstance(config_file, basestring):
 
            config_file = [config_file]
 

	
 
        config = self._repo.ui
 
        if config_file:
 
            config = ui.ui()
 
            for path in config_file:
 
                config.readconfig(path)
 
        return config.config(section, name)
 

	
 
    def get_user_name(self, config_file=None):
 
        """
 
        Returns user's name from global configuration file.
 

	
 
        :param config_file: A path to file which should be used to retrieve
 
          configuration from (might also be a list of file paths)
 
        """
 
        username = self.get_config_value('ui', 'username', config_file=config_file)
 
        if username:
 
            return author_name(username)
 
        return None
 

	
 
    def get_user_email(self, config_file=None):
 
        """
 
        Returns user's email from global configuration file.
 

	
 
        :param config_file: A path to file which should be used to retrieve
 
          configuration from (might also be a list of file paths)
 
        """
 
        username = self.get_config_value('ui', 'username', config_file=config_file)
 
        if username:
 
            return author_email(username)
 
        return None
kallithea/lib/vcs/backends/hg/ssh.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import logging
 

	
 
from mercurial import hg
 
from mercurial.wireprotoserver import sshserver
 

	
 
from kallithea.lib.utils import make_ui
 
from kallithea.lib.vcs.backends.ssh import BaseSshHandler
 
from kallithea.lib.vcs.utils import safe_str, safe_unicode
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class MercurialSshHandler(BaseSshHandler):
 
    vcs_type = 'hg'
 

	
 
    @classmethod
 
    def make(cls, ssh_command_parts):
 
        r"""
 
        >>> import shlex
 

	
 
        >>> MercurialSshHandler.make(shlex.split('hg -R "foo bar" serve --stdio')).repo_name
 
        u'foo bar'
 
        >>> MercurialSshHandler.make(shlex.split(' hg -R blåbærgrød serve --stdio ')).repo_name
 
        u'bl\xe5b\xe6rgr\xf8d'
 
        >>> MercurialSshHandler.make(shlex.split('''hg -R 'foo"bar' serve --stdio''')).repo_name
 
        u'foo"bar'
 

	
 
        >>> MercurialSshHandler.make(shlex.split('/bin/hg -R "foo" serve --stdio'))
 
        >>> MercurialSshHandler.make(shlex.split('''hg -R "foo"bar" serve --stdio''')) # ssh-serve will report: Error parsing SSH command "...": invalid syntax
 
        Traceback (most recent call last):
 
        ValueError: No closing quotation
 
        >>> MercurialSshHandler.make(shlex.split('git-upload-pack "/foo"')) # not handled here
 
        """
 
        if ssh_command_parts[:2] == ['hg', '-R'] and ssh_command_parts[3:] == ['serve', '--stdio']:
 
            return cls(safe_unicode(ssh_command_parts[2]))
 

	
 
        return None
 

	
 
    def __init__(self, repo_name):
 
        self.repo_name = repo_name
 

	
 
    def _serve(self):
 
        # Note: we want a repo with config based on .hg/hgrc and can thus not use self.db_repo.scm_instance._repo.ui
 
        baseui = make_ui(repo_path=self.db_repo.repo_full_path)
 
        if not self.allow_push:
 
            baseui.setconfig('hooks', 'pretxnopen._ssh_reject', 'python:kallithea.lib.hooks.rejectpush')
 
            baseui.setconfig('hooks', 'prepushkey._ssh_reject', 'python:kallithea.lib.hooks.rejectpush')
 
            baseui.setconfig(b'hooks', b'pretxnopen._ssh_reject', b'python:kallithea.lib.hooks.rejectpush')
 
            baseui.setconfig(b'hooks', b'prepushkey._ssh_reject', b'python:kallithea.lib.hooks.rejectpush')
 

	
 
        repo = hg.repository(baseui, safe_str(self.db_repo.repo_full_path))
 
        log.debug("Starting Mercurial sshserver for %s", self.db_repo.repo_full_path)
 
        sshserver(baseui, repo).serve_forever()
kallithea/lib/vcs/subprocessio.py
Show inline comments
 
@@ -178,247 +178,247 @@ class BufferedGenerator(object):
 
    def __iter__(self):
 
        return self
 

	
 
    def next(self):
 
        while not len(self.data) and not self.worker.EOF.is_set():
 
            self.worker.data_added.clear()
 
            self.worker.data_added.wait(0.2)
 
        if len(self.data):
 
            self.worker.keep_reading.set()
 
            return bytes(self.data.popleft())
 
        elif self.worker.EOF.is_set():
 
            raise StopIteration
 

	
 
    def throw(self, type, value=None, traceback=None):
 
        if not self.worker.EOF.is_set():
 
            raise type(value)
 

	
 
    def start(self):
 
        self.worker.start()
 

	
 
    def stop(self):
 
        self.worker.stop()
 

	
 
    def close(self):
 
        try:
 
            self.worker.stop()
 
            self.throw(GeneratorExit)
 
        except (GeneratorExit, StopIteration):
 
            pass
 

	
 
    ####################
 
    # Threaded reader's infrastructure.
 
    ####################
 
    @property
 
    def input(self):
 
        return self.worker.w
 

	
 
    @property
 
    def data_added_event(self):
 
        return self.worker.data_added
 

	
 
    @property
 
    def data_added(self):
 
        return self.worker.data_added.is_set()
 

	
 
    @property
 
    def reading_paused(self):
 
        return not self.worker.keep_reading.is_set()
 

	
 
    @property
 
    def done_reading_event(self):
 
        """
 
        Done_reading does not mean that the iterator's buffer is empty.
 
        Iterator might have done reading from underlying source, but the read
 
        chunks might still be available for serving through .next() method.
 

	
 
        :returns: An threading.Event class instance.
 
        """
 
        return self.worker.EOF
 

	
 
    @property
 
    def done_reading(self):
 
        """
 
        Done_reading does not mean that the iterator's buffer is empty.
 
        Iterator might have done reading from underlying source, but the read
 
        chunks might still be available for serving through .next() method.
 

	
 
        :returns: An Bool value.
 
        """
 
        return self.worker.EOF.is_set()
 

	
 
    @property
 
    def length(self):
 
        """
 
        returns int.
 

	
 
        This is the length of the queue of chunks, not the length of
 
        the combined contents in those chunks.
 

	
 
        __len__() cannot be meaningfully implemented because this
 
        reader is just flying through a bottomless pit content and
 
        can only know the length of what it already saw.
 

	
 
        If __len__() on WSGI server per PEP 3333 returns a value,
 
        the response's length will be set to that. In order not to
 
        confuse WSGI PEP3333 servers, we will not implement __len__
 
        at all.
 
        """
 
        return len(self.data)
 

	
 
    def prepend(self, x):
 
        self.data.appendleft(x)
 

	
 
    def append(self, x):
 
        self.data.append(x)
 

	
 
    def extend(self, o):
 
        self.data.extend(o)
 

	
 
    def __getitem__(self, i):
 
        return self.data[i]
 

	
 

	
 
class SubprocessIOChunker(object):
 
    """
 
    Processor class wrapping handling of subprocess IO.
 

	
 
    In a way, this is a "communicate()" replacement with a twist.
 

	
 
    - We are multithreaded. Writing in and reading out, err are all sep threads.
 
    - We support concurrent (in and out) stream processing.
 
    - The output is not a stream. It's a queue of read string (bytes, not unicode)
 
      chunks. The object behaves as an iterable. You can "for chunk in obj:" us.
 
    - We are non-blocking in more respects than communicate()
 
      (reading from subprocess out pauses when internal buffer is full, but
 
       does not block the parent calling code. On the flip side, reading from
 
       slow-yielding subprocess may block the iteration until data shows up. This
 
       does not block the parallel inpipe reading occurring parallel thread.)
 

	
 
    The purpose of the object is to allow us to wrap subprocess interactions into
 
    an iterable that can be passed to a WSGI server as the application's return
 
    value. Because of stream-processing-ability, WSGI does not have to read ALL
 
    of the subprocess's output and buffer it, before handing it to WSGI server for
 
    HTTP response. Instead, the class initializer reads just a bit of the stream
 
    to figure out if error occurred or likely to occur and if not, just hands the
 
    further iteration over subprocess output to the server for completion of HTTP
 
    response.
 

	
 
    The real or perceived subprocess error is trapped and raised as one of
 
    EnvironmentError family of exceptions
 

	
 
    Example usage:
 
    #    try:
 
    #        answer = SubprocessIOChunker(
 
    #            cmd,
 
    #            input,
 
    #            buffer_size = 65536,
 
    #            chunk_size = 4096
 
    #            )
 
    #    except (EnvironmentError) as e:
 
    #        print str(e)
 
    #        raise e
 
    #
 
    #    return answer
 

	
 

	
 
    """
 

	
 
    def __init__(self, cmd, inputstream=None, buffer_size=65536,
 
                 chunk_size=4096, starting_values=None, **kwargs):
 
        """
 
        Initializes SubprocessIOChunker
 

	
 
        :param cmd: A Subprocess.Popen style "cmd". Can be string or array of strings
 
        :param inputstream: (Default: None) A file-like, string, or file pointer.
 
        :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
 
        :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
 
        :param starting_values: (Default: []) An array of strings to put in front of output que.
 
        """
 
        starting_values = starting_values or []
 
        if inputstream:
 
            input_streamer = StreamFeeder(inputstream)
 
            input_streamer.start()
 
            inputstream = input_streamer.output
 

	
 
        # Note: fragile cmd mangling has been removed for use in Kallithea
 
        assert isinstance(cmd, list), cmd
 

	
 
        _p = subprocess.Popen(cmd, bufsize=-1,
 
                              stdin=inputstream,
 
                              stdout=subprocess.PIPE,
 
                              stderr=subprocess.PIPE,
 
                              **kwargs)
 

	
 
        bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size,
 
                                   starting_values)
 
        bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)
 

	
 
        while not bg_out.done_reading and not bg_out.reading_paused:
 
            # doing this until we reach either end of file, or end of buffer.
 
            bg_out.data_added_event.wait(1)
 
            bg_out.data_added_event.clear()
 

	
 
        # at this point it's still ambiguous if we are done reading or just full buffer.
 
        # Either way, if error (returned by ended process, or implied based on
 
        # presence of stuff in stderr output) we error out.
 
        # Else, we are happy.
 
        returncode = _p.poll()
 
        if (returncode is not None # process has terminated
 
            and returncode != 0
 
        ): # and it failed
 
            bg_out.stop()
 
            out = ''.join(bg_out)
 
            out = b''.join(bg_out)
 
            bg_err.stop()
 
            err = ''.join(bg_err)
 
            if (err.strip() == 'fatal: The remote end hung up unexpectedly' and
 
                out.startswith('0034shallow ')
 
            err = b''.join(bg_err)
 
            if (err.strip() == b'fatal: The remote end hung up unexpectedly' and
 
                out.startswith(b'0034shallow ')
 
            ):
 
                # hack inspired by https://github.com/schacon/grack/pull/7
 
                bg_out = iter([out])
 
                _p = None
 
            elif err:
 
                raise EnvironmentError("Subprocess exited due to an error: %s" % err)
 
            else:
 
                raise EnvironmentError(
 
                    "Subprocess exited with non 0 ret code: %s" % returncode)
 
        self.process = _p
 
        self.output = bg_out
 
        self.error = bg_err
 
        self.inputstream = inputstream
 

	
 
    def __iter__(self):
 
        return self
 

	
 
    def next(self):
 
        if self.process:
 
            returncode = self.process.poll()
 
            if (returncode is not None # process has terminated
 
                and returncode != 0
 
            ): # and it failed
 
                self.output.stop()
 
                self.error.stop()
 
                err = ''.join(self.error)
 
                raise EnvironmentError("Subprocess exited due to an error:\n" + err)
 
        return self.output.next()
 

	
 
    def throw(self, type, value=None, traceback=None):
 
        if self.output.length or not self.output.done_reading:
 
            raise type(value)
 

	
 
    def close(self):
 
        try:
 
            self.process.terminate()
 
        except:
 
            pass
 
        try:
 
            self.output.close()
 
        except:
 
            pass
 
        try:
 
            self.error.close()
 
        except:
 
            pass
 
        try:
 
            os.close(self.inputstream)
 
        except:
 
            pass
kallithea/model/pull_request.py
Show inline comments
 
@@ -72,323 +72,323 @@ class PullRequestModel(object):
 
            h.link_to(
 
              _('%(user)s wants you to review pull request %(pr_nice_id)s: %(pr_title)s') %
 
                {'user': user.username,
 
                 'pr_title': pr.title,
 
                 'pr_nice_id': pr.nice_id()},
 
                pr_url)
 
            )
 
        body = pr.description
 
        _org_ref_type, org_ref_name, _org_rev = pr.org_ref.split(':')
 
        _other_ref_type, other_ref_name, _other_rev = pr.other_ref.split(':')
 
        revision_data = [(x.raw_id, x.message)
 
                         for x in map(pr.org_repo.get_changeset, pr.revisions)]
 
        email_kwargs = {
 
            'pr_title': pr.title,
 
            'pr_title_short': h.shorter(pr.title, 50),
 
            'pr_user_created': user.full_name_and_username,
 
            'pr_repo_url': h.canonical_url('summary_home', repo_name=pr.other_repo.repo_name),
 
            'pr_url': pr_url,
 
            'pr_revisions': revision_data,
 
            'repo_name': pr.other_repo.repo_name,
 
            'org_repo_name': pr.org_repo.repo_name,
 
            'pr_nice_id': pr.nice_id(),
 
            'pr_target_repo': h.canonical_url('summary_home',
 
                               repo_name=pr.other_repo.repo_name),
 
            'pr_target_branch': other_ref_name,
 
            'pr_source_repo': h.canonical_url('summary_home',
 
                               repo_name=pr.org_repo.repo_name),
 
            'pr_source_branch': org_ref_name,
 
            'pr_owner': pr.owner,
 
            'pr_owner_username': pr.owner.username,
 
            'pr_username': user.username,
 
            'threading': threading,
 
            'is_mention': False,
 
            }
 
        if reviewers:
 
            NotificationModel().create(created_by=user, subject=subject, body=body,
 
                                       recipients=reviewers,
 
                                       type_=NotificationModel.TYPE_PULL_REQUEST,
 
                                       email_kwargs=email_kwargs)
 

	
 
        if mention_recipients:
 
            email_kwargs['is_mention'] = True
 
            subject = _('[Mention]') + ' ' + subject
 
            # FIXME: this subject is wrong and unused!
 
            NotificationModel().create(created_by=user, subject=subject, body=body,
 
                                       recipients=mention_recipients,
 
                                       type_=NotificationModel.TYPE_PULL_REQUEST,
 
                                       email_kwargs=email_kwargs)
 

	
 
    def mention_from_description(self, user, pr, old_description=''):
 
        mention_recipients = (extract_mentioned_users(pr.description) -
 
                              extract_mentioned_users(old_description))
 

	
 
        log.debug("Mentioning %s", mention_recipients)
 
        self.add_reviewers(user, pr, set(), mention_recipients)
 

	
 
    def remove_reviewers(self, user, pull_request, reviewers):
 
        """Remove specified users from being reviewers of the PR."""
 
        if not reviewers:
 
            return # avoid SQLAlchemy warning about empty sequence for IN-predicate
 

	
 
        PullRequestReviewer.query() \
 
            .filter_by(pull_request=pull_request) \
 
            .filter(PullRequestReviewer.user_id.in_(r.user_id for r in reviewers)) \
 
            .delete(synchronize_session='fetch') # the default of 'evaluate' is not available
 

	
 
    def delete(self, pull_request):
 
        pull_request = PullRequest.guess_instance(pull_request)
 
        Session().delete(pull_request)
 
        if pull_request.org_repo.scm_instance.alias == 'git':
 
            # remove a ref under refs/pull/ so that commits can be garbage-collected
 
            try:
 
                del pull_request.org_repo.scm_instance._repo["refs/pull/%d/head" % pull_request.pull_request_id]
 
            except KeyError:
 
                pass
 

	
 
    def close_pull_request(self, pull_request):
 
        pull_request = PullRequest.guess_instance(pull_request)
 
        pull_request.status = PullRequest.STATUS_CLOSED
 
        pull_request.updated_on = datetime.datetime.now()
 

	
 

	
 
class CreatePullRequestAction(object):
 

	
 
    class ValidationError(Exception):
 
        pass
 

	
 
    class Empty(ValidationError):
 
        pass
 

	
 
    class AmbiguousAncestor(ValidationError):
 
        pass
 

	
 
    class Unauthorized(ValidationError):
 
        pass
 

	
 
    @staticmethod
 
    def is_user_authorized(org_repo, other_repo):
 
        """Performs authorization check with only the minimum amount of
 
        information needed for such a check, rather than a full command
 
        object.
 
        """
 
        if (h.HasRepoPermissionLevel('read')(org_repo.repo_name) and
 
            h.HasRepoPermissionLevel('read')(other_repo.repo_name)
 
        ):
 
            return True
 

	
 
        return False
 

	
 
    def __init__(self, org_repo, other_repo, org_ref, other_ref, title, description, owner, reviewers):
 
        from kallithea.controllers.compare import CompareController
 
        reviewers = set(reviewers)
 
        _assert_valid_reviewers(reviewers)
 

	
 
        (org_ref_type,
 
         org_ref_name,
 
         org_rev) = org_ref.split(':')
 
        org_display = h.short_ref(org_ref_type, org_ref_name)
 
        if org_ref_type == 'rev':
 
            cs = org_repo.scm_instance.get_changeset(org_rev)
 
            org_ref = 'branch:%s:%s' % (cs.branch, cs.raw_id)
 

	
 
        (other_ref_type,
 
         other_ref_name,
 
         other_rev) = other_ref.split(':')
 
        if other_ref_type == 'rev':
 
            cs = other_repo.scm_instance.get_changeset(other_rev)
 
            other_ref_name = cs.raw_id[:12]
 
            other_ref = '%s:%s:%s' % (other_ref_type, other_ref_name, cs.raw_id)
 
        other_display = h.short_ref(other_ref_type, other_ref_name)
 

	
 
        cs_ranges, _cs_ranges_not, ancestor_revs = \
 
            CompareController._get_changesets(org_repo.scm_instance.alias,
 
                                              other_repo.scm_instance, other_rev, # org and other "swapped"
 
                                              org_repo.scm_instance, org_rev,
 
                                              )
 
        if not cs_ranges:
 
            raise self.Empty(_('Cannot create empty pull request'))
 

	
 
        if not ancestor_revs:
 
            ancestor_rev = org_repo.scm_instance.EMPTY_CHANGESET
 
        elif len(ancestor_revs) == 1:
 
            ancestor_rev = ancestor_revs[0]
 
        else:
 
            raise self.AmbiguousAncestor(
 
                _('Cannot create pull request - criss cross merge detected, please merge a later %s revision to %s')
 
                % (other_ref_name, org_ref_name))
 

	
 
        self.revisions = [cs_.raw_id for cs_ in cs_ranges]
 

	
 
        # hack: ancestor_rev is not an other_rev but we want to show the
 
        # requested destination and have the exact ancestor
 
        other_ref = '%s:%s:%s' % (other_ref_type, other_ref_name, ancestor_rev)
 

	
 
        if not title:
 
            if org_repo == other_repo:
 
                title = '%s to %s' % (org_display, other_display)
 
            else:
 
                title = '%s#%s to %s#%s' % (org_repo.repo_name, org_display,
 
                                            other_repo.repo_name, other_display)
 
        description = description or _('No description')
 

	
 
        self.org_repo = org_repo
 
        self.other_repo = other_repo
 
        self.org_ref = org_ref
 
        self.org_rev = org_rev
 
        self.other_ref = other_ref
 
        self.title = title
 
        self.description = description
 
        self.owner = owner
 
        self.reviewers = reviewers
 

	
 
        if not CreatePullRequestAction.is_user_authorized(self.org_repo, self.other_repo):
 
            raise self.Unauthorized(_('You are not authorized to create the pull request'))
 

	
 
    def execute(self):
 
        created_by = User.get(request.authuser.user_id)
 

	
 
        pr = PullRequest()
 
        pr.org_repo = self.org_repo
 
        pr.org_ref = self.org_ref
 
        pr.other_repo = self.other_repo
 
        pr.other_ref = self.other_ref
 
        pr.revisions = self.revisions
 
        pr.title = self.title
 
        pr.description = self.description
 
        pr.owner = self.owner
 
        Session().add(pr)
 
        Session().flush() # make database assign pull_request_id
 

	
 
        if self.org_repo.scm_instance.alias == 'git':
 
            # create a ref under refs/pull/ so that commits don't get garbage-collected
 
            self.org_repo.scm_instance._repo["refs/pull/%d/head" % pr.pull_request_id] = safe_str(self.org_rev)
 
            self.org_repo.scm_instance._repo[b"refs/pull/%d/head" % pr.pull_request_id] = safe_str(self.org_rev)
 

	
 
        # reset state to under-review
 
        from kallithea.model.changeset_status import ChangesetStatusModel
 
        from kallithea.model.comment import ChangesetCommentsModel
 
        comment = ChangesetCommentsModel().create(
 
            text=u'',
 
            repo=self.org_repo,
 
            author=created_by,
 
            pull_request=pr,
 
            send_email=False,
 
            status_change=ChangesetStatus.STATUS_UNDER_REVIEW,
 
        )
 
        ChangesetStatusModel().set_status(
 
            self.org_repo,
 
            ChangesetStatus.STATUS_UNDER_REVIEW,
 
            created_by,
 
            comment,
 
            pull_request=pr,
 
        )
 

	
 
        mention_recipients = extract_mentioned_users(self.description)
 
        PullRequestModel().add_reviewers(created_by, pr, self.reviewers, mention_recipients)
 

	
 
        return pr
 

	
 

	
 
class CreatePullRequestIterationAction(object):
 
    @staticmethod
 
    def is_user_authorized(old_pull_request):
 
        """Performs authorization check with only the minimum amount of
 
        information needed for such a check, rather than a full command
 
        object.
 
        """
 
        if h.HasPermissionAny('hg.admin')():
 
            return True
 

	
 
        # Authorized to edit the old PR?
 
        if request.authuser.user_id != old_pull_request.owner_id:
 
            return False
 

	
 
        # Authorized to create a new PR?
 
        if not CreatePullRequestAction.is_user_authorized(old_pull_request.org_repo, old_pull_request.other_repo):
 
            return False
 

	
 
        return True
 

	
 
    def __init__(self, old_pull_request, new_org_rev, new_other_rev, title, description, owner, reviewers):
 
        self.old_pull_request = old_pull_request
 

	
 
        org_repo = old_pull_request.org_repo
 
        org_ref_type, org_ref_name, org_rev = old_pull_request.org_ref.split(':')
 

	
 
        other_repo = old_pull_request.other_repo
 
        other_ref_type, other_ref_name, other_rev = old_pull_request.other_ref.split(':') # other_rev is ancestor
 
        #assert other_ref_type == 'branch', other_ref_type # TODO: what if not?
 

	
 
        new_org_ref = '%s:%s:%s' % (org_ref_type, org_ref_name, new_org_rev)
 
        new_other_ref = '%s:%s:%s' % (other_ref_type, other_ref_name, new_other_rev)
 

	
 
        self.create_action = CreatePullRequestAction(org_repo, other_repo, new_org_ref, new_other_ref, None, None, owner, reviewers)
 

	
 
        # Generate complete title/description
 

	
 
        old_revisions = set(old_pull_request.revisions)
 
        revisions = self.create_action.revisions
 
        new_revisions = [r for r in revisions if r not in old_revisions]
 
        lost = old_revisions.difference(revisions)
 

	
 
        infos = ['This is a new iteration of %s "%s".' %
 
                 (h.canonical_url('pullrequest_show', repo_name=old_pull_request.other_repo.repo_name,
 
                      pull_request_id=old_pull_request.pull_request_id),
 
                  old_pull_request.title)]
 

	
 
        if lost:
 
            infos.append(_('Missing changesets since the previous iteration:'))
 
            for r in old_pull_request.revisions:
 
                if r in lost:
 
                    rev_desc = org_repo.get_changeset(r).message.split('\n')[0]
 
                    infos.append('  %s %s' % (h.short_id(r), rev_desc))
 

	
 
        if new_revisions:
 
            infos.append(_('New changesets on %s %s since the previous iteration:') % (org_ref_type, org_ref_name))
 
            for r in reversed(revisions):
 
                if r in new_revisions:
 
                    rev_desc = org_repo.get_changeset(r).message.split('\n')[0]
 
                    infos.append('  %s %s' % (h.short_id(r), h.shorter(rev_desc, 80)))
 

	
 
            if self.create_action.other_ref == old_pull_request.other_ref:
 
                infos.append(_("Ancestor didn't change - diff since previous iteration:"))
 
                infos.append(h.canonical_url('compare_url',
 
                                 repo_name=org_repo.repo_name, # other_repo is always same as repo_name
 
                                 org_ref_type='rev', org_ref_name=h.short_id(org_rev), # use old org_rev as base
 
                                 other_ref_type='rev', other_ref_name=h.short_id(new_org_rev),
 
                                 )) # note: linear diff, merge or not doesn't matter
 
            else:
 
                infos.append(_('This iteration is based on another %s revision and there is no simple diff.') % other_ref_name)
 
        else:
 
            infos.append(_('No changes found on %s %s since previous iteration.') % (org_ref_type, org_ref_name))
 
            # TODO: fail?
 

	
 
        try:
 
            title, old_v = re.match(r'(.*)\(v(\d+)\)\s*$', title).groups()
 
            v = int(old_v) + 1
 
        except (AttributeError, ValueError):
 
            v = 2
 
        self.create_action.title = '%s (v%s)' % (title.strip(), v)
 

	
 
        # using a mail-like separator, insert new iteration info in description with latest first
 
        descriptions = description.replace('\r\n', '\n').split('\n-- \n', 1)
 
        description = descriptions[0].strip() + '\n\n-- \n' + '\n'.join(infos)
 
        if len(descriptions) > 1:
 
            description += '\n\n' + descriptions[1].strip()
 
        self.create_action.description = description
 

	
 
        if not CreatePullRequestIterationAction.is_user_authorized(self.old_pull_request):
 
            raise CreatePullRequestAction.Unauthorized(_('You are not authorized to create the pull request'))
 

	
 
    def execute(self):
 
        pull_request = self.create_action.execute()
 

	
 
        # Close old iteration
 
        from kallithea.model.comment import ChangesetCommentsModel
 
        ChangesetCommentsModel().create(
 
            text=_('Closed, next iteration: %s .') % pull_request.url(canonical=True),
 
            repo=self.old_pull_request.other_repo_id,
 
            author=request.authuser.user_id,
 
            pull_request=self.old_pull_request.pull_request_id,
 
            closing_pr=True)
 
        PullRequestModel().close_pull_request(self.old_pull_request.pull_request_id)
 
        return pull_request
kallithea/model/scm.py
Show inline comments
 
@@ -527,252 +527,252 @@ class ScmModel(object):
 
        return tip
 

	
 
    def update_nodes(self, user, ip_addr, repo, message, nodes, parent_cs=None,
 
                     author=None, trigger_push_hook=True):
 
        """
 
        Commits specified nodes to repo. Again.
 
        """
 
        user = User.guess_instance(user)
 
        scm_instance = repo.scm_instance_no_cache()
 

	
 
        message = safe_unicode(message)
 
        committer = user.full_contact
 
        author = safe_unicode(author) if author else committer
 

	
 
        imc_class = self._get_IMC_module(scm_instance.alias)
 
        imc = imc_class(scm_instance)
 

	
 
        if not parent_cs:
 
            parent_cs = EmptyChangeset(alias=scm_instance.alias)
 

	
 
        if isinstance(parent_cs, EmptyChangeset):
 
            # EmptyChangeset means we we're editing empty repository
 
            parents = None
 
        else:
 
            parents = [parent_cs]
 

	
 
        # add multiple nodes
 
        for _filename, data in nodes.items():
 
            # new filename, can be renamed from the old one
 
            filename = self._sanitize_path(data['filename'])
 
            old_filename = self._sanitize_path(_filename)
 
            content = data['content']
 

	
 
            filenode = FileNode(old_filename, content=content)
 
            op = data['op']
 
            if op == 'add':
 
                imc.add(filenode)
 
            elif op == 'del':
 
                imc.remove(filenode)
 
            elif op == 'mod':
 
                if filename != old_filename:
 
                    # TODO: handle renames, needs vcs lib changes
 
                    imc.remove(filenode)
 
                    imc.add(FileNode(filename, content=content))
 
                else:
 
                    imc.change(filenode)
 

	
 
        # commit changes
 
        tip = imc.commit(message=message,
 
                         author=author,
 
                         parents=parents,
 
                         branch=parent_cs.branch)
 

	
 
        if trigger_push_hook:
 
            self._handle_push(scm_instance,
 
                              username=user.username,
 
                              ip_addr=ip_addr,
 
                              action='push_local',
 
                              repo_name=repo.repo_name,
 
                              revisions=[tip.raw_id])
 
        else:
 
            self.mark_for_invalidation(repo.repo_name)
 

	
 
    def delete_nodes(self, user, ip_addr, repo, message, nodes, parent_cs=None,
 
                     author=None, trigger_push_hook=True):
 
        """
 
        Deletes specified nodes from repo.
 

	
 
        :param user: Kallithea User object or user_id, the committer
 
        :param repo: Kallithea Repository object
 
        :param message: commit message
 
        :param nodes: mapping {filename:{'content':content},...}
 
        :param parent_cs: parent changeset, can be empty than it's initial commit
 
        :param author: author of commit, cna be different that committer only for git
 
        :param trigger_push_hook: trigger push hooks
 

	
 
        :returns: new committed changeset after deletion
 
        """
 

	
 
        user = User.guess_instance(user)
 
        scm_instance = repo.scm_instance_no_cache()
 

	
 
        processed_nodes = []
 
        for f_path in nodes:
 
            f_path = self._sanitize_path(f_path)
 
            # content can be empty but for compatibility it allows same dicts
 
            # structure as add_nodes
 
            content = nodes[f_path].get('content')
 
            processed_nodes.append((f_path, content))
 

	
 
        message = safe_unicode(message)
 
        committer = user.full_contact
 
        author = safe_unicode(author) if author else committer
 

	
 
        IMC = self._get_IMC_module(scm_instance.alias)
 
        imc = IMC(scm_instance)
 

	
 
        if not parent_cs:
 
            parent_cs = EmptyChangeset(alias=scm_instance.alias)
 

	
 
        if isinstance(parent_cs, EmptyChangeset):
 
            # EmptyChangeset means we we're editing empty repository
 
            parents = None
 
        else:
 
            parents = [parent_cs]
 
        # add multiple nodes
 
        for path, content in processed_nodes:
 
            imc.remove(FileNode(path, content=content))
 

	
 
        tip = imc.commit(message=message,
 
                         author=author,
 
                         parents=parents,
 
                         branch=parent_cs.branch)
 

	
 
        if trigger_push_hook:
 
            self._handle_push(scm_instance,
 
                              username=user.username,
 
                              ip_addr=ip_addr,
 
                              action='push_local',
 
                              repo_name=repo.repo_name,
 
                              revisions=[tip.raw_id])
 
        else:
 
            self.mark_for_invalidation(repo.repo_name)
 
        return tip
 

	
 
    def get_unread_journal(self):
 
        return UserLog.query().count()
 

	
 
    def get_repo_landing_revs(self, repo=None):
 
        """
 
        Generates select option with tags branches and bookmarks (for hg only)
 
        grouped by type
 

	
 
        :param repo:
 
        """
 

	
 
        hist_l = []
 
        choices = []
 
        repo = self.__get_repo(repo)
 
        hist_l.append(('rev:tip', _('latest tip')))
 
        choices.append('rev:tip')
 
        if repo is None:
 
            return choices, hist_l
 

	
 
        repo = repo.scm_instance
 

	
 
        branches_group = ([(u'branch:%s' % k, k) for k, v in
 
                           repo.branches.iteritems()], _("Branches"))
 
        hist_l.append(branches_group)
 
        choices.extend([x[0] for x in branches_group[0]])
 

	
 
        if repo.alias == 'hg':
 
            bookmarks_group = ([(u'book:%s' % k, k) for k, v in
 
                                repo.bookmarks.iteritems()], _("Bookmarks"))
 
            hist_l.append(bookmarks_group)
 
            choices.extend([x[0] for x in bookmarks_group[0]])
 

	
 
        tags_group = ([(u'tag:%s' % k, k) for k, v in
 
                       repo.tags.iteritems()], _("Tags"))
 
        hist_l.append(tags_group)
 
        choices.extend([x[0] for x in tags_group[0]])
 

	
 
        return choices, hist_l
 

	
 
    def _get_git_hook_interpreter(self):
 
        """Return a suitable interpreter for Git hooks.
 

	
 
        Return a suitable string to be written in the POSIX #! shebang line for
 
        Git hook scripts so they invoke Kallithea code with the right Python
 
        interpreter and in the right environment.
 
        """
 
        # Note: sys.executable might not point at a usable Python interpreter. For
 
        # example, when using uwsgi, it will point at the uwsgi program itself.
 
        # FIXME This may not work on Windows and may need a shell wrapper script.
 
        return (kallithea.CONFIG.get('git_hook_interpreter')
 
                or sys.executable
 
                or '/usr/bin/env python2')
 

	
 
    def install_git_hooks(self, repo, force_create=False):
 
        """
 
        Creates a kallithea hook inside a git repository
 

	
 
        :param repo: Instance of VCS repo
 
        :param force_create: Create even if same name hook exists
 
        """
 

	
 
        loc = os.path.join(repo.path, 'hooks')
 
        if not repo.bare:
 
            loc = os.path.join(repo.path, '.git', 'hooks')
 
        if not os.path.isdir(loc):
 
            os.makedirs(loc)
 

	
 
        tmpl_post = "#!%s\n" % self._get_git_hook_interpreter()
 
        tmpl_post = b"#!%s\n" % self._get_git_hook_interpreter()
 
        tmpl_post += pkg_resources.resource_string(
 
            'kallithea', os.path.join('config', 'post_receive_tmpl.py')
 
        )
 
        tmpl_pre = "#!%s\n" % self._get_git_hook_interpreter()
 
        tmpl_pre = b"#!%s\n" % self._get_git_hook_interpreter()
 
        tmpl_pre += pkg_resources.resource_string(
 
            'kallithea', os.path.join('config', 'pre_receive_tmpl.py')
 
        )
 

	
 
        for h_type, tmpl in [('pre', tmpl_pre), ('post', tmpl_post)]:
 
            _hook_file = os.path.join(loc, '%s-receive' % h_type)
 
            has_hook = False
 
            log.debug('Installing git hook in repo %s', repo)
 
            if os.path.exists(_hook_file):
 
                # let's take a look at this hook, maybe it's kallithea ?
 
                log.debug('hook exists, checking if it is from kallithea')
 
                with open(_hook_file, 'rb') as f:
 
                    data = f.read()
 
                    matches = re.search(br'^KALLITHEA_HOOK_VER\s*=\s*(.*)$', data, flags=re.MULTILINE)
 
                    if matches:
 
                        try:
 
                            ver = matches.groups()[0]
 
                            log.debug('Found Kallithea hook - it has KALLITHEA_HOOK_VER %r', ver)
 
                            has_hook = True
 
                        except Exception:
 
                            log.error(traceback.format_exc())
 
            else:
 
                # there is no hook in this dir, so we want to create one
 
                has_hook = True
 

	
 
            if has_hook or force_create:
 
                log.debug('writing %s hook file !', h_type)
 
                try:
 
                    with open(_hook_file, 'wb') as f:
 
                        tmpl = tmpl.replace('_TMPL_', kallithea.__version__)
 
                        tmpl = tmpl.replace(b'_TMPL_', kallithea.__version__)
 
                        f.write(tmpl)
 
                    os.chmod(_hook_file, 0o755)
 
                except IOError as e:
 
                    log.error('error writing %s: %s', _hook_file, e)
 
            else:
 
                log.debug('skipping writing hook file')
 

	
 

	
 
def AvailableRepoGroupChoices(top_perms, repo_group_perm_level, extras=()):
 
    """Return group_id,string tuples with choices for all the repo groups where
 
    the user has the necessary permissions.
 

	
 
    Top level is -1.
 
    """
 
    groups = RepoGroup.query().all()
 
    if HasPermissionAny('hg.admin')('available repo groups'):
 
        groups.append(None)
 
    else:
 
        groups = list(RepoGroupList(groups, perm_level=repo_group_perm_level))
 
        if top_perms and HasPermissionAny(*top_perms)('available repo groups'):
 
            groups.append(None)
 
        for extra in extras:
 
            if not any(rg == extra for rg in groups):
 
                groups.append(extra)
 
    return RepoGroup.groups_choices(groups=groups)
kallithea/tests/api/api_base.py
Show inline comments
 
@@ -2320,249 +2320,249 @@ class _BaseTestApi(object):
 
                                  lifetime=10,
 
                                  description='foobar-gist',
 
                                  gist_type='public',
 
                                  files={'foobar': {'content': 'foo'}})
 
        response = api_call(self, params)
 
        response_json = response.json
 
        expected = {
 
            'gist': {
 
                'access_id': response_json['result']['gist']['access_id'],
 
                'created_on': response_json['result']['gist']['created_on'],
 
                'description': 'foobar-gist',
 
                'expires': response_json['result']['gist']['expires'],
 
                'gist_id': response_json['result']['gist']['gist_id'],
 
                'type': 'public',
 
                'url': response_json['result']['gist']['url']
 
            },
 
            'msg': 'created new gist'
 
        }
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(GistModel, 'create', crash)
 
    def test_api_create_gist_exception_occurred(self):
 
        id_, params = _build_data(self.apikey_regular, 'create_gist',
 
                                  files={})
 
        response = api_call(self, params)
 
        expected = 'failed to create gist'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_delete_gist(self):
 
        gist_id = fixture.create_gist().gist_access_id
 
        id_, params = _build_data(self.apikey, 'delete_gist',
 
                                  gistid=gist_id)
 
        response = api_call(self, params)
 
        expected = {'gist': None, 'msg': 'deleted gist ID:%s' % gist_id}
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_delete_gist_regular_user(self):
 
        gist_id = fixture.create_gist(owner=self.TEST_USER_LOGIN).gist_access_id
 
        id_, params = _build_data(self.apikey_regular, 'delete_gist',
 
                                  gistid=gist_id)
 
        response = api_call(self, params)
 
        expected = {'gist': None, 'msg': 'deleted gist ID:%s' % gist_id}
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_delete_gist_regular_user_no_permission(self):
 
        gist_id = fixture.create_gist().gist_access_id
 
        id_, params = _build_data(self.apikey_regular, 'delete_gist',
 
                                  gistid=gist_id)
 
        response = api_call(self, params)
 
        expected = 'gist `%s` does not exist' % (gist_id,)
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(GistModel, 'delete', crash)
 
    def test_api_delete_gist_exception_occurred(self):
 
        gist_id = fixture.create_gist().gist_access_id
 
        id_, params = _build_data(self.apikey, 'delete_gist',
 
                                  gistid=gist_id)
 
        response = api_call(self, params)
 
        expected = 'failed to delete gist ID:%s' % (gist_id,)
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_ip(self):
 
        id_, params = _build_data(self.apikey, 'get_ip')
 
        response = api_call(self, params)
 
        expected = {
 
            'server_ip_addr': '0.0.0.0',
 
            'user_ips': []
 
        }
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_server_info(self):
 
        id_, params = _build_data(self.apikey, 'get_server_info')
 
        response = api_call(self, params)
 
        expected = Setting.get_server_info()
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_changesets(self):
 
        id_, params = _build_data(self.apikey, 'get_changesets',
 
                                  repoid=self.REPO, start=0, end=2)
 
        response = api_call(self, params)
 
        result = json.loads(response.body)["result"]
 
        assert len(result) == 3
 
        assert 'message' in result[0]
 
        assert 'added' not in result[0]
 

	
 
    def test_api_get_changesets_with_max_revisions(self):
 
        id_, params = _build_data(self.apikey, 'get_changesets',
 
                                  repoid=self.REPO, start_date="2011-02-24T00:00:00", max_revisions=10)
 
        response = api_call(self, params)
 
        result = json.loads(response.body)["result"]
 
        assert len(result) == 10
 
        assert 'message' in result[0]
 
        assert 'added' not in result[0]
 

	
 
    def test_api_get_changesets_with_branch(self):
 
        if self.REPO == 'vcs_test_hg':
 
            branch = 'stable'
 
        else:
 
            pytest.skip("skipping due to missing branches in git test repo")
 
        id_, params = _build_data(self.apikey, 'get_changesets',
 
                                  repoid=self.REPO, branch_name=branch, start_date="2011-02-24T00:00:00")
 
        response = api_call(self, params)
 
        result = json.loads(response.body)["result"]
 
        assert len(result) == 5
 
        assert 'message' in result[0]
 
        assert 'added' not in result[0]
 

	
 
    def test_api_get_changesets_with_file_list(self):
 
        id_, params = _build_data(self.apikey, 'get_changesets',
 
                                  repoid=self.REPO, start_date="2010-04-07T23:30:30", end_date="2010-04-08T00:31:14", with_file_list=True)
 
        response = api_call(self, params)
 
        result = json.loads(response.body)["result"]
 
        assert len(result) == 3
 
        assert 'message' in result[0]
 
        assert 'added' in result[0]
 

	
 
    def test_api_get_changeset(self):
 
        review = fixture.review_changeset(self.REPO, self.TEST_REVISION, "approved")
 
        id_, params = _build_data(self.apikey, 'get_changeset',
 
                                  repoid=self.REPO, raw_id=self.TEST_REVISION)
 
        response = api_call(self, params)
 
        result = json.loads(response.body)["result"]
 
        assert result["raw_id"] == self.TEST_REVISION
 
        assert "reviews" not in result
 

	
 
    def test_api_get_changeset_with_reviews(self):
 
        reviewobjs = fixture.review_changeset(self.REPO, self.TEST_REVISION, "approved")
 
        id_, params = _build_data(self.apikey, 'get_changeset',
 
                                  repoid=self.REPO, raw_id=self.TEST_REVISION,
 
                                  with_reviews=True)
 
        response = api_call(self, params)
 
        result = json.loads(response.body)["result"]
 
        assert result["raw_id"] == self.TEST_REVISION
 
        assert "reviews" in result
 
        assert len(result["reviews"]) == 1
 
        review = result["reviews"][0]
 
        expected = {
 
            'status': 'approved',
 
            'modified_at': reviewobjs[0].modified_at.replace(microsecond=0).isoformat(),
 
            'reviewer': 'test_admin',
 
        }
 
        assert review == expected
 

	
 
    def test_api_get_changeset_that_does_not_exist(self):
 
        """ Fetch changeset status for non-existant changeset.
 
        revision id is the above git hash used in the test above with the
 
        last 3 nibbles replaced with 0xf.  Should not exist for git _or_ hg.
 
        """
 
        id_, params = _build_data(self.apikey, 'get_changeset',
 
                                  repoid=self.REPO, raw_id = '7ab37bc680b4aa72c34d07b230c866c28e9fcfff')
 
        response = api_call(self, params)
 
        expected = u'Changeset %s does not exist' % ('7ab37bc680b4aa72c34d07b230c866c28e9fcfff',)
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_changeset_without_permission(self):
 
        review = fixture.review_changeset(self.REPO, self.TEST_REVISION, "approved")
 
        RepoModel().revoke_user_permission(repo=self.REPO, user=self.TEST_USER_LOGIN)
 
        RepoModel().revoke_user_permission(repo=self.REPO, user="default")
 
        id_, params = _build_data(self.apikey_regular, 'get_changeset',
 
                                  repoid=self.REPO, raw_id=self.TEST_REVISION)
 
        response = api_call(self, params)
 
        expected = u'Access denied to repo %s' % self.REPO
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_pullrequest(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, u'get test')
 
        random_id = random.randrange(1, 9999)
 
        params = json.dumps({
 
            "id": random_id,
 
            "api_key": self.apikey,
 
            "method": 'get_pullrequest',
 
            "args": {"pullrequest_id": pull_request_id},
 
        })
 
        response = api_call(self, params)
 
        pullrequest = PullRequest().get(pull_request_id)
 
        expected = {
 
            "status": "new",
 
            "pull_request_id": pull_request_id,
 
            "description": "No description",
 
            "url": "/%s/pull-request/%s/_/%s" % (self.REPO, pull_request_id, "stable"),
 
            "reviewers": [{"username": "test_regular"}],
 
            "org_repo_url": "http://localhost:80/%s" % self.REPO,
 
            "org_ref_parts": ["branch", "stable", self.TEST_PR_SRC],
 
            "other_ref_parts": ["branch", "default", self.TEST_PR_DST],
 
            "comments": [{"username": TEST_USER_ADMIN_LOGIN, "text": "",
 
                         "comment_id": pullrequest.comments[0].comment_id}],
 
            "owner": TEST_USER_ADMIN_LOGIN,
 
            "statuses": [{"status": "under_review", "reviewer": TEST_USER_ADMIN_LOGIN, "modified_at": "2000-01-01T00:00:00"} for i in range(0, len(self.TEST_PR_REVISIONS))],
 
            "title": "get test",
 
            "revisions": self.TEST_PR_REVISIONS,
 
        }
 
        self._compare_ok(random_id, expected,
 
                         given=re.sub(r"\d\d\d\d\-\d\d\-\d\dT\d\d\:\d\d\:\d\d",
 
                                      "2000-01-01T00:00:00", response.body))
 
                         given=re.sub(br"\d\d\d\d\-\d\d\-\d\dT\d\d\:\d\d\:\d\d",
 
                                      b"2000-01-01T00:00:00", response.body))
 

	
 
    def test_api_close_pullrequest(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, u'close test')
 
        random_id = random.randrange(1, 9999)
 
        params = json.dumps({
 
            "id": random_id,
 
            "api_key": self.apikey,
 
            "method": "comment_pullrequest",
 
            "args": {"pull_request_id": pull_request_id, "close_pr": True},
 
        })
 
        response = api_call(self, params)
 
        self._compare_ok(random_id, True, given=response.body)
 
        pullrequest = PullRequest().get(pull_request_id)
 
        assert pullrequest.comments[-1].text == ''
 
        assert pullrequest.status == PullRequest.STATUS_CLOSED
 
        assert pullrequest.is_closed() == True
 

	
 
    def test_api_status_pullrequest(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, u"status test")
 

	
 
        random_id = random.randrange(1, 9999)
 
        params = json.dumps({
 
            "id": random_id,
 
            "api_key": User.get_by_username(TEST_USER_REGULAR2_LOGIN).api_key,
 
            "method": "comment_pullrequest",
 
            "args": {"pull_request_id": pull_request_id, "status": ChangesetStatus.STATUS_APPROVED},
 
        })
 
        response = api_call(self, params)
 
        pullrequest = PullRequest().get(pull_request_id)
 
        self._compare_error(random_id, "No permission to change pull request status. User needs to be admin, owner or reviewer.", given=response.body)
 
        assert ChangesetStatus.STATUS_UNDER_REVIEW == ChangesetStatusModel().calculate_pull_request_result(pullrequest)[2]
 
        params = json.dumps({
 
            "id": random_id,
 
            "api_key": User.get_by_username(TEST_USER_REGULAR_LOGIN).api_key,
 
            "method": "comment_pullrequest",
 
            "args": {"pull_request_id": pull_request_id, "status": ChangesetStatus.STATUS_APPROVED},
 
        })
 
        response = api_call(self, params)
 
        self._compare_ok(random_id, True, given=response.body)
 
        pullrequest = PullRequest().get(pull_request_id)
 
        assert ChangesetStatus.STATUS_APPROVED == ChangesetStatusModel().calculate_pull_request_result(pullrequest)[2]
 

	
 
    def test_api_comment_pullrequest(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, u"comment test")
 
        random_id = random.randrange(1, 9999)
 
        params = json.dumps({
 
            "id": random_id,
 
            "api_key": self.apikey,
 
            "method": "comment_pullrequest",
 
            "args": {"pull_request_id": pull_request_id, "comment_msg": "Looks good to me"},
 
        })
 
        response = api_call(self, params)
 
        self._compare_ok(random_id, True, given=response.body)
 
        pullrequest = PullRequest().get(pull_request_id)
 
        assert pullrequest.comments[-1].text == u'Looks good to me'
kallithea/tests/base.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import datetime
 
import logging
 
import os
 
import re
 
import tempfile
 
import time
 

	
 
import pytest
 
from webtest import TestApp
 

	
 
from kallithea.lib.utils2 import safe_str
 
from kallithea.model.db import User
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 
skipif = pytest.mark.skipif
 
parametrize = pytest.mark.parametrize
 

	
 
# Hack: These module global values MUST be set to actual values before running any tests. This is currently done by conftest.py.
 
url = None
 
testapp = None
 

	
 
__all__ = [
 
    'skipif', 'parametrize', 'url', 'TestController',
 
    'ldap_lib_installed', 'pam_lib_installed', 'invalidate_all_caches',
 
    'TESTS_TMP_PATH', 'HG_REPO', 'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO',
 
    'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_ADMIN_PASS',
 
    'TEST_USER_ADMIN_EMAIL', 'TEST_USER_REGULAR_LOGIN', 'TEST_USER_REGULAR_PASS',
 
    'TEST_USER_REGULAR_EMAIL', 'TEST_USER_REGULAR2_LOGIN',
 
    'TEST_USER_REGULAR2_PASS', 'TEST_USER_REGULAR2_EMAIL', 'IP_ADDR',
 
    'TEST_HG_REPO', 'TEST_HG_REPO_CLONE', 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO',
 
    'TEST_GIT_REPO_CLONE', 'TEST_GIT_REPO_PULL', 'HG_REMOTE_REPO',
 
    'GIT_REMOTE_REPO', 'HG_TEST_REVISION', 'GIT_TEST_REVISION',
 
]
 

	
 
## SOME GLOBALS FOR TESTS
 

	
 
TESTS_TMP_PATH = os.environ.get('KALLITHEA_TESTS_TMP_PATH', tempfile.mkdtemp(prefix='kallithea-test-'))
 

	
 
TEST_USER_ADMIN_LOGIN = 'test_admin'
 
TEST_USER_ADMIN_PASS = 'test12'
 
TEST_USER_ADMIN_EMAIL = 'test_admin@example.com'
 

	
 
TEST_USER_REGULAR_LOGIN = 'test_regular'
 
TEST_USER_REGULAR_PASS = 'test12'
 
TEST_USER_REGULAR_EMAIL = 'test_regular@example.com'
 

	
 
TEST_USER_REGULAR2_LOGIN = 'test_regular2'
 
TEST_USER_REGULAR2_PASS = 'test12'
 
TEST_USER_REGULAR2_EMAIL = 'test_regular2@example.com'
 

	
 
IP_ADDR = '127.0.0.127'
 

	
 
HG_REPO = u'vcs_test_hg'
 
GIT_REPO = u'vcs_test_git'
 

	
 
NEW_HG_REPO = u'vcs_test_hg_new'
 
NEW_GIT_REPO = u'vcs_test_git_new'
 

	
 
HG_FORK = u'vcs_test_hg_fork'
 
GIT_FORK = u'vcs_test_git_fork'
 

	
 
HG_TEST_REVISION = u"a53d9201d4bc278910d416d94941b7ea007ecd52"
 
GIT_TEST_REVISION = u"7ab37bc680b4aa72c34d07b230c866c28e9fc204"
 

	
 

	
 
## VCS
 
uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
 

	
 
GIT_REMOTE_REPO = os.path.join(TESTS_TMP_PATH, GIT_REPO)
 

	
 
TEST_GIT_REPO = os.path.join(TESTS_TMP_PATH, GIT_REPO)
 
TEST_GIT_REPO_CLONE = os.path.join(TESTS_TMP_PATH, 'vcs-git-clone-%s' % uniq_suffix)
 
TEST_GIT_REPO_PULL = os.path.join(TESTS_TMP_PATH, 'vcs-git-pull-%s' % uniq_suffix)
 

	
 
HG_REMOTE_REPO = os.path.join(TESTS_TMP_PATH, HG_REPO)
 

	
 
TEST_HG_REPO = os.path.join(TESTS_TMP_PATH, HG_REPO)
 
TEST_HG_REPO_CLONE = os.path.join(TESTS_TMP_PATH, 'vcs-hg-clone-%s' % uniq_suffix)
 
TEST_HG_REPO_PULL = os.path.join(TESTS_TMP_PATH, 'vcs-hg-pull-%s' % uniq_suffix)
 

	
 
# By default, some of the tests will utilise locally available
 
# repositories stored within tar.gz archives as source for
 
# cloning. Should you wish to use some other, remote archive, simply
 
# uncomment these entries and/or update the URLs to use.
 
#
 
# GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
 
# HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs'
 

	
 
# skip ldap tests if LDAP lib is not installed
 
ldap_lib_installed = False
 
try:
 
    import ldap
 
    ldap.API_VERSION
 
    ldap_lib_installed = True
 
except ImportError:
 
    # means that python-ldap is not installed
 
    pass
 

	
 
try:
 
    import pam
 
    pam.PAM_TEXT_INFO
 
    pam_lib_installed = True
 
except ImportError:
 
    pam_lib_installed = False
 

	
 

	
 
def invalidate_all_caches():
 
    """Invalidate all beaker caches currently configured.
 
    Useful when manipulating IP permissions in a test and changes need to take
 
    effect immediately.
 
    Note: Any use of this function is probably a workaround - it should be
 
    replaced with a more specific cache invalidation in code or test."""
 
    from beaker.cache import cache_managers
 
    for cache in cache_managers.values():
 
        cache.clear()
 

	
 

	
 
class NullHandler(logging.Handler):
 
    def emit(self, record):
 
        pass
 

	
 

	
 
class TestController(object):
 
    """Pytest-style test controller"""
 

	
 
    # Note: pytest base classes cannot have an __init__ method
 

	
 
    @pytest.fixture(autouse=True)
 
    def app_fixture(self):
 
        h = NullHandler()
 
        logging.getLogger("kallithea").addHandler(h)
 
        self.app = TestApp(testapp)
 
        return self.app
 

	
 
    def log_user(self, username=TEST_USER_ADMIN_LOGIN,
 
                 password=TEST_USER_ADMIN_PASS):
 
        self._logged_username = username
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': username,
 
                                  'password': password,
 
                                  '_session_csrf_secret_token': self.session_csrf_secret_token()})
 

	
 
        if 'Invalid username or password' in response.body:
 
        if b'Invalid username or password' in response.body:
 
            pytest.fail('could not login using %s %s' % (username, password))
 

	
 
        assert response.status == '302 Found'
 
        self.assert_authenticated_user(response, username)
 

	
 
        response = response.follow()
 
        return response.session['authuser']
 

	
 
    def _get_logged_user(self):
 
        return User.get_by_username(self._logged_username)
 

	
 
    def assert_authenticated_user(self, response, expected_username):
 
        cookie = response.session.get('authuser')
 
        user = cookie and cookie.get('user_id')
 
        user = user and User.get(user)
 
        user = user and user.username
 
        assert user == expected_username
 

	
 
    def session_csrf_secret_token(self):
 
        return self.app.get(url('session_csrf_secret_token')).body
 

	
 
    def checkSessionFlash(self, response, msg=None, skip=0, _matcher=lambda msg, m: msg in m):
 
        if 'flash' not in response.session:
 
            pytest.fail(safe_str(u'msg `%s` not found - session has no flash:\n%s' % (msg, response)))
 
        try:
 
            level, m = response.session['flash'][-1 - skip]
 
            if _matcher(msg, m):
 
                return
 
        except IndexError:
 
            pass
 
        pytest.fail(safe_str(u'msg `%s` not found in session flash (skipping %s): %s' %
 
                           (msg, skip,
 
                            ', '.join('`%s`' % m for level, m in response.session['flash']))))
 

	
 
    def checkSessionFlashRegex(self, response, regex, skip=0):
 
        self.checkSessionFlash(response, regex, skip=skip, _matcher=re.search)

Changeset was too big and was cut off... Show full diff anyway

0 comments (0 inline, 0 general)