Changeset - 27e6594645f1
[Not reviewed]
default
0 2 1
Søren Løvborg - 9 years ago 2016-08-02 17:24:25
sorenl@unity3d.com
pygmentsutils: separate from util2

__get_lem and __get_index_filenames has nothing in common with the rest
of util2, neither with respects to implementation nor usage, and belongs
in a dedicated module. Also, they are clearly not actually private, so
shouldn't be named with leading underscores.
3 files changed with 12 insertions and 770 deletions:
0 comments (0 inline, 0 general)
kallithea/config/conf.py
Show inline comments
 
@@ -4,62 +4,62 @@
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.config.conf
 
~~~~~~~~~~~~~~~~~~~~~
 

	
 
Various config settings for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Mar 7, 2012
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from kallithea.lib.utils2 import __get_lem, __get_index_filenames
 
from kallithea.lib import pygmentsutils
 

	
 

	
 
# language map is also used by whoosh indexer, which for those specified
 
# extensions will index it's content
 
LANGUAGES_EXTENSIONS_MAP = __get_lem()
 
LANGUAGES_EXTENSIONS_MAP = pygmentsutils.get_lem()
 

	
 
# Whoosh index targets
 

	
 
# Extensions we want to index content of using whoosh
 
INDEX_EXTENSIONS = LANGUAGES_EXTENSIONS_MAP.keys()
 

	
 
# Filenames we want to index content of using whoosh
 
INDEX_FILENAMES = __get_index_filenames()
 
INDEX_FILENAMES = pygmentsutils.get_index_filenames()
 

	
 
# list of readme files to search in file tree and display in summary
 
# attached weights defines the search  order lower is first
 
ALL_READMES = [
 
    ('readme', 0), ('README', 0), ('Readme', 0),
 
    ('doc/readme', 1), ('doc/README', 1), ('doc/Readme', 1),
 
    ('Docs/readme', 2), ('Docs/README', 2), ('Docs/Readme', 2),
 
    ('DOCS/readme', 2), ('DOCS/README', 2), ('DOCS/Readme', 2),
 
    ('docs/readme', 2), ('docs/README', 2), ('docs/Readme', 2),
 
]
 

	
 
# extension together with weights to search lower is first
 
RST_EXTS = [
 
    ('', 0), ('.rst', 1), ('.rest', 1),
 
    ('.RST', 2), ('.REST', 2),
 
    ('.txt', 3), ('.TXT', 3)
 
]
 

	
 
MARKDOWN_EXTS = [
 
    ('.md', 1), ('.MD', 1),
 
    ('.mkdn', 2), ('.MKDN', 2),
 
    ('.mdown', 3), ('.MDOWN', 3),
 
    ('.markdown', 4), ('.MARKDOWN', 4)
 
]
kallithea/lib/pygmentsutils.py
Show inline comments
 
file copied from kallithea/lib/utils2.py to kallithea/lib/pygmentsutils.py
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.utils2
 
~~~~~~~~~~~~~~~~~~~~
 
kallithea.lib.pygmentsutils
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Some simple helper functions
 
Functions for extracting internal Pygments data.
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jan 5, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from collections import defaultdict
 
from itertools import ifilter
 
from string import lower
 

	
 
import os
 
import re
 
import sys
 
import time
 
import uuid
 
import datetime
 
import urllib
 
import binascii
 

	
 
import webob
 
import urlobject
 

	
 
from pylons.i18n.translation import _, ungettext
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.lib.compat import json
 
from pygments import lexers
 

	
 

	
 
def __get_lem():
 
