Changeset - 8568a1d4f100
[Not reviewed]
default
0 4 0
domruf - 10 years ago 2016-01-28 20:40:32
dominikruf@gmail.com
tests: don't use the TESTS_TMP_PATH string

it differs each time you run the test
and therefore doesn't work if you use a separat kallithea test instance
(KALLITHEA_NO_TMP_PATH=1)

instead get the 'real' path from the DB

FIXME: breaks kallithea/tests/functional/test_admin_repos.py
4 files changed with 18 insertions and 17 deletions:
0 comments (0 inline, 0 general)
kallithea/tests/api/api_base.py
Show inline comments
 
@@ -12,49 +12,49 @@
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
"""
 
tests for api. run with::
 

	
 
    KALLITHEA_WHOOSH_TEST_DISABLE=1 nosetests --with-coverage --cover-package=kallithea.controllers.api.api -x kallithea/tests/api
 
"""
 

	
 
import os
 
import random
 
import mock
 

	
 
from kallithea.tests import *
 
from kallithea.tests.fixture import Fixture
 
from kallithea.lib.compat import json
 
from kallithea.lib.auth import AuthUser
 
from kallithea.model.user import UserModel
 
from kallithea.model.user_group import UserGroupModel
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.meta import Session
 
from kallithea.model.scm import ScmModel
 
from kallithea.model.gist import GistModel
 
from kallithea.model.db import Repository, User, Setting
 
from kallithea.model.db import Repository, User, Setting, Ui
 
from kallithea.lib.utils2 import time_to_datetime
 

	
 

	
 
API_URL = '/_admin/api'
 
TEST_USER_GROUP = u'test_user_group'
 
TEST_REPO_GROUP = u'test_repo_group'
 

	
 
fixture = Fixture()
 

	
 

	
 
def _build_data(apikey, method, **kw):
 
    """
 
    Builds API data with given random ID
 

	
 
    :param random_id:
 
    """
 
    random_id = random.randrange(1, 9999)
 
    return random_id, json.dumps({
 
        "id": random_id,
 
        "api_key": apikey,
 
        "method": method,
 
        "args": kw
 
    })
 

	
 
@@ -259,49 +259,49 @@ class _BaseTestApi(object):
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_without_giving_userid_non_admin(self):
 
        id_, params = _build_data(self.apikey_regular, 'get_user')
 
        response = api_call(self, params)
 

	
 
        usr = User.get_by_username(self.TEST_USER_LOGIN)
 
        ret = usr.get_api_data()
 
        ret['permissions'] = AuthUser(dbuser=usr).permissions
 

	
 
        expected = ret
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user_with_giving_userid_non_admin(self):
 
        id_, params = _build_data(self.apikey_regular, 'get_user',
 
                                  userid=self.TEST_USER_LOGIN)
 
        response = api_call(self, params)
 

	
 
        expected = 'userid is not the same as your user'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_pull(self):
 
        repo_name = u'test_pull'
 
        r = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        r.clone_uri = os.path.join(TESTS_TMP_PATH, self.REPO)
 
        r.clone_uri = os.path.join(Ui.get_by_key('paths', '/').ui_value, self.REPO)
 
        Session.add(r)
 
        Session.commit()
 

	
 
        id_, params = _build_data(self.apikey, 'pull',
 
                                  repoid=repo_name,)
 
        response = api_call(self, params)
 

	
 
        expected = {'msg': 'Pulled from `%s`' % repo_name,
 
                    'repository': repo_name}
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
        fixture.destroy_repo(repo_name)
 

	
 
    def test_api_pull_error(self):
 
        id_, params = _build_data(self.apikey, 'pull',
 
                                  repoid=self.REPO, )
 
        response = api_call(self, params)
 

	
 
        expected = 'Unable to pull changes from `%s`' % self.REPO
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_rescan_repos(self):
 
        id_, params = _build_data(self.apikey, 'rescan_repos')
 
        response = api_call(self, params)
kallithea/tests/functional/test_admin_repos.py
Show inline comments
 
# -*- coding: utf-8 -*-
 

	
 
import os
 
import mock
 
import urllib
 

	
 
from kallithea.lib import vcs
 
from kallithea.lib.utils2 import safe_str, safe_unicode
 
