Changeset - 13879d044762
[Not reviewed]
default
0 1 0
domruf - 9 years ago 2017-05-10 20:13:24
dominikruf@gmail.com
validators: wrap hgsubversion import with try/except block like in settings.py

Found by pylint.
1 file changed with 5 insertions and 1 deletions:
0 comments (0 inline, 0 general)
kallithea/model/validators.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
Set of generic validators
 
"""
 

	
 
import os
 
import re
 
import formencode
 
import logging
 
from collections import defaultdict
 
from tg.i18n import ugettext as _
 
from sqlalchemy import func
 
from webhelpers.pylonslib.secure_form import authentication_token
 
import sqlalchemy
 

	
 
from formencode.validators import (
 
    UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set,
 
    NotEmpty, IPAddress, CIDR, String, FancyValidator
 
)
 
from kallithea.lib.compat import OrderedSet
 
from kallithea.lib import ipaddr
 
from kallithea.lib.utils import repo_name_slug
 
from kallithea.lib.utils2 import str2bool, aslist
 
from kallithea.model.db import RepoGroup, Repository, UserGroup, User
 
from kallithea.lib.exceptions import LdapImportError
 
from kallithea.lib.exceptions import LdapImportError, HgsubversionImportError
 
from kallithea.config.routing import ADMIN_PREFIX
 
from kallithea.lib.auth import HasRepoGroupPermissionLevel, HasPermissionAny
 

	
 
# silence warnings and pylint
 
UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set, \
 
    NotEmpty, IPAddress, CIDR, String, FancyValidator
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def UniqueListFromString():
 
    class _UniqueListFromString(formencode.FancyValidator):
 
        """
 
        Split value on ',' and make unique while preserving order
 
        """
 
        messages = dict(
 
            empty=_('Value cannot be an empty list'),
 
            missing_value=_('Value cannot be an empty list'),
 
        )
 

	
 
        def _to_python(self, value, state):
 
            value = aslist(value, ',')
 
            seen = set()
 
            return [c for c in value if not (c in seen or seen.add(c))]
 

	
 
        def empty_value(self, value):
 
            return []
 

	
 
    return _UniqueListFromString
 

	
 

	
 
def ValidUsername(edit=False, old_data=None):
 
    old_data = old_data or {}
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'username_exists': _('Username "%(username)s" already exists'),
 
            'system_invalid_username':
 
                _('Username "%(username)s" cannot be used'),
 
            'invalid_username':
 
                _('Username may only contain alphanumeric characters '
 
                  'underscores, periods or dashes and must begin with an '
 
                  'alphanumeric character or underscore')
 
        }
 

	
 
        def validate_python(self, value, state):
 
            if value in ['default', 'new_user']:
 
                msg = self.message('system_invalid_username', state, username=value)
 
                raise formencode.Invalid(msg, value, state)
 
            #check if user is unique
 
            old_un = None
 
            if edit:
 
                old_un = User.get(old_data.get('user_id')).username
 

	
 
            if old_un != value or not edit:
 
                if User.get_by_username(value, case_insensitive=True):
 
                    msg = self.message('username_exists', state, username=value)
 
                    raise formencode.Invalid(msg, value, state)
 

	
 
            if re.match(r'^[a-zA-Z0-9\_]{1}[a-zA-Z0-9\-\_\.]*$', value) is None:
 
                msg = self.message('invalid_username', state)
 
                raise formencode.Invalid(msg, value, state)
 
    return _validator
 

	
 

	
 
def ValidRegex(msg=None):
 
    class _validator(formencode.validators.Regex):
 
        messages = dict(invalid=msg or _('The input is not valid'))
 
    return _validator
 

	
 

	
 
def ValidRepoUser():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_username': _('Username %(username)s is not valid')
 
        }
 

	
 
        def validate_python(self, value, state):
 
            try:
 
                User.query().filter(User.active == True) \
 
                    .filter(User.username == value).one()
 
            except sqlalchemy.exc.InvalidRequestError: # NoResultFound/MultipleResultsFound
 
                msg = self.message('invalid_username', state, username=value)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(username=msg)
 
                )
 

	
 
    return _validator
 

	
 

	
 
def ValidUserGroup(edit=False, old_data=None):
 
    old_data = old_data or {}
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_group': _('Invalid user group name'),
 
            'group_exist': _('User group "%(usergroup)s" already exists'),
 
            'invalid_usergroup_name':
 
@@ -327,193 +327,197 @@ def ValidRepoName(edit=False, old_data=N
 
            'repository_exists':
 
                _('Repository named %(repo)s already exists'),
 
            'repository_in_group_exists': _('Repository "%(repo)s" already '
 
                                            'exists in group "%(group)s"'),
 
            'same_group_exists': _('Repository group with name "%(repo)s" '
 
                                   'already exists')
 
        }
 

	
 
        def _to_python(self, value, state):
 
            repo_name = repo_name_slug(value.get('repo_name', ''))
 
            repo_group = value.get('repo_group')
 
            if repo_group:
 
                gr = RepoGroup.get(repo_group)
 
                group_path = gr.full_path
 
                group_name = gr.group_name
 
                # value needs to be aware of group name in order to check
 
                # db key This is an actual just the name to store in the
 
                # database
 
                repo_name_full = group_path + RepoGroup.url_sep() + repo_name
 
            else:
 
                group_name = group_path = ''
 
                repo_name_full = repo_name
 

	
 
            value['repo_name'] = repo_name
 
            value['repo_name_full'] = repo_name_full
 
            value['group_path'] = group_path
 
            value['group_name'] = group_name
 
            return value
 

	
 
        def validate_python(self, value, state):
 
            repo_name = value.get('repo_name')
 
            repo_name_full = value.get('repo_name_full')
 
            group_path = value.get('group_path')
 
            group_name = value.get('group_name')
 

	
 
            if repo_name in [ADMIN_PREFIX, '']:
 
                msg = self.message('invalid_repo_name', state, repo=repo_name)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(repo_name=msg)
 
                )
 

	
 
            rename = old_data.get('repo_name') != repo_name_full
 
            create = not edit
 
            if rename or create:
 
                repo = Repository.get_by_repo_name(repo_name_full, case_insensitive=True)
 
                repo_group = RepoGroup.get_by_group_name(repo_name_full, case_insensitive=True)
 
                if group_path != '':
 
                    if repo is not None:
 
                        msg = self.message('repository_in_group_exists', state,
 
                                repo=repo.repo_name, group=group_name)
 
                        raise formencode.Invalid(msg, value, state,
 
                            error_dict=dict(repo_name=msg)
 
                        )
 
                elif repo_group is not None:
 
                        msg = self.message('same_group_exists', state,
 
                                repo=repo_name)
 
                        raise formencode.Invalid(msg, value, state,
 
                            error_dict=dict(repo_name=msg)
 
                        )
 
                elif repo is not None:
 
                        msg = self.message('repository_exists', state,
 
                                repo=repo.repo_name)
 
                        raise formencode.Invalid(msg, value, state,
 
                            error_dict=dict(repo_name=msg)
 
                        )
 
            return value
 
    return _validator
 

	
 

	
 
def ValidForkName(*args, **kwargs):
 
    return ValidRepoName(*args, **kwargs)
 

	
 

	
 
def SlugifyName():
 
    class _validator(formencode.validators.FancyValidator):
 

	
 
        def _to_python(self, value, state):
 
            return repo_name_slug(value)
 

	
 
        def validate_python(self, value, state):
 
            pass
 

	
 
    return _validator
 

	
 

	
 
def ValidCloneUri():
 
    from kallithea.lib.utils import make_ui
 

	
 
    def url_handler(repo_type, url, ui):
 
        if repo_type == 'hg':
 
            from kallithea.lib.vcs.backends.hg.repository import MercurialRepository
 
            if url.startswith('http') or url.startswith('ssh'):
 
                # initially check if it's at least the proper URL
 
                # or does it pass basic auth
 
                MercurialRepository._check_url(url, ui)
 
            elif url.startswith('svn+http'):
 
                try:
 
                from hgsubversion.svnrepo import svnremoterepo
 
                except ImportError:
 
                    raise HgsubversionImportError(_('Unable to activate hgsubversion support. '
 
                          'The "hgsubversion" library is missing'))
 
                svnremoterepo(ui, url).svn.uuid
 
            elif url.startswith('git+http'):
 
                raise NotImplementedError()
 
            else:
 
                raise Exception('clone from URI %s not allowed' % (url,))
 

	
 
        elif repo_type == 'git':
 
            from kallithea.lib.vcs.backends.git.repository import GitRepository
 
            if url.startswith('http') or url.startswith('git'):
 
                # initially check if it's at least the proper URL
 
                # or does it pass basic auth
 
                GitRepository._check_url(url)
 
            elif url.startswith('svn+http'):
 
                raise NotImplementedError()
 
            elif url.startswith('hg+http'):
 
                raise NotImplementedError()
 
            else:
 
                raise Exception('clone from URI %s not allowed' % (url))
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'clone_uri': _('Invalid repository URL'),
 
            'invalid_clone_uri': _('Invalid repository URL. It must be a '
 
                                   'valid http, https, ssh, svn+http or svn+https URL'),
 
        }
 

	
 
        def validate_python(self, value, state):
 
            repo_type = value.get('repo_type')
 
            url = value.get('clone_uri')
 

	
 
            if url and url != value.get('clone_uri_hidden'):
 
                try:
 
                    url_handler(repo_type, url, make_ui('db', clear_session=False))
 
                except Exception:
 
                    log.exception('URL validation failed')
 
                    msg = self.message('clone_uri', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(clone_uri=msg)
 
                    )
 
    return _validator
 

	
 

	
 
def ValidForkType(old_data=None):
 
    old_data = old_data or {}
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_fork_type': _('Fork has to be the same type as parent')
 
        }
 

	
 
        def validate_python(self, value, state):
 
            if old_data['repo_type'] != value:
 
                msg = self.message('invalid_fork_type', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(repo_type=msg)
 
                )
 
    return _validator
 

	
 

	
 
def CanWriteGroup(old_data=None):
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'permission_denied': _("You don't have permissions "
 
                                   "to create repository in this group"),
 
            'permission_denied_root': _("no permission to create repository "
 
                                        "in root location")
 
        }
 

	
 
        def _to_python(self, value, state):
 
            #root location
 
            if value == -1:
 
                return None
 
            return value
 

	
 
        def validate_python(self, value, state):
 
            gr = RepoGroup.get(value)
 
            gr_name = gr.group_name if gr is not None else None # None means ROOT location
 

	
 
            # create repositories with write permission on group is set to true
 
            create_on_write = HasPermissionAny('hg.create.write_on_repogroup.true')()
 
            group_admin = HasRepoGroupPermissionLevel('admin')(gr_name,
 
                                            'can write into group validator')
 
            group_write = HasRepoGroupPermissionLevel('write')(gr_name,
 
                                            'can write into group validator')
 
            forbidden = not (group_admin or (group_write and create_on_write))
 
            can_create_repos = HasPermissionAny('hg.admin', 'hg.create.repository')
 
            gid = (old_data['repo_group'].get('group_id')
 
                   if (old_data and 'repo_group' in old_data) else None)
 
            value_changed = gid != value
 
            new = not old_data
 
            # do check if we changed the value, there's a case that someone got
 
            # revoked write permissions to a repository, he still created, we
 
            # don't need to check permission if he didn't change the value of
 
            # groups in form box
 
            if value_changed or new:
 
                #parent group need to be existing
0 comments (0 inline, 0 general)