Changeset - af049a957506
[Not reviewed]
beta
0 4 0
Marcin Kuzminski - 13 years ago 2013-04-10 02:55:21
marcin@python-works.com
fixed default permissions population during upgrades
- it often happen that introducing new permission
caused default permission to reset it's state to installation
default.
new version makes sure that only missing permissions are
created while leaving old defaults
4 files changed with 146 insertions and 39 deletions:
0 comments (0 inline, 0 general)
rhodecode/lib/db_manage.py
Show inline comments
 
@@ -44,6 +44,7 @@ from rhodecode.model.repos_group import 
 
#from rhodecode.model import meta
 
from rhodecode.model.meta import Session, Base
 
from rhodecode.model.repo import RepoModel
 
from rhodecode.model.permission import PermissionModel
 

	
 

	
 
log = logging.getLogger(__name__)
 
@@ -550,7 +551,7 @@ class DbManage(object):
 
        u2p = UserToPerm.query()\
 
            .filter(UserToPerm.user == default_user).all()
 
        fixed = False
 
        if len(u2p) != len(User.DEFAULT_PERMISSIONS):
 
        if len(u2p) != len(Permission.DEFAULT_USER_PERMISSIONS):
 
            for p in u2p:
 
                Session().delete(p)
 
            fixed = True
 
@@ -682,6 +683,9 @@ class DbManage(object):
 
                              firstname='Anonymous', lastname='User')
 

	
 
    def create_permissions(self):
 
        """
 
        Creates all permissions defined in the system
 
        """
 
        # module.(access|create|change|delete)_[name]
 
        # module.(none|read|write|admin)
 

	
 
@@ -693,27 +697,12 @@ class DbManage(object):
 
                self.sa.add(new_perm)
 

	
 
    def populate_default_permissions(self):
 
        """
 
        Populate default permissions. It will create only the default
 
        permissions that are missing, and not alter already defined ones
 
        """
 
        log.info('creating default user permissions')
 

	
 
        default_user = User.get_by_username('default')
 

	
 
        for def_perm in User.DEFAULT_PERMISSIONS:
 

	
 
            perm = self.sa.query(Permission)\
 
             .filter(Permission.permission_name == def_perm)\
 
             .scalar()
 
            if not perm:
 
                raise Exception(
 
                  'CRITICAL: permission %s not found inside database !!'
 
                  % def_perm
 
                )
 
            if not UserToPerm.query()\
 
                .filter(UserToPerm.permission == perm)\
 
                .filter(UserToPerm.user == default_user).scalar():
 
                reg_perm = UserToPerm()
 
                reg_perm.user = default_user
 
                reg_perm.permission = perm
 
                self.sa.add(reg_perm)
 
        PermissionModel(self.sa).create_default_permissions(user=User.DEFAULT_USER)
 

	
 
    @staticmethod
 
    def check_waitress():
rhodecode/model/db.py
Show inline comments
 
