Changeset - 8568a1d4f100
[Not reviewed]
default
0 4 0
domruf - 10 years ago 2016-01-28 20:40:32
dominikruf@gmail.com
tests: don't use the TESTS_TMP_PATH string

it differs each time you run the test
and therefore doesn't work if you use a separat kallithea test instance
(KALLITHEA_NO_TMP_PATH=1)

instead get the 'real' path from the DB

FIXME: breaks kallithea/tests/functional/test_admin_repos.py
4 files changed with 18 insertions and 17 deletions:
0 comments (0 inline, 0 general)
kallithea/tests/api/api_base.py
Show inline comments
 
@@ -24,25 +24,25 @@ import mock
 

	
 
from kallithea.tests import *
 
from kallithea.tests.fixture import Fixture
 
from kallithea.lib.compat import json
 
from kallithea.lib.auth import AuthUser
 
from kallithea.model.user import UserModel
 
from kallithea.model.user_group import UserGroupModel
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.meta import Session
 
from kallithea.model.scm import ScmModel
 
from kallithea.model.gist import GistModel
 
from kallithea.model.db import Repository, User, Setting
 
from kallithea.model.db import Repository, User, Setting, Ui
 
from kallithea.lib.utils2 import time_to_datetime
 

	
 

	
 
API_URL = '/_admin/api'
 
TEST_USER_GROUP = u'test_user_group'
 
TEST_REPO_GROUP = u'test_repo_group'
 

	
 
fixture = Fixture()
 

	
 

	
 
