Changeset - f73a1103ccdc
[Not reviewed]
default
0 8 0
Mads Kiilerich - 6 years ago 2019-08-04 01:37:08
mads@kiilerich.com
Grafted from: 50e481e30c46
flake8: fix E125 continuation line with same indent as next logical line
8 files changed with 13 insertions and 8 deletions:
0 comments (0 inline, 0 general)
kallithea/controllers/api/api.py
Show inline comments
 
@@ -1228,193 +1228,193 @@ class ApiController(JSONRPCController):
 
            if not isinstance(owner, Optional):
 
                # forbid setting owner for non-admins
 
                raise JSONRPCError(
 
                    'Only Kallithea admin can specify `owner` param'
 
                )
 
        if isinstance(owner, Optional):
 
            owner = request.authuser.user_id
 

	
 
        owner = get_user_or_error(owner)
 

	
 
        if RepoModel().get_by_repo_name(repo_name):
 
            raise JSONRPCError("repo `%s` already exist" % repo_name)
 

	
 
        defs = Setting.get_default_repo_settings(strip_prefix=True)
 
        if isinstance(private, Optional):
 
            private = defs.get('repo_private') or Optional.extract(private)
 
        if isinstance(repo_type, Optional):
 
            repo_type = defs.get('repo_type')
 
        if isinstance(enable_statistics, Optional):
 
            enable_statistics = defs.get('repo_enable_statistics')
 
        if isinstance(enable_downloads, Optional):
 
            enable_downloads = defs.get('repo_enable_downloads')
 

	
 
        clone_uri = Optional.extract(clone_uri)
 
        description = Optional.extract(description)
 
        landing_rev = Optional.extract(landing_rev)
 
        copy_permissions = Optional.extract(copy_permissions)
 

	
 
        try:
 
            repo_name_parts = repo_name.split('/')
 
            repo_group = None
 
            if len(repo_name_parts) > 1:
 
                group_name = '/'.join(repo_name_parts[:-1])
 
                repo_group = RepoGroup.get_by_group_name(group_name)
 
                if repo_group is None:
 
                    raise JSONRPCError("repo group `%s` not found" % group_name)
 
            data = dict(
 
                repo_name=repo_name_parts[-1],
 
                repo_name_full=repo_name,
 
                repo_type=repo_type,
 
                repo_description=description,
 
                owner=owner,
 
                repo_private=private,
 
                clone_uri=clone_uri,
 
                repo_group=repo_group,
 
                repo_landing_rev=landing_rev,
 
                enable_statistics=enable_statistics,
 
                enable_downloads=enable_downloads,
 
                repo_copy_permissions=copy_permissions,
 
            )
 

	
 
            task = RepoModel().create(form_data=data, cur_user=owner)
 
            task_id = task.task_id
 
            # no commit, it's done in RepoModel, or async via celery
 
            return dict(
 
                msg="Created new repository `%s`" % (repo_name,),
 
                success=True,  # cannot return the repo data here since fork
 
                               # can be done async
 
                task=task_id
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError(
 
                'failed to create repository `%s`' % (repo_name,))
 

	
 
    # permission check inside
 
    def update_repo(self, repoid, name=Optional(None),
 
                    owner=Optional(OAttr('apiuser')),
 
                    group=Optional(None),
 
                    description=Optional(''), private=Optional(False),
 
                    clone_uri=Optional(None), landing_rev=Optional('rev:tip'),
 
                    enable_statistics=Optional(False),
 
                    enable_downloads=Optional(False)):
 

	
 
        """
 
        Updates repo
 

	
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 
        :param name:
 
        :param owner:
 
        :param group:
 
        :param description:
 
        :param private:
 
        :param clone_uri:
 
        :param landing_rev:
 
        :param enable_statistics:
 
        :param enable_downloads:
 
        """
 
        repo = get_repo_or_error(repoid)
 
        if not HasPermissionAny('hg.admin')():
 
            if not HasRepoPermissionLevel('admin')(repo.repo_name):
 
                raise JSONRPCError('repository `%s` does not exist' % (repoid,))
 

	
 
            if (name != repo.repo_name and
 
                not HasPermissionAny('hg.create.repository')()
 
                ):
 
            ):
 
                raise JSONRPCError('no permission to create (or move) repositories')
 

	
 
            if not isinstance(owner, Optional):
 
                # forbid setting owner for non-admins
 
                raise JSONRPCError(
 
                    'Only Kallithea admin can specify `owner` param'
 
                )
 

	
 
        updates = {}
 
        repo_group = group
 
        if not isinstance(repo_group, Optional):
 
            repo_group = get_repo_group_or_error(repo_group)
 
            repo_group = repo_group.group_id
 
        try:
 
            store_update(updates, name, 'repo_name')
 
            store_update(updates, repo_group, 'repo_group')
 
            store_update(updates, owner, 'owner')
 
            store_update(updates, description, 'repo_description')
 
            store_update(updates, private, 'repo_private')
 
            store_update(updates, clone_uri, 'clone_uri')
 
            store_update(updates, landing_rev, 'repo_landing_rev')
 
            store_update(updates, enable_statistics, 'repo_enable_statistics')
 
            store_update(updates, enable_downloads, 'repo_enable_downloads')
 

	
 
            RepoModel().update(repo, **updates)
 
            Session().commit()
 
            return dict(
 
                msg='updated repo ID:%s %s' % (repo.repo_id, repo.repo_name),
 
                repository=repo.get_api_data()
 
            )
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise JSONRPCError('failed to update repo `%s`' % repoid)
 

	
 
    @HasPermissionAnyDecorator('hg.admin', 'hg.fork.repository')
 
    def fork_repo(self, repoid, fork_name,
 
                  owner=Optional(OAttr('apiuser')),
 
                  description=Optional(''), copy_permissions=Optional(False),
 
                  private=Optional(False), landing_rev=Optional('rev:tip')):
 
        """
 
        Creates a fork of given repo. In case of using celery this will
 
        immediately return success message, while fork is going to be created
 
        asynchronous. This command can be executed only using api_key belonging to
 
        user with admin rights or regular user that have fork permission, and at least
 
        read access to forking repository. Regular users cannot specify owner parameter.
 

	
 
        :param repoid: repository name or repository id
 
        :type repoid: str or int
 
        :param fork_name:
 
        :param owner:
 
        :param description:
 
        :param copy_permissions:
 
        :param private:
 
        :param landing_rev:
 

	
 
        INPUT::
 

	
 
            id : <id_for_response>
 
            api_key : "<api_key>"
 
            args:     {
 
                        "repoid" :          "<reponame or repo_id>",
 
                        "fork_name":        "<forkname>",
 
                        "owner":            "<username or user_id = Optional(=apiuser)>",
 
                        "description":      "<description>",
 
                        "copy_permissions": "<bool>",
 
                        "private":          "<bool>",
 
                        "landing_rev":      "<landing_rev>"
 
                      }
 

	
 
        OUTPUT::
 

	
 
            id : <id_given_in_input>
 
            result: {
 
                      "msg": "Created fork of `<reponame>` as `<forkname>`",
 
                      "success": true,
 
                      "task": "<celery task id or None if done sync>"
 
                    }
 
            error:  null
 

	
 
        """
 
        repo = get_repo_or_error(repoid)
 
        repo_name = repo.repo_name
 

	
 
        _repo = RepoModel().get_by_repo_name(fork_name)
 
        if _repo:
 
            type_ = 'fork' if _repo.fork else 'repo'
 
            raise JSONRPCError("%s `%s` already exist" % (type_, fork_name))
 

	
 
        if HasPermissionAny('hg.admin')():
 
            pass
 
        elif HasRepoPermissionLevel('read')(repo.repo_name):
 
            if not isinstance(owner, Optional):
 
                # forbid setting owner for non-admins
 
                raise JSONRPCError(
 
                    'Only Kallithea admin can specify `owner` param'
 
                )
