Changeset - 0cef54d34605
[Not reviewed]
default
0 4 0
Marcin Kuzminski - 13 years ago 2013-03-11 17:59:38
marcin@python-works.com
Grafted from: af96fb19b53a
Pass in old groups data to CanWriteToGroup validator for later skipping group checks.
This will be a part of refactoring done to do user permissions changes without messing with main
repo form data
4 files changed with 61 insertions and 11 deletions:
0 comments (0 inline, 0 general)
rhodecode/controllers/admin/repos.py
Show inline comments
 
@@ -186,98 +186,100 @@ class ReposController(BaseController):
 
            return htmlfill.render(
 
                r,
 
                defaults=errors.value,
 
                errors=errors.error_dict or {},
 
                prefix_error=False,
 
                encoding="UTF-8")
 

	
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            msg = _('error occurred during creation of repository %s') \
 
                    % form_result.get('repo_name')
 
            h.flash(msg, category='error')
 
            return redirect(url('repos'))
 
        #redirect to our new repo !
 
        return redirect(url('summary_home', repo_name=new_repo.repo_name))
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def new(self, format='html'):
 
        """GET /repos/new: Form to create a new item"""
 
        new_repo = request.GET.get('repo', '')
 
        c.new_repo = repo_name_slug(new_repo)
 
        self.__load_defaults()
 
        ## apply the defaults from defaults page
 
        defaults = RhodeCodeSetting.get_default_repo_settings(strip_prefix=True)
 
        return htmlfill.render(
 
            render('admin/repos/repo_add.html'),
 
            defaults=defaults,
 
            errors={},
 
            prefix_error=False,
 
            encoding="UTF-8"
 
        )
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def update(self, repo_name):
 
        """
 
        PUT /repos/repo_name: Update an existing item"""
 
        # Forms posted to this method should contain a hidden field:
 
        #    <input type="hidden" name="_method" value="PUT" />
 
        # Or using helpers:
 
        #    h.form(url('repo', repo_name=ID),
 
        #           method='put')
 
        # url('repo', repo_name=ID)
 
        self.__load_defaults()
 
        repo_model = RepoModel()
 
        changed_name = repo_name
 
        #override the choices with extracted revisions !
 
        choices, c.landing_revs = ScmModel().get_repo_landing_revs(repo_name)
 
        c.landing_revs_choices = choices
 

	
 
        _form = RepoForm(edit=True, old_data={'repo_name': repo_name},
 
        repo = Repository.get_by_repo_name(repo_name)
 
        _form = RepoForm(edit=True, old_data={'repo_name': repo_name,
 
                                              'repo_group': repo.group.get_dict() \
 
                                              if repo.group else {}},
 
                         repo_groups=c.repo_groups_choices,
 
                         landing_revs=c.landing_revs_choices)()
 
        try:
 
            form_result = _form.to_python(dict(request.POST))
 
            repo = repo_model.update(repo_name, **form_result)
 
            invalidate_cache('get_repo_cached_%s' % repo_name)
 
            h.flash(_('Repository %s updated successfully') % repo_name,
 
                    category='success')
 
            changed_name = repo.repo_name
 
            action_logger(self.rhodecode_user, 'admin_updated_repo',
 
                              changed_name, self.ip_addr, self.sa)
 
            Session().commit()
 
        except formencode.Invalid, errors:
 
            defaults = self.__load_data(repo_name)
 
            defaults.update(errors.value)
 
            return htmlfill.render(
 
                render('admin/repos/repo_edit.html'),
 
                defaults=defaults,
 
                errors=errors.error_dict or {},
 
                prefix_error=False,
 
                encoding="UTF-8")
 

	
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('error occurred during update of repository %s') \
 
                    % repo_name, category='error')
 
        return redirect(url('edit_repo', repo_name=changed_name))
 

	
 
    @HasPermissionAllDecorator('hg.admin')
 
    def delete(self, repo_name):
 
        """
 
        DELETE /repos/repo_name: Delete an existing item"""
 
        # Forms posted to this method should contain a hidden field:
 
        #    <input type="hidden" name="_method" value="DELETE" />
 
        # Or using helpers:
 
        #    h.form(url('repo', repo_name=ID),
 
        #           method='delete')
 
        # url('repo', repo_name=ID)
 

	
 
        repo_model = RepoModel()
 
        repo = repo_model.get_by_repo_name(repo_name)
 
        if not repo:
 
            h.not_mapped_error(repo_name)
 
            return redirect(url('repos'))
 
        try:
 
            action_logger(self.rhodecode_user, 'admin_deleted_repo',
 
                              repo_name, self.ip_addr, self.sa)
 
            repo_model.delete(repo)
