diff --git a/rhodecode/tests/functional/test_admin_repos.py b/rhodecode/tests/functional/test_admin_repos.py
--- a/rhodecode/tests/functional/test_admin_repos.py
+++ b/rhodecode/tests/functional/test_admin_repos.py
@@ -1,16 +1,19 @@
# -*- coding: utf-8 -*-
import os
+import mock
import urllib
from rhodecode.lib import vcs
from rhodecode.model.db import Repository, RepoGroup, UserRepoToPerm, User,\
Permission
+from rhodecode.model.user import UserModel
from rhodecode.tests import *
-from rhodecode.model.repos_group import ReposGroupModel
+from rhodecode.model.repo_group import RepoGroupModel
from rhodecode.model.repo import RepoModel
+from rhodecode.model.scm import ScmModel
from rhodecode.model.meta import Session
-from rhodecode.tests.fixture import Fixture
+from rhodecode.tests.fixture import Fixture, error_function
fixture = Fixture()
@@ -24,87 +27,109 @@ def _get_permission_for_user(user, repo)
return perm
-class TestAdminReposController(TestController):
+class _BaseTest(TestController):
+ """
+ Write all tests here
+ """
+ REPO = None
+ REPO_TYPE = None
+ NEW_REPO = None
+ OTHER_TYPE_REPO = None
+ OTHER_TYPE = None
+
+ @classmethod
+ def setup_class(cls):
+ pass
+
+ @classmethod
+ def teardown_class(cls):
+ pass
def test_index(self):
self.log_user()
response = self.app.get(url('repos'))
- # Test response...
- def test_index_as_xml(self):
- response = self.app.get(url('formatted_repos', format='xml'))
-
- def test_create_hg(self):
+ def test_create(self):
self.log_user()
- repo_name = NEW_HG_REPO
+ repo_name = self.NEW_REPO
description = 'description for newly created repo'
response = self.app.post(url('repos'),
fixture._get_repo_create_params(repo_private=False,
repo_name=repo_name,
+ repo_type=self.REPO_TYPE,
repo_description=description))
+ ## run the check page that triggers the flash message
+ response = self.app.get(url('repo_check_home', repo_name=repo_name))
+ self.assertEqual(response.json, {u'result': True})
self.checkSessionFlash(response,
'Created repository %s'
% (repo_name, repo_name))
- #test if the repo was created in the database
+ # test if the repo was created in the database
new_repo = Session().query(Repository)\
.filter(Repository.repo_name == repo_name).one()
self.assertEqual(new_repo.repo_name, repo_name)
self.assertEqual(new_repo.description, description)
- #test if repository is visible in the list ?
- response = response.follow()
+ # test if the repository is visible in the list ?
+ response = self.app.get(url('summary_home', repo_name=repo_name))
+ response.mustcontain(repo_name)
+ response.mustcontain(self.REPO_TYPE)
- response.mustcontain(repo_name)
-
- #test if repository was created on filesystem
+ # test if the repository was created on filesystem
try:
vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
except Exception:
self.fail('no repo %s in filesystem' % repo_name)
- def test_create_hg_non_ascii(self):
+ RepoModel().delete(repo_name)
+ Session().commit()
+
+ def test_create_non_ascii(self):
self.log_user()
non_ascii = "ąęł"
- repo_name = "%s%s" % (NEW_HG_REPO, non_ascii)
+ repo_name = "%s%s" % (self.NEW_REPO, non_ascii)
repo_name_unicode = repo_name.decode('utf8')
description = 'description for newly created repo' + non_ascii
description_unicode = description.decode('utf8')
- private = False
response = self.app.post(url('repos'),
fixture._get_repo_create_params(repo_private=False,
repo_name=repo_name,
+ repo_type=self.REPO_TYPE,
repo_description=description))
+ ## run the check page that triggers the flash message
+ response = self.app.get(url('repo_check_home', repo_name=repo_name))
+ self.assertEqual(response.json, {u'result': True})
self.checkSessionFlash(response,
u'Created repository %s'
% (urllib.quote(repo_name), repo_name_unicode))
- #test if the repo was created in the database
+ # test if the repo was created in the database
new_repo = Session().query(Repository)\
.filter(Repository.repo_name == repo_name_unicode).one()
self.assertEqual(new_repo.repo_name, repo_name_unicode)
self.assertEqual(new_repo.description, description_unicode)
- #test if repository is visible in the list ?
- response = response.follow()
+ # test if the repository is visible in the list ?
+ response = self.app.get(url('summary_home', repo_name=repo_name))
+ response.mustcontain(repo_name)
+ response.mustcontain(self.REPO_TYPE)
- response.mustcontain(repo_name)
-
- #test if repository was created on filesystem
+ # test if the repository was created on filesystem
try:
vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
except Exception:
self.fail('no repo %s in filesystem' % repo_name)
- def test_create_hg_in_group(self):
+ def test_create_in_group(self):
self.log_user()
## create GROUP
- group_name = 'sometest'
- gr = ReposGroupModel().create(group_name=group_name,
- group_description='test',
- owner=TEST_USER_ADMIN_LOGIN)
+ group_name = 'sometest_%s' % self.REPO_TYPE
+ gr = RepoGroupModel().create(group_name=group_name,
+ group_description='test',
+ owner=TEST_USER_ADMIN_LOGIN)
Session().commit()
repo_name = 'ingroup'
@@ -113,137 +138,251 @@ class TestAdminReposController(TestContr
response = self.app.post(url('repos'),
fixture._get_repo_create_params(repo_private=False,
repo_name=repo_name,
+ repo_type=self.REPO_TYPE,
+ repo_description=description,
+ repo_group=gr.group_id,))
+ ## run the check page that triggers the flash message
+ response = self.app.get(url('repo_check_home', repo_name=repo_name_full))
+ self.assertEqual(response.json, {u'result': True})
+ self.checkSessionFlash(response,
+ 'Created repository %s'
+ % (repo_name_full, repo_name_full))
+ # test if the repo was created in the database
+ new_repo = Session().query(Repository)\
+ .filter(Repository.repo_name == repo_name_full).one()
+ new_repo_id = new_repo.repo_id
+
+ self.assertEqual(new_repo.repo_name, repo_name_full)
+ self.assertEqual(new_repo.description, description)
+
+ # test if the repository is visible in the list ?
+ response = self.app.get(url('summary_home', repo_name=repo_name_full))
+ response.mustcontain(repo_name_full)
+ response.mustcontain(self.REPO_TYPE)
+
+ inherited_perms = UserRepoToPerm.query()\
+ .filter(UserRepoToPerm.repository_id == new_repo_id).all()
+ self.assertEqual(len(inherited_perms), 1)
+
+ # test if the repository was created on filesystem
+ try:
+ vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name_full))
+ except Exception:
+ RepoGroupModel().delete(group_name)
+ Session().commit()
+ self.fail('no repo %s in filesystem' % repo_name)
+
+ RepoModel().delete(repo_name_full)
+ RepoGroupModel().delete(group_name)
+ Session().commit()
+
+ def test_create_in_group_without_needed_permissions(self):
+ usr = self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
+ # revoke
+ user_model = UserModel()
+ # disable fork and create on default user
+ user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
+ user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
+ user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
+ user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
+
+ # disable on regular user
+ user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
+ user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
+ user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
+ user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
+ Session().commit()
+
+ ## create GROUP
+ group_name = 'reg_sometest_%s' % self.REPO_TYPE
+ gr = RepoGroupModel().create(group_name=group_name,
+ group_description='test',
+ owner=TEST_USER_ADMIN_LOGIN)
+ Session().commit()
+
+ group_name_allowed = 'reg_sometest_allowed_%s' % self.REPO_TYPE
+ gr_allowed = RepoGroupModel().create(group_name=group_name_allowed,
+ group_description='test',
+ owner=TEST_USER_REGULAR_LOGIN)
+ Session().commit()
+
+ repo_name = 'ingroup'
+ repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
+ description = 'description for newly created repo'
+ response = self.app.post(url('repos'),
+ fixture._get_repo_create_params(repo_private=False,
+ repo_name=repo_name,
+ repo_type=self.REPO_TYPE,
repo_description=description,
repo_group=gr.group_id,))
+ response.mustcontain('Invalid value')
+
+ # user is allowed to create in this group
+ repo_name = 'ingroup'
+ repo_name_full = RepoGroup.url_sep().join([group_name_allowed, repo_name])
+ description = 'description for newly created repo'
+ response = self.app.post(url('repos'),
+ fixture._get_repo_create_params(repo_private=False,
+ repo_name=repo_name,
+ repo_type=self.REPO_TYPE,
+ repo_description=description,
+ repo_group=gr_allowed.group_id,))
+
+ ## run the check page that triggers the flash message
+ response = self.app.get(url('repo_check_home', repo_name=repo_name_full))
+ self.assertEqual(response.json, {u'result': True})
self.checkSessionFlash(response,
'Created repository %s'
- % (repo_name_full, repo_name))
- #test if the repo was created in the database
+ % (repo_name_full, repo_name_full))
+ # test if the repo was created in the database
new_repo = Session().query(Repository)\
.filter(Repository.repo_name == repo_name_full).one()
+ new_repo_id = new_repo.repo_id
self.assertEqual(new_repo.repo_name, repo_name_full)
self.assertEqual(new_repo.description, description)
- #test if repository is visible in the list ?
- response = response.follow()
+ # test if the repository is visible in the list ?
+ response = self.app.get(url('summary_home', repo_name=repo_name_full))
+ response.mustcontain(repo_name_full)
+ response.mustcontain(self.REPO_TYPE)
- response.mustcontain(repo_name_full)
+ inherited_perms = UserRepoToPerm.query()\
+ .filter(UserRepoToPerm.repository_id == new_repo_id).all()
+ self.assertEqual(len(inherited_perms), 1)
- #test if repository was created on filesystem
+ # test if the repository was created on filesystem
try:
vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name_full))
except Exception:
- ReposGroupModel().delete(group_name)
+ RepoGroupModel().delete(group_name)
Session().commit()
self.fail('no repo %s in filesystem' % repo_name)
RepoModel().delete(repo_name_full)
- ReposGroupModel().delete(group_name)
+ RepoGroupModel().delete(group_name)
+ RepoGroupModel().delete(group_name_allowed)
+ Session().commit()
+
+ def test_create_in_group_inherit_permissions(self):
+ self.log_user()
+
+ ## create GROUP
+ group_name = 'sometest_%s' % self.REPO_TYPE
+ gr = RepoGroupModel().create(group_name=group_name,
+ group_description='test',
+ owner=TEST_USER_ADMIN_LOGIN)
+ perm = Permission.get_by_key('repository.write')
+ RepoGroupModel().grant_user_permission(gr, TEST_USER_REGULAR_LOGIN, perm)
+
+ ## add repo permissions
Session().commit()
- def test_create_git(self):
- self.log_user()
- repo_name = NEW_GIT_REPO
+ repo_name = 'ingroup_inherited_%s' % self.REPO_TYPE
+ repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
description = 'description for newly created repo'
-
response = self.app.post(url('repos'),
fixture._get_repo_create_params(repo_private=False,
- repo_type='git',
+ repo_name=repo_name,
+ repo_type=self.REPO_TYPE,
+ repo_description=description,
+ repo_group=gr.group_id,
+ repo_copy_permissions=True))
+
+ ## run the check page that triggers the flash message
+ response = self.app.get(url('repo_check_home', repo_name=repo_name_full))
+ self.checkSessionFlash(response,
+ 'Created repository %s'
+ % (repo_name_full, repo_name_full))
+ # test if the repo was created in the database
+ new_repo = Session().query(Repository)\
+ .filter(Repository.repo_name == repo_name_full).one()
+ new_repo_id = new_repo.repo_id
+
+ self.assertEqual(new_repo.repo_name, repo_name_full)
+ self.assertEqual(new_repo.description, description)
+
+ # test if the repository is visible in the list ?
+ response = self.app.get(url('summary_home', repo_name=repo_name_full))
+ response.mustcontain(repo_name_full)
+ response.mustcontain(self.REPO_TYPE)
+
+ # test if the repository was created on filesystem
+ try:
+ vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name_full))
+ except Exception:
+ RepoGroupModel().delete(group_name)
+ Session().commit()
+ self.fail('no repo %s in filesystem' % repo_name)
+
+ #check if inherited permissiona are applied
+ inherited_perms = UserRepoToPerm.query()\
+ .filter(UserRepoToPerm.repository_id == new_repo_id).all()
+ self.assertEqual(len(inherited_perms), 2)
+
+ self.assertTrue(TEST_USER_REGULAR_LOGIN in [x.user.username
+ for x in inherited_perms])
+ self.assertTrue('repository.write' in [x.permission.permission_name
+ for x in inherited_perms])
+
+ RepoModel().delete(repo_name_full)
+ RepoGroupModel().delete(group_name)
+ Session().commit()
+
+ def test_create_remote_repo_wrong_clone_uri(self):
+ self.log_user()
+ repo_name = self.NEW_REPO
+ description = 'description for newly created repo'
+ response = self.app.post(url('repos'),
+ fixture._get_repo_create_params(repo_private=False,
+ repo_name=repo_name,
+ repo_type=self.REPO_TYPE,
+ repo_description=description,
+ clone_uri='http://127.0.0.1/repo'))
+ response.mustcontain('invalid clone url')
+
+
+ def test_create_remote_repo_wrong_clone_uri_hg_svn(self):
+ self.log_user()
+ repo_name = self.NEW_REPO
+ description = 'description for newly created repo'
+ response = self.app.post(url('repos'),
+ fixture._get_repo_create_params(repo_private=False,
+ repo_name=repo_name,
+ repo_type=self.REPO_TYPE,
+ repo_description=description,
+ clone_uri='svn+http://127.0.0.1/repo'))
+ response.mustcontain('invalid clone url')
+
+
+ def test_delete(self):
+ self.log_user()
+ repo_name = 'vcs_test_new_to_delete_%s' % self.REPO_TYPE
+ description = 'description for newly created repo'
+ response = self.app.post(url('repos'),
+ fixture._get_repo_create_params(repo_private=False,
+ repo_type=self.REPO_TYPE,
repo_name=repo_name,
repo_description=description))
+ ## run the check page that triggers the flash message
+ response = self.app.get(url('repo_check_home', repo_name=repo_name))
self.checkSessionFlash(response,
'Created repository %s'
% (repo_name, repo_name))
-
- #test if the repo was created in the database
+ # test if the repo was created in the database
new_repo = Session().query(Repository)\
.filter(Repository.repo_name == repo_name).one()
self.assertEqual(new_repo.repo_name, repo_name)
self.assertEqual(new_repo.description, description)
- #test if repository is visible in the list ?
- response = response.follow()
-
- response.mustcontain(repo_name)
-
- #test if repository was created on filesystem
- try:
- vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
- except Exception:
- self.fail('no repo %s in filesystem' % repo_name)
-
- def test_create_git_non_ascii(self):
- self.log_user()
- non_ascii = "ąęł"
- repo_name = "%s%s" % (NEW_GIT_REPO, non_ascii)
- repo_name_unicode = repo_name.decode('utf8')
- description = 'description for newly created repo' + non_ascii
- description_unicode = description.decode('utf8')
- private = False
- response = self.app.post(url('repos'),
- fixture._get_repo_create_params(repo_private=False,
- repo_type='git',
- repo_name=repo_name,
- repo_description=description))
-
- self.checkSessionFlash(response,
- u'Created repository %s'
- % (urllib.quote(repo_name), repo_name_unicode))
-
- #test if the repo was created in the database
- new_repo = Session().query(Repository)\
- .filter(Repository.repo_name == repo_name_unicode).one()
-
- self.assertEqual(new_repo.repo_name, repo_name_unicode)
- self.assertEqual(new_repo.description, description_unicode)
-
- #test if repository is visible in the list ?
- response = response.follow()
-
+ # test if the repository is visible in the list ?
+ response = self.app.get(url('summary_home', repo_name=repo_name))
response.mustcontain(repo_name)
-
- #test if repository was created on filesystem
- try:
- vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
- except Exception:
- self.fail('no repo %s in filesystem' % repo_name)
-
- def test_update(self):
- response = self.app.put(url('repo', repo_name=HG_REPO))
-
- def test_update_browser_fakeout(self):
- response = self.app.post(url('repo', repo_name=HG_REPO),
- params=dict(_method='put'))
+ response.mustcontain(self.REPO_TYPE)
- def test_delete_hg(self):
- self.log_user()
- repo_name = 'vcs_test_new_to_delete'
- description = 'description for newly created repo'
- response = self.app.post(url('repos'),
- fixture._get_repo_create_params(repo_private=False,
- repo_type='hg',
- repo_name=repo_name,
- repo_description=description))
-
- self.checkSessionFlash(response,
- 'Created repository %s'
- % (repo_name, repo_name))
- #test if the repo was created in the database
- new_repo = Session().query(Repository)\
- .filter(Repository.repo_name == repo_name).one()
-
- self.assertEqual(new_repo.repo_name, repo_name)
- self.assertEqual(new_repo.description, description)
-
- #test if repository is visible in the list ?
- response = response.follow()
-
- response.mustcontain(repo_name)
-
- #test if repository was created on filesystem
+ # test if the repository was created on filesystem
try:
vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
except Exception:
@@ -264,47 +403,49 @@ class TestAdminReposController(TestContr
self.assertEqual(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)),
False)
- def test_delete_git(self):
+ def test_delete_non_ascii(self):
self.log_user()
- repo_name = 'vcs_test_new_to_delete'
- description = 'description for newly created repo'
- private = False
+ non_ascii = "ąęł"
+ repo_name = "%s%s" % (self.NEW_REPO, non_ascii)
+ repo_name_unicode = repo_name.decode('utf8')
+ description = 'description for newly created repo' + non_ascii
+ description_unicode = description.decode('utf8')
response = self.app.post(url('repos'),
fixture._get_repo_create_params(repo_private=False,
- repo_type='git',
repo_name=repo_name,
+ repo_type=self.REPO_TYPE,
repo_description=description))
-
+ ## run the check page that triggers the flash message
+ response = self.app.get(url('repo_check_home', repo_name=repo_name))
+ self.assertEqual(response.json, {u'result': True})
self.checkSessionFlash(response,
- 'Created repository %s'
- % (repo_name, repo_name))
- #test if the repo was created in the database
+ u'Created repository %s'
+ % (urllib.quote(repo_name), repo_name_unicode))
+ # test if the repo was created in the database
new_repo = Session().query(Repository)\
- .filter(Repository.repo_name == repo_name).one()
+ .filter(Repository.repo_name == repo_name_unicode).one()
- self.assertEqual(new_repo.repo_name, repo_name)
- self.assertEqual(new_repo.description, description)
+ self.assertEqual(new_repo.repo_name, repo_name_unicode)
+ self.assertEqual(new_repo.description, description_unicode)
- #test if repository is visible in the list ?
- response = response.follow()
+ # test if the repository is visible in the list ?
+ response = self.app.get(url('summary_home', repo_name=repo_name))
+ response.mustcontain(repo_name)
+ response.mustcontain(self.REPO_TYPE)
- response.mustcontain(repo_name)
-
- #test if repository was created on filesystem
+ # test if the repository was created on filesystem
try:
vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
except Exception:
self.fail('no repo %s in filesystem' % repo_name)
response = self.app.delete(url('repo', repo_name=repo_name))
-
- self.checkSessionFlash(response, 'Deleted repository %s' % (repo_name))
-
+ self.checkSessionFlash(response, 'Deleted repository %s' % (repo_name_unicode))
response.follow()
#check if repo was deleted from db
deleted_repo = Session().query(Repository)\
- .filter(Repository.repo_name == repo_name).scalar()
+ .filter(Repository.repo_name == repo_name_unicode).scalar()
self.assertEqual(deleted_repo, None)
@@ -316,52 +457,49 @@ class TestAdminReposController(TestContr
pass
def test_delete_browser_fakeout(self):
- response = self.app.post(url('repo', repo_name=HG_REPO),
+ response = self.app.post(url('repo', repo_name=self.REPO),
params=dict(_method='delete'))
- def test_show_hg(self):
+ def test_show(self):
self.log_user()
- response = self.app.get(url('repo', repo_name=HG_REPO))
-
- def test_show_git(self):
- self.log_user()
- response = self.app.get(url('repo', repo_name=GIT_REPO))
-
+ response = self.app.get(url('repo', repo_name=self.REPO))
def test_edit(self):
- response = self.app.get(url('edit_repo', repo_name=HG_REPO))
+ response = self.app.get(url('edit_repo', repo_name=self.REPO))
def test_set_private_flag_sets_default_to_none(self):
self.log_user()
#initially repository perm should be read
- perm = _get_permission_for_user(user='default', repo=HG_REPO)
+ perm = _get_permission_for_user(user='default', repo=self.REPO)
self.assertTrue(len(perm), 1)
self.assertEqual(perm[0].permission.permission_name, 'repository.read')
- self.assertEqual(Repository.get_by_repo_name(HG_REPO).private, False)
+ self.assertEqual(Repository.get_by_repo_name(self.REPO).private, False)
- response = self.app.put(url('repo', repo_name=HG_REPO),
+ response = self.app.put(url('repo', repo_name=self.REPO),
fixture._get_repo_create_params(repo_private=1,
- repo_name=HG_REPO,
+ repo_name=self.REPO,
+ repo_type=self.REPO_TYPE,
user=TEST_USER_ADMIN_LOGIN))
self.checkSessionFlash(response,
- msg='Repository %s updated successfully' % (HG_REPO))
- self.assertEqual(Repository.get_by_repo_name(HG_REPO).private, True)
+ msg='Repository %s updated successfully' % (self.REPO))
+ self.assertEqual(Repository.get_by_repo_name(self.REPO).private, True)
#now the repo default permission should be None
- perm = _get_permission_for_user(user='default', repo=HG_REPO)
+ perm = _get_permission_for_user(user='default', repo=self.REPO)
self.assertTrue(len(perm), 1)
self.assertEqual(perm[0].permission.permission_name, 'repository.none')
- response = self.app.put(url('repo', repo_name=HG_REPO),
+ response = self.app.put(url('repo', repo_name=self.REPO),
fixture._get_repo_create_params(repo_private=False,
- repo_name=HG_REPO,
+ repo_name=self.REPO,
+ repo_type=self.REPO_TYPE,
user=TEST_USER_ADMIN_LOGIN))
self.checkSessionFlash(response,
- msg='Repository %s updated successfully' % (HG_REPO))
- self.assertEqual(Repository.get_by_repo_name(HG_REPO).private, False)
+ msg='Repository %s updated successfully' % (self.REPO))
+ self.assertEqual(Repository.get_by_repo_name(self.REPO).private, False)
#we turn off private now the repo default permission should stay None
- perm = _get_permission_for_user(user='default', repo=HG_REPO)
+ perm = _get_permission_for_user(user='default', repo=self.REPO)
self.assertTrue(len(perm), 1)
self.assertEqual(perm[0].permission.permission_name, 'repository.none')
@@ -369,3 +507,133 @@ class TestAdminReposController(TestContr
perm[0].permission = Permission.get_by_key('repository.read')
Session().add(perm[0])
Session().commit()
+
+ def test_set_repo_fork_has_no_self_id(self):
+ self.log_user()
+ repo = Repository.get_by_repo_name(self.REPO)
+ response = self.app.get(url('edit_repo_advanced', repo_name=self.REPO))
+ opt = """""" % repo.repo_id
+ response.mustcontain(no=[opt])
+
+ def test_set_fork_of_other_repo(self):
+ self.log_user()
+ other_repo = 'other_%s' % self.REPO_TYPE
+ fixture.create_repo(other_repo, repo_type=self.REPO_TYPE)
+ repo = Repository.get_by_repo_name(self.REPO)
+ repo2 = Repository.get_by_repo_name(other_repo)
+ response = self.app.put(url('edit_repo_advanced_fork', repo_name=self.REPO),
+ params=dict(id_fork_of=repo2.repo_id))
+ repo = Repository.get_by_repo_name(self.REPO)
+ repo2 = Repository.get_by_repo_name(other_repo)
+ self.checkSessionFlash(response,
+ 'Marked repo %s as fork of %s' % (repo.repo_name, repo2.repo_name))
+
+ assert repo.fork == repo2
+ response = response.follow()
+ # check if given repo is selected
+
+ opt = """""" % (
+ repo2.repo_id, repo2.repo_name)
+ response.mustcontain(opt)
+
+ fixture.destroy_repo(other_repo, forks='detach')
+
+ def test_set_fork_of_other_type_repo(self):
+ self.log_user()
+ repo = Repository.get_by_repo_name(self.REPO)
+ repo2 = Repository.get_by_repo_name(self.OTHER_TYPE_REPO)
+ response = self.app.put(url('edit_repo_advanced_fork', repo_name=self.REPO),
+ params=dict(id_fork_of=repo2.repo_id))
+ repo = Repository.get_by_repo_name(self.REPO)
+ repo2 = Repository.get_by_repo_name(self.OTHER_TYPE_REPO)
+ self.checkSessionFlash(response,
+ 'Cannot set repository as fork of repository with other type')
+
+ def test_set_fork_of_none(self):
+ self.log_user()
+ ## mark it as None
+ response = self.app.put(url('edit_repo_advanced_fork', repo_name=self.REPO),
+ params=dict(id_fork_of=None))
+ repo = Repository.get_by_repo_name(self.REPO)
+ repo2 = Repository.get_by_repo_name(self.OTHER_TYPE_REPO)
+ self.checkSessionFlash(response,
+ 'Marked repo %s as fork of %s'
+ % (repo.repo_name, "Nothing"))
+ assert repo.fork is None
+
+ def test_set_fork_of_same_repo(self):
+ self.log_user()
+ repo = Repository.get_by_repo_name(self.REPO)
+ response = self.app.put(url('edit_repo_advanced_fork', repo_name=self.REPO),
+ params=dict(id_fork_of=repo.repo_id))
+ self.checkSessionFlash(response,
+ 'An error occurred during this operation')
+
+ def test_create_on_top_level_without_permissions(self):
+ usr = self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
+ # revoke
+ user_model = UserModel()
+ # disable fork and create on default user
+ user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
+ user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
+ user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
+ user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
+
+ # disable on regular user
+ user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
+ user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
+ user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.repository')
+ user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.fork.none')
+ Session().commit()
+
+
+ user = User.get(usr['user_id'])
+
+ repo_name = self.NEW_REPO+'no_perms'
+ description = 'description for newly created repo'
+ response = self.app.post(url('repos'),
+ fixture._get_repo_create_params(repo_private=False,
+ repo_name=repo_name,
+ repo_type=self.REPO_TYPE,
+ repo_description=description))
+
+ response.mustcontain('no permission to create repository in root location')
+
+ RepoModel().delete(repo_name)
+ Session().commit()
+
+ @mock.patch.object(RepoModel, '_create_filesystem_repo', error_function)
+ def test_create_repo_when_filesystem_op_fails(self):
+ self.log_user()
+ repo_name = self.NEW_REPO
+ description = 'description for newly created repo'
+
+ response = self.app.post(url('repos'),
+ fixture._get_repo_create_params(repo_private=False,
+ repo_name=repo_name,
+ repo_type=self.REPO_TYPE,
+ repo_description=description))
+
+ self.checkSessionFlash(response,
+ 'Error creating repository %s' % repo_name)
+ # repo must not be in db
+ repo = Repository.get_by_repo_name(repo_name)
+ self.assertEqual(repo, None)
+
+ # repo must not be in filesystem !
+ self.assertFalse(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)))
+
+class TestAdminReposControllerGIT(_BaseTest):
+ REPO = GIT_REPO
+ REPO_TYPE = 'git'
+ NEW_REPO = NEW_GIT_REPO
+ OTHER_TYPE_REPO = HG_REPO
+ OTHER_TYPE = 'hg'
+
+
+class TestAdminReposControllerHG(_BaseTest):
+ REPO = HG_REPO
+ REPO_TYPE = 'hg'
+ NEW_REPO = NEW_HG_REPO
+ OTHER_TYPE_REPO = GIT_REPO
+ OTHER_TYPE = 'git'