kallithea/controllers/changeset.py
Show inline comments
 
@@ -113,193 +113,193 @@ def get_line_ctx(fid, GET):
 
        _ln_ctx = filter(lambda k: k.startswith('C'), GET)
 
        ln_ctx = GET.get(_ln_ctx[0]) if _ln_ctx else ln_ctx_global
 
        if ln_ctx:
 
            ln_ctx = [ln_ctx]
 

	
 
    if ln_ctx:
 
        retval = ln_ctx[0].split(':')[-1]
 
    else:
 
        retval = ln_ctx_global
 

	
 
    try:
 
        return int(retval)
 
    except Exception:
 
        return 3
 

	
 

	
 
def _context_url(GET, fileid=None):
 
    """
 
    Generates url for context lines
 

	
 
    :param fileid:
 
    """
 

	
 
    fileid = str(fileid) if fileid else None
 
    ig_ws = get_ignore_ws(fileid, GET)
 
    ln_ctx = (get_line_ctx(fileid, GET) or 3) * 2
 

	
 
    params = defaultdict(list)
 
    _update_with_GET(params, GET)
 

	
 
    # global option
 
    if fileid is None:
 
        if ln_ctx > 0:
 
            params['context'] += [ln_ctx]
 

	
 
        if ig_ws:
 
            ig_ws_key = 'ignorews'
 
            ig_ws_val = 1
 

	
 
    # per file option
 
    else:
 
        params[fileid] += ['C:%s' % ln_ctx]
 
        ig_ws_key = fileid
 
        ig_ws_val = 'WS:%s' % 1
 

	
 
    if ig_ws:
 
        params[ig_ws_key] += [ig_ws_val]
 

	
 
    lbl = _('Increase diff context to %(num)s lines') % {'num': ln_ctx}
 

	
 
    params['anchor'] = fileid
 
    icon = h.literal('<i class="icon-sort"></i>')
 
    return h.link_to(icon, h.url.current(**params), title=lbl, **{'data-toggle': 'tooltip'})
 

	
 

	
 
def create_cs_pr_comment(repo_name, revision=None, pull_request=None, allowed_to_change_status=True):
 
    """
 
    Add a comment to the specified changeset or pull request, using POST values
 
    from the request.
 

	
 
    Comments can be inline (when a file path and line number is specified in
 
    POST) or general comments.
 
    A comment can be accompanied by a review status change (accepted, rejected,
 
    etc.). Pull requests can be closed or deleted.
 

	
 
    Parameter 'allowed_to_change_status' is used for both status changes and
 
    closing of pull requests. For deleting of pull requests, more specific
 
    checks are done.
 
    """
 

	
 
    assert request.environ.get('HTTP_X_PARTIAL_XHR')
 
    if pull_request:
 
        pull_request_id = pull_request.pull_request_id
 
    else:
 
        pull_request_id = None
 

	
 
    status = request.POST.get('changeset_status')
 
    close_pr = request.POST.get('save_close')
 
    delete = request.POST.get('save_delete')
 
    f_path = request.POST.get('f_path')
 
    line_no = request.POST.get('line')
 

	
 
    if (status or close_pr or delete) and (f_path or line_no):
 
        # status votes and closing is only possible in general comments
 
        raise HTTPBadRequest()
 

	
 
    if not allowed_to_change_status:
 
        if status or close_pr:
 
            h.flash(_('No permission to change status'), 'error')
 
            raise HTTPForbidden()
 

	
 
    if pull_request and delete == "delete":
 
        if (pull_request.owner_id == request.authuser.user_id or
 
            h.HasPermissionAny('hg.admin')() or
 
            h.HasRepoPermissionLevel('admin')(pull_request.org_repo.repo_name) or
 
            h.HasRepoPermissionLevel('admin')(pull_request.other_repo.repo_name)
 
            ) and not pull_request.is_closed():
 
        ) and not pull_request.is_closed():
 
            PullRequestModel().delete(pull_request)
 
            Session().commit()
 
            h.flash(_('Successfully deleted pull request %s') % pull_request_id,
 
                    category='success')
 
            return {
 
               'location': h.url('my_pullrequests'), # or repo pr list?
 
            }
 
            raise HTTPFound(location=h.url('my_pullrequests')) # or repo pr list?
 
        raise HTTPForbidden()
 

	
 
    text = request.POST.get('text', '').strip()
 

	
 
    comment = ChangesetCommentsModel().create(
 
        text=text,
 
        repo=c.db_repo.repo_id,
 
        author=request.authuser.user_id,
 
        revision=revision,
 
        pull_request=pull_request_id,
 
        f_path=f_path or None,
 
        line_no=line_no or None,
 
        status_change=ChangesetStatus.get_status_lbl(status) if status else None,
 
        closing_pr=close_pr,
 
    )
 

	
 
    if status:
 
        ChangesetStatusModel().set_status(
 
            c.db_repo.repo_id,
 
            status,
 
            request.authuser.user_id,
 
            comment,
 
            revision=revision,
 
            pull_request=pull_request_id,
 
        )
 

	
 
    if pull_request:
 
        action = 'user_commented_pull_request:%s' % pull_request_id
 
    else:
 
        action = 'user_commented_revision:%s' % revision
 
    action_logger(request.authuser, action, c.db_repo, request.ip_addr)
 

	
 
    if pull_request and close_pr:
 
        PullRequestModel().close_pull_request(pull_request_id)
 
        action_logger(request.authuser,
 
                      'user_closed_pull_request:%s' % pull_request_id,
 
                      c.db_repo, request.ip_addr)
 

	
 
    Session().commit()
 

	
 
    data = {
 
       'target_id': h.safeid(h.safe_unicode(request.POST.get('f_path'))),
 
    }
 
    if comment is not None:
 
        c.comment = comment
 
        data.update(comment.get_dict())
 
        data.update({'rendered_text':
 
                     render('changeset/changeset_comment_block.html')})
 

	
 
    return data
 

	
 
