Changeset - f0e8d673f2a2
[Not reviewed]
default
0 2 0
Mads Kiilerich - 6 years ago 2019-08-03 23:31:43
mads@kiilerich.com
Grafted from: 73e81977a2c9
flake8: fix F632 use ==/!= to compare str, bytes, and int literals

- even if the values probably would have been interned ...
2 files changed with 3 insertions and 3 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/vcs/utils/helpers.py
Show inline comments
 
"""
 
Utilities aimed to help achieve mostly basic tasks.
 
"""
 
from __future__ import division
 

	
 
import datetime
 
import os
 
import re
 
import time
 

	
 
from kallithea.lib.vcs.exceptions import RepositoryError, VCSError
 
from kallithea.lib.vcs.utils.paths import abspath
 

	
 

	
 
ALIASES = ['hg', 'git']
 

	
 

	
 
def get_scm(path, search_up=False, explicit_alias=None):
 
    """
 
    Returns one of alias from ``ALIASES`` (in order of precedence same as
 
    shortcuts given in ``ALIASES``) and top working dir path for the given
 
    argument. If no scm-specific directory is found or more than one scm is
 
    found at that directory, ``VCSError`` is raised.
 

	
 
    :param search_up: if set to ``True``, this function would try to
 
      move up to parent directory every time no scm is recognized for the
 
      currently checked path. Default: ``False``.
 
    :param explicit_alias: can be one of available backend aliases, when given
 
      it will return given explicit alias in repositories under more than one
 
      version control, if explicit_alias is different than found it will raise
 
      VCSError
 
    """
 
    if not os.path.isdir(path):
 
        raise VCSError("Given path %s is not a directory" % path)
 

	
 
    def get_scms(path):
 
        return [(scm, path) for scm in get_scms_for_path(path)]
 

	
 
    found_scms = get_scms(path)
 
    while not found_scms and search_up:
 
        newpath = abspath(path, '..')
 
        if newpath == path:
 
            break
 
        path = newpath
 
        found_scms = get_scms(path)
 

	
 
    if len(found_scms) > 1:
 
        for scm in found_scms:
 
            if scm[0] == explicit_alias:
 
                return scm
 
        raise VCSError('More than one [%s] scm found at given path %s'
 
                       % (', '.join((x[0] for x in found_scms)), path))
 

	
 
    if len(found_scms) is 0:
 
    if len(found_scms) == 0:
 
        raise VCSError('No scm found at given path %s' % path)
 

	
 
    return found_scms[0]
 

	
 

	
 
def get_scms_for_path(path):
 
    """
 
    Returns all scm's found at the given path. If no scm is recognized
 
    - empty list is returned.
 

	
 
    :param path: path to directory which should be checked. May be callable.
 

	
 
    :raises VCSError: if given ``path`` is not a directory
 
    """
 
    from kallithea.lib.vcs.backends import get_backend
 
    if hasattr(path, '__call__'):
 
        path = path()
 
    if not os.path.isdir(path):
 
        raise VCSError("Given path %r is not a directory" % path)
 

	
 
    result = []
 
    for key in ALIASES:
 
        # find .hg / .git
 
        dirname = os.path.join(path, '.' + key)
 
        if os.path.isdir(dirname):
 
            result.append(key)
 
            continue
 
        # find rm__.hg / rm__.git too - left overs from old method for deleting
 
        dirname = os.path.join(path, 'rm__.' + key)
 
        if os.path.isdir(dirname):
 
            return result
 
        # We still need to check if it's not bare repository as
 
        # bare repos don't have working directories
 
        try:
 
            get_backend(key)(path)
 
            result.append(key)
 
            continue
 
        except RepositoryError:
 
            # Wrong backend
 
            pass
 
        except VCSError:
 
            # No backend at all
 
            pass
 
    return result
 

	
 

	
 
