Changeset - a7dfe823933a
[Not reviewed]
beta
0 2 0
Marcin Kuzminski - 14 years ago 2012-01-13 23:56:37
marcin@python-works.com
added validation to repo groups to check for conflicting repository name fixes #337
2 files changed with 52 insertions and 14 deletions:
0 comments (0 inline, 0 general)
rhodecode/controllers/admin/repos_groups.py
Show inline comments
 
@@ -72,58 +72,57 @@ class ReposGroupsController(BaseControll
 

	
 
        data = repo_group.get_dict()
 

	
 
        data['group_name'] = repo_group.name
 

	
 
        return data
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def index(self, format='html'):
 
        """GET /repos_groups: All items in the collection"""
 
        # url('repos_groups')
 

	
 
        sk = lambda g:g.parents[0].group_name if g.parents else g.group_name
 
        sk = lambda g: g.parents[0].group_name if g.parents else g.group_name
 
        c.groups = sorted(RepoGroup.query().all(), key=sk)
 
        return render('admin/repos_groups/repos_groups_show.html')
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def create(self):
 
        """POST /repos_groups: Create a new item"""
 
        # url('repos_groups')
 
        self.__load_defaults()
 
        repos_group_form = ReposGroupForm(available_groups=
 
        repos_group_form = ReposGroupForm(available_groups =
 
                                          c.repo_groups_choices)()
 
        try:
 
            form_result = repos_group_form.to_python(dict(request.POST))
 
            ReposGroupModel().create(form_result)
 
            Session.commit()
 
            h.flash(_('created repos group %s') \
 
                    % form_result['group_name'], category='success')
 
            #TODO: in futureaction_logger(, '', '', '', self.sa)
 
        except formencode.Invalid, errors:
 

	
 
            return htmlfill.render(
 
                render('admin/repos_groups/repos_groups_add.html'),
 
                defaults=errors.value,
 
                errors=errors.error_dict or {},
 
                prefix_error=False,
 
                encoding="UTF-8")
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('error occurred during creation of repos group %s') \
 
                    % request.POST.get('group_name'), category='error')
 

	
 
        return redirect(url('repos_groups'))
 

	
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def new(self, format='html'):
 
        """GET /repos_groups/new: Form to create a new item"""
 
        # url('new_repos_group')
 
        self.__load_defaults()
 
        return render('admin/repos_groups/repos_groups_add.html')
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def update(self, id):
 
        """PUT /repos_groups/id: Update an existing item"""
 
        # Forms posted to this method should contain a hidden field:
 
        #    <input type="hidden" name="_method" value="PUT" />
rhodecode/model/forms.py
Show inline comments
 
@@ -32,40 +32,43 @@ from formencode.validators import Unicod
 
from pylons.i18n.translation import _
 
from webhelpers.pylonslib.secure_form import authentication_token
 

	
 
from rhodecode.config.routing import ADMIN_PREFIX
 
from rhodecode.lib.utils import repo_name_slug
 
from rhodecode.lib.auth import authenticate, get_crypt_password
 
from rhodecode.lib.exceptions import LdapImportError
 
from rhodecode.model.db import User, UsersGroup, RepoGroup, Repository
 
from rhodecode import BACKENDS
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
#this is needed to translate the messages using _() in validators
 
class State_obj(object):
 
    _ = staticmethod(_)
 

	
 

	
 
#==============================================================================
 
# VALIDATORS
 
#==============================================================================
 
class ValidAuthToken(formencode.validators.FancyValidator):
 
    messages = {'invalid_token':_('Token mismatch')}
 

	
 
    def validate_python(self, value, state):
 

	
 
        if value != authentication_token():
 
            raise formencode.Invalid(self.message('invalid_token', state,
 
                                            search_number=value), value, state)
 

	
 

	
 
def ValidUsername(edit, old_data):
 
    class _ValidUsername(formencode.validators.FancyValidator):
 

	
 
        def validate_python(self, value, state):
 
            if value in ['default', 'new_user']:
 
                raise formencode.Invalid(_('Invalid username'), value, state)
 
            #check if user is unique
 
            old_un = None
 
            if edit:
 
                old_un = User.get(old_data.get('user_id')).username
 

	
 
            if old_un != value or not edit:
 
@@ -94,72 +97,91 @@ def ValidUsersGroup(edit, old_data):
 
            old_ugname = None
 
            if edit:
 
                old_ugname = UsersGroup.get(
 
                            old_data.get('users_group_id')).users_group_name
 

	
 
            if old_ugname != value or not edit:
 
                if UsersGroup.get_by_group_name(value, cache=False,
 
                                               case_insensitive=True):
 
                    raise formencode.Invalid(_('This users group '
 
                                               'already exists') , value,
 
                                             state)
 

	
 

	
 
            if re.match(r'^[a-zA-Z0-9]{1}[a-zA-Z0-9\-\_\.]+$', value) is None:
 
                raise formencode.Invalid(_('RepoGroup name may only contain '
 
                                           'alphanumeric characters '
 
                                           'underscores, periods or dashes '
 
                                           'and must begin with alphanumeric '
 
                                           'character'), value, state)
 

	
 
    return _ValidUsersGroup
 

	
 

	
 
def ValidReposGroup(edit, old_data):
 
    class _ValidReposGroup(formencode.validators.FancyValidator):
 

	
 
        def validate_python(self, value, state):
 
            # TODO WRITE VALIDATIONS
 
            group_name = value.get('group_name')
 
            group_parent_id = value.get('group_parent_id')
 

	
 
            # slugify repo group just in case :)
 
            slug = repo_name_slug(group_name)
 

	
 
            # check for parent of self
 
            parent_of_self = lambda:(old_data['group_id'] == int(group_parent_id)
 
                                     if group_parent_id else False)
 
            parent_of_self = lambda: (
 
                old_data['group_id'] == int(group_parent_id)
 
                if group_parent_id else False
 
            )
 
            if edit and parent_of_self():
 
                    e_dict = {'group_parent_id':_('Cannot assign this group '
 
                                                  'as parent')}
 
                    e_dict = {
 
                        'group_parent_id': _('Cannot assign this group as parent')
 
                    }
 
                    raise formencode.Invalid('', value, state,
 
                                             error_dict=e_dict)
 

	
 
            old_gname = None
 
            if edit:
 
                old_gname = RepoGroup.get(old_data.get('group_id')).group_name
 

	
 
            if old_gname != group_name or not edit:
 

	
 
                # check filesystem
 
                gr = RepoGroup.query().filter(RepoGroup.group_name == slug)\
 
                    .filter(RepoGroup.group_parent_id == group_parent_id).scalar()
 
                # check group
 
                gr = RepoGroup.query()\
 
                      .filter(RepoGroup.group_name == slug)\
 
                      .filter(RepoGroup.group_parent_id == group_parent_id)\
 
                      .scalar()
 

	
 
                if gr:
 
                    e_dict = {'group_name':_('This group already exists')}
 
                    e_dict = {
 
                        'group_name': _('This group already exists')
 
                    }
 
                    raise formencode.Invalid('', value, state,
 
                                             error_dict=e_dict)
 

	
 
                # check for same repo
 
                repo = Repository.query()\
 
                      .filter(Repository.repo_name == slug)\
 
                      .scalar()
 

	
 
                if repo:
 
                    e_dict = {
 
                        'group_name': _('Repository with this name already exists')
 
                    }
 
                    raise formencode.Invalid('', value, state,
 
                                             error_dict=e_dict)
 

	
 
    return _ValidReposGroup
 

	
 

	
 
class ValidPassword(formencode.validators.FancyValidator):
 

	
 
    def to_python(self, value, state):
 

	
 
        if value:
 

	
 
            if value.get('password'):
 
                try:
 
                    value['password'] = get_crypt_password(value['password'])
 
                except UnicodeEncodeError:
 
                    e_dict = {'password':_('Invalid characters in password')}
 
                    raise formencode.Invalid('', value, state, error_dict=e_dict)
 
@@ -173,34 +195,36 @@ class ValidPassword(formencode.validator
 
                    raise formencode.Invalid('', value, state, error_dict=e_dict)
 

	
 
            if value.get('new_password'):
 
                try:
 
                    value['new_password'] = \
 
                        get_crypt_password(value['new_password'])
 
                except UnicodeEncodeError:
 
                    e_dict = {'new_password':_('Invalid characters in password')}
 
                    raise formencode.Invalid('', value, state, error_dict=e_dict)
 

	
 
            return value
 

	
 

	
 
class ValidPasswordsMatch(formencode.validators.FancyValidator):
 

	
 
    def validate_python(self, value, state):
 

	
 
        pass_val = value.get('password') or value.get('new_password')
 
        if pass_val != value['password_confirmation']:
 
            e_dict = {'password_confirmation':
 
                   _('Passwords do not match')}
 
            raise formencode.Invalid('', value, state, error_dict=e_dict)
 

	
 

	
 
class ValidAuth(formencode.validators.FancyValidator):
 
    messages = {
 
        'invalid_password':_('invalid password'),
 
        'invalid_login':_('invalid user name'),
 
        'disabled_account':_('Your account is disabled')
 
    }
 

	
 
    # error mapping
 
    e_dict = {'username':messages['invalid_login'],
 
              'password':messages['invalid_password']}
 
    e_dict_disable = {'username':messages['disabled_account']}
 

	
 
@@ -215,35 +239,37 @@ class ValidAuth(formencode.validators.Fa
 
            if user and user.active is False:
 
                log.warning('user %s is disabled', username)
 
                raise formencode.Invalid(self.message('disabled_account',
 
                                         state=State_obj),
 
                                         value, state,
 
                                         error_dict=self.e_dict_disable)
 
            else:
 
                log.warning('user %s not authenticated', username)
 
                raise formencode.Invalid(self.message('invalid_password',
 
                                         state=State_obj), value, state,
 
                                         error_dict=self.e_dict)
 

	
 

	
 
class ValidRepoUser(formencode.validators.FancyValidator):
 

	
 
    def to_python(self, value, state):
 
        try:
 
            User.query().filter(User.active == True)\
 
                .filter(User.username == value).one()
 
        except Exception:
 
            raise formencode.Invalid(_('This username is not valid'),
 
                                     value, state)
 
        return value
 

	
 

	
 
def ValidRepoName(edit, old_data):
 
    class _ValidRepoName(formencode.validators.FancyValidator):
 
        def to_python(self, value, state):
 

	
 
            repo_name = value.get('repo_name')
 

	
 
            slug = repo_name_slug(repo_name)
 
            if slug in [ADMIN_PREFIX, '']:
 
                e_dict = {'repo_name': _('This repository name is disallowed')}
 
                raise formencode.Invalid('', value, state, error_dict=e_dict)
 

	
 

	
 
@@ -280,36 +306,38 @@ def ValidRepoName(edit, old_data):
 
                                                 error_dict=e_dict)
 

	
 
                elif Repository.get_by_repo_name(repo_name_full):
 
                        e_dict = {'repo_name':_('This repository '
 
                                                'already exists')}
 
                        raise formencode.Invalid('', value, state,
 
                                                 error_dict=e_dict)
 

	
 
            return value
 

	
 
    return _ValidRepoName
 

	
 

	
 
def ValidForkName(*args, **kwargs):
 
    return ValidRepoName(*args, **kwargs)
 

	
 

	
 
def SlugifyName():
 
    class _SlugifyName(formencode.validators.FancyValidator):
 

	
 
        def to_python(self, value, state):
 
            return repo_name_slug(value)
 

	
 
    return _SlugifyName
 

	
 

	
 
def ValidCloneUri():
 
    from mercurial.httprepo import httprepository, httpsrepository
 
    from rhodecode.lib.utils import make_ui
 

	
 
    class _ValidCloneUri(formencode.validators.FancyValidator):
 

	
 
        def to_python(self, value, state):
 
            if not value:
 
                pass
 
            elif value.startswith('https'):
 
                try:
 
                    httpsrepository(make_ui('db'), value).capabilities
 
@@ -323,35 +351,37 @@ def ValidCloneUri():
 
                except Exception, e:
 
                    log.error(traceback.format_exc())
 
                    raise formencode.Invalid(_('invalid clone url'), value,
 
                                             state)
 
            else:
 
                raise formencode.Invalid(_('Invalid clone url, provide a '
 
                                           'valid clone http\s url'), value,
 
                                         state)
 
            return value
 

	
 
    return _ValidCloneUri
 

	
 

	
 
def ValidForkType(old_data):
 
    class _ValidForkType(formencode.validators.FancyValidator):
 

	
 
        def to_python(self, value, state):
 
            if old_data['repo_type'] != value:
 
                raise formencode.Invalid(_('Fork have to be the same '
 
                                           'type as original'), value, state)
 

	
 
            return value
 
    return _ValidForkType
 

	
 

	
 
class ValidPerms(formencode.validators.FancyValidator):
 
    messages = {'perm_new_member_name':_('This username or users group name'
 
                                         ' is not valid')}
 

	
 
    def to_python(self, value, state):
 
        perms_update = []
 
        perms_new = []
 
        #build a list of permission to update and new permission to create
 
        for k, v in value.items():
 
            #means new added member to permissions
 
            if k.startswith('perm_new_member'):
 
                new_perm = value.get('perm_new_member', False)
 
@@ -384,92 +414,98 @@ class ValidPerms(formencode.validators.F
 
                if t is 'users_group':
 
                    self.user_db = UsersGroup.query()\
 
                        .filter(UsersGroup.users_group_active == True)\
 
                        .filter(UsersGroup.users_group_name == k).one()
 

	
 
            except Exception:
 
                msg = self.message('perm_new_member_name',
 
                                     state=State_obj)
 
                raise formencode.Invalid(msg, value, state,
 
                                         error_dict={'perm_new_member_name':msg})
 
        return value
 

	
 

	
 
class ValidSettings(formencode.validators.FancyValidator):
 

	
 
    def to_python(self, value, state):
 
        # settings  form can't edit user
 
        if value.has_key('user'):
 
            del['value']['user']
 

	
 
        return value
 

	
 

	
 
class ValidPath(formencode.validators.FancyValidator):
 
    def to_python(self, value, state):
 

	
 
        if not os.path.isdir(value):
 
            msg = _('This is not a valid path')
 
            raise formencode.Invalid(msg, value, state,
 
                                     error_dict={'paths_root_path':msg})
 
        return value
 

	
 

	
 
def UniqSystemEmail(old_data):
 
    class _UniqSystemEmail(formencode.validators.FancyValidator):
 
        def to_python(self, value, state):
 
            value = value.lower()
 
            if old_data.get('email','').lower() != value:
 
                user = User.get_by_email(value, case_insensitive=True)
 
                if user:
 
                    raise formencode.Invalid(
 
                                    _("This e-mail address is already taken"),
 
                                    value, state)
 
            return value
 

	
 
    return _UniqSystemEmail
 

	
 

	
 
class ValidSystemEmail(formencode.validators.FancyValidator):
 
    def to_python(self, value, state):
 
        value = value.lower()
 
        user = User.get_by_email(value, case_insensitive=True)
 
        if  user is None:
 
            raise formencode.Invalid(_("This e-mail address doesn't exist.") ,
 
                                     value, state)
 

	
 
        return value
 

	
 

	
 
class LdapLibValidator(formencode.validators.FancyValidator):
 

	
 
    def to_python(self, value, state):
 

	
 
        try:
 
            import ldap
 
        except ImportError:
 
            raise LdapImportError
 
        return value
 

	
 

	
 
class AttrLoginValidator(formencode.validators.FancyValidator):
 

	
 
    def to_python(self, value, state):
 

	
 
        if not value or not isinstance(value, (str, unicode)):
 
            raise formencode.Invalid(_("The LDAP Login attribute of the CN "
 
                                       "must be specified - this is the name "
 
                                       "of the attribute that is equivalent "
 
                                       "to 'username'"),
 
                                     value, state)
 

	
 
        return value
 

	
 
#===============================================================================
 
#==============================================================================
 
# FORMS
 
#===============================================================================
 
#==============================================================================
 
class LoginForm(formencode.Schema):
 
    allow_extra_fields = True
 
    filter_extra_fields = True
 
    username = UnicodeString(
 
                             strip=True,
 
                             min=1,
 
                             not_empty=True,
 
                             messages={
 
                                'empty':_('Please enter a login'),
 
                                'tooShort':_('Enter a value %(min)i characters long or more')}
 
                            )
 

	
 
@@ -477,24 +513,25 @@ class LoginForm(formencode.Schema):
 
                            strip=True,
 
                            min=3,
 
                            not_empty=True,
 
                            messages={
 
                                'empty':_('Please enter a password'),
 
                                'tooShort':_('Enter %(min)i characters or more')}
 
                                )
 

	
 
    remember = StringBoolean(if_missing=False)
 

	
 
    chained_validators = [ValidAuth]
 

	
 

	
 
def UserForm(edit=False, old_data={}):
 
    class _UserForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        username = All(UnicodeString(strip=True, min=1, not_empty=True),
 
                       ValidUsername(edit, old_data))
 
        if edit:
 
            new_password = All(UnicodeString(strip=True, min=6, not_empty=False))
 
            password_confirmation = All(UnicodeString(strip=True, min=6, not_empty=False))
 
            admin = StringBoolean(if_missing=False)
 
        else:
 
            password = All(UnicodeString(strip=True, min=6, not_empty=True))
 
@@ -518,41 +555,43 @@ def UsersGroupForm(edit=False, old_data=
 
        users_group_name = All(UnicodeString(strip=True, min=1, not_empty=True),
 
                       ValidUsersGroup(edit, old_data))
 

	
 
        users_group_active = StringBoolean(if_missing=False)
 

	
 
        if edit:
 
            users_group_members = OneOf(available_members, hideList=False,
 
                                        testValueList=True,
 
                                        if_missing=None, not_empty=False)
 

	
 
    return _UsersGroupForm
 

	
 

	
 
def ReposGroupForm(edit=False, old_data={}, available_groups=[]):
 
    class _ReposGroupForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 

	
 
        group_name = All(UnicodeString(strip=True, min=1, not_empty=True),
 
                               SlugifyName())
 
        group_description = UnicodeString(strip=True, min=1,
 
                                                not_empty=True)
 
        group_parent_id = OneOf(available_groups, hideList=False,
 
                                        testValueList=True,
 
                                        if_missing=None, not_empty=False)
 

	
 
        chained_validators = [ValidReposGroup(edit, old_data)]
 

	
 
    return _ReposGroupForm
 

	
 

	
 
def RegisterForm(edit=False, old_data={}):
 
    class _RegisterForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        username = All(ValidUsername(edit, old_data),
 
                       UnicodeString(strip=True, min=1, not_empty=True))
 
        password = All(UnicodeString(strip=True, min=6, not_empty=True))
 
        password_confirmation = All(UnicodeString(strip=True, min=6, not_empty=True))
 
        active = StringBoolean(if_missing=False)
 
        name = UnicodeString(strip=True, min=1, not_empty=True)
 
        lastname = UnicodeString(strip=True, min=1, not_empty=True)
 
        email = All(Email(not_empty=True), UniqSystemEmail(old_data))
0 comments (0 inline, 0 general)