Changeset - 43481c3d70ca
[Not reviewed]
beta
0 4 0
Marcin Kuzminski - 14 years ago 2012-03-14 17:51:00
marcin@python-works.com
#399 added inheritance of permissions for users group on repos groups
3 files changed with 94 insertions and 11 deletions:
0 comments (0 inline, 0 general)
docs/changelog.rst
Show inline comments
 
.. _changelog:
 

	
 
=========
 
Changelog
 
=========
 

	
 

	
 
1.3.4 (**2012-XX-XX**)
 
----------------------
 

	
 
:status: in-progress
 
:branch: beta
 

	
 
news
 
++++
 

	
 
- Whoosh logging is now controlled by the .ini files logging setup
 
- added clone-url into edit form on /settings page
 
- added help text into repo add/edit forms
 
- created rcextensions module with additional mappings (ref #322) and
 
  post push/pull/create repo hooks callbacks
 
- implemented #377 Users view for his own permissions on account page
 
- #399 added inheritance of permissions for users group on repos groups
 

	
 
fixes
 
+++++
 

	
 
- fixed #390 cache invalidation problems on repos inside group
 
- fixed #385 clone by ID url was loosing proxy prefix in URL
 
- fixed some unicode problems with waitress
 
- fixed issue with escaping < and > in changeset commits
 
- fixed error occurring during recursive group creation in API 
 
  create_repo function
 
- fixed #393 py2.5 fixes for routes url generator
 
- fixed #397 Private repository groups shows up before login
 
- fixed #396 fixed problems with revoking users in nested groups
 
  
 
1.3.3 (**2012-03-02**)
 
----------------------
 

	
 
news
 
++++
 

	
 

	
 
fixes
 
+++++
 

	
 
- fixed some python2.5 compatibility issues 
 
- fixed issues with removed repos was accidentally added as groups, after
 
  full rescan of paths
 
- fixes #376 Cannot edit user (using container auth)
 
- fixes #378 Invalid image urls on changeset screen with proxy-prefix 
 
  configuration
 
- fixed initial sorting of repos inside repo group
 
- fixes issue when user tried to resubmit same permission into user/user_groups
 
- bumped beaker version that fixes #375 leap error bug
 
- fixed raw_changeset for git. It was generated with hg patch headers
 
- fixed vcs issue with last_changeset for filenodes
 
- fixed missing commit after hook delete
 
- fixed #372 issues with git operation detection that caused a security issue 
 
  for git repos
 

	
 
1.3.2 (**2012-02-28**)
 
----------------------
 

	
 
news
 
++++
 

	
 

	
 
fixes
 
+++++
 

	
 
- fixed git protocol issues with repos-groups
 
- fixed git remote repos validator that prevented from cloning remote git repos
 
- fixes #370 ending slashes fixes for repo and groups
 
- fixes #368 improved git-protocol detection to handle other clients
 
- fixes #366 When Setting Repository Group To Blank Repo Group Wont Be 
 
  Moved To Root
 
- fixes #371 fixed issues with beaker/sqlalchemy and non-ascii cache keys 
 
- fixed #373 missing cascade drop on user_group_to_perm table
 

	
 
1.3.1 (**2012-02-27**)
 
----------------------
 

	
 
news
 
++++
 

	
 

	
 
fixes
 
+++++
 

	
 
- redirection loop occurs when remember-me wasn't checked during login
 
- fixes issues with git blob history generation 
 
- don't fetch branch for git in file history dropdown. Causes unneeded slowness
 

	
 
1.3.0 (**2012-02-26**)
 
----------------------
 

	
 
news
 
++++
 

	
 
- code review, inspired by github code-comments 
 
- #215 rst and markdown README files support
 
- #252 Container-based and proxy pass-through authentication support
 
- #44 branch browser. Filtering of changelog by branches
 
- mercurial bookmarks support
 
- new hover top menu, optimized to add maximum size for important views
 
- configurable clone url template with possibility to specify  protocol like 
 
  ssh:// or http:// and also manually alter other parts of clone_url.
 
- enabled largefiles extension by default
 
- optimized summary file pages and saved a lot of unused space in them
 
- #239 option to manually mark repository as fork
 
- #320 mapping of commit authors to RhodeCode users
 
- #304 hashes are displayed using monospace font    
 
- diff configuration, toggle white lines and context lines
 
- #307 configurable diffs, whitespace toggle, increasing context lines
 
- sorting on branches, tags and bookmarks using YUI datatable
 
- improved file filter on files page
 
- implements #330 api method for listing nodes ar particular revision
rhodecode/model/user.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
"""
 
    rhodecode.model.user
 
    ~~~~~~~~~~~~~~~~~~~~
 

	
 
    users model for RhodeCode
 

	
 
    :created_on: Apr 9, 2010
 
    :author: marcink
 
    :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
 
    :license: GPLv3, see COPYING for more details.
 
"""
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import logging
 
import traceback
 

	
 
from pylons import url
 
from pylons.i18n.translation import _
 

	
 
from rhodecode.lib.utils2 import safe_unicode, generate_api_key
 
from rhodecode.lib.caching_query import FromCache
 

	
 
from rhodecode.model import BaseModel
 
from rhodecode.model.db import User, UserRepoToPerm, Repository, Permission, \
 
    UserToPerm, UsersGroupRepoToPerm, UsersGroupToPerm, UsersGroupMember, \
 
    Notification, RepoGroup, UserRepoGroupToPerm, UsersGroup
 
    Notification, RepoGroup, UserRepoGroupToPerm, UsersGroup,\
 
    UsersGroupRepoGroupToPerm
 
from rhodecode.lib.exceptions import DefaultUserException, \
 
    UserOwnsReposException
 

	
 
from sqlalchemy.exc import DatabaseError
 

	
 
from sqlalchemy.orm import joinedload
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
PERM_WEIGHTS = {
 
    'repository.none': 0,
 
    'repository.read': 1,
 
    'repository.write': 3,
 
    'repository.admin': 4,
 
    'group.none': 0,
 
    'group.read': 1,
 
    'group.write': 3,
 
    'group.admin': 4,
 
}
 

	
 

	
 
class UserModel(BaseModel):
 

	
 
    def __get_user(self, user):
 
        return self._get_instance(User, user, callback=User.get_by_username)
 

	
 
    def __get_perm(self, permission):
 
        return self._get_instance(Permission, permission,
 
                                  callback=Permission.get_by_key)
 

	
 
    def get(self, user_id, cache=False):
 
        user = self.sa.query(User)
 
        if cache:
 
            user = user.options(FromCache("sql_cache_short",
 
                                          "get_user_%s" % user_id))
 
        return user.get(user_id)
 

	
 
    def get_user(self, user):
 
        return self.__get_user(user)
 

	
 
    def get_by_username(self, username, cache=False, case_insensitive=False):
 

	
 
        if case_insensitive:
 
            user = self.sa.query(User).filter(User.username.ilike(username))
 
        else:
 
            user = self.sa.query(User)\
 
                .filter(User.username == username)
 
        if cache:
 
            user = user.options(FromCache("sql_cache_short",
 
                                          "get_user_%s" % username))
 
        return user.scalar()
 

	
 
    def get_by_api_key(self, api_key, cache=False):
 
        return User.get_by_api_key(api_key, cache)
 

	
 
    def create(self, form_data):
 
        try:
 
            new_user = User()
 
            for k, v in form_data.items():
 
                setattr(new_user, k, v)
 

	
 
            new_user.api_key = generate_api_key(form_data['username'])
 
            self.sa.add(new_user)
 
            return new_user
 
        except:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def create_or_update(self, username, password, email, name, lastname,
 
                         active=True, admin=False, ldap_dn=None):
 
        """
 
        Creates a new instance if not found, or updates current one
 

	
 
        :param username:
 
        :param password:
 
        :param email:
 
        :param active:
 
        :param name:
 
        :param lastname:
 
        :param active:
 
        :param admin:
 
        :param ldap_dn:
 
        """
 

	
 
        from rhodecode.lib.auth import get_crypt_password
 

	
 
        log.debug('Checking for %s account in RhodeCode database' % username)
 
        user = User.get_by_username(username, case_insensitive=True)
 
        if user is None:
 
            log.debug('creating new user %s' % username)
 
            new_user = User()
 
        else:
 
            log.debug('updating user %s' % username)
 
            new_user = user
 

	
 
@@ -317,245 +318,271 @@ class UserModel(BaseModel):
 
        run_task(tasks.send_password_link, data['email'])
 

	
 
    def reset_password(self, data):
 
        from rhodecode.lib.celerylib import tasks, run_task
 
        run_task(tasks.reset_user_password, data['email'])
 

	
 
    def fill_data(self, auth_user, user_id=None, api_key=None):
 
        """
 
        Fetches auth_user by user_id,or api_key if present.
 
        Fills auth_user attributes with those taken from database.
 
        Additionally set's is_authenitated if lookup fails
 
        present in database
 

	
 
        :param auth_user: instance of user to set attributes
 
        :param user_id: user id to fetch by
 
        :param api_key: api key to fetch by
 
        """
 
        if user_id is None and api_key is None:
 
            raise Exception('You need to pass user_id or api_key')
 

	
 
        try:
 
            if api_key:
 
                dbuser = self.get_by_api_key(api_key)
 
            else:
 
                dbuser = self.get(user_id)
 

	
 
            if dbuser is not None and dbuser.active:
 
                log.debug('filling %s data' % dbuser)
 
                for k, v in dbuser.get_dict().items():
 
                    setattr(auth_user, k, v)
 
            else:
 
                return False
 

	
 
        except:
 
            log.error(traceback.format_exc())
 
            auth_user.is_authenticated = False
 
            return False
 

	
 
        return True
 

	
 
    def fill_perms(self, user):
 
        """
 
        Fills user permission attribute with permissions taken from database
 
        works for permissions given for repositories, and for permissions that
 
        are granted to groups
 

	
 
        :param user: user instance to fill his perms
 
        """
 
        RK = 'repositories'
 
        GK = 'repositories_groups'
 
        GLOBAL = 'global'
 
        user.permissions[RK] = {}
 
        user.permissions[GK] = {}
 
        user.permissions[GLOBAL] = set()
 

	
 
        #======================================================================
 
        # fetch default permissions
 
        #======================================================================
 
        default_user = User.get_by_username('default', cache=True)
 
        default_user_id = default_user.user_id
 

	
 
        default_repo_perms = Permission.get_default_perms(default_user_id)
 
        default_repo_groups_perms = Permission.get_default_group_perms(default_user_id)
 

	
 
        if user.is_admin:
 
            #==================================================================
 
            # admin user have all default rights for repositories
 
            # and groups set to admin
 
            #==================================================================
 
            user.permissions[GLOBAL].add('hg.admin')
 

	
 
            # repositories
 
            for perm in default_repo_perms:
 
                r_k = perm.UserRepoToPerm.repository.repo_name
 
                p = 'repository.admin'
 
                user.permissions[RK][r_k] = p
 

	
 
            # repositories groups
 
            for perm in default_repo_groups_perms:
 
                rg_k = perm.UserRepoGroupToPerm.group.group_name
 
                p = 'group.admin'
 
                user.permissions[GK][rg_k] = p
 

	
 
        else:
 
            #==================================================================
 
            # set default permissions first for repositories and groups
 
            #==================================================================
 
            uid = user.user_id
 

	
 
            # default global permissions
 
            default_global_perms = self.sa.query(UserToPerm)\
 
                .filter(UserToPerm.user_id == default_user_id)
 

	
 
            for perm in default_global_perms:
 
                user.permissions[GLOBAL].add(perm.permission.permission_name)
 

	
 
            # default for repositories
 
            # defaults for repositories, taken from default user
 
            for perm in default_repo_perms:
 
                r_k = perm.UserRepoToPerm.repository.repo_name
 
                if perm.Repository.private and not (perm.Repository.user_id == uid):
 
                    # disable defaults for private repos,
 
                    p = 'repository.none'
 
                elif perm.Repository.user_id == uid:
 
                    # set admin if owner
 
                    p = 'repository.admin'
 
                else:
 
                    p = perm.Permission.permission_name
 

	
 
                user.permissions[RK][r_k] = p
 

	
 
            # default for repositories groups
 
            # defaults for repositories groups taken from default user permission
 
            # on given group
 
            for perm in default_repo_groups_perms:
 
                rg_k = perm.UserRepoGroupToPerm.group.group_name
 
                p = perm.Permission.permission_name
 
                user.permissions[GK][rg_k] = p
 

	
 
            #==================================================================
 
            # overwrite default with user permissions if any
 
            # overwrite defaults with user permissions if any found
 
            #==================================================================
 

	
 
            # user global
 
            # user global permissions
 
            user_perms = self.sa.query(UserToPerm)\
 
                    .options(joinedload(UserToPerm.permission))\
 
                    .filter(UserToPerm.user_id == uid).all()
 

	
 
            for perm in user_perms:
 
                user.permissions[GLOBAL].add(perm.permission.permission_name)
 

	
 
            # user repositories
 
            # user explicit permissions for repositories
 
            user_repo_perms = \
 
             self.sa.query(UserRepoToPerm, Permission, Repository)\
 
             .join((Repository, UserRepoToPerm.repository_id == Repository.repo_id))\
 
             .join((Permission, UserRepoToPerm.permission_id == Permission.permission_id))\
 
             .filter(UserRepoToPerm.user_id == uid)\
 
             .all()
 

	
 
            for perm in user_repo_perms:
 
                # set admin if owner
 
                r_k = perm.UserRepoToPerm.repository.repo_name
 
                if perm.Repository.user_id == uid:
 
                    p = 'repository.admin'
 
                else:
 
                    p = perm.Permission.permission_name
 
                user.permissions[RK][r_k] = p
 

	
 
            #==================================================================
 
            # check if user is part of groups for this repository and fill in
 
            # (or replace with higher) permissions
 
            # check if user is part of user groups for this repository and
 
            # fill in (or replace with higher) permissions
 
            #==================================================================
 

	
 
            # users group global
 
            user_perms_from_users_groups = self.sa.query(UsersGroupToPerm)\
 
                .options(joinedload(UsersGroupToPerm.permission))\
 
                .join((UsersGroupMember, UsersGroupToPerm.users_group_id ==
 
                       UsersGroupMember.users_group_id))\
 
                .filter(UsersGroupMember.user_id == uid).all()
 

	
 
            for perm in user_perms_from_users_groups:
 
                user.permissions[GLOBAL].add(perm.permission.permission_name)
 

	
 
            # users group repositories
 
            # users group for repositories permissions
 
            user_repo_perms_from_users_groups = \
 
             self.sa.query(UsersGroupRepoToPerm, Permission, Repository,)\
 
             .join((Repository, UsersGroupRepoToPerm.repository_id == Repository.repo_id))\
 
             .join((Permission, UsersGroupRepoToPerm.permission_id == Permission.permission_id))\
 
             .join((UsersGroupMember, UsersGroupRepoToPerm.users_group_id == UsersGroupMember.users_group_id))\
 
             .filter(UsersGroupMember.user_id == uid)\
 
             .all()
 

	
 
            for perm in user_repo_perms_from_users_groups:
 
                r_k = perm.UsersGroupRepoToPerm.repository.repo_name
 
                p = perm.Permission.permission_name
 
                cur_perm = user.permissions[RK][r_k]
 
                # overwrite permission only if it's greater than permission
 
                # given from other sources
 
                if PERM_WEIGHTS[p] > PERM_WEIGHTS[cur_perm]:
 
                    user.permissions[RK][r_k] = p
 

	
 
            #==================================================================
 
            # get access for this user for repos group and override defaults
 
            #==================================================================
 

	
 
            # user repositories groups
 
            # user explicit permissions for repository
 
            user_repo_groups_perms = \
 
             self.sa.query(UserRepoGroupToPerm, Permission, RepoGroup)\
 
             .join((RepoGroup, UserRepoGroupToPerm.group_id == RepoGroup.group_id))\
 
             .join((Permission, UserRepoGroupToPerm.permission_id == Permission.permission_id))\
 
             .filter(UserRepoGroupToPerm.user_id == uid)\
 
             .all()
 

	
 
            for perm in user_repo_groups_perms:
 
                rg_k = perm.UserRepoGroupToPerm.group.group_name
 
                p = perm.Permission.permission_name
 
                cur_perm = user.permissions[GK][rg_k]
 
                if PERM_WEIGHTS[p] > PERM_WEIGHTS[cur_perm]:
 
                    user.permissions[GK][rg_k] = p
 

	
 
            #==================================================================
 
            # check if user is part of user groups for this repo group and
 
            # fill in (or replace with higher) permissions
 
            #==================================================================
 

	
 
            # users group for repositories permissions
 
            user_repo_group_perms_from_users_groups = \
 
             self.sa.query(UsersGroupRepoGroupToPerm, Permission, RepoGroup)\
 
             .join((RepoGroup, UsersGroupRepoGroupToPerm.group_id == RepoGroup.group_id))\
 
             .join((Permission, UsersGroupRepoGroupToPerm.permission_id == Permission.permission_id))\
 
             .join((UsersGroupMember, UsersGroupRepoGroupToPerm.users_group_id == UsersGroupMember.users_group_id))\
 
             .filter(UsersGroupMember.user_id == uid)\
 
             .all()
 
            
 
            for perm in user_repo_group_perms_from_users_groups:
 
                g_k = perm.UsersGroupRepoGroupToPerm.group.group_name
 
                print perm, g_k
 
                p = perm.Permission.permission_name
 
                cur_perm = user.permissions[GK][g_k]
 
                # overwrite permission only if it's greater than permission
 
                # given from other sources
 
                if PERM_WEIGHTS[p] > PERM_WEIGHTS[cur_perm]:
 
                    user.permissions[GK][g_k] = p
 

	
 
        return user
 

	
 
    def has_perm(self, user, perm):
 
        if not isinstance(perm, Permission):
 
            raise Exception('perm needs to be an instance of Permission class '
 
                            'got %s instead' % type(perm))
 

	
 
        user = self.__get_user(user)
 

	
 
        return UserToPerm.query().filter(UserToPerm.user == user)\
 
            .filter(UserToPerm.permission == perm).scalar() is not None
 

	
 
    def grant_perm(self, user, perm):
 
        """
 
        Grant user global permissions
 

	
 
        :param user:
 
        :param perm:
 
        """
 
        user = self.__get_user(user)
 
        perm = self.__get_perm(perm)
 
        # if this permission is already granted skip it
 
        _perm = UserToPerm.query()\
 
            .filter(UserToPerm.user == user)\
 
            .filter(UserToPerm.permission == perm)\
 
            .scalar()
 
        if _perm:
 
            return
 
        new = UserToPerm()
 
        new.user = user
 
        new.permission = perm
 
        self.sa.add(new)
 

	
 
    def revoke_perm(self, user, perm):
 
        """
 
        Revoke users global permissions
 

	
 
        :param user:
 
        :param perm:
 
        """
 
        user = self.__get_user(user)
 
        perm = self.__get_perm(perm)
 

	
 
        obj = UserToPerm.query()\
 
                .filter(UserToPerm.user == user)\
 
                .filter(UserToPerm.permission == perm)\
 
                .scalar()
 
        if obj:
 
            self.sa.delete(obj)
rhodecode/tests/test_models.py
Show inline comments
 
import os
 
import unittest
 
from rhodecode.tests import *
 

	
 
from rhodecode.model.repos_group import ReposGroupModel
 
from rhodecode.model.repo import RepoModel
 
from rhodecode.model.db import RepoGroup, User, Notification, UserNotification, \
 
    UsersGroup, UsersGroupMember, Permission
 
    UsersGroup, UsersGroupMember, Permission, UsersGroupRepoGroupToPerm
 
from sqlalchemy.exc import IntegrityError
 
from rhodecode.model.user import UserModel
 

	
 
from rhodecode.model.meta import Session
 
from rhodecode.model.notification import NotificationModel
 
from rhodecode.model.users_group import UsersGroupModel
 
from rhodecode.lib.auth import AuthUser
 

	
 

	
 
def _make_group(path, desc='desc', parent_id=None,
 
                 skip_if_exists=False):
 

	
 
    gr = RepoGroup.get_by_group_name(path)
 
    if gr and skip_if_exists:
 
        return gr
 

	
 
    gr = ReposGroupModel().create(path, desc, parent_id)
 
    return gr
 

	
 

	
 
class TestReposGroups(unittest.TestCase):
 

	
 
    def setUp(self):
 
        self.g1 = _make_group('test1', skip_if_exists=True)
 
        Session.commit()
 
        self.g2 = _make_group('test2', skip_if_exists=True)
 
        Session.commit()
 
        self.g3 = _make_group('test3', skip_if_exists=True)
 
        Session.commit()
 

	
 
    def tearDown(self):
 
        print 'out'
 

	
 
    def __check_path(self, *path):
 
        """
 
        Checks the path for existance !
 
        """
 
        path = [TESTS_TMP_PATH] + list(path)
 
        path = os.path.join(*path)
 
        return os.path.isdir(path)
 

	
 
    def _check_folders(self):
 
        print os.listdir(TESTS_TMP_PATH)
 

	
 
    def __delete_group(self, id_):
 
        ReposGroupModel().delete(id_)
 

	
 
    def __update_group(self, id_, path, desc='desc', parent_id=None):
 
        form_data = dict(
 
            group_name=path,
 
            group_description=desc,
 
            group_parent_id=parent_id,
 
            perms_updates=[],
 
            perms_new=[]
 
        )
 
        gr = ReposGroupModel().update(id_, form_data)
 
        return gr
 

	
 
    def test_create_group(self):
 
        g = _make_group('newGroup')
 
        self.assertEqual(g.full_path, 'newGroup')
 

	
 
        self.assertTrue(self.__check_path('newGroup'))
 

	
 
    def test_create_same_name_group(self):
 
        self.assertRaises(IntegrityError, lambda:_make_group('newGroup'))
 
        Session.rollback()
 

	
 
    def test_same_subgroup(self):
 
        sg1 = _make_group('sub1', parent_id=self.g1.group_id)
 
        self.assertEqual(sg1.parent_group, self.g1)
 
        self.assertEqual(sg1.full_path, 'test1/sub1')
 
        self.assertTrue(self.__check_path('test1', 'sub1'))
 

	
 
        ssg1 = _make_group('subsub1', parent_id=sg1.group_id)
 
        self.assertEqual(ssg1.parent_group, sg1)
 
        self.assertEqual(ssg1.full_path, 'test1/sub1/subsub1')
 
        self.assertTrue(self.__check_path('test1', 'sub1', 'subsub1'))
 

	
 
    def test_remove_group(self):
 
        sg1 = _make_group('deleteme')
 
        self.__delete_group(sg1.group_id)
 

	
 
        self.assertEqual(RepoGroup.get(sg1.group_id), None)
 
        self.assertFalse(self.__check_path('deteteme'))
 

	
 
        sg1 = _make_group('deleteme', parent_id=self.g1.group_id)
 
        self.__delete_group(sg1.group_id)
 

	
 
        self.assertEqual(RepoGroup.get(sg1.group_id), None)
 
        self.assertFalse(self.__check_path('test1', 'deteteme'))
 

	
 
    def test_rename_single_group(self):
 
        sg1 = _make_group('initial')
 

	
 
        new_sg1 = self.__update_group(sg1.group_id, 'after')
 
@@ -515,146 +515,201 @@ class TestPermissions(unittest.TestCase)
 
            'global': set(['hg.admin']),
 
            'repositories': {u'vcs_test_hg': 'repository.admin'}
 
        }
 

	
 
        self.assertEqual(a1_auth.permissions['repositories'][HG_REPO],
 
                         perms['repositories'][HG_REPO])
 
        self.assertEqual(a1_auth.permissions['repositories_groups'],
 
                         perms['repositories_groups'])
 

	
 
    def test_propagated_permission_from_users_group(self):
 
        # make group
 
        self.ug1 = UsersGroupModel().create('G1')
 
        # add user to group
 
        UsersGroupModel().add_user_to_group(self.ug1, self.u1)
 

	
 
        # set permission to lower
 
        new_perm = 'repository.none'
 
        RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1, perm=new_perm)
 
        Session.commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
 
                         new_perm)
 

	
 
        # grant perm for group this should override permission from user
 
        new_perm = 'repository.write'
 
        RepoModel().grant_users_group_permission(repo=HG_REPO,
 
                                                 group_name=self.ug1,
 
                                                 perm=new_perm)
 
        # check perms
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        perms = {
 
            'repositories_groups': {},
 
            'global': set([u'hg.create.repository', u'repository.read',
 
                           u'hg.register.manual_activate']),
 
            'repositories': {u'vcs_test_hg': u'repository.read'}
 
        }
 
        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
 
                         new_perm)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                         perms['repositories_groups'])
 

	
 
    def test_propagated_permission_from_users_group_lower_weight(self):
 
        # make group
 
        self.ug1 = UsersGroupModel().create('G1')
 
        # add user to group
 
        UsersGroupModel().add_user_to_group(self.ug1, self.u1)
 

	
 
        # set permission to lower
 
        new_perm_h = 'repository.write'
 
        RepoModel().grant_user_permission(repo=HG_REPO, user=self.u1,
 
                                          perm=new_perm_h)
 
        Session.commit()
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
 
                         new_perm_h)
 

	
 
        # grant perm for group this should NOT override permission from user
 
        # since it's lower than granted
 
        new_perm_l = 'repository.read'
 
        RepoModel().grant_users_group_permission(repo=HG_REPO,
 
                                                 group_name=self.ug1,
 
                                                 perm=new_perm_l)
 
        # check perms
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        perms = {
 
            'repositories_groups': {},
 
            'global': set([u'hg.create.repository', u'repository.read',
 
                           u'hg.register.manual_activate']),
 
            'repositories': {u'vcs_test_hg': u'repository.write'}
 
        }
 
        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
 
                         new_perm_h)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                         perms['repositories_groups'])
 

	
 
    def test_repo_in_group_permissions(self):
 
        self.g1 = _make_group('group1', skip_if_exists=True)
 
        self.g2 = _make_group('group2', skip_if_exists=True)
 
        Session.commit()
 
        # both perms should be read !
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                         {u'group1': u'group.read', u'group2': u'group.read'})
 

	
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 
        self.assertEqual(a1_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.read', u'group2': u'group.read'})
 

	
 
        #Change perms to none for both groups
 
        ReposGroupModel().grant_user_permission(repos_group=self.g1,
 
                                                user=self.anon,
 
                                                perm='group.none')
 
        ReposGroupModel().grant_user_permission(repos_group=self.g2,
 
                                                user=self.anon,
 
                                                perm='group.none')
 

	
 

	
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.none', u'group2': u'group.none'})
 

	
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 
        self.assertEqual(a1_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.none', u'group2': u'group.none'})
 

	
 
        # add repo to group
 
        form_data = {
 
            'repo_name':HG_REPO,
 
            'repo_name_full':os.path.join(self.g1.group_name,HG_REPO),
 
            'repo_type':'hg',
 
            'clone_uri':'',
 
            'repo_group':self.g1.group_id,
 
            'description':'desc',
 
            'private':False
 
        }
 
        self.test_repo = RepoModel().create(form_data, cur_user=self.u1)
 
        Session.commit()
 

	
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.none', u'group2': u'group.none'})
 

	
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 
        self.assertEqual(a1_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.none', u'group2': u'group.none'})
 

	
 
        #grant permission for u2 !
 
        ReposGroupModel().grant_user_permission(repos_group=self.g1,
 
                                                user=self.u2,
 
                                                perm='group.read')
 
        ReposGroupModel().grant_user_permission(repos_group=self.g2,
 
                                                user=self.u2,
 
                                                perm='group.read')
 
        Session.commit()
 
        self.assertNotEqual(self.u1, self.u2)
 
        #u1 and anon should have not change perms while u2 should !
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.none', u'group2': u'group.none'})
 

	
 
        u2_auth = AuthUser(user_id=self.u2.user_id)
 
        self.assertEqual(u2_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.read', u'group2': u'group.read'})
 

	
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 
        self.assertEqual(a1_auth.permissions['repositories_groups'],
 
                 {u'group1': u'group.none', u'group2': u'group.none'})
 

	
 
    def test_repo_group_user_as_user_group_member(self):
 
        # create Group1
 
        self.g1 = _make_group('group1', skip_if_exists=True)
 
        Session.commit()
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 

	
 
        self.assertEqual(a1_auth.permissions['repositories_groups'],
 
                         {u'group1': u'group.read'})
 

	
 
        # set default permission to none
 
        ReposGroupModel().grant_user_permission(repos_group=self.g1,
 
                                                user=self.anon,
 
                                                perm='group.none')
 
        # make group
 
        self.ug1 = UsersGroupModel().create('G1')
 
        # add user to group
 
        UsersGroupModel().add_user_to_group(self.ug1, self.u1)
 
        Session.commit()
 

	
 
        # check if user is in the group
 
        membrs = [x.user_id for x in UsersGroupModel().get(self.ug1.users_group_id).members]
 
        self.assertEqual(membrs, [self.u1.user_id])
 
        # add some user to that group
 

	
 
        # check his permissions
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 
        self.assertEqual(a1_auth.permissions['repositories_groups'],
 
                         {u'group1': u'group.none'})
 

	
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                         {u'group1': u'group.none'})
 

	
 
        # grant ug1 read permissions for
 
        ReposGroupModel().grant_users_group_permission(repos_group=self.g1,
 
                                                       group_name=self.ug1,
 
                                                       perm='group.read')
 
        Session.commit()
 
        # check if the
 
        obj = Session.query(UsersGroupRepoGroupToPerm)\
 
            .filter(UsersGroupRepoGroupToPerm.group == self.g1)\
 
            .filter(UsersGroupRepoGroupToPerm.users_group == self.ug1)\
 
            .scalar()
 
        self.assertEqual(obj.permission.permission_name, 'group.read')
 

	
 
        a1_auth = AuthUser(user_id=self.anon.user_id)
 

	
 
        self.assertEqual(a1_auth.permissions['repositories_groups'],
 
                         {u'group1': u'group.none'})
 

	
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                         {u'group1': u'group.read'})
0 comments (0 inline, 0 general)