from kallithea.model.db import Repository, RepoGroup, UserRepoToPerm, User, \
 
    Permission
 
    Permission, Ui
 
from kallithea.model.user import UserModel
 
from kallithea.tests import *
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.meta import Session
 
from kallithea.tests.fixture import Fixture, error_function
 

	
 
fixture = Fixture()
 

	
 

	
 
def _get_permission_for_user(user, repo):
 
    perm = UserRepoToPerm.query() \
 
                .filter(UserRepoToPerm.repository ==
 
                        Repository.get_by_repo_name(repo)) \
 
                .filter(UserRepoToPerm.user == User.get_by_username(user)) \
 
                .all()
 
    return perm
 

	
 

	
 
class _BaseTest(object):
 
    """
 
    Write all tests here
 
    """
 
    REPO = None
 
@@ -59,49 +59,49 @@ class _BaseTest(object):
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                _authentication_token=self.authentication_token()))
 
        ## run the check page that triggers the flash message
 
        response = self.app.get(url('repo_check_home', repo_name=repo_name))
 
        self.assertEqual(response.json, {u'result': True})
 
        self.checkSessionFlash(response,
 
                               'Created repository <a href="/%s">%s</a>'
 
                               % (repo_name, repo_name))
 

	
 
        # test if the repo was created in the database
 
        new_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == repo_name).one()
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name)
 
        self.assertEqual(new_repo.description, description)
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(url('summary_home', repo_name=repo_name))
 
        response.mustcontain(repo_name)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
        # test if the repository was created on filesystem
 
        try:
 
            vcs.get_repo(safe_str(os.path.join(TESTS_TMP_PATH, repo_name)))
 
            vcs.get_repo(safe_str(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name)))
 
        except vcs.exceptions.VCSError:
 
            pytest.fail('no repo %s in filesystem' % repo_name)
 

	
 
        RepoModel().delete(repo_name)
 
        Session().commit()
 

	
 
    def test_create_in_group(self):
 
        self.log_user()
 

	
 
        ## create GROUP
 
        group_name = u'sometest_%s' % self.REPO_TYPE
 
        gr = RepoGroupModel().create(group_name=group_name,
 
                                     group_description=u'test',
 
                                     owner=TEST_USER_ADMIN_LOGIN)
 
        Session().commit()
 

	
 
        repo_name = u'ingroup'
 
        repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
 
        description = u'description for newly created repo'
 
        response = self.app.post(url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
@@ -111,49 +111,49 @@ class _BaseTest(object):
 
        response = self.app.get(url('repo_check_home', repo_name=repo_name_full))
 
        self.assertEqual(response.json, {u'result': True})
 
        self.checkSessionFlash(response,
 
                               'Created repository <a href="/%s">%s</a>'
 
                               % (repo_name_full, repo_name_full))
 
        # test if the repo was created in the database
 
        new_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == repo_name_full).one()
 
        new_repo_id = new_repo.repo_id
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name_full)
 
        self.assertEqual(new_repo.description, description)
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(url('summary_home', repo_name=repo_name_full))
 
        response.mustcontain(repo_name_full)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
        inherited_perms = UserRepoToPerm.query() \
 
            .filter(UserRepoToPerm.repository_id == new_repo_id).all()
 
        self.assertEqual(len(inherited_perms), 1)
 

	
 
        # test if the repository was created on filesystem
 
        try:
 
            vcs.get_repo(safe_str(os.path.join(TESTS_TMP_PATH, repo_name_full)))
 
            vcs.get_repo(safe_str(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name_full)))
 
        except vcs.exceptions.VCSError:
 
            RepoGroupModel().delete(group_name)
 
            Session().commit()
 
            pytest.fail('no repo %s in filesystem' % repo_name)
 

	
 
        RepoModel().delete(repo_name_full)
 
        RepoGroupModel().delete(group_name)
 
        Session().commit()
 

	
 
    def test_create_in_group_without_needed_permissions(self):
 
        usr = self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
 
        # avoid spurious RepoGroup DetachedInstanceError ...
 
        authentication_token = self.authentication_token()
 
        # revoke
 
        user_model = UserModel()
 
        # disable fork and create on default user
 
        user_model.revoke_perm(User.DEFAULT_USER, 'hg.create.repository')
 
        user_model.grant_perm(User.DEFAULT_USER, 'hg.create.none')
 
        user_model.revoke_perm(User.DEFAULT_USER, 'hg.fork.repository')
 
        user_model.grant_perm(User.DEFAULT_USER, 'hg.fork.none')
 

	
 
        # disable on regular user
 
        user_model.revoke_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.repository')
 
        user_model.grant_perm(TEST_USER_REGULAR_LOGIN, 'hg.create.none')
 