rhodecode/controllers/settings.py
Show inline comments
 
@@ -62,99 +62,101 @@ class SettingsController(BaseRepoControl
 
        repo_model = RepoModel()
 
        c.users_array = repo_model.get_users_js()
 
        c.users_groups_array = repo_model.get_users_groups_js()
 
        choices, c.landing_revs = ScmModel().get_repo_landing_revs()
 
        c.landing_revs_choices = choices
 

	
 
    def __load_data(self, repo_name=None):
 
        """
 
        Load defaults settings for edit, and update
 

	
 
        :param repo_name:
 
        """
 
        self.__load_defaults()
 

	
 
        c.repo_info = db_repo = Repository.get_by_repo_name(repo_name)
 
        repo = db_repo.scm_instance
 

	
 
        if c.repo_info is None:
 
            h.not_mapped_error(repo_name)
 
            return redirect(url('home'))
 

	
 
        ##override defaults for exact repo info here git/hg etc
 
        choices, c.landing_revs = ScmModel().get_repo_landing_revs(c.repo_info)
 
        c.landing_revs_choices = choices
 

	
 
        defaults = RepoModel()._get_defaults(repo_name)
 

	
 
        return defaults
 

	
 
    @HasRepoPermissionAllDecorator('repository.admin')
 
    def index(self, repo_name):
 
        defaults = self.__load_data(repo_name)
 

	
 
        return htmlfill.render(
 
            render('settings/repo_settings.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False
 
        )
 

	
 
    @HasRepoPermissionAllDecorator('repository.admin')
 
    def update(self, repo_name):
 
        self.__load_defaults()
 
        repo_model = RepoModel()
 
        changed_name = repo_name
 
        #override the choices with extracted revisions !
 
        choices, c.landing_revs = ScmModel().get_repo_landing_revs(repo_name)
 
        c.landing_revs_choices = choices
 

	
 
        repo = Repository.get_by_repo_name(repo_name)
 
        _form = RepoSettingsForm(edit=True,
 
                                 old_data={'repo_name': repo_name},
 
                                old_data={'repo_name': repo_name,
 
                                          'repo_group': repo.group.get_dict() \
 
                                              if repo.group else {}},
 
                                 repo_groups=c.repo_groups_choices,
 
                                 landing_revs=c.landing_revs_choices)()
 
        try:
 
            form_result = _form.to_python(dict(request.POST))
 
            repo_model.update(repo_name, **form_result)
 
            invalidate_cache('get_repo_cached_%s' % repo_name)
 
            h.flash(_('Repository %s updated successfully') % repo_name,
 
                    category='success')
 
            changed_name = form_result['repo_name_full']
 
            action_logger(self.rhodecode_user, 'user_updated_repo',
 
                          changed_name, self.ip_addr, self.sa)
 
            Session().commit()
 
        except formencode.Invalid, errors:
 
            defaults = self.__load_data(repo_name)
 
            defaults.update(errors.value)
 
            return htmlfill.render(
 
                render('settings/repo_settings.html'),
 
                defaults=errors.value,
 
                errors=errors.error_dict or {},
 
                prefix_error=False,
 
                encoding="UTF-8")
 

	
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('error occurred during update of repository %s') \
 
                    % repo_name, category='error')
 

	
 
        return redirect(url('repo_settings_home', repo_name=changed_name))
 

	
 
    @HasRepoPermissionAllDecorator('repository.admin')
 
    def delete(self, repo_name):
 
        """DELETE /repos/repo_name: Delete an existing item"""
 
        # Forms posted to this method should contain a hidden field:
 
        #    <input type="hidden" name="_method" value="DELETE" />
 
        # Or using helpers:
 
        #    h.form(url('repo_settings_delete', repo_name=ID),
 
        #           method='delete')
 
        # url('repo_settings_delete', repo_name=ID)
 

	
 
        repo_model = RepoModel()
 
        repo = repo_model.get_by_repo_name(repo_name)
 
        if not repo:
 
            h.not_mapped_error(repo_name)
 
            return redirect(url('home'))
 
        try:
 
            action_logger(self.rhodecode_user, 'user_deleted_repo',
 
                              repo_name, self.ip_addr, self.sa)
 
            repo_model.delete(repo)
rhodecode/model/forms.py
Show inline comments
 
@@ -131,126 +131,126 @@ def ReposGroupForm(edit=False, old_data=
 
        recursive = v.StringBoolean(if_missing=False)
 
        chained_validators = [v.ValidReposGroup(edit, old_data),
 
                              v.ValidPerms('group')]
 

	
 
    return _ReposGroupForm
 

	
 

	
 
def RegisterForm(edit=False, old_data={}):
 
    class _RegisterForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        username = All(
 
            v.ValidUsername(edit, old_data),
 
            v.UnicodeString(strip=True, min=1, not_empty=True)
 
        )
 
        password = All(
 
            v.ValidPassword(),
 
            v.UnicodeString(strip=False, min=6, not_empty=True)
 
        )
 
        password_confirmation = All(
 
            v.ValidPassword(),
 
            v.UnicodeString(strip=False, min=6, not_empty=True)
 
        )
 
        active = v.StringBoolean(if_missing=False)
 
        firstname = v.UnicodeString(strip=True, min=1, not_empty=False)
 
        lastname = v.UnicodeString(strip=True, min=1, not_empty=False)
 
        email = All(v.Email(not_empty=True), v.UniqSystemEmail(old_data))
 

	
 
        chained_validators = [v.ValidPasswordsMatch()]
 

	
 
    return _RegisterForm
 

	
 

	
 
def PasswordResetForm():
 
    class _PasswordResetForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        email = All(v.ValidSystemEmail(), v.Email(not_empty=True))
 
    return _PasswordResetForm
 

	
 

	
 
def RepoForm(edit=False, old_data={}, supported_backends=BACKENDS.keys(),
 
             repo_groups=[], landing_revs=[]):
 
    class _RepoForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        repo_name = All(v.UnicodeString(strip=True, min=1, not_empty=True),
 
                        v.SlugifyName())
 
        repo_group = All(v.CanWriteGroup(),
 
        repo_group = All(v.CanWriteGroup(old_data),
 
                         v.OneOf(repo_groups, hideList=True))
 
        repo_type = v.OneOf(supported_backends)
 
        repo_description = v.UnicodeString(strip=True, min=1, not_empty=False)
 
        repo_private = v.StringBoolean(if_missing=False)
 
        repo_landing_rev = v.OneOf(landing_revs, hideList=True)
 
        clone_uri = All(v.UnicodeString(strip=True, min=1, not_empty=False))
 

	
 
        repo_enable_statistics = v.StringBoolean(if_missing=False)
 
        repo_enable_downloads = v.StringBoolean(if_missing=False)
 
        repo_enable_locking = v.StringBoolean(if_missing=False)
 

	
 
        if edit:
 
            #this is repo owner
 
            user = All(v.UnicodeString(not_empty=True), v.ValidRepoUser())
 

	
 
        chained_validators = [v.ValidCloneUri(),
 
                              v.ValidRepoName(edit, old_data),
 
                              v.ValidPerms()]
 
    return _RepoForm
 

	
 

	
 
def RepoSettingsForm(edit=False, old_data={}, supported_backends=BACKENDS.keys(),
 
                     repo_groups=[], landing_revs=[]):
 
    class _RepoForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        repo_name = All(v.UnicodeString(strip=True, min=1, not_empty=True),
 
                        v.SlugifyName())
 
        repo_group = All(v.CanWriteGroup(),
 
        repo_group = All(v.CanWriteGroup(old_data),
 
                         v.OneOf(repo_groups, hideList=True))
 
        repo_description = v.UnicodeString(strip=True, min=1, not_empty=False)
 
        repo_private = v.StringBoolean(if_missing=False)
 
        repo_landing_rev = v.OneOf(landing_revs, hideList=True)
 
        clone_uri = All(v.UnicodeString(strip=True, min=1, not_empty=False))
 

	
 
        chained_validators = [v.ValidCloneUri(),
 
                              v.ValidRepoName(edit, old_data),
 
                              v.ValidPerms(),
 
                              v.ValidSettings()]
 
    return _RepoForm
 

	
 

	
 
def RepoForkForm(edit=False, old_data={}, supported_backends=BACKENDS.keys(),
 
                 repo_groups=[], landing_revs=[]):
 
    class _RepoForkForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        repo_name = All(v.UnicodeString(strip=True, min=1, not_empty=True),
 
                        v.SlugifyName())
 
        repo_group = All(v.CanWriteGroup(),
 
                         v.OneOf(repo_groups, hideList=True))
 
        repo_type = All(v.ValidForkType(old_data), v.OneOf(supported_backends))
 
        description = v.UnicodeString(strip=True, min=1, not_empty=True)
 
        private = v.StringBoolean(if_missing=False)
 
        copy_permissions = v.StringBoolean(if_missing=False)
 
        update_after_clone = v.StringBoolean(if_missing=False)
 
        fork_parent_id = v.UnicodeString()
 
        chained_validators = [v.ValidForkName(edit, old_data)]
 
        landing_rev = v.OneOf(landing_revs, hideList=True)
 

	
 
    return _RepoForkForm
 

	
 

	
 
def ApplicationSettingsForm():
 
    class _ApplicationSettingsForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        rhodecode_title = v.UnicodeString(strip=True, min=1, not_empty=True)
 
        rhodecode_realm = v.UnicodeString(strip=True, min=1, not_empty=True)
 
        rhodecode_ga_code = v.UnicodeString(strip=True, min=1, not_empty=False)
 

	
 
    return _ApplicationSettingsForm
 

	
 

	
 
def ApplicationVisualisationForm():
 
    class _ApplicationVisualisationForm(formencode.Schema):
 
        allow_extra_fields = True
rhodecode/model/validators.py
Show inline comments
 
"""
 
Set of generic validators
 
"""
 
import os
 
import re
 
import formencode
 
import logging
 
from collections import defaultdict
 
from pylons.i18n.translation import _
 
from webhelpers.pylonslib.secure_form import authentication_token
 

	
 
from formencode.validators import (
 
    UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set,
 
    NotEmpty, IPAddress, CIDR
 
)
 
from rhodecode.lib.compat import OrderedSet
 
from rhodecode.lib import ipaddr
 
from rhodecode.lib.utils import repo_name_slug
 
from rhodecode.lib.utils2 import safe_int
 
from rhodecode.model.db import RepoGroup, Repository, UsersGroup, User,\
 
    ChangesetStatus
 
from rhodecode.lib.exceptions import LdapImportError
 
from rhodecode.config.routing import ADMIN_PREFIX
 
from rhodecode.lib.auth import HasReposGroupPermissionAny
 
from rhodecode.lib.auth import HasReposGroupPermissionAny, HasPermissionAny
 

	
 
# silence warnings and pylint
 
UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set, \
 
    NotEmpty, IPAddress, CIDR
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class UniqueList(formencode.FancyValidator):
 
    """
 
    Unique List !
 
    """
 
    messages = dict(
 
        empty=_('Value cannot be an empty list'),
 
        missing_value=_('Value cannot be an empty list'),
 
    )
 

	
 
    def _to_python(self, value, state):
 
        if isinstance(value, list):
 
            return value
 
        elif isinstance(value, set):
 
            return list(value)
 
        elif isinstance(value, tuple):
 
            return list(value)
 
        elif value is None:
 
            return []
 
        else:
 
            return [value]
 

	
 
    def empty_value(self, value):
 
        return []
 

	
 

	
 
class StateObj(object):
 
    """
 
    this is needed to translate the messages using _() in validators
 
    """
 
    _ = staticmethod(_)
 

	
 

	
 
def M(self, key, state=None, **kwargs):
 
    """
 
    returns string from self.message based on given key,
 
    passed kw params are used to substitute %(named)s params inside
 
    translated strings
 

	
 
    :param msg:
 
    :param state:
 
@@ -427,112 +428,157 @@ def ValidCloneUri():
 
                GitRepository._check_url(url)
 
            elif url.startswith('svn+http'):
 
                raise NotImplementedError()
 
            elif url.startswith('hg+http'):
 
                raise NotImplementedError()
 
            else:
 
                raise Exception('clone from URI %s not allowed' % (url))
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'clone_uri': _(u'invalid clone url'),
 
            'invalid_clone_uri': _(u'Invalid clone url, provide a '
 
                                    'valid clone http(s)/svn+http(s) url')
 
        }
 

	
 
        def validate_python(self, value, state):
 
            repo_type = value.get('repo_type')
 
            url = value.get('clone_uri')
 

	
 
            if not url:
 
                pass
 
            else:
 
                try:
 
                    url_handler(repo_type, url, make_ui('db', clear_session=False))
 
                except Exception:
 
                    log.exception('Url validation failed')
 
                    msg = M(self, 'clone_uri')
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(clone_uri=msg)
 
                    )
 
    return _validator
 

	
 

	
 
def ValidForkType(old_data={}):
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'invalid_fork_type': _(u'Fork have to be the same type as parent')
 
        }
 

	
 
        def validate_python(self, value, state):
 
            if old_data['repo_type'] != value:
 
                msg = M(self, 'invalid_fork_type', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(repo_type=msg)
 
                )
 
    return _validator
 

	
 

	
 
def CanWriteGroup():
 
def CanWriteGroup(old_data=None):
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'permission_denied': _(u"You don't have permissions "
 
                                   "to create repository in this group")
 
        }
 

	
 
        def validate_python(self, value, state):
 
            gr = RepoGroup.get(value)
 
            if not HasReposGroupPermissionAny(
 
                'group.write', 'group.admin'
 
            )(gr.group_name, 'get group of repo form'):
 
            gr_name = gr.group_name if gr else None  # None means ROOT location
 
            val = HasReposGroupPermissionAny('group.write', 'group.admin')
 
            can_create_repos = HasPermissionAny('hg.admin', 'hg.create.repository')
 
            forbidden = not val(gr_name, 'can write into group validator')
 
            value_changed = old_data['repo_group'].get('group_id') != safe_int(value)
 
            if value_changed:  # do check if we changed the value
 
                #parent group need to be existing
 
                if gr and forbidden:
 
                msg = M(self, 'permission_denied', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(repo_type=msg)
 
                )
 
                ## check if we can write to root location !
 
                elif gr is None and can_create_repos() is False:
 
                    msg = M(self, 'permission_denied_root', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(repo_type=msg)
 
                    )
 

	
 
    return _validator
 

	
 

	
 
def CanCreateGroup(can_create_in_root=False):
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'permission_denied': _(u"You don't have permissions "
 
                                   "to create a group in this location")
 
        }
 

	
 
        def to_python(self, value, state):
 
            #root location
 
            if value in [-1, "-1"]:
 
                return None
 
            return value
 

	
 
        def validate_python(self, value, state):
 
            gr = RepoGroup.get(value)
 
            gr_name = gr.group_name if gr else None  # None means ROOT location
 

	
 
            if can_create_in_root and gr is None:
 
                #we can create in root, we're fine no validations required
 
                return
 

	
 
            forbidden_in_root = gr is None and can_create_in_root is False
 
            val = HasReposGroupPermissionAny('group.admin')
 
            forbidden = not val(gr_name, 'can create group validator')
 
            if forbidden_in_root or forbidden:
 
                msg = M(self, 'permission_denied', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(group_parent_id=msg)
 
                )
 

	
 
    return _validator
 

	
 

	
 
def ValidPerms(type_='repo'):
 
    if type_ == 'group':
 
        EMPTY_PERM = 'group.none'
 
    elif type_ == 'repo':
 
        EMPTY_PERM = 'repository.none'
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'perm_new_member_name':
 
                _(u'This username or users group name is not valid')
 
        }
 

	
 
        def to_python(self, value, state):
 
            perms_update = OrderedSet()
 
            perms_new = OrderedSet()
 
            # build a list of permission to update and new permission to create
 

	
 
            #CLEAN OUT ORG VALUE FROM NEW MEMBERS, and group them using
 
            new_perms_group = defaultdict(dict)
 
            for k, v in value.copy().iteritems():
 
                if k.startswith('perm_new_member'):
 
                    del value[k]
 
                    _type, part = k.split('perm_new_member_')
 
                    args = part.split('_')
 
                    if len(args) == 1:
 
                        new_perms_group[args[0]]['perm'] = v
 
                    elif len(args) == 2:
 
                        _key, pos = args
 
                        new_perms_group[pos][_key] = v
 

	
 
            # fill new permissions in order of how they were added
 
            for k in sorted(map(int, new_perms_group.keys())):
 
                perm_dict = new_perms_group[str(k)]
 
                new_member = perm_dict.get('name')
 
                new_perm = perm_dict.get('perm')
 
                new_type = perm_dict.get('type')
 
                if new_member and new_perm and new_type:
 
                    perms_new.add((new_member, new_perm, new_type))
 

	
 
            for k, v in value.iteritems():
 
                if k.startswith('u_perm_') or k.startswith('g_perm_'):
 
                    member = k[7:]
 
                    t = {'u': 'user',
 
                         'g': 'users_group'
 
                    }[k[0]]
0 comments (0 inline, 0 general)