@@ -320,10 +320,7 @@ class User(Base, BaseModel):
 
         'mysql_charset': 'utf8'}
 
    )
 
    DEFAULT_USER = 'default'
 
    DEFAULT_PERMISSIONS = [
 
        'hg.register.manual_activate', 'hg.create.repository',
 
        'hg.fork.repository', 'repository.read', 'group.read'
 
    ]
 

	
 
    user_id = Column("user_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
 
    username = Column("username", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    password = Column("password", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
@@ -502,6 +499,13 @@ class User(Base, BaseModel):
 
            raise Exception('Missing administrative account!')
 
        return user
 

	
 
    @classmethod
 
    def get_default_user(cls, cache=False):
 
        user = User.get_by_username(User.DEFAULT_USER, cache=cache)
 
        if user is None:
 
            raise Exception('Missing default account!')
 
        return user
 

	
 
    def get_api_data(self):
 
        """
 
        Common function for generating user related data for API
 
@@ -1405,6 +1409,8 @@ class Permission(Base, BaseModel):
 
         'mysql_charset': 'utf8'},
 
    )
 
    PERMS = [
 
        ('hg.admin', _('RhodeCode Administrator')),
 

	
 
        ('repository.none', _('Repository no access')),
 
        ('repository.read', _('Repository read access')),
 
        ('repository.write', _('Repository write access')),
 
@@ -1420,11 +1426,12 @@ class Permission(Base, BaseModel):
 
        ('usergroup.write', _('User group write access')),
 
        ('usergroup.admin', _('User group admin access')),
 

	
 
        ('hg.admin', _('RhodeCode Administrator')),
 
        ('hg.create.none', _('Repository creation disabled')),
 
        ('hg.create.repository', _('Repository creation enabled')),
 

	
 
        ('hg.fork.none', _('Repository forking disabled')),
 
        ('hg.fork.repository', _('Repository forking enabled')),
 

	
 
        ('hg.register.none', _('Register disabled')),
 
        ('hg.register.manual_activate', _('Register new user with RhodeCode '
 
                                          'with manual activation')),
 
@@ -1433,6 +1440,16 @@ class Permission(Base, BaseModel):
 
                                        'with auto activation')),
 
    ]
 

	
 
    #definition of system default permissions for DEFAULT user
 
    DEFAULT_USER_PERMISSIONS = [
 
        'repository.read',
 
        'group.read',
 
        'usergroup.read',
 
        'hg.create.repository',
 
        'hg.fork.repository',
 
        'hg.register.manual_activate',
 
    ]
 

	
 
    # defines which permissions are more important higher the more important
 
    PERM_WEIGHTS = {
 
        'repository.none': 0,
rhodecode/model/permission.py
Show inline comments
 
@@ -43,11 +43,49 @@ class PermissionModel(BaseModel):
 

	
 
    cls = Permission
 

	
 
    def create_default_permissions(self, user):
 
        """
 
        Creates only missing default permissions for user
 

	
 
        :param user:
 
        """
 
        user = self._get_user(user)
 

	
 
        def _make_perm(perm):
 
            new_perm = UserToPerm()
 
            new_perm.user = user
 
            new_perm.permission = Permission.get_by_key(perm)
 
            return new_perm
 

	
 
        def _get_group(perm_name):
 
            return '.'.join(perm_name.split('.')[:1])
 

	
 
        perms = UserToPerm.query().filter(UserToPerm.user == user).all()
 
        defined_perms_groups = map(_get_group,
 
                                (x.permission.permission_name for x in perms))
 
        log.debug('GOT ALREADY DEFINED:%s' % perms)
 
        DEFAULT_PERMS = Permission.DEFAULT_USER_PERMISSIONS
 

	
 
        # for every default permission that needs to be created, we check if
 
        # it's group is already defined, if it's not we create default perm
 
        for perm_name in DEFAULT_PERMS:
 
            gr = _get_group(perm_name)
 
            if gr not in defined_perms_groups:
 
                log.debug('GR:%s not found, creating permission %s'
 
                          % (gr, perm_name))
 
                new_perm = _make_perm(perm_name)
 
                self.sa.add(new_perm)
 

	
 
    def update(self, form_result):
 
        perm_user = User.get_by_username(username=form_result['perm_user_name'])
 
        u2p = self.sa.query(UserToPerm).filter(UserToPerm.user == perm_user).all()
 

	
 
        try:
 
            # stage 1 set anonymous access
 
            if perm_user.username == 'default':
 
                perm_user.active = str2bool(form_result['anonymous'])
 
                self.sa.add(perm_user)
 

	
 
            # stage 2 reset defaults and set them from form data
 
            def _make_new(usr, perm_name):
 
                new = UserToPerm()
 
                new.user = usr
 
@@ -56,6 +94,9 @@ class PermissionModel(BaseModel):
 
            # clear current entries, to make this function idempotent
 
            # it will fix even if we define more permissions or permissions
 
            # are somehow missing
 
            u2p = self.sa.query(UserToPerm)\
 
                .filter(UserToPerm.user == perm_user)\
 
                .all()
 
            for p in u2p:
 
                self.sa.delete(p)
 
            #create fresh set of permissions
 
@@ -65,7 +106,7 @@ class PermissionModel(BaseModel):
 
                p = _make_new(perm_user, form_result[def_perm_key])
 
                self.sa.add(p)
 

	
 
            #stage 2 update all default permissions for repos if checked
 
            #stage 3 update all default permissions for repos if checked
 
            if form_result['overwrite_default_repo'] == True:
 
                _def_name = form_result['default_repo_perm'].split('repository.')[-1]
 
                _def = Permission.get_by_key('repository.' + _def_name)
 
@@ -89,11 +130,6 @@ class PermissionModel(BaseModel):
 
                    g2p.permission = _def
 
                    self.sa.add(g2p)
 

	
 
            # stage 3 set anonymous access
 
            if perm_user.username == 'default':
 
                perm_user.active = str2bool(form_result['anonymous'])
 
                self.sa.add(perm_user)
 

	
 
            self.sa.commit()
 
        except (DatabaseError,):
 
            log.error(traceback.format_exc())
rhodecode/tests/models/test_permissions.py
Show inline comments
 
@@ -4,12 +4,14 @@ from rhodecode.tests import *
 
from rhodecode.tests.fixture import Fixture
 
from rhodecode.model.repos_group import ReposGroupModel
 
from rhodecode.model.repo import RepoModel
 
from rhodecode.model.db import RepoGroup, User, UserGroupRepoGroupToPerm
 
from rhodecode.model.db import RepoGroup, User, UserGroupRepoGroupToPerm,\
 
    Permission, UserToPerm
 
from rhodecode.model.user import UserModel
 

	
 
from rhodecode.model.meta import Session
 
from rhodecode.model.users_group import UserGroupModel
 
from rhodecode.lib.auth import AuthUser
 
from rhodecode.model.permission import PermissionModel
 

	
 

	
 
fixture = Fixture()
 
@@ -101,13 +103,15 @@ class TestPermissions(unittest.TestCase)
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        perms = {
 
            'repositories_groups': {u'test1': 'group.read', u'test2': 'group.read'},
 
            'global': set([u'hg.create.repository', u'repository.read', u'hg.register.manual_activate']),
 
            'global': set(Permission.DEFAULT_USER_PERMISSIONS),
 
            'repositories': {u'vcs_test_hg': u'repository.read'}
 
        }
 
        self.assertEqual(u1_auth.permissions['repositories'][HG_REPO],
 
                         perms['repositories'][HG_REPO])
 
        self.assertEqual(u1_auth.permissions['repositories_groups'],
 
                         perms['repositories_groups'])
 
        self.assertEqual(u1_auth.permissions['global'],
 
                         perms['global'])
 

	
 
    def test_default_admin_group_perms(self):
 
        self.g1 = fixture.create_group('test1', skip_if_exists=True)
 
@@ -347,7 +351,8 @@ class TestPermissions(unittest.TestCase)
 
        self.assertEqual(u1_auth.permissions['global'],
 
                         set(['hg.create.repository', 'hg.fork.repository',
 
                              'hg.register.manual_activate',
 
                              'repository.read', 'group.read']))
 
                              'repository.read', 'group.read',
 
                              'usergroup.read']))
 

	
 
    def test_inherited_permissions_from_default_on_user_disabled(self):
 
        user_model = UserModel()
 
@@ -365,7 +370,8 @@ class TestPermissions(unittest.TestCase)
 
        self.assertEqual(u1_auth.permissions['global'],
 
                         set(['hg.create.none', 'hg.fork.none',
 
                              'hg.register.manual_activate',
 
                              'repository.read', 'group.read']))
 
                              'repository.read', 'group.read',
 
                              'usergroup.read']))
 

	
 
    def test_non_inherited_permissions_from_default_on_user_enabled(self):
 
        user_model = UserModel()
 
@@ -391,7 +397,8 @@ class TestPermissions(unittest.TestCase)
 
        self.assertEqual(u1_auth.permissions['global'],
 
                         set(['hg.create.none', 'hg.fork.none',
 
                              'hg.register.manual_activate',
 
                              'repository.read', 'group.read']))
 
                              'repository.read', 'group.read',
 
                              'usergroup.read']))
 

	
 
    def test_non_inherited_permissions_from_default_on_user_disabled(self):
 
        user_model = UserModel()
 
@@ -417,7 +424,8 @@ class TestPermissions(unittest.TestCase)
 
        self.assertEqual(u1_auth.permissions['global'],
 
                         set(['hg.create.repository', 'hg.fork.repository',
 
                              'hg.register.manual_activate',
 
                              'repository.read', 'group.read']))
 
                              'repository.read', 'group.read',
 
                              'usergroup.read']))
 

	
 
    def test_owner_permissions_doesnot_get_overwritten_by_group(self):
 
        #create repo as USER,
 
@@ -458,3 +466,60 @@ class TestPermissions(unittest.TestCase)
 
        u1_auth = AuthUser(user_id=self.u1.user_id)
 
        self.assertEqual(u1_auth.permissions['repositories']['myownrepo'],
 
                         'repository.admin')
 

	
 
    def _test_def_perm_equal(self, user, change_factor=0):
 
        perms = UserToPerm.query()\
 
                .filter(UserToPerm.user == user)\
 
                .all()
 
        self.assertEqual(len(perms),
 
                         len(Permission.DEFAULT_USER_PERMISSIONS,)+change_factor,
 
                         msg=perms)
 

	
 
    def test_set_default_permissions(self):
 
        PermissionModel().create_default_permissions(user=self.u1)
 
        self._test_def_perm_equal(user=self.u1)
 

	
 
    def test_set_default_permissions_after_one_is_missing(self):
 
        PermissionModel().create_default_permissions(user=self.u1)
 
        self._test_def_perm_equal(user=self.u1)
 
        #now we delete one, it should be re-created after another call
 
        perms = UserToPerm.query()\
 
                .filter(UserToPerm.user == self.u1)\
 
                .all()
 
        Session().delete(perms[0])
 
        Session().commit()
 

	
 
        self._test_def_perm_equal(user=self.u1, change_factor=-1)
 

	
 
        #create missing one !
 
        PermissionModel().create_default_permissions(user=self.u1)
 
        self._test_def_perm_equal(user=self.u1)
 

	
 
    @parameterized.expand([
 
        ('repository.read', 'repository.none'),
 
        ('group.read', 'group.none'),
 
        ('usergroup.read', 'usergroup.none'),
 
        ('hg.create.repository', 'hg.create.none'),
 
        ('hg.fork.repository', 'hg.fork.none'),
 
        ('hg.register.manual_activate', 'hg.register.auto_activate',)
 
    ])
 
    def test_set_default_permissions_after_modification(self, perm, modify_to):
 
        PermissionModel().create_default_permissions(user=self.u1)
 
        self._test_def_perm_equal(user=self.u1)
 

	
 
        old = Permission.get_by_key(perm)
 
        new = Permission.get_by_key(modify_to)
 
        self.assertNotEqual(old, None)
 
        self.assertNotEqual(new, None)
 

	
 
        #now modify permissions
 
        p = UserToPerm.query()\
 
                .filter(UserToPerm.user == self.u1)\
 
                .filter(UserToPerm.permission == old)\
 
                .one()
 
        p.permission = new
 
        Session().add(p)
 
        Session().commit()
 

	
 
        PermissionModel().create_default_permissions(user=self.u1)
 
        self._test_def_perm_equal(user=self.u1)
0 comments (0 inline, 0 general)