Changeset - d9421a78a534
[Not reviewed]
default
0 3 0
Mads Kiilerich - 6 years ago 2019-08-06 22:50:03
mads@kiilerich.com
helpers: drop unused references to secure_form
3 files changed with 0 insertions and 22 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/auth.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.auth
 
~~~~~~~~~~~~~~~~~~
 

	
 
authentication and permission libraries
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 4, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 
import os
 
import logging
 
import traceback
 
import hashlib
 
import itertools
 
import collections
 

	
 
from decorator import decorator
 

	
 
from tg import request, session
 
from tg.i18n import ugettext as _
 
from webhelpers.pylonslib import secure_form
 
from sqlalchemy.orm.exc import ObjectDeletedError
 
from sqlalchemy.orm import joinedload
 
from webob.exc import HTTPFound, HTTPBadRequest, HTTPForbidden, HTTPMethodNotAllowed
 

	
 
from kallithea import __platform__, is_windows, is_unix
 
from kallithea.config.routing import url
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.model.meta import Session
 
from kallithea.model.user import UserModel
 
from kallithea.model.db import User, Repository, Permission, \
 
    UserToPerm, UserGroupRepoToPerm, UserGroupToPerm, UserGroupMember, \
 
    RepoGroup, UserGroupRepoGroupToPerm, UserIpMap, UserGroupUserGroupToPerm, \
 
    UserGroup, UserApiKeys
 

	
 
from kallithea.lib.utils2 import safe_str, safe_unicode, aslist
 
from kallithea.lib.utils import get_repo_slug, get_repo_group_slug, \
 
    get_user_group_slug, conditional_cache
 
from kallithea.lib.caching_query import FromCache
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class PasswordGenerator(object):
 
    """
 
    This is a simple class for generating password from different sets of
 
    characters
 
    usage::
 

	
 
        passwd_gen = PasswordGenerator()
 
        #print 8-letter password containing only big and small letters
 
            of alphabet
 
        passwd_gen.gen_password(8, passwd_gen.ALPHABETS_BIG_SMALL)
 
    """
 
    ALPHABETS_NUM = r'''1234567890'''
 
    ALPHABETS_SMALL = r'''qwertyuiopasdfghjklzxcvbnm'''
 
    ALPHABETS_BIG = r'''QWERTYUIOPASDFGHJKLZXCVBNM'''
 
    ALPHABETS_SPECIAL = r'''`-=[]\;',./~!@#$%^&*()_+{}|:"<>?'''
 
    ALPHABETS_FULL = ALPHABETS_BIG + ALPHABETS_SMALL \
 
        + ALPHABETS_NUM + ALPHABETS_SPECIAL
 
    ALPHABETS_ALPHANUM = ALPHABETS_BIG + ALPHABETS_SMALL + ALPHABETS_NUM
 
    ALPHABETS_BIG_SMALL = ALPHABETS_BIG + ALPHABETS_SMALL
 
    ALPHABETS_ALPHANUM_BIG = ALPHABETS_BIG + ALPHABETS_NUM
 
    ALPHABETS_ALPHANUM_SMALL = ALPHABETS_SMALL + ALPHABETS_NUM
 

	
 
    def gen_password(self, length, alphabet=ALPHABETS_FULL):
 
        assert len(alphabet) <= 256, alphabet
 
        l = []
 
        while len(l) < length:
 
            i = ord(os.urandom(1))
 
            if i < len(alphabet):
 
                l.append(alphabet[i])
 
        return ''.join(l)
 

	
 

	
 
def get_crypt_password(password):
 
    """
 
    Cryptographic function used for password hashing based on pybcrypt
 
    or Python's own OpenSSL wrapper on windows
 

	
 
    :param password: password to hash
 
    """
 
    if is_windows:
 
        return hashlib.sha256(password).hexdigest()
 
    elif is_unix:
 
        import bcrypt
 
        return bcrypt.hashpw(safe_str(password), bcrypt.gensalt(10))
 
    else:
 
        raise Exception('Unknown or unsupported platform %s'
 
                        % __platform__)
 

	
 

	
 
