Changeset - f6076c096692
[Not reviewed]
default
0 1 0
Thomas De Schampheleire - 8 years ago 2017-07-05 21:17:47
thomas.de.schampheleire@gmail.com
tests: vcs: remove influence of user's hg settings

Certain settings in the user's hgrc file can cause test failures. For
example, enabling a non-existent extension prints stderr messages like

*** failed to import extension journal: No module named journal

Tests that check stderr output for emptiness, like
test_clone_hg_repo_by_admin, then fail.

Instead, avoid all influence of the user's settings, by setting HGRCPATH and
HGPLAIN to empty.

In versions of Mercurial before 4.2, setting an empty HGRCPATH was generally
enough to get expected behavior, but since 4.2 the behavior of some commands
has changed, and a pager is now default. Setting HGPLAIN is a simple way of
neutralizing even that.
1 file changed with 2 insertions and 0 deletions:
0 comments (0 inline, 0 general)
kallithea/tests/other/test_vcs_operations.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
Test suite for vcs push/pull operations.
 

	
 
The tests need Git > 1.8.1.
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Dec 30, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 

	
 
"""
 

	
 
import os
 
import re
 
import tempfile
 
import time
 
import pytest
 

	
 
from tempfile import _RandomNameSequence
 
from subprocess import Popen, PIPE
 

	
 
from kallithea.tests.base import *
 
from kallithea.tests.fixture import Fixture
 
from kallithea.model.db import User, Repository, UserIpMap, CacheInvalidation
 
from kallithea.model.meta import Session
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.user import UserModel
 

	
 
DEBUG = True
 
HOST = '127.0.0.1:4999'  # test host
 

	
 
fixture = Fixture()
 

	
 

	
 
class Command(object):
 

	
 
    def __init__(self, cwd):
 
        self.cwd = cwd
 

	
 
    def execute(self, cmd, *args, **environ):
 
        """
 
        Runs command on the system with given ``args``.
 
        """
 

	
 
        command = cmd + ' ' + ' '.join(args)
 
        ignoreReturnCode = environ.pop('ignoreReturnCode', False)
 
        if DEBUG:
 
            print '*** CMD %s ***' % command
 
        testenv = dict(os.environ)
 
        testenv['LANG'] = 'en_US.UTF-8'
 
        testenv['LANGUAGE'] = 'en_US:en'
 
        testenv['HGPLAIN'] = ''
 
        testenv['HGRCPATH'] = ''
 
        testenv.update(environ)
 
        p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd, env=testenv)
 
        stdout, stderr = p.communicate()
 
        if DEBUG:
 
            if stdout:
 
                print 'stdout:', repr(stdout)
 
            if stderr:
 
                print 'stderr:', repr(stderr)
 
        if not ignoreReturnCode:
 
            assert p.returncode == 0
 
        return stdout, stderr
 

	
 

	
 
def _get_tmp_dir(prefix='vcs_operations-', suffix=''):
 
    return tempfile.mkdtemp(dir=TESTS_TMP_PATH, prefix=prefix, suffix=suffix)
 

	
 

	
 
def _add_files_and_push(webserver, vcs, DEST, ignoreReturnCode=False, files_no=3,
 
                        clone_url=None, username=TEST_USER_ADMIN_LOGIN, password=TEST_USER_ADMIN_PASS):
 
    """
 
    Generate some files, add it to DEST repo and push back
 
    vcs is git or hg and defines what VCS we want to make those files for
 

	
 
    :param vcs:
 
    :param DEST:
 
    """
 
    # commit some stuff into this repo
 
    cwd = os.path.join(DEST)
 
    #added_file = '%ssetupążźć.py' % _RandomNameSequence().next()
 
    added_file = '%ssetup.py' % _RandomNameSequence().next()
 
    Command(cwd).execute('touch %s' % added_file)
 
    Command(cwd).execute('%s add %s' % (vcs, added_file))
 

	
 
    email = 'me@example.com'
 
    if os.name == 'nt':
 
        author_str = 'User <%s>' % email
 
    else:
 
        author_str = 'User ǝɯɐᴎ <%s>' % email
 
    for i in xrange(files_no):
 
        cmd = """echo "added_line%s" >> %s""" % (i, added_file)
 
        Command(cwd).execute(cmd)
 
        if vcs == 'hg':
 
            cmd = """hg commit -m "committed new %s" -u "%s" "%s" """ % (
 
                i, author_str, added_file
 
            )
 
        elif vcs == 'git':
 
            cmd = """git commit -m "committed new %s" --author "%s" "%s" """ % (
 
                i, author_str, added_file
 
            )
 
        # git commit needs EMAIL on some machines
 
        Command(cwd).execute(cmd, EMAIL=email)
 

	
 
    # PUSH it back
 
    _REPO = None
 
    if vcs == 'hg':
 
        _REPO = HG_REPO
 
    elif vcs == 'git':
 
        _REPO = GIT_REPO
 

	
 
    if clone_url is None:
 
        clone_url = webserver.repo_url(_REPO, username=username, password=password)
 

	
 
    stdout = stderr = None
 
    if vcs == 'hg':
 
        stdout, stderr = Command(cwd).execute('hg push --verbose', clone_url, ignoreReturnCode=ignoreReturnCode)
 
    elif vcs == 'git':
 
        stdout, stderr = Command(cwd).execute('git push --verbose', clone_url, "master", ignoreReturnCode=ignoreReturnCode)
 

	
 
    return stdout, stderr
 

	
 

	
 
def set_anonymous_access(enable=True):
 
    user = User.get_default_user()
 
    user.active = enable
 
    Session().commit()
 
    print '\tanonymous access is now:', enable
 
    if enable != User.get_default_user().active:
 
        raise Exception('Cannot set anonymous access')
 

	
 

	
 
#==============================================================================
 
# TESTS
 
#==============================================================================
 

	
 

	
 
def _check_proper_git_push(stdout, stderr):
 
    #WTF Git stderr is output ?!
 
    assert 'fatal' not in stderr
 
    assert 'rejected' not in stderr
 
    assert 'Pushing to' in stderr
 
    assert 'master -> master' in stderr
 

	
 

	
 
@pytest.mark.usefixtures("test_context_fixture")
 
class TestVCSOperations(TestController):
 

	
 
    @classmethod
 
    def setup_class(cls):
 
        #DISABLE ANONYMOUS ACCESS
 
        set_anonymous_access(False)
 

	
 
    def setup_method(self, method):
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
        Repository.unlock(r)
 
        r.enable_locking = False
 
        Session().commit()
 

	
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        Repository.unlock(r)
 
        r.enable_locking = False
 
        Session().commit()
 

	
 
    def test_clone_hg_repo_by_admin(self, webserver):
 
        clone_url = webserver.repo_url(HG_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir())
 

	
 
        assert 'requesting all changes' in stdout
 
        assert 'adding changesets' in stdout
 
        assert 'adding manifests' in stdout
 
        assert 'adding file changes' in stdout
 

	
 
        assert stderr == ''
 

	
 
    def test_clone_git_repo_by_admin(self, webserver):
 
        clone_url = webserver.repo_url(GIT_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir())
 

	
 
        assert 'Cloning into' in stdout + stderr
 
        assert stderr == '' or stdout == ''
 

	
 
    def test_clone_wrong_credentials_hg(self, webserver):
 
        clone_url = webserver.repo_url(HG_REPO, password='bad!')
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
        assert 'abort: authorization failed' in stderr
 

	
 
    def test_clone_wrong_credentials_git(self, webserver):
 
        clone_url = webserver.repo_url(GIT_REPO, password='bad!')
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
        assert 'fatal: Authentication failed' in stderr
 

	
 
    def test_clone_git_dir_as_hg(self, webserver):
 
        clone_url = webserver.repo_url(GIT_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
        assert 'HTTP Error 404: Not Found' in stderr
 

	
 
    def test_clone_hg_repo_as_git(self, webserver):
 
        clone_url = webserver.repo_url(HG_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
        assert 'not found' in stderr
 

	
 
    def test_clone_non_existing_path_hg(self, webserver):
 
        clone_url = webserver.repo_url('trololo')
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
        assert 'HTTP Error 404: Not Found' in stderr
 

	
 
    def test_clone_non_existing_path_git(self, webserver):
 
        clone_url = webserver.repo_url('trololo')
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, _get_tmp_dir(), ignoreReturnCode=True)
 
        assert 'not found' in stderr
 

	
 
    def test_push_new_file_hg(self, webserver):
 
        DEST = _get_tmp_dir()
 
        clone_url = webserver.repo_url(HG_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('hg clone', clone_url, DEST)
 

	
 
        fork_name = '%s_fork%s' % (HG_REPO, _RandomNameSequence().next())
 
        fixture.create_fork(HG_REPO, fork_name)
 
        clone_url = webserver.repo_url(fork_name)
 
        stdout, stderr = _add_files_and_push(webserver, 'hg', DEST, clone_url=clone_url)
 

	
 
        assert 'pushing to' in stdout
 
        assert 'Repository size' in stdout
 
        assert 'Last revision is now' in stdout
 

	
 
    def test_push_new_file_git(self, webserver):
 
        DEST = _get_tmp_dir()
 
        clone_url = webserver.repo_url(GIT_REPO)
 
        stdout, stderr = Command(TESTS_TMP_PATH).execute('git clone', clone_url, DEST)
 

	
 
        # commit some stuff into this repo
 
        fork_name = '%s_fork%s' % (GIT_REPO, _RandomNameSequence().next())
 
        fixture.create_fork(GIT_REPO, fork_name)
 
        clone_url = webserver.repo_url(fork_name)
 
        stdout, stderr = _add_files_and_push(webserver, 'git', DEST, clone_url=clone_url)
 
        print [(x.repo_full_path,x.repo_path) for x in Repository.query()] # TODO: what is this for
 
        _check_proper_git_push(stdout, stderr)
 

	
 
    def test_push_invalidates_cache_hg(self, webserver):
 
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
 
                                               ==HG_REPO).scalar()
 
        if not key:
 
            key = CacheInvalidation(HG_REPO, HG_REPO)
0 comments (0 inline, 0 general)