@@ -203,49 +203,49 @@ class _BaseTest(object):
 
        response = self.app.get(url('repo_check_home', repo_name=repo_name_full))
 
        self.assertEqual(response.json, {u'result': True})
 
        self.checkSessionFlash(response,
 
                               'Created repository <a href="/%s">%s</a>'
 
                               % (repo_name_full, repo_name_full))
 
        # test if the repo was created in the database
 
        new_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == repo_name_full).one()
 
        new_repo_id = new_repo.repo_id
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name_full)
 
        self.assertEqual(new_repo.description, description)
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(url('summary_home', repo_name=repo_name_full))
 
        response.mustcontain(repo_name_full)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
        inherited_perms = UserRepoToPerm.query() \
 
            .filter(UserRepoToPerm.repository_id == new_repo_id).all()
 
        self.assertEqual(len(inherited_perms), 1)
 

	
 
        # test if the repository was created on filesystem
 
        try:
 
            vcs.get_repo(safe_str(os.path.join(TESTS_TMP_PATH, repo_name_full)))
 
            vcs.get_repo(safe_str(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name_full)))
 
        except vcs.exceptions.VCSError:
 
            RepoGroupModel().delete(group_name)
 
            Session().commit()
 
            pytest.fail('no repo %s in filesystem' % repo_name)
 

	
 
        RepoModel().delete(repo_name_full)
 
        RepoGroupModel().delete(group_name)
 
        RepoGroupModel().delete(group_name_allowed)
 
        Session().commit()
 

	
 
    def test_create_in_group_inherit_permissions(self):
 
        self.log_user()
 

	
 
        ## create GROUP
 
        group_name = u'sometest_%s' % self.REPO_TYPE
 
        gr = RepoGroupModel().create(group_name=group_name,
 
                                     group_description=u'test',
 
                                     owner=TEST_USER_ADMIN_LOGIN)
 
        perm = Permission.get_by_key('repository.write')
 
        RepoGroupModel().grant_user_permission(gr, TEST_USER_REGULAR_LOGIN, perm)
 

	
 
        ## add repo permissions
 
        Session().commit()
 

	
 
