Changeset - 0bae66824ac5
[Not reviewed]
default
0 5 0
Mads Kiilerich - 9 years ago 2017-05-08 05:25:41
mads@kiilerich.com
tests: clarify that default parameters are for form - direct model access requires different types

_get_repo_group_create_params parent_group_id is thus set to the form value
'-1' instead of None. It worked before anyway because the model failed to find
the repo '-1' and thus got pretty much the same as None.
5 files changed with 47 insertions and 40 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/celerylib/tasks.py
Show inline comments
 
@@ -407,97 +407,96 @@ def create_repo(form_data, cur_user):
 

	
 

	
 
@celerylib.task
 
@celerylib.dbsession
 
def create_repo_fork(form_data, cur_user):
 
    """
 
    Creates a fork of repository using interval VCS methods
 

	
 
    :param form_data:
 
    :param cur_user:
 
    """
 
    from kallithea.model.repo import RepoModel
 

	
 
    DBS = celerylib.get_session()
 

	
 
    base_path = Repository.base_path()
 
    cur_user = User.guess_instance(cur_user)
 

	
 
    repo_name = form_data['repo_name']  # fork in this case
 
    repo_name_full = form_data['repo_name_full']
 

	
 
    repo_type = form_data['repo_type']
 
    owner = cur_user
 
    private = form_data['private']
 
    clone_uri = form_data.get('clone_uri')
 
    repo_group = form_data['repo_group']
 
    landing_rev = form_data['landing_rev']
 
    copy_fork_permissions = form_data.get('copy_permissions')
 

	
 
    try:
 
        fork_of = Repository.guess_instance(form_data.get('fork_parent_id'))
 

	
 
        RepoModel()._create_repo(
 
            repo_name=repo_name_full,
 
            repo_type=repo_type,
 
            description=form_data['description'],
 
            owner=owner,
 
            private=private,
 
            clone_uri=clone_uri,
 
            repo_group=repo_group,
 
            landing_rev=landing_rev,
 
            fork_of=fork_of,
 
            copy_fork_permissions=copy_fork_permissions
 
        )
 
        action_logger(cur_user, 'user_forked_repo:%s' % repo_name_full,
 
                      fork_of.repo_name, '')
 
        DBS.commit()
 

	
 
        update_after_clone = form_data['update_after_clone'] # FIXME - unused!
 
        source_repo_path = os.path.join(base_path, fork_of.repo_name)
 

	
 
        # now create this repo on Filesystem
 
        RepoModel()._create_filesystem_repo(
 
            repo_name=repo_name,
 
            repo_type=repo_type,
 
            repo_group=RepoGroup.guess_instance(repo_group),
 
            clone_uri=source_repo_path,
 
        )
 
        repo = Repository.get_by_repo_name(repo_name_full)
 
        log_create_repository(repo.get_dict(), created_by=owner.username)
 

	
 
        # update repo changeset caches initially
 
        repo.update_changeset_cache()
 

	
 
        # set new created state
 
        repo.set_state(Repository.STATE_CREATED)
 
        DBS.commit()
 
    except Exception as e:
 
        log.warning('Exception %s occurred when forking repository, '
 
                    'doing cleanup...' % e)
 
        #rollback things manually !
 
        repo = Repository.get_by_repo_name(repo_name_full)
 
        if repo:
 
            Repository.delete(repo.repo_id)
 
            DBS.commit()
 
            RepoModel()._delete_filesystem_repo(repo)
 
        raise
 

	
 
    return True
 

	
 

	
 
def __get_codes_stats(repo_name):
 
    from kallithea.config.conf import LANGUAGES_EXTENSIONS_MAP
 
    repo = Repository.get_by_repo_name(repo_name).scm_instance
 

	
 
    tip = repo.get_changeset()
 
    code_stats = {}
 

	
 
    def aggregate(cs):
 
        for f in cs[2]:
 
            ext = lower(f.extension)
 
            if ext in LANGUAGES_EXTENSIONS_MAP.keys() and not f.is_binary:
 
                if ext in code_stats:
 
                    code_stats[ext] += 1
 
                else:
 
                    code_stats[ext] = 1
 

	
