Changeset - 47e11e924cbe
[Not reviewed]
default
0 4 0
Mads Kiilerich - 6 years ago 2019-07-16 12:10:33
mads@kiilerich.com
clone_url: drop **override - we only pass username
4 files changed with 24 insertions and 27 deletions:
0 comments (0 inline, 0 general)
kallithea/controllers/summary.py
Show inline comments
 
@@ -21,195 +21,195 @@ This file was forked by the Kallithea pr
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 18, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import traceback
 
import calendar
 
import logging
 
import itertools
 
from time import mktime
 
from datetime import timedelta, date
 

	
 
from tg import tmpl_context as c, request
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPBadRequest
 

	
 
from beaker.cache import cache_region, region_invalidate
 

	
 
from kallithea.lib.vcs.exceptions import ChangesetError, EmptyRepositoryError, \
 
    NodeDoesNotExistError
 
from kallithea.config.conf import ALL_READMES, ALL_EXTS, LANGUAGES_EXTENSIONS_MAP
 
from kallithea.model.db import Statistics, CacheInvalidation, User
 
from kallithea.lib.utils2 import safe_int, safe_str
 
from kallithea.lib.auth import LoginRequired, HasRepoPermissionLevelDecorator
 
from kallithea.lib.base import BaseRepoController, render, jsonify
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.markup_renderer import MarkupRenderer
 
from kallithea.lib.celerylib.tasks import get_commits_stats
 
from kallithea.lib.compat import json
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.lib.page import RepoPage
 

	
 
log = logging.getLogger(__name__)
 

	
 
README_FILES = [''.join([x[0][0], x[1][0]]) for x in
 
                    sorted(list(itertools.product(ALL_READMES, ALL_EXTS)),
 
                           key=lambda y:y[0][1] + y[1][1])]
 

	
 

	
 
