Changeset - ed08a2117d8f
[Not reviewed]
default
0 1 0
Mads Kiilerich - 6 years ago 2019-12-26 15:07:36
mads@kiilerich.com
Grafted from: d8d3b0d56cd6
helpers: drop fmt_date iso8601 decoding to unicode - it will always be str
1 file changed with 1 insertions and 2 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/helpers.py
Show inline comments
 
@@ -326,386 +326,385 @@ def pygmentize(filenode, **kwargs):
 
    """
 
    pygmentize function using pygments
 

	
 
    :param filenode:
 
    """
 
    lexer = get_custom_lexer(filenode.extension) or filenode.lexer
 
    return literal(markup_whitespace(
 
        code_highlight(filenode.content, lexer, CodeHtmlFormatter(**kwargs))))
 

	
 

	
 
def pygmentize_annotation(repo_name, filenode, **kwargs):
 
    """
 
    pygmentize function for annotation
 

	
 
    :param filenode:
 
    """
 

	
 
    color_dict = {}
 

	
 
    def gen_color(n=10000):
 
        """generator for getting n of evenly distributed colors using
 
        hsv color and golden ratio. It always return same order of colors
 

	
 
        :returns: RGB tuple
 
        """
 

	
 
        def hsv_to_rgb(h, s, v):
 
            if s == 0.0:
 
                return v, v, v
 
            i = int(h * 6.0)  # XXX assume int() truncates!
 
            f = (h * 6.0) - i
 
            p = v * (1.0 - s)
 
            q = v * (1.0 - s * f)
 
            t = v * (1.0 - s * (1.0 - f))
 
            i = i % 6
 
            if i == 0:
 
                return v, t, p
 
            if i == 1:
 
                return q, v, p
 
            if i == 2:
 
                return p, v, t
 
            if i == 3:
 
                return p, q, v
 
            if i == 4:
 
                return t, p, v
 
            if i == 5:
 
                return v, p, q
 

	
 
        golden_ratio = 0.618033988749895
 
        h = 0.22717784590367374
 

	
 
        for _unused in xrange(n):
 
            h += golden_ratio
 
            h %= 1
 
            HSV_tuple = [h, 0.95, 0.95]
 
            RGB_tuple = hsv_to_rgb(*HSV_tuple)
 
            yield [str(int(x * 256)) for x in RGB_tuple]
 

	
 
    cgenerator = gen_color()
 

	
 
    def get_color_string(cs):
 
        if cs in color_dict:
 
            col = color_dict[cs]
 
        else:
 
            col = color_dict[cs] = cgenerator.next()
 
        return "color: rgb(%s)! important;" % (', '.join(col))
 

	
 
    def url_func(repo_name):
 

	
 
        def _url_func(changeset):
 
            author = escape(changeset.author)
 
            date = changeset.date
 
            message = escape(changeset.message)
 
            tooltip_html = ("<b>Author:</b> %s<br/>"
 
                            "<b>Date:</b> %s</b><br/>"
 
                            "<b>Message:</b> %s") % (author, date, message)
 

	
 
            lnk_format = show_id(changeset)
 
            uri = link_to(
 
                    lnk_format,
 
                    url('changeset_home', repo_name=repo_name,
 
                        revision=changeset.raw_id),
 
                    style=get_color_string(changeset.raw_id),
 
                    **{'data-toggle': 'popover',
 
                       'data-content': tooltip_html}
 
                  )
 

	
 
            uri += '\n'
 
            return uri
 
        return _url_func
 

	
 
    return literal(markup_whitespace(annotate_highlight(filenode, url_func(repo_name), **kwargs)))
 

	
 

	
 
class _Message(object):
 
    """A message returned by ``pop_flash_messages()``.
 

	
 
    Converting the message to a string returns the message text. Instances
 
    also have the following attributes:
 

	
 
    * ``message``: the message text.
 
    * ``category``: the category specified when the message was created.
 
    """
 

	
 
    def __init__(self, category, message):
 
        self.category = category
 
        self.message = message
 

	
 
    def __str__(self):
 
        return self.message
 

	
 
    __unicode__ = __str__
 

	
 
    def __html__(self):
 
        return escape(safe_unicode(self.message))
 

	
 

	
 
