Changeset - 5ebd887522ea
[Not reviewed]
default
0 3 0
Andrew Shadura - 12 years ago 2014-01-12 17:30:00
andrew@shadura.me
helpers: user email can be unset

Check for not being empty, not for ''.
Also, don't call person() with email, as it can't be caught properly.
3 files changed with 5 insertions and 5 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/helpers.py
Show inline comments
 
@@ -97,797 +97,797 @@ def _reset(name, value=None, id=NotGiven
 
    _set_id_attr(attrs, id, name)
 
    convert_boolean_attrs(attrs, ["disabled"])
 
    return HTML.input(**attrs)
 

	
 
reset = _reset
 
safeid = _make_safe_id_component
 

	
 

	
 
def FID(raw_id, path):
 
    """
 
    Creates a uniqe ID for filenode based on it's hash of path and revision
 
    it's safe to use in urls
 

	
 
    :param raw_id:
 
    :param path:
 
    """
 

	
 
    return 'C-%s-%s' % (short_id(raw_id), md5(safe_str(path)).hexdigest()[:12])
 

	
 

	
 
def get_token():
 
    """Return the current authentication token, creating one if one doesn't
 
    already exist.
 
    """
 
    token_key = "_authentication_token"
 
    from pylons import session
 
    if not token_key in session:
 
        try:
 
            token = hashlib.sha1(str(random.getrandbits(128))).hexdigest()
 
        except AttributeError:  # Python < 2.4
 
            token = hashlib.sha1(str(random.randrange(2 ** 128))).hexdigest()
 
        session[token_key] = token
 
        if hasattr(session, 'save'):
 
            session.save()
 
    return session[token_key]
 

	
 

	
 
class _GetError(object):
 
    """Get error from form_errors, and represent it as span wrapped error
 
    message
 

	
 
    :param field_name: field to fetch errors for
 
    :param form_errors: form errors dict
 
    """
 

	
 
    def __call__(self, field_name, form_errors):
 
        tmpl = """<span class="error_msg">%s</span>"""
 
        if form_errors and field_name in form_errors:
 
            return literal(tmpl % form_errors.get(field_name))
 

	
 
get_error = _GetError()
 

	
 

	
 
class _ToolTip(object):
 

	
 
    def __call__(self, tooltip_title, trim_at=50):
 
        """
 
        Special function just to wrap our text into nice formatted
 
        autowrapped text
 

	
 
        :param tooltip_title:
 
        """
 
        tooltip_title = escape(tooltip_title)
 
        tooltip_title = tooltip_title.replace('<', '&lt;').replace('>', '&gt;')
 
        return tooltip_title
 
tooltip = _ToolTip()
 

	
 

	
 
class _FilesBreadCrumbs(object):
 

	
 
    def __call__(self, repo_name, rev, paths):
 
        if isinstance(paths, str):
 
            paths = safe_unicode(paths)
 
        url_l = [link_to(repo_name, url('files_home',
 
                                        repo_name=repo_name,
 
                                        revision=rev, f_path=''),
 
                         class_='ypjax-link')]
 
        paths_l = paths.split('/')
 
        for cnt, p in enumerate(paths_l):
 
            if p != '':
 
                url_l.append(link_to(p,
 
                                     url('files_home',
 
                                         repo_name=repo_name,
 
                                         revision=rev,
 
                                         f_path='/'.join(paths_l[:cnt + 1])
 
                                         ),
 
                                     class_='ypjax-link'
 
                                     )
 
                             )
 

	
 
        return literal('/'.join(url_l))
 

	
 
files_breadcrumbs = _FilesBreadCrumbs()
 

	
 

	
 
class CodeHtmlFormatter(HtmlFormatter):
 
    """
 
    My code Html Formatter for source codes
 
    """
 

	
 
    def wrap(self, source, outfile):
 
        return self._wrap_div(self._wrap_pre(self._wrap_code(source)))
 

	
 
    def _wrap_code(self, source):
 
        for cnt, it in enumerate(source):
 
            i, t = it
 
            t = '<div id="L%s">%s</div>' % (cnt + 1, t)
 
            yield i, t
 

	
 
    def _wrap_tablelinenos(self, inner):
 
        dummyoutfile = StringIO.StringIO()
 
        lncount = 0
 
        for t, line in inner:
 
            if t:
 
                lncount += 1
 
            dummyoutfile.write(line)
 

	
 
        fl = self.linenostart
 
        mw = len(str(lncount + fl - 1))
 
        sp = self.linenospecial
 
        st = self.linenostep
 
        la = self.lineanchors
 
        aln = self.anchorlinenos
 
        nocls = self.noclasses
 
        if sp:
 
            lines = []
 

	
 
            for i in range(fl, fl + lncount):
 
                if i % st == 0:
 
                    if i % sp == 0:
 
                        if aln:
 
                            lines.append('<a href="#%s%d" class="special">%*d</a>' %
 
                                         (la, i, mw, i))
 
                        else:
 
                            lines.append('<span class="special">%*d</span>' % (mw, i))
 
                    else:
 
                        if aln:
 
                            lines.append('<a href="#%s%d">%*d</a>' % (la, i, mw, i))
 
                        else:
 
                            lines.append('%*d' % (mw, i))
 
                else:
 
                    lines.append('')
 
            ls = '\n'.join(lines)
 
        else:
 
            lines = []
 
            for i in range(fl, fl + lncount):
 
                if i % st == 0:
 
                    if aln:
 
                        lines.append('<a href="#%s%d">%*d</a>' % (la, i, mw, i))
 
                    else:
 
                        lines.append('%*d' % (mw, i))
 
                else:
 
                    lines.append('')
 
            ls = '\n'.join(lines)
 

	
 
        # in case you wonder about the seemingly redundant <div> here: since the
 
        # content in the other cell also is wrapped in a div, some browsers in
 
        # some configurations seem to mess up the formatting...
 
        if nocls:
 
            yield 0, ('<table class="%stable">' % self.cssclass +
 
                      '<tr><td><div class="linenodiv" '
 
                      'style="background-color: #f0f0f0; padding-right: 10px">'
 
                      '<pre style="line-height: 125%">' +
 
                      ls + '</pre></div></td><td id="hlcode" class="code">')
 
        else:
 
            yield 0, ('<table class="%stable">' % self.cssclass +
 
                      '<tr><td class="linenos"><div class="linenodiv"><pre>' +
 
                      ls + '</pre></div></td><td id="hlcode" class="code">')
 
        yield 0, dummyoutfile.getvalue()
 
        yield 0, '</td></tr></table>'
 

	
 

	
 
_whitespace_re = re.compile(r'(\t)|( )(?=\n|</div>)')
 

	
 
def _markup_whitespace(m):
 
    groups = m.groups()
 
    if groups[0]:
 
        return '<u>\t</u>'
 
    if groups[1]:
 
        return ' <i></i>'
 

	
 
def markup_whitespace(s):
 
    return _whitespace_re.sub(_markup_whitespace, s)
 

	
 
def pygmentize(filenode, **kwargs):
 
    """
 
    pygmentize function using pygments
 

	
 
    :param filenode:
 
    """
 
    lexer = get_custom_lexer(filenode.extension) or filenode.lexer
 
    return literal(markup_whitespace(
 
        code_highlight(filenode.content, lexer, CodeHtmlFormatter(**kwargs))))
 

	
 

	
 
def pygmentize_annotation(repo_name, filenode, **kwargs):
 
    """
 
    pygmentize function for annotation
 

	
 
    :param filenode:
 
    """
 

	
 
    color_dict = {}
 

	
 
    def gen_color(n=10000):
 
        """generator for getting n of evenly distributed colors using
 
        hsv color and golden ratio. It always return same order of colors
 

	
 
        :returns: RGB tuple
 
        """
 

	
 
        def hsv_to_rgb(h, s, v):
 
            if s == 0.0:
 
                return v, v, v
 
            i = int(h * 6.0)  # XXX assume int() truncates!
 
            f = (h * 6.0) - i
 
            p = v * (1.0 - s)
 
            q = v * (1.0 - s * f)
 
            t = v * (1.0 - s * (1.0 - f))
 
            i = i % 6
 
            if i == 0:
 
                return v, t, p
 
            if i == 1:
 
                return q, v, p
 
            if i == 2:
 
                return p, v, t
 
            if i == 3:
 
                return p, q, v
 
            if i == 4:
 
                return t, p, v
 
            if i == 5:
 
                return v, p, q
 

	
 
        golden_ratio = 0.618033988749895
 
        h = 0.22717784590367374
 

	
 
        for _ in xrange(n):
 
            h += golden_ratio
 
            h %= 1
 
            HSV_tuple = [h, 0.95, 0.95]
 
            RGB_tuple = hsv_to_rgb(*HSV_tuple)
 
            yield map(lambda x: str(int(x * 256)), RGB_tuple)
 

	
 
    cgenerator = gen_color()
 

	
 
    def get_color_string(cs):
 
        if cs in color_dict:
 
            col = color_dict[cs]
 
        else:
 
            col = color_dict[cs] = cgenerator.next()
 
        return "color: rgb(%s)! important;" % (', '.join(col))
 

	
 
    def url_func(repo_name):
 

	
 
        def _url_func(changeset):
 
            author = changeset.author
 
            date = changeset.date
 
            message = tooltip(changeset.message)
 

	
 
            tooltip_html = ("<div style='font-size:0.8em'><b>Author:</b>"
 
                            " %s<br/><b>Date:</b> %s</b><br/><b>Message:"
 
                            "</b> %s<br/></div>")
 

	
 
            tooltip_html = tooltip_html % (author, date, message)
 
            lnk_format = show_id(changeset)
 
            uri = link_to(
 
                    lnk_format,
 
                    url('changeset_home', repo_name=repo_name,
 
                        revision=changeset.raw_id),
 
                    style=get_color_string(changeset.raw_id),
 
                    class_='tooltip',
 
                    title=tooltip_html
 
                  )
 

	
 
            uri += '\n'
 
            return uri
 
        return _url_func
 

	
 
    return literal(markup_whitespace(annotate_highlight(filenode, url_func(repo_name), **kwargs)))
 

	
 

	
 
def is_following_repo(repo_name, user_id):
 
    from kallithea.model.scm import ScmModel
 
    return ScmModel().is_following_repo(repo_name, user_id)
 

	
 
class _Message(object):
 
    """A message returned by ``Flash.pop_messages()``.
 

	
 
    Converting the message to a string returns the message text. Instances
 
    also have the following attributes:
 

	
 
    * ``message``: the message text.
 
    * ``category``: the category specified when the message was created.
 
    """
 

	
 
    def __init__(self, category, message):
 
        self.category = category
 
        self.message = message
 

	
 
    def __str__(self):
 
        return self.message
 

	
 
    __unicode__ = __str__
 

	
 
    def __html__(self):
 
        return escape(safe_unicode(self.message))
 

	
 
class Flash(_Flash):
 

	
 
    def pop_messages(self):
 
        """Return all accumulated messages and delete them from the session.
 

	
 
        The return value is a list of ``Message`` objects.
 
        """
 
        from pylons import session
 
        messages = session.pop(self.session_key, [])
 
        session.save()
 
        return [_Message(*m) for m in messages]
 

	
 
flash = Flash()
 

	
 
#==============================================================================
 
# SCM FILTERS available via h.
 
#==============================================================================
 
from kallithea.lib.vcs.utils import author_name, author_email
 
from kallithea.lib.utils2 import credentials_filter, age as _age
 
from kallithea.model.db import User, ChangesetStatus
 

	
 
age = lambda  x, y=False: _age(x, y)
 
capitalize = lambda x: x.capitalize()
 
email = author_email
 
short_id = lambda x: x[:12]
 
hide_credentials = lambda x: ''.join(credentials_filter(x))
 

	
 

	
 
def show_id(cs):
 
    """
 
    Configurable function that shows ID
 
    by default it's r123:fffeeefffeee
 

	
 
    :param cs: changeset instance
 
    """
 
    from kallithea import CONFIG
 
    def_len = safe_int(CONFIG.get('show_sha_length', 12))
 
    show_rev = str2bool(CONFIG.get('show_revision_number', True))
 

	
 
    raw_id = cs.raw_id[:def_len]
 
    if show_rev:
 
        return 'r%s:%s' % (cs.revision, raw_id)
 
    else:
 
        return '%s' % (raw_id)
 

	
 

	
 
def fmt_date(date):
 
    if date:
 
        _fmt = u"%a, %d %b %Y %H:%M:%S".encode('utf8')
 
        return date.strftime(_fmt).decode('utf8')
 

	
 
    return ""
 

	
 

	
 
def is_git(repository):
 
    if hasattr(repository, 'alias'):
 
        _type = repository.alias
 
    elif hasattr(repository, 'repo_type'):
 
        _type = repository.repo_type
 
    else:
 
        _type = repository
 
    return _type == 'git'
 

	
 

	
 
def is_hg(repository):
 
    if hasattr(repository, 'alias'):
 
        _type = repository.alias
 
    elif hasattr(repository, 'repo_type'):
 
        _type = repository.repo_type
 
    else:
 
        _type = repository
 
    return _type == 'hg'
 

	
 

	
 
def email_or_none(author):
 
    # extract email from the commit string
 
    _email = email(author)
 
    if _email != '':
 
    if _email:
 
        # check it against Kallithea database, and use the MAIN email for this
 
        # user
 
        user = User.get_by_email(_email, case_insensitive=True, cache=True)
 
        if user is not None:
 
            return user.email
 
        return _email
 

	
 
    # See if it contains a username we can get an email from
 
    user = User.get_by_username(author_name(author), case_insensitive=True,
 
                                cache=True)
 
    if user is not None:
 
        return user.email
 

	
 
    # No valid email, not a valid user in the system, none!
 
    return None
 

	
 

	
 
def person(author, show_attr="username"):
 
    # attr to return from fetched user
 
    person_getter = lambda usr: getattr(usr, show_attr)
 

	
 
    # if author is already an instance use it for extraction
 
    if isinstance(author, User):
 
        return person_getter(author)
 

	
 
    # Valid email in the attribute passed, see if they're in the system
 
    _email = email(author)
 
    if _email != '':
 
    if _email:
 
        user = User.get_by_email(_email, case_insensitive=True, cache=True)
 
        if user is not None:
 
            return person_getter(user)
 

	
 
    # Maybe it's a username?
 
    _author = author_name(author)
 
    user = User.get_by_username(_author, case_insensitive=True,
 
                                cache=True)
 
    if user is not None:
 
        return person_getter(user)
 

	
 
    # Still nothing?  Just pass back the author name if any, else the email
 
    return _author or _email
 

	
 

	
 
def person_by_id(id_, show_attr="username"):
 
    # attr to return from fetched user
 
    person_getter = lambda usr: getattr(usr, show_attr)
 

	
 
    #maybe it's an ID ?
 
    if str(id_).isdigit() or isinstance(id_, int):
 
        id_ = int(id_)
 
        user = User.get(id_)
 
        if user is not None:
 
            return person_getter(user)
 
    return id_
 

	
 

	
 
def desc_stylize(value):
 
    """
 
    converts tags from value into html equivalent
 

	
 
    :param value:
 
    """
 
    if not value:
 
        return ''
 

	
 
    value = re.sub(r'\[see\ \=\>\ *([a-zA-Z0-9\/\=\?\&\ \:\/\.\-]*)\]',
 
                   '<div class="metatag" tag="see">see =&gt; \\1 </div>', value)
 
    value = re.sub(r'\[license\ \=\>\ *([a-zA-Z0-9\/\=\?\&\ \:\/\.\-]*)\]',
 
                   '<div class="metatag" tag="license"><a href="http:\/\/www.opensource.org/licenses/\\1">\\1</a></div>', value)
 
    value = re.sub(r'\[(requires|recommends|conflicts|base)\ \=\>\ *([a-zA-Z0-9\-\/]*)\]',
 
                   '<div class="metatag" tag="\\1">\\1 =&gt; <a href="/\\2">\\2</a></div>', value)
 
    value = re.sub(r'\[(lang|language)\ \=\>\ *([a-zA-Z\-\/\#\+]*)\]',
 
                   '<div class="metatag" tag="lang">\\2</div>', value)
 
    value = re.sub(r'\[([a-z]+)\]',
 
                  '<div class="metatag" tag="\\1">\\1</div>', value)
 

	
 
    return value
 

	
 

	
 
def boolicon(value):
 
    """Returns boolean value of a value, represented as small html image of true/false
 
    icons
 

	
 
    :param value: value
 
    """
 

	
 
    if value:
 
        return HTML.tag('i', class_="icon-ok-sign")
 
    else:
 
        return HTML.tag('i', class_="icon-minus-sign")
 

	
 

	
 
def action_parser(user_log, feed=False, parse_cs=False):
 
    """
 
    This helper will action_map the specified string action into translated
 
    fancy names with icons and links
 

	
 
    :param user_log: user log instance
 
    :param feed: use output for feeds (no html and fancy icons)
 
    :param parse_cs: parse Changesets into VCS instances
 
    """
 

	
 
    action = user_log.action
 
    action_params = ' '
 

	
 
    x = action.split(':')
 

	
 
    if len(x) > 1:
 
        action, action_params = x
 

	
 
    def get_cs_links():
 
        revs_limit = 3  # display this amount always
 
        revs_top_limit = 50  # show upto this amount of changesets hidden
 
        revs_ids = action_params.split(',')
 
        deleted = user_log.repository is None
 
        if deleted:
 
            return ','.join(revs_ids)
 

	
 
        repo_name = user_log.repository.repo_name
 

	
 
        def lnk(rev, repo_name):
 
            if isinstance(rev, BaseChangeset) or isinstance(rev, AttributeDict):
 
                lazy_cs = True
 
                if getattr(rev, 'op', None) and getattr(rev, 'ref_name', None):
 
                    lazy_cs = False
 
                    lbl = '?'
 
                    if rev.op == 'delete_branch':
 
                        lbl = '%s' % _('Deleted branch: %s') % rev.ref_name
 
                        title = ''
 
                    elif rev.op == 'tag':
 
                        lbl = '%s' % _('Created tag: %s') % rev.ref_name
 
                        title = ''
 
                    _url = '#'
 

	
 
                else:
 
                    lbl = '%s' % (rev.short_id[:8])
 
                    _url = url('changeset_home', repo_name=repo_name,
 
                               revision=rev.raw_id)
 
                    title = tooltip(rev.message)
 
            else:
 
                ## changeset cannot be found/striped/removed etc.
 
                lbl = ('%s' % rev)[:12]
 
                _url = '#'
 
                title = _('Changeset not found')
 
            if parse_cs:
 
                return link_to(lbl, _url, title=title, class_='tooltip')
 
            return link_to(lbl, _url, raw_id=rev.raw_id, repo_name=repo_name,
 
                           class_='lazy-cs' if lazy_cs else '')
 

	
 
        def _get_op(rev_txt):
 
            _op = None
 
            _name = rev_txt
 
            if len(rev_txt.split('=>')) == 2:
 
                _op, _name = rev_txt.split('=>')
 
            return _op, _name
 

	
 
        revs = []
 
        if len(filter(lambda v: v != '', revs_ids)) > 0:
 
            repo = None
 
            for rev in revs_ids[:revs_top_limit]:
 
                _op, _name = _get_op(rev)
 

	
 
                # we want parsed changesets, or new log store format is bad
 
                if parse_cs:
 
                    try:
 
                        if repo is None:
 
                            repo = user_log.repository.scm_instance
 
                        _rev = repo.get_changeset(rev)
 
                        revs.append(_rev)
 
                    except ChangesetDoesNotExistError:
 
                        log.error('cannot find revision %s in this repo' % rev)
 
                        revs.append(rev)
 
                        continue
 
                else:
 
                    _rev = AttributeDict({
 
                        'short_id': rev[:12],
 
                        'raw_id': rev,
 
                        'message': '',
 
                        'op': _op,
 
                        'ref_name': _name
 
                    })
 
                    revs.append(_rev)
 
        cs_links = [" " + ', '.join(
 
            [lnk(rev, repo_name) for rev in revs[:revs_limit]]
 
        )]
 
        _op1, _name1 = _get_op(revs_ids[0])
 
        _op2, _name2 = _get_op(revs_ids[-1])
 

	
 
        _rev = '%s...%s' % (_name1, _name2)
 

	
 
        compare_view = (
 
            ' <div class="compare_view tooltip" title="%s">'
 
            '<a href="%s">%s</a> </div>' % (
 
                _('Show all combined changesets %s->%s') % (
 
                    revs_ids[0][:12], revs_ids[-1][:12]
 
                ),
 
                url('changeset_home', repo_name=repo_name,
 
                    revision=_rev
 
                ),
 
                _('compare view')
 
            )
 
        )
 

	
 
        # if we have exactly one more than normally displayed
 
        # just display it, takes less space than displaying
 
        # "and 1 more revisions"
 
        if len(revs_ids) == revs_limit + 1:
 
            rev = revs[revs_limit]
 
            cs_links.append(", " + lnk(rev, repo_name))
 

	
 
        # hidden-by-default ones
 
        if len(revs_ids) > revs_limit + 1:
 
            uniq_id = revs_ids[0]
 
            html_tmpl = (
 
                '<span> %s <a class="show_more" id="_%s" '
 
                'href="#more">%s</a> %s</span>'
 
            )
 
            if not feed:
 
                cs_links.append(html_tmpl % (
 
                      _('and'),
 
                      uniq_id, _('%s more') % (len(revs_ids) - revs_limit),
 
                      _('revisions')
 
                    )
 
                )
 

	
 
            if not feed:
 
                html_tmpl = '<span id="%s" style="display:none">, %s </span>'
 
            else:
 
                html_tmpl = '<span id="%s"> %s </span>'
 

	
 
            morelinks = ', '.join(
 
              [lnk(rev, repo_name) for rev in revs[revs_limit:]]
 
            )
 

	
 
            if len(revs_ids) > revs_top_limit:
 
                morelinks += ', ...'
 

	
 
            cs_links.append(html_tmpl % (uniq_id, morelinks))
 
        if len(revs) > 1:
 
            cs_links.append(compare_view)
 
        return ''.join(cs_links)
 

	
 
    def get_fork_name():
 
        repo_name = action_params
 
        _url = url('summary_home', repo_name=repo_name)
 
        return _('fork name %s') % link_to(action_params, _url)
 

	
 
    def get_user_name():
 
        user_name = action_params
 
        return user_name
 

	
 
    def get_users_group():
 
        group_name = action_params
 
        return group_name
 

	
 
    def get_pull_request():
 
        pull_request_id = action_params
 
        deleted = user_log.repository is None
 
        if deleted:
 
            repo_name = user_log.repository_name
 
        else:
 
            repo_name = user_log.repository.repo_name
 
        return link_to(_('Pull request #%s') % pull_request_id,
 
                    url('pullrequest_show', repo_name=repo_name,
 
                    pull_request_id=pull_request_id))
 

	
 
    def get_archive_name():
 
        archive_name = action_params
 
        return archive_name
 

	
 
    # action : translated str, callback(extractor), icon
 
    action_map = {
 
    'user_deleted_repo':           (_('[deleted] repository'),
 
                                    None, 'icon-trash'),
 
    'user_created_repo':           (_('[created] repository'),
 
                                    None, 'icon-plus icon-plus-colored'),
 
    'user_created_fork':           (_('[created] repository as fork'),
 
                                    None, 'icon-code-fork'),
 
    'user_forked_repo':            (_('[forked] repository'),
 
                                    get_fork_name, 'icon-code-fork'),
 
    'user_updated_repo':           (_('[updated] repository'),
 
                                    None, 'icon-pencil icon-pencil-colored'),
 
    'user_downloaded_archive':      (_('[downloaded] archive from repository'),
 
                                    get_archive_name, 'icon-download-alt'),
 
    'admin_deleted_repo':          (_('[delete] repository'),
 
                                    None, 'icon-trash'),
 
    'admin_created_repo':          (_('[created] repository'),
 
                                    None, 'icon-plus icon-plus-colored'),
 
    'admin_forked_repo':           (_('[forked] repository'),
 
                                    None, 'icon-code-fork icon-fork-colored'),
 
    'admin_updated_repo':          (_('[updated] repository'),
 
                                    None, 'icon-pencil icon-pencil-colored'),
 
    'admin_created_user':          (_('[created] user'),
 
                                    get_user_name, 'icon-user icon-user-colored'),
 
    'admin_updated_user':          (_('[updated] user'),
 
                                    get_user_name, 'icon-user icon-user-colored'),
 
    'admin_created_users_group':   (_('[created] user group'),
 
                                    get_users_group, 'icon-pencil icon-pencil-colored'),
 
    'admin_updated_users_group':   (_('[updated] user group'),
 
                                    get_users_group, 'icon-pencil icon-pencil-colored'),
 
    'user_commented_revision':     (_('[commented] on revision in repository'),
 
                                    get_cs_links, 'icon-comment icon-comment-colored'),
 
    'user_commented_pull_request': (_('[commented] on pull request for'),
 
                                    get_pull_request, 'icon-comment icon-comment-colored'),
 
    'user_closed_pull_request':    (_('[closed] pull request for'),
 
                                    get_pull_request, 'icon-check'),
 
    'push':                        (_('[pushed] into'),
 
                                    get_cs_links, 'icon-arrow-up'),
 
    'push_local':                  (_('[committed via Kallithea] into repository'),
 
                                    get_cs_links, 'icon-pencil icon-pencil-colored'),
 
    'push_remote':                 (_('[pulled from remote] into repository'),
 
                                    get_cs_links, 'icon-arrow-up'),
 
    'pull':                        (_('[pulled] from'),
 
                                    None, 'icon-arrow-down'),
 
    'started_following_repo':      (_('[started following] repository'),
 
                                    None, 'icon-heart icon-heart-colored'),
 
    'stopped_following_repo':      (_('[stopped following] repository'),
 
                                    None, 'icon-heart-empty icon-heart-colored'),
 
    }
 

	
 
    action_str = action_map.get(action, action)
 
    if feed:
 
        action = action_str[0].replace('[', '').replace(']', '')
 
    else:
 
        action = action_str[0]\
 
            .replace('[', '<span class="journal_highlight">')\
 
            .replace(']', '</span>')
 

	
 
    action_params_func = lambda: ""
 

	
 
    if callable(action_str[1]):
 
        action_params_func = action_str[1]
 

	
 
    def action_parser_icon():
 
        action = user_log.action
 
        action_params = None
 
        x = action.split(':')
 

	
 
        if len(x) > 1:
 
            action, action_params = x
 

	
 
        tmpl = """<i class="%s" alt="%s"></i>"""
 
        ico = action_map.get(action, ['', '', ''])[2]
 
        return literal(tmpl % (ico, action))
 

	
 
    # returned callbacks we need to call to get
 
    return [lambda: literal(action), action_params_func, action_parser_icon]
 

	
 

	
 

	
 
#==============================================================================
 
# PERMS
 
#==============================================================================
 
from kallithea.lib.auth import HasPermissionAny, HasPermissionAll, \
 
HasRepoPermissionAny, HasRepoPermissionAll, HasRepoGroupPermissionAll, \
 
HasRepoGroupPermissionAny
 

	
 

	
 
#==============================================================================
 
# GRAVATAR URL
 
#==============================================================================
 

	
 
def gravatar_url(email_address, size=30, ssl_enabled=True):
 
    # doh, we need to re-import those to mock it later
 
    from pylons import url
 
    from pylons import tmpl_context as c
 

	
 
    _def = 'anonymous@kallithea-scm.org'  # default gravatar
 
    _use_gravatar = c.visual.use_gravatar
 
    _gravatar_url = c.visual.gravatar_url or User.DEFAULT_GRAVATAR_URL
 

	
 
    email_address = email_address or _def
 
    if isinstance(email_address, unicode):
 
        #hashlib crashes on unicode items
 
        email_address = safe_str(email_address)
 

	
 
    if not _use_gravatar or not email_address or email_address == _def:
 
        # pick best matching size to one given in size param
 
        f = lambda a, l: min(l, key=lambda x: abs(x - a))
 
        return url("/images/user%s.png" % f(size, [14, 16, 20, 24, 30]))
 

	
 
    if _use_gravatar:
 
        _md5 = lambda s: hashlib.md5(s).hexdigest()
 

	
 
        tmpl = _gravatar_url
 
        parsed_url = urlparse.urlparse(url.current(qualified=True))
 
        tmpl = tmpl.replace('{email}', email_address)\
 
                   .replace('{md5email}', _md5(email_address.lower())) \
 
                   .replace('{netloc}', parsed_url.netloc)\
 
                   .replace('{scheme}', parsed_url.scheme)\
 
                   .replace('{size}', safe_str(size))
 
        return tmpl
 

	
 
class Page(_Page):
 
    """
 
    Custom pager to match rendering style with YUI paginator
 
    """
 

	
 
    def _get_pos(self, cur_page, max_page, items):
 
        edge = (items / 2) + 1
 
        if (cur_page <= edge):
 
            radius = max(items / 2, items - cur_page)
 
        elif (max_page - cur_page) < edge:
 
            radius = (items - 1) - (max_page - cur_page)
 
        else:
 
            radius = items / 2
 

	
 
        left = max(1, (cur_page - (radius)))
 
        right = min(max_page, cur_page + (radius))
 
        return left, cur_page, right
 

	
 
    def _range(self, regexp_match):
kallithea/model/comment.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.comment
 
~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
comments model for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Nov 11, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import traceback
 

	
 
from pylons.i18n.translation import _
 
from sqlalchemy.util.compat import defaultdict
 

	
 
from kallithea.lib.utils2 import extract_mentioned_users, safe_unicode
 
from kallithea.lib import helpers as h
 
from kallithea.model import BaseModel
 
from kallithea.model.db import ChangesetComment, User, Repository, \
 
    Notification, PullRequest
 
from kallithea.model.notification import NotificationModel
 
from kallithea.model.meta import Session
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class ChangesetCommentsModel(BaseModel):
 

	
 
    cls = ChangesetComment
 

	
 
    def __get_changeset_comment(self, changeset_comment):
 
        return self._get_instance(ChangesetComment, changeset_comment)
 

	
 
    def __get_pull_request(self, pull_request):
 
        return self._get_instance(PullRequest, pull_request)
 

	
 
    def _extract_mentions(self, s):
 
        user_objects = []
 
        for username in extract_mentioned_users(s):
 
            user_obj = User.get_by_username(username, case_insensitive=True)
 
            if user_obj:
 
                user_objects.append(user_obj)
 
        return user_objects
 

	
 
    def _get_notification_data(self, repo, comment, user, comment_text,
 
                               line_no=None, revision=None, pull_request=None,
 
                               status_change=None, closing_pr=False):
 
        """
 
        Get notification data
 

	
 
        :param comment_text:
 
        :param line:
 
        :returns: tuple (subj,body,recipients,notification_type,email_kwargs)
 
        """
 
        # make notification
 
        body = comment_text  # text of the comment
 
        line = ''
 
        if line_no:
 
            line = _('on line %s') % line_no
 

	
 
        #changeset
 
        if revision:
 
            notification_type = Notification.TYPE_CHANGESET_COMMENT
 
            cs = repo.scm_instance.get_changeset(revision)
 
            desc = "%s" % (cs.short_id)
 

	
 
            _url = h.url('changeset_home',
 
                repo_name=repo.repo_name,
 
                revision=revision,
 
                anchor='comment-%s' % comment.comment_id,
 
                qualified=True,
 
            )
 
            subj = safe_unicode(
 
                h.link_to('Re changeset: %(desc)s %(line)s' % \
 
                          {'desc': desc, 'line': line},
 
                          _url)
 
            )
 
            email_subject = '%s commented on changeset %s' % \
 
                (user.username, h.short_id(revision))
 
            # get the current participants of this changeset
 
            recipients = ChangesetComment.get_users(revision=revision)
 
            # add changeset author if it's in kallithea system
 
            cs_author = User.get_from_cs_author(cs.author)
 
            if not cs_author:
 
                #use repo owner if we cannot extract the author correctly
 
                cs_author = repo.user
 
            recipients += [cs_author]
 
            email_kwargs = {
 
                'status_change': status_change,
 
                'cs_comment_user': h.person(user.email),
 
                'cs_comment_user': h.person(user),
 
                'cs_target_repo': h.url('summary_home', repo_name=repo.repo_name,
 
                                        qualified=True),
 
                'cs_comment_url': _url,
 
                'raw_id': revision,
 
                'message': cs.message
 
            }
 
        #pull request
 
        elif pull_request:
 
            notification_type = Notification.TYPE_PULL_REQUEST_COMMENT
 
            desc = comment.pull_request.title
 
            _url = h.url('pullrequest_show',
 
                repo_name=pull_request.other_repo.repo_name,
 
                pull_request_id=pull_request.pull_request_id,
 
                anchor='comment-%s' % comment.comment_id,
 
                qualified=True,
 
            )
 
            subj = safe_unicode(
 
                h.link_to('Re pull request #%(pr_id)s: %(desc)s %(line)s' % \
 
                          {'desc': desc,
 
                           'pr_id': comment.pull_request.pull_request_id,
 
                           'line': line},
 
                          _url)
 
            )
 
            email_subject = '%s commented on pull request #%s' % \
 
                    (user.username, comment.pull_request.pull_request_id)
 
            # get the current participants of this pull request
 
            recipients = ChangesetComment.get_users(pull_request_id=
 
                                                pull_request.pull_request_id)
 
            # add pull request author
 
            recipients += [pull_request.author]
 

	
 
            # add the reviewers to notification
 
            recipients += [x.user for x in pull_request.reviewers]
 

	
 
            #set some variables for email notification
 
            email_kwargs = {
 
                'pr_title': pull_request.title,
 
                'pr_id': pull_request.pull_request_id,
 
                'status_change': status_change,
 
                'closing_pr': closing_pr,
 
                'pr_comment_url': _url,
 
                'pr_comment_user': h.person(user.email),
 
                'pr_comment_user': h.person(user),
 
                'pr_target_repo': h.url('summary_home',
 
                                   repo_name=pull_request.other_repo.repo_name,
 
                                   qualified=True)
 
            }
 

	
 
        return subj, body, recipients, notification_type, email_kwargs, email_subject
 

	
 
    def create(self, text, repo, user, revision=None, pull_request=None,
 
               f_path=None, line_no=None, status_change=None, closing_pr=False,
 
               send_email=True):
 
        """
 
        Creates new comment for changeset or pull request.
 
        If status_change is not None this comment is associated with a
 
        status change of changeset or changesets associated with pull request
 

	
 
        :param text:
 
        :param repo:
 
        :param user:
 
        :param revision:
 
        :param pull_request: (for emails, not for comments)
 
        :param f_path:
 
        :param line_no:
 
        :param status_change: (for emails, not for comments)
 
        :param closing_pr: (for emails, not for comments)
 
        :param send_email: also send email
 
        """
 
        if not text:
 
            log.warning('Missing text for comment, skipping...')
 
            return
 

	
 
        repo = self._get_repo(repo)
 
        user = self._get_user(user)
 
        comment = ChangesetComment()
 
        comment.repo = repo
 
        comment.author = user
 
        comment.text = text
 
        comment.f_path = f_path
 
        comment.line_no = line_no
 

	
 
        if revision:
 
            comment.revision = revision
 
        elif pull_request:
 
            pull_request = self.__get_pull_request(pull_request)
 
            comment.pull_request = pull_request
 
        else:
 
            raise Exception('Please specify revision or pull_request_id')
 

	
 
        Session().add(comment)
 
        Session().flush()
 

	
 
        if send_email:
 
            (subj, body, recipients, notification_type,
 
             email_kwargs, email_subject) = self._get_notification_data(
 
                                repo, comment, user,
 
                                comment_text=text,
 
                                line_no=line_no,
 
                                revision=revision,
 
                                pull_request=pull_request,
 
                                status_change=status_change,
 
                                closing_pr=closing_pr)
 
            # create notification objects, and emails
 
            NotificationModel().create(
 
                created_by=user, subject=subj, body=body,
 
                recipients=recipients, type_=notification_type,
 
                email_kwargs=email_kwargs, email_subject=email_subject
 
            )
 

	
 
            mention_recipients = set(self._extract_mentions(body))\
 
                                    .difference(recipients)
 
            if mention_recipients:
 
                email_kwargs.update({'pr_mention': True})
 
                subj = _('[Mention]') + ' ' + subj
 
                NotificationModel().create(
 
                    created_by=user, subject=subj, body=body,
 
                    recipients=mention_recipients,
 
                    type_=notification_type,
 
                    email_kwargs=email_kwargs
 
                )
 

	
 
        return comment
 

	
 
    def delete(self, comment):
 
        """
 
        Deletes given comment
 

	
 
        :param comment_id:
 
        """
 
        comment = self.__get_changeset_comment(comment)
 
        Session().delete(comment)
 

	
 
        return comment
 

	
 
    def get_comments(self, repo_id, revision=None, pull_request=None):
 
        """
 
        Gets main comments based on revision or pull_request_id
 

	
 
        :param repo_id:
 
        :param revision:
 
        :param pull_request:
 
        """
 

	
 
        q = ChangesetComment.query()\
 
                .filter(ChangesetComment.repo_id == repo_id)\
 
                .filter(ChangesetComment.line_no == None)\
 
                .filter(ChangesetComment.f_path == None)
 
        if revision:
 
            q = q.filter(ChangesetComment.revision == revision)
 
        elif pull_request:
 
            pull_request = self.__get_pull_request(pull_request)
 
            q = q.filter(ChangesetComment.pull_request == pull_request)
 
        else:
 
            raise Exception('Please specify revision or pull_request')
 
        q = q.order_by(ChangesetComment.created_on)
 
        return q.all()
 

	
 
    def get_inline_comments(self, repo_id, revision=None, pull_request=None):
 
        q = Session().query(ChangesetComment)\
 
            .filter(ChangesetComment.repo_id == repo_id)\
 
            .filter(ChangesetComment.line_no != None)\
 
            .filter(ChangesetComment.f_path != None)\
 
            .order_by(ChangesetComment.comment_id.asc())\
 

	
 
        if revision:
 
            q = q.filter(ChangesetComment.revision == revision)
 
        elif pull_request:
 
            pull_request = self.__get_pull_request(pull_request)
 
            q = q.filter(ChangesetComment.pull_request == pull_request)
 
        else:
 
            raise Exception('Please specify revision or pull_request_id')
 

	
 
        comments = q.all()
 

	
 
        paths = defaultdict(lambda: defaultdict(list))
 

	
 
        for co in comments:
 
            paths[co.f_path][co.line_no].append(co)
 
        return paths.items()
kallithea/model/pull_request.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.pull_request
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
pull request model for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jun 6, 2012
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import datetime
 

	
 
from pylons.i18n.translation import _
 

	
 
from kallithea.model.meta import Session
 
from kallithea.lib import helpers as h
 
from kallithea.model import BaseModel
 
from kallithea.model.db import PullRequest, PullRequestReviewers, Notification,\
 
    ChangesetStatus
 
from kallithea.model.notification import NotificationModel
 
from kallithea.lib.utils2 import safe_unicode
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class PullRequestModel(BaseModel):
 

	
 
    cls = PullRequest
 

	
 
    def __get_pull_request(self, pull_request):
 
        return self._get_instance(PullRequest, pull_request)
 

	
 
    def get_pullrequest_cnt_for_user(self, user):
 
        return PullRequest.query()\
 
                                .join(PullRequestReviewers)\
 
                                .filter(PullRequestReviewers.user_id == user)\
 
                                .filter(PullRequest.status != PullRequest.STATUS_CLOSED)\
 
                                .count()
 

	
 
    def get_all(self, repo_name, from_=False, closed=False):
 
        """Get all PRs for repo.
 
        Default is all PRs to the repo, PRs from the repo if from_.
 
        Closed PRs are only included if closed is true."""
 
        repo = self._get_repo(repo_name)
 
        q = PullRequest.query()
 
        if from_:
 
            q = q.filter(PullRequest.org_repo == repo)
 
        else:
 
            q = q.filter(PullRequest.other_repo == repo)
 
        if not closed:
 
            q = q.filter(PullRequest.status != PullRequest.STATUS_CLOSED)
 
        return q.order_by(PullRequest.created_on.desc()).all()
 

	
 
    def create(self, created_by, org_repo, org_ref, other_repo, other_ref,
 
               revisions, reviewers, title, description=None):
 
        from kallithea.model.changeset_status import ChangesetStatusModel
 

	
 
        created_by_user = self._get_user(created_by)
 
        org_repo = self._get_repo(org_repo)
 
        other_repo = self._get_repo(other_repo)
 

	
 
        new = PullRequest()
 
        new.org_repo = org_repo
 
        new.org_ref = org_ref
 
        new.other_repo = other_repo
 
        new.other_ref = other_ref
 
        new.revisions = revisions
 
        new.title = title
 
        new.description = description
 
        new.author = created_by_user
 
        Session().add(new)
 
        Session().flush()
 

	
 
        #reset state to under-review
 
        from kallithea.model.comment import ChangesetCommentsModel
 
        comment = ChangesetCommentsModel().create(
 
            text=u'Auto status change to %s' % (ChangesetStatus.get_status_lbl(ChangesetStatus.STATUS_UNDER_REVIEW)),
 
            repo=org_repo,
 
            user=new.author,
 
            pull_request=new,
 
            send_email=False
 
        )
 
        ChangesetStatusModel().set_status(
 
            org_repo,
 
            ChangesetStatus.STATUS_UNDER_REVIEW,
 
            new.author,
 
            comment,
 
            pull_request=new
 
        )
 
        self.__add_reviewers(new, reviewers)
 
        return new
 

	
 
    def __add_reviewers(self, pr, reviewers):
 
        #members
 
        for member in set(reviewers):
 
            _usr = self._get_user(member)
 
            reviewer = PullRequestReviewers(_usr, pr)
 
            Session().add(reviewer)
 

	
 
        revision_data = [(x.raw_id, x.message)
 
                         for x in map(pr.org_repo.get_changeset, pr.revisions)]
 

	
 
        #notification to reviewers
 
        pr_url = h.url('pullrequest_show', repo_name=pr.other_repo.repo_name,
 
                       pull_request_id=pr.pull_request_id,
 
                       qualified=True)
 
        subject = safe_unicode(
 
            h.link_to(
 
              _('%(user)s wants you to review pull request #%(pr_id)s: %(pr_title)s') % \
 
                {'user': pr.author.username,
 
                 'pr_title': pr.title,
 
                 'pr_id': pr.pull_request_id},
 
                pr_url)
 
            )
 
        body = pr.description
 
        kwargs = {
 
            'pr_title': pr.title,
 
            'pr_user_created': h.person(pr.author.email),
 
            'pr_user_created': h.person(pr.author),
 
            'pr_repo_url': h.url('summary_home', repo_name=pr.other_repo.repo_name,
 
                                 qualified=True,),
 
            'pr_url': pr_url,
 
            'pr_revisions': revision_data}
 

	
 
        NotificationModel().create(created_by=pr.author, subject=subject, body=body,
 
                                   recipients=reviewers,
 
                                   type_=Notification.TYPE_PULL_REQUEST, email_kwargs=kwargs)
 

	
 
    def update_reviewers(self, pull_request, reviewers_ids):
 
        reviewers_ids = set(reviewers_ids)
 
        pull_request = self.__get_pull_request(pull_request)
 
        current_reviewers = PullRequestReviewers.query()\
 
                            .filter(PullRequestReviewers.pull_request==
 
                                   pull_request)\
 
                            .all()
 
        current_reviewers_ids = set([x.user.user_id for x in current_reviewers])
 

	
 
        to_add = reviewers_ids.difference(current_reviewers_ids)
 
        to_remove = current_reviewers_ids.difference(reviewers_ids)
 

	
 
        log.debug("Adding %s reviewers" % to_add)
 
        self.__add_reviewers(pull_request, to_add)
 

	
 
        log.debug("Removing %s reviewers" % to_remove)
 
        for uid in to_remove:
 
            reviewer = PullRequestReviewers.query()\
 
                    .filter(PullRequestReviewers.user_id==uid,
 
                            PullRequestReviewers.pull_request==pull_request)\
 
                    .scalar()
 
            if reviewer:
 
                Session().delete(reviewer)
 

	
 
    def delete(self, pull_request):
 
        pull_request = self.__get_pull_request(pull_request)
 
        Session().delete(pull_request)
 

	
 
    def close_pull_request(self, pull_request):
 
        pull_request = self.__get_pull_request(pull_request)
 
        pull_request.status = PullRequest.STATUS_CLOSED
 
        pull_request.updated_on = datetime.datetime.now()
 
        Session().add(pull_request)
0 comments (0 inline, 0 general)