Changeset - 522cfb2be9e1
[Not reviewed]
default
0 2 0
domruf - 8 years ago 2017-11-26 13:12:46
dominikruf@gmail.com
Grafted from: 52eea71e0697
tests: add custom hook tests

Git testing is omitted for now - errors are not reported as they are for Hg.
2 files changed with 113 insertions and 1 deletions:
0 comments (0 inline, 0 general)
kallithea/tests/fixture.py
Show inline comments
 
@@ -371,48 +371,62 @@ def create_test_env(repos_test_path, con
 
    dbmanage.populate_default_permissions()
 
    Session().commit()
 
    # PART TWO make test repo
 
    log.debug('making test vcs repositories')
 

	
 
    idx_path = config['index_dir']
 
    data_path = config['cache_dir']
 

	
 
    # clean index and data
 
    if idx_path and os.path.exists(idx_path):
 
        log.debug('remove %s', idx_path)
 
        shutil.rmtree(idx_path)
 

	
 
    if data_path and os.path.exists(data_path):
 
        log.debug('remove %s', data_path)
 
        shutil.rmtree(data_path)
 

	
 
    # CREATE DEFAULT TEST REPOS
 
    tar = tarfile.open(os.path.join(FIXTURES, 'vcs_test_hg.tar.gz'))
 
    tar.extractall(os.path.join(TESTS_TMP_PATH, HG_REPO))
 
    tar.close()
 

	
 
    tar = tarfile.open(os.path.join(FIXTURES, 'vcs_test_git.tar.gz'))
 
    tar.extractall(os.path.join(TESTS_TMP_PATH, GIT_REPO))
 
    tar.close()
 

	
 
    # LOAD VCS test stuff
 
    from kallithea.tests.vcs import setup_package
 
    setup_package()
 

	
 

	
 
def create_test_index(repo_location, config, full_index):
 
    """
 
    Makes default test index
 
    """
 

	
 
    from kallithea.lib.indexers.daemon import WhooshIndexingDaemon
 
    from kallithea.lib.pidlock import DaemonLock, LockHeld
 

	
 
    index_location = os.path.join(config['index_dir'])
 
    if not os.path.exists(index_location):
 
        os.makedirs(index_location)
 

	
 
    l = DaemonLock(os.path.join(index_location, 'make_index.lock'))
 
    WhooshIndexingDaemon(index_location=index_location,
 
                         repo_location=repo_location) \
 
        .run(full_index=full_index)
 
    l.release()
 

	
 

	
 
def failing_test_hook(ui, repo, **kwargs):
 
    ui.write("failing_test_hook failed\n")
 
    return 1
 

	
 

	
 
def exception_test_hook(ui, repo, **kwargs):
 
    raise Exception("exception_test_hook threw an exception")
 

	
 

	
 
def passing_test_hook(ui, repo, **kwargs):
 
    ui.write("passing_test_hook succeeded\n")
 
    return 0
kallithea/tests/other/test_vcs_operations.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
Test suite for vcs push/pull operations.
 

	
 
The tests need Git > 1.8.1.
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Dec 30, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 

	
 
"""
 

	
 
import os
 
import re
 
import tempfile
 
import time
 
import pytest
 

	
 
from tempfile import _RandomNameSequence
 
from subprocess import Popen, PIPE
 

	
 
from kallithea.tests.base import *
 
from kallithea.tests.fixture import Fixture
 
from kallithea.model.db import User, Repository, UserIpMap, CacheInvalidation
 
from kallithea.model.db import User, Repository, UserIpMap, CacheInvalidation, Ui
 
from kallithea.model.meta import Session
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.user import UserModel
 

	
 
DEBUG = True
 
HOST = '127.0.0.1:4999'  # test host
 

	
 
fixture = Fixture()
 

	
 

	
 
class Command(object):
 

	
 
    def __init__(self, cwd):
 
        self.cwd = cwd
 

	
 
    def execute(self, cmd, *args, **environ):
 
        """
 
        Runs command on the system with given ``args``.
 
        """
 

	
 
        command = cmd + ' ' + ' '.join(args)
 
        ignoreReturnCode = environ.pop('ignoreReturnCode', False)
 
        if DEBUG:
 
            print '*** CMD %s ***' % command
 
        testenv = dict(os.environ)
 
        testenv['LANG'] = 'en_US.UTF-8'
 
        testenv['LANGUAGE'] = 'en_US:en'
 
        testenv['HGPLAIN'] = ''
 
        testenv['HGRCPATH'] = ''
 
        testenv.update(environ)
 
        p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd, env=testenv)
 
        stdout, stderr = p.communicate()
 
        if DEBUG:
 
            if stdout:
 
                print 'stdout:', repr(stdout)
 
            if stderr:
 
                print 'stderr:', repr(stderr)
 
        if not ignoreReturnCode:
 
            assert p.returncode == 0
 
        return stdout, stderr
 

	
 

	
 
def _get_tmp_dir(prefix='vcs_operations-', suffix=''):
 
    return tempfile.mkdtemp(dir=TESTS_TMP_PATH, prefix=prefix, suffix=suffix)
 

	
 

	
 
def _add_files(vcs, dest_dir, files_no=3):
 
    """
 
@@ -91,137 +91,165 @@ def _add_files(vcs, dest_dir, files_no=3
 
    :param vcs:
 
    :param dest_dir:
 
    """
 
    added_file = '%ssetup.py' % _RandomNameSequence().next()
 
    open(os.path.join(dest_dir, added_file), 'a').close()
 
    Command(dest_dir).execute('%s add %s' % (vcs, added_file))
 

	
 
    email = 'me@example.com'
 
    if os.name == 'nt':
 
        author_str = 'User <%s>' % email
 
    else:
 
        author_str = 'User ǝɯɐᴎ <%s>' % email
 
    for i in xrange(files_no):
 
        cmd = """echo "added_line%s" >> %s""" % (i, added_file)
 
        Command(dest_dir).execute(cmd)
 
        if vcs == 'hg':
 
            cmd = """hg commit -m "committed new %s" -u "%s" "%s" """ % (
 
                i, author_str, added_file
 
            )
 
        elif vcs == 'git':
 
            cmd = """git commit -m "committed new %s" --author "%s" "%s" """ % (
 
                i, author_str, added_file
 
            )
 
        # git commit needs EMAIL on some machines
 
        Command(dest_dir).execute(cmd, EMAIL=email)
 

	
 
def _add_files_and_push(webserver, vcs, dest_dir, ignoreReturnCode=False, files_no=3,
 
                            clone_url=None, username=TEST_USER_ADMIN_LOGIN, password=TEST_USER_ADMIN_PASS):
 
    _add_files(vcs, dest_dir, files_no=files_no)
 
    # PUSH it back
 
    _REPO = None
 
    if vcs == 'hg':
 
        _REPO = HG_REPO
 
    elif vcs == 'git':
 
        _REPO = GIT_REPO
 

	
 
    if clone_url is None:
 
        clone_url = webserver.repo_url(_REPO, username=username, password=password)
 

	
 
    stdout = stderr = None
 
    if vcs == 'hg':
 
        stdout, stderr = Command(dest_dir).execute('hg push --verbose', clone_url, ignoreReturnCode=ignoreReturnCode)
 
    elif vcs == 'git':
 
        stdout, stderr = Command(dest_dir).execute('git push --verbose', clone_url, "master", ignoreReturnCode=ignoreReturnCode)
 

	
 
    return stdout, stderr
 

	
 

	
 
def _check_outgoing(vcs, cwd, clone_url=''):
 
    if vcs == 'hg':
 
        # hg removes the password from default URLs, so we have to provide it here via the clone_url
 
        return Command(cwd).execute('hg -q outgoing', clone_url, ignoreReturnCode=True)
 
    elif vcs == 'git':
 
        Command(cwd).execute('git remote update')
 
        return Command(cwd).execute('git log origin/master..master')
 

	
 

	
 
def set_anonymous_access(enable=True):
 
    user = User.get_default_user()
 
    user.active = enable
 
    Session().commit()
 
    print '\tanonymous access is now:', enable
 
    if enable != User.get_default_user().active:
 
        raise Exception('Cannot set anonymous access')
 

	
 

	
 
#==============================================================================
 
# TESTS
 
#==============================================================================
 

	
 

	
 
def _check_proper_git_push(stdout, stderr):
 
    # WTF Git stderr is output ?!
 
    assert 'fatal' not in stderr
 
    assert 'rejected' not in stderr
 
    assert 'Pushing to' in stderr
 
    assert 'master -> master' in stderr
 

	
 

	
 
@pytest.mark.usefixtures("test_context_fixture")
 
class TestVCSOperations(TestController):
 

	
 
    @classmethod
 
    def setup_class(cls):
 
        # DISABLE ANONYMOUS ACCESS
 
        set_anonymous_access(False)
 

	
 
    def setup_method(self, method):
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
        Repository.unlock(r)
 
        r.enable_locking = False
 
        Session().commit()
 

	
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        Repository.unlock(r)
 
        r.enable_locking = False
 
        Session().commit()
 

	
 
    @pytest.fixture()
 
    def testhook_cleanup(self):
 
        yield
 
        # remove hook
 
        for hook in ['prechangegroup', 'pretxnchangegroup', 'preoutgoing', 'changegroup', 'outgoing', 'incoming']:
 
            entry = Ui.get_by_key('hooks', '%s.testhook' % hook)
 
            if entry:
 
                Session().delete(entry)
 
        Session().commit()
 

	
 
    @pytest.fixture(scope="module")
 
    def testfork(self):
 
        # create fork so the repo stays untouched
 
        git_fork_name = '%s_fork%s' % (GIT_REPO, _RandomNameSequence().next())
 
        fixture.create_fork(GIT_REPO, git_fork_name)
 
        hg_fork_name = '%s_fork%s' % (HG_REPO, _RandomNameSequence().next())
 
        fixture.create_fork(HG_REPO, hg_fork_name)
 
        return {'git': git_fork_name, 'hg': hg_fork_name}
 

	
 
    def test_clone_hg_repo_by_admin(self, webserver):
 
        clone_url = webserver.repo_url(HG_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir())
 

	
 
        assert 'requesting all changes' in stdout
 
        assert 'adding changesets' in stdout
 
        assert 'adding manifests' in stdout
 
        assert 'adding file changes' in stdout
 

	
 
        assert stderr == ''
 

	
 
    def test_clone_git_repo_by_admin(self, webserver):
 
        clone_url = webserver.repo_url(GIT_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir())
 

	
 
        assert 'Cloning into' in stdout + stderr
 
        assert stderr == '' or stdout == ''
 

	
 
    def test_clone_wrong_credentials_hg(self, webserver):
 
        clone_url = webserver.repo_url(HG_REPO, password='bad!')
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
        assert 'abort: authorization failed' in stderr
 

	
 
    def test_clone_wrong_credentials_git(self, webserver):
 
        clone_url = webserver.repo_url(GIT_REPO, password='bad!')
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
        assert 'fatal: Authentication failed' in stderr
 

	
 
    def test_clone_git_dir_as_hg(self, webserver):
 
        clone_url = webserver.repo_url(GIT_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
        assert 'HTTP Error 404: Not Found' in stderr
 

	
 
    def test_clone_hg_repo_as_git(self, webserver):
 
        clone_url = webserver.repo_url(HG_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
        assert 'not found' in stderr
 

	
 
    def test_clone_non_existing_path_hg(self, webserver):
 
        clone_url = webserver.repo_url('trololo')
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
        assert 'HTTP Error 404: Not Found' in stderr
 

	
 
    def test_clone_non_existing_path_git(self, webserver):
 
        clone_url = webserver.repo_url('trololo')
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
        assert 'not found' in stderr
 

	
 
@@ -524,48 +552,118 @@ class TestVCSOperations(TestController):
 
        try:
 
            user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
 
            Session().commit()
 
            clone_url = webserver.repo_url(HG_REPO)
 
            stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
            assert 'abort: HTTP Error 403: Forbidden' in stderr
 
        finally:
 
            # release IP restrictions
 
            for ip in UserIpMap.query():
 
                UserIpMap.delete(ip.ip_id)
 
            Session().commit()
 

	
 
        # IP permissions are cached, need to wait for the cache in the server process to expire
 
        time.sleep(1.5)
 

	
 
        clone_url = webserver.repo_url(HG_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir())
 

	
 
        assert 'requesting all changes' in stdout
 
        assert 'adding changesets' in stdout
 
        assert 'adding manifests' in stdout
 
        assert 'adding file changes' in stdout
 

	
 
        assert stderr == ''
 

	
 
    def test_ip_restriction_git(self, webserver):
 
        user_model = UserModel()
 
        try:
 
            user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
 
            Session().commit()
 
            clone_url = webserver.repo_url(GIT_REPO)
 
            stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
            # The message apparently changed in Git 1.8.3, so match it loosely.
 
            assert re.search(r'\b403\b', stderr)
 
        finally:
 
            # release IP restrictions
 
            for ip in UserIpMap.query():
 
                UserIpMap.delete(ip.ip_id)
 
            Session().commit()
 

	
 
        # IP permissions are cached, need to wait for the cache in the server process to expire
 
        time.sleep(1.5)
 

	
 
        clone_url = webserver.repo_url(GIT_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir())
 

	
 
        assert 'Cloning into' in stdout + stderr
 
        assert stderr == '' or stdout == ''
 

	
 
    @parametrize('repo_type, repo_name', [
 
                #('git',      GIT_REPO), # git hooks doesn't work like hg hooks
 
                ('hg',       HG_REPO),
 
    ])
 
    def test_custom_hooks_preoutgoing(self, testhook_cleanup, webserver, testfork, repo_type, repo_name):
 
        # set prechangegroup to failing hook (returns True)
 
        Ui.create_or_update_hook('preoutgoing.testhook', 'python:kallithea.tests.fixture.failing_test_hook')
 
        Session().commit()
 
        # clone repo
 
        clone_url = webserver.repo_url(testfork[repo_type], username=TEST_USER_ADMIN_LOGIN, password=TEST_USER_ADMIN_PASS)
 
        dest_dir = _get_tmp_dir()
 
        stdout, stderr = Command(TESTS_TMP_PATH)\
 
            .execute('%s clone' % repo_type, clone_url, dest_dir, ignoreReturnCode=True)
 
        if repo_type == 'hg':
 
            assert 'preoutgoing.testhook hook failed' in stdout
 
        elif repo_type == 'git':
 
            assert 'error: 406' in stderr
 

	
 
    @parametrize('repo_type, repo_name', [
 
                #('git',      GIT_REPO), # git hooks doesn't work like hg hooks
 
                ('hg',       HG_REPO),
 
    ])
 
    def test_custom_hooks_prechangegroup(self, testhook_cleanup, webserver, testfork, repo_type, repo_name):
 

	
 
        # set prechangegroup to failing hook (returns True)
 
        Ui.create_or_update_hook('prechangegroup.testhook', 'python:kallithea.tests.fixture.failing_test_hook')
 
        Session().commit()
 
        # clone repo
 
        clone_url = webserver.repo_url(testfork[repo_type], username=TEST_USER_ADMIN_LOGIN, password=TEST_USER_ADMIN_PASS)
 
        dest_dir = _get_tmp_dir()
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('%s clone' % repo_type, clone_url, dest_dir)
 

	
 
        stdout, stderr = _add_files_and_push(webserver, repo_type, dest_dir,
 
                                             username=TEST_USER_ADMIN_LOGIN,
 
                                             password=TEST_USER_ADMIN_PASS,
 
                                             ignoreReturnCode=True)
 
        assert 'failing_test_hook failed' in stdout + stderr
 
        assert 'Traceback' not in stdout + stderr
 
        assert 'prechangegroup.testhook hook failed' in stdout + stderr
 
        # there are still outgoing changesets
 
        stdout, stderr = _check_outgoing(repo_type, dest_dir, clone_url)
 
        assert stdout != ''
 

	
 
        # set prechangegroup hook to exception throwing method
 
        Ui.create_or_update_hook('prechangegroup.testhook', 'python:kallithea.tests.fixture.exception_test_hook')
 
        Session().commit()
 
        # re-try to push
 
        stdout, stderr = Command(dest_dir).execute('%s push' % repo_type, clone_url, ignoreReturnCode=True)
 
        if repo_type == 'hg':
 
            # like with 'hg serve...' 'HTTP Error 500: INTERNAL SERVER ERROR' should be returned
 
            assert 'HTTP Error 500: INTERNAL SERVER ERROR' in stderr
 
        elif repo_type == 'git':
 
            assert 'exception_test_hook threw an exception' in stderr
 
        # there are still outgoing changesets
 
        stdout, stderr = _check_outgoing(repo_type, dest_dir, clone_url)
 
        assert stdout != ''
 

	
 
        # set prechangegroup hook to method that returns False
 
        Ui.create_or_update_hook('prechangegroup.testhook', 'python:kallithea.tests.fixture.passing_test_hook')
 
        Session().commit()
 
        # re-try to push
 
        stdout, stderr = Command(dest_dir).execute('%s push' % repo_type, clone_url, ignoreReturnCode=True)
 
        assert 'passing_test_hook succeeded' in stdout + stderr
 
        assert 'Traceback' not in stdout + stderr
 
        assert 'prechangegroup.testhook hook failed' not in stdout + stderr
 
        # no more outgoing changesets
 
        stdout, stderr = _check_outgoing(repo_type, dest_dir, clone_url)
 
        assert stdout == ''
 
        assert stderr == ''
0 comments (0 inline, 0 general)