Changeset - c0cc8f8a71b0
[Not reviewed]
beta
0 10 3
Marcin Kuzminski - 13 years ago 2012-09-07 02:20:02
marcin@python-works.com
Permissions on group can be set in recursive mode setting defined permission to all children
- more explicit permissions
- fixes for empty values in permission form
13 files changed with 613 insertions and 61 deletions:
0 comments (0 inline, 0 general)
rhodecode/controllers/admin/repos_groups.py
Show inline comments
 
@@ -24,48 +24,49 @@
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import logging
 
import traceback
 
import formencode
 

	
 
from formencode import htmlfill
 

	
 
from pylons import request, tmpl_context as c, url
 
from pylons.controllers.util import redirect
 
from pylons.i18n.translation import _
 

	
 
from sqlalchemy.exc import IntegrityError
 

	
 
from rhodecode.lib import helpers as h
 
from rhodecode.lib.auth import LoginRequired, HasPermissionAnyDecorator,\
 
    HasReposGroupPermissionAnyDecorator
 
from rhodecode.lib.base import BaseController, render
 
from rhodecode.model.db import RepoGroup
 
from rhodecode.model.repos_group import ReposGroupModel
 
from rhodecode.model.forms import ReposGroupForm
 
from rhodecode.model.meta import Session
 
from rhodecode.model.repo import RepoModel
 
from webob.exc import HTTPInternalServerError, HTTPNotFound
 
from rhodecode.lib.utils2 import str2bool
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class ReposGroupsController(BaseController):
 
    """REST Controller styled on the Atom Publishing Protocol"""
 
    # To properly map this controller, ensure your config/routing.py
 
    # file has a resource setup:
 
    #     map.resource('repos_group', 'repos_groups')
 

	
 
    @LoginRequired()
 
    def __before__(self):
 
        super(ReposGroupsController, self).__before__()
 

	
 
    def __load_defaults(self):
 
        c.repo_groups = RepoGroup.groups_choices()
 
        c.repo_groups_choices = map(lambda k: unicode(k[0]), c.repo_groups)
 

	
 
        repo_model = RepoModel()
 
        c.users_array = repo_model.get_users_js()
 
        c.users_groups_array = repo_model.get_users_groups_js()
 

	
 
    def __load_data(self, group_id):
 
        """
 
@@ -206,72 +207,74 @@ class ReposGroupsController(BaseControll
 
            if str(e.message).find('groups_group_parent_id_fkey') != -1:
 
                log.error(traceback.format_exc())
 
                h.flash(_('Cannot delete this group it still contains '
 
                          'subgroups'),
 
                        category='warning')
 
            else:
 
                log.error(traceback.format_exc())
 
                h.flash(_('error occurred during deletion of repos '
 
                          'group %s') % gr.group_name, category='error')
 

	
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('error occurred during deletion of repos '
 
                      'group %s') % gr.group_name, category='error')
 

	
 
        return redirect(url('repos_groups'))
 

	
 
    @HasReposGroupPermissionAnyDecorator('group.admin')
 
    def delete_repos_group_user_perm(self, group_name):
 
        """
 
        DELETE an existing repositories group permission user
 

	
 
        :param group_name:
 
        """
 

	
 
        try:
 
            ReposGroupModel().revoke_user_permission(
 
                repos_group=group_name, user=request.POST['user_id']
 
            recursive = str2bool(request.POST.get('recursive', False))
 
            ReposGroupModel().delete_permission(
 
                repos_group=group_name, obj=request.POST['user_id'],
 
                obj_type='user', recursive=recursive
 
            )
 
            Session().commit()
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during deletion of group user'),
 
                    category='error')
 
            raise HTTPInternalServerError()
 

	
 
    @HasReposGroupPermissionAnyDecorator('group.admin')
 
    def delete_repos_group_users_group_perm(self, group_name):
 
        """
 
        DELETE an existing repositories group permission users group
 

	
 
        :param group_name:
 
        """
 

	
 
        try:
 
            ReposGroupModel().revoke_users_group_permission(
 
                repos_group=group_name,
 
                group_name=request.POST['users_group_id']
 
            recursive = str2bool(request.POST.get('recursive', False))
 
            ReposGroupModel().delete_permission(
 
                repos_group=group_name, obj=request.POST['users_group_id'],
 
                obj_type='users_group', recursive=recursive
 
            )
 
            Session().commit()
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during deletion of group'
 
                      ' users groups'),
 
                    category='error')
 
            raise HTTPInternalServerError()
 

	
 
    def show_by_name(self, group_name):
 
        """
 
        This is a proxy that does a lookup group_name -> id, and shows
 
        the group by id view instead
 
        """
 
        group_name = group_name.rstrip('/')
 
        id_ = RepoGroup.get_by_group_name(group_name)
 
        if id_:
 
            return self.show(id_.group_id)
 
        raise HTTPNotFound
 

	
 
    @HasReposGroupPermissionAnyDecorator('group.read', 'group.write',
 
                                         'group.admin')
 
    def show(self, id, format='html'):
 
        """GET /repos_groups/id: Show a specific item"""
rhodecode/model/forms.py
Show inline comments
 
@@ -107,48 +107,49 @@ def UsersGroupForm(edit=False, old_data=
 
        users_group_active = v.StringBoolean(if_missing=False)
 

	
 
        if edit:
 
            users_group_members = v.OneOf(
 
                available_members, hideList=False, testValueList=True,
 
                if_missing=None, not_empty=False
 
            )
 

	
 
    return _UsersGroupForm
 

	
 

	
 
def ReposGroupForm(edit=False, old_data={}, available_groups=[]):
 
    class _ReposGroupForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 

	
 
        group_name = All(v.UnicodeString(strip=True, min=1, not_empty=True),
 
                               v.SlugifyName())
 
        group_description = v.UnicodeString(strip=True, min=1,
 
                                                not_empty=True)
 
        group_parent_id = v.OneOf(available_groups, hideList=False,
 
                                        testValueList=True,
 
                                        if_missing=None, not_empty=False)
 
        enable_locking = v.StringBoolean(if_missing=False)
 
        recursive = v.StringBoolean(if_missing=False)
 
        chained_validators = [v.ValidReposGroup(edit, old_data),
 
                              v.ValidPerms('group')]
 

	
 
    return _ReposGroupForm
 

	
 

	
 
def RegisterForm(edit=False, old_data={}):
 
    class _RegisterForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        username = All(
 
            v.ValidUsername(edit, old_data),
 
            v.UnicodeString(strip=True, min=1, not_empty=True)
 
        )
 
        password = All(
 
            v.ValidPassword(),
 
            v.UnicodeString(strip=False, min=6, not_empty=True)
 
        )
 
        password_confirmation = All(
 
            v.ValidPassword(),
 
            v.UnicodeString(strip=False, min=6, not_empty=True)
 
        )
 
        active = v.StringBoolean(if_missing=False)
 
        firstname = v.UnicodeString(strip=True, min=1, not_empty=False)
rhodecode/model/repo.py
Show inline comments
 
@@ -347,111 +347,117 @@ class RepoModel(BaseModel):
 
        """
 
        Grant permission for user on given repository, or update existing one
 
        if found
 

	
 
        :param repo: Instance of Repository, repository_id, or repository name
 
        :param user: Instance of User, user_id or username
 
        :param perm: Instance of Permission, or permission_name
 
        """
 
        user = self._get_user(user)
 
        repo = self._get_repo(repo)
 
        permission = self._get_perm(perm)
 

	
 
        # check if we have that permission already
 
        obj = self.sa.query(UserRepoToPerm)\
 
            .filter(UserRepoToPerm.user == user)\
 
            .filter(UserRepoToPerm.repository == repo)\
 
            .scalar()
 
        if obj is None:
 
            # create new !
 
            obj = UserRepoToPerm()
 
        obj.repository = repo
 
        obj.user = user
 
        obj.permission = permission
 
        self.sa.add(obj)
 
        log.debug('Granted perm %s to %s on %s' % (perm, user, repo))
 

	
 
    def revoke_user_permission(self, repo, user):
 
        """
 
        Revoke permission for user on given repository
 

	
 
        :param repo: Instance of Repository, repository_id, or repository name
 
        :param user: Instance of User, user_id or username
 
        """
 

	
 
        user = self._get_user(user)
 
        repo = self._get_repo(repo)
 

	
 
        obj = self.sa.query(UserRepoToPerm)\
 
            .filter(UserRepoToPerm.repository == repo)\
 
            .filter(UserRepoToPerm.user == user)\
 
            .one()
 
            .scalar()
 
        if obj:
 
        self.sa.delete(obj)
 
            log.debug('Revoked perm on %s on %s' % (repo, user))
 

	
 
    def grant_users_group_permission(self, repo, group_name, perm):
 
        """
 
        Grant permission for users group on given repository, or update
 
        existing one if found
 

	
 
        :param repo: Instance of Repository, repository_id, or repository name
 
        :param group_name: Instance of UserGroup, users_group_id,
 
            or users group name
 
        :param perm: Instance of Permission, or permission_name
 
        """
 
        repo = self._get_repo(repo)
 
        group_name = self.__get_users_group(group_name)
 
        permission = self._get_perm(perm)
 

	
 
        # check if we have that permission already
 
        obj = self.sa.query(UsersGroupRepoToPerm)\
 
            .filter(UsersGroupRepoToPerm.users_group == group_name)\
 
            .filter(UsersGroupRepoToPerm.repository == repo)\
 
            .scalar()
 

	
 
        if obj is None:
 
            # create new
 
            obj = UsersGroupRepoToPerm()
 

	
 
        obj.repository = repo
 
        obj.users_group = group_name
 
        obj.permission = permission
 
        self.sa.add(obj)
 
        log.debug('Granted perm %s to %s on %s' % (perm, group_name, repo))
 

	
 
    def revoke_users_group_permission(self, repo, group_name):
 
        """
 
        Revoke permission for users group on given repository
 

	
 
        :param repo: Instance of Repository, repository_id, or repository name
 
        :param group_name: Instance of UserGroup, users_group_id,
 
            or users group name
 
        """
 
        repo = self._get_repo(repo)
 
        group_name = self.__get_users_group(group_name)
 

	
 
        obj = self.sa.query(UsersGroupRepoToPerm)\
 
            .filter(UsersGroupRepoToPerm.repository == repo)\
 
            .filter(UsersGroupRepoToPerm.users_group == group_name)\
 
            .one()
 
            .scalar()
 
        if obj:
 
        self.sa.delete(obj)
 
            log.debug('Revoked perm to %s on %s' % (repo, group_name))
 

	
 
    def delete_stats(self, repo_name):
 
        """
 
        removes stats for given repo
 

	
 
        :param repo_name:
 
        """
 
        try:
 
            obj = self.sa.query(Statistics)\
 
                    .filter(Statistics.repository ==
 
                            self.get_by_repo_name(repo_name))\
 
                    .one()
 
            self.sa.delete(obj)
 
        except:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def __create_repo(self, repo_name, alias, parent, clone_uri=False):
 
        """
 
        makes repository on filesystem. It's group aware means it'll create
 
        a repository within a group, and alter the paths accordingly of
 
        group location
 

	
 
        :param repo_name:
