Changeset - 9ff917d87291
[Not reviewed]
default
0 2 0
Mads Kiilerich - 8 years ago 2018-05-07 11:38:13
mads@kiilerich.com
utils: move clone URI validator function to more generic utils.is_valid_repo_uri

No changes to the functionality, even though the API and implementation could
use some clean-up ...
2 files changed with 39 insertions and 35 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/utils.py
Show inline comments
 
@@ -12,55 +12,57 @@
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.utils
 
~~~~~~~~~~~~~~~~~~~
 

	
 
Utilities library for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 18, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import os
 
import re
 
import logging
 
import datetime
 
import traceback
 
import beaker
 

	
 
from tg import request, response
 
from tg.i18n import ugettext as _
 
from webhelpers.text import collapse, remove_formatting, strip_tags
 
from beaker.cache import _cache_decorate
 

	
 
from kallithea.lib.vcs.utils.hgcompat import ui, config
 
from kallithea.lib.vcs.utils.helpers import get_scm
 
from kallithea.lib.vcs.exceptions import VCSError
 

	
 
from kallithea.lib.exceptions import HgsubversionImportError
 
from kallithea.model import meta
 
from kallithea.model.db import Repository, User, Ui, \
 
    UserLog, RepoGroup, Setting, UserGroup
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.lib.utils2 import safe_str, safe_unicode, get_current_authuser
 
from kallithea.lib.vcs.utils.fakemod import create_module
 

	
 
log = logging.getLogger(__name__)
 

	
 
REMOVED_REPO_PAT = re.compile(r'rm__\d{8}_\d{6}_\d{6}_.*')
 

	
 

	
 
def recursive_replace(str_, replace=' '):
 
    """
 
    Recursive replace of given sign to just one instance
 

	
 
    :param str_: given string
 
    :param replace: char to find and replace multiple instances
 

	
 
    Examples::
 
    >>> recursive_replace("Mighty---Mighty-Bo--sstones",'-')
 
    'Mighty-Mighty-Bo-sstones'
 
    """
 

	
 
@@ -230,48 +232,82 @@ def get_filesystem_repos(path):
 

	
 
                if not os.access(cur_path, os.R_OK) or not os.access(cur_path, os.X_OK):
 
                    log.warning('ignoring repo path without access: %s', cur_path)
 
                    continue
 

	
 
                if not os.access(cur_path, os.W_OK):
 
                    log.warning('repo path without write access: %s', cur_path)
 

	
 
                try:
 
                    scm_info = get_scm(cur_path)
 
                    assert cur_path.startswith(path)
 
                    repo_path = cur_path[len(path) + 1:]
 
                    yield repo_path, scm_info
 
                    continue # no recursion
 
                except VCSError:
 
                    # We should perhaps ignore such broken repos, but especially
 
                    # the bare git detection is unreliable so we dive into it
 
                    pass
 

	
 
            recurse_dirs.append(subdir)
 

	
 
        dirs[:] = recurse_dirs
 

	
 

	
 
def is_valid_repo_uri(repo_type, url, ui):
 
    """Check if the url seems like a valid remote repo location - raise an Exception if any problems"""
 
    if repo_type == 'hg':
 
        from kallithea.lib.vcs.backends.hg.repository import MercurialRepository
 
        if url.startswith('http') or url.startswith('ssh'):
 
            # initially check if it's at least the proper URL
 
            # or does it pass basic auth
 
            MercurialRepository._check_url(url, ui)
 
        elif url.startswith('svn+http'):
 
            try:
 
                from hgsubversion.svnrepo import svnremoterepo
 
            except ImportError:
 
                raise HgsubversionImportError(_('Unable to activate hgsubversion support. '
 
                                                'The "hgsubversion" library is missing'))
 
            svnremoterepo(ui, url).svn.uuid
 
        elif url.startswith('git+http'):
 
            raise NotImplementedError()
 
        else:
 
            raise Exception('URI %s not allowed' % (url,))
 

	
 
    elif repo_type == 'git':
 
        from kallithea.lib.vcs.backends.git.repository import GitRepository
 
        if url.startswith('http') or url.startswith('git'):
 
            # initially check if it's at least the proper URL
 
            # or does it pass basic auth
 
            GitRepository._check_url(url)
 
        elif url.startswith('svn+http'):
 
            raise NotImplementedError()
 
        elif url.startswith('hg+http'):
 
            raise NotImplementedError()
 
        else:
 
            raise Exception('URI %s not allowed' % (url))
 

	
 

	
 
def is_valid_repo(repo_name, base_path, scm=None):
 
    """
 
    Returns True if given path is a valid repository False otherwise.
 
    If scm param is given also compare if given scm is the same as expected
 
    from scm parameter
 

	
 
    :param repo_name:
 
    :param base_path:
 
    :param scm:
 

	
 
    :return True: if given path is a valid repository
 
    """
 
    full_path = os.path.join(safe_str(base_path), safe_str(repo_name))
 

	
 
    try:
 
        scm_ = get_scm(full_path)
 
        if scm:
 
            return scm_[0] == scm
 
        return True
 
    except VCSError:
 
        return False
 

	
 

	
 
