Changeset - 702da441f5c4
[Not reviewed]
default
0 4 0
Marcin Kuzminski - 13 years ago 2013-03-06 00:20:13
marcin@python-works.com
Grafted from: 94f251fda314
fixed issue with renaming repos group together with changing parents with multiple nested trees
added regresion tests for such cases
4 files changed with 157 insertions and 44 deletions:
0 comments (0 inline, 0 general)
rhodecode/model/db.py
Show inline comments
 
@@ -1242,114 +1242,124 @@ class RepoGroup(Base, BaseModel):
 
            gr = getattr(cur_gr, 'parent_group', None)
 
            cur_gr = cur_gr.parent_group
 
            if gr is None:
 
                break
 
            if cnt == parents_recursion_limit:
 
                # this will prevent accidental infinit loops
 
                log.error('group nested more than %s' %
 
                          parents_recursion_limit)
 
                break
 

	
 
            groups.insert(0, gr)
 
        return groups
 

	
 
    @property
 
    def children(self):
 
        return RepoGroup.query().filter(RepoGroup.parent_group == self)
 

	
 
    @property
 
    def name(self):
 
        return self.group_name.split(RepoGroup.url_sep())[-1]
 

	
 
    @property
 
    def full_path(self):
 
        return self.group_name
 

	
 
    @property
 
    def full_path_splitted(self):
 
        return self.group_name.split(RepoGroup.url_sep())
 

	
 
    @property
 
    def repositories(self):
 
        return Repository.query()\
 
                .filter(Repository.group == self)\
 
                .order_by(Repository.repo_name)
 

	
 
    @property
 
    def repositories_recursive_count(self):
 
        cnt = self.repositories.count()
 

	
 
        def children_count(group):
 
            cnt = 0
 
            for child in group.children:
 
                cnt += child.repositories.count()
 
                cnt += children_count(child)
 
            return cnt
 

	
 
        return cnt + children_count(self)
 

	
 
    def recursive_groups_and_repos(self):
 
        """
 
        Recursive return all groups, with repositories in those groups
 
        """
 
    def _recursive_objects(self, include_repos=True):
 
        all_ = []
 

	
 
        def _get_members(root_gr):
 
            if include_repos:
 
            for r in root_gr.repositories:
 
                all_.append(r)
 
            childs = root_gr.children.all()
 
            if childs:
 
                for gr in childs:
 
                    all_.append(gr)
 
                    _get_members(gr)
 

	
 
        _get_members(self)
 
        return [self] + all_
 

	
 
    def recursive_groups_and_repos(self):
 
        """
 
        Recursive return all groups, with repositories in those groups
 
        """
 
        return self._recursive_objects()
 

	
 
    def recursive_groups(self):
 
        """
 
        Returns all children groups for this group including children of children 
 
        """
 
        return self._recursive_objects(include_repos=False)
 

	
 
    def get_new_name(self, group_name):
 
        """
 
        returns new full group name based on parent and new name
 

	
 
        :param group_name:
 
        """
 
        path_prefix = (self.parent_group.full_path_splitted if
 
                       self.parent_group else [])
 
        return RepoGroup.url_sep().join(path_prefix + [group_name])
 

	
 

	
 