def get_lem():
 
    """
 
    Get language extension map based on what's inside pygments lexers
 
    """
 
    from pygments import lexers
 
    from string import lower
 
    from collections import defaultdict
 

	
 
    d = defaultdict(lambda: [])
 

	
 
    def __clean(s):
 
        s = s.lstrip('*')
 
        s = s.lstrip('.')
 

	
 
        if s.find('[') != -1:
 
            exts = []
 
            start, stop = s.find('['), s.find(']')
 

	
 
            for suffix in s[start + 1:stop]:
 
                exts.append(s[:s.find('[')] + suffix)
 
            return map(lower, exts)
 
        else:
 
            return map(lower, [s])
 

	
 
    for lx, t in sorted(lexers.LEXERS.items()):
 
        m = map(__clean, t[-2])
 
        if m:
 
            m = reduce(lambda x, y: x + y, m)
 
            for ext in m:
 
                desc = lx.replace('Lexer', '')
 
                d[ext].append(desc)
 

	
 
    return dict(d)
 

	
 

	
 
def __get_index_filenames():
 
def get_index_filenames():
 
    """
 
    Get list of known indexable filenames from pygment lexer internals
 
    """
 
    from pygments import lexers
 
    from itertools import ifilter
 

	
 
    filenames = []
 

	
 
    def likely_filename(s):
 
        return s.find('*') == -1 and s.find('[') == -1
 

	
 
    for lx, t in sorted(lexers.LEXERS.items()):
 
        for f in ifilter(likely_filename, t[-2]):
 
            filenames.append(f)
 

	
 
    return filenames
 

	
 

	
 
def str2bool(_str):
 
    """
 
    returns True/False value from given string, it tries to translate the
 
    string into boolean
 

	
 
    :param _str: string value to translate into boolean
 
    :rtype: boolean
 
    :returns: boolean from given string
 
    """
 
    if _str is None:
 
        return False
 
    if _str in (True, False):
 
        return _str
 
    _str = str(_str).strip().lower()
 
    return _str in ('t', 'true', 'y', 'yes', 'on', '1')
 

	
 

	
 
def aslist(obj, sep=None, strip=True):
 
    """
 
    Returns given string separated by sep as list
 

	
 
    :param obj:
 
    :param sep:
 
    :param strip:
 
    """
 
    if isinstance(obj, (basestring)):
 
        lst = obj.split(sep)
 
        if strip:
 
            lst = [v.strip() for v in lst]
 
        return lst
 
    elif isinstance(obj, (list, tuple)):
 
        return obj
 
    elif obj is None:
 
        return []
 
    else:
 
        return [obj]
 

	
 

	
 
def convert_line_endings(line, mode):
 
    """
 
    Converts a given line  "line end" according to given mode
 

	
 
    Available modes are::
 
        0 - Unix
 
        1 - Mac
 
        2 - DOS
 

	
 
    :param line: given line to convert
 
    :param mode: mode to convert to
 
    :rtype: str
 
    :return: converted line according to mode
 
    """
 
    from string import replace
 

	
 
    if mode == 0:
 
            line = replace(line, '\r\n', '\n')
 
            line = replace(line, '\r', '\n')
 
    elif mode == 1:
 
            line = replace(line, '\r\n', '\r')
 
            line = replace(line, '\n', '\r')
 
    elif mode == 2:
 
            line = re.sub("\r(?!\n)|(?<!\r)\n", "\r\n", line)
 
    return line
 

	
 

	
 
def detect_mode(line, default):
 
    """
 
    Detects line break for given line, if line break couldn't be found
 
    given default value is returned
 

	
 
    :param line: str line
 
    :param default: default
 
    :rtype: int
 
    :return: value of line end on of 0 - Unix, 1 - Mac, 2 - DOS
 
    """
 
    if line.endswith('\r\n'):
 
        return 2
 
    elif line.endswith('\n'):
 
        return 0
 
    elif line.endswith('\r'):
 
        return 1
 
    else:
 
        return default
 

	
 

	
 
def generate_api_key():
 
    """
 
    Generates a random (presumably unique) API key.
 
    """
 
    return binascii.hexlify(os.urandom(20))
 

	
 

	
 