def delete_cs_pr_comment(repo_name, comment_id):
 
    """Delete a comment from a changeset or pull request"""
 
    co = ChangesetComment.get_or_404(comment_id)
 
    if co.repo.repo_name != repo_name:
 
        raise HTTPNotFound()
 
    if co.pull_request and co.pull_request.is_closed():
 
        # don't allow deleting comments on closed pull request
 
        raise HTTPForbidden()
 

	
 
    owner = co.author_id == request.authuser.user_id
 
    repo_admin = h.HasRepoPermissionLevel('admin')(repo_name)
 
    if h.HasPermissionAny('hg.admin')() or repo_admin or owner:
 
        ChangesetCommentsModel().delete(comment=co)
 
        Session().commit()
 
        return True
 
    else:
 
        raise HTTPForbidden()
 

	
 
class ChangesetController(BaseRepoController):
 

	
 
    def _before(self, *args, **kwargs):
 
        super(ChangesetController, self)._before(*args, **kwargs)
 
        c.affected_files_cut_off = 60
 

	
 
    def _index(self, revision, method):
 
        c.pull_request = None
 
        c.anchor_url = anchor_url
 
        c.ignorews_url = _ignorews_url
 
        c.context_url = _context_url
 
        c.fulldiff = request.GET.get('fulldiff') # for reporting number of changed files
 
        # get ranges of revisions if preset
 
        rev_range = revision.split('...')[:2]
 
        enable_comments = True
 
        c.cs_repo = c.db_repo
 
        try:
 
            if len(rev_range) == 2:
 
                enable_comments = False
kallithea/lib/annotate.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.annotate
 
~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Annotation library for usage in Kallithea, previously part of vcs
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Dec 4, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import StringIO
 

	
 
from pygments import highlight
 
from pygments.formatters import HtmlFormatter
 

	
 
from kallithea.lib.vcs.exceptions import VCSError
 
from kallithea.lib.vcs.nodes import FileNode
 

	
 

	
 
def annotate_highlight(filenode, annotate_from_changeset_func=None,
 
        order=None, headers=None, **options):
 
    """
 
    Returns html portion containing annotated table with 3 columns: line
 
    numbers, changeset information and pygmentized line of code.
 

	
 
    :param filenode: FileNode object
 
    :param annotate_from_changeset_func: function taking changeset and
 
      returning single annotate cell; needs break line at the end
 
    :param order: ordered sequence of ``ls`` (line numbers column),
 
      ``annotate`` (annotate column), ``code`` (code column); Default is
 
      ``['ls', 'annotate', 'code']``
 
    :param headers: dictionary with headers (keys are whats in ``order``
 
      parameter)
 
    """
 
    from kallithea.lib.pygmentsutils import get_custom_lexer
 
    options['linenos'] = True
 
    formatter = AnnotateHtmlFormatter(filenode=filenode, order=order,
 
        headers=headers,
 
        annotate_from_changeset_func=annotate_from_changeset_func, **options)
 
    lexer = get_custom_lexer(filenode.extension) or filenode.lexer
 
    highlighted = highlight(filenode.content, lexer, formatter)
 
    return highlighted
 

	
 

	
 
class AnnotateHtmlFormatter(HtmlFormatter):
 

	
 
    def __init__(self, filenode, annotate_from_changeset_func=None,
 
            order=None, **options):
 
        """
 
        If ``annotate_from_changeset_func`` is passed it should be a function
 
        which returns string from the given changeset. For example, we may pass
 
        following function as ``annotate_from_changeset_func``::
 

	
 
            def changeset_to_anchor(changeset):
 
                return '<a href="/changesets/%s/">%s</a>\n' % \
 
                       (changeset.id, changeset.id)
 

	
 
        :param annotate_from_changeset_func: see above
 
        :param order: (default: ``['ls', 'annotate', 'code']``); order of
 
          columns;
 
        :param options: standard pygment's HtmlFormatter options, there is
 
          extra option tough, ``headers``. For instance we can pass::
 

	
 
             formatter = AnnotateHtmlFormatter(filenode, headers={
 
                'ls': '#',
 
                'annotate': 'Annotate',
 
                'code': 'Code',
 
             })
 

	
 
        """
 
        super(AnnotateHtmlFormatter, self).__init__(**options)
 
        self.annotate_from_changeset_func = annotate_from_changeset_func
 
        self.order = order or ('ls', 'annotate', 'code')
 
        headers = options.pop('headers', None)
 
        if headers and not ('ls' in headers and 'annotate' in headers and
 
            'code' in headers):
 
            'code' in headers
 
        ):
 
            raise ValueError("If headers option dict is specified it must "
 
                "all 'ls', 'annotate' and 'code' keys")
 
        self.headers = headers
 
        if isinstance(filenode, FileNode):
 
            self.filenode = filenode
 
        else:
 
            raise VCSError("This formatter expect FileNode parameter, not %r"
 
                % type(filenode))
 

	
 
    def annotate_from_changeset(self, changeset):
 
        """
 
        Returns full html line for single changeset per annotated line.
 
        """
 
        if self.annotate_from_changeset_func:
 
            return self.annotate_from_changeset_func(changeset)
 
        else:
 
            return ''.join((changeset.id, '\n'))
 

	
 
    def _wrap_tablelinenos(self, inner):
 
        dummyoutfile = StringIO.StringIO()
 
        lncount = 0
 
        for t, line in inner:
 
            if t:
 
                lncount += 1
 
            dummyoutfile.write(line)
 

	
 
        fl = self.linenostart
 
        mw = len(str(lncount + fl - 1))
 
        sp = self.linenospecial
 
        st = self.linenostep
 
        la = self.lineanchors
 
        aln = self.anchorlinenos
 
        if sp:
 
            lines = []
 

	
 
            for i in range(fl, fl + lncount):
 
                if i % st == 0:
 
                    if i % sp == 0:
 
                        if aln:
 
                            lines.append('<a href="#%s-%d" class="special">'
 
                                         '%*d</a>' %
 
                                         (la, i, mw, i))
 
                        else:
 
                            lines.append('<span class="special">'
 
                                         '%*d</span>' % (mw, i))
 
                    else:
 
                        if aln:
 
                            lines.append('<a href="#%s-%d">'
 
                                         '%*d</a>' % (la, i, mw, i))
 
                        else:
 
                            lines.append('%*d' % (mw, i))
 
                else:
 
                    lines.append('')
 
            ls = '\n'.join(lines)
 
        else:
 
            lines = []
 
            for i in range(fl, fl + lncount):
 
                if i % st == 0:
 
                    if aln:
 
                        lines.append('<a href="#%s-%d">%*d</a>'
 
                                     % (la, i, mw, i))
 
                    else:
 
                        lines.append('%*d' % (mw, i))
 
                else:
 
                    lines.append('')
 
            ls = '\n'.join(lines)
 

	
 