class Permission(Base, BaseModel):
 
    __tablename__ = 'permissions'
 
    __table_args__ = (
 
        Index('p_perm_name_idx', 'permission_name'),
 
        {'extend_existing': True, 'mysql_engine': 'InnoDB',
 
         'mysql_charset': 'utf8'},
 
    )
 
    PERMS = [
 
        ('repository.none', _('Repository no access')),
 
        ('repository.read', _('Repository read access')),
 
        ('repository.write', _('Repository write access')),
 
        ('repository.admin', _('Repository admin access')),
 

	
 
        ('group.none', _('Repositories Group no access')),
 
        ('group.read', _('Repositories Group read access')),
 
        ('group.write', _('Repositories Group write access')),
 
        ('group.admin', _('Repositories Group admin access')),
 

	
 
        ('hg.admin', _('RhodeCode Administrator')),
 
        ('hg.create.none', _('Repository creation disabled')),
 
        ('hg.create.repository', _('Repository creation enabled')),
 
        ('hg.fork.none', _('Repository forking disabled')),
 
        ('hg.fork.repository', _('Repository forking enabled')),
 
        ('hg.register.none', _('Register disabled')),
 
        ('hg.register.manual_activate', _('Register new user with RhodeCode '
 
                                          'with manual activation')),
 

	
 
        ('hg.register.auto_activate', _('Register new user with RhodeCode '
 
                                        'with auto activation')),
 
    ]
 

	
 
    # defines which permissions are more important higher the more important
 
    PERM_WEIGHTS = {
 
        'repository.none': 0,
 
        'repository.read': 1,
 
        'repository.write': 3,
 
        'repository.admin': 4,
rhodecode/model/repos_group.py
Show inline comments
 
@@ -199,116 +199,126 @@ class ReposGroupModel(BaseModel):
 
                    repo=obj, group_name=users_group, perm=perm
 
                )
 
        updates = []
 
        log.debug('Now updating permissions for %s in recursive mode:%s'
 
                  % (repos_group, recursive))
 

	
 
        for obj in repos_group.recursive_groups_and_repos():
 
            #obj is an instance of a group or repositories in that group
 
            if not recursive:
 
                obj = repos_group
 

	
 
            # update permissions
 
            for member, perm, member_type in perms_updates:
 
                ## set for user
 
                if member_type == 'user':
 
                    # this updates also current one if found
 
                    _set_perm_user(obj, user=member, perm=perm)
 
                ## set for users group
 
                else:
 
                    _set_perm_group(obj, users_group=member, perm=perm)
 
            # set new permissions
 
            for member, perm, member_type in perms_new:
 
                if member_type == 'user':
 
                    _set_perm_user(obj, user=member, perm=perm)
 
                else:
 
                    _set_perm_group(obj, users_group=member, perm=perm)
 
            updates.append(obj)
 
            #if it's not recursive call
 
            # break the loop and don't proceed with other changes
 
            if not recursive:
 
                break
 
        return updates
 

	
 
    def update(self, repos_group_id, form_data):
 

	
 
        try:
 
            repos_group = RepoGroup.get(repos_group_id)
 
            recursive = form_data['recursive']
 
            # iterate over all members(if in recursive mode) of this groups and
 
            # set the permissions !
 
            # this can be potentially heavy operation
 
            self._update_permissions(repos_group, form_data['perms_new'],
 
                                     form_data['perms_updates'], recursive)
 

	
 
            old_path = repos_group.full_path
 

	
 
            # change properties
 
            repos_group.group_description = form_data['group_description']
 
            repos_group.parent_group = RepoGroup.get(form_data['group_parent_id'])
 
            repos_group.group_parent_id = form_data['group_parent_id']
 
            repos_group.enable_locking = form_data['enable_locking']
 

	
 
            repos_group.parent_group = RepoGroup.get(form_data['group_parent_id'])
 
            repos_group.group_name = repos_group.get_new_name(form_data['group_name'])
 
            new_path = repos_group.full_path
 

	
 
            self.sa.add(repos_group)
 

	
 
            # iterate over all members of this groups and set the locking !
 
            # iterate over all members of this groups and do fixes
 
            # set locking if given
 
            # if obj is a repoGroup also fix the name of the group according
 
            # to the parent
 
            # if obj is a Repo fix it's name
 
            # this can be potentially heavy operation
 
            for obj in repos_group.recursive_groups_and_repos():
 
                #set the value from it's parent
 
                obj.enable_locking = repos_group.enable_locking
 
                self.sa.add(obj)
 

	
 
                if isinstance(obj, RepoGroup):
 
                    new_name = obj.get_new_name(obj.name)
 
                    log.debug('Fixing group %s to new name %s' \
 
                                % (obj.group_name, new_name))
 
                    obj.group_name = new_name
 
                elif isinstance(obj, Repository):
 
            # we need to get all repositories from this new group and
 
            # rename them accordingly to new group path
 
            for r in repos_group.repositories:
 
                r.repo_name = r.get_new_name(r.just_name)
 
                self.sa.add(r)
 
                    new_name = obj.get_new_name(obj.just_name)
 
                    log.debug('Fixing repo %s to new name %s' \
 
                                % (obj.repo_name, new_name))
 
                    obj.repo_name = new_name
 
                self.sa.add(obj)
 

	
 
            self.__rename_group(old_path, new_path)
 

	
 
            return repos_group
 
        except:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def delete(self, repos_group, force_delete=False):
 
        repos_group = self._get_repos_group(repos_group)
 
        try:
 
            self.sa.delete(repos_group)
 
            self.__delete_group(repos_group, force_delete)
 
        except:
 
            log.error('Error removing repos_group %s' % repos_group)
 
            raise
 

	
 
    def delete_permission(self, repos_group, obj, obj_type, recursive):
 
        """
 
        Revokes permission for repos_group for given obj(user or users_group),
 
        obj_type can be user or users group
 

	
 
        :param repos_group:
 
        :param obj: user or users group id
 
        :param obj_type: user or users group type
 
        :param recursive: recurse to all children of group
 
        """
 
        from rhodecode.model.repo import RepoModel
 
        repos_group = self._get_repos_group(repos_group)
 

	
 
        for el in repos_group.recursive_groups_and_repos():
 
            if not recursive:
 
                # if we don't recurse set the permission on only the top level
 
                # object
 
                el = repos_group
 

	
 
            if isinstance(el, RepoGroup):
 
                if obj_type == 'user':
 
                    ReposGroupModel().revoke_user_permission(el, user=obj)
 
                elif obj_type == 'users_group':
 
                    ReposGroupModel().revoke_users_group_permission(el, group_name=obj)
 
                else:
 
                    raise Exception('undefined object type %s' % obj_type)
 
            elif isinstance(el, Repository):
 
                if obj_type == 'user':
 
                    RepoModel().revoke_user_permission(el, user=obj)
 
                elif obj_type == 'users_group':
 
                    RepoModel().revoke_users_group_permission(el, group_name=obj)