def safe_int(val, default=None):
 
    """
 
    Returns int() of val if val is not convertable to int use default
 
    instead
 

	
 
    :param val:
 
    :param default:
 
    """
 

	
 
    try:
 
        val = int(val)
 
    except (ValueError, TypeError):
 
        val = default
 

	
 
    return val
 

	
 

	
 
def safe_unicode(str_, from_encoding=None):
 
    """
 
    safe unicode function. Does few trick to turn str_ into unicode
 

	
 
    In case of UnicodeDecode error we try to return it with encoding detected
 
    by chardet library if it fails fallback to unicode with errors replaced
 

	
 
    :param str_: string to decode
 
    :rtype: unicode
 
    :returns: unicode object
 
    """
 
    if isinstance(str_, unicode):
 
        return str_
 

	
 
    if not from_encoding:
 
        import kallithea
 
        DEFAULT_ENCODINGS = aslist(kallithea.CONFIG.get('default_encoding',
 
                                                        'utf8'), sep=',')
 
        from_encoding = DEFAULT_ENCODINGS
 

	
 
    if not isinstance(from_encoding, (list, tuple)):
 
        from_encoding = [from_encoding]
 

	
 
    try:
 
        return unicode(str_)
 
    except UnicodeDecodeError:
 
        pass
 

	
 
    for enc in from_encoding:
 
        try:
 
            return unicode(str_, enc)
 
        except UnicodeDecodeError:
 
            pass
 

	
 
    try:
 
        import chardet
 
        encoding = chardet.detect(str_)['encoding']
 
        if encoding is None:
 
            raise Exception()
 
        return str_.decode(encoding)
 
    except (ImportError, UnicodeDecodeError, Exception):
 
        return unicode(str_, from_encoding[0], 'replace')
 

	
 

	
 
def safe_str(unicode_, to_encoding=None):
 
    """
 
    safe str function. Does few trick to turn unicode_ into string
 

	
 
    In case of UnicodeEncodeError we try to return it with encoding detected
 
    by chardet library if it fails fallback to string with errors replaced
 

	
 
    :param unicode_: unicode to encode
 
    :rtype: str
 
    :returns: str object
 
    """
 

	
 
    # if it's not basestr cast to str
 
    if not isinstance(unicode_, basestring):
 
        return str(unicode_)
 

	
 
    if isinstance(unicode_, str):
 
        return unicode_
 

	
 
    if not to_encoding:
 
        import kallithea
 
        DEFAULT_ENCODINGS = aslist(kallithea.CONFIG.get('default_encoding',
 
                                                        'utf8'), sep=',')
 
        to_encoding = DEFAULT_ENCODINGS
 

	
 
    if not isinstance(to_encoding, (list, tuple)):
 
        to_encoding = [to_encoding]
 

	
 
    for enc in to_encoding:
 
        try:
 
            return unicode_.encode(enc)
 
        except UnicodeEncodeError:
 
            pass
 

	
 
    try:
 
        import chardet
 
        encoding = chardet.detect(unicode_)['encoding']
 
        if encoding is None:
 
            raise UnicodeEncodeError()
 

	
 
        return unicode_.encode(encoding)
 
    except (ImportError, UnicodeEncodeError):
 
        return unicode_.encode(to_encoding[0], 'replace')
 

	
 

	
 
def remove_suffix(s, suffix):
 
    if s.endswith(suffix):
 
        s = s[:-1 * len(suffix)]
 
    return s
 

	
 

	
 
def remove_prefix(s, prefix):
 
    if s.startswith(prefix):
 
        s = s[len(prefix):]
 
    return s
 

	
 

	
 
