# -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see
'
' ' +
ls + ' | ')
else:
yield 0, ('
Author:"
" %s ") % (author, date, message)
lnk_format = show_id(changeset)
uri = link_to(
lnk_format,
url('changeset_home', repo_name=repo_name,
revision=changeset.raw_id),
style=get_color_string(changeset.raw_id),
class_='tooltip safe-html-title',
title=tooltip_html
)
uri += '\n'
return uri
return _url_func
return literal(markup_whitespace(annotate_highlight(filenode, url_func(repo_name), **kwargs)))
def is_following_repo(repo_name, user_id):
from kallithea.model.scm import ScmModel
return ScmModel().is_following_repo(repo_name, user_id)
class _Message(object):
"""A message returned by ``Flash.pop_messages()``.
Converting the message to a string returns the message text. Instances
also have the following attributes:
* ``message``: the message text.
* ``category``: the category specified when the message was created.
"""
def __init__(self, category, message):
self.category = category
self.message = message
def __str__(self):
return self.message
__unicode__ = __str__
def __html__(self):
return escape(safe_unicode(self.message))
class Flash(_Flash):
def __call__(self, message, category=None, ignore_duplicate=False, logf=None):
"""
Show a message to the user _and_ log it through the specified function
category: notice (default), warning, error, success
logf: a custom log function - such as log.debug
logf defaults to log.info, unless category equals 'success', in which
case logf defaults to log.debug.
"""
if logf is None:
logf = log.info
if category == 'success':
logf = log.debug
logf('Flash %s: %s', category, message)
super(Flash, self).__call__(message, category, ignore_duplicate)
def pop_messages(self):
"""Return all accumulated messages and delete them from the session.
The return value is a list of ``Message`` objects.
"""
from pylons import session
messages = session.pop(self.session_key, [])
session.save()
return [_Message(*m) for m in messages]
flash = Flash()
#==============================================================================
# SCM FILTERS available via h.
#==============================================================================
from kallithea.lib.vcs.utils import author_name, author_email
from kallithea.lib.utils2 import credentials_filter, age as _age
from kallithea.model.db import User, ChangesetStatus, PullRequest
age = lambda x, y=False: _age(x, y)
capitalize = lambda x: x.capitalize()
email = author_email
short_id = lambda x: x[:12]
hide_credentials = lambda x: ''.join(credentials_filter(x))
def show_id(cs):
"""
Configurable function that shows ID
by default it's r123:fffeeefffeee
:param cs: changeset instance
"""
from kallithea import CONFIG
def_len = safe_int(CONFIG.get('show_sha_length', 12))
show_rev = str2bool(CONFIG.get('show_revision_number', False))
raw_id = cs.raw_id[:def_len]
if show_rev:
return 'r%s:%s' % (cs.revision, raw_id)
else:
return raw_id
def fmt_date(date):
if date:
return date.strftime("%Y-%m-%d %H:%M:%S").decode('utf8')
return ""
def is_git(repository):
if hasattr(repository, 'alias'):
_type = repository.alias
elif hasattr(repository, 'repo_type'):
_type = repository.repo_type
else:
_type = repository
return _type == 'git'
def is_hg(repository):
if hasattr(repository, 'alias'):
_type = repository.alias
elif hasattr(repository, 'repo_type'):
_type = repository.repo_type
else:
_type = repository
return _type == 'hg'
@cache_region('long_term', 'user_or_none')
def user_or_none(author):
"""Try to match email part of VCS committer string with a local user - or return None"""
email = author_email(author)
if email:
user = User.get_by_email(email, cache=True) # cache will only use sql_cache_short
if user is not None:
return user
return None
def email_or_none(author):
"""Try to match email part of VCS committer string with a local user.
Return primary email of user, email part of the specified author name, or None."""
if not author:
return None
user = user_or_none(author)
if user is not None:
return user.email # always use main email address - not necessarily the one used to find user
# extract email from the commit string
email = author_email(author)
if email:
return email
# No valid email, not a valid user in the system, none!
return None
def person(author, show_attr="username"):
"""Find the user identified by 'author', return one of the users attributes,
default to the username attribute, None if there is no user"""
# attr to return from fetched user
person_getter = lambda usr: getattr(usr, show_attr)
# if author is already an instance use it for extraction
if isinstance(author, User):
return person_getter(author)
user = user_or_none(author)
if user is not None:
return person_getter(user)
# Still nothing? Just pass back the author name if any, else the email
return author_name(author) or email(author)
def person_by_id(id_, show_attr="username"):
# attr to return from fetched user
person_getter = lambda usr: getattr(usr, show_attr)
#maybe it's an ID ?
if str(id_).isdigit() or isinstance(id_, int):
id_ = int(id_)
user = User.get(id_)
if user is not None:
return person_getter(user)
return id_
def desc_stylize(value):
"""
converts tags from value into html equivalent
:param value:
"""
if not value:
return ''
value = re.sub(r'\[see\ \=>\ *([a-zA-Z0-9\/\=\?\&\ \:\/\.\-]*)\]',
'', value)
value = re.sub(r'\[license\ \=>\ *([a-zA-Z0-9\/\=\?\&\ \:\/\.\-]*)\]',
'', value)
value = re.sub(r'\[(requires|recommends|conflicts|base)\ \=>\ *([a-zA-Z0-9\-\/]*)\]',
'', value)
value = re.sub(r'\[(lang|language)\ \=>\ *([a-zA-Z\-\/\#\+]*)\]',
'', value)
value = re.sub(r'\[([a-z]+)\]',
'', value)
return value
def boolicon(value):
"""Returns boolean value of a value, represented as small html image of true/false
icons
:param value: value
"""
if value:
return HTML.tag('i', class_="icon-ok")
else:
return HTML.tag('i', class_="icon-minus-circled")
def action_parser(user_log, feed=False, parse_cs=False):
"""
This helper will action_map the specified string action into translated
fancy names with icons and links
:param user_log: user log instance
:param feed: use output for feeds (no html and fancy icons)
:param parse_cs: parse Changesets into VCS instances
"""
action = user_log.action
action_params = ' '
x = action.split(':')
if len(x) > 1:
action, action_params = x
def get_cs_links():
revs_limit = 3 # display this amount always
revs_top_limit = 50 # show upto this amount of changesets hidden
revs_ids = action_params.split(',')
deleted = user_log.repository is None
if deleted:
return ','.join(revs_ids)
repo_name = user_log.repository.repo_name
def lnk(rev, repo_name):
lazy_cs = False
title_ = None
url_ = '#'
if isinstance(rev, BaseChangeset) or isinstance(rev, AttributeDict):
if rev.op and rev.ref_name:
if rev.op == 'delete_branch':
lbl = _('Deleted branch: %s') % rev.ref_name
elif rev.op == 'tag':
lbl = _('Created tag: %s') % rev.ref_name
else:
lbl = 'Unknown operation %s' % rev.op
else:
lazy_cs = True
lbl = rev.short_id[:8]
url_ = url('changeset_home', repo_name=repo_name,
revision=rev.raw_id)
else:
# changeset cannot be found - it might have been stripped or removed
lbl = rev[:12]
title_ = _('Changeset not found')
if parse_cs:
return link_to(lbl, url_, title=title_, class_='tooltip')
return link_to(lbl, url_, raw_id=rev.raw_id, repo_name=repo_name,
class_='lazy-cs' if lazy_cs else '')
def _get_op(rev_txt):
_op = None
_name = rev_txt
if len(rev_txt.split('=>')) == 2:
_op, _name = rev_txt.split('=>')
return _op, _name
revs = []
if len(filter(lambda v: v != '', revs_ids)) > 0:
repo = None
for rev in revs_ids[:revs_top_limit]:
_op, _name = _get_op(rev)
# we want parsed changesets, or new log store format is bad
if parse_cs:
try:
if repo is None:
repo = user_log.repository.scm_instance
_rev = repo.get_changeset(rev)
revs.append(_rev)
except ChangesetDoesNotExistError:
log.error('cannot find revision %s in this repo', rev)
revs.append(rev)
else:
_rev = AttributeDict({
'short_id': rev[:12],
'raw_id': rev,
'message': '',
'op': _op,
'ref_name': _name
})
revs.append(_rev)
cs_links = [" " + ', '.join(
[lnk(rev, repo_name) for rev in revs[:revs_limit]]
)]
_op1, _name1 = _get_op(revs_ids[0])
_op2, _name2 = _get_op(revs_ids[-1])
_rev = '%s...%s' % (_name1, _name2)
compare_view = (
' Date: %s Message:" " %s '
'%s ' % (
_('Show all combined changesets %s->%s') % (
revs_ids[0][:12], revs_ids[-1][:12]
),
url('changeset_home', repo_name=repo_name,
revision=_rev
),
_('Compare view')
)
)
# if we have exactly one more than normally displayed
# just display it, takes less space than displaying
# "and 1 more revisions"
if len(revs_ids) == revs_limit + 1:
cs_links.append(", " + lnk(revs[revs_limit], repo_name))
# hidden-by-default ones
if len(revs_ids) > revs_limit + 1:
uniq_id = revs_ids[0]
html_tmpl = (
' %s %s %s'
)
if not feed:
cs_links.append(html_tmpl % (
_('and'),
uniq_id, _('%s more') % (len(revs_ids) - revs_limit),
_('revisions')
)
)
if not feed:
html_tmpl = ''
else:
html_tmpl = ' %s '
morelinks = ', '.join(
[lnk(rev, repo_name) for rev in revs[revs_limit:]]
)
if len(revs_ids) > revs_top_limit:
morelinks += ', ...'
cs_links.append(html_tmpl % (uniq_id, morelinks))
if len(revs) > 1:
cs_links.append(compare_view)
return ''.join(cs_links)
def get_fork_name():
repo_name = action_params
url_ = url('summary_home', repo_name=repo_name)
return _('Fork name %s') % link_to(action_params, url_)
def get_user_name():
user_name = action_params
return user_name
def get_users_group():
group_name = action_params
return group_name
def get_pull_request():
pull_request_id = action_params
nice_id = PullRequest.make_nice_id(pull_request_id)
deleted = user_log.repository is None
if deleted:
repo_name = user_log.repository_name
else:
repo_name = user_log.repository.repo_name
return link_to(_('Pull request %s') % nice_id,
url('pullrequest_show', repo_name=repo_name,
pull_request_id=pull_request_id))
def get_archive_name():
archive_name = action_params
return archive_name
# action : translated str, callback(extractor), icon
action_map = {
'user_deleted_repo': (_('[deleted] repository'),
None, 'icon-trashcan'),
'user_created_repo': (_('[created] repository'),
None, 'icon-plus'),
'user_created_fork': (_('[created] repository as fork'),
None, 'icon-fork'),
'user_forked_repo': (_('[forked] repository'),
get_fork_name, 'icon-fork'),
'user_updated_repo': (_('[updated] repository'),
None, 'icon-pencil'),
'user_downloaded_archive': (_('[downloaded] archive from repository'),
get_archive_name, 'icon-download-cloud'),
'admin_deleted_repo': (_('[delete] repository'),
None, 'icon-trashcan'),
'admin_created_repo': (_('[created] repository'),
None, 'icon-plus'),
'admin_forked_repo': (_('[forked] repository'),
None, 'icon-fork'),
'admin_updated_repo': (_('[updated] repository'),
None, 'icon-pencil'),
'admin_created_user': (_('[created] user'),
get_user_name, 'icon-user'),
'admin_updated_user': (_('[updated] user'),
get_user_name, 'icon-user'),
'admin_created_users_group': (_('[created] user group'),
get_users_group, 'icon-pencil'),
'admin_updated_users_group': (_('[updated] user group'),
get_users_group, 'icon-pencil'),
'user_commented_revision': (_('[commented] on revision in repository'),
get_cs_links, 'icon-comment'),
'user_commented_pull_request': (_('[commented] on pull request for'),
get_pull_request, 'icon-comment'),
'user_closed_pull_request': (_('[closed] pull request for'),
get_pull_request, 'icon-ok'),
'push': (_('[pushed] into'),
get_cs_links, 'icon-move-up'),
'push_local': (_('[committed via Kallithea] into repository'),
get_cs_links, 'icon-pencil'),
'push_remote': (_('[pulled from remote] into repository'),
get_cs_links, 'icon-move-up'),
'pull': (_('[pulled] from'),
None, 'icon-move-down'),
'started_following_repo': (_('[started following] repository'),
None, 'icon-heart'),
'stopped_following_repo': (_('[stopped following] repository'),
None, 'icon-heart-empty'),
}
action_str = action_map.get(action, action)
if feed:
action = action_str[0].replace('[', '').replace(']', '')
else:
action = action_str[0] \
.replace('[', '') \
.replace(']', '')
action_params_func = lambda: ""
if callable(action_str[1]):
action_params_func = action_str[1]
def action_parser_icon():
action = user_log.action
action_params = None
x = action.split(':')
if len(x) > 1:
action, action_params = x
tmpl = """"""
ico = action_map.get(action, ['', '', ''])[2]
return literal(tmpl % (ico, action))
# returned callbacks we need to call to get
return [lambda: literal(action), action_params_func, action_parser_icon]
#==============================================================================
# PERMS
#==============================================================================
from kallithea.lib.auth import HasPermissionAny, HasPermissionAll, \
HasRepoPermissionAny, HasRepoPermissionAll, HasRepoGroupPermissionAll, \
HasRepoGroupPermissionAny
#==============================================================================
# GRAVATAR URL
#==============================================================================
def gravatar(email_address, cls='', size=30, ssl_enabled=True):
"""return html element of the gravatar
This method will return an ' suf = '' if len(nodes) > 30: suf = ' ' + _(' and %s more') % (len(nodes) - 30) return literal(pref + ' '.join([safe_unicode(x.path) for x in nodes[:30]]) + suf) else: return ': ' + _('No files') def repo_link(groups_and_repos): """ Makes a breadcrumbs link to repo within a group joins » on each group to create a fancy link ex:: group >> subgroup >> repo :param groups_and_repos: :param last_url: """ groups, just_name, repo_name = groups_and_repos last_url = url('summary_home', repo_name=repo_name) last_link = link_to(just_name, last_url) def make_link(group): return link_to(group.name, url('repos_group_home', group_name=group.group_name)) return literal(' » '.join(map(make_link, groups) + ['%s' % last_link])) def fancy_file_stats(stats): """ Displays a fancy two colored bar for number of added/deleted lines of code on file :param stats: two element list of added/deleted lines of code """ from kallithea.lib.diffs import NEW_FILENODE, DEL_FILENODE, \ MOD_FILENODE, RENAMED_FILENODE, CHMOD_FILENODE, BIN_FILENODE def cgen(l_type, a_v, d_v): mapping = {'tr': 'top-right-rounded-corner-mid', 'tl': 'top-left-rounded-corner-mid', 'br': 'bottom-right-rounded-corner-mid', 'bl': 'bottom-left-rounded-corner-mid'} map_getter = lambda x: mapping[x] if l_type == 'a' and d_v: #case when added and deleted are present return ' '.join(map(map_getter, ['tl', 'bl'])) if l_type == 'a' and not d_v: return ' '.join(map(map_getter, ['tr', 'br', 'tl', 'bl'])) if l_type == 'd' and a_v: return ' '.join(map(map_getter, ['tr', 'br'])) if l_type == 'd' and not a_v: return ' '.join(map(map_getter, ['tr', 'br', 'tl', 'bl'])) a, d = stats['added'], stats['deleted'] width = 100 if stats['binary']: #binary mode lbl = '' bin_op = 1 if BIN_FILENODE in stats['ops']: lbl = 'bin+' if NEW_FILENODE in stats['ops']: lbl += _('new file') bin_op = NEW_FILENODE elif MOD_FILENODE in stats['ops']: lbl += _('mod') bin_op = MOD_FILENODE elif DEL_FILENODE in stats['ops']: lbl += _('del') bin_op = DEL_FILENODE elif RENAMED_FILENODE in stats['ops']: lbl += _('rename') bin_op = RENAMED_FILENODE #chmod can go with other operations if CHMOD_FILENODE in stats['ops']: _org_lbl = _('chmod') lbl += _org_lbl if lbl.endswith('+') else '+%s' % _org_lbl #import ipdb;ipdb.set_trace() b_d = ' %s ' % (bin_op, cgen('a', a_v='', d_v=0), lbl)
b_a = ''
return literal('%s%s ' % (width, b_a, b_d))
t = stats['added'] + stats['deleted']
unit = float(width) / (t or 1)
# needs > 9% of width to be visible or 0 to be hidden
a_p = max(9, unit * a) if a > 0 else 0
d_p = max(9, unit * d) if d > 0 else 0
p_sum = a_p + d_p
if p_sum > width:
#adjust the percentage to be == 100% since we adjusted to 9
if a_p > d_p:
a_p = a_p - (p_sum - width)
else:
d_p = d_p - (p_sum - width)
a_v = a if a > 0 else ''
d_v = d if d > 0 else ''
d_a = '%s ' % (
cgen('a', a_v, d_v), a_p, a_v
)
d_d = '%s ' % (
cgen('d', a_v, d_v), d_p, d_v
)
return literal('%s%s ' % (width, d_a, d_d))
def _urlify_text_replace(match_obj):
url_full = match_obj.group(1)
return '%(url)s' % {'url': url_full}
def _urlify_text(s):
"""
Extract urls from text and make html links out of them
"""
return url_re.sub(_urlify_text_replace, s)
def urlify_text(s, truncate=None, stylize=False, truncatef=truncate):
"""
Extract urls from text and make literal html links out of them
"""
if truncate is not None:
s = truncatef(s, truncate, whole_word=True)
s = html_escape(s)
if stylize:
s = desc_stylize(s)
s = _urlify_text(s)
return literal(s)
def _urlify_changeset_replace_f(repository):
from pylons import url # doh, we need to re-import url to mock it later
def urlify_changeset_replace(match_obj):
rev = match_obj.group(0)
return '%(rev)s' % {
'url': url('changeset_home', repo_name=repository, revision=rev),
'rev': rev,
}
return urlify_changeset_replace
urilify_changeset_re = r'(?:^|(?<=[\s(),]))([0-9a-fA-F]{12,40})(?=$|\s|[.,:()])'
def urlify_changesets(text_, repository):
"""
Extract revision ids from changeset and make link from them
:param text_:
:param repository: repo name to build the URL with
"""
urlify_changeset_replace = _urlify_changeset_replace_f(repository)
return re.sub(urilify_changeset_re, urlify_changeset_replace, text_)
def linkify_others(t, l):
# attempt at fixing double quoting?
urls = re.compile(r'(\%s' % s)
def short_ref(ref_type, ref_name):
if ref_type == 'rev':
return short_id(ref_name)
return ref_name
def link_to_ref(repo_name, ref_type, ref_name, rev=None):
"""
Return full markup for a href to changeset_home for a changeset.
If ref_type is branch it will link to changelog.
ref_name is shortened if ref_type is 'rev'.
if rev is specified show it too, explicitly linking to that revision.
"""
txt = short_ref(ref_type, ref_name)
if ref_type == 'branch':
u = url('changelog_home', repo_name=repo_name, branch=ref_name)
else:
u = url('changeset_home', repo_name=repo_name, revision=ref_name)
l = link_to(repo_name + '#' + txt, u)
if rev and ref_type != 'rev':
l = literal('%s (%s)' % (l, link_to(short_id(rev), url('changeset_home', repo_name=repo_name, revision=rev))))
return l
def changeset_status(repo, revision):
return ChangesetStatusModel().get_status(repo, revision)
def changeset_status_lbl(changeset_status):
return ChangesetStatus.get_status_lbl(changeset_status)
def get_permission_name(key):
return dict(Permission.PERMS).get(key)
def journal_filter_help():
return _(textwrap.dedent('''
Example filter terms:
repository:vcs
username:developer
action:*push*
ip:127.0.0.1
date:20120101
date:[20120101100000 TO 20120102]
Generate wildcards using '*' character:
"repository:vcs*" - search everything starting with 'vcs'
"repository:*vcs*" - search for repository containing 'vcs'
Optional AND / OR operators in queries
"repository:vcs OR repository:test"
"username:test AND repository:test*"
'''))
def not_mapped_error(repo_name):
flash(_('%s repository is not mapped to db perhaps'
' it was created or renamed from the filesystem'
' please run the application again'
' in order to rescan repositories') % repo_name, category='error')
def ip_range(ip_addr):
from kallithea.model.db import UserIpMap
s, e = UserIpMap._get_ip_range(ip_addr)
return '%s - %s' % (s, e)
def form(url, method="post", **attrs):
"""Like webhelpers.html.tags.form but automatically using secure_form with
authentication_token for POST. authentication_token is thus never leaked
in the URL."""
if method.lower() == 'get':
return insecure_form(url, method=method, **attrs)
# webhelpers will turn everything but GET into POST
return secure_form(url, method=method, **attrs)
|