def check_password(password, hashed):
 
    """
 
    Checks matching password with it's hashed value, runs different
 
    implementation based on platform it runs on
 

	
 
    :param password: password
 
    :param hashed: password in hashed form
 
    """
 

	
 
    if is_windows:
 
        return hashlib.sha256(password).hexdigest() == hashed
 
    elif is_unix:
 
        import bcrypt
 
        try:
 
            return bcrypt.checkpw(safe_str(password), safe_str(hashed))
 
        except ValueError as e:
 
            # bcrypt will throw ValueError 'Invalid hashed_password salt' on all password errors
 
            log.error('error from bcrypt checking password: %s', e)
 
            return False
 
    else:
 
        raise Exception('Unknown or unsupported platform %s'
 
                        % __platform__)
 

	
 

	
 
def _cached_perms_data(user_id, user_is_admin):
 
    RK = 'repositories'
 
    GK = 'repositories_groups'
 
    UK = 'user_groups'
 
    GLOBAL = 'global'
 
    PERM_WEIGHTS = Permission.PERM_WEIGHTS
 
    permissions = {RK: {}, GK: {}, UK: {}, GLOBAL: set()}
 

	
 
    def bump_permission(kind, key, new_perm):
 
        """Add a new permission for kind and key.
 
        Assuming the permissions are comparable, set the new permission if it
 
        has higher weight, else drop it and keep the old permission.
 
        """
 
        cur_perm = permissions[kind][key]
 
        new_perm_val = PERM_WEIGHTS[new_perm]
 
        cur_perm_val = PERM_WEIGHTS[cur_perm]
 
        if new_perm_val > cur_perm_val:
 
            permissions[kind][key] = new_perm
 

	
 
    #======================================================================
 
    # fetch default permissions
 
    #======================================================================
 
    default_user = User.get_by_username('default', cache=True)
 
    default_user_id = default_user.user_id
 

	
 
    default_repo_perms = Permission.get_default_perms(default_user_id)
 
    default_repo_groups_perms = Permission.get_default_group_perms(default_user_id)
 
    default_user_group_perms = Permission.get_default_user_group_perms(default_user_id)
 

	
 
    if user_is_admin:
 
        #==================================================================
 
        # admin users have all rights;
 
        # based on default permissions, just set everything to admin
 
        #==================================================================
 
        permissions[GLOBAL].add('hg.admin')
 
        permissions[GLOBAL].add('hg.create.write_on_repogroup.true')
 

	
 
        # repositories
 
        for perm in default_repo_perms:
 
            r_k = perm.UserRepoToPerm.repository.repo_name
 
            p = 'repository.admin'
 
            permissions[RK][r_k] = p
 

	
 
        # repository groups
 
        for perm in default_repo_groups_perms:
 
            rg_k = perm.UserRepoGroupToPerm.group.group_name
 
            p = 'group.admin'
 
            permissions[GK][rg_k] = p
 

	
 
        # user groups
 
        for perm in default_user_group_perms:
 
            u_k = perm.UserUserGroupToPerm.user_group.users_group_name
 
            p = 'usergroup.admin'
 
            permissions[UK][u_k] = p
 
        return permissions
 

	
 
    #==================================================================
 
    # SET DEFAULTS GLOBAL, REPOS, REPOSITORY GROUPS
 
    #==================================================================
 

	
 
    # default global permissions taken from the default user
 
    default_global_perms = UserToPerm.query() \
 
        .filter(UserToPerm.user_id == default_user_id) \
 
        .options(joinedload(UserToPerm.permission))
 

	
 
    for perm in default_global_perms:
 
        permissions[GLOBAL].add(perm.permission.permission_name)
 

	
 
    # defaults for repositories, taken from default user
 
    for perm in default_repo_perms:
 
        r_k = perm.UserRepoToPerm.repository.repo_name
 
        if perm.Repository.owner_id == user_id:
 
            p = 'repository.admin'
 
        elif perm.Repository.private:
 
            p = 'repository.none'
 
        else:
 
            p = perm.Permission.permission_name
 
        permissions[RK][r_k] = p
 

	
 
    # defaults for repository groups taken from default user permission
 
    # on given group
 
    for perm in default_repo_groups_perms:
 
        rg_k = perm.UserRepoGroupToPerm.group.group_name
 
        p = perm.Permission.permission_name
 
        permissions[GK][rg_k] = p
 

	
 
    # defaults for user groups taken from default user permission
 
    # on given user group
 
    for perm in default_user_group_perms:
 
        u_k = perm.UserUserGroupToPerm.user_group.users_group_name
 
        p = perm.Permission.permission_name
 
        permissions[UK][u_k] = p
 

	
 
    #======================================================================
 
    # !! Augment GLOBALS with user permissions if any found !!
 
    #======================================================================
