Changeset - bd720d050d68
[Not reviewed]
Bradley M. Kuhn - 11 years ago 2014-07-03 01:05:36
bkuhn@sfconservancy.org
Rename RC_SCM_DATA to KALLITHEA_EXTRAS
1 file changed with 2 insertions and 2 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/utils2.py
Show inline comments
 
@@ -426,326 +426,326 @@ def age(prevdate, show_short_version=Fal
 
        if i < 5:
 
            sub_part = order[i + 1]
 
            sub_value = deltas[sub_part]
 
        else:
 
            sub_value = 0
 

	
 
        if sub_value == 0 or show_short_version:
 
            if future:
 
                return _(u'in %s') % fmt_funcs[part](value)
 
            else:
 
                return _(u'%s ago') % fmt_funcs[part](value)
 
        if future:
 
            return _(u'in %s and %s') % (fmt_funcs[part](value),
 
                fmt_funcs[sub_part](sub_value))
 
        else:
 
            return _(u'%s and %s ago') % (fmt_funcs[part](value),
 
                fmt_funcs[sub_part](sub_value))
 

	
 
    return _(u'just now')
 

	
 

	
 
def uri_filter(uri):
 
    """
 
    Removes user:password from given url string
 

	
 
    :param uri:
 
    :rtype: unicode
 
    :returns: filtered list of strings
 
    """
 
    if not uri:
 
        return ''
 

	
 
    proto = ''
 

	
 
    for pat in ('https://', 'http://'):
 
        if uri.startswith(pat):
 
            uri = uri[len(pat):]
 
            proto = pat
 
            break
 

	
 
    # remove passwords and username
 
    uri = uri[uri.find('@') + 1:]
 

	
 
    # get the port
 
    cred_pos = uri.find(':')
 
    if cred_pos == -1:
 
        host, port = uri, None
 
    else:
 
        host, port = uri[:cred_pos], uri[cred_pos + 1:]
 

	
 
    return filter(None, [proto, host, port])
 

	
 

	
 
def credentials_filter(uri):
 
    """
 
    Returns a url with removed credentials
 

	
 
    :param uri:
 
    """
 

	
 
    uri = uri_filter(uri)
 
    #check if we have port
 
    if len(uri) > 2 and uri[2]:
 
        uri[2] = ':' + uri[2]
 

	
 
    return ''.join(uri)
 

	
 

	
 
def get_clone_url(uri_tmpl, qualifed_home_url, repo_name, repo_id, **override):
 
    parsed_url = urlobject.URLObject(qualifed_home_url)
 
    decoded_path = safe_unicode(urllib.unquote(parsed_url.path.rstrip('/')))
 
    args = {
 
        'scheme': parsed_url.scheme,
 
        'user': '',
 
        'netloc': parsed_url.netloc+decoded_path,  # path if we use proxy-prefix
 
        'prefix': decoded_path,
 
        'repo': repo_name,
 
        'repoid': str(repo_id)
 
    }
 
    args.update(override)
 
    args['user'] = urllib.quote(safe_str(args['user']))
 

	
 
    for k, v in args.items():
 
        uri_tmpl = uri_tmpl.replace('{%s}' % k, v)
 

	
 
    # remove leading @ sign if it's present. Case of empty user
 
    url_obj = urlobject.URLObject(uri_tmpl)
 
    url = url_obj.with_netloc(url_obj.netloc.lstrip('@'))
 

	
 
    return safe_unicode(url)
 

	
 

	
 
def get_changeset_safe(repo, rev):
 
    """
 
    Safe version of get_changeset if this changeset doesn't exists for a
 
    repo it returns a Dummy one instead
 

	
 
    :param repo:
 
    :param rev:
 
    """
 
    from kallithea.lib.vcs.backends.base import BaseRepository
 
    from kallithea.lib.vcs.exceptions import RepositoryError
 
    from kallithea.lib.vcs.backends.base import EmptyChangeset
 
    if not isinstance(repo, BaseRepository):
 
        raise Exception('You must pass an Repository '
 
                        'object as first argument got %s', type(repo))
 

	
 
    try:
 
        cs = repo.get_changeset(rev)
 
    except (RepositoryError, LookupError):
 
        cs = EmptyChangeset(requested_revision=rev)
 
    return cs
 

	
 

	
 
def datetime_to_time(dt):
 
    if dt:
 
        return time.mktime(dt.timetuple())
 

	
 

	
 
def time_to_datetime(tm):
 
    if tm:
 
        if isinstance(tm, basestring):
 
            try:
 
                tm = float(tm)
 
            except ValueError:
 
                return
 
        return datetime.datetime.fromtimestamp(tm)
 

	
 
MENTIONS_REGEX = r'(?:^@|\s@)([a-zA-Z0-9]{1}[a-zA-Z0-9\-\_\.]+)(?:\s{1})'
 

	
 

	
 
def extract_mentioned_users(s):
 
    """
 
    Returns unique usernames from given string s that have @mention
 

	
 
    :param s: string to get mentions
 
    """
 
    usrs = set()
 
    for username in re.findall(MENTIONS_REGEX, s):
 
        usrs.add(username)
 

	
 
    return sorted(list(usrs), key=lambda k: k.lower())
 

	
 

	
 
class AttributeDict(dict):
 
    def __getattr__(self, attr):
 
        return self.get(attr, None)
 
    __setattr__ = dict.__setitem__
 
    __delattr__ = dict.__delitem__
 

	
 

	
 
def fix_PATH(os_=None):
 
    """
 
    Get current active python path, and append it to PATH variable to fix issues
 
    of subprocess calls and different python versions
 
    """
 
    if os_ is None:
 
        import os
 
    else:
 
        os = os_
 

	
 
    cur_path = os.path.split(sys.executable)[0]
 
    if not os.environ['PATH'].startswith(cur_path):
 
        os.environ['PATH'] = '%s:%s' % (cur_path, os.environ['PATH'])
 

	
 

	
 
def obfuscate_url_pw(engine):
 
    _url = engine or ''
 
    from sqlalchemy.engine import url as sa_url
 
    try:
 
        _url = sa_url.make_url(engine)
 
        if _url.password:
 
            _url.password = 'XXXXX'
 
    except Exception:
 
        pass
 
    return str(_url)
 

	
 

	
 
def get_server_url(environ):
 
    req = webob.Request(environ)
 
    return req.host_url + req.script_name
 

	
 

	
 
def _extract_extras(env=None):
 
    """
 
    Extracts the rc extras data from os.environ, and wraps it into named
 
    AttributeDict object
 
    """
 
    if not env:
 
        env = os.environ
 

	
 
    try:
 
        rc_extras = json.loads(env['RC_SCM_DATA'])
 
        rc_extras = json.loads(env['KALLITHEA_EXTRAS'])
 
    except Exception:
 
        print os.environ
 
        print >> sys.stderr, traceback.format_exc()
 
        rc_extras = {}
 

	
 
    try:
 
        for k in ['username', 'repository', 'locked_by', 'scm', 'make_lock',
 
                  'action', 'ip']:
 
            rc_extras[k]
 
    except KeyError, e:
 
        raise Exception('Missing key %s in os.environ %s' % (e, rc_extras))
 

	
 
    return AttributeDict(rc_extras)
 

	
 

	
 
def _set_extras(extras):
 
    os.environ['RC_SCM_DATA'] = json.dumps(extras)
 
    os.environ['KALLITHEA_EXTRAS'] = json.dumps(extras)
 

	
 

	
 
def unique_id(hexlen=32):
 
    alphabet = "23456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghjklmnpqrstuvwxyz"
 
    return suuid(truncate_to=hexlen, alphabet=alphabet)
 

	
 

	
 
def suuid(url=None, truncate_to=22, alphabet=None):
 
    """
 
    Generate and return a short URL safe UUID.
 

	
 
    If the url parameter is provided, set the namespace to the provided
 
    URL and generate a UUID.
 

	
 
    :param url to get the uuid for
 
    :truncate_to: truncate the basic 22 UUID to shorter version
 

	
 
    The IDs won't be universally unique any longer, but the probability of
 
    a collision will still be very low.
 
    """
 
    # Define our alphabet.
 
    _ALPHABET = alphabet or "23456789ABCDEFGHJKLMNPQRSTUVWXYZ"
 

	
 
    # If no URL is given, generate a random UUID.
 
    if url is None:
 
        unique_id = uuid.uuid4().int
 
    else:
 
        unique_id = uuid.uuid3(uuid.NAMESPACE_URL, url).int
 

	
 
    alphabet_length = len(_ALPHABET)
 
    output = []
 
    while unique_id > 0:
 
        digit = unique_id % alphabet_length
 
        output.append(_ALPHABET[digit])
 
        unique_id = int(unique_id / alphabet_length)
 
    return "".join(output)[:truncate_to]
 

	
 

	
 
def get_current_authuser():
 
    """
 
    Gets rhodecode user from threadlocal tmpl_context variable if it's
 
    defined, else returns None.
 
    """
 
    from pylons import tmpl_context
 
    if hasattr(tmpl_context, 'authuser'):
 
        return tmpl_context.authuser
 

	
 
    return None
 

	
 

	
 
class OptionalAttr(object):
 
    """
 
    Special Optional Option that defines other attribute. Example::
 

	
 
        def test(apiuser, userid=Optional(OAttr('apiuser')):
 
            user = Optional.extract(userid)
 
            # calls
 

	
 
    """
 

	
 
    def __init__(self, attr_name):
 
        self.attr_name = attr_name
 

	
 
    def __repr__(self):
 
        return '<OptionalAttr:%s>' % self.attr_name
 

	
 
    def __call__(self):
 
        return self
 

	
 
#alias
 
OAttr = OptionalAttr
 

	
 

	
 
class Optional(object):
 
    """
 
    Defines an optional parameter::
 

	
 
        param = param.getval() if isinstance(param, Optional) else param
 
        param = param() if isinstance(param, Optional) else param
 

	
 
    is equivalent of::
 

	
 
        param = Optional.extract(param)
 

	
 
    """
 

	
 
    def __init__(self, type_):
 
        self.type_ = type_
 

	
 
    def __repr__(self):
 
        return '<Optional:%s>' % self.type_.__repr__()
 

	
 
    def __call__(self):
 
        return self.getval()
 

	
 
    def getval(self):
 
        """
 
        returns value from this Optional instance
 
        """
 
        if isinstance(self.type_, OAttr):
 
            # use params name
 
            return self.type_.attr_name
 
        return self.type_
 

	
 
    @classmethod
 
    def extract(cls, val):
 
        """
 
        Extracts value from Optional() instance
 

	
 
        :param val:
 
        :return: original value if it's not Optional instance else
 
            value of instance
 
        """
 
        if isinstance(val, cls):
 
            return val.getval()
 
        return val
0 comments (0 inline, 0 general)