rhodecode/model/repos_group.py
Show inline comments
 
@@ -11,49 +11,49 @@
 
    :license: GPLv3, see COPYING for more details.
 
"""
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import os
 
import logging
 
import traceback
 
import shutil
 

	
 
from rhodecode.lib.utils2 import LazyProperty
 

	
 
from rhodecode.model import BaseModel
 
from rhodecode.model.db import RepoGroup, RhodeCodeUi, UserRepoGroupToPerm, \
 
    User, Permission, UsersGroupRepoGroupToPerm, UsersGroup
 
    User, Permission, UsersGroupRepoGroupToPerm, UsersGroup, Repository
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class ReposGroupModel(BaseModel):
 

	
 
    cls = RepoGroup
 

	
 
    def __get_users_group(self, users_group):
 
        return self._get_instance(UsersGroup, users_group,
 
                                  callback=UsersGroup.get_by_group_name)
 

	
 
    def _get_repos_group(self, repos_group):
 
        return self._get_instance(RepoGroup, repos_group,
 
                                  callback=RepoGroup.get_by_group_name)
 

	
 
    @LazyProperty
 
    def repos_path(self):
 
        """
 
        Get's the repositories root path from database
 
        """
 

	
 
        q = RhodeCodeUi.get_by_key('/')
 
        return q.ui_value
 
@@ -94,221 +94,316 @@ class ReposGroupModel(BaseModel):
 

	
 
    def __rename_group(self, old, new):
 
        """
 
        Renames a group on filesystem
 

	
 
        :param group_name:
 
        """
 

	
 
        if old == new:
 
            log.debug('skipping group rename')
 
            return
 

	
 
        log.debug('renaming repos group from %s to %s' % (old, new))
 

	
 
        old_path = os.path.join(self.repos_path, old)
 
        new_path = os.path.join(self.repos_path, new)
 

	
 
        log.debug('renaming repos paths from %s to %s' % (old_path, new_path))
 

	
 
        if os.path.isdir(new_path):
 
            raise Exception('Was trying to rename to already '
 
                            'existing dir %s' % new_path)
 
        shutil.move(old_path, new_path)
 

	
 
    def __delete_group(self, group):
 
    def __delete_group(self, group, force_delete=False):
 
        """
 
        Deletes a group from a filesystem
 

	
 
        :param group: instance of group from database
 
        :param force_delete: use shutil rmtree to remove all objects
 
        """
 
        paths = group.full_path.split(RepoGroup.url_sep())
 
        paths = os.sep.join(paths)
 

	
 
        rm_path = os.path.join(self.repos_path, paths)
 
        if os.path.isdir(rm_path):
 
            # delete only if that path really exists
 
            os.rmdir(rm_path)
 
            if force_delete:
 
                shutil.rmtree(rm_path)
 
            else:
 
                os.rmdir(rm_path)  # this raises an exception when there are still objects inside
 

	
 
    def create(self, group_name, group_description, parent=None, just_db=False):
 
        try:
 
            new_repos_group = RepoGroup()
 
            new_repos_group.group_description = group_description
 
            new_repos_group.parent_group = self._get_repos_group(parent)
 
            new_repos_group.group_name = new_repos_group.get_new_name(group_name)
 

	
 
            self.sa.add(new_repos_group)
 
            self._create_default_perms(new_repos_group)
 

	
 
            if not just_db:
 
                # we need to flush here, in order to check if database won't
 
                # throw any exceptions, create filesystem dirs at the very end
 
                self.sa.flush()
 
                self.__create_group(new_repos_group.group_name)
 

	
 
            return new_repos_group
 
        except:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def _update_permissions(self, repos_group, perms_new=None,
 
                            perms_updates=None, recursive=False):
 
        from rhodecode.model.repo import RepoModel
 
        if not perms_new:
 
            perms_new = []
 
        if not perms_updates:
 
            perms_updates = []
 

	
 
        def _set_perm_user(obj, user, perm):
 
            if isinstance(obj, RepoGroup):
 
                ReposGroupModel().grant_user_permission(
 
                    repos_group=obj, user=user, perm=perm
 
                )
 
            elif isinstance(obj, Repository):
 
                # we set group permission but we have to switch to repo
 
                # permission
 
                perm = perm.replace('group.', 'repository.')
 
                RepoModel().grant_user_permission(
 
                    repo=obj, user=user, perm=perm
 
                )
 

	
 
        def _set_perm_group(obj, users_group, perm):
 
            if isinstance(obj, RepoGroup):
 
                ReposGroupModel().grant_users_group_permission(
 
                    repos_group=obj, group_name=users_group, perm=perm
 
                )
 
            elif isinstance(obj, Repository):
 
                # we set group permission but we have to switch to repo
 
                # permission
 
                perm = perm.replace('group.', 'repository.')
 
                RepoModel().grant_users_group_permission(
 
                    repo=obj, group_name=users_group, perm=perm
 
                )
 
        updates = []
 
        log.debug('Now updating permissions for %s in recursive mode:%s'
 
                  % (repos_group, recursive))
 

	
 
        for obj in repos_group.recursive_groups_and_repos():
 
            if not recursive:
 
                obj = repos_group
 

	
 
            # update permissions
 
            for member, perm, member_type in perms_updates:
 
                ## set for user
 
                if member_type == 'user':
 
                    # this updates also current one if found
 
                    _set_perm_user(obj, user=member, perm=perm)
 
                ## set for users group
 
                else:
 
                    _set_perm_group(obj, users_group=member, perm=perm)
 
            # set new permissions
 
            for member, perm, member_type in perms_new:
 
                if member_type == 'user':
 
                    _set_perm_user(obj, user=member, perm=perm)
 
                else:
 
                    _set_perm_group(obj, users_group=member, perm=perm)
 
            updates.append(obj)
 
            #if it's not recursive call
 
            # break the loop and don't proceed with other changes
 
            if not recursive:
 
                break
 
        return updates
 

	
 
    def update(self, repos_group_id, form_data):
 

	
 
        try:
 
            repos_group = RepoGroup.get(repos_group_id)
 

	
 
            # update permissions
 
            for member, perm, member_type in form_data['perms_updates']:
 
                if member_type == 'user':
 
                    # this updates also current one if found
 
                    ReposGroupModel().grant_user_permission(
 
                        repos_group=repos_group, user=member, perm=perm
 
                    )
 
                else:
 
                    ReposGroupModel().grant_users_group_permission(
 
                        repos_group=repos_group, group_name=member, perm=perm
 
                    )
 
            # set new permissions
 
            for member, perm, member_type in form_data['perms_new']:
 
                if member_type == 'user':
 
                    ReposGroupModel().grant_user_permission(
 
                        repos_group=repos_group, user=member, perm=perm
 
                    )
 
                else:
 
                    ReposGroupModel().grant_users_group_permission(
 
                        repos_group=repos_group, group_name=member, perm=perm
 
                    )
 
            recursive = form_data['recursive']
 
            # iterate over all members(if in recursive mode) of this groups and
 
            # set the permissions !
 
            # this can be potentially heavy operation
 
            self._update_permissions(repos_group, form_data['perms_new'],
 
                                     form_data['perms_updates'], recursive)
 

	
 
            old_path = repos_group.full_path
 

	
 
            # change properties
 
            repos_group.group_description = form_data['group_description']
 
            repos_group.parent_group = RepoGroup.get(form_data['group_parent_id'])
 
            repos_group.group_parent_id = form_data['group_parent_id']
 
            repos_group.enable_locking = form_data['enable_locking']
 
            repos_group.group_name = repos_group.get_new_name(form_data['group_name'])
 
            new_path = repos_group.full_path
 

	
 
            self.sa.add(repos_group)
 

	
 
            # iterate over all members of this groups and set the locking !
 
            # this can be potentially heavy operation
 

	
 
            for obj in repos_group.recursive_groups_and_repos():
 
                #set the value from it's parent
 
                obj.enable_locking = repos_group.enable_locking
 
                self.sa.add(obj)
 

	
 
            # we need to get all repositories from this new group and
 
            # rename them accordingly to new group path
 
            for r in repos_group.repositories:
 
                r.repo_name = r.get_new_name(r.just_name)
 
                self.sa.add(r)
 

	
 
            self.__rename_group(old_path, new_path)
 

	
 
            return repos_group
 
        except:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def delete(self, repos_group):
 
    def delete(self, repos_group, force_delete=False):
 
        repos_group = self._get_repos_group(repos_group)
 
        try:
 
            self.sa.delete(repos_group)
 
            self.__delete_group(repos_group)
 
            self.__delete_group(repos_group, force_delete)
 
        except:
 
            log.exception('Error removing repos_group %s' % repos_group)
 
            raise
 

	
 
    def delete_permission(self, repos_group, obj, obj_type, recursive):
 
        """
 
        Revokes permission for repos_group for given obj(user or users_group),
 
        obj_type can be user or users group
 

	
 
        :param repos_group:
 
        :param obj: user or users group id
 
        :param obj_type: user or users group type
 
        :param recursive: recurse to all children of group
 
        """
 
        from rhodecode.model.repo import RepoModel
 
        repos_group = self._get_repos_group(repos_group)
 

	
 
        for el in repos_group.recursive_groups_and_repos():
 
            if not recursive:
 
                # if we don't recurse set the permission on only the top level
 
                # object
 
                el = repos_group
 

	
 
            if isinstance(el, RepoGroup):
 
                if obj_type == 'user':
 
                    ReposGroupModel().revoke_user_permission(el, user=obj)
 
                elif obj_type == 'users_group':
 
                    ReposGroupModel().revoke_users_group_permission(el, group_name=obj)
 
                else:
 
                    raise Exception('undefined object type %s' % obj_type)
 
            elif isinstance(el, Repository):
 
                if obj_type == 'user':
 
                    RepoModel().revoke_user_permission(el, user=obj)
 
                elif obj_type == 'users_group':
 
                    RepoModel().revoke_users_group_permission(el, group_name=obj)
 
                else:
 
                    raise Exception('undefined object type %s' % obj_type)
 

	
 
            #if it's not recursive call
 
            # break the loop and don't proceed with other changes
 
            if not recursive:
 
                break
 

	
 
    def grant_user_permission(self, repos_group, user, perm):
 
        """
 
        Grant permission for user on given repositories group, or update
 
        existing one if found
 

	
 
        :param repos_group: Instance of ReposGroup, repositories_group_id,
 
            or repositories_group name
 
        :param user: Instance of User, user_id or username
 
        :param perm: Instance of Permission, or permission_name
 
        """
 

	
 
        repos_group = self._get_repos_group(repos_group)
 
        user = self._get_user(user)
 
        permission = self._get_perm(perm)
 

	
 
        # check if we have that permission already
 
        obj = self.sa.query(UserRepoGroupToPerm)\
 
            .filter(UserRepoGroupToPerm.user == user)\
 
            .filter(UserRepoGroupToPerm.group == repos_group)\
 
            .scalar()
 
        if obj is None:
 
            # create new !
 
            obj = UserRepoGroupToPerm()
 
        obj.group = repos_group
 
        obj.user = user
 
        obj.permission = permission
 
        self.sa.add(obj)
 
        log.debug('Granted perm %s to %s on %s' % (perm, user, repos_group))
 

	
 
    def revoke_user_permission(self, repos_group, user):
 
        """
 
        Revoke permission for user on given repositories group
 

	
 
        :param repos_group: Instance of ReposGroup, repositories_group_id,
 
            or repositories_group name
 
        :param user: Instance of User, user_id or username
 
        """
 

	
 
        repos_group = self._get_repos_group(repos_group)
 
        user = self._get_user(user)
 

	
 
        obj = self.sa.query(UserRepoGroupToPerm)\
 
            .filter(UserRepoGroupToPerm.user == user)\
 
            .filter(UserRepoGroupToPerm.group == repos_group)\
 
            .one()
 
            .scalar()
 
        if obj:
 
        self.sa.delete(obj)
 
            log.debug('Revoked perm on %s on %s' % (repos_group, user))
 

	
 
    def grant_users_group_permission(self, repos_group, group_name, perm):
 
        """
 
        Grant permission for users group on given repositories group, or update
 
        existing one if found
 

	
 
        :param repos_group: Instance of ReposGroup, repositories_group_id,
 
            or repositories_group name
 
        :param group_name: Instance of UserGroup, users_group_id,
 
            or users group name
 
        :param perm: Instance of Permission, or permission_name
 
        """
 
        repos_group = self._get_repos_group(repos_group)
 
        group_name = self.__get_users_group(group_name)
 
        permission = self._get_perm(perm)
 

	
 
        # check if we have that permission already
 
        obj = self.sa.query(UsersGroupRepoGroupToPerm)\
 
            .filter(UsersGroupRepoGroupToPerm.group == repos_group)\
 
            .filter(UsersGroupRepoGroupToPerm.users_group == group_name)\
 
            .scalar()
 

	
 
        if obj is None:
 
            # create new
 
            obj = UsersGroupRepoGroupToPerm()
 

	
 
        obj.group = repos_group
 
        obj.users_group = group_name
 
        obj.permission = permission
 
        self.sa.add(obj)
 
        log.debug('Granted perm %s to %s on %s' % (perm, group_name, repos_group))
 

	
 
    def revoke_users_group_permission(self, repos_group, group_name):
 
        """
 
        Revoke permission for users group on given repositories group
 

	
 
        :param repos_group: Instance of ReposGroup, repositories_group_id,
 
            or repositories_group name
 
        :param group_name: Instance of UserGroup, users_group_id,
 
            or users group name
 
        """
 
        repos_group = self._get_repos_group(repos_group)
 
        group_name = self.__get_users_group(group_name)
 

	
 
        obj = self.sa.query(UsersGroupRepoGroupToPerm)\
 
            .filter(UsersGroupRepoGroupToPerm.group == repos_group)\
 
            .filter(UsersGroupRepoGroupToPerm.users_group == group_name)\
 
            .one()
 
            .scalar()
 
        if obj:
 
        self.sa.delete(obj)
 
            log.debug('Revoked perm to %s on %s' % (repos_group, group_name))
rhodecode/model/user.py
Show inline comments
 
@@ -543,73 +543,73 @@ class UserModel(BaseModel):
 
            r_k = perm.UserRepoToPerm.repository.repo_name
 
            if perm.Repository.user_id == uid:
 
                p = 'repository.admin'
 
            else:
 
                p = perm.Permission.permission_name
 
            user.permissions[RK][r_k] = p
 

	
 
        # REPO GROUP
 
        #==================================================================
 
        # get access for this user for repos group and override defaults
 
        #==================================================================
 

	
 
        # user explicit permissions for repository
 
        user_repo_groups_perms = \
 
         self.sa.query(UserRepoGroupToPerm, Permission, RepoGroup)\
 
         .join((RepoGroup, UserRepoGroupToPerm.group_id == RepoGroup.group_id))\
 
         .join((Permission, UserRepoGroupToPerm.permission_id == Permission.permission_id))\
 
         .filter(UserRepoGroupToPerm.user_id == uid)\
 
         .all()
 

	
 
        for perm in user_repo_groups_perms:
 
            rg_k = perm.UserRepoGroupToPerm.group.group_name
 
            p = perm.Permission.permission_name
 
            cur_perm = user.permissions[GK][rg_k]
 
            if PERM_WEIGHTS[p] > PERM_WEIGHTS[cur_perm]:
 
            if PERM_WEIGHTS[p] > PERM_WEIGHTS[cur_perm] or 1:  # disable check
 
                user.permissions[GK][rg_k] = p
 

	
 
        # REPO GROUP + USER GROUP
 
        #==================================================================
 
        # check if user is part of user groups for this repo group and
 
        # fill in (or replace with higher) permissions
 
        #==================================================================
 

	
 
        # users group for repositories permissions
 
        user_repo_group_perms_from_users_groups = \
 
         self.sa.query(UsersGroupRepoGroupToPerm, Permission, RepoGroup)\
 
         .join((RepoGroup, UsersGroupRepoGroupToPerm.group_id == RepoGroup.group_id))\
 
         .join((Permission, UsersGroupRepoGroupToPerm.permission_id == Permission.permission_id))\
 
         .join((UsersGroupMember, UsersGroupRepoGroupToPerm.users_group_id == UsersGroupMember.users_group_id))\
 
         .filter(UsersGroupMember.user_id == uid)\
 
         .all()
 

	
 
        for perm in user_repo_group_perms_from_users_groups:
 
            g_k = perm.UsersGroupRepoGroupToPerm.group.group_name
 
            p = perm.Permission.permission_name
 
            cur_perm = user.permissions[GK][g_k]
 
            # overwrite permission only if it's greater than permission
 
            # given from other sources
 
            if PERM_WEIGHTS[p] > PERM_WEIGHTS[cur_perm]:
 
            if PERM_WEIGHTS[p] > PERM_WEIGHTS[cur_perm] or 1:  # disable check
 
                user.permissions[GK][g_k] = p
 

	
 
        return user
 

	
 
    def has_perm(self, user, perm):
 
        perm = self._get_perm(perm)
 
        user = self._get_user(user)
 

	
 
        return UserToPerm.query().filter(UserToPerm.user == user)\
 
            .filter(UserToPerm.permission == perm).scalar() is not None
 

	
 
    def grant_perm(self, user, perm):
 
        """
 
        Grant user global permissions
 

	
 
        :param user:
 
        :param perm:
 
        """
 
        user = self._get_user(user)
 
        perm = self._get_perm(perm)
 
        # if this permission is already granted skip it
 
        _perm = UserToPerm.query()\
 
            .filter(UserToPerm.user == user)\
 
            .filter(UserToPerm.permission == perm)\
rhodecode/model/validators.py
Show inline comments
 
@@ -478,51 +478,51 @@ def ValidPerms(type_='repo'):
 
                _(u'This username or users group name is not valid')
 
        }
 

	
 
        def to_python(self, value, state):
 
            perms_update = OrderedSet()
 
            perms_new = OrderedSet()
 
            # build a list of permission to update and new permission to create
 

	
 
            #CLEAN OUT ORG VALUE FROM NEW MEMBERS, and group them using
 
            new_perms_group = defaultdict(dict)
 
            for k, v in value.copy().iteritems():
 
                if k.startswith('perm_new_member'):
 
                    del value[k]
 
                    _type, part = k.split('perm_new_member_')
 
                    args = part.split('_')
 
                    if len(args) == 1:
 
                        new_perms_group[args[0]]['perm'] = v
 
                    elif len(args) == 2:
 
                        _key, pos = args
 
                        new_perms_group[pos][_key] = v
 

	
 
            # fill new permissions in order of how they were added
 
            for k in sorted(map(int, new_perms_group.keys())):
 
                perm_dict = new_perms_group[str(k)]
 
                new_member = perm_dict['name']
 
                new_perm = perm_dict['perm']
 
                new_type = perm_dict['type']
 
                new_member = perm_dict.get('name')
 
                new_perm = perm_dict.get('perm')
 
                new_type = perm_dict.get('type')
 
                if new_member and new_perm and new_type:
 
                    perms_new.add((new_member, new_perm, new_type))
 

	
 
            for k, v in value.iteritems():
 
                if k.startswith('u_perm_') or k.startswith('g_perm_'):
 
                    member = k[7:]
 
                    t = {'u': 'user',
 
                         'g': 'users_group'
 
                    }[k[0]]
 
                    if member == 'default':
 
                        if value.get('private'):
 
                            # set none for default when updating to
 
                            # private repo
 
                            v = EMPTY_PERM
 
                    perms_update.add((member, v, t))
 

	
 
            value['perms_updates'] = list(perms_update)
 
            value['perms_new'] = list(perms_new)
 

	
 
            # update permissions
 
            for k, v, t in perms_new:
 
                try:
 
                    if t is 'user':
 
                        self.user_db = User.query()\
rhodecode/templates/admin/repos_groups/repos_group_edit_perms.html
Show inline comments
 
@@ -47,66 +47,74 @@
 
<%
 
    _tmpl = h.literal("""' \
 
        <td><input type="radio" value="group.none" name="perm_new_member_{0}" id="perm_new_member_{0}"></td> \
 
        <td><input type="radio" value="group.read" name="perm_new_member_{0}" id="perm_new_member_{0}"></td> \
 
        <td><input type="radio" value="group.write" name="perm_new_member_{0}" id="perm_new_member_{0}"></td> \
 
        <td><input type="radio" value="group.admin" name="perm_new_member_{0}" id="perm_new_member_{0}"></td> \
 
        <td class="ac"> \
 
            <div class="perm_ac" id="perm_ac_{0}"> \
 
                <input class="yui-ac-input" id="perm_new_member_name_{0}" name="perm_new_member_name_{0}" value="" type="text"> \
 
                <input id="perm_new_member_type_{0}" name="perm_new_member_type_{0}" value="" type="hidden">  \
 
                <div id="perm_container_{0}"></div> \
 
            </div> \
 
        </td> \
 
        <td></td>'""")
 
    %>
 
    ## ADD HERE DYNAMICALLY NEW INPUTS FROM THE '_tmpl'
 
    <tr class="new_members last_new_member" id="add_perm_input"></tr>
 
    <tr>
 
        <td colspan="6">
 
            <span id="add_perm" class="add_icon" style="cursor: pointer;">
 
            ${_('Add another member')}
 
            </span>
 
        </td>
 
    </tr>
 
    <tr>
 
        <td colspan="6">
 
           ${h.checkbox('recursive',value="True", label=_('apply to parents'))}
 
           <span class="help-block">${_('Set or revoke permission to all children of that group, including repositories and other groups')}</span>
 
        </td>
 
    </tr>
 
