diff --git a/rhodecode/tests/functional/test_admin_repos.py b/rhodecode/tests/functional/test_admin_repos.py --- a/rhodecode/tests/functional/test_admin_repos.py +++ b/rhodecode/tests/functional/test_admin_repos.py @@ -1,16 +1,19 @@ # -*- coding: utf-8 -*- import os +import mock import urllib from rhodecode.lib import vcs from rhodecode.model.db import Repository, RepoGroup, UserRepoToPerm, User,\ Permission +from rhodecode.model.user import UserModel from rhodecode.tests import * -from rhodecode.model.repos_group import ReposGroupModel +from rhodecode.model.repo_group import RepoGroupModel from rhodecode.model.repo import RepoModel +from rhodecode.model.scm import ScmModel from rhodecode.model.meta import Session -from rhodecode.tests.fixture import Fixture +from rhodecode.tests.fixture import Fixture, error_function fixture = Fixture() @@ -24,87 +27,109 @@ def _get_permission_for_user(user, repo) return perm -class TestAdminReposController(TestController): +class _BaseTest(TestController): + """ + Write all tests here + """ + REPO = None + REPO_TYPE = None + NEW_REPO = None + OTHER_TYPE_REPO = None + OTHER_TYPE = None + + @classmethod + def setup_class(cls): + pass + + @classmethod + def teardown_class(cls): + pass def test_index(self): self.log_user() response = self.app.get(url('repos')) - # Test response... - def test_index_as_xml(self): - response = self.app.get(url('formatted_repos', format='xml')) - - def test_create_hg(self): + def test_create(self): self.log_user() - repo_name = NEW_HG_REPO + repo_name = self.NEW_REPO description = 'description for newly created repo' response = self.app.post(url('repos'), fixture._get_repo_create_params(repo_private=False, repo_name=repo_name, + repo_type=self.REPO_TYPE, repo_description=description)) + ## run the check page that triggers the flash message + response = self.app.get(url('repo_check_home', repo_name=repo_name)) + self.assertEqual(response.json, {u'result': True}) self.checkSessionFlash(response, 'Created repository %s' % (repo_name, repo_name)) - #test if the repo was created in the database + # test if the repo was created in the database new_repo = Session().query(Repository)\ .filter(Repository.repo_name == repo_name).one() self.assertEqual(new_repo.repo_name, repo_name) self.assertEqual(new_repo.description, description) - #test if repository is visible in the list ? - response = response.follow() + # test if the repository is visible in the list ? + response = self.app.get(url('summary_home', repo_name=repo_name)) + response.mustcontain(repo_name) + response.mustcontain(self.REPO_TYPE) - response.mustcontain(repo_name) - - #test if repository was created on filesystem + # test if the repository was created on filesystem try: vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name)) except Exception: self.fail('no repo %s in filesystem' % repo_name) - def test_create_hg_non_ascii(self): + RepoModel().delete(repo_name) + Session().commit() + + def test_create_non_ascii(self): self.log_user() non_ascii = "ąęł" - repo_name = "%s%s" % (NEW_HG_REPO, non_ascii) + repo_name = "%s%s" % (self.NEW_REPO, non_ascii) repo_name_unicode = repo_name.decode('utf8') description = 'description for newly created repo' + non_ascii description_unicode = description.decode('utf8') - private = False response = self.app.post(url('repos'), fixture._get_repo_create_params(repo_private=False, repo_name=repo_name, + repo_type=self.REPO_TYPE, repo_description=description)) + ## run the check page that triggers the flash message + response = self.app.get(url('repo_check_home', repo_name=repo_name)) + self.assertEqual(response.json, {u'result': True}) self.checkSessionFlash(response, u'Created repository %s' % (urllib.quote(repo_name), repo_name_unicode)) - #test if the repo was created in the database + # test if the repo was created in the database new_repo = Session().query(Repository)\ .filter(Repository.repo_name == repo_name_unicode).one() self.assertEqual(new_repo.repo_name, repo_name_unicode) self.assertEqual(new_repo.description, description_unicode) - #test if repository is visible in the list ? - response = response.follow() + # test if the repository is visible in the list ? + response = self.app.get(url('summary_home', repo_name=repo_name)) + response.mustcontain(repo_name) + response.mustcontain(self.REPO_TYPE) - response.mustcontain(repo_name) - - #test if repository was created on filesystem + # test if the repository was created on filesystem try: vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name)) except Exception: self.fail('no repo %s in filesystem' % repo_name) - def test_create_hg_in_group(self): + def test_create_in_group(self): self.log_user() ## create GROUP - group_name = 'sometest' - gr = ReposGroupModel().create(group_name=group_name, - group_description='test', - owner=TEST_USER_ADMIN_LOGIN) + group_name = 'sometest_%s' % self.REPO_TYPE + gr = RepoGroupModel().create(group_name=group_name, + group_description='test', + owner=TEST_USER_ADMIN_LOGIN) Session().commit() repo_name = 'ingroup' @@ -113,137 +138,251 @@ class TestAdminReposController(TestContr response = self.app.post(url('repos'), fixture._get_repo_create_params(repo_private=False, repo_name=repo_name, + repo_type=self.REPO_TYPE, + repo_description=description, + repo_group=gr.group_id,)) + ## run the check page that triggers the flash message + response = self.app.get(url('repo_check_home', repo_name=repo_name_full)) + self.assertEqual(response.json, {u'result': True}) + self.checkSessionFlash(response, + 'Created repository %s' + % (repo_name_full, repo_name_full)) + # test if the repo was created in the database + new_repo = Session().query(Repository)\ + .filter(Repository.repo_name == repo_name_full).one() + new_repo_id = new_repo.repo_id + + self.assertEqual(new_repo.repo_name, repo_name_full) + self.assertEqual(new_repo.description, description) + + # test if the repository is visible in the list ? + response = self.app.get(url('summary_home', repo_name=repo_name_full)) + response.mustcontain(repo_name_full) + response.mustcontain(self.REPO_TYPE) + + inherited_perms = UserRepoToPerm.query()\ + .filter(UserRepoToPerm.repository_id == new_repo_id).all() + self.assertEqual(len(inherited_perms), 1) + + # test if the repository was created on filesystem + try: + vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name_full)) + except Exception: + RepoGroupModel().delete(group_name) + Session().commit() + self.fail('no repo %s in filesystem' % repo_name) + + RepoModel().delete(repo_name_full) + RepoGroupModel().delete(group_name) + Session().commit() + + def test_create_in_group_without_needed_permissions(self): + usr = self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS) + # revoke + user_model = UserModel() + # disable fork and create on default user + user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository') + user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none') + user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository') + user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none') + + # disable on regular user + user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository') + user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none') + user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository') + user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none') + Session().commit() + + ## create GROUP + group_name = 'reg_sometest_%s' % self.REPO_TYPE + gr = RepoGroupModel().create(group_name=group_name, + group_description='test', + owner=TEST_USER_ADMIN_LOGIN) + Session().commit() + + group_name_allowed = 'reg_sometest_allowed_%s' % self.REPO_TYPE + gr_allowed = RepoGroupModel().create(group_name=group_name_allowed, + group_description='test', + owner=TEST_USER_REGULAR_LOGIN) + Session().commit() + + repo_name = 'ingroup' + repo_name_full = RepoGroup.url_sep().join([group_name, repo_name]) + description = 'description for newly created repo' + response = self.app.post(url('repos'), + fixture._get_repo_create_params(repo_private=False, + repo_name=repo_name, + repo_type=self.REPO_TYPE, repo_description=description, repo_group=gr.group_id,)) + response.mustcontain('Invalid value') + + # user is allowed to create in this group + repo_name = 'ingroup' + repo_name_full = RepoGroup.url_sep().join([group_name_allowed, repo_name]) + description = 'description for newly created repo' + response = self.app.post(url('repos'), + fixture._get_repo_create_params(repo_private=False, + repo_name=repo_name, + repo_type=self.REPO_TYPE, + repo_description=description, + repo_group=gr_allowed.group_id,)) + + ## run the check page that triggers the flash message + response = self.app.get(url('repo_check_home', repo_name=repo_name_full)) + self.assertEqual(response.json, {u'result': True}) self.checkSessionFlash(response, 'Created repository %s' - % (repo_name_full, repo_name)) - #test if the repo was created in the database + % (repo_name_full, repo_name_full)) + # test if the repo was created in the database new_repo = Session().query(Repository)\ .filter(Repository.repo_name == repo_name_full).one() + new_repo_id = new_repo.repo_id self.assertEqual(new_repo.repo_name, repo_name_full) self.assertEqual(new_repo.description, description) - #test if repository is visible in the list ? - response = response.follow() + # test if the repository is visible in the list ? + response = self.app.get(url('summary_home', repo_name=repo_name_full)) + response.mustcontain(repo_name_full) + response.mustcontain(self.REPO_TYPE) - response.mustcontain(repo_name_full) + inherited_perms = UserRepoToPerm.query()\ + .filter(UserRepoToPerm.repository_id == new_repo_id).all() + self.assertEqual(len(inherited_perms), 1) - #test if repository was created on filesystem + # test if the repository was created on filesystem try: vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name_full)) except Exception: - ReposGroupModel().delete(group_name) + RepoGroupModel().delete(group_name) Session().commit() self.fail('no repo %s in filesystem' % repo_name) RepoModel().delete(repo_name_full) - ReposGroupModel().delete(group_name) + RepoGroupModel().delete(group_name) + RepoGroupModel().delete(group_name_allowed) + Session().commit() + + def test_create_in_group_inherit_permissions(self): + self.log_user() + + ## create GROUP + group_name = 'sometest_%s' % self.REPO_TYPE + gr = RepoGroupModel().create(group_name=group_name, + group_description='test', + owner=TEST_USER_ADMIN_LOGIN) + perm = Permission.get_by_key('repository.write') + RepoGroupModel().grant_user_permission(gr, TEST_USER_REGULAR_LOGIN, perm) + + ## add repo permissions Session().commit() - def test_create_git(self): - self.log_user() - repo_name = NEW_GIT_REPO + repo_name = 'ingroup_inherited_%s' % self.REPO_TYPE + repo_name_full = RepoGroup.url_sep().join([group_name, repo_name]) description = 'description for newly created repo' - response = self.app.post(url('repos'), fixture._get_repo_create_params(repo_private=False, - repo_type='git', + repo_name=repo_name, + repo_type=self.REPO_TYPE, + repo_description=description, + repo_group=gr.group_id, + repo_copy_permissions=True)) + + ## run the check page that triggers the flash message + response = self.app.get(url('repo_check_home', repo_name=repo_name_full)) + self.checkSessionFlash(response, + 'Created repository %s' + % (repo_name_full, repo_name_full)) + # test if the repo was created in the database + new_repo = Session().query(Repository)\ + .filter(Repository.repo_name == repo_name_full).one() + new_repo_id = new_repo.repo_id + + self.assertEqual(new_repo.repo_name, repo_name_full) + self.assertEqual(new_repo.description, description) + + # test if the repository is visible in the list ? + response = self.app.get(url('summary_home', repo_name=repo_name_full)) + response.mustcontain(repo_name_full) + response.mustcontain(self.REPO_TYPE) + + # test if the repository was created on filesystem + try: + vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name_full)) + except Exception: + RepoGroupModel().delete(group_name) + Session().commit() + self.fail('no repo %s in filesystem' % repo_name) + + #check if inherited permissiona are applied + inherited_perms = UserRepoToPerm.query()\ + .filter(UserRepoToPerm.repository_id == new_repo_id).all() + self.assertEqual(len(inherited_perms), 2) + + self.assertTrue(TEST_USER_REGULAR_LOGIN in [x.user.username + for x in inherited_perms]) + self.assertTrue('repository.write' in [x.permission.permission_name + for x in inherited_perms]) + + RepoModel().delete(repo_name_full) + RepoGroupModel().delete(group_name) + Session().commit() + + def test_create_remote_repo_wrong_clone_uri(self): + self.log_user() + repo_name = self.NEW_REPO + description = 'description for newly created repo' + response = self.app.post(url('repos'), + fixture._get_repo_create_params(repo_private=False, + repo_name=repo_name, + repo_type=self.REPO_TYPE, + repo_description=description, + clone_uri='http://127.0.0.1/repo')) + response.mustcontain('invalid clone url') + + + def test_create_remote_repo_wrong_clone_uri_hg_svn(self): + self.log_user() + repo_name = self.NEW_REPO + description = 'description for newly created repo' + response = self.app.post(url('repos'), + fixture._get_repo_create_params(repo_private=False, + repo_name=repo_name, + repo_type=self.REPO_TYPE, + repo_description=description, + clone_uri='svn+http://127.0.0.1/repo')) + response.mustcontain('invalid clone url') + + + def test_delete(self): + self.log_user() + repo_name = 'vcs_test_new_to_delete_%s' % self.REPO_TYPE + description = 'description for newly created repo' + response = self.app.post(url('repos'), + fixture._get_repo_create_params(repo_private=False, + repo_type=self.REPO_TYPE, repo_name=repo_name, repo_description=description)) + ## run the check page that triggers the flash message + response = self.app.get(url('repo_check_home', repo_name=repo_name)) self.checkSessionFlash(response, 'Created repository %s' % (repo_name, repo_name)) - - #test if the repo was created in the database + # test if the repo was created in the database new_repo = Session().query(Repository)\ .filter(Repository.repo_name == repo_name).one() self.assertEqual(new_repo.repo_name, repo_name) self.assertEqual(new_repo.description, description) - #test if repository is visible in the list ? - response = response.follow() - - response.mustcontain(repo_name) - - #test if repository was created on filesystem - try: - vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name)) - except Exception: - self.fail('no repo %s in filesystem' % repo_name) - - def test_create_git_non_ascii(self): - self.log_user() - non_ascii = "ąęł" - repo_name = "%s%s" % (NEW_GIT_REPO, non_ascii) - repo_name_unicode = repo_name.decode('utf8') - description = 'description for newly created repo' + non_ascii - description_unicode = description.decode('utf8') - private = False - response = self.app.post(url('repos'), - fixture._get_repo_create_params(repo_private=False, - repo_type='git', - repo_name=repo_name, - repo_description=description)) - - self.checkSessionFlash(response, - u'Created repository %s' - % (urllib.quote(repo_name), repo_name_unicode)) - - #test if the repo was created in the database - new_repo = Session().query(Repository)\ - .filter(Repository.repo_name == repo_name_unicode).one() - - self.assertEqual(new_repo.repo_name, repo_name_unicode) - self.assertEqual(new_repo.description, description_unicode) - - #test if repository is visible in the list ? - response = response.follow() - + # test if the repository is visible in the list ? + response = self.app.get(url('summary_home', repo_name=repo_name)) response.mustcontain(repo_name) - - #test if repository was created on filesystem - try: - vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name)) - except Exception: - self.fail('no repo %s in filesystem' % repo_name) - - def test_update(self): - response = self.app.put(url('repo', repo_name=HG_REPO)) - - def test_update_browser_fakeout(self): - response = self.app.post(url('repo', repo_name=HG_REPO), - params=dict(_method='put')) + response.mustcontain(self.REPO_TYPE) - def test_delete_hg(self): - self.log_user() - repo_name = 'vcs_test_new_to_delete' - description = 'description for newly created repo' - response = self.app.post(url('repos'), - fixture._get_repo_create_params(repo_private=False, - repo_type='hg', - repo_name=repo_name, - repo_description=description)) - - self.checkSessionFlash(response, - 'Created repository %s' - % (repo_name, repo_name)) - #test if the repo was created in the database - new_repo = Session().query(Repository)\ - .filter(Repository.repo_name == repo_name).one() - - self.assertEqual(new_repo.repo_name, repo_name) - self.assertEqual(new_repo.description, description) - - #test if repository is visible in the list ? - response = response.follow() - - response.mustcontain(repo_name) - - #test if repository was created on filesystem + # test if the repository was created on filesystem try: vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name)) except Exception: @@ -264,47 +403,49 @@ class TestAdminReposController(TestContr self.assertEqual(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)), False) - def test_delete_git(self): + def test_delete_non_ascii(self): self.log_user() - repo_name = 'vcs_test_new_to_delete' - description = 'description for newly created repo' - private = False + non_ascii = "ąęł" + repo_name = "%s%s" % (self.NEW_REPO, non_ascii) + repo_name_unicode = repo_name.decode('utf8') + description = 'description for newly created repo' + non_ascii + description_unicode = description.decode('utf8') response = self.app.post(url('repos'), fixture._get_repo_create_params(repo_private=False, - repo_type='git', repo_name=repo_name, + repo_type=self.REPO_TYPE, repo_description=description)) - + ## run the check page that triggers the flash message + response = self.app.get(url('repo_check_home', repo_name=repo_name)) + self.assertEqual(response.json, {u'result': True}) self.checkSessionFlash(response, - 'Created repository %s' - % (repo_name, repo_name)) - #test if the repo was created in the database + u'Created repository %s' + % (urllib.quote(repo_name), repo_name_unicode)) + # test if the repo was created in the database new_repo = Session().query(Repository)\ - .filter(Repository.repo_name == repo_name).one() + .filter(Repository.repo_name == repo_name_unicode).one() - self.assertEqual(new_repo.repo_name, repo_name) - self.assertEqual(new_repo.description, description) + self.assertEqual(new_repo.repo_name, repo_name_unicode) + self.assertEqual(new_repo.description, description_unicode) - #test if repository is visible in the list ? - response = response.follow() + # test if the repository is visible in the list ? + response = self.app.get(url('summary_home', repo_name=repo_name)) + response.mustcontain(repo_name) + response.mustcontain(self.REPO_TYPE) - response.mustcontain(repo_name) - - #test if repository was created on filesystem + # test if the repository was created on filesystem try: vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name)) except Exception: self.fail('no repo %s in filesystem' % repo_name) response = self.app.delete(url('repo', repo_name=repo_name)) - - self.checkSessionFlash(response, 'Deleted repository %s' % (repo_name)) - + self.checkSessionFlash(response, 'Deleted repository %s' % (repo_name_unicode)) response.follow() #check if repo was deleted from db deleted_repo = Session().query(Repository)\ - .filter(Repository.repo_name == repo_name).scalar() + .filter(Repository.repo_name == repo_name_unicode).scalar() self.assertEqual(deleted_repo, None) @@ -316,52 +457,49 @@ class TestAdminReposController(TestContr pass def test_delete_browser_fakeout(self): - response = self.app.post(url('repo', repo_name=HG_REPO), + response = self.app.post(url('repo', repo_name=self.REPO), params=dict(_method='delete')) - def test_show_hg(self): + def test_show(self): self.log_user() - response = self.app.get(url('repo', repo_name=HG_REPO)) - - def test_show_git(self): - self.log_user() - response = self.app.get(url('repo', repo_name=GIT_REPO)) - + response = self.app.get(url('repo', repo_name=self.REPO)) def test_edit(self): - response = self.app.get(url('edit_repo', repo_name=HG_REPO)) + response = self.app.get(url('edit_repo', repo_name=self.REPO)) def test_set_private_flag_sets_default_to_none(self): self.log_user() #initially repository perm should be read - perm = _get_permission_for_user(user='default', repo=HG_REPO) + perm = _get_permission_for_user(user='default', repo=self.REPO) self.assertTrue(len(perm), 1) self.assertEqual(perm[0].permission.permission_name, 'repository.read') - self.assertEqual(Repository.get_by_repo_name(HG_REPO).private, False) + self.assertEqual(Repository.get_by_repo_name(self.REPO).private, False) - response = self.app.put(url('repo', repo_name=HG_REPO), + response = self.app.put(url('repo', repo_name=self.REPO), fixture._get_repo_create_params(repo_private=1, - repo_name=HG_REPO, + repo_name=self.REPO, + repo_type=self.REPO_TYPE, user=TEST_USER_ADMIN_LOGIN)) self.checkSessionFlash(response, - msg='Repository %s updated successfully' % (HG_REPO)) - self.assertEqual(Repository.get_by_repo_name(HG_REPO).private, True) + msg='Repository %s updated successfully' % (self.REPO)) + self.assertEqual(Repository.get_by_repo_name(self.REPO).private, True) #now the repo default permission should be None - perm = _get_permission_for_user(user='default', repo=HG_REPO) + perm = _get_permission_for_user(user='default', repo=self.REPO) self.assertTrue(len(perm), 1) self.assertEqual(perm[0].permission.permission_name, 'repository.none') - response = self.app.put(url('repo', repo_name=HG_REPO), + response = self.app.put(url('repo', repo_name=self.REPO), fixture._get_repo_create_params(repo_private=False, - repo_name=HG_REPO, + repo_name=self.REPO, + repo_type=self.REPO_TYPE, user=TEST_USER_ADMIN_LOGIN)) self.checkSessionFlash(response, - msg='Repository %s updated successfully' % (HG_REPO)) - self.assertEqual(Repository.get_by_repo_name(HG_REPO).private, False) + msg='Repository %s updated successfully' % (self.REPO)) + self.assertEqual(Repository.get_by_repo_name(self.REPO).private, False) #we turn off private now the repo default permission should stay None - perm = _get_permission_for_user(user='default', repo=HG_REPO) + perm = _get_permission_for_user(user='default', repo=self.REPO) self.assertTrue(len(perm), 1) self.assertEqual(perm[0].permission.permission_name, 'repository.none') @@ -369,3 +507,133 @@ class TestAdminReposController(TestContr perm[0].permission = Permission.get_by_key('repository.read') Session().add(perm[0]) Session().commit() + + def test_set_repo_fork_has_no_self_id(self): + self.log_user() + repo = Repository.get_by_repo_name(self.REPO) + response = self.app.get(url('edit_repo_advanced', repo_name=self.REPO)) + opt = """""" % repo.repo_id + response.mustcontain(no=[opt]) + + def test_set_fork_of_other_repo(self): + self.log_user() + other_repo = 'other_%s' % self.REPO_TYPE + fixture.create_repo(other_repo, repo_type=self.REPO_TYPE) + repo = Repository.get_by_repo_name(self.REPO) + repo2 = Repository.get_by_repo_name(other_repo) + response = self.app.put(url('edit_repo_advanced_fork', repo_name=self.REPO), + params=dict(id_fork_of=repo2.repo_id)) + repo = Repository.get_by_repo_name(self.REPO) + repo2 = Repository.get_by_repo_name(other_repo) + self.checkSessionFlash(response, + 'Marked repo %s as fork of %s' % (repo.repo_name, repo2.repo_name)) + + assert repo.fork == repo2 + response = response.follow() + # check if given repo is selected + + opt = """""" % ( + repo2.repo_id, repo2.repo_name) + response.mustcontain(opt) + + fixture.destroy_repo(other_repo, forks='detach') + + def test_set_fork_of_other_type_repo(self): + self.log_user() + repo = Repository.get_by_repo_name(self.REPO) + repo2 = Repository.get_by_repo_name(self.OTHER_TYPE_REPO) + response = self.app.put(url('edit_repo_advanced_fork', repo_name=self.REPO), + params=dict(id_fork_of=repo2.repo_id)) + repo = Repository.get_by_repo_name(self.REPO) + repo2 = Repository.get_by_repo_name(self.OTHER_TYPE_REPO) + self.checkSessionFlash(response, + 'Cannot set repository as fork of repository with other type') + + def test_set_fork_of_none(self): + self.log_user() + ## mark it as None + response = self.app.put(url('edit_repo_advanced_fork', repo_name=self.REPO), + params=dict(id_fork_of=None)) + repo = Repository.get_by_repo_name(self.REPO) + repo2 = Repository.get_by_repo_name(self.OTHER_TYPE_REPO) + self.checkSessionFlash(response, + 'Marked repo %s as fork of %s' + % (repo.repo_name, "Nothing")) + assert repo.fork is None + + def test_set_fork_of_same_repo(self): + self.log_user() + repo = Repository.get_by_repo_name(self.REPO) + response = self.app.put(url('edit_repo_advanced_fork', repo_name=self.REPO), + params=dict(id_fork_of=repo.repo_id)) + self.checkSessionFlash(response, + 'An error occurred during this operation') + + def test_create_on_top_level_without_permissions(self): + usr = self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS) + # revoke + user_model = UserModel() + # disable fork and create on default user + user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository') + user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none') + user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository') + user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none') + + # disable on regular user + user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository') + user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none') + user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository') + user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none') + Session().commit() + + + user = User.get(usr['user_id']) + + repo_name = self.NEW_REPO+'no_perms' + description = 'description for newly created repo' + response = self.app.post(url('repos'), + fixture._get_repo_create_params(repo_private=False, + repo_name=repo_name, + repo_type=self.REPO_TYPE, + repo_description=description)) + + response.mustcontain('no permission to create repository in root location') + + RepoModel().delete(repo_name) + Session().commit() + + @mock.patch.object(RepoModel, '_create_filesystem_repo', error_function) + def test_create_repo_when_filesystem_op_fails(self): + self.log_user() + repo_name = self.NEW_REPO + description = 'description for newly created repo' + + response = self.app.post(url('repos'), + fixture._get_repo_create_params(repo_private=False, + repo_name=repo_name, + repo_type=self.REPO_TYPE, + repo_description=description)) + + self.checkSessionFlash(response, + 'Error creating repository %s' % repo_name) + # repo must not be in db + repo = Repository.get_by_repo_name(repo_name) + self.assertEqual(repo, None) + + # repo must not be in filesystem ! + self.assertFalse(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name))) + +class TestAdminReposControllerGIT(_BaseTest): + REPO = GIT_REPO + REPO_TYPE = 'git' + NEW_REPO = NEW_GIT_REPO + OTHER_TYPE_REPO = HG_REPO + OTHER_TYPE = 'hg' + + +class TestAdminReposControllerHG(_BaseTest): + REPO = HG_REPO + REPO_TYPE = 'hg' + NEW_REPO = NEW_HG_REPO + OTHER_TYPE_REPO = GIT_REPO + OTHER_TYPE = 'git'