#        annotate_changesets = [tup[1] for tup in self.filenode.annotate]
 
#        # TODO: not sure what that fixes
 
#        # If pygments cropped last lines break we need do that too
 
#        ln_cs = len(annotate_changesets)
 
#        ln_ = len(ls.splitlines())
 
#        if  ln_cs > ln_:
 
#            annotate_changesets = annotate_changesets[:ln_ - ln_cs]
 
        annotate = ''.join((self.annotate_from_changeset(el[2]())
 
                            for el in self.filenode.annotate))
 
        # in case you wonder about the seemingly redundant <div> here:
 
        # since the content in the other cell also is wrapped in a div,
 
        # some browsers in some configurations seem to mess up the formatting.
 
        '''
 
        yield 0, ('<table class="%stable">' % self.cssclass +
 
                  '<tr><td class="linenos"><div class="linenodiv"><pre>' +
 
                  ls + '</pre></div></td>' +
 
                  '<td class="code">')
 
        yield 0, dummyoutfile.getvalue()
 
        yield 0, '</td></tr></table>'
 

	
 
        '''
 
        headers_row = []
 
        if self.headers:
 
            headers_row = ['<tr class="annotate-header">']
 
            for key in self.order:
 
                td = ''.join(('<td>', self.headers[key], '</td>'))
 
                headers_row.append(td)
 
            headers_row.append('</tr>')
 

	
kallithea/lib/middleware/simplegit.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.middleware.simplegit
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
SimpleGit middleware for handling Git protocol requests (push/clone etc.)
 
It's implemented with basic auth function
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 28, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 

	
 
"""
 

	
 

	
 
import logging
 
import re
 

	
 
from kallithea.lib.base import BaseVCSController
 
from kallithea.lib.hooks import log_pull_action
 
from kallithea.lib.middleware.pygrack import make_wsgi_app
 
from kallithea.lib.utils import make_ui
 
from kallithea.lib.utils2 import safe_unicode
 
from kallithea.model.db import Repository
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
GIT_PROTO_PAT = re.compile(r'^/(.+)/(info/refs|git-upload-pack|git-receive-pack)$')
 

	
 

	
 
cmd_mapping = {
 
    'git-receive-pack': 'push',
 
    'git-upload-pack': 'pull',
 
}
 

	
 

	
 
class SimpleGit(BaseVCSController):
 

	
 
    scm_alias = 'git'
 

	
 
    @classmethod
 
    def parse_request(cls, environ):
 
        path_info = environ.get('PATH_INFO', '')
 
        m = GIT_PROTO_PAT.match(path_info)
 
        if m is None:
 
            return None
 

	
 
        class parsed_request(object):
 
            # See https://git-scm.com/book/en/v2/Git-Internals-Transfer-Protocols#_the_smart_protocol
 
            repo_name = safe_unicode(m.group(1).rstrip('/'))
 
            cmd = m.group(2)
 

	
 
            query_string = environ['QUERY_STRING']
 
            if cmd == 'info/refs' and query_string.startswith('service='):
 
                service = query_string.split('=', 1)[1]
 
                action = cmd_mapping.get(service)
 
            else:
 
                service = None
 
                action = cmd_mapping.get(cmd)
 

	
 
        return parsed_request
 

	
 
    def _make_app(self, parsed_request):
 
        """
 
        Return a pygrack wsgi application.
 
        """
 
        pygrack_app = make_wsgi_app(parsed_request.repo_name, self.basepath)
 

	
 
        def wrapper_app(environ, start_response):
 
            if (parsed_request.cmd == 'info/refs' and
 
                parsed_request.service == 'git-upload-pack'
 
                ):
 
            ):
 
                baseui = make_ui()
 
                repo = Repository.get_by_repo_name(parsed_request.repo_name)
 
                scm_repo = repo.scm_instance
 
                # Run hooks, like Mercurial outgoing.pull_logger does
 
                log_pull_action(ui=baseui, repo=scm_repo._repo)
 
            # Note: push hooks are handled by post-receive hook
 

	
 
            return pygrack_app(environ, start_response)
 

	
 
        return wrapper_app
kallithea/lib/page.py
Show inline comments
 
@@ -33,193 +33,194 @@ class Page(_Page):
 
    """
 

	
 
    def __init__(self, *args, **kwargs):
 
        kwargs.setdefault('url', url.current)
 
        _Page.__init__(self, *args, **kwargs)
 

	
 
    def _get_pos(self, cur_page, max_page, items):
 
        edge = (items / 2) + 1
 
        if (cur_page <= edge):
 
            radius = max(items / 2, items - cur_page)
 
        elif (max_page - cur_page) < edge:
 
            radius = (items - 1) - (max_page - cur_page)
 
        else:
 
            radius = items / 2
 

	
 
        left = max(1, (cur_page - (radius)))
 
        right = min(max_page, cur_page + (radius))
 
        return left, cur_page, right
 

	
 
    def _range(self, regexp_match):
 
        """
 
        Return range of linked pages (e.g. '1 2 [3] 4 5 6 7 8').
 

	
 
        Arguments:
 

	
 
        regexp_match
 
            A "re" (regular expressions) match object containing the
 
            radius of linked pages around the current page in
 
            regexp_match.group(1) as a string
 

	
 
        This function is supposed to be called as a callable in
 
        re.sub.
 

	
 
        """
 
        radius = int(regexp_match.group(1))
 

	
 
        # Compute the first and last page number within the radius
 
        # e.g. '1 .. 5 6 [7] 8 9 .. 12'
 
        # -> leftmost_page  = 5
 
        # -> rightmost_page = 9
 
        leftmost_page, _cur, rightmost_page = self._get_pos(self.page,
 
                                                            self.last_page,
 
                                                            (radius * 2) + 1)
 
        nav_items = []
 

	
 
        # Create a link to the first page (unless we are on the first page
 
        # or there would be no need to insert '..' spacers)
 
        if self.page != self.first_page and self.first_page < leftmost_page:
 
            nav_items.append(HTML.li(self._pagerlink(self.first_page, self.first_page)))
 

	
 
        # Insert dots if there are pages between the first page
 
        # and the currently displayed page range
 
        if leftmost_page - self.first_page > 1:
 
            # Wrap in a SPAN tag if nolink_attr is set
 
            text_ = '..'
 
            if self.dotdot_attr:
 
                text_ = HTML.span(c=text_, **self.dotdot_attr)
 
            nav_items.append(HTML.li(text_))
 

	
 
        for thispage in xrange(leftmost_page, rightmost_page + 1):
 
            # Highlight the current page number and do not use a link
 
            text_ = str(thispage)
 
            if thispage == self.page:
 
                # Wrap in a SPAN tag if nolink_attr is set
 
                if self.curpage_attr:
 
                    text_ = HTML.li(HTML.span(c=text_), **self.curpage_attr)
 
                nav_items.append(text_)
 
            # Otherwise create just a link to that page
 
            else:
 
                nav_items.append(HTML.li(self._pagerlink(thispage, text_)))
 

	
 
        # Insert dots if there are pages between the displayed
 
        # page numbers and the end of the page range
 
        if self.last_page - rightmost_page > 1:
 
            text_ = '..'
 
            # Wrap in a SPAN tag if nolink_attr is set
 
            if self.dotdot_attr:
 
                text_ = HTML.span(c=text_, **self.dotdot_attr)
 
            nav_items.append(HTML.li(text_))
 

	
 
        # Create a link to the very last page (unless we are on the last
 
        # page or there would be no need to insert '..' spacers)
 
        if self.page != self.last_page and rightmost_page < self.last_page:
 
            nav_items.append(HTML.li(self._pagerlink(self.last_page, self.last_page)))
 

	
 
        #_page_link = url.current()
 
        #nav_items.append(literal('<link rel="prerender" href="%s?page=%s">' % (_page_link, str(int(self.page)+1))))
 
        #nav_items.append(literal('<link rel="prefetch" href="%s?page=%s">' % (_page_link, str(int(self.page)+1))))
 
        return self.separator.join(nav_items)
 

	
 
    def pager(self, format='<ul class="pagination">$link_previous ~2~ $link_next</ul>', page_param='page', partial_param='partial',
 
        show_if_single_page=False, separator=' ', onclick=None,
 
        symbol_first='<<', symbol_last='>>',
 
        symbol_previous='<', symbol_next='>',
 
        link_attr=None,
 
        curpage_attr=None,
 
        dotdot_attr=None, **kwargs):
 
        dotdot_attr=None, **kwargs
 
    ):
 
        self.curpage_attr = curpage_attr or {'class': 'active'}
 
        self.separator = separator
 
        self.pager_kwargs = kwargs
 
        self.page_param = page_param
 
        self.partial_param = partial_param
 
        self.onclick = onclick
 
        self.link_attr = link_attr or {'class': 'pager_link', 'rel': 'prerender'}
 
        self.dotdot_attr = dotdot_attr or {'class': 'pager_dotdot'}
 

	
 
        # Don't show navigator if there is no more than one page
 
        if self.page_count == 0 or (self.page_count == 1 and not show_if_single_page):
 
            return ''
 

	
 
        from string import Template
 
        # Replace ~...~ in token format by range of pages
 
        result = re.sub(r'~(\d+)~', self._range, format)
 

	
 
        # Interpolate '%' variables
 
        result = Template(result).safe_substitute({
 
            'first_page': self.first_page,
 
            'last_page': self.last_page,
 
            'page': self.page,
 
            'page_count': self.page_count,
 
            'items_per_page': self.items_per_page,
 
            'first_item': self.first_item,
 
            'last_item': self.last_item,
 
            'item_count': self.item_count,
 
            'link_first': self.page > self.first_page and
 
                    self._pagerlink(self.first_page, symbol_first) or '',
 
            'link_last': self.page < self.last_page and
 
                    self._pagerlink(self.last_page, symbol_last) or '',
 
            'link_previous': HTML.li(self.previous_page and
 
                    self._pagerlink(self.previous_page, symbol_previous)
 
                    or HTML.a(symbol_previous)),
 
            'link_next': HTML.li(self.next_page and
 
                    self._pagerlink(self.next_page, symbol_next)
 
                    or HTML.a(symbol_next))
 
        })
 

	
 
        return literal(result)
 

	
 

	
 