def is_valid_repo_group(repo_group_name, base_path, skip_path_check=False):
kallithea/model/validators.py
Show inline comments
 
@@ -10,52 +10,52 @@
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
Set of generic validators
 
"""
 

	
 
import os
 
import re
 
import formencode
 
import logging
 
from collections import defaultdict
 
from tg.i18n import ugettext as _
 
from sqlalchemy import func
 
from webhelpers.pylonslib.secure_form import authentication_token
 
import sqlalchemy
 

	
 
from formencode.validators import (
 
    UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set,
 
    NotEmpty, IPAddress, CIDR, String, FancyValidator
 
)
 
from kallithea.lib.compat import OrderedSet
 
from kallithea.lib import ipaddr
 
from kallithea.lib.utils import repo_name_slug
 
from kallithea.lib.utils import repo_name_slug, is_valid_repo_uri
 
from kallithea.lib.utils2 import str2bool, aslist
 
from kallithea.model.db import RepoGroup, Repository, UserGroup, User
 
from kallithea.lib.exceptions import LdapImportError, HgsubversionImportError
 
from kallithea.lib.exceptions import LdapImportError
 
from kallithea.config.routing import ADMIN_PREFIX
 
from kallithea.lib.auth import HasRepoGroupPermissionLevel, HasPermissionAny
 

	
 
# silence warnings and pylint
 
UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set, \
 
    NotEmpty, IPAddress, CIDR, String, FancyValidator
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def UniqueListFromString():
 
    class _UniqueListFromString(formencode.FancyValidator):
 
        """
 
        Split value on ',' and make unique while preserving order
 
        """
 
        messages = dict(
 
            empty=_('Value cannot be an empty list'),
 
            missing_value=_('Value cannot be an empty list'),
 
        )
 

	
 
        def _to_python(self, value, state):
 
            value = aslist(value, ',')
 
            seen = set()
 
            return [c for c in value if not (c in seen or seen.add(c))]
 
@@ -393,94 +393,62 @@ def ValidRepoName(edit=False, old_data=N
 
                        )
 
            return value
 
    return _validator
 

	
 

	
 
def ValidForkName(*args, **kwargs):
 
    return ValidRepoName(*args, **kwargs)
 

	
 

	
 
def SlugifyName():
 
    class _validator(formencode.validators.FancyValidator):
 

	
 
        def _to_python(self, value, state):
 
            return repo_name_slug(value)
 

	
 
        def validate_python(self, value, state):
 
            pass
 

	
 
    return _validator
 

	
 

	
 
def ValidCloneUri():
 
    from kallithea.lib.utils import make_ui
 

	
 
    def url_handler(repo_type, url, ui):
 
        if repo_type == 'hg':
 
            from kallithea.lib.vcs.backends.hg.repository import MercurialRepository
 
            if url.startswith('http') or url.startswith('ssh'):
 
                # initially check if it's at least the proper URL
 
                # or does it pass basic auth
 
                MercurialRepository._check_url(url, ui)
 
            elif url.startswith('svn+http'):
 
                try:
 
                    from hgsubversion.svnrepo import svnremoterepo
 
                except ImportError:
 
                    raise HgsubversionImportError(_('Unable to activate hgsubversion support. '
 
                          'The "hgsubversion" library is missing'))
 
                svnremoterepo(ui, url).svn.uuid
 
            elif url.startswith('git+http'):
 
                raise NotImplementedError()
 
            else:
 
                raise Exception('clone from URI %s not allowed' % (url,))
 

	
 
        elif repo_type == 'git':
 
            from kallithea.lib.vcs.backends.git.repository import GitRepository
 
            if url.startswith('http') or url.startswith('git'):
 
                # initially check if it's at least the proper URL
 
                # or does it pass basic auth
 
                GitRepository._check_url(url)
 
            elif url.startswith('svn+http'):
 
                raise NotImplementedError()
 
            elif url.startswith('hg+http'):
 
                raise NotImplementedError()
 
            else:
 
                raise Exception('clone from URI %s not allowed' % (url))
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'clone_uri': _('Invalid repository URL'),
 
            'invalid_clone_uri': _('Invalid repository URL. It must be a '
 
                                   'valid http, https, ssh, svn+http or svn+https URL'),
 
        }
 

	
 
        def validate_python(self, value, state):
 
            repo_type = value.get('repo_type')
 
            url = value.get('clone_uri')
 

	
 
            if url and url != value.get('clone_uri_hidden'):
 
                try:
 
                    url_handler(repo_type, url, make_ui('db', clear_session=False))
 
                    is_valid_repo_uri(repo_type, url, make_ui('db', clear_session=False))
 
                except Exception:
 
                    log.exception('URL validation failed')
 
                    msg = self.message('clone_uri', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(clone_uri=msg)
 
                    )
 
    return _validator
 

	
 

	
 
def ValidForkType(old_data=None):
 
    old_data = old_data or {}
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_fork_type': _('Fork has to be the same type as parent')
 
        }
 

	
 
        def validate_python(self, value, state):
 
            if old_data['repo_type'] != value:
 
                msg = self.message('invalid_fork_type', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(repo_type=msg)
 
                )
 
    return _validator
0 comments (0 inline, 0 general)