</table>
 
<script type="text/javascript">
 
function ajaxActionUser(user_id, field_id) {
 
    var sUrl = "${h.url('delete_repos_group_user_perm',group_name=c.repos_group.group_name)}";
 
    var callback = {
 
        success: function (o) {
 
            var tr = YUD.get(String(field_id));
 
            tr.parentNode.removeChild(tr);
 
        },
 
        failure: function (o) {
 
            alert("${_('Failed to remove user')}");
 
        },
 
    };
 
    var postData = '_method=delete&user_id=' + user_id;
 
    var recursive = YUD.get('recursive').checked;
 
    var postData = '_method=delete&recursive={0}&user_id={1}'.format(recursive,user_id);
 
    var request = YAHOO.util.Connect.asyncRequest('POST', sUrl, callback, postData);
 
};
 

	
 
function ajaxActionUsersGroup(users_group_id,field_id){
 
    var sUrl = "${h.url('delete_repos_group_users_group_perm',group_name=c.repos_group.group_name)}";
 
    var callback = {
 
        success:function(o){
 
            var tr = YUD.get(String(field_id));
 
            tr.parentNode.removeChild(tr);
 
        },
 
        failure:function(o){
 
            alert("${_('Failed to remove users group')}");
 
        },
 
    };
 
    var postData = '_method=delete&users_group_id='+users_group_id;
 
    var recursive = YUD.get('recursive').checked;
 
    var postData = '_method=delete&recursive={0}&users_group_id={1}'.format(recursive,users_group_id);
 
    var request = YAHOO.util.Connect.asyncRequest('POST', sUrl, callback, postData);
 
};
 

	
 