def engine_from_config(configuration, prefix='sqlalchemy.', **kwargs):
 
    """
 
    Custom engine_from_config functions that makes sure we use NullPool for
 
    file based sqlite databases. This prevents errors on sqlite. This only
 
    applies to sqlalchemy versions < 0.7.0
 

	
 
    """
 
    import sqlalchemy
 
    from sqlalchemy import engine_from_config as efc
 
    import logging
 

	
 
    if int(sqlalchemy.__version__.split('.')[1]) < 7:
 

	
 
        # This solution should work for sqlalchemy < 0.7.0, and should use
 
        # proxy=TimerProxy() for execution time profiling
 

	
 
        from sqlalchemy.pool import NullPool
 
        url = configuration[prefix + 'url']
 

	
 
        if url.startswith('sqlite'):
 
            kwargs.update({'poolclass': NullPool})
 
        return efc(configuration, prefix, **kwargs)
 
    else:
 
        import time
 
        from sqlalchemy import event
 

	
 
        log = logging.getLogger('sqlalchemy.engine')
 
        BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = xrange(30, 38)
 
        engine = efc(configuration, prefix, **kwargs)
 

	
 
        def color_sql(sql):
 
            COLOR_SEQ = "\033[1;%dm"
 
            COLOR_SQL = YELLOW
 
            normal = '\x1b[0m'
 
            return ''.join([COLOR_SEQ % COLOR_SQL, sql, normal])
 

	
 
        if configuration['debug']:
 
            #attach events only for debug configuration
 

	
 
            def before_cursor_execute(conn, cursor, statement,
 
                                    parameters, context, executemany):
 
                context._query_start_time = time.time()
 
                log.info(color_sql(">>>>> STARTING QUERY >>>>>"))
 

	
 
            def after_cursor_execute(conn, cursor, statement,
 
                                    parameters, context, executemany):
 
                total = time.time() - context._query_start_time
 
                log.info(color_sql("<<<<< TOTAL TIME: %f <<<<<" % total))
 

	
 
            event.listen(engine, "before_cursor_execute",
 
                         before_cursor_execute)
 
            event.listen(engine, "after_cursor_execute",
 
                         after_cursor_execute)
 

	
 
    return engine
 

	
 

	
 