def get_highlighted_code(name, code, type='terminal'):
 
    """
 
    If pygments are available on the system
 
    then returned output is colored. Otherwise
 
    unchanged content is returned.
 
    """
 
    import logging
 
    try:
 
        import pygments
 
        pygments
 
    except ImportError:
 
        return code
 
    from pygments import highlight
 
    from pygments.lexers import guess_lexer_for_filename, ClassNotFound
 
    from pygments.formatters import TerminalFormatter
 

	
 
    try:
 
        lexer = guess_lexer_for_filename(name, code)
 
        formatter = TerminalFormatter()
 
        content = highlight(code, lexer, formatter)
 
    except ClassNotFound:
 
        logging.debug("Couldn't guess Lexer, will not use pygments.")
 
        content = code
 
    return content
 

	
 

	
 
def parse_changesets(text):
 
    """
 
    Returns dictionary with *start*, *main* and *end* ids.
 

	
 
    Examples::
 

	
 
        >>> parse_changesets('aaabbb')
 
        {'start': None, 'main': 'aaabbb', 'end': None}
 
        >>> parse_changesets('aaabbb..cccddd')
 
        {'start': 'aaabbb', 'main': None, 'end': 'cccddd'}
 

	
 
    """
 
    text = text.strip()
 
    CID_RE = r'[a-zA-Z0-9]+'
 
    if '..' not in text:
 
        m = re.match(r'^(?P<cid>%s)$' % CID_RE, text)
 
        if m:
 
            return {
 
                'start': None,
 
                'main': text,
 
                'end': None,
 
            }
 
    else:
 
        RE = r'^(?P<start>%s)?\.{2,3}(?P<end>%s)?$' % (CID_RE, CID_RE)
kallithea/model/validators.py
Show inline comments
 
@@ -490,197 +490,197 @@ def CanWriteGroup(old_data=None):
 

	
 
    return _validator
 

	
 

	
 
def CanCreateGroup(can_create_in_root=False):
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'permission_denied': _("You don't have permissions "
 
                                   "to create a group in this location")
 
        }
 

	
 
        def to_python(self, value, state):
 
            # root location
 
            if value == -1:
 
                return None
 
            return value
 

	
 
        def _validate_python(self, value, state):
 
            gr = RepoGroup.get(value)
 
            gr_name = gr.group_name if gr is not None else None # None means ROOT location
 

	
 
            if can_create_in_root and gr is None:
 
                # we can create in root, we're fine no validations required
 
                return
 

	
 
            forbidden_in_root = gr is None and not can_create_in_root
 
            forbidden = not HasRepoGroupPermissionLevel('admin')(gr_name, 'can create group validator')
 
            if forbidden_in_root or forbidden:
 
                msg = self.message('permission_denied', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(parent_group_id=msg)
 
                )
 

	
 
    return _validator
 

	
 

	
 
