Changeset - 4419551b2915
[Not reviewed]
codereview
0 7 0
Marcin Kuzminski - 13 years ago 2012-06-17 21:37:07
marcin@python-works.com
Switched forms to new validators
7 files changed with 310 insertions and 768 deletions:
0 comments (0 inline, 0 general)
rhodecode/controllers/admin/users.py
Show inline comments
 
@@ -220,7 +220,7 @@ class UsersController(BaseController):
 
        return redirect(url('edit_user', id=id))
 

	
 
    def add_email(self, id):
 
        """PUT /user_emails/id: Update an existing item"""
 
        """POST /user_emails:Add an existing item"""
 
        # url('user_emails', id=ID, method='put')
 

	
 
        #TODO: validation and form !!!
rhodecode/model/forms.py
Show inline comments
 
@@ -19,573 +19,76 @@ list=[1,2,3,4,5]
 
for SELECT use formencode.All(OneOf(list), Int())
 

	
 
"""
 
import os
 
import re
 
import logging
 
import traceback
 

	
 
import formencode
 
from formencode import All
 
from formencode.validators import UnicodeString, OneOf, Int, Number, Regex, \
 
    Email, Bool, StringBoolean, Set
 

	
 
from pylons.i18n.translation import _
 
from webhelpers.pylonslib.secure_form import authentication_token
 

	
 
from rhodecode.config.routing import ADMIN_PREFIX
 
from rhodecode.lib.utils import repo_name_slug
 
from rhodecode.lib.auth import authenticate, get_crypt_password
 
from rhodecode.lib.exceptions import LdapImportError
 
from rhodecode.model.db import User, UsersGroup, RepoGroup, Repository
 
from rhodecode.model import validators as v
 
from rhodecode import BACKENDS
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
#this is needed to translate the messages using _() in validators
 
class State_obj(object):
 
    _ = staticmethod(_)
 

	
 

	
 
#==============================================================================
 
# VALIDATORS
 
#==============================================================================
 
class ValidAuthToken(formencode.validators.FancyValidator):
 
    messages = {'invalid_token': _('Token mismatch')}
 

	
 
    def validate_python(self, value, state):
 

	
 
        if value != authentication_token():
 
            raise formencode.Invalid(
 
                self.message('invalid_token',
 
                             state, search_number=value),
 
                value,
 
                state
 
            )
 

	
 

	
 
def ValidUsername(edit, old_data):
 
    class _ValidUsername(formencode.validators.FancyValidator):
 

	
 
        def validate_python(self, value, state):
 
            if value in ['default', 'new_user']:
 
                raise formencode.Invalid(_('Invalid username'), value, state)
 
            #check if user is unique
 
            old_un = None
 
            if edit:
 
                old_un = User.get(old_data.get('user_id')).username
 

	
 
            if old_un != value or not edit:
 
                if User.get_by_username(value, case_insensitive=True):
 
                    raise formencode.Invalid(_('This username already '
 
                                               'exists') , value, state)
 

	
 
            if re.match(r'^[a-zA-Z0-9]{1}[a-zA-Z0-9\-\_\.]+$', value) is None:
 
                raise formencode.Invalid(
 
                    _('Username may only contain alphanumeric characters '
 
                      'underscores, periods or dashes and must begin with '
 
                      'alphanumeric character'),
 
                    value,
 
                    state
 
                )
 

	
 
    return _ValidUsername
 

	
 

	
 
def ValidUsersGroup(edit, old_data):
 

	
 
    class _ValidUsersGroup(formencode.validators.FancyValidator):
 

	
 
        def validate_python(self, value, state):
 
            if value in ['default']:
 
                raise formencode.Invalid(_('Invalid group name'), value, state)
 
            #check if group is unique
 
            old_ugname = None
 
            if edit:
 
                old_ugname = UsersGroup.get(
 
                            old_data.get('users_group_id')).users_group_name
 

	
 
            if old_ugname != value or not edit:
 
                if UsersGroup.get_by_group_name(value, cache=False,
 
                                               case_insensitive=True):
 
                    raise formencode.Invalid(_('This users group '
 
                                               'already exists'), value,
 
                                             state)
 

	
 
            if re.match(r'^[a-zA-Z0-9]{1}[a-zA-Z0-9\-\_\.]+$', value) is None:
 
                raise formencode.Invalid(
 
                    _('RepoGroup name may only contain  alphanumeric characters '
 
                      'underscores, periods or dashes and must begin with '
 
                      'alphanumeric character'),
 
                    value,
 
                    state
 
                )
 

	
 
    return _ValidUsersGroup
 

	
 

	
 
def ValidReposGroup(edit, old_data):
 
    class _ValidReposGroup(formencode.validators.FancyValidator):
 

	
 
        def validate_python(self, value, state):
 
            # TODO WRITE VALIDATIONS
 
            group_name = value.get('group_name')
 
            group_parent_id = value.get('group_parent_id')
 

	
 
            # slugify repo group just in case :)
 
            slug = repo_name_slug(group_name)
 

	
 
            # check for parent of self
 
            parent_of_self = lambda: (
 
                old_data['group_id'] == int(group_parent_id)
 
                if group_parent_id else False
 
            )
 
            if edit and parent_of_self():
 
                    e_dict = {
 
                        'group_parent_id': _('Cannot assign this group as parent')
 
                    }
 
                    raise formencode.Invalid('', value, state,
 
                                             error_dict=e_dict)
 

	
 
            old_gname = None
 
            if edit:
 
                old_gname = RepoGroup.get(old_data.get('group_id')).group_name
 

	
 
            if old_gname != group_name or not edit:
 

	
 
                # check group
 
                gr = RepoGroup.query()\
 
                      .filter(RepoGroup.group_name == slug)\
 
                      .filter(RepoGroup.group_parent_id == group_parent_id)\
 
                      .scalar()
 

	
 
                if gr:
 
                    e_dict = {
 
                        'group_name': _('This group already exists')
 
                    }
 
                    raise formencode.Invalid('', value, state,
 
                                             error_dict=e_dict)
 

	
 
                # check for same repo
 
                repo = Repository.query()\
 
                      .filter(Repository.repo_name == slug)\
 
                      .scalar()
 

	
 
                if repo:
 
                    e_dict = {
 
                        'group_name': _('Repository with this name already exists')
 
                    }
 
                    raise formencode.Invalid('', value, state,
 
                                             error_dict=e_dict)
 

	
 
    return _ValidReposGroup
 

	
 

	
 
class ValidPassword(formencode.validators.FancyValidator):
 

	
 
    def to_python(self, value, state):
 

	
 
        if not value:
 
            return
 

	
 
        if value.get('password'):
 
            try:
 
                value['password'] = get_crypt_password(value['password'])
 
            except UnicodeEncodeError:
 
                e_dict = {'password': _('Invalid characters in password')}
 
                raise formencode.Invalid('', value, state, error_dict=e_dict)
 

	
 
        if value.get('password_confirmation'):
 
            try:
 
                value['password_confirmation'] = \
 
                    get_crypt_password(value['password_confirmation'])
 
            except UnicodeEncodeError:
 
                e_dict = {
 
                    'password_confirmation': _('Invalid characters in password')
 
                }
 
                raise formencode.Invalid('', value, state, error_dict=e_dict)
 

	
 
        if value.get('new_password'):
 
            try:
 
                value['new_password'] = \
 
                    get_crypt_password(value['new_password'])
 
            except UnicodeEncodeError:
 
                e_dict = {'new_password': _('Invalid characters in password')}
 
                raise formencode.Invalid('', value, state, error_dict=e_dict)
 

	
 
        return value
 

	
 

	
 
class ValidPasswordsMatch(formencode.validators.FancyValidator):
 

	
 
    def validate_python(self, value, state):
 

	
 
        pass_val = value.get('password') or value.get('new_password')
 
        if pass_val != value['password_confirmation']:
 
            e_dict = {'password_confirmation':
 
                   _('Passwords do not match')}
 
            raise formencode.Invalid('', value, state, error_dict=e_dict)
 

	
 

	
 
class ValidAuth(formencode.validators.FancyValidator):
 
    messages = {
 
        'invalid_password':_('invalid password'),
 
        'invalid_login':_('invalid user name'),
 
        'disabled_account':_('Your account is disabled')
 
    }
 

	
 
    # error mapping
 
    e_dict = {'username': messages['invalid_login'],
 
              'password': messages['invalid_password']}
 
    e_dict_disable = {'username': messages['disabled_account']}
 

	
 
    def validate_python(self, value, state):
 
        password = value['password']
 
        username = value['username']
 
        user = User.get_by_username(username)
 

	
 
        if authenticate(username, password):
 
            return value
 
        else:
 
            if user and user.active is False:
 
                log.warning('user %s is disabled' % username)
 
                raise formencode.Invalid(
 
                    self.message('disabled_account',
 
                    state=State_obj),
 
                    value, state,
 
                    error_dict=self.e_dict_disable
 
                )
 
            else:
 
                log.warning('user %s failed to authenticate' % username)
 
                raise formencode.Invalid(
 
                    self.message('invalid_password',
 
                    state=State_obj), value, state,
 
                    error_dict=self.e_dict
 
                )
 

	
 

	
 
class ValidRepoUser(formencode.validators.FancyValidator):
 

	
 
    def to_python(self, value, state):
 
        try:
 
            User.query().filter(User.active == True)\
 
                .filter(User.username == value).one()
 
        except Exception:
 
            raise formencode.Invalid(_('This username is not valid'),
 
                                     value, state)
 
        return value
 

	
 

	
 
def ValidRepoName(edit, old_data):
 
    class _ValidRepoName(formencode.validators.FancyValidator):
 
        def to_python(self, value, state):
 

	
 
            repo_name = value.get('repo_name')
 

	
 
            slug = repo_name_slug(repo_name)
 
            if slug in [ADMIN_PREFIX, '']:
 
                e_dict = {'repo_name': _('This repository name is disallowed')}
 
                raise formencode.Invalid('', value, state, error_dict=e_dict)
 

	
 
            if value.get('repo_group'):
 
                gr = RepoGroup.get(value.get('repo_group'))
 
                group_path = gr.full_path
 
                # value needs to be aware of group name in order to check
 
                # db key This is an actual just the name to store in the
 
                # database
 
                repo_name_full = group_path + RepoGroup.url_sep() + repo_name
 

	
 
            else:
 
                group_path = ''
 
                repo_name_full = repo_name
 

	
 
            value['repo_name_full'] = repo_name_full
 
            rename = old_data.get('repo_name') != repo_name_full
 
            create = not edit
 
            if  rename or create:
 

	
 
                if group_path != '':
 
                    if Repository.get_by_repo_name(repo_name_full):
 
                        e_dict = {
 
                            'repo_name': _('This repository already exists in '
 
                                           'a group "%s"') % gr.group_name
 
                        }
 
                        raise formencode.Invalid('', value, state,
 
                                                 error_dict=e_dict)
 
                elif RepoGroup.get_by_group_name(repo_name_full):
 
                        e_dict = {
 
                            'repo_name': _('There is a group with this name '
 
                                           'already "%s"') % repo_name_full
 
                        }
 
                        raise formencode.Invalid('', value, state,
 
                                                 error_dict=e_dict)
 

	
 
                elif Repository.get_by_repo_name(repo_name_full):
 
                        e_dict = {'repo_name': _('This repository '
 
                                                'already exists')}
 
                        raise formencode.Invalid('', value, state,
 
                                                 error_dict=e_dict)
 

	
 
            return value
 

	
 
    return _ValidRepoName
 

	
 

	
 
def ValidForkName(*args, **kwargs):
 
    return ValidRepoName(*args, **kwargs)
 

	
 

	
 
def SlugifyName():
 
    class _SlugifyName(formencode.validators.FancyValidator):
 

	
 
        def to_python(self, value, state):
 
            return repo_name_slug(value)
 

	
 
    return _SlugifyName
 

	
 

	
 
def ValidCloneUri():
 
    from rhodecode.lib.utils import make_ui
 

	
 
    def url_handler(repo_type, url, proto, ui=None):
 
        if repo_type == 'hg':
 
            from mercurial.httprepo import httprepository, httpsrepository
 
            if proto == 'https':
 
                httpsrepository(make_ui('db'), url).capabilities
 
            elif proto == 'http':
 
                httprepository(make_ui('db'), url).capabilities
 
        elif repo_type == 'git':
 
            #TODO: write a git url validator
 
            pass
 

	
 
    class _ValidCloneUri(formencode.validators.FancyValidator):
 

	
 
        def to_python(self, value, state):
 

	
 
            repo_type = value.get('repo_type')
 
            url = value.get('clone_uri')
 
            e_dict = {'clone_uri': _('invalid clone url')}
 

	
 
            if not url:
 
                pass
 
            elif url.startswith('https'):
 
                try:
 
                    url_handler(repo_type, url, 'https', make_ui('db'))
 
                except Exception:
 
                    log.error(traceback.format_exc())
 
                    raise formencode.Invalid('', value, state, error_dict=e_dict)
 
            elif url.startswith('http'):
 
                try:
 
                    url_handler(repo_type, url, 'http', make_ui('db'))
 
                except Exception:
 
                    log.error(traceback.format_exc())
 
                    raise formencode.Invalid('', value, state, error_dict=e_dict)
 
            else:
 
                e_dict = {'clone_uri': _('Invalid clone url, provide a '
 
                                         'valid clone http\s url')}
 
                raise formencode.Invalid('', value, state, error_dict=e_dict)
 

	
 
            return value
 

	
 
    return _ValidCloneUri
 

	
 

	
 
def ValidForkType(old_data):
 
    class _ValidForkType(formencode.validators.FancyValidator):
 

	
 
        def to_python(self, value, state):
 
            if old_data['repo_type'] != value:
 
                raise formencode.Invalid(_('Fork have to be the same '
 
                                           'type as original'), value, state)
 

	
 
            return value
 
    return _ValidForkType
 

	
 

	
 
def ValidPerms(type_='repo'):
 
    if type_ == 'group':
 
        EMPTY_PERM = 'group.none'
 
    elif type_ == 'repo':
 
        EMPTY_PERM = 'repository.none'
 

	
 
    class _ValidPerms(formencode.validators.FancyValidator):
 
        messages = {
 
            'perm_new_member_name':
 
                _('This username or users group name is not valid')
 
        }
 

	
 
        def to_python(self, value, state):
 
            perms_update = []
 
            perms_new = []
 
            # build a list of permission to update and new permission to create
 
            for k, v in value.items():
 
                # means new added member to permissions
 
                if k.startswith('perm_new_member'):
 
                    new_perm = value.get('perm_new_member', False)
 
                    new_member = value.get('perm_new_member_name', False)
 
                    new_type = value.get('perm_new_member_type')
 

	
 
                    if new_member and new_perm:
 
                        if (new_member, new_perm, new_type) not in perms_new:
 
                            perms_new.append((new_member, new_perm, new_type))
 
                elif k.startswith('u_perm_') or k.startswith('g_perm_'):
 
                    member = k[7:]
 
                    t = {'u': 'user',
 
                         'g': 'users_group'
 
                    }[k[0]]
 
                    if member == 'default':
 
                        if value.get('private'):
 
                            # set none for default when updating to private repo
 
                            v = EMPTY_PERM
 
                    perms_update.append((member, v, t))
 

	
 
            value['perms_updates'] = perms_update
 
            value['perms_new'] = perms_new
 

	
 
            # update permissions
 
            for k, v, t in perms_new:
 
                try:
 
                    if t is 'user':
 
                        self.user_db = User.query()\
 
                            .filter(User.active == True)\
 
                            .filter(User.username == k).one()
 
                    if t is 'users_group':
 
                        self.user_db = UsersGroup.query()\
 
                            .filter(UsersGroup.users_group_active == True)\
 
                            .filter(UsersGroup.users_group_name == k).one()
 

	
 
                except Exception:
 
                    msg = self.message('perm_new_member_name',
 
                                         state=State_obj)
 
                    raise formencode.Invalid(
 
                        msg, value, state, error_dict={'perm_new_member_name': msg}
 
                    )
 
            return value
 
    return _ValidPerms
 

	
 

	
 
class ValidSettings(formencode.validators.FancyValidator):
 

	
 
    def to_python(self, value, state):
 
        # settings  form can't edit user
 
        if 'user' in value:
 
            del['value']['user']
 
        return value
 

	
 

	
 
class ValidPath(formencode.validators.FancyValidator):
 
    def to_python(self, value, state):
 

	
 
        if not os.path.isdir(value):
 
            msg = _('This is not a valid path')
 
            raise formencode.Invalid(msg, value, state,
 
                                     error_dict={'paths_root_path': msg})
 
        return value
 

	
 

	
 
def UniqSystemEmail(old_data):
 
    class _UniqSystemEmail(formencode.validators.FancyValidator):
 
        def to_python(self, value, state):
 
            value = value.lower()
 
            if (old_data.get('email') or '').lower() != value:
 
                user = User.get_by_email(value, case_insensitive=True)
 
                if user:
 
                    raise formencode.Invalid(
 
                        _("This e-mail address is already taken"), value, state
 
                    )
 
            return value
 

	
 
    return _UniqSystemEmail
 

	
 

	
 
class ValidSystemEmail(formencode.validators.FancyValidator):
 
    def to_python(self, value, state):
 
        value = value.lower()
 
        user = User.get_by_email(value, case_insensitive=True)
 
        if  user is None:
 
            raise formencode.Invalid(
 
                _("This e-mail address doesn't exist."), value, state
 
            )
 

	
 
        return value
 

	
 

	
 
class LdapLibValidator(formencode.validators.FancyValidator):
 

	
 
    def to_python(self, value, state):
 

	
 
        try:
 
            import ldap
 
        except ImportError:
 
            raise LdapImportError
 
        return value
 

	
 

	
 
class AttrLoginValidator(formencode.validators.FancyValidator):
 

	
 
    def to_python(self, value, state):
 

	
 
        if not value or not isinstance(value, (str, unicode)):
 
            raise formencode.Invalid(
 
                _("The LDAP Login attribute of the CN must be specified - "
 
                  "this is the name of the attribute that is equivalent "
 
                  "to 'username'"), value, state
 
            )
 

	
 
        return value
 

	
 

	
 
#==============================================================================
 
# FORMS
 
#==============================================================================
 
class LoginForm(formencode.Schema):
 
    allow_extra_fields = True
 
    filter_extra_fields = True
 
    username = UnicodeString(
 
    username = v.UnicodeString(
 
        strip=True,
 
        min=1,
 
        not_empty=True,
 
        messages={
 
           'empty': _('Please enter a login'),
 
           'tooShort': _('Enter a value %(min)i characters long or more')}
 
           'empty': _(u'Please enter a login'),
 
           'tooShort': _(u'Enter a value %(min)i characters long or more')}
 
    )
 

	
 
    password = UnicodeString(
 
    password = v.UnicodeString(
 
        strip=False,
 
        min=3,
 
        not_empty=True,
 
        messages={
 
            'empty': _('Please enter a password'),
 
            'tooShort': _('Enter %(min)i characters or more')}
 
            'empty': _(u'Please enter a password'),
 
            'tooShort': _(u'Enter %(min)i characters or more')}
 
    )
 

	
 
    remember = StringBoolean(if_missing=False)
 
    remember = v.StringBoolean(if_missing=False)
 

	
 
    chained_validators = [ValidAuth]
 
    chained_validators = [v.ValidAuth()]
 

	
 

	
 
def UserForm(edit=False, old_data={}):
 
    class _UserForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        username = All(UnicodeString(strip=True, min=1, not_empty=True),
 
                       ValidUsername(edit, old_data))
 
        username = All(v.UnicodeString(strip=True, min=1, not_empty=True),
 
                       v.ValidUsername(edit, old_data))
 
        if edit:
 
            new_password = All(UnicodeString(strip=False, min=6, not_empty=False))
 
            password_confirmation = All(UnicodeString(strip=False, min=6,
 
                                                      not_empty=False))
 
            admin = StringBoolean(if_missing=False)
 
            new_password = All(
 
                v.UnicodeString(strip=False, min=6, not_empty=False)
 
            )
 
            password_confirmation = All(
 
                v.ValidPassword(),
 
                v.UnicodeString(strip=False, min=6, not_empty=False),
 
            )
 
            admin = v.StringBoolean(if_missing=False)
 
        else:
 
            password = All(UnicodeString(strip=False, min=6, not_empty=True))
 
            password_confirmation = All(UnicodeString(strip=False, min=6,
 
                                                      not_empty=False))
 
            password = All(
 
                v.ValidPassword(),
 
                v.UnicodeString(strip=False, min=6, not_empty=True)
 
            )
 
            password_confirmation = All(
 
                v.ValidPassword(),
 
                v.UnicodeString(strip=False, min=6, not_empty=False)
 
            )
 

	
 
        active = StringBoolean(if_missing=False)
 
        name = UnicodeString(strip=True, min=1, not_empty=False)
 
        lastname = UnicodeString(strip=True, min=1, not_empty=False)
 
        email = All(Email(not_empty=True), UniqSystemEmail(old_data))
 
        active = v.StringBoolean(if_missing=False)
 
        name = v.UnicodeString(strip=True, min=1, not_empty=False)
 
        lastname = v.UnicodeString(strip=True, min=1, not_empty=False)
 
        email = All(v.Email(not_empty=True), v.UniqSystemEmail(old_data))
 

	
 
        chained_validators = [ValidPasswordsMatch, ValidPassword]
 
        chained_validators = [v.ValidPasswordsMatch()]
 

	
 
    return _UserForm
 

	
 
@@ -595,15 +98,18 @@ def UsersGroupForm(edit=False, old_data=
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 

	
 
        users_group_name = All(UnicodeString(strip=True, min=1, not_empty=True),
 
                       ValidUsersGroup(edit, old_data))
 
        users_group_name = All(
 
            v.UnicodeString(strip=True, min=1, not_empty=True),
 
            v.ValidUsersGroup(edit, old_data)
 
        )
 

	
 
        users_group_active = StringBoolean(if_missing=False)
 
        users_group_active = v.StringBoolean(if_missing=False)
 

	
 
        if edit:
 
            users_group_members = OneOf(available_members, hideList=False,
 
                                        testValueList=True,
 
                                        if_missing=None, not_empty=False)
 
            users_group_members = v.OneOf(
 
                available_members, hideList=False, testValueList=True,
 
                if_missing=None, not_empty=False
 
            )
 

	
 
    return _UsersGroupForm
 

	
 
@@ -613,15 +119,16 @@ def ReposGroupForm(edit=False, old_data=
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 

	
 
        group_name = All(UnicodeString(strip=True, min=1, not_empty=True),
 
                               SlugifyName())
 
        group_description = UnicodeString(strip=True, min=1,
 
        group_name = All(v.UnicodeString(strip=True, min=1, not_empty=True),
 
                               v.SlugifyName())
 
        group_description = v.UnicodeString(strip=True, min=1,
 
                                                not_empty=True)
 
        group_parent_id = OneOf(available_groups, hideList=False,
 
        group_parent_id = v.OneOf(available_groups, hideList=False,
 
                                        testValueList=True,
 
                                        if_missing=None, not_empty=False)
 

	
 
        chained_validators = [ValidReposGroup(edit, old_data), ValidPerms('group')]
 
        chained_validators = [v.ValidReposGroup(edit, old_data),
 
                              v.ValidPerms('group')]
 

	
 
    return _ReposGroupForm
 

	
 
@@ -630,16 +137,24 @@ def RegisterForm(edit=False, old_data={}
 
    class _RegisterForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        username = All(ValidUsername(edit, old_data),
 
                       UnicodeString(strip=True, min=1, not_empty=True))
 
        password = All(UnicodeString(strip=False, min=6, not_empty=True))
 
        password_confirmation = All(UnicodeString(strip=False, min=6, not_empty=True))
 
        active = StringBoolean(if_missing=False)
 
        name = UnicodeString(strip=True, min=1, not_empty=False)
 
        lastname = UnicodeString(strip=True, min=1, not_empty=False)
 
        email = All(Email(not_empty=True), UniqSystemEmail(old_data))
 
        username = All(
 
            v.ValidUsername(edit, old_data),
 
            v.UnicodeString(strip=True, min=1, not_empty=True)
 
        )
 
        password = All(
 
            v.ValidPassword(),
 
            v.UnicodeString(strip=False, min=6, not_empty=True)
 
        )
 
        password_confirmation = All(
 
            v.ValidPassword(),
 
            v.UnicodeString(strip=False, min=6, not_empty=True)
 
        )
 
        active = v.StringBoolean(if_missing=False)
 
        name = v.UnicodeString(strip=True, min=1, not_empty=False)
 
        lastname = v.UnicodeString(strip=True, min=1, not_empty=False)
 
        email = All(v.Email(not_empty=True), v.UniqSystemEmail(old_data))
 

	
 
        chained_validators = [ValidPasswordsMatch, ValidPassword]
 
        chained_validators = [v.ValidPasswordsMatch()]
 

	
 
    return _RegisterForm
 

	
 
@@ -648,7 +163,7 @@ def PasswordResetForm():
 
    class _PasswordResetForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        email = All(ValidSystemEmail(), Email(not_empty=True))
 
        email = All(v.ValidSystemEmail(), v.Email(not_empty=True))
 
    return _PasswordResetForm
 

	
 

	
 
@@ -657,24 +172,24 @@ def RepoForm(edit=False, old_data={}, su
 
    class _RepoForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        repo_name = All(UnicodeString(strip=True, min=1, not_empty=True),
 
                        SlugifyName())
 
        clone_uri = All(UnicodeString(strip=True, min=1, not_empty=False))
 
        repo_group = OneOf(repo_groups, hideList=True)
 
        repo_type = OneOf(supported_backends)
 
        description = UnicodeString(strip=True, min=1, not_empty=False)
 
        private = StringBoolean(if_missing=False)
 
        enable_statistics = StringBoolean(if_missing=False)
 
        enable_downloads = StringBoolean(if_missing=False)
 
        landing_rev = OneOf(landing_revs, hideList=True)
 
        repo_name = All(v.UnicodeString(strip=True, min=1, not_empty=True),
 
                        v.SlugifyName())
 
        clone_uri = All(v.UnicodeString(strip=True, min=1, not_empty=False))
 
        repo_group = v.OneOf(repo_groups, hideList=True)
 
        repo_type = v.OneOf(supported_backends)
 
        description = v.UnicodeString(strip=True, min=1, not_empty=False)
 
        private = v.StringBoolean(if_missing=False)
 
        enable_statistics = v.StringBoolean(if_missing=False)
 
        enable_downloads = v.StringBoolean(if_missing=False)
 
        landing_rev = v.OneOf(landing_revs, hideList=True)
 

	
 
        if edit:
 
            #this is repo owner
 
            user = All(UnicodeString(not_empty=True), ValidRepoUser)
 
            user = All(v.UnicodeString(not_empty=True), v.ValidRepoUser())
 

	
 
        chained_validators = [ValidCloneUri()(),
 
                              ValidRepoName(edit, old_data),
 
                              ValidPerms()]
 
        chained_validators = [v.ValidCloneUri(),
 
                              v.ValidRepoName(edit, old_data),
 
                              v.ValidPerms()]
 
    return _RepoForm
 

	
 

	
 
@@ -683,33 +198,34 @@ def RepoForkForm(edit=False, old_data={}
 
    class _RepoForkForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        repo_name = All(UnicodeString(strip=True, min=1, not_empty=True),
 
                        SlugifyName())
 
        repo_group = OneOf(repo_groups, hideList=True)
 
        repo_type = All(ValidForkType(old_data), OneOf(supported_backends))
 
        description = UnicodeString(strip=True, min=1, not_empty=True)
 
        private = StringBoolean(if_missing=False)
 
        copy_permissions = StringBoolean(if_missing=False)
 
        update_after_clone = StringBoolean(if_missing=False)
 
        fork_parent_id = UnicodeString()
 
        chained_validators = [ValidForkName(edit, old_data)]
 
        repo_name = All(v.UnicodeString(strip=True, min=1, not_empty=True),
 
                        v.SlugifyName())
 
        repo_group = v.OneOf(repo_groups, hideList=True)
 
        repo_type = All(v.ValidForkType(old_data), v.OneOf(supported_backends))
 
        description = v.UnicodeString(strip=True, min=1, not_empty=True)
 
        private = v.StringBoolean(if_missing=False)
 
        copy_permissions = v.StringBoolean(if_missing=False)
 
        update_after_clone = v.StringBoolean(if_missing=False)
 
        fork_parent_id = v.UnicodeString()
 
        chained_validators = [v.ValidForkName(edit, old_data)]
 

	
 
    return _RepoForkForm
 

	
 

	
 
def RepoSettingsForm(edit=False, old_data={}, supported_backends=BACKENDS.keys(),
 
                     repo_groups=[], landing_revs=[]):
 
def RepoSettingsForm(edit=False, old_data={},
 
                     supported_backends=BACKENDS.keys(), repo_groups=[],
 
                     landing_revs=[]):
 
    class _RepoForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        repo_name = All(UnicodeString(strip=True, min=1, not_empty=True),
 
                        SlugifyName())
 
        description = UnicodeString(strip=True, min=1, not_empty=True)
 
        repo_group = OneOf(repo_groups, hideList=True)
 
        private = StringBoolean(if_missing=False)
 
        landing_rev = OneOf(landing_revs, hideList=True)
 
        chained_validators = [ValidRepoName(edit, old_data), ValidPerms(),
 
                              ValidSettings]
 
        repo_name = All(v.UnicodeString(strip=True, min=1, not_empty=True),
 
                        v.SlugifyName())
 
        description = v.UnicodeString(strip=True, min=1, not_empty=True)
 
        repo_group = v.OneOf(repo_groups, hideList=True)
 
        private = v.StringBoolean(if_missing=False)
 
        landing_rev = v.OneOf(landing_revs, hideList=True)
 
        chained_validators = [v.ValidRepoName(edit, old_data), v.ValidPerms(),
 
                              v.ValidSettings()]
 
    return _RepoForm
 

	
 

	
 
@@ -717,9 +233,9 @@ def ApplicationSettingsForm():
 
    class _ApplicationSettingsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        rhodecode_title = UnicodeString(strip=True, min=1, not_empty=True)
 
        rhodecode_realm = UnicodeString(strip=True, min=1, not_empty=True)
 
        rhodecode_ga_code = UnicodeString(strip=True, min=1, not_empty=False)
 
        rhodecode_title = v.UnicodeString(strip=True, min=1, not_empty=True)
 
        rhodecode_realm = v.UnicodeString(strip=True, min=1, not_empty=True)
 
        rhodecode_ga_code = v.UnicodeString(strip=True, min=1, not_empty=False)
 

	
 
    return _ApplicationSettingsForm
 

	
 
@@ -728,12 +244,19 @@ def ApplicationUiSettingsForm():
 
    class _ApplicationUiSettingsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        web_push_ssl = OneOf(['true', 'false'], if_missing='false')
 
        paths_root_path = All(ValidPath(), UnicodeString(strip=True, min=1, not_empty=True))
 
        hooks_changegroup_update = OneOf(['True', 'False'], if_missing=False)
 
        hooks_changegroup_repo_size = OneOf(['True', 'False'], if_missing=False)
 
        hooks_changegroup_push_logger = OneOf(['True', 'False'], if_missing=False)
 
        hooks_preoutgoing_pull_logger = OneOf(['True', 'False'], if_missing=False)
 
        web_push_ssl = v.OneOf(['true', 'false'], if_missing='false')
 
        paths_root_path = All(
 
            v.ValidPath(),
 
            v.UnicodeString(strip=True, min=1, not_empty=True)
 
        )
 
        hooks_changegroup_update = v.OneOf(['True', 'False'],
 
                                           if_missing=False)
 
        hooks_changegroup_repo_size = v.OneOf(['True', 'False'],
 
                                              if_missing=False)
 
        hooks_changegroup_push_logger = v.OneOf(['True', 'False'],
 
                                                if_missing=False)
 
        hooks_preoutgoing_pull_logger = v.OneOf(['True', 'False'],
 
                                                if_missing=False)
 

	
 
    return _ApplicationUiSettingsForm
 

	
 
@@ -742,33 +265,37 @@ def DefaultPermissionsForm(perms_choices
 
    class _DefaultPermissionsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        overwrite_default = StringBoolean(if_missing=False)
 
        anonymous = OneOf(['True', 'False'], if_missing=False)
 
        default_perm = OneOf(perms_choices)
 
        default_register = OneOf(register_choices)
 
        default_create = OneOf(create_choices)
 
        overwrite_default = v.StringBoolean(if_missing=False)
 
        anonymous = v.OneOf(['True', 'False'], if_missing=False)
 
        default_perm = v.OneOf(perms_choices)
 
        default_register = v.OneOf(register_choices)
 
        default_create = v.OneOf(create_choices)
 

	
 
    return _DefaultPermissionsForm
 

	
 

	
 
def LdapSettingsForm(tls_reqcert_choices, search_scope_choices, tls_kind_choices):
 
def LdapSettingsForm(tls_reqcert_choices, search_scope_choices,
 
                     tls_kind_choices):
 
    class _LdapSettingsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        #pre_validators = [LdapLibValidator]
 
        ldap_active = StringBoolean(if_missing=False)
 
        ldap_host = UnicodeString(strip=True,)
 
        ldap_port = Number(strip=True,)
 
        ldap_tls_kind = OneOf(tls_kind_choices)
 
        ldap_tls_reqcert = OneOf(tls_reqcert_choices)
 
        ldap_dn_user = UnicodeString(strip=True,)
 
        ldap_dn_pass = UnicodeString(strip=True,)
 
        ldap_base_dn = UnicodeString(strip=True,)
 
        ldap_filter = UnicodeString(strip=True,)
 
        ldap_search_scope = OneOf(search_scope_choices)
 
        ldap_attr_login = All(AttrLoginValidator, UnicodeString(strip=True,))
 
        ldap_attr_firstname = UnicodeString(strip=True,)
 
        ldap_attr_lastname = UnicodeString(strip=True,)
 
        ldap_attr_email = UnicodeString(strip=True,)
 
        ldap_active = v.StringBoolean(if_missing=False)
 
        ldap_host = v.UnicodeString(strip=True,)
 
        ldap_port = v.Number(strip=True,)
 
        ldap_tls_kind = v.OneOf(tls_kind_choices)
 
        ldap_tls_reqcert = v.OneOf(tls_reqcert_choices)
 
        ldap_dn_user = v.UnicodeString(strip=True,)
 
        ldap_dn_pass = v.UnicodeString(strip=True,)
 
        ldap_base_dn = v.UnicodeString(strip=True,)
 
        ldap_filter = v.UnicodeString(strip=True,)
 
        ldap_search_scope = v.OneOf(search_scope_choices)
 
        ldap_attr_login = All(
 
            v.AttrLoginValidator(),
 
            v.UnicodeString(strip=True,)
 
        )
 
        ldap_attr_firstname = v.UnicodeString(strip=True,)
 
        ldap_attr_lastname = v.UnicodeString(strip=True,)
 
        ldap_attr_email = v.UnicodeString(strip=True,)
 

	
 
    return _LdapSettingsForm
rhodecode/model/repos_group.py
Show inline comments
 
@@ -199,13 +199,13 @@ class ReposGroupModel(BaseModel):
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def delete(self, users_group_id):
 
    def delete(self, repos_group):
 
        repos_group = self.__get_repos_group(repos_group)
 
        try:
 
            users_group = RepoGroup.get(users_group_id)
 
            self.sa.delete(users_group)
 
            self.__delete_group(users_group)
 
            self.sa.delete(repos_group)
 
            self.__delete_group(repos_group)
 
        except:
 
            log.error(traceback.format_exc())
 
            log.exception('Error removing repos_group %s' % repos_group)
 
            raise
 

	
 
    def grant_user_permission(self, repos_group, user, perm):
rhodecode/model/user.py
Show inline comments
 
@@ -87,9 +87,12 @@ class UserModel(BaseModel):
 
        return User.get_by_api_key(api_key, cache)
 

	
 
    def create(self, form_data):
 
        from rhodecode.lib.auth import get_crypt_password
 
        try:
 
            new_user = User()
 
            for k, v in form_data.items():
 
                if k == 'password':
 
                    v = get_crypt_password(v)
 
                setattr(new_user, k, v)
 

	
 
            new_user.api_key = generate_api_key(form_data['username'])
 
@@ -265,15 +268,17 @@ class UserModel(BaseModel):
 
            raise
 

	
 
    def update_my_account(self, user_id, form_data):
 
        from rhodecode.lib.auth import get_crypt_password
 
        try:
 
            user = self.get(user_id, cache=False)
 
            if user.username == 'default':
 
                raise DefaultUserException(
 
                                _("You can't Edit this user since it's"
 
                                  " crucial for entire application"))
 
                    _("You can't Edit this user since it's"
 
                      " crucial for entire application")
 
                )
 
            for k, v in form_data.items():
 
                if k == 'new_password' and v != '':
 
                    user.password = v
 
                    user.password = get_crypt_password(v)
 
                    user.api_key = generate_api_key(user.username)
 
                else:
 
                    if k not in ['admin', 'active']:
rhodecode/tests/functional/test_admin_settings.py
Show inline comments
 
@@ -3,6 +3,8 @@
 
from rhodecode.lib.auth import get_crypt_password, check_password
 
from rhodecode.model.db import User, RhodeCodeSetting
 
from rhodecode.tests import *
 
from rhodecode.lib import helpers as h
 

	
 

	
 
class TestAdminSettingsController(TestController):
 

	
 
@@ -47,7 +49,6 @@ class TestAdminSettingsController(TestCo
 
        response = self.app.get(url('formatted_admin_edit_setting',
 
                                    setting_id=1, format='xml'))
 

	
 

	
 
    def test_ga_code_active(self):
 
        self.log_user()
 
        old_title = 'RhodeCode'
 
@@ -92,7 +93,6 @@ class TestAdminSettingsController(TestCo
 
        self.assertTrue("""_gaq.push(['_setAccount', '%s']);""" % new_ga_code
 
                        not in response.body)
 

	
 

	
 
    def test_title_change(self):
 
        self.log_user()
 
        old_title = 'RhodeCode'
 
@@ -117,7 +117,6 @@ class TestAdminSettingsController(TestCo
 
            self.assertTrue("""<h1><a href="/">%s</a></h1>""" % new_title
 
                        in response.body)
 

	
 

	
 
    def test_my_account(self):
 
        self.log_user()
 
        response = self.app.get(url('admin_settings_my_account'))
 
@@ -132,12 +131,11 @@ class TestAdminSettingsController(TestCo
 
        new_lastname = 'NewLastname'
 
        new_password = 'test123'
 

	
 

	
 
        response = self.app.post(url('admin_settings_my_account_update'),
 
                                 params=dict(_method='put',
 
                                             username='test_admin',
 
                                             new_password=new_password,
 
                                             password_confirmation = new_password,
 
                                             password_confirmation=new_password,
 
                                             password='',
 
                                             name=new_name,
 
                                             lastname=new_lastname,
 
@@ -146,7 +144,7 @@ class TestAdminSettingsController(TestCo
 

	
 
        assert 'Your account was updated successfully' in response.session['flash'][0][1], 'no flash message about success of change'
 
        user = self.Session.query(User).filter(User.username == 'test_admin').one()
 
        assert user.email == new_email , 'incorrect user email after update got %s vs %s' % (user.email, new_email)
 
        assert user.email == new_email, 'incorrect user email after update got %s vs %s' % (user.email, new_email)
 
        assert user.name == new_name, 'updated field mismatch %s vs %s' % (user.name, new_name)
 
        assert user.lastname == new_lastname, 'updated field mismatch %s vs %s' % (user.lastname, new_lastname)
 
        assert check_password(new_password, user.password) is True, 'password field mismatch %s vs %s' % (user.password, new_password)
 
@@ -161,7 +159,7 @@ class TestAdminSettingsController(TestCo
 
                                                            _method='put',
 
                                                            username='test_admin',
 
                                                            new_password=old_password,
 
                                                            password_confirmation = old_password,
 
                                                            password_confirmation=old_password,
 
                                                            password='',
 
                                                            name=old_name,
 
                                                            lastname=old_lastname,
 
@@ -172,41 +170,46 @@ class TestAdminSettingsController(TestCo
 
                               'Your account was updated successfully')
 

	
 
        user = self.Session.query(User).filter(User.username == 'test_admin').one()
 
        assert user.email == old_email , 'incorrect user email after update got %s vs %s' % (user.email, old_email)
 
        assert user.email == old_email, 'incorrect user email after update got %s vs %s' % (user.email, old_email)
 

	
 
        assert user.email == old_email , 'incorrect user email after update got %s vs %s' % (user.email, old_email)
 
        assert user.email == old_email, 'incorrect user email after update got %s vs %s' % (user.email, old_email)
 
        assert user.name == old_name, 'updated field mismatch %s vs %s' % (user.name, old_name)
 
        assert user.lastname == old_lastname, 'updated field mismatch %s vs %s' % (user.lastname, old_lastname)
 
        assert check_password(old_password, user.password) is True , 'password updated field mismatch %s vs %s' % (user.password, old_password)
 

	
 
        assert check_password(old_password, user.password) is True, 'password updated field mismatch %s vs %s' % (user.password, old_password)
 

	
 
    def test_my_account_update_err_email_exists(self):
 
        self.log_user()
 

	
 
        new_email = 'test_regular@mail.com'#already exisitn email
 
        new_email = 'test_regular@mail.com'  # already exisitn email
 
        response = self.app.post(url('admin_settings_my_account_update'), params=dict(
 
                                                            _method='put',
 
                                                            username='test_admin',
 
                                                            new_password='test12',
 
                                                            password_confirmation = 'test122',
 
                                                            password_confirmation='test122',
 
                                                            name='NewName',
 
                                                            lastname='NewLastname',
 
                                                            email=new_email,))
 

	
 
        assert 'This e-mail address is already taken' in response.body, 'Missing error message about existing email'
 

	
 

	
 
    def test_my_account_update_err(self):
 
        self.log_user('test_regular2', 'test12')
 

	
 
        new_email = 'newmail.pl'
 
        response = self.app.post(url('admin_settings_my_account_update'), params=dict(
 
                                                            _method='put',
 
                                                            username='test_admin',
 
                                                            new_password='test12',
 
                                                            password_confirmation = 'test122',
 
                                                            name='NewName',
 
                                                            lastname='NewLastname',
 
                                                            email=new_email,))
 
        assert 'An email address must contain a single @' in response.body, 'Missing error message about wrong email'
 
        assert 'This username already exists' in response.body, 'Missing error message about existing user'
 
        response = self.app.post(url('admin_settings_my_account_update'),
 
                                 params=dict(
 
                                            _method='put',
 
                                            username='test_admin',
 
                                            new_password='test12',
 
                                            password_confirmation='test122',
 
                                            name='NewName',
 
                                            lastname='NewLastname',
 
                                            email=new_email,)
 
                                 )
 

	
 
        response.mustcontain('An email address must contain a single @')
 
        from rhodecode.model import validators
 
        msg = validators.ValidUsername(edit=False,
 
                                    old_data={})._messages['username_exists']
 
        msg = h.html_escape(msg % {'username': 'test_admin'})
 
        response.mustcontain(u"%s" % msg)
rhodecode/tests/functional/test_admin_users.py
Show inline comments
 
from sqlalchemy.orm.exc import NoResultFound
 

	
 
from rhodecode.tests import *
 
from rhodecode.model.db import User, Permission
 
from rhodecode.lib.auth import check_password
 
from sqlalchemy.orm.exc import NoResultFound
 
from rhodecode.model.user import UserModel
 
from rhodecode.model import validators
 
from rhodecode.lib import helpers as h
 

	
 

	
 
class TestAdminUsersController(TestController):
 

	
 
@@ -24,26 +28,25 @@ class TestAdminUsersController(TestContr
 
        email = 'mail@mail.com'
 

	
 
        response = self.app.post(url('users'),
 
                                 {'username':username,
 
                                   'password':password,
 
                                   'password_confirmation':password_confirmation,
 
                                   'name':name,
 
                                   'active':True,
 
                                   'lastname':lastname,
 
                                   'email':email})
 
                             {'username': username,
 
                               'password': password,
 
                               'password_confirmation': password_confirmation,
 
                               'name': name,
 
                               'active': True,
 
                               'lastname': lastname,
 
                               'email': email})
 

	
 
        self.checkSessionFlash(response, '''created user %s''' % (username))
 

	
 
        self.assertTrue('''created user %s''' % (username) in
 
                        response.session['flash'][0])
 

	
 
        new_user = self.Session.query(User).\
 
            filter(User.username == username).one()
 

	
 
        self.assertEqual(new_user.username,username)
 
        self.assertEqual(check_password(password, new_user.password),True)
 
        self.assertEqual(new_user.name,name)
 
        self.assertEqual(new_user.lastname,lastname)
 
        self.assertEqual(new_user.email,email)
 
        self.assertEqual(new_user.username, username)
 
        self.assertEqual(check_password(password, new_user.password), True)
 
        self.assertEqual(new_user.name, name)
 
        self.assertEqual(new_user.lastname, lastname)
 
        self.assertEqual(new_user.email, email)
 

	
 
        response.follow()
 
        response = response.follow()
 
@@ -57,16 +60,18 @@ class TestAdminUsersController(TestContr
 
        lastname = 'lastname'
 
        email = 'errmail.com'
 

	
 
        response = self.app.post(url('users'), {'username':username,
 
                                               'password':password,
 
                                               'name':name,
 
                                               'active':False,
 
                                               'lastname':lastname,
 
                                               'email':email})
 
        response = self.app.post(url('users'), {'username': username,
 
                                               'password': password,
 
                                               'name': name,
 
                                               'active': False,
 
                                               'lastname': lastname,
 
                                               'email': email})
 

	
 
        self.assertTrue("""<span class="error-message">Invalid username</span>""" in response.body)
 
        self.assertTrue("""<span class="error-message">Please enter a value</span>""" in response.body)
 
        self.assertTrue("""<span class="error-message">An email address must contain a single @</span>""" in response.body)
 
        msg = validators.ValidUsername(False, {})._messages['system_invalid_username']
 
        msg = h.html_escape(msg % {'username': 'new_user'})
 
        response.mustcontain("""<span class="error-message">%s</span>""" % msg)
 
        response.mustcontain("""<span class="error-message">Please enter a value</span>""")
 
        response.mustcontain("""<span class="error-message">An email address must contain a single @</span>""")
 

	
 
        def get_user():
 
            self.Session.query(User).filter(User.username == username).one()
 
@@ -94,13 +99,13 @@ class TestAdminUsersController(TestContr
 
        lastname = 'lastname'
 
        email = 'todeletemail@mail.com'
 

	
 
        response = self.app.post(url('users'), {'username':username,
 
                                               'password':password,
 
                                               'password_confirmation':password,
 
                                               'name':name,
 
                                               'active':True,
 
                                               'lastname':lastname,
 
                                               'email':email})
 
        response = self.app.post(url('users'), {'username': username,
 
                                               'password': password,
 
                                               'password_confirmation': password,
 
                                               'name': name,
 
                                               'active': True,
 
                                               'lastname': lastname,
 
                                               'email': email})
 

	
 
        response = response.follow()
 

	
 
@@ -111,7 +116,6 @@ class TestAdminUsersController(TestContr
 
        self.assertTrue("""successfully deleted user""" in
 
                        response.session['flash'][0])
 

	
 

	
 
    def test_delete_browser_fakeout(self):
 
        response = self.app.post(url('user', id=1),
 
                                 params=dict(_method='delete'))
 
@@ -127,7 +131,6 @@ class TestAdminUsersController(TestContr
 
        user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        response = self.app.get(url('edit_user', id=user.user_id))
 

	
 

	
 
    def test_add_perm_create_repo(self):
 
        self.log_user()
 
        perm_none = Permission.get_by_key('hg.create.none')
 
@@ -135,7 +138,6 @@ class TestAdminUsersController(TestContr
 

	
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 

	
 

	
 
        #User should have None permission on creation repository
 
        self.assertEqual(UserModel().has_perm(user, perm_none), False)
 
        self.assertEqual(UserModel().has_perm(user, perm_create), False)
 
@@ -159,7 +161,6 @@ class TestAdminUsersController(TestContr
 

	
 
        user = User.get_by_username(TEST_USER_REGULAR2_LOGIN)
 

	
 

	
 
        #User should have None permission on creation repository
 
        self.assertEqual(UserModel().has_perm(user, perm_none), False)
 
        self.assertEqual(UserModel().has_perm(user, perm_create), False)
rhodecode/tests/functional/test_login.py
Show inline comments
 
@@ -4,6 +4,8 @@ from rhodecode.model.db import User, Not
 
from rhodecode.lib.utils2 import generate_api_key
 
from rhodecode.lib.auth import check_password
 
from rhodecode.model.meta import Session
 
from rhodecode.lib import helpers as h
 
from rhodecode.model import validators
 

	
 

	
 
class TestLoginController(TestController):
 
@@ -22,21 +24,21 @@ class TestLoginController(TestController
 

	
 
    def test_login_admin_ok(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username':'test_admin',
 
                                  'password':'test12'})
 
                                 {'username': 'test_admin',
 
                                  'password': 'test12'})
 
        self.assertEqual(response.status, '302 Found')
 
        self.assertEqual(response.session['rhodecode_user'].get('username') ,
 
        self.assertEqual(response.session['rhodecode_user'].get('username'),
 
                         'test_admin')
 
        response = response.follow()
 
        self.assertTrue('%s repository' % HG_REPO in response.body)
 

	
 
    def test_login_regular_ok(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username':'test_regular',
 
                                  'password':'test12'})
 
                                 {'username': 'test_regular',
 
                                  'password': 'test12'})
 

	
 
        self.assertEqual(response.status, '302 Found')
 
        self.assertEqual(response.session['rhodecode_user'].get('username') ,
 
        self.assertEqual(response.session['rhodecode_user'].get('username'),
 
                         'test_regular')
 
        response = response.follow()
 
        self.assertTrue('%s repository' % HG_REPO in response.body)
 
@@ -46,8 +48,8 @@ class TestLoginController(TestController
 
        test_came_from = '/_admin/users'
 
        response = self.app.post(url(controller='login', action='index',
 
                                     came_from=test_came_from),
 
                                 {'username':'test_admin',
 
                                  'password':'test12'})
 
                                 {'username': 'test_admin',
 
                                  'password': 'test12'})
 
        self.assertEqual(response.status, '302 Found')
 
        response = response.follow()
 

	
 
@@ -56,17 +58,16 @@ class TestLoginController(TestController
 

	
 
    def test_login_short_password(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username':'test_admin',
 
                                  'password':'as'})
 
                                 {'username': 'test_admin',
 
                                  'password': 'as'})
 
        self.assertEqual(response.status, '200 OK')
 

	
 
        self.assertTrue('Enter 3 characters or more' in response.body)
 

	
 
    def test_login_wrong_username_password(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username':'error',
 
                                  'password':'test12'})
 
        self.assertEqual(response.status , '200 OK')
 
                                 {'username': 'error',
 
                                  'password': 'test12'})
 

	
 
        self.assertTrue('invalid user name' in response.body)
 
        self.assertTrue('invalid password' in response.body)
 
@@ -79,62 +80,63 @@ class TestLoginController(TestController
 
        self.assertTrue('Sign Up to RhodeCode' in response.body)
 

	
 
    def test_register_err_same_username(self):
 
        uname = 'test_admin'
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username':'test_admin',
 
                                             'password':'test12',
 
                                             'password_confirmation':'test12',
 
                                             'email':'goodmail@domain.com',
 
                                             'name':'test',
 
                                             'lastname':'test'})
 
                                            {'username': uname,
 
                                             'password': 'test12',
 
                                             'password_confirmation': 'test12',
 
                                             'email': 'goodmail@domain.com',
 
                                             'name': 'test',
 
                                             'lastname': 'test'})
 

	
 
        self.assertEqual(response.status , '200 OK')
 
        self.assertTrue('This username already exists' in response.body)
 
        msg = validators.ValidUsername()._messages['username_exists']
 
        msg = h.html_escape(msg % {'username': uname})
 
        response.mustcontain(msg)
 

	
 
    def test_register_err_same_email(self):
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username':'test_admin_0',
 
                                             'password':'test12',
 
                                             'password_confirmation':'test12',
 
                                             'email':'test_admin@mail.com',
 
                                             'name':'test',
 
                                             'lastname':'test'})
 
                                            {'username': 'test_admin_0',
 
                                             'password': 'test12',
 
                                             'password_confirmation': 'test12',
 
                                             'email': 'test_admin@mail.com',
 
                                             'name': 'test',
 
                                             'lastname': 'test'})
 

	
 
        self.assertEqual(response.status , '200 OK')
 
        response.mustcontain('This e-mail address is already taken')
 
        msg = validators.UniqSystemEmail()()._messages['email_taken']
 
        response.mustcontain(msg)
 

	
 
    def test_register_err_same_email_case_sensitive(self):
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username':'test_admin_1',
 
                                             'password':'test12',
 
                                             'password_confirmation':'test12',
 
                                             'email':'TesT_Admin@mail.COM',
 
                                             'name':'test',
 
                                             'lastname':'test'})
 
        self.assertEqual(response.status , '200 OK')
 
        response.mustcontain('This e-mail address is already taken')
 
                                            {'username': 'test_admin_1',
 
                                             'password': 'test12',
 
                                             'password_confirmation': 'test12',
 
                                             'email': 'TesT_Admin@mail.COM',
 
                                             'name': 'test',
 
                                             'lastname': 'test'})
 
        msg = validators.UniqSystemEmail()()._messages['email_taken']
 
        response.mustcontain(msg)
 

	
 
    def test_register_err_wrong_data(self):
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username':'xs',
 
                                             'password':'test',
 
                                             'password_confirmation':'test',
 
                                             'email':'goodmailm',
 
                                             'name':'test',
 
                                             'lastname':'test'})
 
        self.assertEqual(response.status , '200 OK')
 
                                            {'username': 'xs',
 
                                             'password': 'test',
 
                                             'password_confirmation': 'test',
 
                                             'email': 'goodmailm',
 
                                             'name': 'test',
 
                                             'lastname': 'test'})
 
        self.assertEqual(response.status, '200 OK')
 
        response.mustcontain('An email address must contain a single @')
 
        response.mustcontain('Enter a value 6 characters long or more')
 

	
 
    def test_register_err_username(self):
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username':'error user',
 
                                             'password':'test12',
 
                                             'password_confirmation':'test12',
 
                                             'email':'goodmailm',
 
                                             'name':'test',
 
                                             'lastname':'test'})
 
                                            {'username': 'error user',
 
                                             'password': 'test12',
 
                                             'password_confirmation': 'test12',
 
                                             'email': 'goodmailm',
 
                                             'name': 'test',
 
                                             'lastname': 'test'})
 

	
 
        self.assertEqual(response.status , '200 OK')
 
        response.mustcontain('An email address must contain a single @')
 
        response.mustcontain('Username may only contain '
 
                'alphanumeric characters underscores, '
 
@@ -142,41 +144,42 @@ class TestLoginController(TestController
 
                'alphanumeric character')
 

	
 
    def test_register_err_case_sensitive(self):
 
        usr = 'Test_Admin'
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username':'Test_Admin',
 
                                             'password':'test12',
 
                                             'password_confirmation':'test12',
 
                                             'email':'goodmailm',
 
                                             'name':'test',
 
                                             'lastname':'test'})
 
                                            {'username': usr,
 
                                             'password': 'test12',
 
                                             'password_confirmation': 'test12',
 
                                             'email': 'goodmailm',
 
                                             'name': 'test',
 
                                             'lastname': 'test'})
 

	
 
        self.assertEqual(response.status , '200 OK')
 
        self.assertTrue('An email address must contain a single @' in response.body)
 
        self.assertTrue('This username already exists' in response.body)
 
        response.mustcontain('An email address must contain a single @')
 
        msg = validators.ValidUsername()._messages['username_exists']
 
        msg = h.html_escape(msg % {'username': usr})
 
        response.mustcontain(msg)
 

	
 
    def test_register_special_chars(self):
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username':'xxxaxn',
 
                                             'password':'ąćźżąśśśś',
 
                                             'password_confirmation':'ąćźżąśśśś',
 
                                             'email':'goodmailm@test.plx',
 
                                             'name':'test',
 
                                             'lastname':'test'})
 
                                        {'username': 'xxxaxn',
 
                                         'password': 'ąćźżąśśśś',
 
                                         'password_confirmation': 'ąćźżąśśśś',
 
                                         'email': 'goodmailm@test.plx',
 
                                         'name': 'test',
 
                                         'lastname': 'test'})
 

	
 
        self.assertEqual(response.status , '200 OK')
 
        self.assertTrue('Invalid characters in password' in response.body)
 
        msg = validators.ValidPassword()._messages['invalid_password']
 
        response.mustcontain(msg)
 

	
 
    def test_register_password_mismatch(self):
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username':'xs',
 
                                             'password':'123qwe',
 
                                             'password_confirmation':'qwe123',
 
                                             'email':'goodmailm@test.plxa',
 
                                             'name':'test',
 
                                             'lastname':'test'})
 

	
 
        self.assertEqual(response.status, '200 OK')
 
        response.mustcontain('Passwords do not match')
 
                                            {'username': 'xs',
 
                                             'password': '123qwe',
 
                                             'password_confirmation': 'qwe123',
 
                                             'email': 'goodmailm@test.plxa',
 
                                             'name': 'test',
 
                                             'lastname': 'test'})
 
        msg = validators.ValidPasswordsMatch()._messages['password_mismatch']
 
        response.mustcontain(msg)
 

	
 
    def test_register_ok(self):
 
        username = 'test_regular4'
 
@@ -186,13 +189,13 @@ class TestLoginController(TestController
 
        lastname = 'testlastname'
 

	
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username':username,
 
                                             'password':password,
 
                                             'password_confirmation':password,
 
                                             'email':email,
 
                                             'name':name,
 
                                             'lastname':lastname,
 
                                             'admin':True}) # This should be overriden
 
                                            {'username': username,
 
                                             'password': password,
 
                                             'password_confirmation': password,
 
                                             'email': email,
 
                                             'name': name,
 
                                             'lastname': lastname,
 
                                             'admin': True})  # This should be overriden
 
        self.assertEqual(response.status, '302 Found')
 
        self.checkSessionFlash(response, 'You have successfully registered into rhodecode')
 

	
 
@@ -206,12 +209,15 @@ class TestLoginController(TestController
 
        self.assertEqual(ret.admin, False)
 

	
 
    def test_forgot_password_wrong_mail(self):
 
        bad_email = 'marcin@wrongmail.org'
 
        response = self.app.post(
 
                        url(controller='login', action='password_reset'),
 
                            {'email': 'marcin@wrongmail.org',}
 
                            {'email': bad_email, }
 
        )
 

	
 
        response.mustcontain("This e-mail address doesn't exist")
 
        msg = validators.ValidSystemEmail()._messages['non_existing_email']
 
        msg = h.html_escape(msg % {'email': bad_email})
 
        response.mustcontain()
 

	
 
    def test_forgot_password(self):
 
        response = self.app.get(url(controller='login',
 
@@ -236,7 +242,7 @@ class TestLoginController(TestController
 

	
 
        response = self.app.post(url(controller='login',
 
                                     action='password_reset'),
 
                                 {'email':email, })
 
                                 {'email': email, })
 

	
 
        self.checkSessionFlash(response, 'Your password reset link was sent')
 

	
0 comments (0 inline, 0 general)