YUE.onDOMReady(function () {
 
    if (!YUD.hasClass('perm_new_member_name', 'error')) {
 
        YUD.setStyle('add_perm_input', 'display', 'none');
 
    }
 
    YAHOO.util.Event.addListener('add_perm', 'click', function () {
 
    	addPermAction(${_tmpl}, ${c.users_array|n}, ${c.users_groups_array|n});
 
    });
 
});
 

	
 
</script>
rhodecode/tests/functional/test_compare.py
Show inline comments
 
@@ -270,25 +270,24 @@ class TestCompareController(TestControll
 
                author=TEST_USER_ADMIN_LOGIN,
 
                message='commit2',
 
                content='line1',
 
                f_path='file2'
 
            )
 
            #compare !
 
            rev1 = 'default'
 
            rev2 = 'default'
 
            response = self.app.get(url(controller='compare', action='index',
 
                                        repo_name=r2_name,
 
                                        org_ref_type="branch",
 
                                        org_ref=rev1,
 
                                        other_ref_type="branch",
 
                                        other_ref=rev2,
 
                                        repo=r1_name
 
                                        ))
 

	
 
            response.mustcontain('%s@%s -> %s@%s' % (r2_name, rev1, r1_name, rev2))
 
            response.mustcontain("""file1-line1-from-fork""")
 
            response.mustcontain("""file2-line1-from-fork""")
 
            response.mustcontain("""file3-line1-from-fork""")
 
        finally:
 
            RepoModel().delete(r2_id)
 
            RepoModel().delete(r1_id)
 

	