@@ -260,49 +260,49 @@ class _BaseTest(object):
 
                                                repo_group=gr.group_id,
 
                                                repo_copy_permissions=True,
 
                                                _authentication_token=self.authentication_token()))
 

	
 
        ## run the check page that triggers the flash message
 
        response = self.app.get(url('repo_check_home', repo_name=repo_name_full))
 
        self.checkSessionFlash(response,
 
                               'Created repository <a href="/%s">%s</a>'
 
                               % (repo_name_full, repo_name_full))
 
        # test if the repo was created in the database
 
        new_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == repo_name_full).one()
 
        new_repo_id = new_repo.repo_id
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name_full)
 
        self.assertEqual(new_repo.description, description)
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(url('summary_home', repo_name=repo_name_full))
 
        response.mustcontain(repo_name_full)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
        # test if the repository was created on filesystem
 
        try:
 
            vcs.get_repo(safe_str(os.path.join(TESTS_TMP_PATH, repo_name_full)))
 
            vcs.get_repo(safe_str(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name_full)))
 
        except vcs.exceptions.VCSError:
 
            RepoGroupModel().delete(group_name)
 
            Session().commit()
 
            pytest.fail('no repo %s in filesystem' % repo_name)
 

	
 
        #check if inherited permissiona are applied
 
        inherited_perms = UserRepoToPerm.query() \
 
            .filter(UserRepoToPerm.repository_id == new_repo_id).all()
 
        self.assertEqual(len(inherited_perms), 2)
 

	
 
        self.assertTrue(TEST_USER_REGULAR_LOGIN in [x.user.username
 
                                                    for x in inherited_perms])
 
        self.assertTrue('repository.write' in [x.permission.permission_name
 
                                               for x in inherited_perms])
 

	
 
        RepoModel().delete(repo_name_full)
 
        RepoGroupModel().delete(group_name)
 
        Session().commit()
 

	
 
    def test_create_remote_repo_wrong_clone_uri(self):
 
        self.log_user()
 
        repo_name = self.NEW_REPO
 
        description = u'description for newly created repo'
 
        response = self.app.post(url('repos'),
 
@@ -337,117 +337,117 @@ class _BaseTest(object):
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_name=repo_name,
 
                                                repo_description=description,
 
                                                _authentication_token=self.authentication_token()))
 
        ## run the check page that triggers the flash message
 
        response = self.app.get(url('repo_check_home', repo_name=repo_name))
 
        self.checkSessionFlash(response,
 
                               'Created repository <a href="/%s">%s</a>'
 
                               % (repo_name, repo_name))
 
        # test if the repo was created in the database
 
        new_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == repo_name).one()
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name)
 
        self.assertEqual(new_repo.description, description)
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(url('summary_home', repo_name=repo_name))
 
        response.mustcontain(repo_name)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
        # test if the repository was created on filesystem
 
        try:
 
            vcs.get_repo(safe_str(os.path.join(TESTS_TMP_PATH, repo_name)))
 
            vcs.get_repo(safe_str(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name)))
 
        except vcs.exceptions.VCSError:
 
            pytest.fail('no repo %s in filesystem' % repo_name)
 

	
 
        response = self.app.post(url('delete_repo', repo_name=repo_name),
 
            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
 

	
 
        self.checkSessionFlash(response, 'Deleted repository %s' % (repo_name))
 

	
 
        response.follow()
 

	
 
        #check if repo was deleted from db
 
        deleted_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == repo_name).scalar()
 

	
 
        self.assertEqual(deleted_repo, None)
 

	
 
        self.assertEqual(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)),
 
        self.assertEqual(os.path.isdir(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name)),
 
                                  False)
 

	
 
    def test_delete_non_ascii(self):
 
        self.log_user()
 
        non_ascii = "ąęł"
 
        repo_name = "%s%s" % (safe_str(self.NEW_REPO), non_ascii)
 
        repo_name_unicode = safe_unicode(repo_name)
 
        description = 'description for newly created repo' + non_ascii
 
        description_unicode = safe_unicode(description)
 
        response = self.app.post(url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                _authentication_token=self.authentication_token()))
 
        ## run the check page that triggers the flash message
 
        response = self.app.get(url('repo_check_home', repo_name=repo_name))
 
        self.assertEqual(response.json, {u'result': True})
 
        self.checkSessionFlash(response,
 
                               u'Created repository <a href="/%s">%s</a>'
 
                               % (urllib.quote(repo_name), repo_name_unicode))
 
        # test if the repo was created in the database
 
        new_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == repo_name_unicode).one()
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name_unicode)
 
        self.assertEqual(new_repo.description, description_unicode)
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(url('summary_home', repo_name=repo_name))
 
        response.mustcontain(repo_name)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
        # test if the repository was created on filesystem
 
        try:
 
            vcs.get_repo(safe_str(os.path.join(TESTS_TMP_PATH, repo_name)))
 
            vcs.get_repo(safe_str(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name_unicode)))
 
        except vcs.exceptions.VCSError:
 
            pytest.fail('no repo %s in filesystem' % repo_name)
 

	
 
        response = self.app.post(url('delete_repo', repo_name=repo_name),
 
            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'Deleted repository %s' % (repo_name_unicode))
 
        response.follow()
 

	
 
        #check if repo was deleted from db
 
        deleted_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == repo_name_unicode).scalar()
 

	
 
        self.assertEqual(deleted_repo, None)
 

	
 
        self.assertEqual(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)),
 
        self.assertEqual(os.path.isdir(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name_unicode)),
 
                                  False)
 

	
 
    def test_delete_repo_with_group(self):
 
        #TODO:
 
        pass
 

	
 
    def test_delete_browser_fakeout(self):
 
        response = self.app.post(url('delete_repo', repo_name=self.REPO),
 
                                 params=dict(_method='delete', _authentication_token=self.authentication_token()))
 

	
 
    def test_show(self):
 
        self.log_user()
 
        response = self.app.get(url('summary_home', repo_name=self.REPO))
 

	
 
    def test_edit(self):
 
        response = self.app.get(url('edit_repo', repo_name=self.REPO))
 

	
 
    def test_set_private_flag_sets_default_to_none(self):
 
        self.log_user()
 
        #initially repository perm should be read
 
        perm = _get_permission_for_user(user='default', repo=self.REPO)
 
        self.assertTrue(len(perm), 1)
 
        self.assertEqual(perm[0].permission.permission_name, 'repository.read')
 
        self.assertEqual(Repository.get_by_repo_name(self.REPO).private, False)
 
