Changeset - 0bae66824ac5
[Not reviewed]
default
0 5 0
Mads Kiilerich - 9 years ago 2017-05-08 05:25:41
mads@kiilerich.com
tests: clarify that default parameters are for form - direct model access requires different types

_get_repo_group_create_params parent_group_id is thus set to the form value
'-1' instead of None. It worked before anyway because the model failed to find
the repo '-1' and thus got pretty much the same as None.
5 files changed with 47 insertions and 40 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/celerylib/tasks.py
Show inline comments
 
@@ -431,49 +431,48 @@ def create_repo_fork(form_data, cur_user
 
    clone_uri = form_data.get('clone_uri')
 
    repo_group = form_data['repo_group']
 
    landing_rev = form_data['landing_rev']
 
    copy_fork_permissions = form_data.get('copy_permissions')
 

	
 
    try:
 
        fork_of = Repository.guess_instance(form_data.get('fork_parent_id'))
 

	
 
        RepoModel()._create_repo(
 
            repo_name=repo_name_full,
 
            repo_type=repo_type,
 
            description=form_data['description'],
 
            owner=owner,
 
            private=private,
 
            clone_uri=clone_uri,
 
            repo_group=repo_group,
 
            landing_rev=landing_rev,
 
            fork_of=fork_of,
 
            copy_fork_permissions=copy_fork_permissions
 
        )
 
        action_logger(cur_user, 'user_forked_repo:%s' % repo_name_full,
 
                      fork_of.repo_name, '')
 
        DBS.commit()
 

	
 
        update_after_clone = form_data['update_after_clone'] # FIXME - unused!
 
        source_repo_path = os.path.join(base_path, fork_of.repo_name)
 

	
 
        # now create this repo on Filesystem
 
        RepoModel()._create_filesystem_repo(
 
            repo_name=repo_name,
 
            repo_type=repo_type,
 
            repo_group=RepoGroup.guess_instance(repo_group),
 
            clone_uri=source_repo_path,
 
        )
 
        repo = Repository.get_by_repo_name(repo_name_full)
 
        log_create_repository(repo.get_dict(), created_by=owner.username)
 

	
 
        # update repo changeset caches initially
 
        repo.update_changeset_cache()
 

	
 
        # set new created state
 
        repo.set_state(Repository.STATE_CREATED)
 
        DBS.commit()
 
    except Exception as e:
 
        log.warning('Exception %s occurred when forking repository, '
 
                    'doing cleanup...' % e)
 
        #rollback things manually !
 
        repo = Repository.get_by_repo_name(repo_name_full)
 
        if repo:
kallithea/model/repo.py
Show inline comments
 
@@ -276,48 +276,49 @@ class RepoModel(object):
 
            replacement_user = User.query().filter(User.admin ==
 
                                                   True).first().username
 
            defaults.update({'owner': replacement_user})
 

	
 
        # fill repository users
 
        for p in repo_info.repo_to_perm:
 
            defaults.update({'u_perm_%s' % p.user.username:
 
                                 p.permission.permission_name})
 

	
 
        # fill repository groups
 
        for p in repo_info.users_group_to_perm:
 
            defaults.update({'g_perm_%s' % p.users_group.users_group_name:
 
                                 p.permission.permission_name})
 

	
 
        return defaults
 

	
 
    def update(self, repo, **kwargs):
 
        try:
 
            cur_repo = Repository.guess_instance(repo)
 
            org_repo_name = cur_repo.repo_name
 
            if 'owner' in kwargs:
 
                cur_repo.owner = User.get_by_username(kwargs['owner'])
 

	
 
            if 'repo_group' in kwargs:
 
                assert kwargs['repo_group'] != u'-1', kwargs # RepoForm should have converted to None
 
                cur_repo.group = RepoGroup.get(kwargs['repo_group'])
 
                cur_repo.repo_name = cur_repo.get_new_name(cur_repo.just_name)
 
            log.debug('Updating repo %s with params:%s', cur_repo, kwargs)
 
            for k in ['repo_enable_downloads',
 
                      'repo_description',
 
                      'repo_enable_locking',
 
                      'repo_landing_rev',
 
                      'repo_private',
 
                      'repo_enable_statistics',
 
                      ]:
 
                if k in kwargs:
 
                    setattr(cur_repo, remove_prefix(k, 'repo_'), kwargs[k])
 
            clone_uri = kwargs.get('clone_uri')
 
            if clone_uri is not None and clone_uri != cur_repo.clone_uri_hidden:
 
                cur_repo.clone_uri = clone_uri
 

	
 
            if 'repo_name' in kwargs:
 
                cur_repo.repo_name = cur_repo.get_new_name(kwargs['repo_name'])
 

	
 
            #if private flag is set, reset default permission to NONE
 
            if kwargs.get('repo_private'):
 
                EMPTY_PERM = 'repository.none'
 
                RepoModel().grant_user_permission(
 
                    repo=cur_repo, user='default', perm=EMPTY_PERM
kallithea/model/repo_group.py
Show inline comments
 
@@ -252,61 +252,66 @@ class RepoGroupModel(object):
 
                if member_type == 'user':
 
                    # this updates also current one if found
 
                    _set_perm_user(obj, user=member, perm=perm)
 
                ## set for user group
 
                else:
 
                    #check if we have permissions to alter this usergroup's access
 
                    if not check_perms or HasUserGroupPermissionLevel('read')(member):
 
                        _set_perm_group(obj, users_group=member, perm=perm)
 
            # set new permissions
 
            for member, perm, member_type in perms_new:
 
                if member_type == 'user':
 
                    _set_perm_user(obj, user=member, perm=perm)
 
                else:
 
                    #check if we have permissions to alter this usergroup's access
 
                    if not check_perms or HasUserGroupPermissionLevel('read')(member):
 
                        _set_perm_group(obj, users_group=member, perm=perm)
 
            updates.append(obj)
 
            # if it's not recursive call for all,repos,groups
 
            # break the loop and don't proceed with other changes
 
            if recursive not in ['all', 'repos', 'groups']:
 
                break
 

	
 
        return updates
 

	
 
    def update(self, repo_group, form_data):
 

	
 
    def update(self, repo_group, kwargs):
 
        try:
 
            repo_group = RepoGroup.guess_instance(repo_group)
 
            old_path = repo_group.full_path
 

	
 
            # change properties
 
            repo_group.group_description = form_data['group_description']
 
            repo_group.parent_group_id = form_data['parent_group_id']
 
            repo_group.enable_locking = form_data['enable_locking']
 
            if 'group_description' in kwargs:
 
                repo_group.group_description = kwargs['group_description']
 
            if 'parent_group_id' in kwargs:
 
                repo_group.parent_group_id = kwargs['parent_group_id']
 
            if 'enable_locking' in kwargs:
 
                repo_group.enable_locking = kwargs['enable_locking']
 

	
 
            repo_group.parent_group = RepoGroup.get(form_data['parent_group_id'])
 
            repo_group.group_name = repo_group.get_new_name(form_data['group_name'])
 
            if 'parent_group_id' in kwargs:
 
                assert kwargs['parent_group_id'] != u'-1', kwargs # RepoGroupForm should have converted to None
 
                repo_group.parent_group = RepoGroup.get(kwargs['parent_group_id'])
 
            if 'group_name' in kwargs:
 
                repo_group.group_name = repo_group.get_new_name(kwargs['group_name'])
 
            new_path = repo_group.full_path
 
            Session().add(repo_group)
 

	
 
            # iterate over all members of this groups and do fixes
 
            # set locking if given
 
            # if obj is a repoGroup also fix the name of the group according
 
            # to the parent
 
            # if obj is a Repo fix it's name
 
            # this can be potentially heavy operation
 
            for obj in repo_group.recursive_groups_and_repos():
 
                #set the value from it's parent
 
                obj.enable_locking = repo_group.enable_locking
 
                if isinstance(obj, RepoGroup):
 
                    new_name = obj.get_new_name(obj.name)
 
                    log.debug('Fixing group %s to new name %s' \
 
                                % (obj.group_name, new_name))
 
                    obj.group_name = new_name
 
                elif isinstance(obj, Repository):
 
                    # we need to get all repositories from this new group and
 
                    # rename them accordingly to new group path
 
                    new_name = obj.get_new_name(obj.just_name)
 
                    log.debug('Fixing repo %s to new name %s' \
 
                                % (obj.repo_name, new_name))
 
                    obj.repo_name = new_name
kallithea/tests/fixture.py
Show inline comments
 
@@ -56,165 +56,167 @@ class Fixture(object):
 

	
 
        Usage:
 

	
 
        fixture = Fixture()
 
        with fixture.anon_access(False):
 
            stuff
 
        """
 

	
 
        class context(object):
 
            def __enter__(self):
 
                anon = User.get_default_user()
 
                self._before = anon.active
 
                anon.active = status
 
                Session().commit()
 
                invalidate_all_caches()
 

	
 
            def __exit__(self, exc_type, exc_val, exc_tb):
 
                anon = User.get_default_user()
 
                anon.active = self._before
 
                Session().commit()
 

	
 
        return context()
 

	
 
    def _get_repo_create_params(self, **custom):
 
        """Return form values to be validated through RepoForm"""
 
        defs = dict(
 
            repo_name=None,
 
            repo_type='hg',
 
            clone_uri='',
 
            repo_group=u'-1',
 
            repo_description=u'DESC',
 
            repo_private=False,
 
            repo_landing_rev='rev:tip',
 
            repo_copy_permissions=False,
 
            repo_state=Repository.STATE_CREATED,
 
        )
 
        defs.update(custom)
 
        if 'repo_name_full' not in custom:
 
            defs.update({'repo_name_full': defs['repo_name']})
 

	
 
        # fix the repo name if passed as repo_name_full
 
        if defs['repo_name']:
 
            defs['repo_name'] = defs['repo_name'].split('/')[-1]
 

	
 
        return defs
 

	
 
    def _get_group_create_params(self, **custom):
 
    def _get_repo_group_create_params(self, **custom):
 
        """Return form values to be validated through RepoGroupForm"""
 
        defs = dict(
 
            group_name=None,
 
            group_description=u'DESC',
 
            parent_group_id=None,
 
            parent_group_id=u'-1',
 
            perms_updates=[],
 
            perms_new=[],
 
            enable_locking=False,
 
            recursive=False
 
        )
 
        defs.update(custom)
 

	
 
        return defs
 

	
 
    def _get_user_create_params(self, name, **custom):
 
        defs = dict(
 
            username=name,
 
            password='qweqwe',
 
            email='%s+test@example.com' % name,
 
            firstname=u'TestUser',
 
            lastname=u'Test',
 
            active=True,
 
            admin=False,
 
            extern_type='internal',
 
            extern_name=None
 
        )
 
        defs.update(custom)
 

	
 
        return defs
 

	
 
    def _get_user_group_create_params(self, name, **custom):
 
        defs = dict(
 
            users_group_name=name,
 
            user_group_description=u'DESC',
 
            users_group_active=True,
 
            user_group_data={},
 
        )
 
        defs.update(custom)
 

	
 
        return defs
 

	
 
    def create_repo(self, name, **kwargs):
 
    def create_repo(self, name, repo_group=None, **kwargs):
 
        if 'skip_if_exists' in kwargs:
 
            del kwargs['skip_if_exists']
 
            r = Repository.get_by_repo_name(name)
 
            if r:
 
                return r
 

	
 
        if isinstance(kwargs.get('repo_group'), RepoGroup):
 
            kwargs['repo_group'] = kwargs['repo_group'].group_id
 
        if isinstance(repo_group, RepoGroup):
 
            repo_group = repo_group.group_id
 

	
 
        form_data = self._get_repo_create_params(repo_name=name, **kwargs)
 
        form_data['repo_group'] = repo_group # patch form dict so it can be used directly by model
 
        cur_user = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
 
        RepoModel().create(form_data, cur_user)
 
        Session().commit()
 
        ScmModel().mark_for_invalidation(name)
 
        return Repository.get_by_repo_name(name)
 

	
 
    def create_fork(self, repo_to_fork, fork_name, **kwargs):
 
        repo_to_fork = Repository.get_by_repo_name(repo_to_fork)
 

	
 
        form_data = self._get_repo_create_params(repo_name=fork_name,
 
                                            fork_parent_id=repo_to_fork,
 
                                            repo_type=repo_to_fork.repo_type,
 
                                            **kwargs)
 
        form_data['update_after_clone'] = False
 

	
 
        #TODO: fix it !!
 
        # patch form dict so it can be used directly by model
 
        form_data['description'] = form_data['repo_description']
 
        form_data['private'] = form_data['repo_private']
 
        form_data['landing_rev'] = form_data['repo_landing_rev']
 

	
 
        owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
 
        RepoModel().create_fork(form_data, cur_user=owner)
 
        Session().commit()
 
        ScmModel().mark_for_invalidation(fork_name)
 
        r = Repository.get_by_repo_name(fork_name)
 
        assert r
 
        return r
 

	
 
    def destroy_repo(self, repo_name, **kwargs):
 
        RepoModel().delete(repo_name, **kwargs)
 
        Session().commit()
 

	
 
    def create_repo_group(self, name, **kwargs):
 
    def create_repo_group(self, name, parent_group_id=None, **kwargs):
 
        if 'skip_if_exists' in kwargs:
 
            del kwargs['skip_if_exists']
 
            gr = RepoGroup.get_by_group_name(group_name=name)
 
            if gr:
 
                return gr
 
        form_data = self._get_group_create_params(group_name=name, **kwargs)
 
        owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
 
        form_data = self._get_repo_group_create_params(group_name=name, **kwargs)
 
        gr = RepoGroupModel().create(
 
            group_name=form_data['group_name'],
 
            group_description=form_data['group_name'],
 
            owner=owner, parent=form_data['parent_group_id'])
 
            parent=parent_group_id,
 
            owner=kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN),
 
            )
 
        Session().commit()
 
        gr = RepoGroup.get_by_group_name(gr.group_name)
 
        return gr
 

	
 
    def destroy_repo_group(self, repogroupid):
 
        RepoGroupModel().delete(repogroupid)
 
        Session().commit()
 

	
 
    def create_user(self, name, **kwargs):
 
        if 'skip_if_exists' in kwargs:
 
            del kwargs['skip_if_exists']
 
            user = User.get_by_username(name)
 
            if user:
 
                return user
 
        form_data = self._get_user_create_params(name, **kwargs)
 
        user = UserModel().create(form_data)
 
        Session().commit()
 
        user = User.get_by_username(user.username)
 
        return user
 

	
 
    def destroy_user(self, userid):
 
        UserModel().delete(userid)
 
        Session().commit()
 

	
kallithea/tests/models/test_repo_groups.py
Show inline comments
 
import os
 
import pytest
 
from sqlalchemy.exc import IntegrityError
 

	
 
from kallithea.tests.base import *
 
from kallithea.tests.fixture import Fixture
 

	
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.db import RepoGroup
 
from kallithea.model.meta import Session
 

	
 

	
 
fixture = Fixture()
 

	
 

	
 
def _update_group(id_, group_name, desc=u'desc', parent_id=None):
 
    form_data = fixture._get_group_create_params(group_name=group_name,
 
                                                 group_desc=desc,
 
                                                 parent_group_id=parent_id)
 
    gr = RepoGroupModel().update(id_, form_data)
 
    return gr
 
def _update_repo_group(id_, group_name, desc=u'desc', parent_id=None):
 
    form_data = dict(
 
        group_name=group_name,
 
        group_description=desc,
 
        parent_group_id=parent_id,
 
        )
 
    return RepoGroupModel().update(id_, form_data)
 

	
 

	
 
def _update_repo(name, **kwargs):
 
    form_data = fixture._get_repo_create_params(**kwargs)
 
    if not 'repo_name' in kwargs:
 
        form_data['repo_name'] = name
 
        kwargs['repo_name'] = name
 
    if not 'perms_new' in kwargs:
 
        form_data['perms_new'] = []
 
        kwargs['perms_new'] = []
 
    if not 'perms_updates' in kwargs:
 
        form_data['perms_updates'] = []
 
    r = RepoModel().update(name, **form_data)
 
        kwargs['perms_updates'] = []
 
    r = RepoModel().update(name, **kwargs)
 
    return r
 

	
 

	
 
class TestRepoGroups(TestController):
 

	
 
    def setup_method(self, method):
 
        self.g1 = fixture.create_repo_group(u'test1', skip_if_exists=True)
 
        self.g2 = fixture.create_repo_group(u'test2', skip_if_exists=True)
 
        self.g3 = fixture.create_repo_group(u'test3', skip_if_exists=True)
 

	
 
    def teardown_method(self, method):
 
        Session.remove()
 

	
 
    def __check_path(self, *path):
 
        """
 
        Checks the path for existence !
 
        """
 
        path = [TESTS_TMP_PATH] + list(path)
 
        path = os.path.join(*path)
 
        return os.path.isdir(path)
 

	
 
    def _check_folders(self):
 
        print os.listdir(TESTS_TMP_PATH)
 

	
 
@@ -76,126 +76,126 @@ class TestRepoGroups(TestController):
 
        assert sg1.full_path == 'test1/sub1'
 
        assert self.__check_path('test1', 'sub1')
 

	
 
        ssg1 = fixture.create_repo_group(u'subsub1', parent_group_id=sg1.group_id)
 
        assert ssg1.parent_group == sg1
 
        assert ssg1.full_path == 'test1/sub1/subsub1'
 
        assert self.__check_path('test1', 'sub1', 'subsub1')
 

	
 
    def test_remove_group(self):
 
        sg1 = fixture.create_repo_group(u'deleteme')
 
        self.__delete_group(sg1.group_id)
 

	
 
        assert RepoGroup.get(sg1.group_id) == None
 
        assert not self.__check_path('deteteme')
 

	
 
        sg1 = fixture.create_repo_group(u'deleteme', parent_group_id=self.g1.group_id)
 
        self.__delete_group(sg1.group_id)
 

	
 
        assert RepoGroup.get(sg1.group_id) == None
 
        assert not self.__check_path('test1', 'deteteme')
 

	
 
    def test_rename_single_group(self):
 
        sg1 = fixture.create_repo_group(u'initial')
 

	
 
        new_sg1 = _update_group(sg1.group_id, u'after')
 
        new_sg1 = _update_repo_group(sg1.group_id, u'after')
 
        assert self.__check_path('after')
 
        assert RepoGroup.get_by_group_name(u'initial') == None
 

	
 
    def test_update_group_parent(self):
 

	
 
        sg1 = fixture.create_repo_group(u'initial', parent_group_id=self.g1.group_id)
 

	
 
        new_sg1 = _update_group(sg1.group_id, u'after', parent_id=self.g1.group_id)
 
        new_sg1 = _update_repo_group(sg1.group_id, u'after', parent_id=self.g1.group_id)
 
        assert self.__check_path('test1', 'after')
 
        assert RepoGroup.get_by_group_name(u'test1/initial') == None
 

	
 
        new_sg1 = _update_group(sg1.group_id, u'after', parent_id=self.g3.group_id)
 
        new_sg1 = _update_repo_group(sg1.group_id, u'after', parent_id=self.g3.group_id)
 
        assert self.__check_path('test3', 'after')
 
        assert RepoGroup.get_by_group_name(u'test3/initial') == None
 

	
 
        new_sg1 = _update_group(sg1.group_id, u'hello')
 
        new_sg1 = _update_repo_group(sg1.group_id, u'hello')
 
        assert self.__check_path('hello')
 

	
 
        assert RepoGroup.get_by_group_name(u'hello') == new_sg1
 

	
 
    def test_subgrouping_with_repo(self):
 

	
 
        g1 = fixture.create_repo_group(u'g1')
 
        g2 = fixture.create_repo_group(u'g2')
 
        # create new repo
 
        r = fixture.create_repo(u'john')
 

	
 
        assert r.repo_name == 'john'
 
        # put repo into group
 
        r = _update_repo(u'john', repo_group=g1.group_id)
 
        Session().commit()
 
        assert r.repo_name == 'g1/john'
 

	
 
        _update_group(g1.group_id, u'g1', parent_id=g2.group_id)
 
        _update_repo_group(g1.group_id, u'g1', parent_id=g2.group_id)
 
        assert self.__check_path('g2', 'g1')
 

	
 
        # test repo
 
        assert r.repo_name == RepoGroup.url_sep().join(['g2', 'g1',
 
                                                                r.just_name])
 

	
 
    def test_move_to_root(self):
 
        g1 = fixture.create_repo_group(u't11')
 
        g2 = fixture.create_repo_group(u't22', parent_group_id=g1.group_id)
 

	
 
        assert g2.full_path == 't11/t22'
 
        assert self.__check_path('t11', 't22')
 

	
 
        g2 = _update_group(g2.group_id, u'g22', parent_id=None)
 
        g2 = _update_repo_group(g2.group_id, u'g22', parent_id=None)
 
        Session().commit()
 

	
 
        assert g2.group_name == 'g22'
 
        # we moved out group from t1 to '' so it's full path should be 'g2'
 
        assert g2.full_path == 'g22'
 
        assert not self.__check_path('t11', 't22')
 
        assert self.__check_path('g22')
 

	
 
    def test_rename_top_level_group_in_nested_setup(self):
 
        g1 = fixture.create_repo_group(u'L1')
 
        g2 = fixture.create_repo_group(u'L2', parent_group_id=g1.group_id)
 
        g3 = fixture.create_repo_group(u'L3', parent_group_id=g2.group_id)
 

	
 
        r = fixture.create_repo(u'L1/L2/L3/L3_REPO', repo_group=g3.group_id)
 

	
 
        ##rename L1 all groups should be now changed
 
        _update_group(g1.group_id, u'L1_NEW')
 
        _update_repo_group(g1.group_id, u'L1_NEW')
 
        Session().commit()
 
        assert g1.full_path == 'L1_NEW'
 
        assert g2.full_path == 'L1_NEW/L2'
 
        assert g3.full_path == 'L1_NEW/L2/L3'
 
        assert r.repo_name == 'L1_NEW/L2/L3/L3_REPO'
 

	
 
    def test_change_parent_of_top_level_group_in_nested_setup(self):
 
        g1 = fixture.create_repo_group(u'R1')
 
        g2 = fixture.create_repo_group(u'R2', parent_group_id=g1.group_id)
 
        g3 = fixture.create_repo_group(u'R3', parent_group_id=g2.group_id)
 
        g4 = fixture.create_repo_group(u'R1_NEW')
 

	
 
        r = fixture.create_repo(u'R1/R2/R3/R3_REPO', repo_group=g3.group_id)
 
        ##rename L1 all groups should be now changed
 
        _update_group(g1.group_id, u'R1', parent_id=g4.group_id)
 
        _update_repo_group(g1.group_id, u'R1', parent_id=g4.group_id)
 
        Session().commit()
 
        assert g1.full_path == 'R1_NEW/R1'
 
        assert g2.full_path == 'R1_NEW/R1/R2'
 
        assert g3.full_path == 'R1_NEW/R1/R2/R3'
 
        assert r.repo_name == 'R1_NEW/R1/R2/R3/R3_REPO'
 

	
 
    def test_change_parent_of_top_level_group_in_nested_setup_with_rename(self):
 
        g1 = fixture.create_repo_group(u'X1')
 
        g2 = fixture.create_repo_group(u'X2', parent_group_id=g1.group_id)
 
        g3 = fixture.create_repo_group(u'X3', parent_group_id=g2.group_id)
 
        g4 = fixture.create_repo_group(u'X1_NEW')
 

	
 
        r = fixture.create_repo(u'X1/X2/X3/X3_REPO', repo_group=g3.group_id)
 

	
 
        ##rename L1 all groups should be now changed
 
        _update_group(g1.group_id, u'X1_PRIM', parent_id=g4.group_id)
 
        _update_repo_group(g1.group_id, u'X1_PRIM', parent_id=g4.group_id)
 
        Session().commit()
 
        assert g1.full_path == 'X1_NEW/X1_PRIM'
 
        assert g2.full_path == 'X1_NEW/X1_PRIM/X2'
 
        assert g3.full_path == 'X1_NEW/X1_PRIM/X2/X3'
 
        assert r.repo_name == 'X1_NEW/X1_PRIM/X2/X3/X3_REPO'
0 comments (0 inline, 0 general)