def ValidPerms(type_='repo'):
 
    if type_ == 'repo_group':
 
        EMPTY_PERM = 'group.none'
 
    elif type_ == 'repo':
 
        EMPTY_PERM = 'repository.none'
 
    elif type_ == 'user_group':
 
        EMPTY_PERM = 'usergroup.none'
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'perm_new_member_name':
 
                _('This username or user group name is not valid')
 
        }
 

	
 
        def to_python(self, value, state):
 
            perms_update = OrderedSet()
 
            perms_new = OrderedSet()
 
            # build a list of permission to update and new permission to create
 

	
 
            # CLEAN OUT ORG VALUE FROM NEW MEMBERS, and group them using
 
            new_perms_group = defaultdict(dict)
 
            for k, v in value.copy().iteritems():
 
                if k.startswith('perm_new_member'):
 
                    del value[k]
 
                    _type, part = k.split('perm_new_member_')
 
                    args = part.split('_')
 
                    if len(args) == 1:
 
                        new_perms_group[args[0]]['perm'] = v
 
                    elif len(args) == 2:
 
                        _key, pos = args
 
                        new_perms_group[pos][_key] = v
 

	
 
            # fill new permissions in order of how they were added
 
            for k in sorted(map(int, new_perms_group.keys())):
 
                perm_dict = new_perms_group[str(k)]
 
                new_member = perm_dict.get('name')
 
                new_perm = perm_dict.get('perm')
 
                new_type = perm_dict.get('type')
 
                if new_member and new_perm and new_type:
 
                    perms_new.add((new_member, new_perm, new_type))
 

	
 
            for k, v in value.iteritems():
 
                if k.startswith('u_perm_') or k.startswith('g_perm_'):
 
                    member = k[7:]
 
                    t = {'u': 'user',
 
                         'g': 'users_group'
 
                    }[k[0]]
 
                    if member == User.DEFAULT_USER:
 
                        if str2bool(value.get('repo_private')):
 
                            # set none for default when updating to
 
                            # private repo protects against form manipulation
 
                            v = EMPTY_PERM
 
                    perms_update.add((member, v, t))
 

	
 
            value['perms_updates'] = list(perms_update)
 
            value['perms_new'] = list(perms_new)
 

	
 
            # update permissions
 
            for k, v, t in perms_new:
 
                try:
 
                    if t is 'user':
 
                    if t == 'user':
 
                        self.user_db = User.query() \
 
                            .filter(User.active == True) \
 
                            .filter(User.username == k).one()
 
                    if t is 'users_group':
 
                    if t == 'users_group':
 
                        self.user_db = UserGroup.query() \
 
                            .filter(UserGroup.users_group_active == True) \
 
                            .filter(UserGroup.users_group_name == k).one()
 

	
 
                except Exception:
 
                    log.exception('Updated permission failed')
 
                    msg = self.message('perm_new_member_type', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(perm_new_member_name=msg)
 
                    )
 
            return value
 
    return _validator
 

	
 

	
 
def ValidSettings():
 
    class _validator(formencode.validators.FancyValidator):
 
        def _convert_to_python(self, value, state):
 
            # settings  form for users that are not admin
 
            # can't edit certain parameters, it's extra backup if they mangle
 
            # with forms
 

	
 
            forbidden_params = [
 
                'user', 'repo_type',
 
                'repo_enable_downloads', 'repo_enable_statistics'
 
            ]
 

	
 
            for param in forbidden_params:
 
                if param in value:
 
                    del value[param]
 
            return value
 

	
 
        def _validate_python(self, value, state):
 
            pass
 
    return _validator
 

	
 

	
 
def ValidPath():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_path': _('This is not a valid path')
 
        }
 

	
 
        def _validate_python(self, value, state):
 
            if not os.path.isdir(value):
 
                msg = self.message('invalid_path', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(paths_root_path=msg)
 
                )
 
    return _validator
 

	
 

	
 
def UniqSystemEmail(old_data=None):
 
    old_data = old_data or {}
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'email_taken': _('This email address is already in use')
 
        }
 

	
 
        def _convert_to_python(self, value, state):
 
            return value.lower()
 

	
 
        def _validate_python(self, value, state):
 
            if (old_data.get('email') or '').lower() != value:
 
                user = User.get_by_email(value)
 
                if user is not None:
 
                    msg = self.message('email_taken', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(email=msg)
 
                    )
 
    return _validator
 

	
 

	
 
def ValidSystemEmail():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'non_existing_email': _('Email address "%(email)s" not found')
 
        }
 

	
 
        def _convert_to_python(self, value, state):
 
            return value.lower()
 

	
 
        def _validate_python(self, value, state):
 
            user = User.get_by_email(value)
 
            if user is None:
 
                msg = self.message('non_existing_email', state, email=value)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(email=msg)
 
                )
 

	
 
    return _validator
 

	
 

	
 
def LdapLibValidator():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
0 comments (0 inline, 0 general)