diff --git a/kallithea/config/conf.py b/kallithea/config/conf.py
--- a/kallithea/config/conf.py
+++ b/kallithea/config/conf.py
@@ -25,12 +25,12 @@ Original author and date, and relevant c
:license: GPLv3, see LICENSE.md for more details.
"""
-from kallithea.lib.utils2 import __get_lem, __get_index_filenames
+from kallithea.lib import pygmentsutils
# language map is also used by whoosh indexer, which for those specified
# extensions will index it's content
-LANGUAGES_EXTENSIONS_MAP = __get_lem()
+LANGUAGES_EXTENSIONS_MAP = pygmentsutils.get_lem()
# Whoosh index targets
@@ -38,7 +38,7 @@ LANGUAGES_EXTENSIONS_MAP = __get_lem()
INDEX_EXTENSIONS = LANGUAGES_EXTENSIONS_MAP.keys()
# Filenames we want to index content of using whoosh
-INDEX_FILENAMES = __get_index_filenames()
+INDEX_FILENAMES = pygmentsutils.get_index_filenames()
# list of readme files to search in file tree and display in summary
# attached weights defines the search order lower is first
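(Illustrative aside, not part of the patch: after the conf.py hunks above, the module-level maps are built from the public pygmentsutils helpers instead of the name-mangled utils2 ones. A minimal usage sketch, assuming a working Kallithea/Pygments installation:)

```python
# Sketch only -- mirrors what kallithea/config/conf.py does after this change.
from kallithea.lib import pygmentsutils

LANGUAGES_EXTENSIONS_MAP = pygmentsutils.get_lem()       # extension -> lexer names
INDEX_EXTENSIONS = LANGUAGES_EXTENSIONS_MAP.keys()       # extensions whoosh indexes content of
INDEX_FILENAMES = pygmentsutils.get_index_filenames()    # literal filenames, e.g. 'Makefile'

print(LANGUAGES_EXTENSIONS_MAP.get('py'))   # e.g. ['Python'], depending on Pygments version
```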
diff --git a/kallithea/lib/utils2.py b/kallithea/lib/pygmentsutils.py
copy from kallithea/lib/utils2.py
copy to kallithea/lib/pygmentsutils.py
--- a/kallithea/lib/utils2.py
+++ b/kallithea/lib/pygmentsutils.py
@@ -12,10 +12,10 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
-kallithea.lib.utils2
-~~~~~~~~~~~~~~~~~~~~
+kallithea.lib.pygmentsutils
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
-Some simple helper functions
+Functions for extracting internal Pygments data.
This file was forked by the Kallithea project in July 2014.
Original author and date, and relevant copyright and licensing information is below:
@@ -25,32 +25,17 @@ Original author and date, and relevant c
:license: GPLv3, see LICENSE.md for more details.
"""
+from collections import defaultdict
+from itertools import ifilter
+from string import lower
-import os
-import re
-import sys
-import time
-import uuid
-import datetime
-import urllib
-import binascii
-
-import webob
-import urlobject
-
-from pylons.i18n.translation import _, ungettext
-from kallithea.lib.vcs.utils.lazy import LazyProperty
-from kallithea.lib.compat import json
+from pygments import lexers
-def __get_lem():
+def get_lem():
"""
Get language extension map based on what's inside pygments lexers
"""
- from pygments import lexers
- from string import lower
- from collections import defaultdict
-
d = defaultdict(lambda: [])
def __clean(s):
@@ -78,12 +63,10 @@ def __get_lem():
return dict(d)
-def __get_index_filenames():
+def get_index_filenames():
"""
Get list of known indexable filenames from pygment lexer internals
"""
- from pygments import lexers
- from itertools import ifilter
filenames = []
@@ -95,690 +78,3 @@ def __get_index_filenames():
filenames.append(f)
return filenames
-
-
-def str2bool(_str):
- """
- returns True/False value from given string, it tries to translate the
- string into boolean
-
- :param _str: string value to translate into boolean
- :rtype: boolean
- :returns: boolean from given string
- """
- if _str is None:
- return False
- if _str in (True, False):
- return _str
- _str = str(_str).strip().lower()
- return _str in ('t', 'true', 'y', 'yes', 'on', '1')
-
-
-def aslist(obj, sep=None, strip=True):
- """
- Returns given string separated by sep as list
-
- :param obj:
- :param sep:
- :param strip:
- """
- if isinstance(obj, (basestring)):
- lst = obj.split(sep)
- if strip:
- lst = [v.strip() for v in lst]
- return lst
- elif isinstance(obj, (list, tuple)):
- return obj
- elif obj is None:
- return []
- else:
- return [obj]
-
-
-def convert_line_endings(line, mode):
- """
- Converts a given line "line end" according to given mode
-
- Available modes are::
- 0 - Unix
- 1 - Mac
- 2 - DOS
-
- :param line: given line to convert
- :param mode: mode to convert to
- :rtype: str
- :return: converted line according to mode
- """
- from string import replace
-
- if mode == 0:
- line = replace(line, '\r\n', '\n')
- line = replace(line, '\r', '\n')
- elif mode == 1:
- line = replace(line, '\r\n', '\r')
- line = replace(line, '\n', '\r')
- elif mode == 2:
-        line = re.sub("\r(?!\n)|(?<!\r)\n", "\r\n", line)
-    return line
-
-    def before_cursor_execute(conn, cursor, statement,
-                              parameters, context, executemany):
-        context._query_start_time = time.time()
-        log.info(color_sql(">>>>> STARTING QUERY >>>>>"))
-
- def after_cursor_execute(conn, cursor, statement,
- parameters, context, executemany):
- total = time.time() - context._query_start_time
- log.info(color_sql("<<<<< TOTAL TIME: %f <<<<<" % total))
-
- event.listen(engine, "before_cursor_execute",
- before_cursor_execute)
- event.listen(engine, "after_cursor_execute",
- after_cursor_execute)
-
- return engine
-
-
-def age(prevdate, show_short_version=False, now=None):
- """
- turns a datetime into an age string.
- If show_short_version is True, then it will generate a not so accurate but shorter string,
- example: 2days ago, instead of 2 days and 23 hours ago.
-
- :param prevdate: datetime object
- :param show_short_version: if it should approximate the date and return a shorter string
- :rtype: unicode
- :returns: unicode words describing age
- """
- now = now or datetime.datetime.now()
- order = ['year', 'month', 'day', 'hour', 'minute', 'second']
- deltas = {}
- future = False
-
- if prevdate > now:
- now, prevdate = prevdate, now
- future = True
- if future:
- prevdate = prevdate.replace(microsecond=0)
- # Get date parts deltas
- from dateutil import relativedelta
- for part in order:
- d = relativedelta.relativedelta(now, prevdate)
- deltas[part] = getattr(d, part + 's')
-
- # Fix negative offsets (there is 1 second between 10:59:59 and 11:00:00,
- # not 1 hour, -59 minutes and -59 seconds)
- for num, length in [(5, 60), (4, 60), (3, 24)]: # seconds, minutes, hours
- part = order[num]
- carry_part = order[num - 1]
-
- if deltas[part] < 0:
- deltas[part] += length
- deltas[carry_part] -= 1
-
- # Same thing for days except that the increment depends on the (variable)
- # number of days in the month
- month_lengths = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
- if deltas['day'] < 0:
- if prevdate.month == 2 and (prevdate.year % 4 == 0 and
- (prevdate.year % 100 != 0 or prevdate.year % 400 == 0)):
- deltas['day'] += 29
- else:
- deltas['day'] += month_lengths[prevdate.month - 1]
-
- deltas['month'] -= 1
-
- if deltas['month'] < 0:
- deltas['month'] += 12
- deltas['year'] -= 1
-
- # In short version, we want nicer handling of ages of more than a year
- if show_short_version:
- if deltas['year'] == 1:
- # ages between 1 and 2 years: show as months
- deltas['month'] += 12
- deltas['year'] = 0
- if deltas['year'] >= 2:
- # ages 2+ years: round
- if deltas['month'] > 6:
- deltas['year'] += 1
- deltas['month'] = 0
-
- # Format the result
- fmt_funcs = {
- 'year': lambda d: ungettext(u'%d year', '%d years', d) % d,
- 'month': lambda d: ungettext(u'%d month', '%d months', d) % d,
- 'day': lambda d: ungettext(u'%d day', '%d days', d) % d,
- 'hour': lambda d: ungettext(u'%d hour', '%d hours', d) % d,
- 'minute': lambda d: ungettext(u'%d minute', '%d minutes', d) % d,
- 'second': lambda d: ungettext(u'%d second', '%d seconds', d) % d,
- }
-
- for i, part in enumerate(order):
- value = deltas[part]
- if value == 0:
- continue
-
- if i < 5:
- sub_part = order[i + 1]
- sub_value = deltas[sub_part]
- else:
- sub_value = 0
-
- if sub_value == 0 or show_short_version:
- if future:
- return _('in %s') % fmt_funcs[part](value)
- else:
- return _('%s ago') % fmt_funcs[part](value)
- if future:
- return _('in %s and %s') % (fmt_funcs[part](value),
- fmt_funcs[sub_part](sub_value))
- else:
- return _('%s and %s ago') % (fmt_funcs[part](value),
- fmt_funcs[sub_part](sub_value))
-
- return _('just now')
-
-
-def uri_filter(uri):
- """
- Removes user:password from given url string
-
- :param uri:
- :rtype: unicode
- :returns: filtered list of strings
- """
- if not uri:
- return ''
-
- proto = ''
-
- for pat in ('https://', 'http://', 'git://'):
- if uri.startswith(pat):
- uri = uri[len(pat):]
- proto = pat
- break
-
- # remove passwords and username
- uri = uri[uri.find('@') + 1:]
-
- # get the port
- cred_pos = uri.find(':')
- if cred_pos == -1:
- host, port = uri, None
- else:
- host, port = uri[:cred_pos], uri[cred_pos + 1:]
-
- return filter(None, [proto, host, port])
-
-
-def credentials_filter(uri):
- """
- Returns a url with removed credentials
-
- :param uri:
- """
-
- uri = uri_filter(uri)
- #check if we have port
- if len(uri) > 2 and uri[2]:
- uri[2] = ':' + uri[2]
-
- return ''.join(uri)
-
-
-def get_clone_url(uri_tmpl, qualified_home_url, repo_name, repo_id, **override):
- parsed_url = urlobject.URLObject(qualified_home_url)
- decoded_path = safe_unicode(urllib.unquote(parsed_url.path.rstrip('/')))
- args = {
- 'scheme': parsed_url.scheme,
- 'user': '',
- 'netloc': parsed_url.netloc+decoded_path, # path if we use proxy-prefix
- 'prefix': decoded_path,
- 'repo': repo_name,
- 'repoid': str(repo_id)
- }
- args.update(override)
- args['user'] = urllib.quote(safe_str(args['user']))
-
- for k, v in args.items():
- uri_tmpl = uri_tmpl.replace('{%s}' % k, v)
-
- # remove leading @ sign if it's present. Case of empty user
- url_obj = urlobject.URLObject(uri_tmpl)
- url = url_obj.with_netloc(url_obj.netloc.lstrip('@'))
-
- return safe_unicode(url)
-
-
-def get_changeset_safe(repo, rev):
- """
- Safe version of get_changeset if this changeset doesn't exists for a
- repo it returns a Dummy one instead
-
- :param repo:
- :param rev:
- """
- from kallithea.lib.vcs.backends.base import BaseRepository
- from kallithea.lib.vcs.exceptions import RepositoryError
- from kallithea.lib.vcs.backends.base import EmptyChangeset
- if not isinstance(repo, BaseRepository):
- raise Exception('You must pass an Repository '
- 'object as first argument got %s', type(repo))
-
- try:
- cs = repo.get_changeset(rev)
- except (RepositoryError, LookupError):
- cs = EmptyChangeset(requested_revision=rev)
- return cs
-
-
-def datetime_to_time(dt):
- if dt:
- return time.mktime(dt.timetuple())
-
-
-def time_to_datetime(tm):
- if tm:
- if isinstance(tm, basestring):
- try:
- tm = float(tm)
- except ValueError:
- return
- return datetime.datetime.fromtimestamp(tm)
-
-
-# Must match regexp in kallithea/public/js/base.js MentionsAutoComplete()
-# Check char before @ - it must not look like we are in an email addresses.
-# Matching is greedy so we don't have to look beyond the end.
-MENTIONS_REGEX = re.compile(r'(?:^|(?<=[^a-zA-Z0-9]))@([a-zA-Z0-9][-_.a-zA-Z0-9]*[a-zA-Z0-9])')
-
-def extract_mentioned_usernames(text):
- r"""
- Returns list of (possible) usernames @mentioned in given text.
-
- >>> extract_mentioned_usernames('@1-2.a_X,@1234 not@not @ddd@not @n @ee @ff @gg, @gg;@hh @n\n@zz,')
- ['1-2.a_X', '1234', 'ddd', 'ee', 'ff', 'gg', 'hh', 'zz']
- """
- return MENTIONS_REGEX.findall(text)
-
-def extract_mentioned_users(text):
- """ Returns set of actual database Users @mentioned in given text. """
- from kallithea.model.db import User
- result = set()
- for name in extract_mentioned_usernames(text):
- user = User.get_by_username(name, case_insensitive=True)
- if user is not None and user.username != User.DEFAULT_USER:
- result.add(user)
- return result
-
-
-class AttributeDict(dict):
- def __getattr__(self, attr):
- return self.get(attr, None)
- __setattr__ = dict.__setitem__
- __delattr__ = dict.__delitem__
-
-
-def fix_PATH(os_=None):
- """
- Get current active python path, and append it to PATH variable to fix issues
- of subprocess calls and different python versions
- """
- if os_ is None:
- import os
- else:
- os = os_
-
- cur_path = os.path.split(sys.executable)[0]
- if not os.environ['PATH'].startswith(cur_path):
- os.environ['PATH'] = '%s:%s' % (cur_path, os.environ['PATH'])
-
-
-def obfuscate_url_pw(engine):
- from sqlalchemy.engine import url as sa_url
- from sqlalchemy.exc import ArgumentError
- try:
- _url = sa_url.make_url(engine or '')
- except ArgumentError:
- return engine
- if _url.password:
- _url.password = 'XXXXX'
- return str(_url)
-
-
-def get_server_url(environ):
- req = webob.Request(environ)
- return req.host_url + req.script_name
-
-
-def _extract_extras(env=None):
- """
- Extracts the Kallithea extras data from os.environ, and wraps it into named
- AttributeDict object
- """
- if not env:
- env = os.environ
-
- try:
- extras = json.loads(env['KALLITHEA_EXTRAS'])
- except KeyError:
- extras = {}
-
- try:
- for k in ['username', 'repository', 'locked_by', 'scm', 'make_lock',
- 'action', 'ip']:
- extras[k]
- except KeyError as e:
- raise Exception('Missing key %s in os.environ %s' % (e, extras))
-
- return AttributeDict(extras)
-
-
-def _set_extras(extras):
- # RC_SCM_DATA can probably be removed in the future, but for compatibility now...
- os.environ['KALLITHEA_EXTRAS'] = os.environ['RC_SCM_DATA'] = json.dumps(extras)
-
-
-def unique_id(hexlen=32):
- alphabet = "23456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghjklmnpqrstuvwxyz"
- return suuid(truncate_to=hexlen, alphabet=alphabet)
-
-
-def suuid(url=None, truncate_to=22, alphabet=None):
- """
- Generate and return a short URL safe UUID.
-
- If the url parameter is provided, set the namespace to the provided
- URL and generate a UUID.
-
- :param url to get the uuid for
- :truncate_to: truncate the basic 22 UUID to shorter version
-
- The IDs won't be universally unique any longer, but the probability of
- a collision will still be very low.
- """
- # Define our alphabet.
- _ALPHABET = alphabet or "23456789ABCDEFGHJKLMNPQRSTUVWXYZ"
-
- # If no URL is given, generate a random UUID.
- if url is None:
- unique_id = uuid.uuid4().int
- else:
- unique_id = uuid.uuid3(uuid.NAMESPACE_URL, url).int
-
- alphabet_length = len(_ALPHABET)
- output = []
- while unique_id > 0:
- digit = unique_id % alphabet_length
- output.append(_ALPHABET[digit])
- unique_id = int(unique_id / alphabet_length)
- return "".join(output)[:truncate_to]
-
-
-def get_current_authuser():
- """
- Gets kallithea user from threadlocal tmpl_context variable if it's
- defined, else returns None.
- """
- from pylons import tmpl_context
- if hasattr(tmpl_context, 'authuser'):
- return tmpl_context.authuser
-
- return None
-
-
-class OptionalAttr(object):
- """
- Special Optional Option that defines other attribute. Example::
-
- def test(apiuser, userid=Optional(OAttr('apiuser')):
- user = Optional.extract(userid)
- # calls
-
- """
-
- def __init__(self, attr_name):
- self.attr_name = attr_name
-
- def __repr__(self):
- return '<OptionalAttr:%s>' % self.attr_name
-
- def __call__(self):
- return self
-
-#alias
-OAttr = OptionalAttr
-
-
-class Optional(object):
- """
- Defines an optional parameter::
-
- param = param.getval() if isinstance(param, Optional) else param
- param = param() if isinstance(param, Optional) else param
-
- is equivalent of::
-
- param = Optional.extract(param)
-
- """
-
- def __init__(self, type_):
- self.type_ = type_
-
- def __repr__(self):
- return '<Optional:%s>' % self.type_.__repr__()
-
- def __call__(self):
- return self.getval()
-
- def getval(self):
- """
- returns value from this Optional instance
- """
- if isinstance(self.type_, OAttr):
- # use params name
- return self.type_.attr_name
- return self.type_
-
- @classmethod
- def extract(cls, val):
- """
- Extracts value from Optional() instance
-
- :param val:
- :return: original value if it's not Optional instance else
- value of instance
- """
- if isinstance(val, cls):
- return val.getval()
- return val
-
-def urlreadable(s, _cleanstringsub=re.compile('[^-a-zA-Z0-9./]+').sub):
- return _cleanstringsub('_', safe_str(s)).rstrip('_')
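(Illustrative aside, not part of the patch: the extension map in get_lem() comes from expanding Pygments' filename globs. Below is a standalone re-implementation of the bracket-expansion step done by its __clean helper, for clarity only -- the real code lives in kallithea/lib/pygmentsutils.py as shown above:)

```python
# Standalone illustration of get_lem()'s glob-cleaning step; not part of the patch.
def clean(s):
    s = s.lstrip('*').lstrip('.')          # '*.php[345]' -> 'php[345]'
    if '[' in s:
        start, stop = s.find('['), s.find(']')
        # expand each character inside [...] into its own lower-cased extension
        return [(s[:start] + suffix).lower() for suffix in s[start + 1:stop]]
    return [s.lower()]

assert clean('*.py') == ['py']
assert clean('*.php[345]') == ['php3', 'php4', 'php5']
```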
diff --git a/kallithea/lib/utils2.py b/kallithea/lib/utils2.py
--- a/kallithea/lib/utils2.py
+++ b/kallithea/lib/utils2.py
@@ -43,60 +43,6 @@ from kallithea.lib.vcs.utils.lazy import
from kallithea.lib.compat import json
-def __get_lem():
- """
- Get language extension map based on what's inside pygments lexers
- """
- from pygments import lexers
- from string import lower
- from collections import defaultdict
-
- d = defaultdict(lambda: [])
-
- def __clean(s):
- s = s.lstrip('*')
- s = s.lstrip('.')
-
- if s.find('[') != -1:
- exts = []
- start, stop = s.find('['), s.find(']')
-
- for suffix in s[start + 1:stop]:
- exts.append(s[:s.find('[')] + suffix)
- return map(lower, exts)
- else:
- return map(lower, [s])
-
- for lx, t in sorted(lexers.LEXERS.items()):
- m = map(__clean, t[-2])
- if m:
- m = reduce(lambda x, y: x + y, m)
- for ext in m:
- desc = lx.replace('Lexer', '')
- d[ext].append(desc)
-
- return dict(d)
-
-
-def __get_index_filenames():
- """
- Get list of known indexable filenames from pygment lexer internals
- """
- from pygments import lexers
- from itertools import ifilter
-
- filenames = []
-
- def likely_filename(s):
- return s.find('*') == -1 and s.find('[') == -1
-
- for lx, t in sorted(lexers.LEXERS.items()):
- for f in ifilter(likely_filename, t[-2]):
- filenames.append(f)
-
- return filenames
-
-
def str2bool(_str):
"""
returns True/False value from given string, it tries to translate the