def age(prevdate, show_short_version=False, now=None):
 
    """
 
    turns a datetime into an age string.
 
    If show_short_version is True, then it will generate a not so accurate but shorter string,
 
    example: 2days ago, instead of 2 days and 23 hours ago.
 

	
 
    :param prevdate: datetime object
 
    :param show_short_version: if it should approximate the date and return a shorter string
 
    :rtype: unicode
 
    :returns: unicode words describing age
 
    """
 
    now = now or datetime.datetime.now()
 
    order = ['year', 'month', 'day', 'hour', 'minute', 'second']
 
    deltas = {}
 
    future = False
 

	
 
    if prevdate > now:
 
        now, prevdate = prevdate, now
 
        future = True
 
    if future:
 
        prevdate = prevdate.replace(microsecond=0)
 
    # Get date parts deltas
 
    from dateutil import relativedelta
 
    for part in order:
 
        d = relativedelta.relativedelta(now, prevdate)
 
        deltas[part] = getattr(d, part + 's')
 

	
 
    # Fix negative offsets (there is 1 second between 10:59:59 and 11:00:00,
 
    # not 1 hour, -59 minutes and -59 seconds)
 
    for num, length in [(5, 60), (4, 60), (3, 24)]:  # seconds, minutes, hours
 
        part = order[num]
 
        carry_part = order[num - 1]
 

	
 
        if deltas[part] < 0:
 
            deltas[part] += length
 
            deltas[carry_part] -= 1
 

	
 
    # Same thing for days except that the increment depends on the (variable)
 
    # number of days in the month
 
    month_lengths = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
 
    if deltas['day'] < 0:
 
        if prevdate.month == 2 and (prevdate.year % 4 == 0 and
 
            (prevdate.year % 100 != 0 or prevdate.year % 400 == 0)):
 
            deltas['day'] += 29
 
        else:
 
            deltas['day'] += month_lengths[prevdate.month - 1]
 

	
 
        deltas['month'] -= 1
 

	
 
    if deltas['month'] < 0:
 
        deltas['month'] += 12
 
        deltas['year'] -= 1
 

	
 
    # In short version, we want nicer handling of ages of more than a year
 
    if show_short_version:
 
        if deltas['year'] == 1:
 
            # ages between 1 and 2 years: show as months
 
            deltas['month'] += 12
 
            deltas['year'] = 0
 
        if deltas['year'] >= 2:
 
            # ages 2+ years: round
 
            if deltas['month'] > 6:
 
                deltas['year'] += 1
 
                deltas['month'] = 0
 

	
 
    # Format the result
 
    fmt_funcs = {
 
        'year': lambda d: ungettext(u'%d year', '%d years', d) % d,
 
        'month': lambda d: ungettext(u'%d month', '%d months', d) % d,
 
        'day': lambda d: ungettext(u'%d day', '%d days', d) % d,
 
        'hour': lambda d: ungettext(u'%d hour', '%d hours', d) % d,
 
        'minute': lambda d: ungettext(u'%d minute', '%d minutes', d) % d,
 
        'second': lambda d: ungettext(u'%d second', '%d seconds', d) % d,
 
    }
 

	
 
    for i, part in enumerate(order):
 
        value = deltas[part]
 
        if value == 0:
 
            continue
 

	
 
        if i < 5:
 
            sub_part = order[i + 1]
 
            sub_value = deltas[sub_part]
 
        else:
 
            sub_value = 0
 

	
 
        if sub_value == 0 or show_short_version:
 
            if future:
 
                return _('in %s') % fmt_funcs[part](value)
 
            else:
 
                return _('%s ago') % fmt_funcs[part](value)
 
        if future:
 
            return _('in %s and %s') % (fmt_funcs[part](value),
 
                fmt_funcs[sub_part](sub_value))
 
        else:
 
            return _('%s and %s ago') % (fmt_funcs[part](value),
 
                fmt_funcs[sub_part](sub_value))
 

	
 
    return _('just now')
 

	
 

	
 
def uri_filter(uri):
 
    """
 
    Removes user:password from given url string
 

	
 
    :param uri:
 
    :rtype: unicode
 
    :returns: filtered list of strings
 
    """
 
    if not uri:
 
        return ''
 

	
 
    proto = ''
 

	
 
    for pat in ('https://', 'http://', 'git://'):
 
        if uri.startswith(pat):
 
            uri = uri[len(pat):]
 
            proto = pat
 
            break
 

	
 
    # remove passwords and username
 
    uri = uri[uri.find('@') + 1:]
 

	
 
    # get the port
 
    cred_pos = uri.find(':')
 
    if cred_pos == -1:
 
        host, port = uri, None
 
    else:
 
        host, port = uri[:cred_pos], uri[cred_pos + 1:]
 

	
 
    return filter(None, [proto, host, port])
 

	
 

	
 
def credentials_filter(uri):
 
    """
 
    Returns a url with removed credentials
 

	
 
    :param uri:
 
    """
 

	
 
    uri = uri_filter(uri)
 
    #check if we have port
 
    if len(uri) > 2 and uri[2]:
 
        uri[2] = ':' + uri[2]
 

	
 
    return ''.join(uri)
 

	
 

	
 
def get_clone_url(uri_tmpl, qualified_home_url, repo_name, repo_id, **override):
 
    parsed_url = urlobject.URLObject(qualified_home_url)
 
    decoded_path = safe_unicode(urllib.unquote(parsed_url.path.rstrip('/')))
 
    args = {
 
        'scheme': parsed_url.scheme,
 
        'user': '',
 
        'netloc': parsed_url.netloc+decoded_path,  # path if we use proxy-prefix
 
        'prefix': decoded_path,
 
        'repo': repo_name,
 
        'repoid': str(repo_id)
 
    }
 
    args.update(override)
 
    args['user'] = urllib.quote(safe_str(args['user']))
 

	
 
    for k, v in args.items():
 
        uri_tmpl = uri_tmpl.replace('{%s}' % k, v)
 

	
 
    # remove leading @ sign if it's present. Case of empty user
 
    url_obj = urlobject.URLObject(uri_tmpl)
 
    url = url_obj.with_netloc(url_obj.netloc.lstrip('@'))
 

	
 
    return safe_unicode(url)
 

	
 

	
 