kallithea/model/repo.py
Show inline comments
 
@@ -252,96 +252,97 @@ class RepoModel(object):
 

	
 
        defaults = repo_info.get_dict()
 
        defaults['repo_name'] = repo_info.just_name
 
        defaults['repo_group'] = repo_info.group_id
 

	
 
        for strip, k in [(0, 'repo_type'), (1, 'repo_enable_downloads'),
 
                         (1, 'repo_description'), (1, 'repo_enable_locking'),
 
                         (1, 'repo_landing_rev'), (0, 'clone_uri'),
 
                         (1, 'repo_private'), (1, 'repo_enable_statistics')]:
 
            attr = k
 
            if strip:
 
                attr = remove_prefix(k, 'repo_')
 

	
 
            val = defaults[attr]
 
            if k == 'repo_landing_rev':
 
                val = ':'.join(defaults[attr])
 
            defaults[k] = val
 
            if k == 'clone_uri':
 
                defaults['clone_uri_hidden'] = repo_info.clone_uri_hidden
 

	
 
        # fill owner
 
        if repo_info.owner:
 
            defaults.update({'owner': repo_info.owner.username})
 
        else:
 
            replacement_user = User.query().filter(User.admin ==
 
                                                   True).first().username
 
            defaults.update({'owner': replacement_user})
 

	
 
        # fill repository users
 
        for p in repo_info.repo_to_perm:
 
            defaults.update({'u_perm_%s' % p.user.username:
 
                                 p.permission.permission_name})
 

	
 
        # fill repository groups
 
        for p in repo_info.users_group_to_perm:
 
            defaults.update({'g_perm_%s' % p.users_group.users_group_name:
 
                                 p.permission.permission_name})
 

	
 
        return defaults
 

	
 
    def update(self, repo, **kwargs):
 
        try:
 
            cur_repo = Repository.guess_instance(repo)
 
            org_repo_name = cur_repo.repo_name
 
            if 'owner' in kwargs:
 
                cur_repo.owner = User.get_by_username(kwargs['owner'])
 

	
 
            if 'repo_group' in kwargs:
 
                assert kwargs['repo_group'] != u'-1', kwargs # RepoForm should have converted to None
 
                cur_repo.group = RepoGroup.get(kwargs['repo_group'])
 
                cur_repo.repo_name = cur_repo.get_new_name(cur_repo.just_name)
 
            log.debug('Updating repo %s with params:%s', cur_repo, kwargs)
 
            for k in ['repo_enable_downloads',
 
                      'repo_description',
 
                      'repo_enable_locking',
 
                      'repo_landing_rev',
 
                      'repo_private',
 
                      'repo_enable_statistics',
 
                      ]:
 
                if k in kwargs:
 
                    setattr(cur_repo, remove_prefix(k, 'repo_'), kwargs[k])
 
            clone_uri = kwargs.get('clone_uri')
 
            if clone_uri is not None and clone_uri != cur_repo.clone_uri_hidden:
 
                cur_repo.clone_uri = clone_uri
 

	
 
            if 'repo_name' in kwargs:
 
                cur_repo.repo_name = cur_repo.get_new_name(kwargs['repo_name'])
 

	
 
            #if private flag is set, reset default permission to NONE
 
            if kwargs.get('repo_private'):
 
                EMPTY_PERM = 'repository.none'
 
                RepoModel().grant_user_permission(
 
                    repo=cur_repo, user='default', perm=EMPTY_PERM
 
                )
 
                #handle extra fields
 
            for field in filter(lambda k: k.startswith(RepositoryField.PREFIX),
 
                                kwargs):
 
                k = RepositoryField.un_prefix_key(field)
 
                ex_field = RepositoryField.get_by_key_name(key=k, repo=cur_repo)
 
                if ex_field:
 
                    ex_field.field_value = kwargs[field]
 

	
 
            if org_repo_name != cur_repo.repo_name:
 
                # rename repository
 
                self._rename_filesystem_repo(old=org_repo_name, new=cur_repo.repo_name)
 

	
 
            return cur_repo
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def _create_repo(self, repo_name, repo_type, description, owner,
 
                     private=False, clone_uri=None, repo_group=None,
 
                     landing_rev='rev:tip', fork_of=None,
 
                     copy_fork_permissions=False, enable_statistics=False,
 
                     enable_locking=False, enable_downloads=False,
 
                     copy_group_permissions=False, state=Repository.STATE_PENDING):
