Changeset - 6d904a0cd48d
[Not reviewed]
beta
1 2 1
Marcin Kuzminski - 13 years ago 2012-08-28 09:28:09
marcin@python-works.com
added new suite of tests for VCS operations
- locking tests
- push pull tests
- added new ENV option to bypass re-creation of test enviroment
3 files changed with 304 insertions and 117 deletions:
0 comments (0 inline, 0 general)
rhodecode/config/environment.py
Show inline comments
 
@@ -78,7 +78,10 @@ def load_environment(global_conf, app_co
 

	
 
        from rhodecode.lib.utils import create_test_env, create_test_index
 
        from rhodecode.tests import  TESTS_TMP_PATH
 
        create_test_env(TESTS_TMP_PATH, config)
 
        # set RC_NO_TMP_PATH=1 to disable re-creating the database and
 
        # test repos
 
        if not int(os.environ.get('RC_NO_TMP_PATH', 0)):
 
            create_test_env(TESTS_TMP_PATH, config)
 
        # set RC_WHOOSH_TEST_DISABLE=1 to disable whoosh index during tests
 
        if not int(os.environ.get('RC_WHOOSH_TEST_DISABLE', 0)):
 
            create_test_index(TESTS_TMP_PATH, config, True)
rhodecode/lib/base.py
Show inline comments
 
@@ -171,7 +171,7 @@ class BaseVCSController(object):
 
        do the locking. Think about this as signals passed to hooks what to do.
 

	
 
        """
 
        locked = False
 
        locked = False  # defines that locked error should be thrown to user
 
        make_lock = None
 
        repo = Repository.get_by_repo_name(repo)
 
        user = User.get(user_id)
 
@@ -187,7 +187,7 @@ class BaseVCSController(object):
 
                #check if it's already locked !, if it is compare users
 
                user_id, _date = repo.locked
 
                if user.user_id == user_id:
 
                    log.debug('Got push from user, now unlocking' % (user))
 
                    log.debug('Got push from user %s, now unlocking' % (user))
 
                    # unlock if we have push from user who locked
 
                    make_lock = False
 
                else:
 
@@ -202,7 +202,8 @@ class BaseVCSController(object):
 

	
 
        else:
 
            log.debug('Repository %s do not have locking enabled' % (repo))
 

	
 
        log.debug('FINAL locking values make_lock:%s,locked:%s,locked_by:%s'
 
                  % (make_lock, locked, locked_by))
 
        return make_lock, locked, locked_by
 

	
 
    def __call__(self, environ, start_response):
rhodecode/tests/scripts/test_vcs_operations.py
Show inline comments
 
file renamed from rhodecode/tests/scripts/test_scm_operations.py to rhodecode/tests/scripts/test_vcs_operations.py
 
@@ -6,7 +6,7 @@
 
    Test suite for making push/pull operations.
 
    Run using::
 

	
 
     RC_WHOOSH_TEST_DISABLE=1 nosetests rhodecode/tests/scripts/test_scm_operations.py
 
     RC_WHOOSH_TEST_DISABLE=1 RC_NO_TMP_PATH=1 nosetests rhodecode/tests/scripts/test_vcs_operations.py
 

	
 
    :created_on: Dec 30, 2010
 
    :author: marcink
 
@@ -28,6 +28,7 @@
 

	
 
import os
 
import tempfile
 
import unittest
 
from os.path import join as jn
 
from os.path import dirname as dn
 

	
 
@@ -37,6 +38,7 @@ from subprocess import Popen, PIPE
 
from rhodecode.tests import *
 
from rhodecode.model.db import User, Repository, UserLog
 
from rhodecode.model.meta import Session
 
from rhodecode.model.repo import RepoModel
 

	
 
DEBUG = True
 
HOST = '127.0.0.1:5000'  # test host
 
@@ -85,6 +87,51 @@ def _construct_url(repo, dest=None, **kw
 
    return _url
 

	
 

	
 
def _add_files_and_push(vcs, DEST, **kwargs):
 
    """
 
    Generate some files, add it to DEST repo and push back
 
    vcs is git or hg and defines what VCS we want to make those files for
 

	
 
    :param vcs:
 
    :param DEST:
 
    """
 
    # commit some stuff into this repo
 
    cwd = path = jn(DEST)
 
    #added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next())
 
    added_file = jn(path, '%ssetup.py' % _RandomNameSequence().next())
 
    Command(cwd).execute('touch %s' % added_file)
 
    Command(cwd).execute('%s add %s' % (vcs, added_file))
 

	
 
    for i in xrange(3):
 
        cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
 
        Command(cwd).execute(cmd)
 
        if vcs == 'hg':
 
            cmd = """hg commit -m 'commited new %s' -u '%s' %s """ % (
 
                i, 'Marcin Kuźminski <marcin@python-blog.com>', added_file
 
            )
 
        elif vcs == 'git':
 
            cmd = """git ci -m 'commited new %s' --author '%s' %s """ % (
 
                i, 'Marcin Kuźminski <marcin@python-blog.com>', added_file
 
            )
 
        Command(cwd).execute(cmd)
 
    # PUSH it back
 
    if vcs == 'hg':
 
        _REPO = HG_REPO
 
    elif vcs == 'git':
 
        _REPO = GIT_REPO
 

	
 
    kwargs['dest'] = ''
 
    clone_url = _construct_url(_REPO, **kwargs)
 
    if 'clone_url' in kwargs:
 
        clone_url = kwargs['clone_url']
 
    if vcs == 'hg':
 
        stdout, stderr = Command(cwd).execute('hg push --verbose', clone_url)
 
    elif vcs == 'git':
 
        stdout, stderr = Command(cwd).execute('git push', clone_url + " master")
 

	
 
    return stdout, stderr
 

	
 

	
 
def set_anonymous_access(enable=True):
 
    user = User.get_by_username(User.DEFAULT_USER)
 
    user.active = enable
 
@@ -95,148 +142,284 @@ def set_anonymous_access(enable=True):
 
        raise Exception('Cannot set anonymous access')
 

	
 

	
 
def setup_module():
 
    #DISABLE ANONYMOUS ACCESS
 
    set_anonymous_access(False)
 
#==============================================================================
 
# TESTS
 
#==============================================================================
 

	
 
class TestVCSOperations(unittest.TestCase):
 

	
 
    @classmethod
 
    def setup_class(cls):
 
        #DISABLE ANONYMOUS ACCESS
 
        set_anonymous_access(False)
 

	
 
def test_clone_hg_repo_by_admin():
 
    clone_url = _construct_url(HG_REPO)
 
    stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
    def setUp(self):
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
        Repository.unlock(r)
 
        r.enable_locking = False
 
        Session().add(r)
 
        Session().commit()
 

	
 
    assert 'requesting all changes' in stdout
 
    assert 'adding changesets' in stdout
 
    assert 'adding manifests' in stdout
 
    assert 'adding file changes' in stdout
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        Repository.unlock(r)
 
        r.enable_locking = False
 
        Session().add(r)
 
        Session().commit()
 

	
 
    assert stderr == ''
 
    def test_clone_hg_repo_by_admin(self):
 
        clone_url = _construct_url(HG_REPO)
 
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 

	
 
        assert 'requesting all changes' in stdout
 
        assert 'adding changesets' in stdout
 
        assert 'adding manifests' in stdout
 
        assert 'adding file changes' in stdout
 

	
 
def test_clone_git_repo_by_admin():
 
    clone_url = _construct_url(GIT_REPO)
 
    stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
        assert stderr == ''
 

	
 
    def test_clone_git_repo_by_admin(self):
 
        clone_url = _construct_url(GIT_REPO)
 
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 

	
 
    assert 'Cloning into' in stdout
 
    assert stderr == ''
 
        assert 'Cloning into' in stdout
 
        assert stderr == ''
 

	
 
    def test_clone_wrong_credentials_hg(self):
 
        clone_url = _construct_url(HG_REPO, passwd='bad!')
 
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
        assert 'abort: authorization failed' in stderr
 

	
 
def test_clone_wrong_credentials_hg():
 
    clone_url = _construct_url(HG_REPO, passwd='bad!')
 
    stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
    assert 'abort: authorization failed' in stderr
 
    def test_clone_wrong_credentials_git(self):
 
        clone_url = _construct_url(GIT_REPO, passwd='bad!')
 
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
        assert 'fatal: Authentication failed' in stderr
 

	
 
    def test_clone_git_dir_as_hg(self):
 
        clone_url = _construct_url(GIT_REPO)
 
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
        assert 'HTTP Error 404: Not Found' in stderr
 

	
 
    def test_clone_hg_repo_as_git(self):
 
        clone_url = _construct_url(HG_REPO)
 
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
        assert 'not found:' in stderr
 

	
 
def test_clone_wrong_credentials_git():
 
    clone_url = _construct_url(GIT_REPO, passwd='bad!')
 
    stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
    assert 'fatal: Authentication failed' in stderr
 
    def test_clone_non_existing_path_hg(self):
 
        clone_url = _construct_url('trololo')
 
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
        assert 'HTTP Error 404: Not Found' in stderr
 

	
 
    def test_clone_non_existing_path_git(self):
 
        clone_url = _construct_url('trololo')
 
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
        assert 'not found:' in stderr
 

	
 
def test_clone_git_dir_as_hg():
 
    clone_url = _construct_url(GIT_REPO)
 
    stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
    assert 'HTTP Error 404: Not Found' in stderr
 
    def test_push_new_file_hg(self):
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(HG_REPO, dest=DEST)
 
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 

	
 
        stdout, stderr = _add_files_and_push('hg', DEST)
 

	
 
        assert 'pushing to' in stdout
 
        assert 'Repository size' in stdout
 
        assert 'Last revision is now' in stdout
 

	
 
def test_clone_hg_repo_as_git():
 
    clone_url = _construct_url(HG_REPO)
 
    stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
    assert 'not found: did you run git update-server-info on the server' in stderr
 
    def test_push_new_file_git(self):
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(GIT_REPO, dest=DEST)
 
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 

	
 
        # commit some stuff into this repo
 
        stdout, stderr = _add_files_and_push('git', DEST)
 

	
 
        #WTF git stderr ?!
 
        assert 'master -> master' in stderr
 

	
 
def test_clone_non_existing_path_hg():
 
    clone_url = _construct_url('trololo')
 
    stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
    assert 'HTTP Error 404: Not Found' in stderr
 
    def test_push_wrong_credentials_hg(self):
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(HG_REPO, dest=DEST)
 
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 

	
 
        stdout, stderr = _add_files_and_push('hg', DEST, user='bad',
 
                                             passwd='name')
 

	
 
def test_clone_non_existing_path_git():
 
    clone_url = _construct_url('trololo')
 
    stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
    assert 'not found: did you run git update-server-info on the server' in stderr
 
        assert 'abort: authorization failed' in stderr
 

	
 
    def test_push_wrong_credentials_git(self):
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(GIT_REPO, dest=DEST)
 
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 

	
 
        stdout, stderr = _add_files_and_push('git', DEST, user='bad',
 
                                             passwd='name')
 

	
 
        assert 'fatal: Authentication failed' in stderr
 

	
 
def test_push_new_file_hg():
 
    DEST = _get_tmp_dir()
 
    clone_url = _construct_url(HG_REPO, dest=DEST)
 
    stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
    def test_push_back_to_wrong_url_hg(self):
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(HG_REPO, dest=DEST)
 
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 

	
 
        stdout, stderr = _add_files_and_push('hg', DEST,
 
                                    clone_url='http://127.0.0.1:5000/tmp',)
 

	
 
        assert 'HTTP Error 404: Not Found' in stderr
 

	
 
    # commit some stuff into this repo
 
    cwd = path = jn(DEST)
 
    added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next())
 
    Command(cwd).execute('touch %s' % added_file)
 
    Command(cwd).execute('hg add %s' % added_file)
 
    def test_push_back_to_wrong_url_git(self):
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(GIT_REPO, dest=DEST)
 
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 

	
 
        stdout, stderr = _add_files_and_push('git', DEST,
 
                                    clone_url='http://127.0.0.1:5000/tmp',)
 

	
 
        assert 'not found:' in stderr
 

	
 
    for i in xrange(3):
 
        cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
 
        Command(cwd).execute(cmd)
 
    def test_clone_and_create_lock_hg(self):
 
        # enable locking
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        r.enable_locking = True
 
        Session().add(r)
 
        Session().commit()
 
        # clone
 
        clone_url = _construct_url(HG_REPO)
 
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 

	
 
        #check if lock was made
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
 

	
 
        cmd = """hg ci -m 'commited new %s' -u '%s' %s """ % (
 
                i,
 
                'Marcin Kuźminski <marcin@python-blog.com>',
 
                added_file
 
        )
 
        Command(cwd).execute(cmd)
 
    # PUSH it back
 
    clone_url = _construct_url(HG_REPO, dest='')
 
    stdout, stderr = Command(cwd).execute('hg push --verbose', clone_url)
 
    def test_clone_and_create_lock_git(self):
 
        # enable locking
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
        r.enable_locking = True
 
        Session().add(r)
 
        Session().commit()
 
        # clone
 
        clone_url = _construct_url(GIT_REPO)
 
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 

	
 
        #check if lock was made
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
 

	
 
    assert 'pushing to' in stdout
 
    assert 'Repository size' in stdout
 
    assert 'Last revision is now' in stdout
 

	
 

	
 
def test_push_new_file_git():
 
    DEST = _get_tmp_dir()
 
    clone_url = _construct_url(GIT_REPO, dest=DEST)
 
    stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
    def test_clone_after_repo_was_locked_hg(self):
 
        #lock repo
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
 
        #pull fails since repo is locked
 
        clone_url = _construct_url(HG_REPO)
 
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
 
                % (HG_REPO, TEST_USER_ADMIN_LOGIN))
 
        assert msg in stderr
 

	
 
    # commit some stuff into this repo
 
    cwd = path = jn(DEST)
 
    added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next())
 
    Command(cwd).execute('touch %s' % added_file)
 
    Command(cwd).execute('git add %s' % added_file)
 
    def test_clone_after_repo_was_locked_git(self):
 
        #lock repo
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
 
        #pull fails since repo is locked
 
        clone_url = _construct_url(GIT_REPO)
 
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
 
                % (GIT_REPO, TEST_USER_ADMIN_LOGIN))
 
        #TODO: fix this somehow later on GIT, GIT is stupid and even if we throw
 
        # back 423 to it, it makes ANOTHER request and we fail there with 405 :/
 
        msg = "405 Method Not Allowed"
 
        assert msg in stderr
 

	
 
    for i in xrange(3):
 
        cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
 
        Command(cwd).execute(cmd)
 
    def test_push_on_locked_repo_by_other_user_hg(self):
 
        #clone some temp
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(HG_REPO, dest=DEST)
 
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 

	
 
        #lock repo
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        # let this user actually push !
 
        RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
 
                                          perm='repository.write')
 
        Session().commit()
 
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
 

	
 
        cmd = """git ci -m 'commited new %s' --author '%s' %s """ % (
 
                i,
 
                'Marcin Kuźminski <marcin@python-blog.com>',
 
                added_file
 
        )
 
        Command(cwd).execute(cmd)
 
    # PUSH it back
 
    clone_url = _construct_url(GIT_REPO, dest='')
 
    stdout, stderr = Command(cwd).execute('git push --verbose', clone_url)
 
        #push fails repo is locked by other user !
 
        stdout, stderr = _add_files_and_push('hg', DEST,
 
                                             user=TEST_USER_REGULAR_LOGIN,
 
                                             passwd=TEST_USER_REGULAR_PASS)
 
        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
 
                % (HG_REPO, TEST_USER_ADMIN_LOGIN))
 
        assert msg in stderr
 

	
 
    #WTF git stderr ?!
 
    assert 'master -> master' in stderr
 

	
 

	
 
def test_push_modify_existing_file_hg():
 
    assert 0
 

	
 
#TODO: fix me ! somehow during tests hooks don't get called on GIT
 
#    def test_push_on_locked_repo_by_other_user_git(self):
 
#        #clone some temp
 
#        DEST = _get_tmp_dir()
 
#        clone_url = _construct_url(GIT_REPO, dest=DEST)
 
#        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
#
 
#        #lock repo
 
#        r = Repository.get_by_repo_name(GIT_REPO)
 
#        # let this user actually push !
 
#        RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
 
#                                          perm='repository.write')
 
#        Session().commit()
 
#        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
 
#
 
#        #push fails repo is locked by other user !
 
#        stdout, stderr = _add_files_and_push('git', DEST,
 
#                                             user=TEST_USER_REGULAR_LOGIN,
 
#                                             passwd=TEST_USER_REGULAR_PASS)
 
#        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
 
#                % (GIT_REPO, TEST_USER_ADMIN_LOGIN))
 
#        #TODO: fix this somehow later on GIT, GIT is stupid and even if we throw
 
#        # back 423 to it, it makes ANOTHER request and we fail there with 405 :/
 
#        msg = "405 Method Not Allowed"
 
#        assert msg in stderr
 

	
 
def test_push_modify_existing_file_git():
 
    assert 0
 

	
 
    def test_push_unlocks_repository_hg(self):
 
        # enable locking
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        r.enable_locking = True
 
        Session().add(r)
 
        Session().commit()
 
        #clone some temp
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(HG_REPO, dest=DEST)
 
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 

	
 
def test_push_wrong_credentials_hg():
 
    assert 0
 
        #check for lock repo after clone
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
 

	
 

	
 
def test_push_wrong_credentials_git():
 
    assert 0
 
        #push is ok and repo is now unlocked
 
        stdout, stderr = _add_files_and_push('hg', DEST)
 
        assert ('remote: Released lock on repo `%s`' % HG_REPO) in stdout
 
        #we need to cleanup the Session Here !
 
        Session.remove()
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        assert r.locked == [None, None]
 

	
 

	
 
def test_push_back_to_wrong_url_hg():
 
    assert 0
 

	
 

	
 
def test_push_back_to_wrong_url_git():
 
    assert 0
 

	
 

	
 
#TODO: write all locking tests
 
#TODO: fix me ! somehow during tests hooks don't get called on GIT
 
#    def test_push_unlocks_repository_git(self):
 
#        # enable locking
 
#        r = Repository.get_by_repo_name(GIT_REPO)
 
#        r.enable_locking = True
 
#        Session().add(r)
 
#        Session().commit()
 
#        #clone some temp
 
#        DEST = _get_tmp_dir()
 
#        clone_url = _construct_url(GIT_REPO, dest=DEST)
 
#        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
#
 
#        #check for lock repo after clone
 
#        r = Repository.get_by_repo_name(GIT_REPO)
 
#        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
 
#
 
#        #push is ok and repo is now unlocked
 
#        stdout, stderr = _add_files_and_push('git', DEST)
 
#        #assert ('remote: Released lock on repo `%s`' % GIT_REPO) in stdout
 
#        #we need to cleanup the Session Here !
 
#        Session.remove()
 
#        r = Repository.get_by_repo_name(GIT_REPO)
 
#        assert r.locked == [None, None]
0 comments (0 inline, 0 general)