diff --git a/kallithea/config/conf.py b/kallithea/config/conf.py
--- a/kallithea/config/conf.py
+++ b/kallithea/config/conf.py
@@ -25,12 +25,12 @@ Original author and date, and relevant c
 :license: GPLv3, see LICENSE.md for more details.
 """
 
-from kallithea.lib.utils2 import __get_lem, __get_index_filenames
+from kallithea.lib import pygmentsutils
 
 
 # language map is also used by whoosh indexer, which for those specified
 # extensions will index it's content
-LANGUAGES_EXTENSIONS_MAP = __get_lem()
+LANGUAGES_EXTENSIONS_MAP = pygmentsutils.get_lem()
 
 # Whoosh index targets
 
@@ -38,7 +38,7 @@ LANGUAGES_EXTENSIONS_MAP = __get_lem()
 INDEX_EXTENSIONS = LANGUAGES_EXTENSIONS_MAP.keys()
 
 # Filenames we want to index content of using whoosh
-INDEX_FILENAMES = __get_index_filenames()
+INDEX_FILENAMES = pygmentsutils.get_index_filenames()
 
 # list of readme files to search in file tree and display in summary
 # attached weights defines the search order lower is first
diff --git a/kallithea/lib/utils2.py b/kallithea/lib/pygmentsutils.py
copy from kallithea/lib/utils2.py
copy to kallithea/lib/pygmentsutils.py
--- a/kallithea/lib/utils2.py
+++ b/kallithea/lib/pygmentsutils.py
@@ -12,10 +12,10 @@
 # You should have received a copy of the GNU General Public License
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 """
-kallithea.lib.utils2
-~~~~~~~~~~~~~~~~~~~~
+kallithea.lib.pygmentsutils
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
-Some simple helper functions
+Functions for extracting internal Pygments data.
 
 This file was forked by the Kallithea project in July 2014.
 Original author and date, and relevant copyright and licensing information is below:
@@ -25,32 +25,17 @@ Original author and date, and relevant c
 :license: GPLv3, see LICENSE.md for more details.
 """
 
+from collections import defaultdict
+from itertools import ifilter
+from string import lower
 
-import os
-import re
-import sys
-import time
-import uuid
-import datetime
-import urllib
-import binascii
-
-import webob
-import urlobject
-
-from pylons.i18n.translation import _, ungettext
-from kallithea.lib.vcs.utils.lazy import LazyProperty
-from kallithea.lib.compat import json
+from pygments import lexers
 
 
-def __get_lem():
+def get_lem():
     """
     Get language extension map based on what's inside pygments lexers
     """
-    from pygments import lexers
-    from string import lower
-    from collections import defaultdict
-
     d = defaultdict(lambda: [])
 
     def __clean(s):
@@ -78,12 +63,10 @@ def __get_lem():
     return dict(d)
 
 
-def __get_index_filenames():
+def get_index_filenames():
     """
     Get list of known indexable filenames from pygment lexer internals
     """
-    from pygments import lexers
-    from itertools import ifilter
 
     filenames = []
 
@@ -95,690 +78,3 @@ def __get_index_filenames():
             filenames.append(f)
 
     return filenames
-
-
-def str2bool(_str):
-    """
-    returns True/False value from given string, it tries to translate the
-    string into boolean
-
-    :param _str: string value to translate into boolean
-    :rtype: boolean
-    :returns: boolean from given string
-    """
-    if _str is None:
-        return False
-    if _str in (True, False):
-        return _str
-    _str = str(_str).strip().lower()
-    return _str in ('t', 'true', 'y', 'yes', 'on', '1')
-
-
-def aslist(obj, sep=None, strip=True):
-    """
-    Returns given string separated by sep as list
-
-    :param obj:
-    :param sep:
-    :param strip:
-    """
-    if isinstance(obj, (basestring)):
-        lst = obj.split(sep)
-        if strip:
-            lst = [v.strip() for v in lst]
-        return lst
-    elif isinstance(obj, (list, tuple)):
-        return obj
-    elif obj is None:
-        return []
-    else:
-        return [obj]
-
-
-def convert_line_endings(line, mode):
-    """
-    Converts a given line "line end" according to given mode
-
-    Available modes are::
-        0 - Unix
-        1 - Mac
-        2 - DOS
-
-    :param line: given line to convert
-    :param mode: mode to convert to
-    :rtype: str
-    :return: converted line according to mode
-    """
-    from string import replace
-
-    if mode == 0:
-        line = replace(line, '\r\n', '\n')
-        line = replace(line, '\r', '\n')
-    elif mode == 1:
-        line = replace(line, '\r\n', '\r')
-        line = replace(line, '\n', '\r')
-    elif mode == 2:
-        line = re.sub("\r(?!\n)|(?>>>> STARTING QUERY >>>>>"))
-
-    def after_cursor_execute(conn, cursor, statement,
-                             parameters, context, executemany):
-        total = time.time() - context._query_start_time
-        log.info(color_sql("<<<<< TOTAL TIME: %f <<<<<" % total))
-
-    event.listen(engine, "before_cursor_execute",
-                 before_cursor_execute)
-    event.listen(engine, "after_cursor_execute",
-                 after_cursor_execute)
-
-    return engine
-
-
-def age(prevdate, show_short_version=False, now=None):
-    """
-    turns a datetime into an age string.
-    If show_short_version is True, then it will generate a not so accurate but shorter string,
-    example: 2days ago, instead of 2 days and 23 hours ago.
-
-    :param prevdate: datetime object
-    :param show_short_version: if it should approximate the date and return a shorter string
-    :rtype: unicode
-    :returns: unicode words describing age
-    """
-    now = now or datetime.datetime.now()
-    order = ['year', 'month', 'day', 'hour', 'minute', 'second']
-    deltas = {}
-    future = False
-
-    if prevdate > now:
-        now, prevdate = prevdate, now
-        future = True
-    if future:
-        prevdate = prevdate.replace(microsecond=0)
-    # Get date parts deltas
-    from dateutil import relativedelta
-    for part in order:
-        d = relativedelta.relativedelta(now, prevdate)
-        deltas[part] = getattr(d, part + 's')
-
-    # Fix negative offsets (there is 1 second between 10:59:59 and 11:00:00,
-    # not 1 hour, -59 minutes and -59 seconds)
-    for num, length in [(5, 60), (4, 60), (3, 24)]:  # seconds, minutes, hours
-        part = order[num]
-        carry_part = order[num - 1]
-
-        if deltas[part] < 0:
-            deltas[part] += length
-            deltas[carry_part] -= 1
-
-    # Same thing for days except that the increment depends on the (variable)
-    # number of days in the month
-    month_lengths = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
-    if deltas['day'] < 0:
-        if prevdate.month == 2 and (prevdate.year % 4 == 0 and
-                (prevdate.year % 100 != 0 or prevdate.year % 400 == 0)):
-            deltas['day'] += 29
-        else:
-            deltas['day'] += month_lengths[prevdate.month - 1]
-
-        deltas['month'] -= 1
-
-    if deltas['month'] < 0:
-        deltas['month'] += 12
-        deltas['year'] -= 1
-
-    # In short version, we want nicer handling of ages of more than a year
-    if show_short_version:
-        if deltas['year'] == 1:
-            # ages between 1 and 2 years: show as months
-            deltas['month'] += 12
-            deltas['year'] = 0
-        if deltas['year'] >= 2:
-            # ages 2+ years: round
-            if deltas['month'] > 6:
-                deltas['year'] += 1
-                deltas['month'] = 0
-
-    # Format the result
-    fmt_funcs = {
-        'year': lambda d: ungettext(u'%d year', '%d years', d) % d,
-        'month': lambda d: ungettext(u'%d month', '%d months', d) % d,
-        'day': lambda d: ungettext(u'%d day', '%d days', d) % d,
-        'hour': lambda d: ungettext(u'%d hour', '%d hours', d) % d,
-        'minute': lambda d: ungettext(u'%d minute', '%d minutes', d) % d,
-        'second': lambda d: ungettext(u'%d second', '%d seconds', d) % d,
-    }
-
-    for i, part in enumerate(order):
-        value = deltas[part]
-        if value == 0:
-            continue
-
-        if i < 5:
-            sub_part = order[i + 1]
-            sub_value = deltas[sub_part]
-        else:
-            sub_value = 0
-
-        if sub_value == 0 or show_short_version:
-            if future:
-                return _('in %s') % fmt_funcs[part](value)
-            else:
-                return _('%s ago') % fmt_funcs[part](value)
-        if future:
-            return _('in %s and %s') % (fmt_funcs[part](value),
-                fmt_funcs[sub_part](sub_value))
-        else:
-            return _('%s and %s ago') % (fmt_funcs[part](value),
-                fmt_funcs[sub_part](sub_value))
-
-    return _('just now')
-
-
-def uri_filter(uri):
-    """
-    Removes user:password from given url string
-
-    :param uri:
-    :rtype: unicode
-    :returns: filtered list of strings
-    """
-    if not uri:
-        return ''
-
-    proto = ''
-
-    for pat in ('https://', 'http://', 'git://'):
-        if uri.startswith(pat):
-            uri = uri[len(pat):]
-            proto = pat
-            break
-
-    # remove passwords and username
-    uri = uri[uri.find('@') + 1:]
-
-    # get the port
-    cred_pos = uri.find(':')
-    if cred_pos == -1:
-        host, port = uri, None
-    else:
-        host, port = uri[:cred_pos], uri[cred_pos + 1:]
-
-    return filter(None, [proto, host, port])
-
-
-def credentials_filter(uri):
-    """
-    Returns a url with removed credentials
-
-    :param uri:
-    """
-
-    uri = uri_filter(uri)
-    #check if we have port
-    if len(uri) > 2 and uri[2]:
-        uri[2] = ':' + uri[2]
-
-    return ''.join(uri)
-
-
-def get_clone_url(uri_tmpl, qualified_home_url, repo_name, repo_id, **override):
-    parsed_url = urlobject.URLObject(qualified_home_url)
-    decoded_path = safe_unicode(urllib.unquote(parsed_url.path.rstrip('/')))
-    args = {
-        'scheme': parsed_url.scheme,
-        'user': '',
-        'netloc': parsed_url.netloc+decoded_path, # path if we use proxy-prefix
-        'prefix': decoded_path,
-        'repo': repo_name,
-        'repoid': str(repo_id)
-    }
-    args.update(override)
-    args['user'] = urllib.quote(safe_str(args['user']))
-
-    for k, v in args.items():
-        uri_tmpl = uri_tmpl.replace('{%s}' % k, v)
-
-    # remove leading @ sign if it's present. Case of empty user
-    url_obj = urlobject.URLObject(uri_tmpl)
-    url = url_obj.with_netloc(url_obj.netloc.lstrip('@'))
-
-    return safe_unicode(url)
-
-
-def get_changeset_safe(repo, rev):
-    """
-    Safe version of get_changeset if this changeset doesn't exists for a
-    repo it returns a Dummy one instead
-
-    :param repo:
-    :param rev:
-    """
-    from kallithea.lib.vcs.backends.base import BaseRepository
-    from kallithea.lib.vcs.exceptions import RepositoryError
-    from kallithea.lib.vcs.backends.base import EmptyChangeset
-    if not isinstance(repo, BaseRepository):
-        raise Exception('You must pass an Repository '
-                        'object as first argument got %s', type(repo))
-
-    try:
-        cs = repo.get_changeset(rev)
-    except (RepositoryError, LookupError):
-        cs = EmptyChangeset(requested_revision=rev)
-    return cs
-
-
-def datetime_to_time(dt):
-    if dt:
-        return time.mktime(dt.timetuple())
-
-
-def time_to_datetime(tm):
-    if tm:
-        if isinstance(tm, basestring):
-            try:
-                tm = float(tm)
-            except ValueError:
-                return
-        return datetime.datetime.fromtimestamp(tm)
-
-
-# Must match regexp in kallithea/public/js/base.js MentionsAutoComplete()
-# Check char before @ - it must not look like we are in an email addresses.
-# Matching is greedy so we don't have to look beyond the end.
-MENTIONS_REGEX = re.compile(r'(?:^|(?<=[^a-zA-Z0-9]))@([a-zA-Z0-9][-_.a-zA-Z0-9]*[a-zA-Z0-9])')
-
-def extract_mentioned_usernames(text):
-    r"""
-    Returns list of (possible) usernames @mentioned in given text.
-
-    >>> extract_mentioned_usernames('@1-2.a_X,@1234 not@not @ddd@not @n @ee @ff @gg, @gg;@hh @n\n@zz,')
-    ['1-2.a_X', '1234', 'ddd', 'ee', 'ff', 'gg', 'hh', 'zz']
-    """
-    return MENTIONS_REGEX.findall(text)
-
-def extract_mentioned_users(text):
-    """ Returns set of actual database Users @mentioned in given text. """
-    from kallithea.model.db import User
-    result = set()
-    for name in extract_mentioned_usernames(text):
-        user = User.get_by_username(name, case_insensitive=True)
-        if user is not None and user.username != User.DEFAULT_USER:
-            result.add(user)
-    return result
-
-
-class AttributeDict(dict):
-    def __getattr__(self, attr):
-        return self.get(attr, None)
-    __setattr__ = dict.__setitem__
-    __delattr__ = dict.__delitem__
-
-
-def fix_PATH(os_=None):
-    """
-    Get current active python path, and append it to PATH variable to fix issues
-    of subprocess calls and different python versions
-    """
-    if os_ is None:
-        import os
-    else:
-        os = os_
-
-    cur_path = os.path.split(sys.executable)[0]
-    if not os.environ['PATH'].startswith(cur_path):
-        os.environ['PATH'] = '%s:%s' % (cur_path, os.environ['PATH'])
-
-
-def obfuscate_url_pw(engine):
-    from sqlalchemy.engine import url as sa_url
-    from sqlalchemy.exc import ArgumentError
-    try:
-        _url = sa_url.make_url(engine or '')
-    except ArgumentError:
-        return engine
-    if _url.password:
-        _url.password = 'XXXXX'
-    return str(_url)
-
-
-def get_server_url(environ):
-    req = webob.Request(environ)
-    return req.host_url + req.script_name
-
-
-def _extract_extras(env=None):
-    """
-    Extracts the Kallithea extras data from os.environ, and wraps it into named
-    AttributeDict object
-    """
-    if not env:
-        env = os.environ
-
-    try:
-        extras = json.loads(env['KALLITHEA_EXTRAS'])
-    except KeyError:
-        extras = {}
-
-    try:
-        for k in ['username', 'repository', 'locked_by', 'scm', 'make_lock',
-                  'action', 'ip']:
-            extras[k]
-    except KeyError as e:
-        raise Exception('Missing key %s in os.environ %s' % (e, extras))
-
-    return AttributeDict(extras)
-
-
-def _set_extras(extras):
-    # RC_SCM_DATA can probably be removed in the future, but for compatibility now...
-    os.environ['KALLITHEA_EXTRAS'] = os.environ['RC_SCM_DATA'] = json.dumps(extras)
-
-
-def unique_id(hexlen=32):
-    alphabet = "23456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghjklmnpqrstuvwxyz"
-    return suuid(truncate_to=hexlen, alphabet=alphabet)
-
-
-def suuid(url=None, truncate_to=22, alphabet=None):
-    """
-    Generate and return a short URL safe UUID.
-
-    If the url parameter is provided, set the namespace to the provided
-    URL and generate a UUID.
-
-    :param url to get the uuid for
-    :truncate_to: truncate the basic 22 UUID to shorter version
-
-    The IDs won't be universally unique any longer, but the probability of
-    a collision will still be very low.
-    """
-    # Define our alphabet.
-    _ALPHABET = alphabet or "23456789ABCDEFGHJKLMNPQRSTUVWXYZ"
-
-    # If no URL is given, generate a random UUID.
-    if url is None:
-        unique_id = uuid.uuid4().int
-    else:
-        unique_id = uuid.uuid3(uuid.NAMESPACE_URL, url).int
-
-    alphabet_length = len(_ALPHABET)
-    output = []
-    while unique_id > 0:
-        digit = unique_id % alphabet_length
-        output.append(_ALPHABET[digit])
-        unique_id = int(unique_id / alphabet_length)
-    return "".join(output)[:truncate_to]
-
-
-def get_current_authuser():
-    """
-    Gets kallithea user from threadlocal tmpl_context variable if it's
-    defined, else returns None.
-    """
-    from pylons import tmpl_context
-    if hasattr(tmpl_context, 'authuser'):
-        return tmpl_context.authuser
-
-    return None
-
-
-class OptionalAttr(object):
-    """
-    Special Optional Option that defines other attribute. Example::
-
-        def test(apiuser, userid=Optional(OAttr('apiuser')):
-            user = Optional.extract(userid)
-            # calls
-
-    """
-
-    def __init__(self, attr_name):
-        self.attr_name = attr_name
-
-    def __repr__(self):
-        return '<OptionalAttr:%s>' % self.attr_name
-
-    def __call__(self):
-        return self
-
-#alias
-OAttr = OptionalAttr
-
-
-class Optional(object):
-    """
-    Defines an optional parameter::
-
-        param = param.getval() if isinstance(param, Optional) else param
-        param = param() if isinstance(param, Optional) else param
-
-    is equivalent of::
-
-        param = Optional.extract(param)
-
-    """
-
-    def __init__(self, type_):
-        self.type_ = type_
-
-    def __repr__(self):
-        return '<Optional:%s>' % self.type_.__repr__()
-
-    def __call__(self):
-        return self.getval()
-
-    def getval(self):
-        """
-        returns value from this Optional instance
-        """
-        if isinstance(self.type_, OAttr):
-            # use params name
-            return self.type_.attr_name
-        return self.type_
-
-    @classmethod
-    def extract(cls, val):
-        """
-        Extracts value from Optional() instance
-
-        :param val:
-        :return: original value if it's not Optional instance else
-            value of instance
-        """
-        if isinstance(val, cls):
-            return val.getval()
-        return val
-
-def urlreadable(s, _cleanstringsub=re.compile('[^-a-zA-Z0-9./]+').sub):
-    return _cleanstringsub('_', safe_str(s)).rstrip('_')
diff --git a/kallithea/lib/utils2.py b/kallithea/lib/utils2.py
--- a/kallithea/lib/utils2.py
+++ b/kallithea/lib/utils2.py
@@ -43,60 +43,6 @@ from kallithea.lib.vcs.utils.lazy import
 from kallithea.lib.compat import json
 
 
-def __get_lem():
-    """
-    Get language extension map based on what's inside pygments lexers
-    """
-    from pygments import lexers
-    from string import lower
-    from collections import defaultdict
-
-    d = defaultdict(lambda: [])
-
-    def __clean(s):
-        s = s.lstrip('*')
-        s = s.lstrip('.')
-
-        if s.find('[') != -1:
-            exts = []
-            start, stop = s.find('['), s.find(']')
-
-            for suffix in s[start + 1:stop]:
-                exts.append(s[:s.find('[')] + suffix)
-            return map(lower, exts)
-        else:
-            return map(lower, [s])
-
-    for lx, t in sorted(lexers.LEXERS.items()):
-        m = map(__clean, t[-2])
-        if m:
-            m = reduce(lambda x, y: x + y, m)
-            for ext in m:
-                desc = lx.replace('Lexer', '')
-                d[ext].append(desc)
-
-    return dict(d)
-
-
-def __get_index_filenames():
-    """
-    Get list of known indexable filenames from pygment lexer internals
-    """
-    from pygments import lexers
-    from itertools import ifilter
-
-    filenames = []
-
-    def likely_filename(s):
-        return s.find('*') == -1 and s.find('[') == -1
-
-    for lx, t in sorted(lexers.LEXERS.items()):
-        for f in ifilter(likely_filename, t[-2]):
-            filenames.append(f)
-
-    return filenames
-
-
 def str2bool(_str):
     """
     returns True/False value from given string, it tries to translate the
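As a minimal usage sketch of the relocated helpers (mirroring the conf.py hunk above; assumes Python 2 with Pygments installed, since the module relies on itertools.ifilter and string.lower — the local variable names below are illustrative only):

    from kallithea.lib import pygmentsutils

    # Maps each lowercased file extension to the Pygments lexer names that
    # claim it, e.g. 'py' -> ['Python', ...]; conf.py exposes this as
    # LANGUAGES_EXTENSIONS_MAP for the whoosh indexer.
    extension_map = pygmentsutils.get_lem()

    # Plain filenames (no wildcards or character classes) that Pygments can
    # lex, e.g. 'Makefile'; conf.py exposes this as INDEX_FILENAMES.
    indexable_filenames = pygmentsutils.get_index_filenames()

    print '%d extensions, %d filenames' % (len(extension_map), len(indexable_filenames))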