def get_changeset_safe(repo, rev):
 
    """
 
    Safe version of get_changeset if this changeset doesn't exists for a
 
    repo it returns a Dummy one instead
 

	
 
    :param repo:
 
    :param rev:
 
    """
 
    from kallithea.lib.vcs.backends.base import BaseRepository
 
    from kallithea.lib.vcs.exceptions import RepositoryError
 
    from kallithea.lib.vcs.backends.base import EmptyChangeset
 
    if not isinstance(repo, BaseRepository):
 
        raise Exception('You must pass an Repository '
 
                        'object as first argument got %s', type(repo))
 

	
 
    try:
 
        cs = repo.get_changeset(rev)
 
    except (RepositoryError, LookupError):
 
        cs = EmptyChangeset(requested_revision=rev)
 
    return cs
 

	
 

	
 
def datetime_to_time(dt):
 
    if dt:
 
        return time.mktime(dt.timetuple())
 

	
 

	
 
def time_to_datetime(tm):
 
    if tm:
 
        if isinstance(tm, basestring):
 
            try:
 
                tm = float(tm)
 
            except ValueError:
 
                return
 
        return datetime.datetime.fromtimestamp(tm)
 

	
 

	
 
# Must match regexp in kallithea/public/js/base.js MentionsAutoComplete()
 
# Check char before @ - it must not look like we are in an email addresses.
 
# Matching is greedy so we don't have to look beyond the end.
 
MENTIONS_REGEX = re.compile(r'(?:^|(?<=[^a-zA-Z0-9]))@([a-zA-Z0-9][-_.a-zA-Z0-9]*[a-zA-Z0-9])')
 

	
 
def extract_mentioned_usernames(text):
 
    r"""
 
    Returns list of (possible) usernames @mentioned in given text.
 

	
 
    >>> extract_mentioned_usernames('@1-2.a_X,@1234 not@not @ddd@not @n @ee @ff @gg, @gg;@hh @n\n@zz,')
 
    ['1-2.a_X', '1234', 'ddd', 'ee', 'ff', 'gg', 'hh', 'zz']
 
    """
 
    return MENTIONS_REGEX.findall(text)
 

	
 
def extract_mentioned_users(text):
 
    """ Returns set of actual database Users @mentioned in given text. """
 
    from kallithea.model.db import User
 
    result = set()
 
    for name in extract_mentioned_usernames(text):
 
        user = User.get_by_username(name, case_insensitive=True)
 
        if user is not None and user.username != User.DEFAULT_USER:
 
            result.add(user)
 
    return result
 

	
 

	
 
class AttributeDict(dict):
 
    def __getattr__(self, attr):
 
        return self.get(attr, None)
 
    __setattr__ = dict.__setitem__
 
    __delattr__ = dict.__delitem__
 

	
 

	
 
def fix_PATH(os_=None):
 
    """
 
    Get current active python path, and append it to PATH variable to fix issues
 
    of subprocess calls and different python versions
 
    """
 
    if os_ is None:
 
        import os
 
    else:
 
        os = os_
 

	
 
    cur_path = os.path.split(sys.executable)[0]
 
    if not os.environ['PATH'].startswith(cur_path):
 
        os.environ['PATH'] = '%s:%s' % (cur_path, os.environ['PATH'])
 

	
 

	
 
