Changeset - 86aa4f1f130b
[Not reviewed]
beta
0 5 0
Marcin Kuzminski - 13 years ago 2012-06-01 17:59:57
marcin@python-works.com
white space cleanup
5 files changed with 14 insertions and 14 deletions:
0 comments (0 inline, 0 general)
rhodecode/lib/helpers.py
Show inline comments
 
"""Helper functions
 

	
 
Consists of functions to typically be used within templates, but also
 
available to Controllers. This module is available to both as 'h'.
 
"""
 
import random
 
import hashlib
 
import StringIO
 
import urllib
 
import math
 
import logging
 

	
 
from datetime import datetime
 
from pygments.formatters.html import HtmlFormatter
 
from pygments import highlight as code_highlight
 
from pylons import url, request, config
 
from pylons.i18n.translation import _, ungettext
 
from hashlib import md5
 

	
 
from webhelpers.html import literal, HTML, escape
 
from webhelpers.html.tools import *
 
from webhelpers.html.builder import make_tag
 
from webhelpers.html.tags import auto_discovery_link, checkbox, css_classes, \
 
    end_form, file, form, hidden, image, javascript_link, link_to, \
 
    link_to_if, link_to_unless, ol, required_legend, select, stylesheet_link, \
 
    submit, text, password, textarea, title, ul, xml_declaration, radio
 
from webhelpers.html.tools import auto_link, button_to, highlight, \
 
    js_obfuscate, mail_to, strip_links, strip_tags, tag_re
 
from webhelpers.number import format_byte_size, format_bit_size
 
from webhelpers.pylonslib import Flash as _Flash
 
from webhelpers.pylonslib.secure_form import secure_form
 
from webhelpers.text import chop_at, collapse, convert_accented_entities, \
 
    convert_misc_entities, lchop, plural, rchop, remove_formatting, \
 
    replace_whitespace, urlify, truncate, wrap_paragraphs
 
from webhelpers.date import time_ago_in_words
 
from webhelpers.paginate import Page
 
from webhelpers.html.tags import _set_input_attrs, _set_id_attr, \
 
    convert_boolean_attrs, NotGiven, _make_safe_id_component
 

	
 
from rhodecode.lib.annotate import annotate_highlight
 
from rhodecode.lib.utils import repo_name_slug
 
from rhodecode.lib.utils2 import str2bool, safe_unicode, safe_str, \
 
    get_changeset_safe
 
from rhodecode.lib.markup_renderer import MarkupRenderer
 
from rhodecode.lib.vcs.exceptions import ChangesetDoesNotExistError
 
from rhodecode.lib.vcs.backends.base import BaseChangeset
 
from rhodecode.model.db import URL_SEP
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def shorter(text, size=20):
 
    postfix = '...'
 
    if len(text) > size:
 
        return text[:size - len(postfix)] + postfix
 
    return text
 

	
 

	
 
def _reset(name, value=None, id=NotGiven, type="reset", **attrs):
 
    """
 
    Reset button
 
    """
 
    _set_input_attrs(attrs, type, name, value)
 
    _set_id_attr(attrs, id, name)
 
    convert_boolean_attrs(attrs, ["disabled"])
 
    return HTML.input(**attrs)
 

	
 
reset = _reset
 
safeid = _make_safe_id_component
 

	
 

	
 
def FID(raw_id, path):
 
    """
 
    Creates a uniqe ID for filenode based on it's hash of path and revision
 
    it's safe to use in urls
 

	
 
    :param raw_id:
 
    :param path:
 
    """
 

	
 
    return 'C-%s-%s' % (short_id(raw_id), md5(safe_str(path)).hexdigest()[:12])
 

	
 

	
 
def get_token():
 
    """Return the current authentication token, creating one if one doesn't
 
    already exist.
 
    """
 
    token_key = "_authentication_token"
 
    from pylons import session
 
    if not token_key in session:
 
        try:
 
            token = hashlib.sha1(str(random.getrandbits(128))).hexdigest()
 
        except AttributeError:  # Python < 2.4
 
            token = hashlib.sha1(str(random.randrange(2 ** 128))).hexdigest()
 
        session[token_key] = token
 
        if hasattr(session, 'save'):
 
            session.save()
 
    return session[token_key]
 

	
 

	
 
class _GetError(object):
 
    """Get error from form_errors, and represent it as span wrapped error
 
    message
 

	
 
    :param field_name: field to fetch errors for
 
    :param form_errors: form errors dict
 
    """
 

	
 
    def __call__(self, field_name, form_errors):
 
        tmpl = """<span class="error_msg">%s</span>"""
 
        if form_errors and form_errors.has_key(field_name):
 
            return literal(tmpl % form_errors.get(field_name))
 

	
 
get_error = _GetError()
 

	
 

	
 
class _ToolTip(object):
 

	
 
    def __call__(self, tooltip_title, trim_at=50):
 
        """Special function just to wrap our text into nice formatted
 
        autowrapped text
 

	
 
        :param tooltip_title:
 
        """
 
        return escape(tooltip_title)
 
tooltip = _ToolTip()
 

	
 

	
 
class _FilesBreadCrumbs(object):
 

	
 
    def __call__(self, repo_name, rev, paths):
 
        if isinstance(paths, str):
 
            paths = safe_unicode(paths)
 
        url_l = [link_to(repo_name, url('files_home',
 
                                        repo_name=repo_name,
 
                                        revision=rev, f_path=''))]
 
        paths_l = paths.split('/')
 
        for cnt, p in enumerate(paths_l):
 
            if p != '':
 
                url_l.append(link_to(p,
 
                                     url('files_home',
 
                                         repo_name=repo_name,
 
                                         revision=rev,
 
                                         f_path='/'.join(paths_l[:cnt + 1])
 
                                         )
 
                                     )
 
                             )
 

	
 
        return literal('/'.join(url_l))
 

	
 
files_breadcrumbs = _FilesBreadCrumbs()
 

	
 

	
 
class CodeHtmlFormatter(HtmlFormatter):
 
    """
 
    My code Html Formatter for source codes
 
    """
 

	
 
    def wrap(self, source, outfile):
 
        return self._wrap_div(self._wrap_pre(self._wrap_code(source)))
 

	
 
    def _wrap_code(self, source):
 
        for cnt, it in enumerate(source):
 
            i, t = it
 
            t = '<div id="L%s">%s</div>' % (cnt + 1, t)
 
            yield i, t
 

	
 
    def _wrap_tablelinenos(self, inner):
 
        dummyoutfile = StringIO.StringIO()
 
        lncount = 0
 
        for t, line in inner:
 
            if t:
 
                lncount += 1
 
            dummyoutfile.write(line)
 

	
 
        fl = self.linenostart
 
        mw = len(str(lncount + fl - 1))
 
        sp = self.linenospecial
 
        st = self.linenostep
 
        la = self.lineanchors
 
        aln = self.anchorlinenos
 
        nocls = self.noclasses
 
        if sp:
 
            lines = []
 

	
 
            for i in range(fl, fl + lncount):
 
                if i % st == 0:
 
                    if i % sp == 0:
 
                        if aln:
 
                            lines.append('<a href="#%s%d" class="special">%*d</a>' %
 
                                         (la, i, mw, i))
 
                        else:
 
                            lines.append('<span class="special">%*d</span>' % (mw, i))
 
                    else:
 
                        if aln:
 
                            lines.append('<a href="#%s%d">%*d</a>' % (la, i, mw, i))
 
                        else:
 
                            lines.append('%*d' % (mw, i))
 
                else:
 
                    lines.append('')
 
            ls = '\n'.join(lines)
 
        else:
 
            lines = []
 
            for i in range(fl, fl + lncount):
 
                if i % st == 0:
 
                    if aln:
 
                        lines.append('<a href="#%s%d">%*d</a>' % (la, i, mw, i))
 
                    else:
 
                        lines.append('%*d' % (mw, i))
 
                else:
 
                    lines.append('')
 
            ls = '\n'.join(lines)
 

	
 
        # in case you wonder about the seemingly redundant <div> here: since the
 
        # content in the other cell also is wrapped in a div, some browsers in
 
        # some configurations seem to mess up the formatting...
 
        if nocls:
 
            yield 0, ('<table class="%stable">' % self.cssclass +
 
                      '<tr><td><div class="linenodiv" '
 
                      'style="background-color: #f0f0f0; padding-right: 10px">'
 
                      '<pre style="line-height: 125%">' +
 
                      ls + '</pre></div></td><td id="hlcode" class="code">')
 
        else:
 
            yield 0, ('<table class="%stable">' % self.cssclass +
 
                      '<tr><td class="linenos"><div class="linenodiv"><pre>' +
 
                      ls + '</pre></div></td><td id="hlcode" class="code">')
 
        yield 0, dummyoutfile.getvalue()
 
        yield 0, '</td></tr></table>'
 

	
 

	
 
def pygmentize(filenode, **kwargs):
 
    """pygmentize function using pygments
 

	
 
    :param filenode:
 
    """
 

	
 
    return literal(code_highlight(filenode.content,
 
                                  filenode.lexer, CodeHtmlFormatter(**kwargs)))
 

	
 

	
 
def pygmentize_annotation(repo_name, filenode, **kwargs):
 
    """
 
    pygmentize function for annotation
 

	
 
    :param filenode:
 
    """
 

	
 
    color_dict = {}
 

	
 
    def gen_color(n=10000):
 
        """generator for getting n of evenly distributed colors using
 
        hsv color and golden ratio. It always return same order of colors
 

	
 
        :returns: RGB tuple
 
        """
 

	
 
        def hsv_to_rgb(h, s, v):
 
            if s == 0.0:
 
                return v, v, v
 
            i = int(h * 6.0)  # XXX assume int() truncates!
 
            f = (h * 6.0) - i
 
            p = v * (1.0 - s)
 
            q = v * (1.0 - s * f)
 
            t = v * (1.0 - s * (1.0 - f))
 
            i = i % 6
 
            if i == 0:
 
                return v, t, p
 
            if i == 1:
 
                return q, v, p
 
            if i == 2:
 
                return p, v, t
 
            if i == 3:
 
                return p, q, v
 
            if i == 4:
 
                return t, p, v
 
            if i == 5:
 
                return v, p, q
 

	
 
        golden_ratio = 0.618033988749895
 
        h = 0.22717784590367374
 

	
 
        for _ in xrange(n):
 
            h += golden_ratio
 
            h %= 1
 
            HSV_tuple = [h, 0.95, 0.95]
 
            RGB_tuple = hsv_to_rgb(*HSV_tuple)
 
            yield map(lambda x: str(int(x * 256)), RGB_tuple)
 

	
 
    cgenerator = gen_color()
 

	
 
    def get_color_string(cs):
 
        if cs in color_dict:
 
            col = color_dict[cs]
 
        else:
 
            col = color_dict[cs] = cgenerator.next()
 
        return "color: rgb(%s)! important;" % (', '.join(col))
 

	
 
    def url_func(repo_name):
 

	
 
        def _url_func(changeset):
 
            author = changeset.author
 
            date = changeset.date
 
            message = tooltip(changeset.message)
 

	
 
            tooltip_html = ("<div style='font-size:0.8em'><b>Author:</b>"
 
                            " %s<br/><b>Date:</b> %s</b><br/><b>Message:"
 
                            "</b> %s<br/></div>")
 

	
 
            tooltip_html = tooltip_html % (author, date, message)
 
            lnk_format = '%5s:%s' % ('r%s' % changeset.revision,
 
                                     short_id(changeset.raw_id))
 
            uri = link_to(
 
                    lnk_format,
 
                    url('changeset_home', repo_name=repo_name,
 
                        revision=changeset.raw_id),
 
                    style=get_color_string(changeset.raw_id),
 
                    class_='tooltip',
 
                    title=tooltip_html
 
                  )
 

	
 
            uri += '\n'
 
            return uri
 
        return _url_func
 

	
 
    return literal(annotate_highlight(filenode, url_func(repo_name), **kwargs))
 

	
 

	
 