def _session_flash_messages(append=None, clear=False):
 
    """Manage a message queue in tg.session: return the current message queue
 
    after appending the given message, and possibly clearing the queue."""
 
    key = 'flash'
 
    from tg import session
 
    if key in session:
 
        flash_messages = session[key]
 
    else:
 
        if append is None:  # common fast path - also used for clearing empty queue
 
            return []  # don't bother saving
 
        flash_messages = []
 
        session[key] = flash_messages
 
    if append is not None and append not in flash_messages:
 
        flash_messages.append(append)
 
    if clear:
 
        session.pop(key, None)
 
    session.save()
 
    return flash_messages
 

	
 

	
 
def flash(message, category=None, logf=None):
 
    """
 
    Show a message to the user _and_ log it through the specified function
 

	
 
    category: notice (default), warning, error, success
 
    logf: a custom log function - such as log.debug
 

	
 
    logf defaults to log.info, unless category equals 'success', in which
 
    case logf defaults to log.debug.
 
    """
 
    if logf is None:
 
        logf = log.info
 
        if category == 'success':
 
            logf = log.debug
 

	
 
    logf('Flash %s: %s', category, message)
 

	
 
    _session_flash_messages(append=(category, message))
 

	
 

	
 
def pop_flash_messages():
 
    """Return all accumulated messages and delete them from the session.
 

	
 
    The return value is a list of ``Message`` objects.
 
    """
 
    return [_Message(*m) for m in _session_flash_messages(clear=True)]
 

	
 

	
 
age = lambda x, y=False: _age(x, y)
 
capitalize = lambda x: x.capitalize()
 
email = author_email
 
short_id = lambda x: x[:12]
 
hide_credentials = lambda x: ''.join(credentials_filter(x))
 

	
 

	
 
def show_id(cs):
 
    """
 
    Configurable function that shows ID
 
    by default it's r123:fffeeefffeee
 

	
 
    :param cs: changeset instance
 
    """
 
    from kallithea import CONFIG
 
    def_len = safe_int(CONFIG.get('show_sha_length', 12))
 
    show_rev = str2bool(CONFIG.get('show_revision_number', False))
 

	
 
    raw_id = cs.raw_id[:def_len]
 
    if show_rev:
 
        return 'r%s:%s' % (cs.revision, raw_id)
 
    else:
 
        return raw_id
 

	
 

	
 
def fmt_date(date):
 
    if date:
 
        return date.strftime("%Y-%m-%d %H:%M:%S").decode('utf-8')
 

	
 
        return date.strftime("%Y-%m-%d %H:%M:%S")
 
    return ""
 

	
 

	
 
def is_git(repository):
 
    if hasattr(repository, 'alias'):
 
        _type = repository.alias
 
    elif hasattr(repository, 'repo_type'):
 
        _type = repository.repo_type
 
    else:
 
        _type = repository
 
    return _type == 'git'
 

	
 

	
 
def is_hg(repository):
 
    if hasattr(repository, 'alias'):
 
        _type = repository.alias
 
    elif hasattr(repository, 'repo_type'):
 
        _type = repository.repo_type
 
    else:
 
        _type = repository
 
    return _type == 'hg'
 

	
 

	
 
@cache_region('long_term', 'user_attr_or_none')
 
def user_attr_or_none(author, show_attr):
 
    """Try to match email part of VCS committer string with a local user and return show_attr
 
    - or return None if user not found"""
 
    email = author_email(author)
 
    if email:
 
        from kallithea.model.db import User
 
        user = User.get_by_email(email, cache=True) # cache will only use sql_cache_short
 
        if user is not None:
 
            return getattr(user, show_attr)
 
    return None
 

	
 

	
 
def email_or_none(author):
 
    """Try to match email part of VCS committer string with a local user.
 
    Return primary email of user, email part of the specified author name, or None."""
 
    if not author:
 
        return None
 
    email = user_attr_or_none(author, 'email')
 
    if email is not None:
 
        return email # always use user's main email address - not necessarily the one used to find user
 

	
 
    # extract email from the commit string
 
    email = author_email(author)
 
    if email:
 
        return email
 

	
 
    # No valid email, not a valid user in the system, none!
 
    return None
 

	
 

	
 
def person(author, show_attr="username"):
 
    """Find the user identified by 'author', return one of the users attributes,
 
    default to the username attribute, None if there is no user"""
 
    from kallithea.model.db import User
 
    # if author is already an instance use it for extraction
 
    if isinstance(author, User):
 
        return getattr(author, show_attr)
 

	
 
    value = user_attr_or_none(author, show_attr)
 
    if value is not None:
 
        return value
 

	
 
    # Still nothing?  Just pass back the author name if any, else the email
 
    return author_name(author) or email(author)
 

	
 

	
 