rhodecode/tests/models/common.py
Show inline comments
 
new file 100644
 
import os
 
import unittest
 
import functools
 
from rhodecode.tests import *
 

	
 

	
 
from rhodecode.model.repos_group import ReposGroupModel
 
from rhodecode.model.repo import RepoModel
 
from rhodecode.model.db import RepoGroup, Repository, User
 
from rhodecode.model.user import UserModel
 

	
 
from rhodecode.lib.auth import AuthUser
 
from rhodecode.model.meta import Session
 

	
 

	
 
def _make_group(path, desc='desc', parent_id=None,
 
                 skip_if_exists=False):
 

	
 
    gr = RepoGroup.get_by_group_name(path)
 
    if gr and skip_if_exists:
 
        return gr
 
    if isinstance(parent_id, RepoGroup):
 
        parent_id = parent_id.group_id
 
    gr = ReposGroupModel().create(path, desc, parent_id)
 
    return gr
 

	
 

	
 
def _make_repo(name, repos_group=None, repo_type='hg'):
 
    return RepoModel().create_repo(name, repo_type, 'desc',
 
                                   TEST_USER_ADMIN_LOGIN,
 
                                   repos_group=repos_group)
 

	
 

	
 
def _destroy_project_tree(test_u1_id):
 
    Session.remove()
 
    repos_group = RepoGroup.get_by_group_name(group_name='g0')
 
    for el in reversed(repos_group.recursive_groups_and_repos()):
 
        if isinstance(el, Repository):
 
            RepoModel().delete(el)
 
        elif isinstance(el, RepoGroup):
 
            ReposGroupModel().delete(el, force_delete=True)
 

	
 
    u = User.get(test_u1_id)
 
    Session().delete(u)
 
    Session().commit()
 

	
 

	
 