def obfuscate_url_pw(engine):
 
    from sqlalchemy.engine import url as sa_url
 
    from sqlalchemy.exc import ArgumentError
 
    try:
 
        _url = sa_url.make_url(engine or '')
 
    except ArgumentError:
 
        return engine
 
    if _url.password:
 
        _url.password = 'XXXXX'
 
    return str(_url)
 

	
 

	
 
def get_server_url(environ):
 
    req = webob.Request(environ)
 
    return req.host_url + req.script_name
 

	
 

	
 
def _extract_extras(env=None):
 
    """
 
    Extracts the Kallithea extras data from os.environ, and wraps it into named
 
    AttributeDict object
 
    """
 
    if not env:
 
        env = os.environ
 

	
 
    try:
 
        extras = json.loads(env['KALLITHEA_EXTRAS'])
 
    except KeyError:
 
        extras = {}
 

	
 
    try:
 
        for k in ['username', 'repository', 'locked_by', 'scm', 'make_lock',
 
                  'action', 'ip']:
 
            extras[k]
 
    except KeyError as e:
 
        raise Exception('Missing key %s in os.environ %s' % (e, extras))
 

	
 
    return AttributeDict(extras)
 

	
 

	
 
def _set_extras(extras):
 
    # RC_SCM_DATA can probably be removed in the future, but for compatibility now...
 
    os.environ['KALLITHEA_EXTRAS'] = os.environ['RC_SCM_DATA'] = json.dumps(extras)
 

	
 

	
 
def unique_id(hexlen=32):
 
    alphabet = "23456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghjklmnpqrstuvwxyz"
 
    return suuid(truncate_to=hexlen, alphabet=alphabet)
 

	
 

	
 
def suuid(url=None, truncate_to=22, alphabet=None):
 
    """
 
    Generate and return a short URL safe UUID.
 

	
 
    If the url parameter is provided, set the namespace to the provided
 
    URL and generate a UUID.
 

	
 
    :param url to get the uuid for
 
    :truncate_to: truncate the basic 22 UUID to shorter version
 

	
 
    The IDs won't be universally unique any longer, but the probability of
 
    a collision will still be very low.
 
    """
 
    # Define our alphabet.
 
    _ALPHABET = alphabet or "23456789ABCDEFGHJKLMNPQRSTUVWXYZ"
 

	
 
    # If no URL is given, generate a random UUID.
 
    if url is None:
 
        unique_id = uuid.uuid4().int
 
    else:
 
        unique_id = uuid.uuid3(uuid.NAMESPACE_URL, url).int
 

	
 
    alphabet_length = len(_ALPHABET)
 
    output = []
 
    while unique_id > 0:
 
        digit = unique_id % alphabet_length
 
        output.append(_ALPHABET[digit])
 
        unique_id = int(unique_id / alphabet_length)
 
    return "".join(output)[:truncate_to]
 

	
 

	
 
def get_current_authuser():
 
    """
 
    Gets kallithea user from threadlocal tmpl_context variable if it's
 
    defined, else returns None.
 
    """
 
    from pylons import tmpl_context
 
    if hasattr(tmpl_context, 'authuser'):
 
        return tmpl_context.authuser
 

	
 
    return None
 

	
 

	
 
class OptionalAttr(object):
 
    """
 
    Special Optional Option that defines other attribute. Example::
 

	
 
        def test(apiuser, userid=Optional(OAttr('apiuser')):
 
            user = Optional.extract(userid)
 
            # calls
 

	
 
    """
 

	
 
    def __init__(self, attr_name):
 
        self.attr_name = attr_name
 

	
 
    def __repr__(self):
 
        return '<OptionalAttr:%s>' % self.attr_name
 

	
 
    def __call__(self):
 
        return self
 

	
 
#alias
 
OAttr = OptionalAttr
 

	
 

	
 