def _build_data(apikey, method, **kw):
 
    """
 
@@ -271,25 +271,25 @@ class _BaseTestApi(object):
 

	
 
    def test_api_get_user_with_giving_userid_non_admin(self):
 
        id_, params = _build_data(self.apikey_regular, 'get_user',
 
                                  userid=self.TEST_USER_LOGIN)
 
        response = api_call(self, params)
 

	
 
        expected = 'userid is not the same as your user'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_pull(self):
 
        repo_name = u'test_pull'
 
        r = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
 
        r.clone_uri = os.path.join(TESTS_TMP_PATH, self.REPO)
 
        r.clone_uri = os.path.join(Ui.get_by_key('paths', '/').ui_value, self.REPO)
 
        Session.add(r)
 
        Session.commit()
 

	
 
        id_, params = _build_data(self.apikey, 'pull',
 
                                  repoid=repo_name,)
 
        response = api_call(self, params)
 

	
 
        expected = {'msg': 'Pulled from `%s`' % repo_name,
 
                    'repository': repo_name}
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
        fixture.destroy_repo(repo_name)
kallithea/tests/functional/test_admin_repos.py
Show inline comments
 
# -*- coding: utf-8 -*-
 

	
 
import os
 
import mock
 
import urllib
 

	
 
from kallithea.lib import vcs
 
from kallithea.lib.utils2 import safe_str, safe_unicode
 
from kallithea.model.db import Repository, RepoGroup, UserRepoToPerm, User, \
 
    Permission
 
    Permission, Ui
 
from kallithea.model.user import UserModel
 
from kallithea.tests import *
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.meta import Session
 
from kallithea.tests.fixture import Fixture, error_function
 

	
 
fixture = Fixture()
 

	
 

	
 
def _get_permission_for_user(user, repo):
 
    perm = UserRepoToPerm.query() \
 
@@ -71,25 +71,25 @@ class _BaseTest(object):
 
            .filter(Repository.repo_name == repo_name).one()
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name)
 
        self.assertEqual(new_repo.description, description)
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(url('summary_home', repo_name=repo_name))
 
        response.mustcontain(repo_name)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
        # test if the repository was created on filesystem
 
        try:
 
            vcs.get_repo(safe_str(os.path.join(TESTS_TMP_PATH, repo_name)))
 
            vcs.get_repo(safe_str(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name)))
 
        except vcs.exceptions.VCSError:
 
            pytest.fail('no repo %s in filesystem' % repo_name)
 

	
 
        RepoModel().delete(repo_name)
 
        Session().commit()
 

	
 
    def test_create_in_group(self):
 
        self.log_user()
 

	
 
        ## create GROUP
 
        group_name = u'sometest_%s' % self.REPO_TYPE
 
        gr = RepoGroupModel().create(group_name=group_name,
 
@@ -123,25 +123,25 @@ class _BaseTest(object):
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(url('summary_home', repo_name=repo_name_full))
 
        response.mustcontain(repo_name_full)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
        inherited_perms = UserRepoToPerm.query() \
 
            .filter(UserRepoToPerm.repository_id == new_repo_id).all()
 
        self.assertEqual(len(inherited_perms), 1)
 

	
 
        # test if the repository was created on filesystem
 
        try:
 
            vcs.get_repo(safe_str(os.path.join(TESTS_TMP_PATH, repo_name_full)))
 
            vcs.get_repo(safe_str(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name_full)))
 
        except vcs.exceptions.VCSError:
 
            RepoGroupModel().delete(group_name)
 
            Session().commit()
 
            pytest.fail('no repo %s in filesystem' % repo_name)
 

	
 
        RepoModel().delete(repo_name_full)
 
        RepoGroupModel().delete(group_name)
 
        Session().commit()
 

	
 
    def test_create_in_group_without_needed_permissions(self):
 
        usr = self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
 
        # avoid spurious RepoGroup DetachedInstanceError ...
 
@@ -215,25 +215,25 @@ class _BaseTest(object):
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(url('summary_home', repo_name=repo_name_full))
 
        response.mustcontain(repo_name_full)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
        inherited_perms = UserRepoToPerm.query() \
 
            .filter(UserRepoToPerm.repository_id == new_repo_id).all()
 
        self.assertEqual(len(inherited_perms), 1)
 

	
 
        # test if the repository was created on filesystem
 
        try:
 
            vcs.get_repo(safe_str(os.path.join(TESTS_TMP_PATH, repo_name_full)))
 
            vcs.get_repo(safe_str(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name_full)))
 
        except vcs.exceptions.VCSError:
 
            RepoGroupModel().delete(group_name)
 
            Session().commit()
 
            pytest.fail('no repo %s in filesystem' % repo_name)
 

	
 
        RepoModel().delete(repo_name_full)
 
        RepoGroupModel().delete(group_name)
 
        RepoGroupModel().delete(group_name_allowed)
 
        Session().commit()
 

	
 
    def test_create_in_group_inherit_permissions(self):
 
        self.log_user()
 
@@ -272,25 +272,25 @@ class _BaseTest(object):
 
        new_repo_id = new_repo.repo_id
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name_full)
 
        self.assertEqual(new_repo.description, description)
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(url('summary_home', repo_name=repo_name_full))
 
        response.mustcontain(repo_name_full)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
        # test if the repository was created on filesystem
 
        try:
 
            vcs.get_repo(safe_str(os.path.join(TESTS_TMP_PATH, repo_name_full)))
 
            vcs.get_repo(safe_str(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name_full)))
 
        except vcs.exceptions.VCSError:
 
            RepoGroupModel().delete(group_name)
 
            Session().commit()
 
            pytest.fail('no repo %s in filesystem' % repo_name)
 

	
 
        #check if inherited permissiona are applied
 
        inherited_perms = UserRepoToPerm.query() \
 
            .filter(UserRepoToPerm.repository_id == new_repo_id).all()
 
        self.assertEqual(len(inherited_perms), 2)
 

	
 
        self.assertTrue(TEST_USER_REGULAR_LOGIN in [x.user.username
 
                                                    for x in inherited_perms])
 
@@ -349,42 +349,42 @@ class _BaseTest(object):
 
            .filter(Repository.repo_name == repo_name).one()
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name)
 
        self.assertEqual(new_repo.description, description)
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(url('summary_home', repo_name=repo_name))
 
        response.mustcontain(repo_name)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
        # test if the repository was created on filesystem
 
        try:
 
            vcs.get_repo(safe_str(os.path.join(TESTS_TMP_PATH, repo_name)))
 
            vcs.get_repo(safe_str(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name)))
 
        except vcs.exceptions.VCSError:
 
            pytest.fail('no repo %s in filesystem' % repo_name)
 

	
 
        response = self.app.post(url('delete_repo', repo_name=repo_name),
 
            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
 

	
 
        self.checkSessionFlash(response, 'Deleted repository %s' % (repo_name))
 

	
 
        response.follow()
 

	
 
        #check if repo was deleted from db
 
        deleted_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == repo_name).scalar()
 

	
 
        self.assertEqual(deleted_repo, None)
 

	
 
        self.assertEqual(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)),
 
        self.assertEqual(os.path.isdir(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name)),
 
                                  False)
 

	
 
    def test_delete_non_ascii(self):
 
        self.log_user()
 
        non_ascii = "ąęł"
 
        repo_name = "%s%s" % (safe_str(self.NEW_REPO), non_ascii)
 
        repo_name_unicode = safe_unicode(repo_name)
 
        description = 'description for newly created repo' + non_ascii
 
        description_unicode = safe_unicode(description)
 
        response = self.app.post(url('repos'),
 
                        fixture._get_repo_create_params(repo_private=False,
 
                                                repo_name=repo_name,
 
@@ -402,40 +402,40 @@ class _BaseTest(object):
 
            .filter(Repository.repo_name == repo_name_unicode).one()
 

	
 
        self.assertEqual(new_repo.repo_name, repo_name_unicode)
 
        self.assertEqual(new_repo.description, description_unicode)
 

	
 
        # test if the repository is visible in the list ?
 
        response = self.app.get(url('summary_home', repo_name=repo_name))
 
        response.mustcontain(repo_name)
 
        response.mustcontain(self.REPO_TYPE)
 

	
 
        # test if the repository was created on filesystem
 
        try:
 
            vcs.get_repo(safe_str(os.path.join(TESTS_TMP_PATH, repo_name)))
 
            vcs.get_repo(safe_str(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name_unicode)))
 
        except vcs.exceptions.VCSError:
 
            pytest.fail('no repo %s in filesystem' % repo_name)
 

	
 
        response = self.app.post(url('delete_repo', repo_name=repo_name),
 
            params={'_method': 'delete', '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'Deleted repository %s' % (repo_name_unicode))
 
        response.follow()
 

	
 
        #check if repo was deleted from db
 
        deleted_repo = Session().query(Repository) \
 
            .filter(Repository.repo_name == repo_name_unicode).scalar()
 

	
 
        self.assertEqual(deleted_repo, None)
 

	
 
        self.assertEqual(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)),
 
        self.assertEqual(os.path.isdir(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name_unicode)),
 
                                  False)
 

	
 
    def test_delete_repo_with_group(self):
 
        #TODO:
 
        pass
 

	
 
    def test_delete_browser_fakeout(self):
 
        response = self.app.post(url('delete_repo', repo_name=self.REPO),
 
                                 params=dict(_method='delete', _authentication_token=self.authentication_token()))
 

	
 
    def test_show(self):
 
        self.log_user()
 
@@ -593,25 +593,25 @@ class _BaseTest(object):
 
                                                repo_name=repo_name,
 
                                                repo_type=self.REPO_TYPE,
 
                                                repo_description=description,
 
                                                _authentication_token=self.authentication_token()))
 

	
 
        self.checkSessionFlash(response,
 
                               'Error creating repository %s' % repo_name)
 
        # repo must not be in db
 
        repo = Repository.get_by_repo_name(repo_name)
 
        self.assertEqual(repo, None)
 

	
 
        # repo must not be in filesystem !
 
        self.assertFalse(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)))
 
        self.assertFalse(os.path.isdir(os.path.join(Ui.get_by_key('paths', '/').ui_value, repo_name)))
 

	
 
class TestAdminReposControllerGIT(TestController, _BaseTest):
 
    REPO = GIT_REPO
 
    REPO_TYPE = 'git'
 
    NEW_REPO = NEW_GIT_REPO
 
    OTHER_TYPE_REPO = HG_REPO
 
    OTHER_TYPE = 'hg'
 

	
 

	
 
class TestAdminReposControllerHG(TestController, _BaseTest):
 
    REPO = HG_REPO
 
    REPO_TYPE = 'hg'
kallithea/tests/other/test_validators.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
import formencode
 
import tempfile
 

	
 
from kallithea.tests import *
 

	
 
from kallithea.model import validators as v
 
from kallithea.model.user_group import UserGroupModel
 

	
 
from kallithea.model.meta import Session
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.tests.fixture import Fixture
 

	
 
fixture = Fixture()
 

	
 
@@ -188,26 +189,26 @@ class TestRepoGroups(BaseTestCase):
 
    def test_ValidSettings(self):
 
        validator = v.ValidSettings()
 
        self.assertEqual({'pass': 'pass'},
 
                         validator.to_python(value={'user': 'test',
 
                                                    'pass': 'pass'}))
 

	
 
        self.assertEqual({'user2': 'test', 'pass': 'pass'},
 
                         validator.to_python(value={'user2': 'test',
 
                                                    'pass': 'pass'}))
 

	
 
    def test_ValidPath(self):
 
            validator = v.ValidPath()
 
            self.assertEqual(TESTS_TMP_PATH,
 
                             validator.to_python(TESTS_TMP_PATH))
 
            self.assertEqual(tempfile.gettempdir(),
 
                             validator.to_python(tempfile.gettempdir()))
 
            self.assertRaises(formencode.Invalid, validator.to_python,
 
                              '/no_such_dir')
 

	
 
    def test_UniqSystemEmail(self):
 
        validator = v.UniqSystemEmail(old_data={})
 

	
 
        self.assertEqual('mail@python.org',
 
                         validator.to_python('MaiL@Python.org'))
 

	
 
        email = TEST_USER_REGULAR2_EMAIL
 
        self.assertRaises(formencode.Invalid, validator.to_python, email)
 

	
kallithea/tests/scripts/manual_test_concurrency.py
Show inline comments
 
@@ -33,28 +33,28 @@ import logging
 
from os.path import join as jn
 
from os.path import dirname as dn
 

	
 
from tempfile import _RandomNameSequence
 
from subprocess import Popen, PIPE
 

	
 
from paste.deploy import appconfig
 
from sqlalchemy import engine_from_config
 

	
 
from kallithea.lib.utils import add_cache
 
from kallithea.model import init_model
 
from kallithea.model import meta
 
from kallithea.model.db import User, Repository
 
from kallithea.model.db import User, Repository, Ui
 
from kallithea.lib.auth import get_crypt_password
 

	
 
from kallithea.tests import TESTS_TMP_PATH, HG_REPO
 
from kallithea.tests import HG_REPO
 
from kallithea.config.environment import load_environment
 

	
 
rel_path = dn(dn(dn(dn(os.path.abspath(__file__)))))
 
conf = appconfig('config:development.ini', relative_to=rel_path)
 
load_environment(conf.global_conf, conf.local_conf)
 

	
 
add_cache(conf)
 

	
 
USER = TEST_USER_ADMIN_LOGIN
 
PASS = TEST_USER_ADMIN_PASS
 
HOST = 'server.local'
 
METHOD = 'pull'
 
@@ -152,25 +152,25 @@ def set_anonymous_access(enable=True):
 

	
 

	
 
def get_anonymous_access():
 
    sa = get_session()
 
    return sa.query(User).filter(User.username == 'default').one().active
 

	
 

	
 
#==============================================================================
 
# TESTS
 
#==============================================================================
 
def test_clone_with_credentials(no_errors=False, repo=HG_REPO, method=METHOD,
 
                                seq=None, backend='hg'):
 
    cwd = path = jn(TESTS_TMP_PATH, repo)
 
    cwd = path = jn(Ui.get_by_key('paths', '/').ui_value, repo)
 

	
 
    if seq is None:
 
        seq = _RandomNameSequence().next()
 

	
 
    try:
 
        shutil.rmtree(path, ignore_errors=True)
 
        os.makedirs(path)
 
        #print 'made dirs %s' % jn(path)
 
    except OSError:
 
        raise
 

	
 
    clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
0 comments (0 inline, 0 general)