kallithea/model/validators.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
Set of generic validators
 
"""
 

	
 
import os
 
import re
 
import formencode
 
import logging
 
from collections import defaultdict
 
from tg.i18n import ugettext as _
 
from sqlalchemy import func
 
from webhelpers.pylonslib.secure_form import authentication_token
 
import sqlalchemy
 

	
 
from formencode.validators import (
 
    UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set,
 
    NotEmpty, IPAddress, CIDR, String, FancyValidator
 
)
 
from kallithea.lib.compat import OrderedSet
 
from kallithea.lib import ipaddr
 
from kallithea.lib.utils import is_valid_repo_uri
 
from kallithea.lib.utils2 import str2bool, aslist, repo_name_slug
 
from kallithea.model.db import RepoGroup, Repository, UserGroup, User
 
from kallithea.lib.exceptions import LdapImportError
 
from kallithea.config.routing import ADMIN_PREFIX
 
from kallithea.lib.auth import HasRepoGroupPermissionLevel, HasPermissionAny
 

	
 
# silence warnings and pylint
 
UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set, \
 
    NotEmpty, IPAddress, CIDR, String, FancyValidator
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def UniqueListFromString():
 
    class _UniqueListFromString(formencode.FancyValidator):
 
        """
 
        Split value on ',' and make unique while preserving order
 
        """
 
        messages = dict(
 
            empty=_('Value cannot be an empty list'),
 
            missing_value=_('Value cannot be an empty list'),
 
        )
 

	
 
        def _convert_to_python(self, value, state):
 
            value = aslist(value, ',')
 
            seen = set()
 
            return [c for c in value if not (c in seen or seen.add(c))]
 

	
 
        def empty_value(self, value):
 
            return []
 

	
 
    return _UniqueListFromString
 

	
 

	
 
def ValidUsername(edit=False, old_data=None):
 
    old_data = old_data or {}
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'username_exists': _('Username "%(username)s" already exists'),
 
            'system_invalid_username':
 
                _('Username "%(username)s" cannot be used'),
 
            'invalid_username':
 
                _('Username may only contain alphanumeric characters '
 
                  'underscores, periods or dashes and must begin with an '
 
                  'alphanumeric character or underscore')
 
        }
 

	
 
        def _validate_python(self, value, state):
 
            if value in ['default', 'new_user']:
 
                msg = self.message('system_invalid_username', state, username=value)
 
                raise formencode.Invalid(msg, value, state)
 
            # check if user is unique
 
            old_un = None
 
            if edit:
 
                old_un = User.get(old_data.get('user_id')).username
 

	
 
            if old_un != value or not edit:
 
                if User.get_by_username(value, case_insensitive=True):
 
                    msg = self.message('username_exists', state, username=value)
 
                    raise formencode.Invalid(msg, value, state)
 

	
 
            if re.match(r'^[a-zA-Z0-9\_]{1}[a-zA-Z0-9\-\_\.]*$', value) is None:
 
                msg = self.message('invalid_username', state)
 
                raise formencode.Invalid(msg, value, state)
 
    return _validator
 

	
 

	
 
def ValidRegex(msg=None):
 
    class _validator(formencode.validators.Regex):
 
        messages = dict(invalid=msg or _('The input is not valid'))
 
    return _validator
 

	
 

	
 
def ValidRepoUser():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_username': _('Username %(username)s is not valid')
 
        }
 

	
 
        def _validate_python(self, value, state):
 
            try:
 
                User.query().filter(User.active == True) \
 
                    .filter(User.username == value).one()
 
            except sqlalchemy.exc.InvalidRequestError: # NoResultFound/MultipleResultsFound
 
                msg = self.message('invalid_username', state, username=value)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(username=msg)
 
                )
 

	
 
    return _validator
 

	
 

	
 
def ValidUserGroup(edit=False, old_data=None):
 
    old_data = old_data or {}
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_group': _('Invalid user group name'),
 
            'group_exist': _('User group "%(usergroup)s" already exists'),
 
            'invalid_usergroup_name':
 
                _('user group name may only contain alphanumeric '
 
                  'characters underscores, periods or dashes and must begin '
 
                  'with alphanumeric character')
 
        }
 

	
 
        def _validate_python(self, value, state):
 
            if value in ['default']:
 
                msg = self.message('invalid_group', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(users_group_name=msg)
 
                )
 
            # check if group is unique
 
            old_ugname = None
 
            if edit:
 
                old_id = old_data.get('users_group_id')
 
                old_ugname = UserGroup.get(old_id).users_group_name
 

	
 
            if old_ugname != value or not edit:
 
                is_existing_group = UserGroup.get_by_group_name(value,
 
                                                        case_insensitive=True)
 
                if is_existing_group:
 
                    msg = self.message('group_exist', state, usergroup=value)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(users_group_name=msg)
 
                    )
 

	
 
            if re.match(r'^[a-zA-Z0-9]{1}[a-zA-Z0-9\-\_\.]+$', value) is None:
 
                msg = self.message('invalid_usergroup_name', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(users_group_name=msg)
 
                )
 

	
 
    return _validator
 

	
 

	
 
def ValidRepoGroup(edit=False, old_data=None):
 
    old_data = old_data or {}
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'parent_group_id': _('Cannot assign this group as parent'),
 
            'group_exists': _('Group "%(group_name)s" already exists'),
 
            'repo_exists':
 
                _('Repository with name "%(group_name)s" already exists')
 
        }
 

	
 
        def _validate_python(self, value, state):
 
            # TODO WRITE VALIDATIONS
 
            group_name = value.get('group_name')
 
            parent_group_id = value.get('parent_group_id')
 

	
 
            # slugify repo group just in case :)
 
            slug = repo_name_slug(group_name)
 

	
 
            # check for parent of self
 
            parent_of_self = lambda: (
 
                old_data['group_id'] == parent_group_id
 
                if parent_group_id else False
 
            )
 
            if edit and parent_of_self():
 
                msg = self.message('parent_group_id', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(parent_group_id=msg)
 
                )
 

	
 
            old_gname = None
 
            if edit:
 
                old_gname = RepoGroup.get(old_data.get('group_id')).group_name
 

	
 
            if old_gname != group_name or not edit:
 

	
 
                # check group
 
                gr = RepoGroup.query() \
 
                      .filter(func.lower(RepoGroup.group_name) == func.lower(slug)) \
 
                      .filter(RepoGroup.parent_group_id == parent_group_id) \
 
                      .scalar()
 
                if gr is not None:
 
                    msg = self.message('group_exists', state, group_name=slug)
 
                    raise formencode.Invalid(msg, value, state,
 
                            error_dict=dict(group_name=msg)
 
                    )
 

	
 
                # check for same repo
 
                repo = Repository.query() \
 
                      .filter(func.lower(Repository.repo_name) == func.lower(slug)) \
 
                      .scalar()
 
                if repo is not None:
 
                    msg = self.message('repo_exists', state, group_name=slug)
 
                    raise formencode.Invalid(msg, value, state,
 
                            error_dict=dict(group_name=msg)
 
                    )
 

	
 
    return _validator
 

	
 

	
 
def ValidPassword():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_password':
 
                _('Invalid characters (non-ascii) in password')
 
        }
 

	
 
        def _validate_python(self, value, state):
 
            try:
 
                (value or '').decode('ascii')
 
            except UnicodeError:
 
                msg = self.message('invalid_password', state)
 
                raise formencode.Invalid(msg, value, state,)
 
    return _validator
 

	
 

	
 
def ValidOldPassword(username):
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_password': _('Invalid old password')
 
        }
 

	
 
        def _validate_python(self, value, state):
 
            from kallithea.lib import auth_modules
 
            if auth_modules.authenticate(username, value, '') is None:
 
                msg = self.message('invalid_password', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(current_password=msg)
 
                )
 
    return _validator
 

	
 

	
 
def ValidPasswordsMatch(password_field, password_confirmation_field):
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'password_mismatch': _('Passwords do not match'),
 
        }
 

	
 
        def _validate_python(self, value, state):
 
            if value.get(password_field) != value[password_confirmation_field]:
 
                msg = self.message('password_mismatch', state)
 
                raise formencode.Invalid(msg, value, state,
 
                     error_dict={password_field:msg, password_confirmation_field: msg}
 
                )
 
    return _validator
 

	
 

	
 
def ValidAuth():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_auth': _(u'Invalid username or password'),
 
        }
 

	
 
        def _validate_python(self, value, state):
 
            from kallithea.lib import auth_modules
 

	
 
            password = value['password']
 
            username = value['username']
 

	
 
            # authenticate returns unused dict but has called
 
            # plugin._authenticate which has create_or_update'ed the username user in db
 
            if auth_modules.authenticate(username, password) is None:
 
                user = User.get_by_username_or_email(username)
 
                if user and not user.active:
 
                    log.warning('user %s is disabled', username)
 
                    msg = self.message('invalid_auth', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(username=' ', password=msg)
 
                    )
 
                else:
 
                    log.warning('user %s failed to authenticate', username)
 
                    msg = self.message('invalid_auth', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(username=' ', password=msg)
 
                    )
 
    return _validator
 

	
 

	
 
def ValidAuthToken():
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_token': _('Token mismatch')
 
        }
 

	
 
        def _validate_python(self, value, state):
 
            if value != authentication_token():
 
                msg = self.message('invalid_token', state)
 
                raise formencode.Invalid(msg, value, state)
 
    return _validator
 

	
 

	
 
def ValidRepoName(edit=False, old_data=None):
 
    old_data = old_data or {}
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_repo_name':
 
                _('Repository name %(repo)s is not allowed'),
 
            'repository_exists':
 
                _('Repository named %(repo)s already exists'),
 
            'repository_in_group_exists': _('Repository "%(repo)s" already '
 
                                            'exists in group "%(group)s"'),
 
            'same_group_exists': _('Repository group with name "%(repo)s" '
 
                                   'already exists')
 
        }
 

	
 
        def _convert_to_python(self, value, state):
 
            repo_name = repo_name_slug(value.get('repo_name', ''))
 
            repo_group = value.get('repo_group')
 
            if repo_group:
 
                gr = RepoGroup.get(repo_group)
 
                group_path = gr.full_path
 
                group_name = gr.group_name
 
                # value needs to be aware of group name in order to check
 
                # db key This is an actual just the name to store in the
 
                # database
 
                repo_name_full = group_path + RepoGroup.url_sep() + repo_name
 
            else:
 
                group_name = group_path = ''
 
                repo_name_full = repo_name
 

	
 
            value['repo_name'] = repo_name
 
            value['repo_name_full'] = repo_name_full
 
            value['group_path'] = group_path
 
            value['group_name'] = group_name
 
            return value
 

	
 
        def _validate_python(self, value, state):
 
            repo_name = value.get('repo_name')
 
            repo_name_full = value.get('repo_name_full')
 
            group_path = value.get('group_path')
 
            group_name = value.get('group_name')
 

	
 
            if repo_name in [ADMIN_PREFIX, '']:
 
                msg = self.message('invalid_repo_name', state, repo=repo_name)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(repo_name=msg)
 
                )
 

	
 
            rename = old_data.get('repo_name') != repo_name_full
 
            create = not edit
 
            if rename or create:
 
                repo = Repository.get_by_repo_name(repo_name_full, case_insensitive=True)
 
                repo_group = RepoGroup.get_by_group_name(repo_name_full, case_insensitive=True)
 
                if group_path != '':
 
                    if repo is not None:
 
                        msg = self.message('repository_in_group_exists', state,
 
                                repo=repo.repo_name, group=group_name)
 
                        raise formencode.Invalid(msg, value, state,
 
                            error_dict=dict(repo_name=msg)
 
                        )
 
                elif repo_group is not None:
 
                        msg = self.message('same_group_exists', state,
 
                                repo=repo_name)
 
                        raise formencode.Invalid(msg, value, state,
 
                            error_dict=dict(repo_name=msg)
 
                        )
 
                elif repo is not None:
 
                        msg = self.message('repository_exists', state,
 
                                repo=repo.repo_name)
 
                        raise formencode.Invalid(msg, value, state,
 
                            error_dict=dict(repo_name=msg)
 
                        )
 
            return value
 
    return _validator
 

	
 

	
 
def ValidForkName(*args, **kwargs):
 
    return ValidRepoName(*args, **kwargs)
 

	
 

	
 
def SlugifyName():
 
    class _validator(formencode.validators.FancyValidator):
 

	
 
        def _convert_to_python(self, value, state):
 
            return repo_name_slug(value)
 

	
 
        def _validate_python(self, value, state):
 
            pass
 

	
 
    return _validator
 

	
 

	
 
def ValidCloneUri():
 
    from kallithea.lib.utils import make_ui
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'clone_uri': _('Invalid repository URL'),
 
            'invalid_clone_uri': _('Invalid repository URL. It must be a '
 
                                   'valid http, https, ssh, svn+http or svn+https URL'),
 
        }
 

	
 
        def _validate_python(self, value, state):
 
            repo_type = value.get('repo_type')
 
            url = value.get('clone_uri')
 

	
 
            if url and url != value.get('clone_uri_hidden'):
 
                try:
 
                    is_valid_repo_uri(repo_type, url, make_ui(clear_session=False))
 
                except Exception:
 
                    log.exception('URL validation failed')
 
                    msg = self.message('clone_uri', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(clone_uri=msg)
 
                    )
 
    return _validator
 

	
 

	
 
def ValidForkType(old_data=None):
 
    old_data = old_data or {}
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_fork_type': _('Fork has to be the same type as parent')
 
        }
 

	
 
        def _validate_python(self, value, state):
 
            if old_data['repo_type'] != value:
 
                msg = self.message('invalid_fork_type', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(repo_type=msg)
 
                )
 
    return _validator
 

	
 

	
 
def CanWriteGroup(old_data=None):
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'permission_denied': _("You don't have permissions "
 
                                   "to create repository in this group"),
 
            'permission_denied_root': _("no permission to create repository "
 
                                        "in root location")
 
        }
 

	
 
        def _convert_to_python(self, value, state):
 
            # root location
 
            if value == -1:
 
                return None
 
            return value
 

	
 
        def _validate_python(self, value, state):
 
            gr = RepoGroup.get(value)
 
            gr_name = gr.group_name if gr is not None else None # None means ROOT location
 

	
 
            # create repositories with write permission on group is set to true
 
            create_on_write = HasPermissionAny('hg.create.write_on_repogroup.true')()
 
            group_admin = HasRepoGroupPermissionLevel('admin')(gr_name,
 
                                            'can write into group validator')
 
            group_write = HasRepoGroupPermissionLevel('write')(gr_name,
 
                                            'can write into group validator')
 
            forbidden = not (group_admin or (group_write and create_on_write))
 
            can_create_repos = HasPermissionAny('hg.admin', 'hg.create.repository')
 
            gid = (old_data['repo_group'].get('group_id')
 
                   if (old_data and 'repo_group' in old_data) else None)
 
            value_changed = gid != value
 
            new = not old_data
 
            # do check if we changed the value, there's a case that someone got
 
            # revoked write permissions to a repository, he still created, we
 
            # don't need to check permission if he didn't change the value of
 
            # groups in form box
 
            if value_changed or new:
 
                # parent group need to be existing
 
                if gr and forbidden:
 
                    msg = self.message('permission_denied', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(repo_type=msg)
 
                    )
 
                ## check if we can write to root location !
 
                elif gr is None and not can_create_repos():
 
                    msg = self.message('permission_denied_root', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(repo_type=msg)
 
                    )
 

	
 
    return _validator
 

	
 

	
 
def CanCreateGroup(can_create_in_root=False):
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'permission_denied': _("You don't have permissions "
 
                                   "to create a group in this location")
kallithea/tests/other/test_validators.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
import formencode
 
import pytest
 

	
 
from kallithea.tests.base import *
 

	
 
from kallithea.model import validators as v
 
from kallithea.model.user_group import UserGroupModel
 

	
 
from kallithea.model.meta import Session
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.tests.fixture import Fixture
 

	
 
fixture = Fixture()
 

	
 

	
 
@pytest.mark.usefixtures("test_context_fixture") # apply fixture for all test methods
 
class TestRepoGroups(TestController):
 

	
 
    def teardown_method(self, method):
 
        Session.remove()
 

	
 
    def test_Message_extractor(self):
 
        validator = v.ValidUsername()
 
        with pytest.raises(formencode.Invalid):
 
            validator.to_python('default')
 

	
 
        class StateObj(object):
 
            pass
 

	
 
        with pytest.raises(formencode.Invalid):
 
            validator.to_python('default', StateObj)
 

	
 
    def test_ValidUsername(self):
 
        validator = v.ValidUsername()
 

	
 
        with pytest.raises(formencode.Invalid):
 
            validator.to_python('default')
 
        with pytest.raises(formencode.Invalid):
 
            validator.to_python('new_user')
 
        with pytest.raises(formencode.Invalid):
 
            validator.to_python('.,')
 
        with pytest.raises(formencode.Invalid):
 
            validator.to_python(TEST_USER_ADMIN_LOGIN)
 
        assert 'test' == validator.to_python('test')
 

	
 
        validator = v.ValidUsername(edit=True, old_data={'user_id': 1})
 

	
 
    def test_ValidRepoUser(self):
 
        validator = v.ValidRepoUser()
 
        with pytest.raises(formencode.Invalid):
 
            validator.to_python('nouser')
 
        assert TEST_USER_ADMIN_LOGIN == validator.to_python(TEST_USER_ADMIN_LOGIN)
 

	
 
    def test_ValidUserGroup(self):
 
        validator = v.ValidUserGroup()
 
        with pytest.raises(formencode.Invalid):
 
            validator.to_python(u'default')
 
        with pytest.raises(formencode.Invalid):
 
            validator.to_python(u'.,')
 

	
 
        gr = fixture.create_user_group(u'test')
 
        gr2 = fixture.create_user_group(u'tes2')
 
        Session().commit()
 
        with pytest.raises(formencode.Invalid):
 
            validator.to_python(u'test')
 
        assert gr.users_group_id is not None
 
        validator = v.ValidUserGroup(edit=True,
 
                                    old_data={'users_group_id':
 
                                              gr2.users_group_id})
 

	
 
        with pytest.raises(formencode.Invalid):
 
            validator.to_python(u'test')
 
        with pytest.raises(formencode.Invalid):
 
            validator.to_python(u'TesT')
 
        with pytest.raises(formencode.Invalid):
 
            validator.to_python(u'TEST')
 
        UserGroupModel().delete(gr)
 
        UserGroupModel().delete(gr2)
 
        Session().commit()
 

	
 
    def test_ValidRepoGroup(self):
 
        validator = v.ValidRepoGroup()
 
        model = RepoGroupModel()
 
        with pytest.raises(formencode.Invalid):
 
            validator.to_python({'group_name': HG_REPO, })
 
        gr = model.create(group_name=u'test_gr', group_description=u'desc',
 
                          parent=None,
 
                          just_db=True,
 
                          owner=TEST_USER_ADMIN_LOGIN)
 
        with pytest.raises(formencode.Invalid):
 
            validator.to_python({'group_name': gr.group_name, })
 

	
 
        validator = v.ValidRepoGroup(edit=True,
 
                                      old_data={'group_id':  gr.group_id})
 
        with pytest.raises(formencode.Invalid):
 
            validator.to_python({
 
                                        'group_name': gr.group_name + 'n',
 
                                        'parent_group_id': gr.group_id
 
                                        })
 
        model.delete(gr)
 

	
 
    def test_ValidPassword(self):
 
        validator = v.ValidPassword()
 
        assert 'lol' == validator.to_python('lol')
 
        assert validator.to_python(None) is None
 
        with pytest.raises(formencode.Invalid):
 
            validator.to_python('ąćżź')
 

	
 
    def test_ValidPasswordsMatch(self):
 
        validator = v.ValidPasswordsMatch('new_password', 'password_confirmation')
 
        with pytest.raises(formencode.Invalid):
 
            validator.to_python({'new_password': 'pass',
 
                                          'password_confirmation': 'pass2'})
 

	
 
        with pytest.raises(formencode.Invalid):
 
            validator.to_python({'new_password': 'pass',
 
                                          'password_confirmation': 'pass2'})
 

	
 
        assert {'new_password': 'pass',
 
                          'password_confirmation': 'pass'} == validator.to_python({'new_password': 'pass',
 
                                         'password_confirmation': 'pass'})
 

	
 
        assert {'new_password': 'pass',
 
                          'password_confirmation': 'pass'} == validator.to_python({'new_password': 'pass',
 
                                         'password_confirmation': 'pass'})
 

	
 
    def test_ValidAuth(self):
 
        validator = v.ValidAuth()
 
        valid_creds = {
 
            'username': TEST_USER_REGULAR2_LOGIN,
 
            'password': TEST_USER_REGULAR2_PASS,
 
        }
 
        invalid_creds = {
 
            'username': 'err',
 
            'password': 'err',
 
        }
 
        assert valid_creds == validator.to_python(valid_creds)
 
        with pytest.raises(formencode.Invalid):
 
            validator.to_python(invalid_creds)
 

	
 
    def test_ValidAuthToken(self):
 
        validator = v.ValidAuthToken()
 
        # this is untestable without a threadlocal
 
#        self.assertRaises(formencode.Invalid,
 
#                          validator.to_python, 'BadToken')
 
        validator
 

	
 
    def test_ValidRepoName(self):
 
        validator = v.ValidRepoName()
 

	
 
        with pytest.raises(formencode.Invalid):
 
            validator.to_python({'repo_name': ''})
 

	
 
        with pytest.raises(formencode.Invalid):
 
            validator.to_python({'repo_name': HG_REPO})
 

	
 
        gr = RepoGroupModel().create(group_name=u'group_test',
 
                                      group_description=u'desc',
 
                                      parent=None,
 
                                      owner=TEST_USER_ADMIN_LOGIN)
 
        with pytest.raises(formencode.Invalid):
 
            validator.to_python({'repo_name': gr.group_name})
 

	
 
        # TODO: write an error case for that ie. create a repo withinh a group
 
#        self.assertRaises(formencode.Invalid,
 
#                          validator.to_python, {'repo_name': 'some',
 
#                                                'repo_group': gr.group_id})
 

	
 
    def test_ValidForkName(self):
 
        # this uses ValidRepoName validator
 
        assert True
 

	
 
    @parametrize('name,expected', [
 
        ('test', 'test'), ('lolz!', 'lolz'), ('  aavv', 'aavv'),
 
        ('ala ma kota', 'ala-ma-kota'), ('@nooo', 'nooo'),
 
        ('$!haha lolz !', 'haha-lolz'), ('$$$$$', ''), ('{}OK!', 'OK'),
 
        ('/]re po', 're-po')])
 
    def test_SlugifyName(self, name, expected):
 
        validator = v.SlugifyName()
 
        assert expected == validator.to_python(name)
 

	
 
    def test_ValidCloneUri(self):
 
            # TODO: write this one
 
            pass
 

	
 
    def test_ValidForkType(self):
 
            validator = v.ValidForkType(old_data={'repo_type': 'hg'})
 
            assert 'hg' == validator.to_python('hg')
 
            with pytest.raises(formencode.Invalid):
 
                validator.to_python('git')
 

	
 
    def test_ValidPerms(self):
 
            # TODO: write this one
 
            pass
 

	
 
    def test_ValidSettings(self):
 
        validator = v.ValidSettings()
 
        assert {'pass': 'pass'} == validator.to_python(value={'user': 'test',
 
                                                    'pass': 'pass'})
 

	
 
        assert {'user2': 'test', 'pass': 'pass'} == validator.to_python(value={'user2': 'test',
 
                                                    'pass': 'pass'})
 

	
 
    def test_ValidPath(self):
 
            validator = v.ValidPath()
 
            assert TESTS_TMP_PATH == validator.to_python(TESTS_TMP_PATH)
 
            with pytest.raises(formencode.Invalid):
 
                validator.to_python('/no_such_dir')
 

	
 
    def test_UniqSystemEmail(self):
 
        validator = v.UniqSystemEmail(old_data={})
 

	
 
        assert 'mail@python.org' == validator.to_python('MaiL@Python.org')
 

	
 
        email = TEST_USER_REGULAR2_EMAIL
 
        with pytest.raises(formencode.Invalid):
 
            validator.to_python(email)
 

	
 
    def test_ValidSystemEmail(self):
 
        validator = v.ValidSystemEmail()
 
        email = TEST_USER_REGULAR2_EMAIL
 

	
 
        assert email == validator.to_python(email)
 
        with pytest.raises(formencode.Invalid):
 
            validator.to_python('err')
 

	
 
    def test_LdapLibValidator(self):
 
        if ldap_lib_installed:
 
            validator = v.LdapLibValidator()
 
            assert "DN" == validator.to_python('DN')
 
        else:
 
            validator = v.LdapLibValidator()
 
            with pytest.raises(v.LdapImportError):
 
                validator.to_python('err')
 

	
 
    def test_AttrLoginValidator(self):
 
        validator = v.AttrLoginValidator()
 
        assert 'DN_attr' == validator.to_python('DN_attr')
0 comments (0 inline, 0 general)