@@ -581,40 +581,40 @@ class _BaseTest(object):
 

	
 
        RepoModel().delete(repo_name)
 
        Session().commit()
 

	
 
    @mock.patch.object(RepoModel, '_create_filesystem_repo', error_function)
 
    def test_create_repo_when_filesystem_op_fails(self):
 
        self.log_user()
 
        repo_name = self.NEW_REPO
 
        description = 'description for newly created repo'
 

	
 
        response = self.app.post(url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                _authentication_token=self.authentication_token()))
 

	
 
        self.checkSessionFlash(response,
 
                               'Error creating repository %s' % repo_name)
 
        # repo must not be in db
 
        repo = Repository.get_by_repo_name(repo_name)
 
        self.assertEqual(repo, None)
 

	
 
        # repo must not be in filesystem !
 
        self.assertFalse(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)))
 
        self.assertFalse(os.path.isdir(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name)))
 

	
 
class TestAdminReposControllerGIT(TestController, _BaseTest):
 
    REPO = GIT_REPO
 
    REPO_TYPE = 'git'
 
    NEW_REPO = NEW_GIT_REPO
 
    OTHER_TYPE_REPO = HG_REPO
 
    OTHER_TYPE = 'hg'
 

	
 

	
 
class TestAdminReposControllerHG(TestController, _BaseTest):
 
    REPO = HG_REPO
 
    REPO_TYPE = 'hg'
 
    NEW_REPO = NEW_HG_REPO
 
    OTHER_TYPE_REPO = GIT_REPO
 
    OTHER_TYPE = 'git'
kallithea/tests/other/test_validators.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
import formencode
 
import tempfile
 

	
 
from kallithea.tests import *
 

	
 
from kallithea.model import validators as v
 
from kallithea.model.user_group import UserGroupModel
 

	
 
from kallithea.model.meta import Session
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.tests.fixture import Fixture
 

	
 
fixture = Fixture()
 

	
 

	
 
class TestRepoGroups(BaseTestCase):
 

	
 
    def setUp(self):
 
        pass
 

	
 
    def tearDown(self):
 
        Session.remove()
 

	
 
    def test_Message_extractor(self):
 
        validator = v.ValidUsername()
 
        self.assertRaises(formencode.Invalid, validator.to_python, 'default')
 
@@ -176,50 +177,50 @@ class TestRepoGroups(BaseTestCase):
 
            #TODO: write this one
 
            pass
 

	
 
    def test_ValidForkType(self):
 
            validator = v.ValidForkType(old_data={'repo_type': 'hg'})
 
            self.assertEqual('hg', validator.to_python('hg'))
 
            self.assertRaises(formencode.Invalid, validator.to_python, 'git')
 

	
 
    def test_ValidPerms(self):
 
            #TODO: write this one
 
            pass
 

	
 
    def test_ValidSettings(self):
 
        validator = v.ValidSettings()
 
        self.assertEqual({'pass': 'pass'},
 
                         validator.to_python(value={'user': 'test',
 
                                                    'pass': 'pass'}))
 

	
 
        self.assertEqual({'user2': 'test', 'pass': 'pass'},
 
                         validator.to_python(value={'user2': 'test',
 
                                                    'pass': 'pass'}))
 

	
 
    def test_ValidPath(self):
 
            validator = v.ValidPath()
 
            self.assertEqual(TESTS_TMP_PATH,
 
                             validator.to_python(TESTS_TMP_PATH))
 
            self.assertEqual(tempfile.gettempdir(),
 
                             validator.to_python(tempfile.gettempdir()))
 
            self.assertRaises(formencode.Invalid, validator.to_python,
 
                              '/no_such_dir')
 

	
 
    def test_UniqSystemEmail(self):
 
        validator = v.UniqSystemEmail(old_data={})
 

	
 
        self.assertEqual('mail@python.org',
 
                         validator.to_python('MaiL@Python.org'))
 

	
 
        email = TEST_USER_REGULAR2_EMAIL
 
        self.assertRaises(formencode.Invalid, validator.to_python, email)
 

	
 
    def test_ValidSystemEmail(self):
 
        validator = v.ValidSystemEmail()
 
        email = TEST_USER_REGULAR2_EMAIL
 

	
 
        self.assertEqual(email, validator.to_python(email))
 
        self.assertRaises(formencode.Invalid, validator.to_python, 'err')
 

	
 
    def test_LdapLibValidator(self):
 
        if ldap_lib_installed:
 
            validator = v.LdapLibValidator()
 
            self.assertEqual("DN", validator.to_python('DN'))
 
        else:
kallithea/tests/scripts/manual_test_concurrency.py
Show inline comments
 
@@ -21,52 +21,52 @@ This file was forked by the Kallithea pr
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Dec 30, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 

	
 
"""
 

	
 
import os
 
import sys
 
import shutil
 
import logging
 
from os.path import join as jn
 
from os.path import dirname as dn
 

	
 
from tempfile import _RandomNameSequence
 
from subprocess import Popen, PIPE
 

	
 
from paste.deploy import appconfig
 
from sqlalchemy import engine_from_config
 

	
 
from kallithea.lib.utils import add_cache
 
from kallithea.model import init_model
 
from kallithea.model import meta
 
from kallithea.model.db import User, Repository
 
from kallithea.model.db import User, Repository, Ui
 
from kallithea.lib.auth import get_crypt_password
 

	
 
from kallithea.tests import TESTS_TMP_PATH, HG_REPO
 
from kallithea.tests import HG_REPO
 
from kallithea.config.environment import load_environment
 

	
 
rel_path = dn(dn(dn(dn(os.path.abspath(__file__)))))
 
conf = appconfig('config:development.ini', relative_to=rel_path)
 
load_environment(conf.global_conf, conf.local_conf)
 

	
 
add_cache(conf)
 

	
 
USER = TEST_USER_ADMIN_LOGIN
 
PASS = TEST_USER_ADMIN_PASS
 
HOST = 'server.local'
 
METHOD = 'pull'
 
DEBUG = True
 
log = logging.getLogger(__name__)
 

	
 

	
 
class Command(object):
 

	
 
    def __init__(self, cwd):
 
        self.cwd = cwd
 

	
 
    def execute(self, cmd, *args):
 
        """Runs command on the system with given ``args``.
 
        """
 
@@ -140,49 +140,49 @@ def create_test_repo(force=True):
 
        rm.base_path = '/home/hg'
 
        rm.create(form_data, user)
 

	
 
    print 'done'
 

	
 

	
 
def set_anonymous_access(enable=True):
 
    sa = get_session()
 
    user = sa.query(User).filter(User.username == 'default').one()
 
    user.active = enable
 
    sa.add(user)
 
    sa.commit()
 

	
 

	
 
def get_anonymous_access():
 
    sa = get_session()
 
    return sa.query(User).filter(User.username == 'default').one().active
 

	
 

	
 
#==============================================================================
 
# TESTS
 
#==============================================================================
 
def test_clone_with_credentials(no_errors=False, repo=HG_REPO, method=METHOD,
 
                                seq=None, backend='hg'):
 
    cwd = path = jn(TESTS_TMP_PATH, repo)
 
    cwd = path = jn(Ui.get_by_key('paths', '/').ui_value, repo)
 

	
 
    if seq is None:
 
        seq = _RandomNameSequence().next()
 

	
 
    try:
 
        shutil.rmtree(path, ignore_errors=True)
 
        os.makedirs(path)
 
        #print 'made dirs %s' % jn(path)
 
    except OSError:
 
        raise
 

	
 
    clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
 
                  {'user': USER,
 
                   'pass': PASS,
 
                   'host': HOST,
 
                   'cloned_repo': repo, }
 

	
 
    dest = path + seq
 
    if method == 'pull':
 
        stdout, stderr = Command(cwd).execute(backend, method, '--cwd', dest, clone_url)
 
    else:
 
        stdout, stderr = Command(cwd).execute(backend, method, clone_url, dest)
 
        print stdout,'sdasdsadsa'
 
        if not no_errors:
0 comments (0 inline, 0 general)