class RepoPage(Page):
 

	
 
    def __init__(self, collection, page=1, items_per_page=20,
 
                 item_count=None, **kwargs):
 

	
 
        """Create a "RepoPage" instance. special pager for paging
 
        repository
 
        """
 
        # TODO: call baseclass __init__
 
        self._url_generator = kwargs.pop('url', url.current)
 

	
 
        # Safe the kwargs class-wide so they can be used in the pager() method
 
        self.kwargs = kwargs
 

	
 
        # Save a reference to the collection
 
        self.original_collection = collection
 

	
 
        self.collection = collection
 

	
 
        # The self.page is the number of the current page.
 
        # The first page has the number 1!
 
        try:
 
            self.page = int(page)  # make it int() if we get it as a string
 
        except (ValueError, TypeError):
 
            log.error("Invalid page value: %r", page)
 
            self.page = 1
 

	
 
        self.items_per_page = items_per_page
 

	
 
        # Unless the user tells us how many items the collections has
 
        # we calculate that ourselves.
 
        if item_count is not None:
 
            self.item_count = item_count
 
        else:
 
            self.item_count = len(self.collection)
 

	
 
        # Compute the number of the first and last available page
 
        if self.item_count > 0:
 
            self.first_page = 1
 
            self.page_count = int(math.ceil(float(self.item_count) /
 
                                            self.items_per_page))
 
            self.last_page = self.first_page + self.page_count - 1
 

	
 
            # Make sure that the requested page number is the range of
 
            # valid pages
 
            if self.page > self.last_page:
 
                self.page = self.last_page
 
            elif self.page < self.first_page:
 
                self.page = self.first_page
 

	
 
            # Note: the number of items on this page can be less than
 
            #       items_per_page if the last page is not full
 
            self.first_item = max(0, (self.item_count) - (self.page *
 
                                                          items_per_page))
kallithea/lib/utils2.py
Show inline comments
 