kallithea/model/repo_group.py
Show inline comments
 
@@ -228,109 +228,114 @@ class RepoGroupModel(object):
 
        updates = []
 
        log.debug('Now updating permissions for %s in recursive mode:%s',
 
                  repo_group, recursive)
 

	
 
        for obj in repo_group.recursive_groups_and_repos():
 
            # iterated obj is an instance of a repos group or repository in
 
            # that group, recursive option can be: none, repos, groups, all
 
            if recursive == 'all':
 
                pass
 
            elif recursive == 'repos':
 
                # skip groups, other than this one
 
                if isinstance(obj, RepoGroup) and not obj == repo_group:
 
                    continue
 
            elif recursive == 'groups':
 
                # skip repos
 
                if isinstance(obj, Repository):
 
                    continue
 
            else:  # recursive == 'none': # DEFAULT don't apply to iterated objects
 
                obj = repo_group
 
                # also we do a break at the end of this loop.
 

	
 
            # update permissions
 
            for member, perm, member_type in perms_updates:
 
                ## set for user
 
                if member_type == 'user':
 
                    # this updates also current one if found
 
                    _set_perm_user(obj, user=member, perm=perm)
 
                ## set for user group
 
                else:
 
                    #check if we have permissions to alter this usergroup's access
 
                    if not check_perms or HasUserGroupPermissionLevel('read')(member):
 
                        _set_perm_group(obj, users_group=member, perm=perm)
 
            # set new permissions
 
            for member, perm, member_type in perms_new:
 
                if member_type == 'user':
 
                    _set_perm_user(obj, user=member, perm=perm)
 
                else:
 
                    #check if we have permissions to alter this usergroup's access
 
                    if not check_perms or HasUserGroupPermissionLevel('read')(member):
 
                        _set_perm_group(obj, users_group=member, perm=perm)
 
            updates.append(obj)
 
            # if it's not recursive call for all,repos,groups
 
            # break the loop and don't proceed with other changes
 
            if recursive not in ['all', 'repos', 'groups']:
 
                break
 

	
 
        return updates
 

	
 
    def update(self, repo_group, form_data):
 

	
 
    def update(self, repo_group, kwargs):
 
        try:
 
            repo_group = RepoGroup.guess_instance(repo_group)
 
            old_path = repo_group.full_path
 

	
 
            # change properties
 
            repo_group.group_description = form_data['group_description']
 
            repo_group.parent_group_id = form_data['parent_group_id']
 
            repo_group.enable_locking = form_data['enable_locking']
 
            if 'group_description' in kwargs:
 
                repo_group.group_description = kwargs['group_description']
 
            if 'parent_group_id' in kwargs:
 
                repo_group.parent_group_id = kwargs['parent_group_id']
 
            if 'enable_locking' in kwargs:
 
                repo_group.enable_locking = kwargs['enable_locking']
 

	
 
            repo_group.parent_group = RepoGroup.get(form_data['parent_group_id'])
 
            repo_group.group_name = repo_group.get_new_name(form_data['group_name'])
 
            if 'parent_group_id' in kwargs:
 
                assert kwargs['parent_group_id'] != u'-1', kwargs # RepoGroupForm should have converted to None
 
                repo_group.parent_group = RepoGroup.get(kwargs['parent_group_id'])
 
            if 'group_name' in kwargs:
 
                repo_group.group_name = repo_group.get_new_name(kwargs['group_name'])
 
            new_path = repo_group.full_path
 
            Session().add(repo_group)
 

	
 
            # iterate over all members of this groups and do fixes
 
            # set locking if given
 
            # if obj is a repoGroup also fix the name of the group according
 
            # to the parent
 
            # if obj is a Repo fix it's name
 
            # this can be potentially heavy operation
 
            for obj in repo_group.recursive_groups_and_repos():
 
                #set the value from it's parent
 
                obj.enable_locking = repo_group.enable_locking
 
                if isinstance(obj, RepoGroup):
 
                    new_name = obj.get_new_name(obj.name)
 
                    log.debug('Fixing group %s to new name %s' \
 
                                % (obj.group_name, new_name))
 
                    obj.group_name = new_name
 
                elif isinstance(obj, Repository):
 
                    # we need to get all repositories from this new group and
 
                    # rename them accordingly to new group path
 
                    new_name = obj.get_new_name(obj.just_name)
 
                    log.debug('Fixing repo %s to new name %s' \
 
                                % (obj.repo_name, new_name))
 
                    obj.repo_name = new_name
 

	
 
            self._rename_group(old_path, new_path)
 

	
 
            return repo_group
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def delete(self, repo_group, force_delete=False):
 
        repo_group = RepoGroup.guess_instance(repo_group)
 
        try:
 
            Session().delete(repo_group)
 
            self._delete_group(repo_group, force_delete)
 
        except Exception:
 
            log.error('Error removing repo_group %s', repo_group)
 
            raise
 

	
 
    def add_permission(self, repo_group, obj, obj_type, perm, recursive):
 
        from kallithea.model.repo import RepoModel
 
        repo_group = RepoGroup.guess_instance(repo_group)
 
        perm = Permission.guess_instance(perm)
 

	
 
        for el in repo_group.recursive_groups_and_repos():
 
            # iterated obj is an instance of a repos group or repository in