def is_following_repo(repo_name, user_id):
 
    from rhodecode.model.scm import ScmModel
 
    return ScmModel().is_following_repo(repo_name, user_id)
 

	
 
flash = _Flash()
 

	
 
#==============================================================================
 
# SCM FILTERS available via h.
 
#==============================================================================
 
from rhodecode.lib.vcs.utils import author_name, author_email
 
from rhodecode.lib.utils2 import credentials_filter, age as _age
 
from rhodecode.model.db import User
 

	
 
age = lambda  x: _age(x)
 
capitalize = lambda x: x.capitalize()
 
email = author_email
 
short_id = lambda x: x[:12]
 
hide_credentials = lambda x: ''.join(credentials_filter(x))
 

	
 

	
 
def is_git(repository):
 
    if hasattr(repository, 'alias'):
 
        _type = repository.alias
 
    elif hasattr(repository, 'repo_type'):
 
        _type = repository.repo_type
 
    else:
 
        _type = repository
 
    return _type == 'git'
 

	
 

	
 
def is_hg(repository):
 
    if hasattr(repository, 'alias'):
 
        _type = repository.alias
 
    elif hasattr(repository, 'repo_type'):
 
        _type = repository.repo_type
 
    else:
 
        _type = repository
 
    return _type == 'hg'
 

	
 

	
 
def email_or_none(author):
 
    _email = email(author)
 
    if _email != '':
 
        return _email
 

	
 
    # See if it contains a username we can get an email from
 
    user = User.get_by_username(author_name(author), case_insensitive=True,
 
                                cache=True)
 
    if user is not None:
 
        return user.email
 

	
 
    # No valid email, not a valid user in the system, none!
 
    return None
 

	
 

	
 
def person(author):
 
    # attr to return from fetched user
 
    person_getter = lambda usr: usr.username
 

	
 
    # Valid email in the attribute passed, see if they're in the system
 
    _email = email(author)
 
    if _email != '':
 
        user = User.get_by_email(_email, case_insensitive=True, cache=True)
 
        if user is not None:
 
            return person_getter(user)
 
        return _email
 

	
 
    # Maybe it's a username?
 
    _author = author_name(author)
 
    user = User.get_by_username(_author, case_insensitive=True,
 
                                cache=True)
 
    if user is not None:
 
        return person_getter(user)
 

	
 
    # Still nothing?  Just pass back the author name then
 
    return _author
 

	
 

	
 
def bool2icon(value):
 
    """Returns True/False values represented as small html image of true/false
 
    icons
 

	
 
    :param value: bool value
 
    """
 

	
 
    if value is True:
 
        return HTML.tag('img', src=url("/images/icons/accept.png"),
 
                        alt=_('True'))
 

	
 
    if value is False:
 
        return HTML.tag('img', src=url("/images/icons/cancel.png"),
 
                        alt=_('False'))
 

	
 
    return value
 

	
 

	
 