@@ -211,193 +211,194 @@ def safe_str(unicode_, to_encoding=None)
 

	
 
    In case of UnicodeEncodeError we try to return it with encoding detected
 
    by chardet library if it fails fallback to string with errors replaced
 

	
 
    :param unicode_: unicode to encode
 
    :rtype: str
 
    :returns: str object
 
    """
 

	
 
    # if it's not basestr cast to str
 
    if not isinstance(unicode_, basestring):
 
        return str(unicode_)
 

	
 
    if isinstance(unicode_, str):
 
        return unicode_
 

	
 
    if not to_encoding:
 
        import kallithea
 
        DEFAULT_ENCODINGS = aslist(kallithea.CONFIG.get('default_encoding',
 
                                                        'utf-8'), sep=',')
 
        to_encoding = DEFAULT_ENCODINGS
 

	
 
    if not isinstance(to_encoding, (list, tuple)):
 
        to_encoding = [to_encoding]
 

	
 
    for enc in to_encoding:
 
        try:
 
            return unicode_.encode(enc)
 
        except UnicodeEncodeError:
 
            pass
 

	
 
    try:
 
        import chardet
 
        encoding = chardet.detect(unicode_)['encoding']
 
        if encoding is None:
 
            raise UnicodeEncodeError()
 

	
 
        return unicode_.encode(encoding)
 
    except (ImportError, UnicodeEncodeError):
 
        return unicode_.encode(to_encoding[0], 'replace')
 

	
 

	
 
def remove_suffix(s, suffix):
 
    if s.endswith(suffix):
 
        s = s[:-1 * len(suffix)]
 
    return s
 

	
 

	
 
def remove_prefix(s, prefix):
 
    if s.startswith(prefix):
 
        s = s[len(prefix):]
 
    return s
 

	
 

	
 
def age(prevdate, show_short_version=False, now=None):
 
    """
 
    turns a datetime into an age string.
 
    If show_short_version is True, then it will generate a not so accurate but shorter string,
 
    example: 2days ago, instead of 2 days and 23 hours ago.
 

	
 
    :param prevdate: datetime object
 
    :param show_short_version: if it should approximate the date and return a shorter string
 
    :rtype: unicode
 
    :returns: unicode words describing age
 
    """
 
    now = now or datetime.datetime.now()
 
    order = ['year', 'month', 'day', 'hour', 'minute', 'second']
 
    deltas = {}
 
    future = False
 

	
 
    if prevdate > now:
 
        now, prevdate = prevdate, now
 
        future = True
 
    if future:
 
        prevdate = prevdate.replace(microsecond=0)
 
    # Get date parts deltas
 
    from dateutil import relativedelta
 
    for part in order:
 
        d = relativedelta.relativedelta(now, prevdate)
 
        deltas[part] = getattr(d, part + 's')
 

	
 
    # Fix negative offsets (there is 1 second between 10:59:59 and 11:00:00,
 
    # not 1 hour, -59 minutes and -59 seconds)
 
    for num, length in [(5, 60), (4, 60), (3, 24)]:  # seconds, minutes, hours
 
        part = order[num]
 
        carry_part = order[num - 1]
 

	
 
        if deltas[part] < 0:
 
            deltas[part] += length
 
            deltas[carry_part] -= 1
 

	
 
    # Same thing for days except that the increment depends on the (variable)
 
    # number of days in the month
 
    month_lengths = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
 
    if deltas['day'] < 0:
 
        if prevdate.month == 2 and (prevdate.year % 4 == 0 and
 
            (prevdate.year % 100 != 0 or prevdate.year % 400 == 0)):
 
            (prevdate.year % 100 != 0 or prevdate.year % 400 == 0)
 
        ):
 
            deltas['day'] += 29
 
        else:
 
            deltas['day'] += month_lengths[prevdate.month - 1]
 

	
 
        deltas['month'] -= 1
 

	
 
    if deltas['month'] < 0:
 
        deltas['month'] += 12
 
        deltas['year'] -= 1
 

	
 
    # In short version, we want nicer handling of ages of more than a year
 
    if show_short_version:
 
        if deltas['year'] == 1:
 
            # ages between 1 and 2 years: show as months
 
            deltas['month'] += 12
 
            deltas['year'] = 0
 
        if deltas['year'] >= 2:
 
            # ages 2+ years: round
 
            if deltas['month'] > 6:
 
                deltas['year'] += 1
 
                deltas['month'] = 0
 

	
 
    # Format the result
 
    fmt_funcs = {
 
        'year': lambda d: ungettext(u'%d year', '%d years', d) % d,
 
        'month': lambda d: ungettext(u'%d month', '%d months', d) % d,
 
        'day': lambda d: ungettext(u'%d day', '%d days', d) % d,
 
        'hour': lambda d: ungettext(u'%d hour', '%d hours', d) % d,
 
        'minute': lambda d: ungettext(u'%d minute', '%d minutes', d) % d,
 
        'second': lambda d: ungettext(u'%d second', '%d seconds', d) % d,
 
    }
 

	
 
    for i, part in enumerate(order):
 
        value = deltas[part]
 
        if value == 0:
 
            continue
 

	
 
        if i < 5:
 
            sub_part = order[i + 1]
 
            sub_value = deltas[sub_part]
 
        else:
 
            sub_value = 0
 

	
 
        if sub_value == 0 or show_short_version:
 
            if future:
 
                return _('in %s') % fmt_funcs[part](value)
 
            else:
 
                return _('%s ago') % fmt_funcs[part](value)
 
        if future:
 
            return _('in %s and %s') % (fmt_funcs[part](value),
 
                fmt_funcs[sub_part](sub_value))
 
        else:
 
            return _('%s and %s ago') % (fmt_funcs[part](value),
 
                fmt_funcs[sub_part](sub_value))
 

	
 
    return _('just now')
 

	
 

	
 
def uri_filter(uri):
 
    """
 
    Removes user:password from given url string
 

	
 
    :param uri:
 
    :rtype: unicode
 
    :returns: filtered list of strings
 
    """
 
    if not uri:
 
        return ''
 

	
 
    proto = ''
 

	
 
    for pat in ('https://', 'http://', 'git://'):
 
        if uri.startswith(pat):
 
            uri = uri[len(pat):]
 
            proto = pat
 
            break
 

	
 
    # remove passwords and username
 
    uri = uri[uri.find('@') + 1:]
 

	
 
    # get the port
 
    cred_pos = uri.find(':')
 
    if cred_pos == -1:
 
        host, port = uri, None
 
    else:
 
        host, port = uri[:cred_pos], uri[cred_pos + 1:]
 

	
 
    return filter(None, [proto, host, port])
 

	
 

	
 
def credentials_filter(uri):
 
    """
 
    Returns a url with removed credentials
 

	
 
    :param uri:
 
    """
kallithea/lib/vcs/backends/git/inmemory.py
Show inline comments
 
@@ -2,193 +2,194 @@ import datetime
 
import posixpath
 
import stat
 
import time
 

	
 
from dulwich import objects
 

	
 
from kallithea.lib.vcs.backends.base import BaseInMemoryChangeset
 
from kallithea.lib.vcs.exceptions import RepositoryError
 
from kallithea.lib.vcs.utils import safe_str
 

	
 

	
 
class GitInMemoryChangeset(BaseInMemoryChangeset):
 

	
 
    def commit(self, message, author, parents=None, branch=None, date=None,
 
               **kwargs):
 
        """
 
        Performs in-memory commit (doesn't check workdir in any way) and
 
        returns newly created ``Changeset``. Updates repository's
 
        ``revisions``.
 

	
 
        :param message: message of the commit
 
        :param author: full username, i.e. "Joe Doe <joe.doe@example.com>"
 
        :param parents: single parent or sequence of parents from which commit
 
          would be derived
 
        :param date: ``datetime.datetime`` instance. Defaults to
 
          ``datetime.datetime.now()``.
 
        :param branch: branch name, as string. If none given, default backend's
 
          branch would be used.
 

	
 
        :raises ``CommitError``: if any error occurs while committing
 
        """
 
        self.check_integrity(parents)
 

	
 
        from .repository import GitRepository
 
        if branch is None:
 
            branch = GitRepository.DEFAULT_BRANCH_NAME
 

	
 
        repo = self.repository._repo
 
        object_store = repo.object_store
 

	
 
        ENCODING = "UTF-8"
 

	
 
        # Create tree and populates it with blobs
 
        commit_tree = self.parents[0] and repo[self.parents[0]._commit.tree] or \
 
            objects.Tree()
 
        for node in self.added + self.changed:
 
            # Compute subdirs if needed
 
            dirpath, nodename = posixpath.split(node.path)
 
            dirnames = map(safe_str, dirpath and dirpath.split('/') or [])
 
            parent = commit_tree
 
            ancestors = [('', parent)]
 

	
 
            # Tries to dig for the deepest existing tree
 
            while dirnames:
 
                curdir = dirnames.pop(0)
 
                try:
 
                    dir_id = parent[curdir][1]
 
                except KeyError:
 
                    # put curdir back into dirnames and stops
 
                    dirnames.insert(0, curdir)
 
                    break
 
                else:
 
                    # If found, updates parent
 
                    parent = self.repository._repo[dir_id]
 
                    ancestors.append((curdir, parent))
 
            # Now parent is deepest existing tree and we need to create subtrees
 
            # for dirnames (in reverse order) [this only applies for nodes from added]
 
            new_trees = []
 

	
 
            if not node.is_binary:
 
                content = node.content.encode(ENCODING)
 
            else:
 
                content = node.content
 
            blob = objects.Blob.from_string(content)
 

	
 
            node_path = node.name.encode(ENCODING)
 
            if dirnames:
 
                # If there are trees which should be created we need to build
 
                # them now (in reverse order)
 
                reversed_dirnames = list(reversed(dirnames))
 
                curtree = objects.Tree()
 
                curtree[node_path] = node.mode, blob.id
 
                new_trees.append(curtree)
 
                for dirname in reversed_dirnames[:-1]:
 
                    newtree = objects.Tree()
 
                    #newtree.add(stat.S_IFDIR, dirname, curtree.id)
 
                    newtree[dirname] = stat.S_IFDIR, curtree.id
 
                    new_trees.append(newtree)
 
                    curtree = newtree
 
                parent[reversed_dirnames[-1]] = stat.S_IFDIR, curtree.id
 
            else:
 
                parent.add(name=node_path, mode=node.mode, hexsha=blob.id)
 

	
 
            new_trees.append(parent)
 
            # Update ancestors
 
            for parent, tree, path in reversed([(a[1], b[1], b[0]) for a, b in
 
                zip(ancestors, ancestors[1:])]):
 
                zip(ancestors, ancestors[1:])]
 
            ):
 
                parent[path] = stat.S_IFDIR, tree.id
 
                object_store.add_object(tree)
 

	
 
            object_store.add_object(blob)
 
            for tree in new_trees:
 
                object_store.add_object(tree)
 
        for node in self.removed:
 
            paths = node.path.split('/')
 
            tree = commit_tree
 
            trees = [tree]
 
            # Traverse deep into the forest...
 
            for path in paths:
 
                try:
 
                    obj = self.repository._repo[tree[path][1]]
 
                    if isinstance(obj, objects.Tree):
 
                        trees.append(obj)
 
                        tree = obj
 
                except KeyError:
 
                    break
 
            # Cut down the blob and all rotten trees on the way back...
 
            for path, tree in reversed(zip(paths, trees)):
 
                del tree[path]
 
                if tree:
 
                    # This tree still has elements - don't remove it or any
 
                    # of it's parents
 
                    break
 

	
 
        object_store.add_object(commit_tree)
 

	
 
        # Create commit
 
        commit = objects.Commit()
 
        commit.tree = commit_tree.id
 
        commit.parents = [p._commit.id for p in self.parents if p]
 
        commit.author = commit.committer = safe_str(author)
 
        commit.encoding = ENCODING
 
        commit.message = safe_str(message)
 

	
 
        # Compute date
 
        if date is None:
 
            date = time.time()
 
        elif isinstance(date, datetime.datetime):
 
            date = time.mktime(date.timetuple())
 

	
 
        author_time = kwargs.pop('author_time', date)
 
        commit.commit_time = int(date)
 
        commit.author_time = int(author_time)
 
        tz = time.timezone
 
        author_tz = kwargs.pop('author_timezone', tz)
 
        commit.commit_timezone = tz
 
        commit.author_timezone = author_tz
 

	
 
        object_store.add_object(commit)
 

	
 
        ref = 'refs/heads/%s' % branch
 
        repo.refs[ref] = commit.id
 

	
 
        # Update vcs repository object & recreate dulwich repo
 
        self.repository.revisions.append(commit.id)
 
        # invalidate parsed refs after commit
 
        self.repository._parsed_refs = self.repository._get_parsed_refs()
 
        tip = self.repository.get_changeset()
 
        self.reset()
 
        return tip
 

	
 
    def _get_missing_trees(self, path, root_tree):
 
        """
 
        Creates missing ``Tree`` objects for the given path.
 

	
 
        :param path: path given as a string. It may be a path to a file node
 
          (i.e. ``foo/bar/baz.txt``) or directory path - in that case it must
 
          end with slash (i.e. ``foo/bar/``).
 
        :param root_tree: ``dulwich.objects.Tree`` object from which we start
 
          traversing (should be commit's root tree)
 
        """
 
        dirpath = posixpath.split(path)[0]
 
        dirs = dirpath.split('/')
 
        if not dirs or dirs == ['']:
 
            return []
 

	
 
        def get_tree_for_dir(tree, dirname):
 
            for name, mode, id in tree.iteritems():
 
                if name == dirname:
 
                    obj = self.repository._repo[id]
 
                    if isinstance(obj, objects.Tree):
 
                        return obj
 
                    else:
 
                        raise RepositoryError("Cannot create directory %s "
 
                        "at tree %s as path is occupied and is not a "
 
                        "Tree" % (dirname, tree))
 
            return None
 

	
 
        trees = []
 
        parent = root_tree
 
        for dirname in dirs:
 
            tree = get_tree_for_dir(parent, dirname)
 
            if tree is None:
kallithea/lib/vcs/utils/annotate.py
Show inline comments
 
import StringIO
 

	
 
from pygments import highlight
 
from pygments.formatters import HtmlFormatter
 

	
 
from kallithea.lib.vcs.exceptions import VCSError
 
from kallithea.lib.vcs.nodes import FileNode
 

	
 

	
 
def annotate_highlight(filenode, annotate_from_changeset_func=None,
 
        order=None, headers=None, **options):
 
    """
 
    Returns html portion containing annotated table with 3 columns: line
 
    numbers, changeset information and pygmentized line of code.
 

	
 
    :param filenode: FileNode object
 
    :param annotate_from_changeset_func: function taking changeset and
 
      returning single annotate cell; needs break line at the end
 
    :param order: ordered sequence of ``ls`` (line numbers column),
 
      ``annotate`` (annotate column), ``code`` (code column); Default is
 
      ``['ls', 'annotate', 'code']``
 
    :param headers: dictionary with headers (keys are whats in ``order``
 
      parameter)
 
    """
 
    options['linenos'] = True
 
    formatter = AnnotateHtmlFormatter(filenode=filenode, order=order,
 
        headers=headers,
 
        annotate_from_changeset_func=annotate_from_changeset_func, **options)
 
    lexer = filenode.lexer
 
    highlighted = highlight(filenode.content, lexer, formatter)
 
    return highlighted
 

	
 

	
 
class AnnotateHtmlFormatter(HtmlFormatter):
 

	
 
    def __init__(self, filenode, annotate_from_changeset_func=None,
 
            order=None, **options):
 
        """
 
        If ``annotate_from_changeset_func`` is passed it should be a function
 
        which returns string from the given changeset. For example, we may pass
 
        following function as ``annotate_from_changeset_func``::
 

	
 
            def changeset_to_anchor(changeset):
 
                return '<a href="/changesets/%s/">%s</a>\n' % \
 
                       (changeset.id, changeset.id)
 

	
 
        :param annotate_from_changeset_func: see above
 
        :param order: (default: ``['ls', 'annotate', 'code']``); order of
 
          columns;
 
        :param options: standard pygment's HtmlFormatter options, there is
 
          extra option tough, ``headers``. For instance we can pass::
 

	
 
             formatter = AnnotateHtmlFormatter(filenode, headers={
 
                'ls': '#',
 
                'annotate': 'Annotate',
 
                'code': 'Code',
 
             })
 

	
 
        """
 
        super(AnnotateHtmlFormatter, self).__init__(**options)
 
        self.annotate_from_changeset_func = annotate_from_changeset_func
 
        self.order = order or ('ls', 'annotate', 'code')
 
        headers = options.pop('headers', None)
 
        if headers and not ('ls' in headers and 'annotate' in headers and
 
            'code' in headers):
 
            'code' in headers
 
        ):
 
            raise ValueError("If headers option dict is specified it must "
 
                "all 'ls', 'annotate' and 'code' keys")
 
        self.headers = headers
 
        if isinstance(filenode, FileNode):
 
            self.filenode = filenode
 
        else:
 
            raise VCSError("This formatter expect FileNode parameter, not %r"
 
                % type(filenode))
 

	
 
    def annotate_from_changeset(self, changeset):
 
        """
 
        Returns full html line for single changeset per annotated line.
 
        """
 
        if self.annotate_from_changeset_func:
 
            return self.annotate_from_changeset_func(changeset)
 
        else:
 
            return ''.join((changeset.id, '\n'))
 

	
 
    def _wrap_tablelinenos(self, inner):
 
        dummyoutfile = StringIO.StringIO()
 
        lncount = 0
 
        for t, line in inner:
 
            if t:
 
                lncount += 1
 
            dummyoutfile.write(line)
 

	
 
        fl = self.linenostart
 
        mw = len(str(lncount + fl - 1))
 
        sp = self.linenospecial
 
        st = self.linenostep
 
        la = self.lineanchors
 
        aln = self.anchorlinenos
 
        if sp:
 
            lines = []
 

	
 
            for i in range(fl, fl + lncount):
 
                if i % st == 0:
 
                    if i % sp == 0:
 
                        if aln:
 
                            lines.append('<a href="#%s-%d" class="special">'
 
                                         '%*d</a>' %
 
                                         (la, i, mw, i))
 
                        else:
 
                            lines.append('<span class="special">'
 
                                         '%*d</span>' % (mw, i))
 
                    else:
 
                        if aln:
 
                            lines.append('<a href="#%s-%d">'
 
                                         '%*d</a>' % (la, i, mw, i))
 
                        else:
 
                            lines.append('%*d' % (mw, i))
 
                else:
 
                    lines.append('')
 
            ls = '\n'.join(lines)
 
        else:
 
            lines = []
 
            for i in range(fl, fl + lncount):
 
                if i % st == 0:
 
                    if aln:
 
                        lines.append('<a href="#%s-%d">%*d</a>'
 
                                     % (la, i, mw, i))
 
                    else:
 
                        lines.append('%*d' % (mw, i))
 
                else:
 
                    lines.append('')
 
            ls = '\n'.join(lines)
 

	
 
        annotate_changesets = [tup[1] for tup in self.filenode.annotate]
 
        # If pygments cropped last lines break we need do that too
 
        ln_cs = len(annotate_changesets)
 
        ln_ = len(ls.splitlines())
 
        if ln_cs > ln_:
 
            annotate_changesets = annotate_changesets[:ln_ - ln_cs]
 
        annotate = ''.join((self.annotate_from_changeset(changeset)
 
            for changeset in annotate_changesets))
 
        # in case you wonder about the seemingly redundant <div> here:
 
        # since the content in the other cell also is wrapped in a div,
 
        # some browsers in some configurations seem to mess up the formatting.
 
        '''
 
        yield 0, ('<table class="%stable">' % self.cssclass +
 
                  '<tr><td class="linenos"><div class="linenodiv"><pre>' +
 
                  ls + '</pre></div></td>' +
 
                  '<td class="code">')
 
        yield 0, dummyoutfile.getvalue()
 
        yield 0, '</td></tr></table>'
 

	
 
        '''
 
        headers_row = []
 
        if self.headers:
 
            headers_row = ['<tr class="annotate-header">']
 
            for key in self.order:
 
                td = ''.join(('<td>', self.headers[key], '</td>'))
 
                headers_row.append(td)
 
            headers_row.append('</tr>')
 

	
 
        body_row_start = ['<tr>']
0 comments (0 inline, 0 general)