class Optional(object):
 
    """
 
    Defines an optional parameter::
 

	
 
        param = param.getval() if isinstance(param, Optional) else param
 
        param = param() if isinstance(param, Optional) else param
 

	
 
    is equivalent of::
 

	
 
        param = Optional.extract(param)
 

	
 
    """
 

	
 
    def __init__(self, type_):
 
        self.type_ = type_
 

	
 
    def __repr__(self):
 
        return '<Optional:%s>' % self.type_.__repr__()
 

	
 
    def __call__(self):
 
        return self.getval()
 

	
 
    def getval(self):
 
        """
 
        returns value from this Optional instance
 
        """
 
        if isinstance(self.type_, OAttr):
 
            # use params name
 
            return self.type_.attr_name
 
        return self.type_
 

	
 
    @classmethod
 
    def extract(cls, val):
 
        """
 
        Extracts value from Optional() instance
 

	
 
        :param val:
 
        :return: original value if it's not Optional instance else
 
            value of instance
 
        """
 
        if isinstance(val, cls):
 
            return val.getval()
 
        return val
 

	
 
def urlreadable(s, _cleanstringsub=re.compile('[^-a-zA-Z0-9./]+').sub):
 
    return _cleanstringsub('_', safe_str(s)).rstrip('_')
kallithea/lib/utils2.py
Show inline comments
 
@@ -22,102 +22,48 @@ Original author and date, and relevant c
 
:created_on: Jan 5, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import os
 
import re
 
import sys
 
import time
 
import uuid
 
import datetime
 
import urllib
 
import binascii
 

	
 
import webob
 
import urlobject
 

	
 
from pylons.i18n.translation import _, ungettext
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.lib.compat import json
 

	
 

	
 
def __get_lem():
 
    """
 
    Get language extension map based on what's inside pygments lexers
 
    """
 
    from pygments import lexers
 
    from string import lower
 
    from collections import defaultdict
 

	
 
    d = defaultdict(lambda: [])
 

	
 
    def __clean(s):
 
        s = s.lstrip('*')
 
        s = s.lstrip('.')
 

	
 
        if s.find('[') != -1:
 
            exts = []
 
            start, stop = s.find('['), s.find(']')
 

	
 
            for suffix in s[start + 1:stop]:
 
                exts.append(s[:s.find('[')] + suffix)
 
            return map(lower, exts)
 
        else:
 
            return map(lower, [s])
 

	
 
    for lx, t in sorted(lexers.LEXERS.items()):
 
        m = map(__clean, t[-2])
 
        if m:
 
            m = reduce(lambda x, y: x + y, m)
 
            for ext in m:
 
                desc = lx.replace('Lexer', '')
 
                d[ext].append(desc)
 

	
 
    return dict(d)
 

	
 

	
 
def __get_index_filenames():
 
    """
 
    Get list of known indexable filenames from pygment lexer internals
 
    """
 
    from pygments import lexers
 
    from itertools import ifilter
 

	
 
    filenames = []
 

	
 
    def likely_filename(s):
 
        return s.find('*') == -1 and s.find('[') == -1
 

	
 
    for lx, t in sorted(lexers.LEXERS.items()):
 
        for f in ifilter(likely_filename, t[-2]):
 
            filenames.append(f)
 

	
 
    return filenames
 

	
 

	
 
def str2bool(_str):
 
    """
 
    returns True/False value from given string, it tries to translate the
 
    string into boolean
 

	
 
    :param _str: string value to translate into boolean
 
    :rtype: boolean
 
    :returns: boolean from given string
 
    """
 
    if _str is None:
 
        return False
 
    if _str in (True, False):
 
        return _str
 
    _str = str(_str).strip().lower()
 
    return _str in ('t', 'true', 'y', 'yes', 'on', '1')
 

	
 

	
 
def aslist(obj, sep=None, strip=True):
 
    """
 
    Returns given string separated by sep as list
 

	
 
    :param obj:
 
    :param sep:
 
    :param strip:
0 comments (0 inline, 0 general)