class SummaryController(BaseRepoController):
 

	
 
    def __get_readme_data(self, db_repo):
 
        repo_name = db_repo.repo_name
 
        log.debug('Looking for README file')
 

	
 
        @cache_region('long_term', '_get_readme_from_cache')
 
        def _get_readme_from_cache(key, kind):
 
            readme_data = None
 
            readme_file = None
 
            try:
 
                # gets the landing revision! or tip if fails
 
                cs = db_repo.get_landing_changeset()
 
                if isinstance(cs, EmptyChangeset):
 
                    raise EmptyRepositoryError()
 
                renderer = MarkupRenderer()
 
                for f in README_FILES:
 
                    try:
 
                        readme = cs.get_node(f)
 
                        if not isinstance(readme, FileNode):
 
                            continue
 
                        readme_file = f
 
                        log.debug('Found README file `%s` rendering...',
 
                                  readme_file)
 
                        readme_data = renderer.render(readme.content,
 
                                                      filename=f)
 
                        break
 
                    except NodeDoesNotExistError:
 
                        continue
 
            except ChangesetError:
 
                log.error(traceback.format_exc())
 
                pass
 
            except EmptyRepositoryError:
 
                pass
 

	
 
            return readme_data, readme_file
 

	
 
        kind = 'README'
 
        valid = CacheInvalidation.test_and_set_valid(repo_name, kind)
 
        if not valid:
 
            region_invalidate(_get_readme_from_cache, None, '_get_readme_from_cache', repo_name, kind)
 
        return _get_readme_from_cache(repo_name, kind)
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def index(self, repo_name):
 
        p = safe_int(request.GET.get('page'), 1)
 
        size = safe_int(request.GET.get('size'), 10)
 
        collection = c.db_repo_scm_instance
 
        c.cs_pagination = RepoPage(collection, page=p, items_per_page=size)
 
        page_revisions = [x.raw_id for x in list(c.cs_pagination)]
 
        c.cs_comments = c.db_repo.get_comments(page_revisions)
 
        c.cs_statuses = c.db_repo.statuses(page_revisions)
 

	
 
        if request.authuser.is_default_user:
 
            username = ''
 
            username = None
 
        else:
 
            username = safe_str(request.authuser.username)
 
            username = request.authuser.username
 
        c.clone_repo_url = c.db_repo.clone_url(clone_uri_tmpl=c.clone_uri_tmpl, with_id=False, username=username)
 
        c.clone_repo_url_id = c.db_repo.clone_url(clone_uri_tmpl=c.clone_uri_tmpl, with_id=True, username=username)
 

	
 
        if c.db_repo.enable_statistics:
 
            c.show_stats = True
 
        else:
 
            c.show_stats = False
 

	
 
        stats = Statistics.query() \
 
            .filter(Statistics.repository == c.db_repo) \
 
            .scalar()
 

	
 
        c.stats_percentage = 0
 

	
 
        if stats and stats.languages:
 
            c.no_data = False is c.db_repo.enable_statistics
 
            lang_stats_d = json.loads(stats.languages)
 

	
 
            lang_stats = ((x, {"count": y,
 
                               "desc": LANGUAGES_EXTENSIONS_MAP.get(x)})
 
                          for x, y in lang_stats_d.items())
 

	
 
            c.trending_languages = (
 
                sorted(lang_stats, reverse=True, key=lambda k: k[1])[:10]
 
            )
 
        else:
 
            c.no_data = True
 
            c.trending_languages = []
 

	
 
        c.enable_downloads = c.db_repo.enable_downloads
 
        c.readme_data, c.readme_file = \
 
            self.__get_readme_data(c.db_repo)
 
        return render('summary/summary.html')
 

	
 
    @LoginRequired()
 
    @HasRepoPermissionLevelDecorator('read')
 
    @jsonify
 
    def repo_size(self, repo_name):
 
        if request.is_xhr:
 
            return c.db_repo._repo_size()
 
        else:
 
            raise HTTPBadRequest()
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def statistics(self, repo_name):
 
        if c.db_repo.enable_statistics:
 
            c.show_stats = True
 
            c.no_data_msg = _('No data ready yet')
 
        else:
 
            c.show_stats = False
 
            c.no_data_msg = _('Statistics are disabled for this repository')
 

	
 
        td = date.today() + timedelta(days=1)
 
        td_1m = td - timedelta(days=calendar.mdays[td.month])
 
        td_1y = td - timedelta(days=365)
 

	
 
        ts_min_m = mktime(td_1m.timetuple())
 
        ts_min_y = mktime(td_1y.timetuple())
 
        ts_max_y = mktime(td.timetuple())
 
        c.ts_min = ts_min_m
 
        c.ts_max = ts_max_y
 

	
 
        stats = Statistics.query() \
 
            .filter(Statistics.repository == c.db_repo) \
 
            .scalar()
 
        c.stats_percentage = 0
 
        if stats and stats.languages:
 
            c.no_data = False is c.db_repo.enable_statistics
 
            lang_stats_d = json.loads(stats.languages)
 
            c.commit_data = json.loads(stats.commit_activity)
 
            c.overview_data = json.loads(stats.commit_activity_combined)
 

	
 
            lang_stats = ((x, {"count": y,
 
                               "desc": LANGUAGES_EXTENSIONS_MAP.get(x)})
 
                          for x, y in lang_stats_d.items())
 

	
 
            c.trending_languages = (
 
                sorted(lang_stats, reverse=True, key=lambda k: k[1])[:10]
 
            )
 
            last_rev = stats.stat_on_revision + 1
 
            c.repo_last_rev = c.db_repo_scm_instance.count() \
 
                if c.db_repo_scm_instance.revisions else 0
 
            if last_rev == 0 or c.repo_last_rev == 0:
 
                pass
 
            else:
 
                c.stats_percentage = '%.2f' % ((float((last_rev)) /
 
                                                c.repo_last_rev) * 100)
 
        else:
 
            c.commit_data = {}
 
            c.overview_data = ([[ts_min_y, 0], [ts_max_y, 10]])
 
            c.trending_languages = {}
 
            c.no_data = True
 

	
 
        recurse_limit = 500  # don't recurse more than 500 times when parsing
 
        get_commits_stats(c.db_repo.repo_name, ts_min_y, ts_max_y, recurse_limit)
kallithea/lib/utils2.py
Show inline comments
 