rhodecode/tests/__init__.py
Show inline comments
 
@@ -2,97 +2,98 @@
 

	
 
This package assumes the Pylons environment is already loaded, such as
 
when this script is imported from the `nosetests --with-pylons=test.ini`
 
command.
 

	
 
This module initializes the application via ``websetup`` (`paster
 
setup-app`) and provides the base testing objects.
 
"""
 
import os
 
import time
 
import logging
 
import datetime
 
import hashlib
 
import tempfile
 
from os.path import join as jn
 

	
 
from unittest import TestCase
 
from tempfile import _RandomNameSequence
 

	
 
from paste.deploy import loadapp
 
from paste.script.appinstall import SetupCommand
 
from pylons import config, url
 
from routes.util import URLGenerator
 
from webtest import TestApp
 

	
 
from rhodecode import is_windows
 
from rhodecode.model.meta import Session
 
from rhodecode.model.db import User
 
from rhodecode.tests.nose_parametrized import parameterized
 

	
 
import pylons.test
 

	
 

	
 
os.environ['TZ'] = 'UTC'
 
if not is_windows:
 
    time.tzset()
 

	
 
log = logging.getLogger(__name__)
 

	
 
__all__ = [
 
    'parameterized', 'environ', 'url', 'get_new_dir', 'TestController',
 
    'TESTS_TMP_PATH', 'HG_REPO', 'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO',
 
    'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_ADMIN_PASS',
 
    'TEST_USER_REGULAR_LOGIN', 'TEST_USER_REGULAR_PASS',
 
    'TEST_USER_REGULAR_EMAIL', 'TEST_USER_REGULAR2_LOGIN',
 
    'TEST_USER_REGULAR2_PASS', 'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO',
 
    'TEST_HG_REPO_CLONE', 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO',
 
    'TEST_GIT_REPO_CLONE', 'TEST_GIT_REPO_PULL', 'HG_REMOTE_REPO',
 
    'GIT_REMOTE_REPO', 'SCM_TESTS', '_get_repo_create_params'
 
    'GIT_REMOTE_REPO', 'SCM_TESTS', '_get_repo_create_params',
 
    '_get_group_create_params'
 
]
 

	
 
# Invoke websetup with the current config file
 
# SetupCommand('setup-app').run([config_file])
 

	
 
##RUNNING DESIRED TESTS
 
# nosetests -x rhodecode.tests.functional.test_admin_settings:TestSettingsController.test_my_account
 
# nosetests --pdb --pdb-failures
 
# nosetests --with-coverage --cover-package=rhodecode.model.validators rhodecode.tests.test_validators
 
environ = {}
 

	
 
#SOME GLOBALS FOR TESTS
 

	
 
TESTS_TMP_PATH = jn('/', 'tmp', 'rc_test_%s' % _RandomNameSequence().next())
 
TEST_USER_ADMIN_LOGIN = 'test_admin'
 
TEST_USER_ADMIN_PASS = 'test12'
 
TEST_USER_ADMIN_EMAIL = 'test_admin@mail.com'
 

	
 
TEST_USER_REGULAR_LOGIN = 'test_regular'
 
TEST_USER_REGULAR_PASS = 'test12'
 
TEST_USER_REGULAR_EMAIL = 'test_regular@mail.com'
 

	
 
TEST_USER_REGULAR2_LOGIN = 'test_regular2'
 
TEST_USER_REGULAR2_PASS = 'test12'
 
TEST_USER_REGULAR2_EMAIL = 'test_regular2@mail.com'
 

	
 
HG_REPO = 'vcs_test_hg'
 
GIT_REPO = 'vcs_test_git'
 

	
 
NEW_HG_REPO = 'vcs_test_hg_new'
 
NEW_GIT_REPO = 'vcs_test_git_new'
 

	
 
HG_FORK = 'vcs_test_hg_fork'
 
GIT_FORK = 'vcs_test_git_fork'
 

	
 
## VCS
 
SCM_TESTS = ['hg', 'git']
 
uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
 

	
 
GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
 

	
 
TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
 
TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)
 
TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)
 

	
 

	
 
HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs'
 

	
 
@@ -137,48 +138,63 @@ class TestController(TestCase):
 

	
 
    def log_user(self, username=TEST_USER_ADMIN_LOGIN,
 
                 password=TEST_USER_ADMIN_PASS):
 
        self._logged_username = username
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': username,
 
                                  'password': password})
 

	
 
        if 'invalid user name' in response.body:
 
            self.fail('could not login using %s %s' % (username, password))
 

	
 
        self.assertEqual(response.status, '302 Found')
 
        ses = response.session['rhodecode_user']
 
        self.assertEqual(ses.get('username'), username)
 
        response = response.follow()
 
        self.assertEqual(ses.get('is_authenticated'), True)
 

	
 
        return response.session['rhodecode_user']
 

	
 
    def _get_logged_user(self):
 
        return User.get_by_username(self._logged_username)
 

	
 
    def checkSessionFlash(self, response, msg):
 
        self.assertTrue('flash' in response.session)
 
        if not msg in response.session['flash'][0][1]:
 
            self.fail(
 
                'msg `%s` not found in session flash: got `%s` instead' % (
 
                      msg, response.session['flash'])
 
            )
 

	
 

	
 
## HELPERS ##
 

	
 
def _get_repo_create_params(**custom):
 
    defs = {
 
        'repo_name': None,
 
        'repo_type': 'hg',
 
        'clone_uri': '',
 
        'repo_group': '',
 
        'repo_description': 'DESC',
 
        'repo_private': False,
 
        'repo_landing_rev': 'tip'
 
    }
 
    defs.update(custom)
 
    if 'repo_name_full' not in custom:
 
        defs.update({'repo_name_full': defs['repo_name']})
 

	
 
    return defs
 

	
 

	
 
def _get_group_create_params(**custom):
 
    defs = dict(
 
        group_name=None,
 
        group_description='DESC',
 
        group_parent_id=None,
 
        perms_updates=[],
 
        perms_new=[],
 
        enable_locking=False,
 
        recursive=False
 
    )
 
    defs.update(custom)
 

	
 
    return defs
rhodecode/tests/models/test_repos_groups.py
Show inline comments
 
import os
 
import unittest
 
from rhodecode.tests import *
 

	
 
from rhodecode.model.repos_group import ReposGroupModel
 
from rhodecode.model.repo import RepoModel
 
from rhodecode.model.db import RepoGroup, User
 
from rhodecode.model.meta import Session
 
from sqlalchemy.exc import IntegrityError
 

	
 

	
 
def _make_group(path, desc='desc', parent_id=None,
 
                 skip_if_exists=False):
 

	
 
    gr = RepoGroup.get_by_group_name(path)
 
    if gr and skip_if_exists:
 
        return gr
 
    if isinstance(parent_id, RepoGroup):
 
        parent_id = parent_id.group_id
 
    gr = ReposGroupModel().create(path, desc, parent_id)
 
    return gr
 

	
 

	
 
def _update_group(id_, group_name, desc='desc', parent_id=None):
 
    form_data = _get_group_create_params(group_name=group_name,
 
                                         group_desc=desc,
 
                                         group_parent_id=parent_id)
 
    gr = ReposGroupModel().update(id_, form_data)
 
    return gr
 

	
 

	
 
def _make_repo(name, **kwargs):
 
    form_data = _get_repo_create_params(repo_name=name, **kwargs)
 
    cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
    r = RepoModel().create(form_data, cur_user)
 
    return r
 

	
 

	
 
def _update_repo(name, **kwargs):
 
    form_data = _get_repo_create_params(**kwargs)
 
    if not 'repo_name' in kwargs:
 
        form_data['repo_name'] = name
 
    if not 'perms_new' in kwargs:
 
        form_data['perms_new'] = []
 
    if not 'perms_updates' in kwargs:
 
        form_data['perms_updates'] = []
 
    r = RepoModel().update(name, **form_data)
 
    return r
 

	
 

	
 
class TestReposGroups(unittest.TestCase):
 

	
 
    def setUp(self):
 
        self.g1 = _make_group('test1', skip_if_exists=True)
 
        Session().commit()
 
        self.g2 = _make_group('test2', skip_if_exists=True)
 
        Session().commit()
 
        self.g3 = _make_group('test3', skip_if_exists=True)
 
        Session().commit()
 

	
 
    def tearDown(self):
 
        print 'out'
 
        Session.remove()
 

	
 
    def __check_path(self, *path):
 
        """
 
        Checks the path for existance !
 
        """
 
        path = [TESTS_TMP_PATH] + list(path)
 
        path = os.path.join(*path)
 
        return os.path.isdir(path)
 

	
 
    def _check_folders(self):
 
        print os.listdir(TESTS_TMP_PATH)
 

	
 
    def __delete_group(self, id_):
 
        ReposGroupModel().delete(id_)
 

	
 
    def __update_group(self, id_, path, desc='desc', parent_id=None):
 
        form_data = dict(
 
            group_name=path,
 
            group_description=desc,
 
            group_parent_id=parent_id,
 
            perms_updates=[],
 
            perms_new=[],
 
            enable_locking=False,
 
            recursive=False
 
        )
 
        gr = ReposGroupModel().update(id_, form_data)
 
        return gr
 

	
 
    def test_create_group(self):
 
        g = _make_group('newGroup')
 
        Session().commit()
 
        self.assertEqual(g.full_path, 'newGroup')
 

	
 
        self.assertTrue(self.__check_path('newGroup'))
 

	
 
    def test_create_same_name_group(self):
 
        self.assertRaises(IntegrityError, lambda: _make_group('newGroup'))
 
        Session().rollback()
 

	
 
    def test_same_subgroup(self):
 
        sg1 = _make_group('sub1', parent_id=self.g1.group_id)
 
        Session().commit()
 
        self.assertEqual(sg1.parent_group, self.g1)
 
        self.assertEqual(sg1.full_path, 'test1/sub1')
 
        self.assertTrue(self.__check_path('test1', 'sub1'))
 

	
 
        ssg1 = _make_group('subsub1', parent_id=sg1.group_id)
 
        Session().commit()
 
        self.assertEqual(ssg1.parent_group, sg1)
 
        self.assertEqual(ssg1.full_path, 'test1/sub1/subsub1')
 
        self.assertTrue(self.__check_path('test1', 'sub1', 'subsub1'))
 

	
 
    def test_remove_group(self):
 
        sg1 = _make_group('deleteme')
 
        Session().commit()
 
        self.__delete_group(sg1.group_id)
 

	
 
        self.assertEqual(RepoGroup.get(sg1.group_id), None)
 
        self.assertFalse(self.__check_path('deteteme'))
 

	
 
        sg1 = _make_group('deleteme', parent_id=self.g1.group_id)
 
        Session().commit()
 
        self.__delete_group(sg1.group_id)
 

	
 
        self.assertEqual(RepoGroup.get(sg1.group_id), None)
 
        self.assertFalse(self.__check_path('test1', 'deteteme'))
 

	
 
    def test_rename_single_group(self):
 
        sg1 = _make_group('initial')
 
        Session().commit()
 

	
 
        new_sg1 = self.__update_group(sg1.group_id, 'after')
 
        new_sg1 = _update_group(sg1.group_id, 'after')
 
        self.assertTrue(self.__check_path('after'))
 
        self.assertEqual(RepoGroup.get_by_group_name('initial'), None)
 

	
 
    def test_update_group_parent(self):
 

	
 
        sg1 = _make_group('initial', parent_id=self.g1.group_id)
 
        Session().commit()
 

	
 
        new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g1.group_id)
 
        new_sg1 = _update_group(sg1.group_id, 'after', parent_id=self.g1.group_id)
 
        self.assertTrue(self.__check_path('test1', 'after'))
 
        self.assertEqual(RepoGroup.get_by_group_name('test1/initial'), None)
 

	
 
        new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g3.group_id)
 
        new_sg1 = _update_group(sg1.group_id, 'after', parent_id=self.g3.group_id)
 
        self.assertTrue(self.__check_path('test3', 'after'))
 
        self.assertEqual(RepoGroup.get_by_group_name('test3/initial'), None)
 

	
 
        new_sg1 = self.__update_group(sg1.group_id, 'hello')
 
        new_sg1 = _update_group(sg1.group_id, 'hello')
 
        self.assertTrue(self.__check_path('hello'))
 

	
 
        self.assertEqual(RepoGroup.get_by_group_name('hello'), new_sg1)
 

	
 
    def test_subgrouping_with_repo(self):
 

	
 
        g1 = _make_group('g1')
 
        g2 = _make_group('g2')
 

	
 
        Session().commit()
 
        # create new repo
 
        form_data = _get_repo_create_params(repo_name='john')
 
        cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
        r = RepoModel().create(form_data, cur_user)
 

	
 
        r = _make_repo('john')
 
        Session().commit()
 
        self.assertEqual(r.repo_name, 'john')
 

	
 
        # put repo into group
 
        form_data = form_data
 
        form_data['repo_group'] = g1.group_id
 
        form_data['perms_new'] = []
 
        form_data['perms_updates'] = []
 
        RepoModel().update(r.repo_name, **form_data)
 
        r = _update_repo('john', repo_group=g1.group_id)
 
        Session().commit()
 
        self.assertEqual(r.repo_name, 'g1/john')
 

	
 
        self.__update_group(g1.group_id, 'g1', parent_id=g2.group_id)
 
        _update_group(g1.group_id, 'g1', parent_id=g2.group_id)
 
        self.assertTrue(self.__check_path('g2', 'g1'))
 

	
 
        # test repo
 
        self.assertEqual(r.repo_name, RepoGroup.url_sep().join(['g2', 'g1',
 
                                                                r.just_name]))
 

	
 
    def test_move_to_root(self):
 
        g1 = _make_group('t11')
 
        Session().commit()
 
        g2 = _make_group('t22', parent_id=g1.group_id)
 
        Session().commit()
 

	
 
        self.assertEqual(g2.full_path, 't11/t22')
 
        self.assertTrue(self.__check_path('t11', 't22'))
 

	
 
        g2 = self.__update_group(g2.group_id, 'g22', parent_id=None)
 
        g2 = _update_group(g2.group_id, 'g22', parent_id=None)
 
        Session().commit()
 

	
 
        self.assertEqual(g2.group_name, 'g22')
 
        # we moved out group from t1 to '' so it's full path should be 'g2'
 
        self.assertEqual(g2.full_path, 'g22')
 
        self.assertFalse(self.__check_path('t11', 't22'))
 
        self.assertTrue(self.__check_path('g22'))
 

	
 
    def test_rename_top_level_group_in_nested_setup(self):
 
        g1 = _make_group('L1')
 
        Session().commit()
 
        g2 = _make_group('L2', parent_id=g1.group_id)
 
        Session().commit()
 
        g3 = _make_group('L3', parent_id=g2.group_id)
 
        Session().commit()
 

	
 
        r = _make_repo('L1/L2/L3/L3_REPO', repo_group=g3.group_id)
 
        Session().commit()
 

	
 
        ##rename L1 all groups should be now changed
 
        _update_group(g1.group_id, 'L1_NEW')
 
        Session().commit()
 
        self.assertEqual(g1.full_path, 'L1_NEW')
 
        self.assertEqual(g2.full_path, 'L1_NEW/L2')
 
        self.assertEqual(g3.full_path, 'L1_NEW/L2/L3')
 
        self.assertEqual(r.repo_name,  'L1_NEW/L2/L3/L3_REPO')
 

	
 
    def test_change_parent_of_top_level_group_in_nested_setup(self):
 
        g1 = _make_group('R1')
 
        Session().commit()
 
        g2 = _make_group('R2', parent_id=g1.group_id)
 
        Session().commit()
 
        g3 = _make_group('R3', parent_id=g2.group_id)
 
        Session().commit()
 

	
 
        g4 = _make_group('R1_NEW')
 
        Session().commit()
 

	
 
        r = _make_repo('R1/R2/R3/R3_REPO', repo_group=g3.group_id)
 
        Session().commit()
 
        ##rename L1 all groups should be now changed
 
        _update_group(g1.group_id, 'R1', parent_id=g4.group_id)
 
        Session().commit()
 
        self.assertEqual(g1.full_path, 'R1_NEW/R1')
 
        self.assertEqual(g2.full_path, 'R1_NEW/R1/R2')
 
        self.assertEqual(g3.full_path, 'R1_NEW/R1/R2/R3')
 
        self.assertEqual(r.repo_name,  'R1_NEW/R1/R2/R3/R3_REPO')
 

	
 
    def test_change_parent_of_top_level_group_in_nested_setup_with_rename(self):
 
        g1 = _make_group('X1')
 
        Session().commit()
 
        g2 = _make_group('X2', parent_id=g1.group_id)
 
        Session().commit()
 
        g3 = _make_group('X3', parent_id=g2.group_id)
 
        Session().commit()
 

	
 
        g4 = _make_group('X1_NEW')
 
        Session().commit()
 

	
 
        r = _make_repo('X1/X2/X3/X3_REPO', repo_group=g3.group_id)
 
        Session().commit()
 

	
 
        ##rename L1 all groups should be now changed
 
        _update_group(g1.group_id, 'X1_PRIM', parent_id=g4.group_id)
 
        Session().commit()
 
        self.assertEqual(g1.full_path, 'X1_NEW/X1_PRIM')
 
        self.assertEqual(g2.full_path, 'X1_NEW/X1_PRIM/X2')
 
        self.assertEqual(g3.full_path, 'X1_NEW/X1_PRIM/X2/X3')
 
        self.assertEqual(r.repo_name,  'X1_NEW/X1_PRIM/X2/X3/X3_REPO')
0 comments (0 inline, 0 general)