def _create_project_tree():
 
    """
 
    Creates a tree of groups and repositories to test permissions
 

	
 
    structure
 
     [g0] - group `g0` with 3 subgroups
 
     |
 
     |__[g0_1] group g0_1 with 2 groups 0 repos
 
     |  |
 
     |  |__[g0_1_1] group g0_1_1 with 1 group 2 repos
 
     |  |   |__<g0/g0_1/g0_1_1/g0_1_1_r1>
 
     |  |   |__<g0/g0_1/g0_1_1/g0_1_1_r2>
 
     |  |__<g0/g0_1/g0_1_r1>
 
     |
 
     |__[g0_2] 2 repos
 
     |  |
 
     |  |__<g0/g0_2/g0_2_r1>
 
     |  |__<g0/g0_2/g0_2_r2>
 
     |
 
     |__[g0_3] 1 repo
 
        |
 
        |_<g0/g0_3/g0_3_r1>
 

	
 
    """
 
    test_u1 = UserModel().create_or_update(
 
        username=u'test_u1', password=u'qweqwe',
 
        email=u'test_u1@rhodecode.org', firstname=u'test_u1', lastname=u'test_u1'
 
    )
 
    g0 = _make_group('g0')
 
    g0_1 = _make_group('g0_1', parent_id=g0)
 
    g0_1_1 = _make_group('g0_1_1', parent_id=g0_1)
 
    g0_1_1_r1 = _make_repo('g0/g0_1/g0_1_1/g0_1_1_r1', repos_group=g0_1_1)
 
    g0_1_1_r2 = _make_repo('g0/g0_1/g0_1_1/g0_1_1_r2', repos_group=g0_1_1)
 
    g0_1_r1 = _make_repo('g0/g0_1/g0_1_r1', repos_group=g0_1)
 
    g0_2 = _make_group('g0_2', parent_id=g0)
 
    g0_2_r1 = _make_repo('g0/g0_2/g0_2_r1', repos_group=g0_2)
 
    g0_2_r2 = _make_repo('g0/g0_2/g0_2_r2', repos_group=g0_2)
 
    g0_3 = _make_group('g0_3', parent_id=g0)
 
    g0_3_r1 = _make_repo('g0/g0_3/g0_3_r1', repos_group=g0_3)
 
    return test_u1
 

	
 

	
 
def expected_count(group_name, objects=False):
 
    repos_group = RepoGroup.get_by_group_name(group_name=group_name)
 
    objs = repos_group.recursive_groups_and_repos()
 
    if objects:
 
        return objs
 
    return len(objs)
 

	
 

	
 
def _check_expected_count(items, repo_items, expected):
 
    should_be = len(items + repo_items)
 
    there_are = len(expected)
 
    assert  should_be == there_are, ('%s != %s' % ((items + repo_items), expected))
 

	
 

	
 
def check_tree_perms(obj_name, repo_perm, prefix, expected_perm):
 
    assert repo_perm == expected_perm, ('obj:`%s` got perm:`%s` should:`%s`'
 
                                    % (obj_name, repo_perm, expected_perm))
 

	
 

	
 
def _get_perms(filter_='', recursive=True, key=None, test_u1_id=None):
 
    test_u1 = AuthUser(user_id=test_u1_id)
 
    for k, v in test_u1.permissions[key].items():
 
        if recursive and k.startswith(filter_):
 
            yield k, v
 
        elif not recursive:
 
            if k == filter_:
 
                yield k, v
rhodecode/tests/models/test_permissions.py
Show inline comments
 
import os
 
import unittest
 
from rhodecode.tests import *
 

	
 
from rhodecode.tests.models.common import _make_group
 
from rhodecode.model.repos_group import ReposGroupModel
 
from rhodecode.model.repo import RepoModel
 
from rhodecode.model.db import RepoGroup, User, UsersGroupRepoGroupToPerm
 
from rhodecode.model.user import UserModel
 

	
 
from rhodecode.model.meta import Session
 
from rhodecode.model.users_group import UsersGroupModel
 
from rhodecode.lib.auth import AuthUser
 

	
 

	
 
def _make_group(path, desc='desc', parent_id=None,
 
                 skip_if_exists=False):
 

	
 
    gr = RepoGroup.get_by_group_name(path)
 
    if gr and skip_if_exists:
 
        return gr
 

	
 
    gr = ReposGroupModel().create(path, desc, parent_id)
 
    return gr
 

	
 

	
 
class TestPermissions(unittest.TestCase):
 
    def __init__(self, methodName='runTest'):
 
        super(TestPermissions, self).__init__(methodName=methodName)
 

	
 
    def setUp(self):
 
        self.u1 = UserModel().create_or_update(
 
            username=u'u1', password=u'qweqwe',
 
            email=u'u1@rhodecode.org', firstname=u'u1', lastname=u'u1'
 
        )
 
        self.u2 = UserModel().create_or_update(
 
            username=u'u2', password=u'qweqwe',
 
            email=u'u2@rhodecode.org', firstname=u'u2', lastname=u'u2'
 
        )
 
        self.u3 = UserModel().create_or_update(
 
            username=u'u3', password=u'qweqwe',
 
            email=u'u3@rhodecode.org', firstname=u'u3', lastname=u'u3'
 
        )
 
        self.anon = User.get_by_username('default')
 
        self.a1 = UserModel().create_or_update(
 
            username=u'a1', password=u'qweqwe',
 
            email=u'a1@rhodecode.org', firstname=u'a1', lastname=u'a1', admin=True
 
        )
 
        Session().commit()
rhodecode/tests/models/test_repos_groups.py
Show inline comments
 
import os
 
import unittest
 
from rhodecode.tests import *
 

	
 
from rhodecode.model.repos_group import ReposGroupModel
 
from rhodecode.model.repo import RepoModel
 
from rhodecode.model.db import RepoGroup, User
 
from rhodecode.model.db import RepoGroup, User, Repository
 
from rhodecode.model.meta import Session
 
from sqlalchemy.exc import IntegrityError
 

	
 

	
 
def _make_group(path, desc='desc', parent_id=None,
 
                 skip_if_exists=False):
 

	
 
    gr = RepoGroup.get_by_group_name(path)
 
    if gr and skip_if_exists:
 
        return gr
 

	
 
    if isinstance(parent_id, RepoGroup):
 
        parent_id = parent_id.group_id
 
    gr = ReposGroupModel().create(path, desc, parent_id)
 
    return gr
 

	
 

	
 
class TestReposGroups(unittest.TestCase):
 

	
 
    def setUp(self):
 
        self.g1 = _make_group('test1', skip_if_exists=True)
 
        Session().commit()
 
        self.g2 = _make_group('test2', skip_if_exists=True)
 
        Session().commit()
 
        self.g3 = _make_group('test3', skip_if_exists=True)
 
        Session().commit()
 

	
 
    def tearDown(self):
 
        print 'out'
 

	
 
    def __check_path(self, *path):
 
        """
 
        Checks the path for existance !
 
        """
 
        path = [TESTS_TMP_PATH] + list(path)
 
        path = os.path.join(*path)
 
        return os.path.isdir(path)
 

	
 
    def _check_folders(self):
 
        print os.listdir(TESTS_TMP_PATH)
 

	
 
    def __delete_group(self, id_):
 
        ReposGroupModel().delete(id_)
 

	
 
    def __update_group(self, id_, path, desc='desc', parent_id=None):
 
        form_data = dict(
 
            group_name=path,
 
            group_description=desc,
 
            group_parent_id=parent_id,
 
            perms_updates=[],
 
            perms_new=[],
 
            enable_locking=False
 
            enable_locking=False,
 
            recursive=False
 
        )
 
        gr = ReposGroupModel().update(id_, form_data)
 
        return gr
 

	
 
    def test_create_group(self):
 
        g = _make_group('newGroup')
 
        self.assertEqual(g.full_path, 'newGroup')
 

	
 
        self.assertTrue(self.__check_path('newGroup'))
 

	
 
    def test_create_same_name_group(self):
 
        self.assertRaises(IntegrityError, lambda: _make_group('newGroup'))
 
        Session().rollback()
 

	
 
    def test_same_subgroup(self):
 
        sg1 = _make_group('sub1', parent_id=self.g1.group_id)
 
        self.assertEqual(sg1.parent_group, self.g1)
 
        self.assertEqual(sg1.full_path, 'test1/sub1')
 
        self.assertTrue(self.__check_path('test1', 'sub1'))
 

	
 
        ssg1 = _make_group('subsub1', parent_id=sg1.group_id)
 
        self.assertEqual(ssg1.parent_group, sg1)
 
        self.assertEqual(ssg1.full_path, 'test1/sub1/subsub1')
 
        self.assertTrue(self.__check_path('test1', 'sub1', 'subsub1'))
 