def action_parser(user_log, feed=False):
 
    """
 
    This helper will action_map the specified string action into translated
 
    fancy names with icons and links
 

	
 
    :param user_log: user log instance
 
    :param feed: use output for feeds (no html and fancy icons)
 
    """
 

	
 
    action = user_log.action
 
    action_params = ' '
 

	
 
    x = action.split(':')
 

	
 
    if len(x) > 1:
 
        action, action_params = x
 

	
 
    def get_cs_links():
 
        revs_limit = 3  # display this amount always
 
        revs_top_limit = 50  # show upto this amount of changesets hidden
 
        revs_ids = action_params.split(',')
 
        deleted = user_log.repository is None
 
        if deleted:
 
            return ','.join(revs_ids)
 

	
 
        repo_name = user_log.repository.repo_name
 

	
 
        repo = user_log.repository.scm_instance
 

	
 
        def lnk(rev, repo_name):
 

	
 
            if isinstance(rev, BaseChangeset):
 
                lbl = 'r%s:%s' % (rev.revision, rev.short_id)
 
                _url = url('changeset_home', repo_name=repo_name, 
 
                _url = url('changeset_home', repo_name=repo_name,
 
                           revision=rev.raw_id)
 
                title = tooltip(rev.message)
 
            else:
 
                lbl = '%s' % rev
 
                _url = '#'
 
                title = _('Changeset not found')
 

	
 
            return link_to(lbl, _url, title=title, class_='tooltip',)
 

	
 
        revs = []
 
        if len(filter(lambda v: v != '', revs_ids)) > 0:
 
            for rev in revs_ids[:revs_top_limit]:
 
                try:
 
                    rev = repo.get_changeset(rev)
 
                    revs.append(rev)
 
                except ChangesetDoesNotExistError:
 
                    log.error('cannot find revision %s in this repo' % rev)
 
                    revs.append(rev)
 
                    continue
 
        cs_links = []
 
        cs_links.append(" " + ', '.join(
 
            [lnk(rev, repo_name) for rev in revs[:revs_limit]]
 
            )
 
        )
 

	
 
        compare_view = (
 
            ' <div class="compare_view tooltip" title="%s">'
 
            '<a href="%s">%s</a> </div>' % (
 
                _('Show all combined changesets %s->%s') % (
 
                    revs_ids[0], revs_ids[-1]
 
                ),
 
                url('changeset_home', repo_name=repo_name,
 
                    revision='%s...%s' % (revs_ids[0], revs_ids[-1])
 
                ),
 
                _('compare view')
 
            )
 
        )
 

	
 
        # if we have exactly one more than normally displayed
 
        # just display it, takes less space than displaying
 
        # "and 1 more revisions"
 
        if len(revs_ids) == revs_limit + 1:
 
            rev = revs[revs_limit]
 
            cs_links.append(", " + lnk(rev, repo_name))
 

	
 
        # hidden-by-default ones
 
        if len(revs_ids) > revs_limit + 1:
 
            uniq_id = revs_ids[0]
 
            html_tmpl = (
 
                '<span> %s <a class="show_more" id="_%s" '
 
                'href="#more">%s</a> %s</span>'
 
            )
 
            if not feed:
 
                cs_links.append(html_tmpl % (
 
                      _('and'),
 
                      uniq_id, _('%s more') % (len(revs_ids) - revs_limit),
 
                      _('revisions')
 
                    )
 
                )
 

	
 
            if not feed:
 
                html_tmpl = '<span id="%s" style="display:none">, %s </span>'
 
            else:
 
                html_tmpl = '<span id="%s"> %s </span>'
 

	
 
            morelinks = ', '.join(
 
              [lnk(rev, repo_name) for rev in revs[revs_limit:]]
 
            )
 

	
 
            if len(revs_ids) > revs_top_limit:
 
                morelinks += ', ...'
 

	
 
            cs_links.append(html_tmpl % (uniq_id, morelinks))
 
        if len(revs) > 1:
 
            cs_links.append(compare_view)
 
        return ''.join(cs_links)
 

	
 
    def get_fork_name():
 
        repo_name = action_params
 
        return _('fork name ') + str(link_to(action_params, url('summary_home',
 
                                          repo_name=repo_name,)))
 

	
 
    action_map = {'user_deleted_repo': (_('[deleted] repository'), None),
 
           'user_created_repo': (_('[created] repository'), None),
 
           'user_created_fork': (_('[created] repository as fork'), None),
 
           'user_forked_repo': (_('[forked] repository'), get_fork_name),
 
           'user_updated_repo': (_('[updated] repository'), None),
 
           'admin_deleted_repo': (_('[delete] repository'), None),
 
           'admin_created_repo': (_('[created] repository'), None),
 
           'admin_forked_repo': (_('[forked] repository'), None),
 
           'admin_updated_repo': (_('[updated] repository'), None),
 
           'push': (_('[pushed] into'), get_cs_links),
 
           'push_local': (_('[committed via RhodeCode] into'), get_cs_links),
 
           'push_remote': (_('[pulled from remote] into'), get_cs_links),
 
           'pull': (_('[pulled] from'), None),
 
           'started_following_repo': (_('[started following] repository'), None),
 
           'stopped_following_repo': (_('[stopped following] repository'), None),
 
            }
 

	
 
    action_str = action_map.get(action, action)
 
    if feed:
 
        action = action_str[0].replace('[', '').replace(']', '')
 
    else:
 
        action = action_str[0]\
 
            .replace('[', '<span class="journal_highlight">')\
 
            .replace(']', '</span>')
 

	
 
    action_params_func = lambda: ""
 

	
 
    if callable(action_str[1]):
 
        action_params_func = action_str[1]
 

	
 
    return [literal(action), action_params_func]
 

	
 

	
 
def action_parser_icon(user_log):
 
    action = user_log.action
 
    action_params = None
 
    x = action.split(':')
 

	
 
    if len(x) > 1:
 
        action, action_params = x
 

	
 
    tmpl = """<img src="%s%s" alt="%s"/>"""
 
    map = {'user_deleted_repo':'database_delete.png',
 
           'user_created_repo':'database_add.png',
 
           'user_created_fork':'arrow_divide.png',
 
           'user_forked_repo':'arrow_divide.png',
 
           'user_updated_repo':'database_edit.png',
 
           'admin_deleted_repo':'database_delete.png',
 
           'admin_created_repo':'database_add.png',
 
           'admin_forked_repo':'arrow_divide.png',
 
           'admin_updated_repo':'database_edit.png',
 
           'push':'script_add.png',
 
           'push_local':'script_edit.png',
 
           'push_remote':'connect.png',
 
           'pull':'down_16.png',
 
           'started_following_repo':'heart_add.png',
 
           'stopped_following_repo':'heart_delete.png',
 
            }
 
    return literal(tmpl % ((url('/images/icons/')),
 
                           map.get(action, action), action))
 

	
 

	
 
#==============================================================================
 
# PERMS
 
#==============================================================================
 
from rhodecode.lib.auth import HasPermissionAny, HasPermissionAll, \
 
HasRepoPermissionAny, HasRepoPermissionAll
 

	
 

	
 
#==============================================================================
 
# GRAVATAR URL
 
#==============================================================================
 

	
 
def gravatar_url(email_address, size=30):
 
    if (not str2bool(config['app_conf'].get('use_gravatar')) or
 
        not email_address or email_address == 'anonymous@rhodecode.org'):
 
        f = lambda a, l: min(l, key=lambda x: abs(x - a))
 
        return url("/images/user%s.png" % f(size, [14, 16, 20, 24, 30]))
 

	
 
    ssl_enabled = 'https' == request.environ.get('wsgi.url_scheme')
 
    default = 'identicon'
 
    baseurl_nossl = "http://www.gravatar.com/avatar/"
 
    baseurl_ssl = "https://secure.gravatar.com/avatar/"
 
    baseurl = baseurl_ssl if ssl_enabled else baseurl_nossl
 

	
 
    if isinstance(email_address, unicode):
 
        #hashlib crashes on unicode items
 
        email_address = safe_str(email_address)
 
    # construct the url
 
    gravatar_url = baseurl + hashlib.md5(email_address.lower()).hexdigest() + "?"
 
    gravatar_url += urllib.urlencode({'d': default, 's': str(size)})
 

	
 
    return gravatar_url
 

	
 

	
 
#==============================================================================
 
# REPO PAGER, PAGER FOR REPOSITORY
 
#==============================================================================
 
class RepoPage(Page):
 

	
 
    def __init__(self, collection, page=1, items_per_page=20,
 
                 item_count=None, url=None, **kwargs):
 

	
 
        """Create a "RepoPage" instance. special pager for paging
 
        repository
 
        """
 
        self._url_generator = url
 

	
 
        # Safe the kwargs class-wide so they can be used in the pager() method
 
        self.kwargs = kwargs
 

	
 
        # Save a reference to the collection
 
        self.original_collection = collection
 

	
 
        self.collection = collection
 

	
 
        # The self.page is the number of the current page.
 
        # The first page has the number 1!
 
        try:
 
            self.page = int(page)  # make it int() if we get it as a string
 
        except (ValueError, TypeError):
 
            self.page = 1
 

	
 
        self.items_per_page = items_per_page
 

	
 
        # Unless the user tells us how many items the collections has
 
        # we calculate that ourselves.
 
        if item_count is not None:
 
            self.item_count = item_count
 
        else:
 
            self.item_count = len(self.collection)
 

	
 
        # Compute the number of the first and last available page
 
        if self.item_count > 0:
 
            self.first_page = 1
 
            self.page_count = int(math.ceil(float(self.item_count) /
 
                                            self.items_per_page))
 
            self.last_page = self.first_page + self.page_count - 1
 

	
 
            # Make sure that the requested page number is the range of
 
            # valid pages
 
            if self.page > self.last_page:
 
                self.page = self.last_page
 
            elif self.page < self.first_page:
 
                self.page = self.first_page
 

	
 
            # Note: the number of items on this page can be less than
 
            #       items_per_page if the last page is not full
 
            self.first_item = max(0, (self.item_count) - (self.page *
 
                                                          items_per_page))
 
            self.last_item = ((self.item_count - 1) - items_per_page *
 
                              (self.page - 1))
 

	
 
            self.items = list(self.collection[self.first_item:self.last_item + 1])
 

	
 
            # Links to previous and next page
 
            if self.page > self.first_page:
 
                self.previous_page = self.page - 1
 
            else:
 
                self.previous_page = None
 

	
 
            if self.page < self.last_page:
 
                self.next_page = self.page + 1
 
            else:
 
                self.next_page = None
 

	
 
        # No items available
 
        else:
 
            self.first_page = None
 
            self.page_count = 0
 
            self.last_page = None
 
            self.first_item = None
 
            self.last_item = None
 
            self.previous_page = None
 
            self.next_page = None
 
            self.items = []
 

	
 
        # This is a subclass of the 'list' type. Initialise the list now.
 
        list.__init__(self, reversed(self.items))
 

	
 

	
 
def changed_tooltip(nodes):
 
    """
 
    Generates a html string for changed nodes in changeset page.
 
    It limits the output to 30 entries
 

	
 
    :param nodes: LazyNodesGenerator
 
    """
 
    if nodes:
 
        pref = ': <br/> '
 
        suf = ''
 
        if len(nodes) > 30:
 
            suf = '<br/>' + _(' and %s more') % (len(nodes) - 30)
 
        return literal(pref + '<br/> '.join([safe_unicode(x.path)
 
                                             for x in nodes[:30]]) + suf)
 
    else:
 
        return ': ' + _('No Files')
 

	
 

	
 
def repo_link(groups_and_repos):
 
    """
 
    Makes a breadcrumbs link to repo within a group
 
    joins &raquo; on each group to create a fancy link
 

	
 
    ex::
 
        group >> subgroup >> repo
 

	
 
    :param groups_and_repos:
 
    """
 
    groups, repo_name = groups_and_repos
 

	
 
    if not groups:
 
        return repo_name
 
    else:
 
        def make_link(group):
 
            return link_to(group.name, url('repos_group_home',
 
                                           group_name=group.group_name))
 
        return literal(' &raquo; '.join(map(make_link, groups)) + \
 
                       " &raquo; " + repo_name)
 

	
 

	
 
def fancy_file_stats(stats):
 
    """
 
    Displays a fancy two colored bar for number of added/deleted
 
    lines of code on file
 

	
 
    :param stats: two element list of added/deleted lines of code
 
    """
 

	
 
    a, d, t = stats[0], stats[1], stats[0] + stats[1]
 
    width = 100
 
    unit = float(width) / (t or 1)
 

	
 
    # needs > 9% of width to be visible or 0 to be hidden
 
    a_p = max(9, unit * a) if a > 0 else 0
 
    d_p = max(9, unit * d) if d > 0 else 0
 
    p_sum = a_p + d_p
 

	
 
    if p_sum > width:
 
        #adjust the percentage to be == 100% since we adjusted to 9
 
        if a_p > d_p:
 
            a_p = a_p - (p_sum - width)
 
        else:
 
            d_p = d_p - (p_sum - width)
 

	
 
    a_v = a if a > 0 else ''
 
    d_v = d if d > 0 else ''
 

	
 
    def cgen(l_type):
 
        mapping = {'tr': 'top-right-rounded-corner-mid',
 
                   'tl': 'top-left-rounded-corner-mid',
 
                   'br': 'bottom-right-rounded-corner-mid',
 
                   'bl': 'bottom-left-rounded-corner-mid'}
 
        map_getter = lambda x: mapping[x]
 

	
 
        if l_type == 'a' and d_v:
 
            #case when added and deleted are present
 
            return ' '.join(map(map_getter, ['tl', 'bl']))
 

	
 
        if l_type == 'a' and not d_v:
 
            return ' '.join(map(map_getter, ['tr', 'br', 'tl', 'bl']))
 

	
 
        if l_type == 'd' and a_v:
 
            return ' '.join(map(map_getter, ['tr', 'br']))
 

	
 
        if l_type == 'd' and not a_v:
 
            return ' '.join(map(map_getter, ['tr', 'br', 'tl', 'bl']))
 

	
 
    d_a = '<div class="added %s" style="width:%s%%">%s</div>' % (
 
        cgen('a'), a_p, a_v
 
    )
 
    d_d = '<div class="deleted %s" style="width:%s%%">%s</div>' % (
 
        cgen('d'), d_p, d_v
 
    )
 
    return literal('<div style="width:%spx">%s%s</div>' % (width, d_a, d_d))
 

	
 

	
 
def urlify_text(text_):
 
    import re
 

	
 
    url_pat = re.compile(r'''(http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]'''
 
                         '''|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+)''')
 

	
 
    def url_func(match_obj):
 
        url_full = match_obj.groups()[0]
 
        return '<a href="%(url)s">%(url)s</a>' % ({'url': url_full})
 

	
 
    return literal(url_pat.sub(url_func, text_))
 

	
 

	
 
def urlify_changesets(text_, repository):
 
    """
 
    Extract revision ids from changeset and make link from them
 

	
 
    :param text_:
 
    :param repository:
 
    """
 
    import re
 
    URL_PAT = re.compile(r'([0-9a-fA-F]{12,})')
 

	
 
    def url_func(match_obj):
 
        rev = match_obj.groups()[0]
 
        pref = ''
 
        if match_obj.group().startswith(' '):
 
            pref = ' '
 
        tmpl = (
 
        '%(pref)s<a class="%(cls)s" href="%(url)s">'
 
        '%(rev)s'
 
        '</a>'
 
        )
 
        return tmpl % {
 
         'pref': pref,
 
         'cls': 'revision-link',
 
         'url': url('changeset_home', repo_name=repository, revision=rev),
 
         'rev': rev,
 
        }
 

	
 
    newtext = URL_PAT.sub(url_func, text_)
 

	
 
    return newtext
 

	
 

	
 
def urlify_commit(text_, repository=None, link_=None):
 
    """
 
    Parses given text message and makes proper links.
 
    issues are linked to given issue-server, and rest is a changeset link
 
    if link_ is given, in other case it's a plain text
 

	
 
    :param text_:
 
    :param repository:
 
    :param link_: changeset link
 
    """
 
    import re
 
    import traceback
 

	
 
    def escaper(string):
 
        return string.replace('<', '&lt;').replace('>', '&gt;')
 

	
 
    def linkify_others(t, l):
 
        urls = re.compile(r'(\<a.*?\<\/a\>)',)
 
        links = []
 
        for e in urls.split(t):
 
            if not urls.match(e):
 
                links.append('<a class="message-link" href="%s">%s</a>' % (l, e))
 
            else:
 
                links.append(e)
 

	
 
        return ''.join(links)
 

	
 
    # urlify changesets - extrac revisions and make link out of them
 
    text_ = urlify_changesets(escaper(text_), repository)
 

	
 
    try:
 
        conf = config['app_conf']
 

	
 
        URL_PAT = re.compile(r'%s' % conf.get('issue_pat'))
 

	
 
        if URL_PAT:
 
            ISSUE_SERVER_LNK = conf.get('issue_server_link')
 
            ISSUE_PREFIX = conf.get('issue_prefix')
 

	
 
            def url_func(match_obj):
 
                pref = ''
 
                if match_obj.group().startswith(' '):
 
                    pref = ' '
 

	
 
                issue_id = ''.join(match_obj.groups())
 
                tmpl = (
 
                '%(pref)s<a class="%(cls)s" href="%(url)s">'
 
                '%(issue-prefix)s%(id-repr)s'
 
                '</a>'
 
                )
 
                url = ISSUE_SERVER_LNK.replace('{id}', issue_id)
 
                if repository:
 
                    url = url.replace('{repo}', repository)
 
                    repo_name = repository.split(URL_SEP)[-1]
 
                    url = url.replace('{repo_name}', repo_name)
 
                return tmpl % {
 
                     'pref': pref,
 
                     'cls': 'issue-tracker-link',
 
                     'url': url,
 
                     'id-repr': issue_id,
 
                     'issue-prefix': ISSUE_PREFIX,
 
                     'serv': ISSUE_SERVER_LNK,
 
                }
 

	
 
            newtext = URL_PAT.sub(url_func, text_)
 

	
 
            if link_:
 
                # wrap not links into final link => link_
 
                newtext = linkify_others(newtext, link_)
 

	
 
            return literal(newtext)
 
    except:
 
        log.error(traceback.format_exc())
 
        pass
 

	
 
    return text_
 

	
 

	
 
def rst(source):
 
    return literal('<div class="rst-block">%s</div>' %
 
                   MarkupRenderer.rst(source))
 

	
 

	
 
def rst_w_mentions(source):
 
    """
 
    Wrapped rst renderer with @mention highlighting
 

	
 
    :param source:
 
    """
 
    return literal('<div class="rst-block">%s</div>' %
 
                   MarkupRenderer.rst_with_mentions(source))
rhodecode/lib/utils2.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
"""
 
    rhodecode.lib.utils
 
    ~~~~~~~~~~~~~~~~~~~
 

	
 
    Some simple helper functions
 

	
 
    :created_on: Jan 5, 2011
 
    :author: marcink
 
    :copyright: (C) 2011-2012 Marcin Kuzminski <marcin@python-works.com>
 
    :license: GPLv3, see COPYING for more details.
 
"""
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import re
 
from datetime import datetime
 
from pylons.i18n.translation import _, ungettext
 
from rhodecode.lib.vcs.utils.lazy import LazyProperty
 

	
 

	
 
def __get_lem():
 
    """
 
    Get language extension map based on what's inside pygments lexers
 
    """
 
    from pygments import lexers
 
    from string import lower
 
    from collections import defaultdict
 

	
 
    d = defaultdict(lambda: [])
 

	
 
    def __clean(s):
 
        s = s.lstrip('*')
 
        s = s.lstrip('.')
 

	
 
        if s.find('[') != -1:
 
            exts = []
 
            start, stop = s.find('['), s.find(']')
 

	
 
            for suffix in s[start + 1:stop]:
 
                exts.append(s[:s.find('[')] + suffix)
 
            return map(lower, exts)
 
        else:
 
            return map(lower, [s])
 

	
 
    for lx, t in sorted(lexers.LEXERS.items()):
 
        m = map(__clean, t[-2])
 
        if m:
 
            m = reduce(lambda x, y: x + y, m)
 
            for ext in m:
 
                desc = lx.replace('Lexer', '')
 
                d[ext].append(desc)
 

	
 
    return dict(d)
 

	
 
def str2bool(_str):
 
    """
 
    returs True/False value from given string, it tries to translate the
 
    string into boolean
 

	
 
    :param _str: string value to translate into boolean
 
    :rtype: boolean
 
    :returns: boolean from given string
 
    """
 
    if _str is None:
 
        return False
 
    if _str in (True, False):
 
        return _str
 
    _str = str(_str).strip().lower()
 
    return _str in ('t', 'true', 'y', 'yes', 'on', '1')
 

	
 

	
 
def convert_line_endings(line, mode):
 
    """
 
    Converts a given line  "line end" accordingly to given mode
 

	
 
    Available modes are::
 
        0 - Unix
 
        1 - Mac
 
        2 - DOS
 

	
 
    :param line: given line to convert
 
    :param mode: mode to convert to
 
    :rtype: str
 
    :return: converted line according to mode
 
    """
 
    from string import replace
 

	
 
    if mode == 0:
 
            line = replace(line, '\r\n', '\n')
 
            line = replace(line, '\r', '\n')
 
    elif mode == 1:
 
            line = replace(line, '\r\n', '\r')
 
            line = replace(line, '\n', '\r')
 
    elif mode == 2:
 
            line = re.sub("\r(?!\n)|(?<!\r)\n", "\r\n", line)
 
    return line
 

	
 

	
 
def detect_mode(line, default):
 
    """
 
    Detects line break for given line, if line break couldn't be found
 
    given default value is returned
 

	
 
    :param line: str line
 
    :param default: default
 
    :rtype: int
 
    :return: value of line end on of 0 - Unix, 1 - Mac, 2 - DOS
 
    """
 
    if line.endswith('\r\n'):
 
        return 2
 
    elif line.endswith('\n'):
 
        return 0
 
    elif line.endswith('\r'):
 
        return 1
 
    else:
 
        return default
 

	
 

	
 
def generate_api_key(username, salt=None):
 
    """
 
    Generates unique API key for given username, if salt is not given
 
    it'll be generated from some random string
 

	
 
    :param username: username as string
 
    :param salt: salt to hash generate KEY
 
    :rtype: str
 
    :returns: sha1 hash from username+salt
 
    """
 
    from tempfile import _RandomNameSequence
 
    import hashlib
 

	
 
    if salt is None:
 
        salt = _RandomNameSequence().next()
 

	
 
    return hashlib.sha1(username + salt).hexdigest()
 

	
 

	
 
def safe_unicode(str_, from_encoding=None):
 
    """
 
    safe unicode function. Does few trick to turn str_ into unicode
 

	
 
    In case of UnicodeDecode error we try to return it with encoding detected
 
    by chardet library if it fails fallback to unicode with errors replaced
 

	
 
    :param str_: string to decode
 
    :rtype: unicode
 
    :returns: unicode object
 
    """
 
    if isinstance(str_, unicode):
 
        return str_
 

	
 
    if not from_encoding:
 
        import rhodecode
 
        DEFAULT_ENCODING = rhodecode.CONFIG.get('default_encoding','utf8')
 
        from_encoding = DEFAULT_ENCODING
 

	
 
    try:
 
        return unicode(str_)
 
    except UnicodeDecodeError:
 
        pass
 

	
 
    try:
 
        return unicode(str_, from_encoding)
 
    except UnicodeDecodeError:
 
        pass
 

	
 
    try:
 
        import chardet
 
        encoding = chardet.detect(str_)['encoding']
 
        if encoding is None:
 
            raise Exception()
 
        return str_.decode(encoding)
 
    except (ImportError, UnicodeDecodeError, Exception):
 
        return unicode(str_, from_encoding, 'replace')
 

	
 

	
 
def safe_str(unicode_, to_encoding=None):
 
    """
 
    safe str function. Does few trick to turn unicode_ into string
 

	
 
    In case of UnicodeEncodeError we try to return it with encoding detected
 
    by chardet library if it fails fallback to string with errors replaced
 

	
 
    :param unicode_: unicode to encode
 
    :rtype: str
 
    :returns: str object
 
    """
 

	
 
    # if it's not basestr cast to str
 
    if not isinstance(unicode_, basestring):
 
        return str(unicode_)
 

	
 
    if isinstance(unicode_, str):
 
        return unicode_
 

	
 
    if not to_encoding:
 
        import rhodecode
 
        DEFAULT_ENCODING = rhodecode.CONFIG.get('default_encoding','utf8')
 
        to_encoding = DEFAULT_ENCODING
 

	
 
    try:
 
        return unicode_.encode(to_encoding)
 
    except UnicodeEncodeError:
 
        pass
 

	
 
    try:
 
        import chardet
 
        encoding = chardet.detect(unicode_)['encoding']
 
        if encoding is None:
 
            raise UnicodeEncodeError()
 

	
 
        return unicode_.encode(encoding)
 
    except (ImportError, UnicodeEncodeError):
 
        return unicode_.encode(to_encoding, 'replace')
 

	
 
    return safe_str
 

	
 

	
 
def engine_from_config(configuration, prefix='sqlalchemy.', **kwargs):
 
    """
 
    Custom engine_from_config functions that makes sure we use NullPool for
 
    file based sqlite databases. This prevents errors on sqlite. This only
 
    applies to sqlalchemy versions < 0.7.0
 

	
 
    """
 
    import sqlalchemy
 
    from sqlalchemy import engine_from_config as efc
 
    import logging
 

	
 
    if int(sqlalchemy.__version__.split('.')[1]) < 7:
 

	
 
        # This solution should work for sqlalchemy < 0.7.0, and should use
 
        # proxy=TimerProxy() for execution time profiling
 

	
 
        from sqlalchemy.pool import NullPool
 
        url = configuration[prefix + 'url']
 

	
 
        if url.startswith('sqlite'):
 
            kwargs.update({'poolclass': NullPool})
 
        return efc(configuration, prefix, **kwargs)
 
    else:
 
        import time
 
        from sqlalchemy import event
 
        from sqlalchemy.engine import Engine
 

	
 
        log = logging.getLogger('sqlalchemy.engine')
 
        BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = xrange(30, 38)
 
        engine = efc(configuration, prefix, **kwargs)
 

	
 
        def color_sql(sql):
 
            COLOR_SEQ = "\033[1;%dm"
 
            COLOR_SQL = YELLOW
 
            normal = '\x1b[0m'
 
            return ''.join([COLOR_SEQ % COLOR_SQL, sql, normal])
 

	
 
        if configuration['debug']:
 
            #attach events only for debug configuration
 

	
 
            def before_cursor_execute(conn, cursor, statement,
 
                                    parameters, context, executemany):
 
                context._query_start_time = time.time()
 
                log.info(color_sql(">>>>> STARTING QUERY >>>>>"))
 

	
 

	
 
            def after_cursor_execute(conn, cursor, statement,
 
                                    parameters, context, executemany):
 
                total = time.time() - context._query_start_time
 
                log.info(color_sql("<<<<< TOTAL TIME: %f <<<<<" % total))
 

	
 
            event.listen(engine, "before_cursor_execute",
 
                         before_cursor_execute)
 
            event.listen(engine, "after_cursor_execute",
 
                         after_cursor_execute)
 

	
 
    return engine
 

	
 

	
 
def age(prevdate):
 
    """
 
    turns a datetime into an age string.
 

	
 
    :param prevdate: datetime object
 
    :rtype: unicode
 
    :returns: unicode words describing age
 
    """
 

	
 
    order = ['year', 'month', 'day', 'hour', 'minute', 'second']
 
    deltas = {}
 

	
 
    # Get date parts deltas
 
    now = datetime.now()
 
    for part in order:
 
        deltas[part] = getattr(now, part) - getattr(prevdate, part)
 

	
 
    # Fix negative offsets (there is 1 second between 10:59:59 and 11:00:00,
 
    # not 1 hour, -59 minutes and -59 seconds)
 

	
 
    for num, length in [(5, 60), (4, 60), (3, 24)]:  # seconds, minutes, hours
 
        part = order[num]
 
        carry_part = order[num - 1]
 
        
 

	
 
        if deltas[part] < 0:
 
            deltas[part] += length
 
            deltas[carry_part] -= 1
 

	
 
    # Same thing for days except that the increment depends on the (variable)
 
    # number of days in the month
 
    month_lengths = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
 
    if deltas['day'] < 0:
 
        if prevdate.month == 2 and (prevdate.year % 4 == 0 and
 
            (prevdate.year % 100 != 0 or prevdate.year % 400 == 0)):
 
            deltas['day'] += 29
 
        else:
 
            deltas['day'] += month_lengths[prevdate.month - 1]
 
        
 

	
 
        deltas['month'] -= 1
 
    
 

	
 
    if deltas['month'] < 0:
 
        deltas['month'] += 12
 
        deltas['year'] -= 1
 
    
 

	
 
    # Format the result
 
    fmt_funcs = {
 
        'year': lambda d: ungettext(u'%d year', '%d years', d) % d,
 
        'month': lambda d: ungettext(u'%d month', '%d months', d) % d,
 
        'day': lambda d: ungettext(u'%d day', '%d days', d) % d,
 
        'hour': lambda d: ungettext(u'%d hour', '%d hours', d) % d,
 
        'minute': lambda d: ungettext(u'%d minute', '%d minutes', d) % d,
 
        'second': lambda d: ungettext(u'%d second', '%d seconds', d) % d,
 
    }
 
    
 

	
 
    for i, part in enumerate(order):
 
        value = deltas[part]
 
        if value == 0:
 
            continue
 
        
 

	
 
        if i < 5:
 
            sub_part = order[i + 1]
 
            sub_value = deltas[sub_part]
 
        else:
 
            sub_value = 0
 
        
 

	
 
        if sub_value == 0:
 
            return _(u'%s ago') % fmt_funcs[part](value)
 
        
 

	
 
        return _(u'%s and %s ago') % (fmt_funcs[part](value),
 
            fmt_funcs[sub_part](sub_value))
 

	
 
    return _(u'just now')
 

	
 

	
 
def uri_filter(uri):
 
    """
 
    Removes user:password from given url string
 

	
 
    :param uri:
 
    :rtype: unicode
 
    :returns: filtered list of strings
 
    """
 
    if not uri:
 
        return ''
 

	
 
    proto = ''
 

	
 
    for pat in ('https://', 'http://'):
 
        if uri.startswith(pat):
 
            uri = uri[len(pat):]
 
            proto = pat
 
            break
 

	
 
    # remove passwords and username
 
    uri = uri[uri.find('@') + 1:]
 

	
 
    # get the port
 
    cred_pos = uri.find(':')
 
    if cred_pos == -1:
 
        host, port = uri, None
 
    else:
 
        host, port = uri[:cred_pos], uri[cred_pos + 1:]
 

	
 
    return filter(None, [proto, host, port])
 

	
 

	
 
def credentials_filter(uri):
 
    """
 
    Returns a url with removed credentials
 

	
 
    :param uri:
 
    """
 

	
 
    uri = uri_filter(uri)
 
    #check if we have port
 
    if len(uri) > 2 and uri[2]:
 
        uri[2] = ':' + uri[2]
 

	
 
    return ''.join(uri)
 

	
 

	
 
def get_changeset_safe(repo, rev):
 
    """
 
    Safe version of get_changeset if this changeset doesn't exists for a
 
    repo it returns a Dummy one instead
 

	
 
    :param repo:
 
    :param rev:
 
    """
 
    from rhodecode.lib.vcs.backends.base import BaseRepository
 
    from rhodecode.lib.vcs.exceptions import RepositoryError
 
    if not isinstance(repo, BaseRepository):
 
        raise Exception('You must pass an Repository '
 
                        'object as first argument got %s', type(repo))
 

	
 
    try:
 
        cs = repo.get_changeset(rev)
 
    except RepositoryError:
 
        from rhodecode.lib.utils import EmptyChangeset
 
        cs = EmptyChangeset(requested_revision=rev)
 
    return cs
 

	
 

	
 
MENTIONS_REGEX = r'(?:^@|\s@)([a-zA-Z0-9]{1}[a-zA-Z0-9\-\_\.]+)(?:\s{1})'
 

	
 

	
 
def extract_mentioned_users(s):
 
    """
 
    Returns unique usernames from given string s that have @mention
 

	
 
    :param s: string to get mentions
 
    """
 
    usrs = set()
 
    for username in re.findall(MENTIONS_REGEX, s):
 
        usrs.add(username)
 

	
 
    return sorted(list(usrs), key=lambda k: k.lower())
rhodecode/lib/vcs/backends/git/repository.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
"""
 
    vcs.backends.git
 
    ~~~~~~~~~~~~~~~~
 

	
 
    Git backend implementation.
 

	
 
    :created_on: Apr 8, 2010
 
    :copyright: (c) 2010-2011 by Marcin Kuzminski, Lukasz Balcerzak.
 
"""
 

	
 
import os
 
import re
 
import time
 
import posixpath
 
from dulwich.repo import Repo, NotGitRepository
 
#from dulwich.config import ConfigFile
 
from string import Template
 
from subprocess import Popen, PIPE
 
from rhodecode.lib.vcs.backends.base import BaseRepository
 
from rhodecode.lib.vcs.exceptions import BranchDoesNotExistError
 
from rhodecode.lib.vcs.exceptions import ChangesetDoesNotExistError
 
from rhodecode.lib.vcs.exceptions import EmptyRepositoryError
 
from rhodecode.lib.vcs.exceptions import RepositoryError
 
from rhodecode.lib.vcs.exceptions import TagAlreadyExistError
 
from rhodecode.lib.vcs.exceptions import TagDoesNotExistError
 
from rhodecode.lib.vcs.utils import safe_unicode, makedate, date_fromtimestamp
 
from rhodecode.lib.vcs.utils.lazy import LazyProperty
 
from rhodecode.lib.vcs.utils.ordered_dict import OrderedDict
 
from rhodecode.lib.vcs.utils.paths import abspath
 
from rhodecode.lib.vcs.utils.paths import get_user_home
 
from .workdir import GitWorkdir
 
from .changeset import GitChangeset
 
from .inmemory import GitInMemoryChangeset
 
from .config import ConfigFile
 

	
 

	
 
class GitRepository(BaseRepository):
 
    """
 
    Git repository backend.
 
    """
 
    DEFAULT_BRANCH_NAME = 'master'
 
    scm = 'git'
 

	
 
    def __init__(self, repo_path, create=False, src_url=None,
 
                 update_after_clone=False, bare=False):
 

	
 
        self.path = abspath(repo_path)
 
        self._repo = self._get_repo(create, src_url, update_after_clone, bare)
 
        #temporary set that to now at later we will move it to constructor
 
        baseui = None
 
        if baseui is None:
 
            from mercurial.ui import ui
 
            baseui = ui()
 
        # patch the instance of GitRepo with an "FAKE" ui object to add
 
        # compatibility layer with Mercurial
 
        setattr(self._repo, 'ui', baseui)
 

	
 
        try:
 
            self.head = self._repo.head()
 
        except KeyError:
 
            self.head = None
 

	
 
        self._config_files = [
 
            bare and abspath(self.path, 'config') or abspath(self.path, '.git',
 
                'config'),
 
            abspath(get_user_home(), '.gitconfig'),
 
        ]
 
        self.bare = self._repo.bare
 

	
 
    @LazyProperty
 
    def revisions(self):
 
        """
 
        Returns list of revisions' ids, in ascending order.  Being lazy
 
        attribute allows external tools to inject shas from cache.
 
        """
 
        return self._get_all_revisions()
 

	
 
    def run_git_command(self, cmd):
 
        """
 
        Runs given ``cmd`` as git command and returns tuple
 
        (returncode, stdout, stderr).
 

	
 
        .. note::
 
           This method exists only until log/blame functionality is implemented
 
           at Dulwich (see https://bugs.launchpad.net/bugs/645142). Parsing
 
           os command's output is road to hell...
 

	
 
        :param cmd: git command to be executed
 
        """
 

	
 
        _copts = ['-c', 'core.quotepath=false', ]
 
        _str_cmd = False
 
        if isinstance(cmd, basestring):
 
            cmd = [cmd]
 
            _str_cmd = True
 
 
 

	
 
        gitenv = os.environ
 
        gitenv['GIT_CONFIG_NOGLOBAL'] = '1'
 

	
 
        cmd = ['git'] + _copts + cmd
 
        if _str_cmd:
 
            cmd = ' '.join(cmd)
 
        try:
 
            opts = dict(
 
                shell=isinstance(cmd, basestring),
 
                stdout=PIPE,
 
                stderr=PIPE,
 
                env=gitenv,
 
            )
 
            if os.path.isdir(self.path):
 
                opts['cwd'] = self.path
 
            p = Popen(cmd, **opts)
 
        except OSError, err:
 
            raise RepositoryError("Couldn't run git command (%s).\n"
 
                "Original error was:%s" % (cmd, err))
 
        so, se = p.communicate()
 
        if not se.startswith("fatal: bad default revision 'HEAD'") and \
 
            p.returncode != 0:
 
            raise RepositoryError("Couldn't run git command (%s).\n"
 
                "stderr:\n%s" % (cmd, se))
 
        return so, se
 

	
 
    def _check_url(self, url):
 
        """
 
        Functon will check given url and try to verify if it's a valid
 
        link. Sometimes it may happened that mercurial will issue basic
 
        auth request that can cause whole API to hang when used from python
 
        or other external calls.
 

	
 
        On failures it'll raise urllib2.HTTPError
 
        """
 

	
 
        #TODO: implement this
 
        pass
 

	
 
    def _get_repo(self, create, src_url=None, update_after_clone=False,
 
            bare=False):
 
        if create and os.path.exists(self.path):
 
            raise RepositoryError("Location already exist")
 
        if src_url and not create:
 
            raise RepositoryError("Create should be set to True if src_url is "
 
                                  "given (clone operation creates repository)")
 
        try:
 
            if create and src_url:
 
                self._check_url(src_url)
 
                self.clone(src_url, update_after_clone, bare)
 
                return Repo(self.path)
 
            elif create:
 
                os.mkdir(self.path)
 
                if bare:
 
                    return Repo.init_bare(self.path)
 
                else:
 
                    return Repo.init(self.path)
 
            else:
 
                return Repo(self.path)
 
        except (NotGitRepository, OSError), err:
 
            raise RepositoryError(err)
 

	
 
    def _get_all_revisions(self):
 
        cmd = 'rev-list --all --date-order'
 
        try:
 
            so, se = self.run_git_command(cmd)
 
        except RepositoryError:
 
            # Can be raised for empty repositories
 
            return []
 
        revisions = so.splitlines()
 
        revisions.reverse()
 
        return revisions
 

	
 
    def _get_revision(self, revision):
 
        """
 
        For git backend we always return integer here. This way we ensure
 
        that changset's revision attribute would become integer.
 
        """
 
        pattern = re.compile(r'^[[0-9a-fA-F]{12}|[0-9a-fA-F]{40}]$')
 
        is_bstr = lambda o: isinstance(o, (str, unicode))
 
        is_null = lambda o: len(o) == revision.count('0')
 

	
 
        if len(self.revisions) == 0:
 
            raise EmptyRepositoryError("There are no changesets yet")
 

	
 
        if revision in (None, '', 'tip', 'HEAD', 'head', -1):
 
            revision = self.revisions[-1]
 

	
 
        if ((is_bstr(revision) and revision.isdigit() and len(revision) < 12)
 
            or isinstance(revision, int) or is_null(revision)):
 
            try:
 
                revision = self.revisions[int(revision)]
 
            except:
 
                raise ChangesetDoesNotExistError("Revision %r does not exist "
 
                    "for this repository %s" % (revision, self))
 

	
 
        elif is_bstr(revision):
 
            if not pattern.match(revision) or revision not in self.revisions:
 
                raise ChangesetDoesNotExistError("Revision %r does not exist "
 
                    "for this repository %s" % (revision, self))
 

	
 
        # Ensure we return full id
 
        if not pattern.match(str(revision)):
 
            raise ChangesetDoesNotExistError("Given revision %r not recognized"
 
                % revision)
 
        return revision
 

	
 
    def _get_archives(self, archive_name='tip'):
 

	
 
        for i in [('zip', '.zip'), ('gz', '.tar.gz'), ('bz2', '.tar.bz2')]:
 
                yield {"type": i[0], "extension": i[1], "node": archive_name}
 

	
 
    def _get_url(self, url):
 
        """
 
        Returns normalized url. If schema is not given, would fall to
 
        filesystem (``file:///``) schema.
 
        """
 
        url = str(url)
 
        if url != 'default' and not '://' in url:
 
            url = ':///'.join(('file', url))
 
        return url
 

	
 
    @LazyProperty
 
    def name(self):
 
        return os.path.basename(self.path)
 

	
 
    @LazyProperty
 
    def last_change(self):
 
        """
 
        Returns last change made on this repository as datetime object
 
        """
 
        return date_fromtimestamp(self._get_mtime(), makedate()[1])
 

	
 
    def _get_mtime(self):
 
        try:
 
            return time.mktime(self.get_changeset().date.timetuple())
 
        except RepositoryError:
 
            idx_loc = '' if self.bare else '.git'
 
            # fallback to filesystem
 
            in_path = os.path.join(self.path, idx_loc, "index")
 
            he_path = os.path.join(self.path, idx_loc, "HEAD")
 
            if os.path.exists(in_path):
 
                return os.stat(in_path).st_mtime
 
            else:
 
                return os.stat(he_path).st_mtime
 

	
 
    @LazyProperty
 
    def description(self):
 
        idx_loc = '' if self.bare else '.git'
 
        undefined_description = u'unknown'
 
        description_path = os.path.join(self.path, idx_loc, 'description')
 
        if os.path.isfile(description_path):
 
            return safe_unicode(open(description_path).read())
 
        else:
 
            return undefined_description
 

	
 
    @LazyProperty
 
    def contact(self):
 
        undefined_contact = u'Unknown'
 
        return undefined_contact
 

	
 
    @property
 
    def branches(self):
 
        if not self.revisions:
 
            return {}
 
        refs = self._repo.refs.as_dict()
 
        sortkey = lambda ctx: ctx[0]
 
        _branches = [('/'.join(ref.split('/')[2:]), head)
 
            for ref, head in refs.items()
 
            if ref.startswith('refs/heads/') and not ref.endswith('/HEAD')]
 
        return OrderedDict(sorted(_branches, key=sortkey, reverse=False))
 

	
 
    def _heads(self, reverse=False):
 
        refs = self._repo.get_refs()
 
        heads = {}
 

	
 
        for key, val in refs.items():
 
            for ref_key in ['refs/heads/', 'refs/remotes/origin/']:
 
                if key.startswith(ref_key):
 
                    n = key[len(ref_key):]
 
                    if n not in ['HEAD']:
 
                        heads[n] = val
 

	
 
        return heads if reverse else dict((y,x) for x,y in heads.iteritems())
 

	
 
    def _get_tags(self):
 
        if not self.revisions:
 
            return {}
 
        sortkey = lambda ctx: ctx[0]
 
        _tags = [('/'.join(ref.split('/')[2:]), head) for ref, head in
 
            self._repo.get_refs().items() if ref.startswith('refs/tags/')]
 
        return OrderedDict(sorted(_tags, key=sortkey, reverse=True))
 

	
 
    @LazyProperty
 
    def tags(self):
 
        return self._get_tags()
 

	
 
    def tag(self, name, user, revision=None, message=None, date=None,
 
            **kwargs):
 
        """
 
        Creates and returns a tag for the given ``revision``.
 

	
 
        :param name: name for new tag
 
        :param user: full username, i.e.: "Joe Doe <joe.doe@example.com>"
 
        :param revision: changeset id for which new tag would be created
 
        :param message: message of the tag's commit
 
        :param date: date of tag's commit
 

	
 
        :raises TagAlreadyExistError: if tag with same name already exists
 
        """
 
        if name in self.tags:
 
            raise TagAlreadyExistError("Tag %s already exists" % name)
 
        changeset = self.get_changeset(revision)
 
        message = message or "Added tag %s for commit %s" % (name,
 
            changeset.raw_id)
 
        self._repo.refs["refs/tags/%s" % name] = changeset._commit.id
 

	
 
        self.tags = self._get_tags()
 
        return changeset
 

	
 
    def remove_tag(self, name, user, message=None, date=None):
 
        """
 
        Removes tag with the given ``name``.
 

	
 
        :param name: name of the tag to be removed
 
        :param user: full username, i.e.: "Joe Doe <joe.doe@example.com>"
 
        :param message: message of the tag's removal commit
 
        :param date: date of tag's removal commit
 

	
 
        :raises TagDoesNotExistError: if tag with given name does not exists
 
        """
 
        if name not in self.tags:
 
            raise TagDoesNotExistError("Tag %s does not exist" % name)
 
        tagpath = posixpath.join(self._repo.refs.path, 'refs', 'tags', name)
 
        try:
 
            os.remove(tagpath)
 
            self.tags = self._get_tags()
 
        except OSError, e:
 
            raise RepositoryError(e.strerror)
 

	
 
    def get_changeset(self, revision=None):
 
        """
 
        Returns ``GitChangeset`` object representing commit from git repository
 
        at the given revision or head (most recent commit) if None given.
 
        """
 
        if isinstance(revision, GitChangeset):
 
            return revision
 
        revision = self._get_revision(revision)
 
        changeset = GitChangeset(repository=self, revision=revision)
 
        return changeset
 

	
 
    def get_changesets(self, start=None, end=None, start_date=None,
 
           end_date=None, branch_name=None, reverse=False):
 
        """
 
        Returns iterator of ``GitChangeset`` objects from start to end (both
 
        are inclusive), in ascending date order (unless ``reverse`` is set).
 

	
 
        :param start: changeset ID, as str; first returned changeset
 
        :param end: changeset ID, as str; last returned changeset
 
        :param start_date: if specified, changesets with commit date less than
 
          ``start_date`` would be filtered out from returned set
 
        :param end_date: if specified, changesets with commit date greater than
 
          ``end_date`` would be filtered out from returned set
 
        :param branch_name: if specified, changesets not reachable from given
 
          branch would be filtered out from returned set
 
        :param reverse: if ``True``, returned generator would be reversed
 
          (meaning that returned changesets would have descending date order)
 

	
 
        :raise BranchDoesNotExistError: If given ``branch_name`` does not
 
            exist.
 
        :raise ChangesetDoesNotExistError: If changeset for given ``start`` or
 
          ``end`` could not be found.
 

	
 
        """
 
        if branch_name and branch_name not in self.branches:
 
            raise BranchDoesNotExistError("Branch '%s' not found" \
 
                                          % branch_name)
 
        # %H at format means (full) commit hash, initial hashes are retrieved
 
        # in ascending date order
 
        cmd_template = 'log --date-order --reverse --pretty=format:"%H"'
 
        cmd_params = {}
 
        if start_date:
 
            cmd_template += ' --since "$since"'
 
            cmd_params['since'] = start_date.strftime('%m/%d/%y %H:%M:%S')
 
        if end_date:
 
            cmd_template += ' --until "$until"'
 
            cmd_params['until'] = end_date.strftime('%m/%d/%y %H:%M:%S')
 
        if branch_name:
 
            cmd_template += ' $branch_name'
 
            cmd_params['branch_name'] = branch_name
 
        else:
 
            cmd_template += ' --all'
 

	
 
        cmd = Template(cmd_template).safe_substitute(**cmd_params)
 
        revs = self.run_git_command(cmd)[0].splitlines()
 
        start_pos = 0
 
        end_pos = len(revs)
 
        if start:
 
            _start = self._get_revision(start)
 
            try:
 
                start_pos = revs.index(_start)
 
            except ValueError:
 
                pass
 

	
 
        if end is not None:
 
            _end = self._get_revision(end)
 
            try:
 
                end_pos = revs.index(_end)
 
            except ValueError:
 
                pass
 

	
 
        if None not in [start, end] and start_pos > end_pos:
 
            raise RepositoryError('start cannot be after end')
 

	
 
        if end_pos is not None:
 
            end_pos += 1
 

	
 
        revs = revs[start_pos:end_pos]
 
        if reverse:
 
            revs = reversed(revs)
 
        for rev in revs:
 
            yield self.get_changeset(rev)
 

	
 
    def get_diff(self, rev1, rev2, path=None, ignore_whitespace=False,
 
                 context=3):
 
        """
 
        Returns (git like) *diff*, as plain text. Shows changes introduced by
 
        ``rev2`` since ``rev1``.
 

	
 
        :param rev1: Entry point from which diff is shown. Can be
 
          ``self.EMPTY_CHANGESET`` - in this case, patch showing all
 
          the changes since empty state of the repository until ``rev2``
 
        :param rev2: Until which revision changes should be shown.
 
        :param ignore_whitespace: If set to ``True``, would not show whitespace
 
          changes. Defaults to ``False``.
 
        :param context: How many lines before/after changed lines should be
 
          shown. Defaults to ``3``.
 
        """
 
        flags = ['-U%s' % context]
 
        if ignore_whitespace:
 
            flags.append('-w')
 

	
 
        if rev1 == self.EMPTY_CHANGESET:
 
            rev2 = self.get_changeset(rev2).raw_id
 
            cmd = ' '.join(['show'] + flags + [rev2])
 
        else:
 
            rev1 = self.get_changeset(rev1).raw_id
 
            rev2 = self.get_changeset(rev2).raw_id
 
            cmd = ' '.join(['diff'] + flags + [rev1, rev2])
 

	
 
        if path:
 
            cmd += ' -- "%s"' % path
 
        stdout, stderr = self.run_git_command(cmd)
 
        # If we used 'show' command, strip first few lines (until actual diff
 
        # starts)
 
        if rev1 == self.EMPTY_CHANGESET:
 
            lines = stdout.splitlines()
 
            x = 0
 
            for line in lines:
 
                if line.startswith('diff'):
 
                    break
 
                x += 1
 
            # Append new line just like 'diff' command do
 
            stdout = '\n'.join(lines[x:]) + '\n'
 
        return stdout
 

	
 
    @LazyProperty
 
    def in_memory_changeset(self):
 
        """
 
        Returns ``GitInMemoryChangeset`` object for this repository.
 
        """
 
        return GitInMemoryChangeset(self)
 

	
 
    def clone(self, url, update_after_clone=True, bare=False):
 
        """
 
        Tries to clone changes from external location.
 

	
 
        :param update_after_clone: If set to ``False``, git won't checkout
 
          working directory
 
        :param bare: If set to ``True``, repository would be cloned into
 
          *bare* git repository (no working directory at all).
 
        """
 
        url = self._get_url(url)
 
        cmd = ['clone']
 
        if bare:
 
            cmd.append('--bare')
 
        elif not update_after_clone:
 
            cmd.append('--no-checkout')
 
        cmd += ['--', '"%s"' % url, '"%s"' % self.path]
 
        cmd = ' '.join(cmd)
 
        # If error occurs run_git_command raises RepositoryError already
 
        self.run_git_command(cmd)
 

	
 
    def pull(self, url):
 
        """
 
        Tries to pull changes from external location.
 
        """
 
        url = self._get_url(url)
 
        cmd = ['pull']
 
        cmd.append("--ff-only")
 
        cmd.append(url)
 
        cmd = ' '.join(cmd)
 
        # If error occurs run_git_command raises RepositoryError already
 
        self.run_git_command(cmd)
 

	
 
    @LazyProperty
 
    def workdir(self):
 
        """
 
        Returns ``Workdir`` instance for this repository.
 
        """
 
        return GitWorkdir(self)
 

	
 
    def get_config_value(self, section, name, config_file=None):
 
        """
 
        Returns configuration value for a given [``section``] and ``name``.
 

	
 
        :param section: Section we want to retrieve value from
 
        :param name: Name of configuration we want to retrieve
 
        :param config_file: A path to file which should be used to retrieve
 
          configuration from (might also be a list of file paths)
 
        """
 
        if config_file is None:
 
            config_file = []
 
        elif isinstance(config_file, basestring):
 
            config_file = [config_file]
 

	
 
        def gen_configs():
 
            for path in config_file + self._config_files:
 
                try:
 
                    yield ConfigFile.from_path(path)
 
                except (IOError, OSError, ValueError):
 
                    continue
 

	
 
        for config in gen_configs():
 
            try:
 
                return config.get(section, name)
 
            except KeyError:
 
                continue
 
        return None
 

	
 
    def get_user_name(self, config_file=None):
 
        """
 
        Returns user's name from global configuration file.
 

	
 
        :param config_file: A path to file which should be used to retrieve
 
          configuration from (might also be a list of file paths)
 
        """
 
        return self.get_config_value('user', 'name', config_file)
 

	
 
    def get_user_email(self, config_file=None):
 
        """
 
        Returns user's email from global configuration file.
 

	
 
        :param config_file: A path to file which should be used to retrieve
 
          configuration from (might also be a list of file paths)
 
        """
 
        return self.get_config_value('user', 'email', config_file)
rhodecode/templates/admin/users/user_edit_my_account_form.html
Show inline comments
 
<div>
 
    ${h.form(url('admin_settings_my_account_update'),method='put')}
 
        <div class="form">
 

	
 
             <div class="field">
 
                <div class="gravatar_box">
 
                    <div class="gravatar"><img alt="gravatar" src="${h.gravatar_url(c.user.email)}"/></div>
 
                    <p>
 
                    %if c.use_gravatar:
 
                    <strong>${_('Change your avatar at')} <a href="http://gravatar.com">gravatar.com</a></strong>
 
                    <br/>${_('Using')} ${c.user.email}
 
                    %else:
 
                    <br/>${c.user.email}
 
                    %endif
 
                    </p>
 
                </div>
 
             </div>
 
            <div class="field">
 
                <div class="label">
 
                    <label>${_('API key')}</label> ${c.user.api_key}
 
                </div>
 
            </div>
 
            <div class="fields">
 
                 <div class="field">
 
                    <div class="label">
 
                        <label for="username">${_('Username')}:</label>
 
                    </div>
 
                    <div class="input">
 
                        ${h.text('username',class_="medium")}
 
                    </div>
 
                 </div>
 

	
 
                 <div class="field">
 
                    <div class="label">
 
                        <label for="new_password">${_('New password')}:</label>
 
                    </div>
 
                    <div class="input">
 
                        ${h.password('new_password',class_="medium",autocomplete="off")}
 
                    </div>
 
                 </div>
 

	
 
                 <div class="field">
 
                    <div class="label">
 
                        <label for="password_confirmation">${_('New password confirmation')}:</label>
 
                    </div>
 
                    <div class="input">
 
                        ${h.password('password_confirmation',class_="medium",autocomplete="off")}
 
                    </div>
 
                 </div>
 

	
 
                 <div class="field">
 
                    <div class="label">
 
                        <label for="name">${_('First Name')}:</label>
 
                    </div>
 
                    <div class="input">
 
                        ${h.text('name',class_="medium")}
 
                    </div>
 
                 </div>
 

	
 
                 <div class="field">
 
                    <div class="label">
 
                        <label for="lastname">${_('Last Name')}:</label>
 
                    </div>
 
                    <div class="input">
 
                        ${h.text('lastname',class_="medium")}
 
                    </div>
 
                 </div>
 

	
 
                 <div class="field">
 
                    <div class="label">
 
                        <label for="email">${_('Email')}:</label>
 
                    </div>
 
                    <div class="input">
 
                        ${h.text('email',class_="medium")}
 
                    </div>
 
                 </div>
 

	
 
                <div class="buttons">
 
                  ${h.submit('save',_('Save'),class_="ui-button")}
 
                  ${h.reset('reset',_('Reset'),class_="ui-button")}
 
                </div>
 
            </div>
 
        </div>
 
    ${h.end_form()}
 
    </div>
 
\ No newline at end of file
 
    </div>
rhodecode/tests/functional/test_files.py
Show inline comments
 
from rhodecode.tests import *
 

	
 
ARCHIVE_SPECS = {
 
    '.tar.bz2': ('application/x-bzip2', 'tbz2', ''),
 
    '.tar.gz': ('application/x-gzip', 'tgz', ''),
 
    '.zip': ('application/zip', 'zip', ''),
 
}
 

	
 

	
 
class TestFilesController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='index',
 
                                    repo_name=HG_REPO,
 
                                    revision='tip',
 
                                    f_path='/'))
 
        # Test response...
 
        response.mustcontain('<a class="browser-dir ypjax-link" href="/vcs_test_hg/files/27cd5cce30c96924232dffcd24178a07ffeb5dfc/docs">docs</a>')
 
        response.mustcontain('<a class="browser-dir ypjax-link" href="/vcs_test_hg/files/27cd5cce30c96924232dffcd24178a07ffeb5dfc/tests">tests</a>')
 
        response.mustcontain('<a class="browser-dir ypjax-link" href="/vcs_test_hg/files/27cd5cce30c96924232dffcd24178a07ffeb5dfc/vcs">vcs</a>')
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/vcs_test_hg/files/27cd5cce30c96924232dffcd24178a07ffeb5dfc/.hgignore">.hgignore</a>')
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/vcs_test_hg/files/27cd5cce30c96924232dffcd24178a07ffeb5dfc/MANIFEST.in">MANIFEST.in</a>')
 

	
 
    def test_index_revision(self):
 
        self.log_user()
 

	
 
        response = self.app.get(
 
            url(controller='files', action='index',
 
                repo_name=HG_REPO,
 
                revision='7ba66bec8d6dbba14a2155be32408c435c5f4492',
 
                f_path='/')
 
        )
 

	
 
        #Test response...
 

	
 
        response.mustcontain('<a class="browser-dir ypjax-link" href="/vcs_test_hg/files/7ba66bec8d6dbba14a2155be32408c435c5f4492/docs">docs</a>')
 
        response.mustcontain('<a class="browser-dir ypjax-link" href="/vcs_test_hg/files/7ba66bec8d6dbba14a2155be32408c435c5f4492/tests">tests</a>')
 
        response.mustcontain('<a class="browser-file ypjax-link" href="/vcs_test_hg/files/7ba66bec8d6dbba14a2155be32408c435c5f4492/README.rst">README.rst</a>')
 
        response.mustcontain('1.1 KiB')
 
        response.mustcontain('text/x-python')
 

	
 
    def test_index_different_branch(self):
 
        self.log_user()
 

	
 
        response = self.app.get(url(controller='files', action='index',
 
                                    repo_name=HG_REPO,
 
                                    revision='97e8b885c04894463c51898e14387d80c30ed1ee',
 
                                    f_path='/'))
 

	
 
        response.mustcontain("""<span style="text-transform: uppercase;"><a href="#">branch: git</a></span>""")
 

	
 
    def test_index_paging(self):
 
        self.log_user()
 

	
 
        for r in [(73, 'a066b25d5df7016b45a41b7e2a78c33b57adc235'),
 
                  (92, 'cc66b61b8455b264a7a8a2d8ddc80fcfc58c221e'),
 
                  (109, '75feb4c33e81186c87eac740cee2447330288412'),
 
                  (1, '3d8f361e72ab303da48d799ff1ac40d5ac37c67e'),
 
                  (0, 'b986218ba1c9b0d6a259fac9b050b1724ed8e545')]:
 

	
 
            response = self.app.get(url(controller='files', action='index',
 
                                    repo_name=HG_REPO,
 
                                    revision=r[1],
 
                                    f_path='/'))
 

	
 
            response.mustcontain("""@ r%s:%s""" % (r[0], r[1][:12]))
 

	
 
    def test_file_source(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='index',
 
                                    repo_name=HG_REPO,
 
                                    revision='27cd5cce30c96924232dffcd24178a07ffeb5dfc',
 
                                    f_path='vcs/nodes.py'))
 

	
 
        #test or history
 
        response.mustcontain("""<optgroup label="Changesets">
 
<option value="8911406ad776fdd3d0b9932a2e89677e57405a48">r167:8911406ad776 (default)</option>
 
<option value="aa957ed78c35a1541f508d2ec90e501b0a9e3167">r165:aa957ed78c35 (default)</option>
 
<option value="48e11b73e94c0db33e736eaeea692f990cb0b5f1">r140:48e11b73e94c (default)</option>
 
<option value="adf3cbf483298563b968a6c673cd5bde5f7d5eea">r126:adf3cbf48329 (default)</option>
 
<option value="6249fd0fb2cfb1411e764129f598e2cf0de79a6f">r113:6249fd0fb2cf (git)</option>
 
<option value="75feb4c33e81186c87eac740cee2447330288412">r109:75feb4c33e81 (default)</option>
 
<option value="9a4dc232ecdc763ef2e98ae2238cfcbba4f6ad8d">r108:9a4dc232ecdc (default)</option>
 
<option value="595cce4efa21fda2f2e4eeb4fe5f2a6befe6fa2d">r107:595cce4efa21 (default)</option>
 
<option value="4a8bd421fbc2dfbfb70d85a3fe064075ab2c49da">r104:4a8bd421fbc2 (default)</option>
 
<option value="57be63fc8f85e65a0106a53187f7316f8c487ffa">r102:57be63fc8f85 (default)</option>
 
<option value="5530bd87f7e2e124a64d07cb2654c997682128be">r101:5530bd87f7e2 (git)</option>
 
<option value="e516008b1c93f142263dc4b7961787cbad654ce1">r99:e516008b1c93 (default)</option>
 
<option value="41f43fc74b8b285984554532eb105ac3be5c434f">r93:41f43fc74b8b (default)</option>
 
<option value="cc66b61b8455b264a7a8a2d8ddc80fcfc58c221e">r92:cc66b61b8455 (default)</option>
 
<option value="73ab5b616b3271b0518682fb4988ce421de8099f">r91:73ab5b616b32 (default)</option>
 
<option value="e0da75f308c0f18f98e9ce6257626009fdda2b39">r82:e0da75f308c0 (default)</option>
 
<option value="fb2e41e0f0810be4d7103bc2a4c7be16ee3ec611">r81:fb2e41e0f081 (default)</option>
 
<option value="602ae2f5e7ade70b3b66a58cdd9e3e613dc8a028">r76:602ae2f5e7ad (default)</option>
 
<option value="a066b25d5df7016b45a41b7e2a78c33b57adc235">r73:a066b25d5df7 (default)</option>
 
<option value="637a933c905958ce5151f154147c25c1c7b68832">r61:637a933c9059 (web)</option>
 
<option value="0c21004effeb8ce2d2d5b4a8baf6afa8394b6fbc">r60:0c21004effeb (web)</option>
 
<option value="a1f39c56d3f1d52d5fb5920370a2a2716cd9a444">r59:a1f39c56d3f1 (web)</option>
 
<option value="97d32df05c715a3bbf936bf3cc4e32fb77fe1a7f">r58:97d32df05c71 (web)</option>
 
<option value="08eaf14517718dccea4b67755a93368341aca919">r57:08eaf1451771 (web)</option>
 
<option value="22f71ad265265a53238359c883aa976e725aa07d">r56:22f71ad26526 (web)</option>
 
<option value="97501f02b7b4330924b647755663a2d90a5e638d">r49:97501f02b7b4 (web)</option>
 
<option value="86ede6754f2b27309452bb11f997386ae01d0e5a">r47:86ede6754f2b (web)</option>
 
<option value="014c40c0203c423dc19ecf94644f7cac9d4cdce0">r45:014c40c0203c (web)</option>
 
<option value="ee87846a61c12153b51543bf860e1026c6d3dcba">r30:ee87846a61c1 (default)</option>
 
<option value="9bb326a04ae5d98d437dece54be04f830cf1edd9">r26:9bb326a04ae5 (default)</option>
 
<option value="536c1a19428381cfea92ac44985304f6a8049569">r24:536c1a194283 (default)</option>
 
<option value="dc5d2c0661b61928834a785d3e64a3f80d3aad9c">r8:dc5d2c0661b6 (default)</option>
 
<option value="3803844fdbd3b711175fc3da9bdacfcd6d29a6fb">r7:3803844fdbd3 (default)</option>
 
</optgroup>
 
<optgroup label="Branches">
 
<option selected="selected" value="27cd5cce30c96924232dffcd24178a07ffeb5dfc">default</option>
 
<option value="97e8b885c04894463c51898e14387d80c30ed1ee">git</option>
 
<option value="2e6a2bf9356ca56df08807f4ad86d480da72a8f4">web</option>
 
</optgroup>
 
<optgroup label="Tags">
 
<option selected="selected" value="27cd5cce30c96924232dffcd24178a07ffeb5dfc">tip</option>
 
<option value="fd4bdb5e9b2a29b4393a4ac6caef48c17ee1a200">0.1.4</option>
 
<option value="17544fbfcd33ffb439e2b728b5d526b1ef30bfcf">0.1.3</option>
 
<option value="a7e60bff65d57ac3a1a1ce3b12a70f8a9e8a7720">0.1.2</option>
 
<option value="eb3a60fc964309c1a318b8dfe26aa2d1586c85ae">0.1.1</option>
 
</optgroup>
 
""")
 

	
 
        response.mustcontain("""<div class="commit">merge</div>""")
 

	
 
        response.mustcontain("""<span style="text-transform: uppercase;"><a href="#">branch: default</a></span>""")
 

	
 
    def test_file_annotation(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='index',
 
                                    repo_name=HG_REPO,
 
                                    revision='27cd5cce30c96924232dffcd24178a07ffeb5dfc',
 
                                    f_path='vcs/nodes.py',
 
                                    annotate=True))
 

	
 

	
 
        response.mustcontain("""<optgroup label="Changesets">
 
<option value="8911406ad776fdd3d0b9932a2e89677e57405a48">r167:8911406ad776 (default)</option>
 
<option value="aa957ed78c35a1541f508d2ec90e501b0a9e3167">r165:aa957ed78c35 (default)</option>
 
<option value="48e11b73e94c0db33e736eaeea692f990cb0b5f1">r140:48e11b73e94c (default)</option>
 
<option value="adf3cbf483298563b968a6c673cd5bde5f7d5eea">r126:adf3cbf48329 (default)</option>
 
<option value="6249fd0fb2cfb1411e764129f598e2cf0de79a6f">r113:6249fd0fb2cf (git)</option>
 
<option value="75feb4c33e81186c87eac740cee2447330288412">r109:75feb4c33e81 (default)</option>
 
<option value="9a4dc232ecdc763ef2e98ae2238cfcbba4f6ad8d">r108:9a4dc232ecdc (default)</option>
 
<option value="595cce4efa21fda2f2e4eeb4fe5f2a6befe6fa2d">r107:595cce4efa21 (default)</option>
 
<option value="4a8bd421fbc2dfbfb70d85a3fe064075ab2c49da">r104:4a8bd421fbc2 (default)</option>
 
<option value="57be63fc8f85e65a0106a53187f7316f8c487ffa">r102:57be63fc8f85 (default)</option>
 
<option value="5530bd87f7e2e124a64d07cb2654c997682128be">r101:5530bd87f7e2 (git)</option>
 
<option value="e516008b1c93f142263dc4b7961787cbad654ce1">r99:e516008b1c93 (default)</option>
 
<option value="41f43fc74b8b285984554532eb105ac3be5c434f">r93:41f43fc74b8b (default)</option>
 
<option value="cc66b61b8455b264a7a8a2d8ddc80fcfc58c221e">r92:cc66b61b8455 (default)</option>
 
<option value="73ab5b616b3271b0518682fb4988ce421de8099f">r91:73ab5b616b32 (default)</option>
 
<option value="e0da75f308c0f18f98e9ce6257626009fdda2b39">r82:e0da75f308c0 (default)</option>
 
<option value="fb2e41e0f0810be4d7103bc2a4c7be16ee3ec611">r81:fb2e41e0f081 (default)</option>
 
<option value="602ae2f5e7ade70b3b66a58cdd9e3e613dc8a028">r76:602ae2f5e7ad (default)</option>
 
<option value="a066b25d5df7016b45a41b7e2a78c33b57adc235">r73:a066b25d5df7 (default)</option>
 
<option value="637a933c905958ce5151f154147c25c1c7b68832">r61:637a933c9059 (web)</option>
 
<option value="0c21004effeb8ce2d2d5b4a8baf6afa8394b6fbc">r60:0c21004effeb (web)</option>
 
<option value="a1f39c56d3f1d52d5fb5920370a2a2716cd9a444">r59:a1f39c56d3f1 (web)</option>
 
<option value="97d32df05c715a3bbf936bf3cc4e32fb77fe1a7f">r58:97d32df05c71 (web)</option>
 
<option value="08eaf14517718dccea4b67755a93368341aca919">r57:08eaf1451771 (web)</option>
 
<option value="22f71ad265265a53238359c883aa976e725aa07d">r56:22f71ad26526 (web)</option>
 
<option value="97501f02b7b4330924b647755663a2d90a5e638d">r49:97501f02b7b4 (web)</option>
 
<option value="86ede6754f2b27309452bb11f997386ae01d0e5a">r47:86ede6754f2b (web)</option>
 
<option value="014c40c0203c423dc19ecf94644f7cac9d4cdce0">r45:014c40c0203c (web)</option>
 
<option value="ee87846a61c12153b51543bf860e1026c6d3dcba">r30:ee87846a61c1 (default)</option>
 
<option value="9bb326a04ae5d98d437dece54be04f830cf1edd9">r26:9bb326a04ae5 (default)</option>
 
<option value="536c1a19428381cfea92ac44985304f6a8049569">r24:536c1a194283 (default)</option>
 
<option value="dc5d2c0661b61928834a785d3e64a3f80d3aad9c">r8:dc5d2c0661b6 (default)</option>
 
<option value="3803844fdbd3b711175fc3da9bdacfcd6d29a6fb">r7:3803844fdbd3 (default)</option>
 
</optgroup>
 
<optgroup label="Branches">
 
<option selected="selected" value="27cd5cce30c96924232dffcd24178a07ffeb5dfc">default</option>
 
<option value="97e8b885c04894463c51898e14387d80c30ed1ee">git</option>
 
<option value="2e6a2bf9356ca56df08807f4ad86d480da72a8f4">web</option>
 
</optgroup>
 
<optgroup label="Tags">
 
<option selected="selected" value="27cd5cce30c96924232dffcd24178a07ffeb5dfc">tip</option>
 
<option value="fd4bdb5e9b2a29b4393a4ac6caef48c17ee1a200">0.1.4</option>
 
<option value="17544fbfcd33ffb439e2b728b5d526b1ef30bfcf">0.1.3</option>
 
<option value="a7e60bff65d57ac3a1a1ce3b12a70f8a9e8a7720">0.1.2</option>
 
<option value="eb3a60fc964309c1a318b8dfe26aa2d1586c85ae">0.1.1</option>
 
</optgroup>""")
 

	
 
        response.mustcontain("""<span style="text-transform: uppercase;"><a href="#">branch: default</a></span>""")
 

	
 
    def test_archival(self):
 
        self.log_user()
 

	
 
        for arch_ext, info in ARCHIVE_SPECS.items():
 
            short = '27cd5cce30c9%s' % arch_ext
 
            fname = '27cd5cce30c96924232dffcd24178a07ffeb5dfc%s' % arch_ext
 
            filename = '%s-%s' % (HG_REPO, short)
 
            response = self.app.get(url(controller='files',
 
                                        action='archivefile',
 
                                        repo_name=HG_REPO,
 
                                        fname=fname))
 

	
 
            self.assertEqual(response.status, '200 OK')
 
            heads = [
 
                ('Pragma', 'no-cache'), 
 
                ('Cache-Control', 'no-cache'), 
 
                ('Pragma', 'no-cache'),
 
                ('Cache-Control', 'no-cache'),
 
                ('Content-Disposition', 'attachment; filename=%s' % filename),
 
                ('Content-Type', '%s; charset=utf-8' % info[0]),
 
            ]
 
            self.assertEqual(response.response._headers.items(), heads)
 

	
 
    def test_archival_wrong_ext(self):
 
        self.log_user()
 

	
 
        for arch_ext in ['tar', 'rar', 'x', '..ax', '.zipz']:
 
            fname = '27cd5cce30c96924232dffcd24178a07ffeb5dfc%s' % arch_ext
 

	
 
            response = self.app.get(url(controller='files', 
 
            response = self.app.get(url(controller='files',
 
                                        action='archivefile',
 
                                        repo_name=HG_REPO,
 
                                        fname=fname))
 
            response.mustcontain('Unknown archive type')
 

	
 
    def test_archival_wrong_revision(self):
 
        self.log_user()
 

	
 
        for rev in ['00x000000', 'tar', 'wrong', '@##$@$42413232', '232dffcd']:
 
            fname = '%s.zip' % rev
 

	
 
            response = self.app.get(url(controller='files',
 
                                        action='archivefile',
 
                                        repo_name=HG_REPO,
 
                                        fname=fname))
 
            response.mustcontain('Unknown revision')
 

	
 
    #==========================================================================
 
    # RAW FILE
 
    #==========================================================================
 
    def test_raw_file_ok(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='rawfile',
 
                                    repo_name=HG_REPO,
 
                                    revision='27cd5cce30c96924232dffcd24178a07ffeb5dfc',
 
                                    f_path='vcs/nodes.py'))
 

	
 
        self.assertEqual(response.content_disposition, "attachment; filename=nodes.py")
 
        self.assertEqual(response.content_type, "text/x-python")
 

	
 
    def test_raw_file_wrong_cs(self):
 
        self.log_user()
 
        rev = u'ERRORce30c96924232dffcd24178a07ffeb5dfc'
 
        f_path = 'vcs/nodes.py'
 

	
 
        response = self.app.get(url(controller='files', action='rawfile',
 
                                    repo_name=HG_REPO,
 
                                    revision=rev,
 
                                    f_path=f_path))
 

	
 
        msg = """Revision %r does not exist for this repository""" % (rev)
 
        self.checkSessionFlash(response, msg)
 

	
 
        msg = """%s""" % (HG_REPO)
 
        self.checkSessionFlash(response, msg)
 

	
 
    def test_raw_file_wrong_f_path(self):
 
        self.log_user()
 
        rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
 
        f_path = 'vcs/ERRORnodes.py'
 
        response = self.app.get(url(controller='files', action='rawfile',
 
                                    repo_name=HG_REPO,
 
                                    revision=rev,
 
                                    f_path=f_path))
 

	
 
        msg = "There is no file nor directory at the given path: %r at revision %r" % (f_path, rev[:12])
 
        self.checkSessionFlash(response, msg)
 

	
 
    #==========================================================================
 
    # RAW RESPONSE - PLAIN
 
    #==========================================================================
 
    def test_raw_ok(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='raw',
 
                                    repo_name=HG_REPO,
 
                                    revision='27cd5cce30c96924232dffcd24178a07ffeb5dfc',
 
                                    f_path='vcs/nodes.py'))
 

	
 
        self.assertEqual(response.content_type, "text/plain")
 

	
 
    def test_raw_wrong_cs(self):
 
        self.log_user()
 
        rev = u'ERRORcce30c96924232dffcd24178a07ffeb5dfc'
 
        f_path = 'vcs/nodes.py'
 

	
 
        response = self.app.get(url(controller='files', action='raw',
 
                                    repo_name=HG_REPO,
 
                                    revision=rev,
 
                                    f_path=f_path))
 
        msg = """Revision %r does not exist for this repository""" % (rev)
 
        self.checkSessionFlash(response, msg)
 

	
 
        msg = """%s""" % (HG_REPO)
 
        self.checkSessionFlash(response, msg)
 

	
 
    def test_raw_wrong_f_path(self):
 
        self.log_user()
 
        rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
 
        f_path = 'vcs/ERRORnodes.py'
 
        response = self.app.get(url(controller='files', action='raw',
 
                                    repo_name=HG_REPO,
 
                                    revision=rev,
 
                                    f_path=f_path))
 
        msg = "There is no file nor directory at the given path: %r at revision %r" % (f_path, rev[:12])
 
        self.checkSessionFlash(response, msg)
 

	
 
    def test_ajaxed_files_list(self):
 
        self.log_user()
 
        rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
 
        response = self.app.get(
 
            url('files_nodelist_home', repo_name=HG_REPO,f_path='/',revision=rev),
 
            extra_environ={'HTTP_X_PARTIAL_XHR': '1'},
 
        )
 
        response.mustcontain("vcs/web/simplevcs/views/repository.py")
0 comments (0 inline, 0 general)