@@ -318,207 +318,203 @@ def age(prevdate, show_short_version=Fal
 

	
 
    # In short version, we want nicer handling of ages of more than a year
 
    if show_short_version:
 
        if deltas['year'] == 1:
 
            # ages between 1 and 2 years: show as months
 
            deltas['month'] += 12
 
            deltas['year'] = 0
 
        if deltas['year'] >= 2:
 
            # ages 2+ years: round
 
            if deltas['month'] > 6:
 
                deltas['year'] += 1
 
                deltas['month'] = 0
 

	
 
    # Format the result
 
    fmt_funcs = {
 
        'year': lambda d: ungettext(u'%d year', '%d years', d) % d,
 
        'month': lambda d: ungettext(u'%d month', '%d months', d) % d,
 
        'day': lambda d: ungettext(u'%d day', '%d days', d) % d,
 
        'hour': lambda d: ungettext(u'%d hour', '%d hours', d) % d,
 
        'minute': lambda d: ungettext(u'%d minute', '%d minutes', d) % d,
 
        'second': lambda d: ungettext(u'%d second', '%d seconds', d) % d,
 
    }
 

	
 
    for i, part in enumerate(order):
 
        value = deltas[part]
 
        if value == 0:
 
            continue
 

	
 
        if i < 5:
 
            sub_part = order[i + 1]
 
            sub_value = deltas[sub_part]
 
        else:
 
            sub_value = 0
 

	
 
        if sub_value == 0 or show_short_version:
 
            if future:
 
                return _('in %s') % fmt_funcs[part](value)
 
            else:
 
                return _('%s ago') % fmt_funcs[part](value)
 
        if future:
 
            return _('in %s and %s') % (fmt_funcs[part](value),
 
                fmt_funcs[sub_part](sub_value))
 
        else:
 
            return _('%s and %s ago') % (fmt_funcs[part](value),
 
                fmt_funcs[sub_part](sub_value))
 

	
 
    return _('just now')
 

	
 

	
 
def uri_filter(uri):
 
    """
 
    Removes user:password from given url string
 

	
 
    :param uri:
 
    :rtype: unicode
 
    :returns: filtered list of strings
 
    """
 
    if not uri:
 
        return ''
 

	
 
    proto = ''
 

	
 
    for pat in ('https://', 'http://', 'git://'):
 
        if uri.startswith(pat):
 
            uri = uri[len(pat):]
 
            proto = pat
 
            break
 

	
 
    # remove passwords and username
 
    uri = uri[uri.find('@') + 1:]
 

	
 
    # get the port
 
    cred_pos = uri.find(':')
 
    if cred_pos == -1:
 
        host, port = uri, None
 
    else:
 
        host, port = uri[:cred_pos], uri[cred_pos + 1:]
 

	
 
    return filter(None, [proto, host, port])
 

	
 

	
 
def credentials_filter(uri):
 
    """
 
    Returns a url with removed credentials
 

	
 
    :param uri:
 
    """
 

	
 
    uri = uri_filter(uri)
 
    # check if we have port
 
    if len(uri) > 2 and uri[2]:
 
        uri[2] = ':' + uri[2]
 

	
 
    return ''.join(uri)
 

	
 

	
 
def get_clone_url(clone_uri_tmpl, prefix_url, repo_name, repo_id, **override):
 
def get_clone_url(clone_uri_tmpl, prefix_url, repo_name, repo_id, username=None):
 
    parsed_url = urlobject.URLObject(prefix_url)
 
    prefix = safe_unicode(urllib.unquote(parsed_url.path.rstrip('/')))
 
    args = {
 
        'scheme': parsed_url.scheme,
 
        'user': '',
 
        'user': safe_unicode(urllib.quote(safe_str(username or ''))),
 
        'netloc': parsed_url.netloc + prefix,  # like "hostname:port/prefix" (with optional ":port" and "/prefix")
 
        'prefix': prefix, # undocumented, empty or starting with /
 
        'repo': repo_name,
 
        'repoid': str(repo_id),
 
    }
 
    if 'username' in override:
 
        args['user'] = override.pop('username')
 
    args.update(override)
 
    args['user'] = urllib.quote(safe_str(args['user']))
 

	
 
    for k, v in args.items():
 
        clone_uri_tmpl = clone_uri_tmpl.replace('{%s}' % k, v)
 

	
 
    # remove leading @ sign if it's present. Case of empty user
 
    url_obj = urlobject.URLObject(clone_uri_tmpl)
 
    url = url_obj.with_netloc(url_obj.netloc.lstrip('@'))
 

	
 
    return safe_unicode(url)
 

	
 

	
 
def get_changeset_safe(repo, rev):
 
    """
 
    Safe version of get_changeset if this changeset doesn't exists for a
 
    repo it returns a Dummy one instead
 

	
 
    :param repo:
 
    :param rev:
 
    """
 
    from kallithea.lib.vcs.backends.base import BaseRepository
 
    from kallithea.lib.vcs.exceptions import RepositoryError
 
    from kallithea.lib.vcs.backends.base import EmptyChangeset
 
    if not isinstance(repo, BaseRepository):
 
        raise Exception('You must pass an Repository '
 
                        'object as first argument got %s', type(repo))
 

	
 
    try:
 
        cs = repo.get_changeset(rev)
 
    except (RepositoryError, LookupError):
 
        cs = EmptyChangeset(requested_revision=rev)
 
    return cs
 

	
 

	
 
def datetime_to_time(dt):
 
    if dt:
 
        return time.mktime(dt.timetuple())
 

	
 

	
 
def time_to_datetime(tm):
 
    if tm:
 
        if isinstance(tm, basestring):
 
            try:
 
                tm = float(tm)
 
            except ValueError:
 
                return
 
        return datetime.datetime.fromtimestamp(tm)
 

	
 

	
 
# Must match regexp in kallithea/public/js/base.js MentionsAutoComplete()
 
# Check char before @ - it must not look like we are in an email addresses.
 
# Matching is greedy so we don't have to look beyond the end.
 
MENTIONS_REGEX = re.compile(r'(?:^|(?<=[^a-zA-Z0-9]))@([a-zA-Z0-9][-_.a-zA-Z0-9]*[a-zA-Z0-9])')
 

	
 

	
 
def extract_mentioned_usernames(text):
 
    r"""
 
    Returns list of (possible) usernames @mentioned in given text.
 

	
 
    >>> extract_mentioned_usernames('@1-2.a_X,@1234 not@not @ddd@not @n @ee @ff @gg, @gg;@hh @n\n@zz,')
 
    ['1-2.a_X', '1234', 'ddd', 'ee', 'ff', 'gg', 'gg', 'hh', 'zz']
 
    """
 
    return MENTIONS_REGEX.findall(text)
 

	
 

	
 
def extract_mentioned_users(text):
 
    """ Returns set of actual database Users @mentioned in given text. """
 
    from kallithea.model.db import User
 
    result = set()
 
    for name in extract_mentioned_usernames(text):
 
        user = User.get_by_username(name, case_insensitive=True)
 
        if user is not None and not user.is_default_user:
 
            result.add(user)
 
    return result
 

	
 

	
 
class AttributeDict(dict):
 
    def __getattr__(self, attr):
 
        return self.get(attr, None)
 
    __setattr__ = dict.__setitem__
 
    __delattr__ = dict.__delitem__
 

	
 

	
 
def obfuscate_url_pw(engine):
 
    from sqlalchemy.engine import url as sa_url
 
    from sqlalchemy.exc import ArgumentError
 
    try:
 
        _url = sa_url.make_url(engine or '')
 
    except ArgumentError:
 
        return engine
 
    if _url.password:
 
        _url.password = 'XXXXX'
 
    return str(_url)
 

	
 

	
 
def get_hook_environment():
 
    """
kallithea/model/db.py
Show inline comments
 
@@ -1166,207 +1166,208 @@ class Repository(Base, BaseDbModel):
 
        # into a valid system path
 
        p += self.repo_name.split(Repository.url_sep())
 
        return os.path.join(*map(safe_unicode, p))
 

	
 
    @property
 
    def cache_keys(self):
 
        """
 
        Returns associated cache keys for that repo
 
        """
 
        return CacheInvalidation.query() \
 
            .filter(CacheInvalidation.cache_args == self.repo_name) \
 
            .order_by(CacheInvalidation.cache_key) \
 
            .all()
 

	
 
    def get_new_name(self, repo_name):
 
        """
 
        returns new full repository name based on assigned group and new new
 

	
 
        :param group_name:
 
        """
 
        path_prefix = self.group.full_path_splitted if self.group else []
 
        return Repository.url_sep().join(path_prefix + [repo_name])
 

	
 
    @property
 
    def _ui(self):
 
        """
 
        Creates an db based ui object for this repository
 
        """
 
        from kallithea.lib.utils import make_ui
 
        return make_ui(clear_session=False)
 

	
 
    @classmethod
 
    def is_valid(cls, repo_name):
 
        """
 
        returns True if given repo name is a valid filesystem repository
 

	
 
        :param cls:
 
        :param repo_name:
 
        """
 
        from kallithea.lib.utils import is_valid_repo
 

	
 
        return is_valid_repo(repo_name, cls.base_path())
 

	
 
    def get_api_data(self, with_revision_names=False,
 
                           with_pullrequests=False):
 
        """
 
        Common function for generating repo api data.
 
        Optionally, also return tags, branches, bookmarks and PRs.
 
        """
 
        repo = self
 
        data = dict(
 
            repo_id=repo.repo_id,
 
            repo_name=repo.repo_name,
 
            repo_type=repo.repo_type,
 
            clone_uri=repo.clone_uri,
 
            private=repo.private,
 
            created_on=repo.created_on,
 
            description=repo.description,
 
            landing_rev=repo.landing_rev,
 
            owner=repo.owner.username,
 
            fork_of=repo.fork.repo_name if repo.fork else None,
 
            enable_statistics=repo.enable_statistics,
 
            enable_downloads=repo.enable_downloads,
 
            last_changeset=repo.changeset_cache,
 
        )
 
        if with_revision_names:
 
            scm_repo = repo.scm_instance_no_cache()
 
            data.update(dict(
 
                tags=scm_repo.tags,
 
                branches=scm_repo.branches,
 
                bookmarks=scm_repo.bookmarks,
 
            ))
 
        if with_pullrequests:
 
            data['pull_requests'] = repo.pull_requests_other
 
        rc_config = Setting.get_app_settings()
 
        repository_fields = str2bool(rc_config.get('repository_fields'))
 
        if repository_fields:
 
            for f in self.extra_fields:
 
                data[f.field_key_prefixed] = f.field_value
 

	
 
        return data
 

	
 
    @property
 
    def last_db_change(self):
 
        return self.updated_on
 

	
 
    @property
 
    def clone_uri_hidden(self):
 
        clone_uri = self.clone_uri
 
        if clone_uri:
 
            import urlobject
 
            url_obj = urlobject.URLObject(self.clone_uri)
 
            if url_obj.password:
 
                clone_uri = url_obj.with_password('*****')
 
        return clone_uri
 

	
 
    def clone_url(self, clone_uri_tmpl, with_id=False, **override):
 
    def clone_url(self, clone_uri_tmpl, with_id=False, username=None):
 
        if '{repo}' not in clone_uri_tmpl and '_{repoid}' not in clone_uri_tmpl:
 
            log.error("Configured clone_uri_tmpl %r has no '{repo}' or '_{repoid}' and cannot toggle to use repo id URLs", clone_uri_tmpl)
 
        elif with_id:
 
            clone_uri_tmpl = clone_uri_tmpl.replace('{repo}', '_{repoid}')
 
        else:
 
            clone_uri_tmpl = clone_uri_tmpl.replace('_{repoid}', '{repo}')
 

	
 
        import kallithea.lib.helpers as h
 
        prefix_url = h.canonical_url('home')
 

	
 
        return get_clone_url(clone_uri_tmpl=clone_uri_tmpl,
 
                             prefix_url=prefix_url,
 
                             repo_name=self.repo_name,
 
                             repo_id=self.repo_id, **override)
 
                             repo_id=self.repo_id,
 
                             username=username)
 

	
 
    def set_state(self, state):
 
        self.repo_state = state
 

	
 
    #==========================================================================
 
    # SCM PROPERTIES
 
    #==========================================================================
 

	
 
    def get_changeset(self, rev=None):
 
        return get_changeset_safe(self.scm_instance, rev)
 

	
 
    def get_landing_changeset(self):
 
        """
 
        Returns landing changeset, or if that doesn't exist returns the tip
 
        """
 
        _rev_type, _rev = self.landing_rev
 
        cs = self.get_changeset(_rev)
 
        if isinstance(cs, EmptyChangeset):
 
            return self.get_changeset()
 
        return cs
 

	
 
    def update_changeset_cache(self, cs_cache=None):
 
        """
 
        Update cache of last changeset for repository, keys should be::
 

	
 
            short_id
 
            raw_id
 
            revision
 
            message
 
            date
 
            author
 

	
 
        :param cs_cache:
 
        """
 
        from kallithea.lib.vcs.backends.base import BaseChangeset
 
        if cs_cache is None:
 
            cs_cache = EmptyChangeset()
 
            # use no-cache version here
 
            scm_repo = self.scm_instance_no_cache()
 
            if scm_repo:
 
                cs_cache = scm_repo.get_changeset()
 

	
 
        if isinstance(cs_cache, BaseChangeset):
 
            cs_cache = cs_cache.__json__()
 

	
 
        if (not self.changeset_cache or cs_cache['raw_id'] != self.changeset_cache['raw_id']):
 
            _default = datetime.datetime.fromtimestamp(0)
 
            last_change = cs_cache.get('date') or _default
 
            log.debug('updated repo %s with new cs cache %s',
 
                      self.repo_name, cs_cache)
 
            self.updated_on = last_change
 
            self.changeset_cache = cs_cache
 
            Session().commit()
 
        else:
 
            log.debug('changeset_cache for %s already up to date with %s',
 
                      self.repo_name, cs_cache['raw_id'])
 

	
 
    @property
 
    def tip(self):
 
        return self.get_changeset('tip')
 

	
 
    @property
 
    def author(self):
 
        return self.tip.author
 

	
 
    @property
 
    def last_change(self):
 
        return self.scm_instance.last_change
 

	
 
    def get_comments(self, revisions=None):
 
        """
 
        Returns comments for this repository grouped by revisions
 

	
 
        :param revisions: filter query by revisions only
 
        """
 
        cmts = ChangesetComment.query() \
 
            .filter(ChangesetComment.repo == self)
 
        if revisions is not None:
 
            if not revisions:
 
                return {} # don't use sql 'in' on empty set
 
            cmts = cmts.filter(ChangesetComment.revision.in_(revisions))
 
        grouped = collections.defaultdict(list)
 
        for cmt in cmts.all():
 
            grouped[cmt.revision].append(cmt)
 
        return grouped
 

	
 
    def statuses(self, revisions):
 
        """
 
        Returns statuses for this repository.
 
        PRs without any votes do _not_ show up as unreviewed.
 

	
 
        :param revisions: list of revisions to get statuses for
 
        """
 
        if not revisions:
 
            return {}
 

	
kallithea/tests/other/test_libs.py
Show inline comments
 
@@ -174,212 +174,212 @@ class TestLibs(TestController):
 
        (dict(months= -1, days= -20), u'1 month ago'),
 
        (dict(years= -1, months= -1), u'13 months ago'),
 
        (dict(years= -1, months= -10), u'22 months ago'),
 
        (dict(years= -2, months= -4), u'2 years ago'),
 
        (dict(years= -2, months= -11), u'3 years ago'),
 
        (dict(years= -3, months= -2), u'3 years ago'),
 
        (dict(years= -4, months= -8), u'5 years ago'),
 
    ])
 
    def test_age_short(self, age_args, expected):
 
        from kallithea.lib.utils2 import age
 
        from dateutil import relativedelta
 
        with test_context(self.app):
 
            n = datetime.datetime(year=2012, month=5, day=17)
 
            delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
 
            assert age(n + delt(**age_args), show_short_version=True, now=n) == expected
 

	
 
    @parametrize('age_args,expected', [
 
        (dict(), u'just now'),
 
        (dict(seconds=1), u'in 1 second'),
 
        (dict(seconds=60 * 2), u'in 2 minutes'),
 
        (dict(hours=1), u'in 1 hour'),
 
        (dict(hours=24), u'in 1 day'),
 
        (dict(hours=24 * 5), u'in 5 days'),
 
        (dict(months=1), u'in 1 month'),
 
        (dict(months=1, days=1), u'in 1 month and 1 day'),
 
        (dict(years=1, months=1), u'in 1 year and 1 month')
 
    ])
 
    def test_age_in_future(self, age_args, expected):
 
        from kallithea.lib.utils2 import age
 
        from dateutil import relativedelta
 
        with test_context(self.app):
 
            n = datetime.datetime(year=2012, month=5, day=17)
 
            delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
 
            assert age(n + delt(**age_args), now=n) == expected
 

	
 
    def test_tag_extractor(self):
 
        sample = (
 
            "hello pta[tag] gog [[]] [[] sda ero[or]d [me =>>< sa]"
 
            "[requires] [stale] [see<>=>] [see => http://example.com]"
 
            "[requires => url] [lang => python] [just a tag]"
 
            "[,d] [ => ULR ] [obsolete] [desc]]"
 
        )
 
        from kallithea.lib.helpers import urlify_text
 
        res = urlify_text(sample, stylize=True)
 
        assert '<div class="label label-meta" data-tag="tag">tag</div>' in res
 
        assert '<div class="label label-meta" data-tag="obsolete">obsolete</div>' in res
 
        assert '<div class="label label-meta" data-tag="stale">stale</div>' in res
 
        assert '<div class="label label-meta" data-tag="lang">python</div>' in res
 
        assert '<div class="label label-meta" data-tag="requires">requires =&gt; <a href="/url">url</a></div>' in res
 
        assert '<div class="label label-meta" data-tag="tag">tag</div>' in res
 

	
 
    def test_alternative_gravatar(self):
 
        from kallithea.lib.helpers import gravatar_url
 
        _md5 = lambda s: hashlib.md5(s).hexdigest()
 

	
 
        # mock tg.tmpl_context
 
        def fake_tmpl_context(_url):
 
            _c = AttributeDict()
 
            _c.visual = AttributeDict()
 
            _c.visual.use_gravatar = True
 
            _c.visual.gravatar_url = _url
 

	
 
            return _c
 

	
 
        fake_url = FakeUrlGenerator(current_url='https://example.com')
 
        with mock.patch('kallithea.config.routing.url', fake_url):
 
            fake = fake_tmpl_context(_url='http://example.com/{email}')
 
            with mock.patch('tg.tmpl_context', fake):
 
                    from kallithea.config.routing import url
 
                    assert url.current() == 'https://example.com'
 
                    grav = gravatar_url(email_address='test@example.com', size=24)
 
                    assert grav == 'http://example.com/test@example.com'
 

	
 
            fake = fake_tmpl_context(_url='http://example.com/{email}')
 
            with mock.patch('tg.tmpl_context', fake):
 
                grav = gravatar_url(email_address='test@example.com', size=24)
 
                assert grav == 'http://example.com/test@example.com'
 

	
 
            fake = fake_tmpl_context(_url='http://example.com/{md5email}')
 
            with mock.patch('tg.tmpl_context', fake):
 
                em = 'test@example.com'
 
                grav = gravatar_url(email_address=em, size=24)
 
                assert grav == 'http://example.com/%s' % (_md5(em))
 

	
 
            fake = fake_tmpl_context(_url='http://example.com/{md5email}/{size}')
 
            with mock.patch('tg.tmpl_context', fake):
 
                em = 'test@example.com'
 
                grav = gravatar_url(email_address=em, size=24)
 
                assert grav == 'http://example.com/%s/%s' % (_md5(em), 24)
 

	
 
            fake = fake_tmpl_context(_url='{scheme}://{netloc}/{md5email}/{size}')
 
            with mock.patch('tg.tmpl_context', fake):
 
                em = 'test@example.com'
 
                grav = gravatar_url(email_address=em, size=24)
 
                assert grav == 'https://example.com/%s/%s' % (_md5(em), 24)
 

	
 
    @parametrize('clone_uri_tmpl,repo_name,overrides,prefix,expected', [
 
        (Repository.DEFAULT_CLONE_URI, 'group/repo1', {}, '', 'http://vps1:8000/group/repo1'),
 
        (Repository.DEFAULT_CLONE_URI, 'group/repo1', {'username': 'username'}, '', 'http://username@vps1:8000/group/repo1'),
 
        (Repository.DEFAULT_CLONE_URI, 'group/repo1', {}, '/prefix', 'http://vps1:8000/prefix/group/repo1'),
 
        (Repository.DEFAULT_CLONE_URI, 'group/repo1', {'username': 'user'}, '/prefix', 'http://user@vps1:8000/prefix/group/repo1'),
 
        (Repository.DEFAULT_CLONE_URI, 'group/repo1', {'username': 'username'}, '/prefix', 'http://username@vps1:8000/prefix/group/repo1'),
 
        (Repository.DEFAULT_CLONE_URI, 'group/repo1', {'username': 'user'}, '/prefix/', 'http://user@vps1:8000/prefix/group/repo1'),
 
        (Repository.DEFAULT_CLONE_URI, 'group/repo1', {'username': 'username'}, '/prefix/', 'http://username@vps1:8000/prefix/group/repo1'),
 
        ('{scheme}://{user}@{netloc}/_{repoid}', 'group/repo1', {}, '', 'http://vps1:8000/_23'),
 
        ('{scheme}://{user}@{netloc}/_{repoid}', 'group/repo1', {'username': 'username'}, '', 'http://username@vps1:8000/_23'),
 
        ('http://{user}@{netloc}/_{repoid}', 'group/repo1', {'username': 'username'}, '', 'http://username@vps1:8000/_23'),
 
        ('http://{netloc}/_{repoid}', 'group/repo1', {'username': 'username'}, '', 'http://vps1:8000/_23'),
 
        ('https://{user}@proxy1.example.com/{repo}', 'group/repo1', {'username': 'username'}, '', 'https://username@proxy1.example.com/group/repo1'),
 
        ('https://{user}@proxy1.example.com/{repo}', 'group/repo1', {}, '', 'https://proxy1.example.com/group/repo1'),
 
        ('https://proxy1.example.com/{user}/{repo}', 'group/repo1', {'username': 'username'}, '', 'https://proxy1.example.com/username/group/repo1'),
 
    @parametrize('clone_uri_tmpl,repo_name,username,prefix,expected', [
 
        (Repository.DEFAULT_CLONE_URI, 'group/repo1', None, '', 'http://vps1:8000/group/repo1'),
 
        (Repository.DEFAULT_CLONE_URI, 'group/repo1', 'username', '', 'http://username@vps1:8000/group/repo1'),
 
        (Repository.DEFAULT_CLONE_URI, 'group/repo1', None, '/prefix', 'http://vps1:8000/prefix/group/repo1'),
 
        (Repository.DEFAULT_CLONE_URI, 'group/repo1', 'user', '/prefix', 'http://user@vps1:8000/prefix/group/repo1'),
 
        (Repository.DEFAULT_CLONE_URI, 'group/repo1', 'username', '/prefix', 'http://username@vps1:8000/prefix/group/repo1'),
 
        (Repository.DEFAULT_CLONE_URI, 'group/repo1', 'user', '/prefix/', 'http://user@vps1:8000/prefix/group/repo1'),
 
        (Repository.DEFAULT_CLONE_URI, 'group/repo1', 'username', '/prefix/', 'http://username@vps1:8000/prefix/group/repo1'),
 
        ('{scheme}://{user}@{netloc}/_{repoid}', 'group/repo1', None, '', 'http://vps1:8000/_23'),
 
        ('{scheme}://{user}@{netloc}/_{repoid}', 'group/repo1', 'username', '', 'http://username@vps1:8000/_23'),
 
        ('http://{user}@{netloc}/_{repoid}', 'group/repo1', 'username', '', 'http://username@vps1:8000/_23'),
 
        ('http://{netloc}/_{repoid}', 'group/repo1', 'username', '', 'http://vps1:8000/_23'),
 
        ('https://{user}@proxy1.example.com/{repo}', 'group/repo1', 'username', '', 'https://username@proxy1.example.com/group/repo1'),
 
        ('https://{user}@proxy1.example.com/{repo}', 'group/repo1', None, '', 'https://proxy1.example.com/group/repo1'),
 
        ('https://proxy1.example.com/{user}/{repo}', 'group/repo1', 'username', '', 'https://proxy1.example.com/username/group/repo1'),
 
    ])
 
    def test_clone_url_generator(self, clone_uri_tmpl, repo_name, overrides, prefix, expected):
 
    def test_clone_url_generator(self, clone_uri_tmpl, repo_name, username, prefix, expected):
 
        from kallithea.lib.utils2 import get_clone_url
 
        clone_url = get_clone_url(clone_uri_tmpl=clone_uri_tmpl, prefix_url='http://vps1:8000' + prefix,
 
                                  repo_name=repo_name, repo_id=23, **overrides)
 
                                  repo_name=repo_name, repo_id=23, username=username)
 
        assert clone_url == expected
 

	
 
    def _quick_url(self, text, tmpl="""<a class="changeset_hash" href="%s">%s</a>""", url_=None):
 
        """
 
        Changes `some text url[foo]` => `some text <a href="/">foo</a>
 

	
 
        :param text:
 
        """
 
        import re
 
        # quickly change expected url[] into a link
 
        url_pattern = re.compile(r'(?:url\[)(.+?)(?:\])')
 

	
 
        def url_func(match_obj):
 
            _url = match_obj.groups()[0]
 
            return tmpl % (url_ or '/repo_name/changeset/%s' % _url, _url)
 
        return url_pattern.sub(url_func, text)
 

	
 
    @parametrize('sample,expected', [
 
      ("",
 
       ""),
 
      ("git-svn-id: https://svn.apache.org/repos/asf/libcloud/trunk@1441655 13f79535-47bb-0310-9956-ffa450edef68",
 
       """git-svn-id: <a href="https://svn.apache.org/repos/asf/libcloud/trunk@1441655">https://svn.apache.org/repos/asf/libcloud/trunk@1441655</a> 13f79535-47bb-0310-9956-ffa450edef68"""),
 
      ("from rev 000000000000",
 
       """from rev url[000000000000]"""),
 
      ("from rev 000000000000123123 also rev 000000000000",
 
       """from rev url[000000000000123123] also rev url[000000000000]"""),
 
      ("this should-000 00",
 
       """this should-000 00"""),
 
      ("longtextffffffffff rev 123123123123",
 
       """longtextffffffffff rev url[123123123123]"""),
 
      ("rev ffffffffffffffffffffffffffffffffffffffffffffffffff",
 
       """rev ffffffffffffffffffffffffffffffffffffffffffffffffff"""),
 
      ("ffffffffffff some text traalaa",
 
       """url[ffffffffffff] some text traalaa"""),
 
       ("""Multi line
 
       123123123123
 
       some text 123123123123
 
       sometimes !
 
       """,
 
       """Multi line<br/>"""
 
       """       url[123123123123]<br/>"""
 
       """       some text url[123123123123]<br/>"""
 
       """       sometimes !"""),
 
    ])
 
    def test_urlify_text(self, sample, expected):
 
        expected = self._quick_url(expected)
 
        fake_url = FakeUrlGenerator(changeset_home='/%(repo_name)s/changeset/%(revision)s')
 
        with mock.patch('kallithea.config.routing.url', fake_url):
 
            from kallithea.lib.helpers import urlify_text
 
            assert urlify_text(sample, 'repo_name') == expected
 

	
 
    @parametrize('sample,expected,url_', [
 
      ("",
 
       "",
 
       ""),
 
      ("https://svn.apache.org/repos",
 
       """url[https://svn.apache.org/repos]""",
 
       "https://svn.apache.org/repos"),
 
      ("http://svn.apache.org/repos",
 
       """url[http://svn.apache.org/repos]""",
 
       "http://svn.apache.org/repos"),
 
      ("from rev a also rev http://google.com",
 
       """from rev a also rev url[http://google.com]""",
 
       "http://google.com"),
 
      ("http://imgur.com/foo.gif inline http://imgur.com/foo.gif ending http://imgur.com/foo.gif",
 
       """url[http://imgur.com/foo.gif] inline url[http://imgur.com/foo.gif] ending url[http://imgur.com/foo.gif]""",
 
       "http://imgur.com/foo.gif"),
 
      ("""Multi line
 
       https://foo.bar.example.com
 
       some text lalala""",
 
       """Multi line<br/>"""
 
       """       url[https://foo.bar.example.com]<br/>"""
 
       """       some text lalala""",
 
       "https://foo.bar.example.com"),
 
      ("@mention @someone",
 
       """<b>@mention</b> <b>@someone</b>""",
 
       ""),
 
      ("deadbeefcafe 123412341234",
 
       """<a class="changeset_hash" href="/repo_name/changeset/deadbeefcafe">deadbeefcafe</a> <a class="changeset_hash" href="/repo_name/changeset/123412341234">123412341234</a>""",
 
       ""),
 
      ("We support * markup for *bold* markup of *single or multiple* words, "
 
       "*a bit @like http://slack.com*. "
 
       "The first * must come after whitespace and not be followed by whitespace, "
 
       "contain anything but * and newline until the next *, "
 
       "which must not come after whitespace "
 
       "and not be followed by * or alphanumerical *characters*.",
 
       """We support * markup for <b>*bold*</b> markup of <b>*single or multiple*</b> words, """
 
       """<b>*a bit <b>@like</b> <a href="http://slack.com">http://slack.com</a>*</b>. """
 
       """The first * must come after whitespace and not be followed by whitespace, """
 
       """contain anything but * and newline until the next *, """
 
       """which must not come after whitespace """
 
       """and not be followed by * or alphanumerical <b>*characters*</b>.""",
 
       "-"),
 
      ("HTML escaping: <abc> 'single' \"double\" &pointer",
 
       "HTML escaping: &lt;abc&gt; &#39;single&#39; &quot;double&quot; &amp;pointer",
 
       "-"),
0 comments (0 inline, 0 general)