@@ -111,49 +113,50 @@ class TestReposGroups(unittest.TestCase)
 
        new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g3.group_id)
 
        self.assertTrue(self.__check_path('test3', 'after'))
 
        self.assertEqual(RepoGroup.get_by_group_name('test3/initial'), None)
 

	
 
        new_sg1 = self.__update_group(sg1.group_id, 'hello')
 
        self.assertTrue(self.__check_path('hello'))
 

	
 
        self.assertEqual(RepoGroup.get_by_group_name('hello'), new_sg1)
 

	
 
    def test_subgrouping_with_repo(self):
 

	
 
        g1 = _make_group('g1')
 
        g2 = _make_group('g2')
 

	
 
        # create new repo
 
        form_data = dict(repo_name='john',
 
                         repo_name_full='john',
 
                         fork_name=None,
 
                         description=None,
 
                         repo_group=None,
 
                         private=False,
 
                         repo_type='hg',
 
                         clone_uri=None,
 
                         landing_rev='tip',
 
                         enable_locking=False)
 
                         enable_locking=False,
 
                         recursive=False)
 
        cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        r = RepoModel().create(form_data, cur_user)
 

	
 
        self.assertEqual(r.repo_name, 'john')
 

	
 
        # put repo into group
 
        form_data = form_data
 
        form_data['repo_group'] = g1.group_id
 
        form_data['perms_new'] = []
 
        form_data['perms_updates'] = []
 
        RepoModel().update(r.repo_name, form_data)
 
        self.assertEqual(r.repo_name, 'g1/john')
 

	
 
        self.__update_group(g1.group_id, 'g1', parent_id=g2.group_id)
 
        self.assertTrue(self.__check_path('g2', 'g1'))
 

	
 
        # test repo
 
        self.assertEqual(r.repo_name, RepoGroup.url_sep().join(['g2', 'g1',
 
                                                                r.just_name]))
 

	
 
    def test_move_to_root(self):
 
        g1 = _make_group('t11')
 
        Session().commit()
 
        g2 = _make_group('t22', parent_id=g1.group_id)
rhodecode/tests/models/test_user_permissions_on_groups.py
Show inline comments
 
new file 100644
 
import os
 
import unittest
 
import functools
 
from rhodecode.tests import *
 

	
 
from rhodecode.model.repos_group import ReposGroupModel
 
from rhodecode.model.db import RepoGroup, Repository, User
 

	
 
from rhodecode.model.meta import Session
 
from nose.tools import with_setup
 
from rhodecode.tests.models.common import _create_project_tree, check_tree_perms, \
 
    _get_perms, _check_expected_count, expected_count, _destroy_project_tree
 
from rhodecode.model.repo import RepoModel
 

	
 

	
 
test_u1_id = None
 
_get_repo_perms = None
 
_get_group_perms = None
 

	
 

	
 
def permissions_setup_func(group_name='g0', perm='group.read', recursive=True):
 
    """
 
    Resets all permissions to perm attribute
 
    """
 
    repos_group = RepoGroup.get_by_group_name(group_name=group_name)
 
    if not repos_group:
 
        raise Exception('Cannot get group %s' % group_name)
 
    perms_updates = [[test_u1_id, perm, 'user']]
 
    ReposGroupModel()._update_permissions(repos_group,
 
                                          perms_updates=perms_updates,
 
                                          recursive=recursive)
 
    Session().commit()
 

	
 

	
 
def setup_module():
 
    global test_u1_id, _get_repo_perms, _get_group_perms
 
    test_u1 = _create_project_tree()
 
    Session().commit()
 
    test_u1_id = test_u1.user_id
 
    _get_repo_perms = functools.partial(_get_perms, key='repositories',
 
                                        test_u1_id=test_u1_id)
 
    _get_group_perms = functools.partial(_get_perms, key='repositories_groups',
 
                                         test_u1_id=test_u1_id)
 

	
 

	
 
def teardown_module():
 
    _destroy_project_tree(test_u1_id)
 

	
 

	
 
@with_setup(permissions_setup_func)
 
def test_user_permissions_on_group_without_recursive_mode():
 
    # set permission to g0 non-recursive mode
 
    recursive = False
 
    group = 'g0'
 
    permissions_setup_func(group, 'group.write', recursive=recursive)
 

	
 
    items = [x for x in _get_repo_perms(group, recursive)]
 
    expected = 0
 
    assert len(items) == expected, ' %s != %s' % (len(items), expected)
 
    for name, perm in items:
 
        yield check_tree_perms, name, perm, group, 'repository.read'
 

	
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    expected = 1
 
    assert len(items) == expected, ' %s != %s' % (len(items), expected)
 
    for name, perm in items:
 
        yield check_tree_perms, name, perm, group, 'group.write'
 

	
 

	
 
@with_setup(permissions_setup_func)
 
def test_user_permissions_on_group_without_recursive_mode_subgroup():
 
    # set permission to g0 non-recursive mode
 
    recursive = False
 
    group = 'g0/g0_1'
 
    permissions_setup_func(group, 'group.write', recursive=recursive)
 

	
 
    items = [x for x in _get_repo_perms(group, recursive)]
 
    expected = 0
 
    assert len(items) == expected, ' %s != %s' % (len(items), expected)
 
    for name, perm in items:
 
        yield check_tree_perms, name, perm, group, 'repository.read'
 

	
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    expected = 1
 
    assert len(items) == expected, ' %s != %s' % (len(items), expected)
 
    for name, perm in items:
 
        yield check_tree_perms, name, perm, group, 'group.write'
 

	
 

	
 
@with_setup(permissions_setup_func)
 
def test_user_permissions_on_group_with_recursive_mode():
 

	
 
    # set permission to g0 recursive mode, all children including
 
    # other repos and groups should have this permission now set !
 
    recursive = True
 
    group = 'g0'
 
    permissions_setup_func(group, 'group.write', recursive=recursive)
 

	
 
    repo_items = [x for x in _get_repo_perms(group, recursive)]
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    _check_expected_count(items, repo_items, expected_count(group, True))
 

	
 
    for name, perm in repo_items:
 
        yield check_tree_perms, name, perm, group, 'repository.write'
 

	
 
    for name, perm in items:
 
        yield check_tree_perms, name, perm, group, 'group.write'
 

	
 

	
 
@with_setup(permissions_setup_func)
 
def test_user_permissions_on_group_with_recursive_mode_inner_group():
 
    ## set permission to g0_3 group to none
 
    recursive = True
 
    group = 'g0/g0_3'
 
    permissions_setup_func(group, 'group.none', recursive=recursive)
 

	
 
    repo_items = [x for x in _get_repo_perms(group, recursive)]
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    _check_expected_count(items, repo_items, expected_count(group, True))
 

	
 
    for name, perm in repo_items:
 
        yield check_tree_perms, name, perm, group, 'repository.none'
 

	
 
    for name, perm in items:
 
        yield check_tree_perms, name, perm, group, 'group.none'
 

	
 

	
 
@with_setup(permissions_setup_func)
 
def test_user_permissions_on_group_with_recursive_mode_deepest():
 
    ## set permission to g0_3 group to none
 
    recursive = True
 
    group = 'g0/g0_1/g0_1_1'
 
    permissions_setup_func(group, 'group.write', recursive=recursive)
 

	
 
    repo_items = [x for x in _get_repo_perms(group, recursive)]
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    _check_expected_count(items, repo_items, expected_count(group, True))
 

	
 
    for name, perm in repo_items:
 
        yield check_tree_perms, name, perm, group, 'repository.write'
 

	
 
    for name, perm in items:
 
        yield check_tree_perms, name, perm, group, 'group.write'
 

	
 

	
 