def person_by_id(id_, show_attr="username"):
 
    from kallithea.model.db import User
 
    # attr to return from fetched user
 
    person_getter = lambda usr: getattr(usr, show_attr)
 

	
 
    # maybe it's an ID ?
 
    if str(id_).isdigit() or isinstance(id_, int):
 
        id_ = int(id_)
 
        user = User.get(id_)
 
        if user is not None:
 
            return person_getter(user)
 
    return id_
 

	
 

	
 
def boolicon(value):
 
    """Returns boolean value of a value, represented as small html image of true/false
 
    icons
 

	
 
    :param value: value
 
    """
 

	
 
    if value:
 
        return HTML.tag('i', class_="icon-ok")
 
    else:
 
        return HTML.tag('i', class_="icon-minus-circled")
 

	
 

	
 
def action_parser(user_log, feed=False, parse_cs=False):
 
    """
 
    This helper will action_map the specified string action into translated
 
    fancy names with icons and links
 

	
 
    :param user_log: user log instance
 
    :param feed: use output for feeds (no html and fancy icons)
 
    :param parse_cs: parse Changesets into VCS instances
 
    """
 

	
 
    action = user_log.action
 
    action_params = ' '
 

	
 
    x = action.split(':')
 

	
 
    if len(x) > 1:
 
        action, action_params = x
 

	
 
    def get_cs_links():
 
        revs_limit = 3  # display this amount always
 
        revs_top_limit = 50  # show upto this amount of changesets hidden
 
        revs_ids = action_params.split(',')
 
        deleted = user_log.repository is None
 
        if deleted:
 
            return ','.join(revs_ids)
 

	
 
        repo_name = user_log.repository.repo_name
 

	
 
        def lnk(rev, repo_name):
 
            lazy_cs = False
 
            title_ = None
 
            url_ = '#'
 
            if isinstance(rev, BaseChangeset) or isinstance(rev, AttributeDict):
 
                if rev.op and rev.ref_name:
 
                    if rev.op == 'delete_branch':
 
                        lbl = _('Deleted branch: %s') % rev.ref_name
 
                    elif rev.op == 'tag':
 
                        lbl = _('Created tag: %s') % rev.ref_name
 
                    else:
 
                        lbl = 'Unknown operation %s' % rev.op
 
                else:
 
                    lazy_cs = True
 
                    lbl = rev.short_id[:8]
 
                    url_ = url('changeset_home', repo_name=repo_name,
 
                               revision=rev.raw_id)
 
            else:
 
                # changeset cannot be found - it might have been stripped or removed
 
                lbl = rev[:12]
 
                title_ = _('Changeset %s not found') % lbl
 
            if parse_cs:
 
                return link_to(lbl, url_, title=title_, **{'data-toggle': 'tooltip'})
 
            return link_to(lbl, url_, class_='lazy-cs' if lazy_cs else '',
 
                           **{'data-raw_id': rev.raw_id, 'data-repo_name': repo_name})
 

	
 
        def _get_op(rev_txt):
 
            _op = None
 
            _name = rev_txt
 
            if len(rev_txt.split('=>')) == 2:
 
                _op, _name = rev_txt.split('=>')
 
            return _op, _name
 

	
 
        revs = []
 
        if len([v for v in revs_ids if v != '']) > 0:
 
            repo = None
 
            for rev in revs_ids[:revs_top_limit]:
 
                _op, _name = _get_op(rev)
 

	
 
                # we want parsed changesets, or new log store format is bad
 
                if parse_cs:
 
                    try:
 
                        if repo is None:
 
                            repo = user_log.repository.scm_instance
 
                        _rev = repo.get_changeset(rev)
 
                        revs.append(_rev)
 
                    except ChangesetDoesNotExistError:
 
                        log.error('cannot find revision %s in this repo', rev)
 
                        revs.append(rev)
 
                else:
 
                    _rev = AttributeDict({
 
                        'short_id': rev[:12],
 
                        'raw_id': rev,
 
                        'message': '',
 
                        'op': _op,
 
                        'ref_name': _name
 
                    })
 
                    revs.append(_rev)
 
        cs_links = [" " + ', '.join(
 
            [lnk(rev, repo_name) for rev in revs[:revs_limit]]
 
        )]
 
        _op1, _name1 = _get_op(revs_ids[0])
 
        _op2, _name2 = _get_op(revs_ids[-1])
 

	
 
        _rev = '%s...%s' % (_name1, _name2)
 

	
 
        compare_view = (
0 comments (0 inline, 0 general)