kallithea/tests/fixture.py
Show inline comments
 
@@ -32,213 +32,215 @@ from kallithea.model.gist import GistMod
 
from kallithea.model.scm import ScmModel
 
from kallithea.lib.db_manage import DbManage
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.tests.base import invalidate_all_caches, GIT_REPO, HG_REPO, TESTS_TMP_PATH, TEST_USER_ADMIN_LOGIN
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 
FIXTURES = os.path.join(dirname(dirname(os.path.abspath(__file__))), 'tests', 'fixtures')
 

	
 

	
 
def error_function(*args, **kwargs):
 
    raise Exception('Total Crash !')
 

	
 

	
 
class Fixture(object):
 

	
 
    def __init__(self):
 
        pass
 

	
 
    def anon_access(self, status):
 
        """
 
        Context manager for controlling anonymous access.
 
        Anon access will be set and committed, but restored again when exiting the block.
 

	
 
        Usage:
 

	
 
        fixture = Fixture()
 
        with fixture.anon_access(False):
 
            stuff
 
        """
 

	
 
        class context(object):
 
            def __enter__(self):
 
                anon = User.get_default_user()
 
                self._before = anon.active
 
                anon.active = status
 
                Session().commit()
 
                invalidate_all_caches()
 

	
 
            def __exit__(self, exc_type, exc_val, exc_tb):
 
                anon = User.get_default_user()
 
                anon.active = self._before
 
                Session().commit()
 

	
 
        return context()
 

	
 
    def _get_repo_create_params(self, **custom):
 
        """Return form values to be validated through RepoForm"""
 
        defs = dict(
 
            repo_name=None,
 
            repo_type='hg',
 
            clone_uri='',
 
            repo_group=u'-1',
 
            repo_description=u'DESC',
 
            repo_private=False,
 
            repo_landing_rev='rev:tip',
 
            repo_copy_permissions=False,
 
            repo_state=Repository.STATE_CREATED,
 
        )
 
        defs.update(custom)
 
        if 'repo_name_full' not in custom:
 
            defs.update({'repo_name_full': defs['repo_name']})
 

	
 
        # fix the repo name if passed as repo_name_full
 
        if defs['repo_name']:
 
            defs['repo_name'] = defs['repo_name'].split('/')[-1]
 

	
 
        return defs
 

	
 
    def _get_group_create_params(self, **custom):
 
    def _get_repo_group_create_params(self, **custom):
 
        """Return form values to be validated through RepoGroupForm"""
 
        defs = dict(
 
            group_name=None,
 
            group_description=u'DESC',
 
            parent_group_id=None,
 
            parent_group_id=u'-1',
 
            perms_updates=[],
 
            perms_new=[],
 
            enable_locking=False,
 
            recursive=False
 
        )
 
        defs.update(custom)
 

	
 
        return defs
 

	
 
    def _get_user_create_params(self, name, **custom):
 
        defs = dict(
 
            username=name,
 
            password='qweqwe',
 
            email='%s+test@example.com' % name,
 
            firstname=u'TestUser',
 
            lastname=u'Test',
 
            active=True,
 
            admin=False,
 
            extern_type='internal',
 
            extern_name=None
 
        )
 
        defs.update(custom)
 

	
 
        return defs
 

	
 
    def _get_user_group_create_params(self, name, **custom):
 
        defs = dict(
 
            users_group_name=name,
 
            user_group_description=u'DESC',
 
            users_group_active=True,
 
            user_group_data={},
 
        )
 
        defs.update(custom)
 

	
 
        return defs
 

	
 
    def create_repo(self, name, **kwargs):
 
    def create_repo(self, name, repo_group=None, **kwargs):
 
        if 'skip_if_exists' in kwargs:
 
            del kwargs['skip_if_exists']
 
            r = Repository.get_by_repo_name(name)
 
            if r:
 
                return r
 

	
 
        if isinstance(kwargs.get('repo_group'), RepoGroup):
 
            kwargs['repo_group'] = kwargs['repo_group'].group_id
 
        if isinstance(repo_group, RepoGroup):
 
            repo_group = repo_group.group_id
 

	
 
        form_data = self._get_repo_create_params(repo_name=name, **kwargs)
 
        form_data['repo_group'] = repo_group # patch form dict so it can be used directly by model
 
        cur_user = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
 
        RepoModel().create(form_data, cur_user)
 
        Session().commit()
 
        ScmModel().mark_for_invalidation(name)
 
        return Repository.get_by_repo_name(name)
 

	
 
    def create_fork(self, repo_to_fork, fork_name, **kwargs):
 
        repo_to_fork = Repository.get_by_repo_name(repo_to_fork)
 

	
 
        form_data = self._get_repo_create_params(repo_name=fork_name,
 
                                            fork_parent_id=repo_to_fork,
 
                                            repo_type=repo_to_fork.repo_type,
 
                                            **kwargs)
 
        form_data['update_after_clone'] = False
 

	
 
        #TODO: fix it !!
 
        # patch form dict so it can be used directly by model
 
        form_data['description'] = form_data['repo_description']
 
        form_data['private'] = form_data['repo_private']
 
        form_data['landing_rev'] = form_data['repo_landing_rev']
 

	
 
        owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
 
        RepoModel().create_fork(form_data, cur_user=owner)
 
        Session().commit()
 
        ScmModel().mark_for_invalidation(fork_name)
 
        r = Repository.get_by_repo_name(fork_name)
 
        assert r
 
        return r
 

	
 
    def destroy_repo(self, repo_name, **kwargs):
 
        RepoModel().delete(repo_name, **kwargs)
 
        Session().commit()
 

	
 
    def create_repo_group(self, name, **kwargs):
 
    def create_repo_group(self, name, parent_group_id=None, **kwargs):
 
        if 'skip_if_exists' in kwargs:
 
            del kwargs['skip_if_exists']
 
            gr = RepoGroup.get_by_group_name(group_name=name)
 
            if gr:
 
                return gr
 
        form_data = self._get_group_create_params(group_name=name, **kwargs)
 
        owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
 
        form_data = self._get_repo_group_create_params(group_name=name, **kwargs)
 
        gr = RepoGroupModel().create(
 
            group_name=form_data['group_name'],
 
            group_description=form_data['group_name'],
 
            owner=owner, parent=form_data['parent_group_id'])
 
            parent=parent_group_id,
 
            owner=kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN),
 
            )
 
        Session().commit()
 
        gr = RepoGroup.get_by_group_name(gr.group_name)
 
        return gr
 

	
 
    def destroy_repo_group(self, repogroupid):
 
        RepoGroupModel().delete(repogroupid)
 
        Session().commit()
 

	
 
    def create_user(self, name, **kwargs):
 
        if 'skip_if_exists' in kwargs:
 
            del kwargs['skip_if_exists']
 
            user = User.get_by_username(name)
 
            if user:
 
                return user
 
        form_data = self._get_user_create_params(name, **kwargs)
 
        user = UserModel().create(form_data)
 
        Session().commit()
 
        user = User.get_by_username(user.username)
 
        return user
 

	
 
    def destroy_user(self, userid):
 
        UserModel().delete(userid)
 
        Session().commit()
 

	
 
    def create_user_group(self, name, **kwargs):
 
        if 'skip_if_exists' in kwargs:
 
            del kwargs['skip_if_exists']
 
            gr = UserGroup.get_by_group_name(group_name=name)
 
            if gr:
 
                return gr
 
        form_data = self._get_user_group_create_params(name, **kwargs)
 
        owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
 
        user_group = UserGroupModel().create(
 
            name=form_data['users_group_name'],
 
            description=form_data['user_group_description'],
 
            owner=owner, active=form_data['users_group_active'],
 
            group_data=form_data['user_group_data'])
 
        Session().commit()
 
        user_group = UserGroup.get_by_group_name(user_group.users_group_name)
 
        return user_group
 

	
 
    def destroy_user_group(self, usergroupid):
 
        UserGroupModel().delete(user_group=usergroupid, force=True)
 
        Session().commit()
 

	
 
    def create_gist(self, **kwargs):
 
        form_data = {
 
            'description': u'new-gist',
kallithea/tests/models/test_repo_groups.py
Show inline comments
 
import os
 
import pytest
 
from sqlalchemy.exc import IntegrityError
 

	
 
from kallithea.tests.base import *
 
from kallithea.tests.fixture import Fixture
 

	
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.db import RepoGroup
 
from kallithea.model.meta import Session
 

	
 

	
 
fixture = Fixture()
 

	
 

	
 
def _update_group(id_, group_name, desc=u'desc', parent_id=None):
 
    form_data = fixture._get_group_create_params(group_name=group_name,
 
                                                 group_desc=desc,
 
                                                 parent_group_id=parent_id)
 
    gr = RepoGroupModel().update(id_, form_data)
 
    return gr
 
def _update_repo_group(id_, group_name, desc=u'desc', parent_id=None):
 
    form_data = dict(
 
        group_name=group_name,
 
        group_description=desc,
 
        parent_group_id=parent_id,
 
        )
 
    return RepoGroupModel().update(id_, form_data)
 

	
 

	
 
def _update_repo(name, **kwargs):
 
    form_data = fixture._get_repo_create_params(**kwargs)
 
    if not 'repo_name' in kwargs:
 
        form_data['repo_name'] = name
 
        kwargs['repo_name'] = name
 
    if not 'perms_new' in kwargs:
 
        form_data['perms_new'] = []
 
        kwargs['perms_new'] = []
 
    if not 'perms_updates' in kwargs:
 
        form_data['perms_updates'] = []
 
    r = RepoModel().update(name, **form_data)
 
        kwargs['perms_updates'] = []
 
    r = RepoModel().update(name, **kwargs)
 
    return r
 

	
 

	
 
class TestRepoGroups(TestController):
 

	
 
    def setup_method(self, method):
 
        self.g1 = fixture.create_repo_group(u'test1', skip_if_exists=True)
 
        self.g2 = fixture.create_repo_group(u'test2', skip_if_exists=True)
 
        self.g3 = fixture.create_repo_group(u'test3', skip_if_exists=True)
 

	
 
    def teardown_method(self, method):
 
        Session.remove()
 

	
 
    def __check_path(self, *path):
 
        """
 
        Checks the path for existence !
 
        """
 
        path = [TESTS_TMP_PATH] + list(path)
 
        path = os.path.join(*path)
 
        return os.path.isdir(path)
 

	
 
    def _check_folders(self):
 
        print os.listdir(TESTS_TMP_PATH)
 

	
 
    def __delete_group(self, id_):
 
        RepoGroupModel().delete(id_)
 

	
 
    def test_create_group(self):
 
        g = fixture.create_repo_group(u'newGroup')
 
        Session().commit()
 
        assert g.full_path == 'newGroup'
 

	
 
        assert self.__check_path('newGroup')
 

	
 
    def test_create_same_name_group(self):
 
        with pytest.raises(IntegrityError):
 
            fixture.create_repo_group(u'newGroup')
 
        Session().rollback()
 

	
 
    def test_same_subgroup(self):
 
        sg1 = fixture.create_repo_group(u'sub1', parent_group_id=self.g1.group_id)
 
        assert sg1.parent_group == self.g1
 
        assert sg1.full_path == 'test1/sub1'
 
        assert self.__check_path('test1', 'sub1')
 

	
 
        ssg1 = fixture.create_repo_group(u'subsub1', parent_group_id=sg1.group_id)
 
        assert ssg1.parent_group == sg1
 
        assert ssg1.full_path == 'test1/sub1/subsub1'
 
        assert self.__check_path('test1', 'sub1', 'subsub1')
 

	
 
    def test_remove_group(self):
 
        sg1 = fixture.create_repo_group(u'deleteme')
 
        self.__delete_group(sg1.group_id)
 

	
 
        assert RepoGroup.get(sg1.group_id) == None
 
        assert not self.__check_path('deteteme')
 

	
 
        sg1 = fixture.create_repo_group(u'deleteme', parent_group_id=self.g1.group_id)
 
        self.__delete_group(sg1.group_id)
 

	
 
        assert RepoGroup.get(sg1.group_id) == None
 
        assert not self.__check_path('test1', 'deteteme')
 

	
 
    def test_rename_single_group(self):
 
        sg1 = fixture.create_repo_group(u'initial')
 

	
 
        new_sg1 = _update_group(sg1.group_id, u'after')
 
        new_sg1 = _update_repo_group(sg1.group_id, u'after')
 
        assert self.__check_path('after')
 
        assert RepoGroup.get_by_group_name(u'initial') == None
 

	
 
    def test_update_group_parent(self):
 

	
 
        sg1 = fixture.create_repo_group(u'initial', parent_group_id=self.g1.group_id)
 

	
 
        new_sg1 = _update_group(sg1.group_id, u'after', parent_id=self.g1.group_id)
 
        new_sg1 = _update_repo_group(sg1.group_id, u'after', parent_id=self.g1.group_id)
 
        assert self.__check_path('test1', 'after')
 
        assert RepoGroup.get_by_group_name(u'test1/initial') == None
 

	
 
        new_sg1 = _update_group(sg1.group_id, u'after', parent_id=self.g3.group_id)
 
        new_sg1 = _update_repo_group(sg1.group_id, u'after', parent_id=self.g3.group_id)
 
        assert self.__check_path('test3', 'after')
 
        assert RepoGroup.get_by_group_name(u'test3/initial') == None
 

	
 
        new_sg1 = _update_group(sg1.group_id, u'hello')
 
        new_sg1 = _update_repo_group(sg1.group_id, u'hello')
 
        assert self.__check_path('hello')
 

	
 
        assert RepoGroup.get_by_group_name(u'hello') == new_sg1
 

	
 
    def test_subgrouping_with_repo(self):
 

	
 
        g1 = fixture.create_repo_group(u'g1')
 
        g2 = fixture.create_repo_group(u'g2')
 
        # create new repo
 
        r = fixture.create_repo(u'john')
 

	
 
        assert r.repo_name == 'john'
 
        # put repo into group
 
        r = _update_repo(u'john', repo_group=g1.group_id)
 
        Session().commit()
 
        assert r.repo_name == 'g1/john'
 

	
 
        _update_group(g1.group_id, u'g1', parent_id=g2.group_id)
 
        _update_repo_group(g1.group_id, u'g1', parent_id=g2.group_id)
 
        assert self.__check_path('g2', 'g1')
 

	
 
        # test repo
 
        assert r.repo_name == RepoGroup.url_sep().join(['g2', 'g1',
 
                                                                r.just_name])
 

	
 
    def test_move_to_root(self):
 
        g1 = fixture.create_repo_group(u't11')
 
        g2 = fixture.create_repo_group(u't22', parent_group_id=g1.group_id)
 

	
 
        assert g2.full_path == 't11/t22'
 
        assert self.__check_path('t11', 't22')
 

	
 
        g2 = _update_group(g2.group_id, u'g22', parent_id=None)
 
        g2 = _update_repo_group(g2.group_id, u'g22', parent_id=None)
 
        Session().commit()
 

	
 
        assert g2.group_name == 'g22'
 
        # we moved out group from t1 to '' so it's full path should be 'g2'
 
        assert g2.full_path == 'g22'
 
        assert not self.__check_path('t11', 't22')
 
        assert self.__check_path('g22')
 

	
 
    def test_rename_top_level_group_in_nested_setup(self):
 
        g1 = fixture.create_repo_group(u'L1')
 
        g2 = fixture.create_repo_group(u'L2', parent_group_id=g1.group_id)
 
        g3 = fixture.create_repo_group(u'L3', parent_group_id=g2.group_id)
 

	
 
        r = fixture.create_repo(u'L1/L2/L3/L3_REPO', repo_group=g3.group_id)
 

	
 
        ##rename L1 all groups should be now changed
 
        _update_group(g1.group_id, u'L1_NEW')
 
        _update_repo_group(g1.group_id, u'L1_NEW')
 
        Session().commit()
 
        assert g1.full_path == 'L1_NEW'
 
        assert g2.full_path == 'L1_NEW/L2'
 
        assert g3.full_path == 'L1_NEW/L2/L3'
 
        assert r.repo_name == 'L1_NEW/L2/L3/L3_REPO'
 

	
 
    def test_change_parent_of_top_level_group_in_nested_setup(self):
 
        g1 = fixture.create_repo_group(u'R1')
 
        g2 = fixture.create_repo_group(u'R2', parent_group_id=g1.group_id)
 
        g3 = fixture.create_repo_group(u'R3', parent_group_id=g2.group_id)
 
        g4 = fixture.create_repo_group(u'R1_NEW')
 

	
 
        r = fixture.create_repo(u'R1/R2/R3/R3_REPO', repo_group=g3.group_id)
 
        ##rename L1 all groups should be now changed
 
        _update_group(g1.group_id, u'R1', parent_id=g4.group_id)
 
        _update_repo_group(g1.group_id, u'R1', parent_id=g4.group_id)
 
        Session().commit()
 
        assert g1.full_path == 'R1_NEW/R1'
 
        assert g2.full_path == 'R1_NEW/R1/R2'
 
        assert g3.full_path == 'R1_NEW/R1/R2/R3'
 
        assert r.repo_name == 'R1_NEW/R1/R2/R3/R3_REPO'
 

	
 
    def test_change_parent_of_top_level_group_in_nested_setup_with_rename(self):
 
        g1 = fixture.create_repo_group(u'X1')
 
        g2 = fixture.create_repo_group(u'X2', parent_group_id=g1.group_id)
 
        g3 = fixture.create_repo_group(u'X3', parent_group_id=g2.group_id)
 
        g4 = fixture.create_repo_group(u'X1_NEW')
 

	
 
        r = fixture.create_repo(u'X1/X2/X3/X3_REPO', repo_group=g3.group_id)
 

	
 
        ##rename L1 all groups should be now changed
 
        _update_group(g1.group_id, u'X1_PRIM', parent_id=g4.group_id)
 
        _update_repo_group(g1.group_id, u'X1_PRIM', parent_id=g4.group_id)
 
        Session().commit()
 
        assert g1.full_path == 'X1_NEW/X1_PRIM'
 
        assert g2.full_path == 'X1_NEW/X1_PRIM/X2'
 
        assert g3.full_path == 'X1_NEW/X1_PRIM/X2/X3'
 
        assert r.repo_name == 'X1_NEW/X1_PRIM/X2/X3/X3_REPO'
0 comments (0 inline, 0 general)