@with_setup(permissions_setup_func)
 
def test_user_permissions_on_group_with_recursive_mode_only_with_repos():
 
    ## set permission to g0_3 group to none
 
    recursive = True
 
    group = 'g0/g0_2'
 
    permissions_setup_func(group, 'group.admin', recursive=recursive)
 

	
 
    repo_items = [x for x in _get_repo_perms(group, recursive)]
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    _check_expected_count(items, repo_items, expected_count(group, True))
 

	
 
    for name, perm in repo_items:
 
        yield check_tree_perms, name, perm, group, 'repository.admin'
 

	
 
    for name, perm in items:
 
        yield check_tree_perms, name, perm, group, 'group.admin'
rhodecode/tests/models/test_users_group_permissions_on_groups.py
Show inline comments
 
new file 100644
 
import os
 
import unittest
 
import functools
 
from rhodecode.tests import *
 

	
 
from rhodecode.model.repos_group import ReposGroupModel
 
from rhodecode.model.db import RepoGroup, Repository, User
 

	
 
from rhodecode.model.meta import Session
 
from nose.tools import with_setup
 
from rhodecode.tests.models.common import _create_project_tree, check_tree_perms, \
 
    _get_perms, _check_expected_count, expected_count, _destroy_project_tree
 
from rhodecode.model.users_group import UsersGroupModel
 
from rhodecode.model.repo import RepoModel
 

	
 

	
 
test_u2_id = None
 
test_u2_gr_id = None
 
_get_repo_perms = None
 
_get_group_perms = None
 

	
 

	
 
def permissions_setup_func(group_name='g0', perm='group.read', recursive=True):
 
    """
 
    Resets all permissions to perm attribute
 
    """
 
    repos_group = RepoGroup.get_by_group_name(group_name=group_name)
 
    if not repos_group:
 
        raise Exception('Cannot get group %s' % group_name)
 
    perms_updates = [[test_u2_gr_id, perm, 'users_group']]
 
    ReposGroupModel()._update_permissions(repos_group,
 
                                          perms_updates=perms_updates,
 
                                          recursive=recursive)
 
    Session().commit()
 

	
 

	
 
def setup_module():
 
    global test_u2_id, test_u2_gr_id, _get_repo_perms, _get_group_perms
 
    test_u2 = _create_project_tree()
 
    Session().commit()
 
    test_u2_id = test_u2.user_id
 

	
 
    gr1 = UsersGroupModel().create(name='perms_group_1')
 
    Session().commit()
 
    test_u2_gr_id = gr1.users_group_id
 
    UsersGroupModel().add_user_to_group(gr1, user=test_u2_id)
 
    Session().commit()
 

	
 
    _get_repo_perms = functools.partial(_get_perms, key='repositories',
 
                                        test_u1_id=test_u2_id)
 
    _get_group_perms = functools.partial(_get_perms, key='repositories_groups',
 
                                         test_u1_id=test_u2_id)
 

	
 

	
 
def teardown_module():
 
    _destroy_project_tree(test_u2_id)
 

	
 

	
 
@with_setup(permissions_setup_func)
 
def test_user_permissions_on_group_without_recursive_mode():
 
    # set permission to g0 non-recursive mode
 
    recursive = False
 
    group = 'g0'
 
    permissions_setup_func(group, 'group.write', recursive=recursive)
 

	
 
    items = [x for x in _get_repo_perms(group, recursive)]
 
    expected = 0
 
    assert len(items) == expected, ' %s != %s' % (len(items), expected)
 
    for name, perm in items:
 
        yield check_tree_perms, name, perm, group, 'repository.read'
 

	
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    expected = 1
 
    assert len(items) == expected, ' %s != %s' % (len(items), expected)
 
    for name, perm in items:
 
        yield check_tree_perms, name, perm, group, 'group.write'
 

	
 

	
 
@with_setup(permissions_setup_func)
 
def test_user_permissions_on_group_without_recursive_mode_subgroup():
 
    # set permission to g0 non-recursive mode
 
    recursive = False
 
    group = 'g0/g0_1'
 
    permissions_setup_func(group, 'group.write', recursive=recursive)
 

	
 
    items = [x for x in _get_repo_perms(group, recursive)]
 
    expected = 0
 
    assert len(items) == expected, ' %s != %s' % (len(items), expected)
 
    for name, perm in items:
 
        yield check_tree_perms, name, perm, group, 'repository.read'
 

	
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    expected = 1
 
    assert len(items) == expected, ' %s != %s' % (len(items), expected)
 
    for name, perm in items:
 
        yield check_tree_perms, name, perm, group, 'group.write'
 

	
 

	
 
@with_setup(permissions_setup_func)
 
def test_user_permissions_on_group_with_recursive_mode():
 

	
 
    # set permission to g0 recursive mode, all children including
 
    # other repos and groups should have this permission now set !
 
    recursive = True
 
    group = 'g0'
 
    permissions_setup_func(group, 'group.write', recursive=recursive)
 

	
 
    repo_items = [x for x in _get_repo_perms(group, recursive)]
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    _check_expected_count(items, repo_items, expected_count(group, True))
 

	
 
    for name, perm in repo_items:
 
        yield check_tree_perms, name, perm, group, 'repository.write'
 

	
 
    for name, perm in items:
 
        yield check_tree_perms, name, perm, group, 'group.write'
 

	
 

	
 
@with_setup(permissions_setup_func)
 
def test_user_permissions_on_group_with_recursive_mode_inner_group():
 
    ## set permission to g0_3 group to none
 
    recursive = True
 
    group = 'g0/g0_3'
 
    permissions_setup_func(group, 'group.none', recursive=recursive)
 

	
 
    repo_items = [x for x in _get_repo_perms(group, recursive)]
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    _check_expected_count(items, repo_items, expected_count(group, True))
 

	
 
    for name, perm in repo_items:
 
        yield check_tree_perms, name, perm, group, 'repository.none'
 

	
 
    for name, perm in items:
 
        yield check_tree_perms, name, perm, group, 'group.none'
 

	
 

	
 
@with_setup(permissions_setup_func)
 
def test_user_permissions_on_group_with_recursive_mode_deepest():
 
    ## set permission to g0_3 group to none
 
    recursive = True
 
    group = 'g0/g0_1/g0_1_1'
 
    permissions_setup_func(group, 'group.write', recursive=recursive)
 

	
 
    repo_items = [x for x in _get_repo_perms(group, recursive)]
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    _check_expected_count(items, repo_items, expected_count(group, True))
 

	
 
    for name, perm in repo_items:
 
        yield check_tree_perms, name, perm, group, 'repository.write'
 

	
 
    for name, perm in items:
 
        yield check_tree_perms, name, perm, group, 'group.write'
 

	
 

	
 
@with_setup(permissions_setup_func)
 
def test_user_permissions_on_group_with_recursive_mode_only_with_repos():
 
    ## set permission to g0_3 group to none
 
    recursive = True
 
    group = 'g0/g0_2'
 
    permissions_setup_func(group, 'group.admin', recursive=recursive)
 

	
 
    repo_items = [x for x in _get_repo_perms(group, recursive)]
 
    items = [x for x in _get_group_perms(group, recursive)]
 
    _check_expected_count(items, repo_items, expected_count(group, True))
 

	
 
    for name, perm in repo_items:
 
        yield check_tree_perms, name, perm, group, 'repository.admin'
 

	
 
    for name, perm in items:
 
        yield check_tree_perms, name, perm, group, 'group.admin'
0 comments (0 inline, 0 general)