Changeset - 6d904a0cd48d
[Not reviewed]
beta
1 2 1
Marcin Kuzminski - 13 years ago 2012-08-28 09:28:09
marcin@python-works.com
added new suite of tests for VCS operations
- locking tests
- push pull tests
- added new ENV option to bypass re-creation of test enviroment
3 files changed with 304 insertions and 117 deletions:
0 comments (0 inline, 0 general)
rhodecode/config/environment.py
Show inline comments
 
@@ -75,13 +75,16 @@ def load_environment(global_conf, app_co
 
        if os.environ.get('TEST_DB'):
 
            # swap config if we pass enviroment variable
 
            config['sqlalchemy.db1.url'] = os.environ.get('TEST_DB')
 

	
 
        from rhodecode.lib.utils import create_test_env, create_test_index
 
        from rhodecode.tests import  TESTS_TMP_PATH
 
        create_test_env(TESTS_TMP_PATH, config)
 
        # set RC_NO_TMP_PATH=1 to disable re-creating the database and
 
        # test repos
 
        if not int(os.environ.get('RC_NO_TMP_PATH', 0)):
 
            create_test_env(TESTS_TMP_PATH, config)
 
        # set RC_WHOOSH_TEST_DISABLE=1 to disable whoosh index during tests
 
        if not int(os.environ.get('RC_WHOOSH_TEST_DISABLE', 0)):
 
            create_test_index(TESTS_TMP_PATH, config, True)
 

	
 
    # MULTIPLE DB configs
 
    # Setup the SQLAlchemy database engine
rhodecode/lib/base.py
Show inline comments
 
@@ -168,13 +168,13 @@ class BaseVCSController(object):
 
        present returns a tuple of make_lock, locked, locked_by.
 
        make_lock can have 3 states None (do nothing) True, make lock
 
        False release lock, This value is later propagated to hooks, which
 
        do the locking. Think about this as signals passed to hooks what to do.
 

	
 
        """
 
        locked = False
 
        locked = False  # defines that locked error should be thrown to user
 
        make_lock = None
 
        repo = Repository.get_by_repo_name(repo)
 
        user = User.get(user_id)
 

	
 
        # this is kind of hacky, but due to how mercurial handles client-server
 
        # server see all operation on changeset; bookmarks, phases and
 
@@ -184,13 +184,13 @@ class BaseVCSController(object):
 
        locked_by = repo.locked
 
        if repo and repo.enable_locking and not obsolete_call:
 
            if action == 'push':
 
                #check if it's already locked !, if it is compare users
 
                user_id, _date = repo.locked
 
                if user.user_id == user_id:
 
                    log.debug('Got push from user, now unlocking' % (user))
 
                    log.debug('Got push from user %s, now unlocking' % (user))
 
                    # unlock if we have push from user who locked
 
                    make_lock = False
 
                else:
 
                    # we're not the same user who locked, ban with 423 !
 
                    locked = True
 
            if action == 'pull':
 
@@ -199,13 +199,14 @@ class BaseVCSController(object):
 
                else:
 
                    log.debug('Setting lock on repo %s by %s' % (repo, user))
 
                    make_lock = True
 

	
 
        else:
 
            log.debug('Repository %s do not have locking enabled' % (repo))
 

	
 
        log.debug('FINAL locking values make_lock:%s,locked:%s,locked_by:%s'
 
                  % (make_lock, locked, locked_by))
 
        return make_lock, locked, locked_by
 

	
 
    def __call__(self, environ, start_response):
 
        start = time.time()
 
        try:
 
            return self._handle_request(environ, start_response)
rhodecode/tests/scripts/test_vcs_operations.py
Show inline comments
 
file renamed from rhodecode/tests/scripts/test_scm_operations.py to rhodecode/tests/scripts/test_vcs_operations.py
 
@@ -3,13 +3,13 @@
 
    rhodecode.tests.test_scm_operations
 
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
    Test suite for making push/pull operations.
 
    Run using::
 

	
 
     RC_WHOOSH_TEST_DISABLE=1 nosetests rhodecode/tests/scripts/test_scm_operations.py
 
     RC_WHOOSH_TEST_DISABLE=1 RC_NO_TMP_PATH=1 nosetests rhodecode/tests/scripts/test_vcs_operations.py
 

	
 
    :created_on: Dec 30, 2010
 
    :author: marcink
 
    :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
 
    :license: GPLv3, see COPYING for more details.
 
"""
 
@@ -25,21 +25,23 @@
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import os
 
import tempfile
 
import unittest
 
from os.path import join as jn
 
from os.path import dirname as dn
 

	
 
from tempfile import _RandomNameSequence
 
from subprocess import Popen, PIPE
 

	
 
from rhodecode.tests import *
 
from rhodecode.model.db import User, Repository, UserLog
 
from rhodecode.model.meta import Session
 
from rhodecode.model.repo import RepoModel
 

	
 
DEBUG = True
 
HOST = '127.0.0.1:5000'  # test host
 

	
 

	
 
class Command(object):
 
@@ -82,161 +84,342 @@ def _construct_url(repo, dest=None, **kw
 
        _url = 'http://%(user)s:%(passwd)s@%(host)s/%(cloned_repo)s %(dest)s' % params
 
    else:
 
        _url = 'http://(host)s/%(cloned_repo)s %(dest)s' % params
 
    return _url
 

	
 

	
 
def _add_files_and_push(vcs, DEST, **kwargs):
 
    """
 
    Generate some files, add it to DEST repo and push back
 
    vcs is git or hg and defines what VCS we want to make those files for
 

	
 
    :param vcs:
 
    :param DEST:
 
    """
 
    # commit some stuff into this repo
 
    cwd = path = jn(DEST)
 
    #added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next())
 
    added_file = jn(path, '%ssetup.py' % _RandomNameSequence().next())
 
    Command(cwd).execute('touch %s' % added_file)
 
    Command(cwd).execute('%s add %s' % (vcs, added_file))
 

	
 
    for i in xrange(3):
 
        cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
 
        Command(cwd).execute(cmd)
 
        if vcs == 'hg':
 
            cmd = """hg commit -m 'commited new %s' -u '%s' %s """ % (
 
                i, 'Marcin Kuźminski <marcin@python-blog.com>', added_file
 
            )
 
        elif vcs == 'git':
 
            cmd = """git ci -m 'commited new %s' --author '%s' %s """ % (
 
                i, 'Marcin Kuźminski <marcin@python-blog.com>', added_file
 
            )
 
        Command(cwd).execute(cmd)
 
    # PUSH it back
 
    if vcs == 'hg':
 
        _REPO = HG_REPO
 
    elif vcs == 'git':
 
        _REPO = GIT_REPO
 

	
 
    kwargs['dest'] = ''
 
    clone_url = _construct_url(_REPO, **kwargs)
 
    if 'clone_url' in kwargs:
 
        clone_url = kwargs['clone_url']
 
    if vcs == 'hg':
 
        stdout, stderr = Command(cwd).execute('hg push --verbose', clone_url)
 
    elif vcs == 'git':
 
        stdout, stderr = Command(cwd).execute('git push', clone_url + " master")
 

	
 
    return stdout, stderr
 

	
 

	
 
def set_anonymous_access(enable=True):
 
    user = User.get_by_username(User.DEFAULT_USER)
 
    user.active = enable
 
    Session().add(user)
 
    Session().commit()
 
    print '\tanonymous access is now:', enable
 
    if enable != User.get_by_username(User.DEFAULT_USER).active:
 
        raise Exception('Cannot set anonymous access')
 

	
 

	
 
def setup_module():
 
    #DISABLE ANONYMOUS ACCESS
 
    set_anonymous_access(False)
 
#==============================================================================
 
# TESTS
 
#==============================================================================
 

	
 
class TestVCSOperations(unittest.TestCase):
 

	
 
    @classmethod
 
    def setup_class(cls):
 
        #DISABLE ANONYMOUS ACCESS
 
        set_anonymous_access(False)
 

	
 
def test_clone_hg_repo_by_admin():
 
    clone_url = _construct_url(HG_REPO)
 
    stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
    def setUp(self):
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
        Repository.unlock(r)
 
        r.enable_locking = False
 
        Session().add(r)
 
        Session().commit()
 

	
 
    assert 'requesting all changes' in stdout
 
    assert 'adding changesets' in stdout
 
    assert 'adding manifests' in stdout
 
    assert 'adding file changes' in stdout
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        Repository.unlock(r)
 
        r.enable_locking = False
 
        Session().add(r)
 
        Session().commit()
 

	
 
    assert stderr == ''
 
    def test_clone_hg_repo_by_admin(self):
 
        clone_url = _construct_url(HG_REPO)
 
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 

	
 
        assert 'requesting all changes' in stdout
 
        assert 'adding changesets' in stdout
 
        assert 'adding manifests' in stdout
 
        assert 'adding file changes' in stdout
 

	
 
def test_clone_git_repo_by_admin():
 
    clone_url = _construct_url(GIT_REPO)
 
    stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
        assert stderr == ''
 

	
 
    def test_clone_git_repo_by_admin(self):
 
        clone_url = _construct_url(GIT_REPO)
 
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 

	
 
    assert 'Cloning into' in stdout
 
    assert stderr == ''
 
        assert 'Cloning into' in stdout
 
        assert stderr == ''
 

	
 
    def test_clone_wrong_credentials_hg(self):
 
        clone_url = _construct_url(HG_REPO, passwd='bad!')
 
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
        assert 'abort: authorization failed' in stderr
 

	
 
def test_clone_wrong_credentials_hg():
 
    clone_url = _construct_url(HG_REPO, passwd='bad!')
 
    stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
    assert 'abort: authorization failed' in stderr
 
    def test_clone_wrong_credentials_git(self):
 
        clone_url = _construct_url(GIT_REPO, passwd='bad!')
 
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
        assert 'fatal: Authentication failed' in stderr
 

	
 
    def test_clone_git_dir_as_hg(self):
 
        clone_url = _construct_url(GIT_REPO)
 
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
        assert 'HTTP Error 404: Not Found' in stderr
 

	
 
    def test_clone_hg_repo_as_git(self):
 
        clone_url = _construct_url(HG_REPO)
 
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
        assert 'not found:' in stderr
 

	
 
def test_clone_wrong_credentials_git():
 
    clone_url = _construct_url(GIT_REPO, passwd='bad!')
 
    stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
    assert 'fatal: Authentication failed' in stderr
 
    def test_clone_non_existing_path_hg(self):
 
        clone_url = _construct_url('trololo')
 
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
        assert 'HTTP Error 404: Not Found' in stderr
 

	
 
    def test_clone_non_existing_path_git(self):
 
        clone_url = _construct_url('trololo')
 
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
        assert 'not found:' in stderr
 

	
 
def test_clone_git_dir_as_hg():
 
    clone_url = _construct_url(GIT_REPO)
 
    stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
    assert 'HTTP Error 404: Not Found' in stderr
 
    def test_push_new_file_hg(self):
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(HG_REPO, dest=DEST)
 
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 

	
 
        stdout, stderr = _add_files_and_push('hg', DEST)
 

	
 
        assert 'pushing to' in stdout
 
        assert 'Repository size' in stdout
 
        assert 'Last revision is now' in stdout
 

	
 
def test_clone_hg_repo_as_git():
 
    clone_url = _construct_url(HG_REPO)
 
    stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
    assert 'not found: did you run git update-server-info on the server' in stderr
 
    def test_push_new_file_git(self):
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(GIT_REPO, dest=DEST)
 
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 

	
 
        # commit some stuff into this repo
 
        stdout, stderr = _add_files_and_push('git', DEST)
 

	
 
        #WTF git stderr ?!
 
        assert 'master -> master' in stderr
 

	
 
def test_clone_non_existing_path_hg():
 
    clone_url = _construct_url('trololo')
 
    stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
    assert 'HTTP Error 404: Not Found' in stderr
 
    def test_push_wrong_credentials_hg(self):
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(HG_REPO, dest=DEST)
 
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 

	
 
        stdout, stderr = _add_files_and_push('hg', DEST, user='bad',
 
                                             passwd='name')
 

	
 
def test_clone_non_existing_path_git():
 
    clone_url = _construct_url('trololo')
 
    stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
    assert 'not found: did you run git update-server-info on the server' in stderr
 
        assert 'abort: authorization failed' in stderr
 

	
 
    def test_push_wrong_credentials_git(self):
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(GIT_REPO, dest=DEST)
 
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 

	
 
        stdout, stderr = _add_files_and_push('git', DEST, user='bad',
 
                                             passwd='name')
 

	
 
        assert 'fatal: Authentication failed' in stderr
 

	
 
def test_push_new_file_hg():
 
    DEST = _get_tmp_dir()
 
    clone_url = _construct_url(HG_REPO, dest=DEST)
 
    stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
    def test_push_back_to_wrong_url_hg(self):
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(HG_REPO, dest=DEST)
 
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 

	
 
        stdout, stderr = _add_files_and_push('hg', DEST,
 
                                    clone_url='http://127.0.0.1:5000/tmp',)
 

	
 
        assert 'HTTP Error 404: Not Found' in stderr
 

	
 
    # commit some stuff into this repo
 
    cwd = path = jn(DEST)
 
    added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next())
 
    Command(cwd).execute('touch %s' % added_file)
 
    Command(cwd).execute('hg add %s' % added_file)
 
    def test_push_back_to_wrong_url_git(self):
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(GIT_REPO, dest=DEST)
 
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 

	
 
        stdout, stderr = _add_files_and_push('git', DEST,
 
                                    clone_url='http://127.0.0.1:5000/tmp',)
 

	
 
        assert 'not found:' in stderr
 

	
 
    for i in xrange(3):
 
        cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
 
        Command(cwd).execute(cmd)
 
    def test_clone_and_create_lock_hg(self):
 
        # enable locking
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        r.enable_locking = True
 
        Session().add(r)
 
        Session().commit()
 
        # clone
 
        clone_url = _construct_url(HG_REPO)
 
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 

	
 
        #check if lock was made
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
 

	
 
        cmd = """hg ci -m 'commited new %s' -u '%s' %s """ % (
 
                i,
 
                'Marcin Kuźminski <marcin@python-blog.com>',
 
                added_file
 
        )
 
        Command(cwd).execute(cmd)
 
    # PUSH it back
 
    clone_url = _construct_url(HG_REPO, dest='')
 
    stdout, stderr = Command(cwd).execute('hg push --verbose', clone_url)
 
    def test_clone_and_create_lock_git(self):
 
        # enable locking
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
        r.enable_locking = True
 
        Session().add(r)
 
        Session().commit()
 
        # clone
 
        clone_url = _construct_url(GIT_REPO)
 
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 

	
 
        #check if lock was made
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
 

	
 
    assert 'pushing to' in stdout
 
    assert 'Repository size' in stdout
 
    assert 'Last revision is now' in stdout
 

	
 

	
 
def test_push_new_file_git():
 
    DEST = _get_tmp_dir()
 
    clone_url = _construct_url(GIT_REPO, dest=DEST)
 
    stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
    def test_clone_after_repo_was_locked_hg(self):
 
        #lock repo
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
 
        #pull fails since repo is locked
 
        clone_url = _construct_url(HG_REPO)
 
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
 
                % (HG_REPO, TEST_USER_ADMIN_LOGIN))
 
        assert msg in stderr
 

	
 
    # commit some stuff into this repo
 
    cwd = path = jn(DEST)
 
    added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next())
 
    Command(cwd).execute('touch %s' % added_file)
 
    Command(cwd).execute('git add %s' % added_file)
 
    def test_clone_after_repo_was_locked_git(self):
 
        #lock repo
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
 
        #pull fails since repo is locked
 
        clone_url = _construct_url(GIT_REPO)
 
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
 
                % (GIT_REPO, TEST_USER_ADMIN_LOGIN))
 
        #TODO: fix this somehow later on GIT, GIT is stupid and even if we throw
 
        # back 423 to it, it makes ANOTHER request and we fail there with 405 :/
 
        msg = "405 Method Not Allowed"
 
        assert msg in stderr
 

	
 
    for i in xrange(3):
 
        cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
 
        Command(cwd).execute(cmd)
 
    def test_push_on_locked_repo_by_other_user_hg(self):
 
        #clone some temp
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(HG_REPO, dest=DEST)
 
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 

	
 
        #lock repo
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        # let this user actually push !
 
        RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
 
                                          perm='repository.write')
 
        Session().commit()
 
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
 

	
 
        cmd = """git ci -m 'commited new %s' --author '%s' %s """ % (
 
                i,
 
                'Marcin Kuźminski <marcin@python-blog.com>',
 
                added_file
 
        )
 
        Command(cwd).execute(cmd)
 
    # PUSH it back
 
    clone_url = _construct_url(GIT_REPO, dest='')
 
    stdout, stderr = Command(cwd).execute('git push --verbose', clone_url)
 
        #push fails repo is locked by other user !
 
        stdout, stderr = _add_files_and_push('hg', DEST,
 
                                             user=TEST_USER_REGULAR_LOGIN,
 
                                             passwd=TEST_USER_REGULAR_PASS)
 
        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
 
                % (HG_REPO, TEST_USER_ADMIN_LOGIN))
 
        assert msg in stderr
 

	
 
    #WTF git stderr ?!
 
    assert 'master -> master' in stderr
 

	
 

	
 
def test_push_modify_existing_file_hg():
 
    assert 0
 

	
 
#TODO: fix me ! somehow during tests hooks don't get called on GIT
 
#    def test_push_on_locked_repo_by_other_user_git(self):
 
#        #clone some temp
 
#        DEST = _get_tmp_dir()
 
#        clone_url = _construct_url(GIT_REPO, dest=DEST)
 
#        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
#
 
#        #lock repo
 
#        r = Repository.get_by_repo_name(GIT_REPO)
 
#        # let this user actually push !
 
#        RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
 
#                                          perm='repository.write')
 
#        Session().commit()
 
#        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
 
#
 
#        #push fails repo is locked by other user !
 
#        stdout, stderr = _add_files_and_push('git', DEST,
 
#                                             user=TEST_USER_REGULAR_LOGIN,
 
#                                             passwd=TEST_USER_REGULAR_PASS)
 
#        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
 
#                % (GIT_REPO, TEST_USER_ADMIN_LOGIN))
 
#        #TODO: fix this somehow later on GIT, GIT is stupid and even if we throw
 
#        # back 423 to it, it makes ANOTHER request and we fail there with 405 :/
 
#        msg = "405 Method Not Allowed"
 
#        assert msg in stderr
 

	
 
def test_push_modify_existing_file_git():
 
    assert 0
 

	
 
    def test_push_unlocks_repository_hg(self):
 
        # enable locking
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        r.enable_locking = True
 
        Session().add(r)
 
        Session().commit()
 
        #clone some temp
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(HG_REPO, dest=DEST)
 
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 

	
 
def test_push_wrong_credentials_hg():
 
    assert 0
 
        #check for lock repo after clone
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
 

	
 

	
 
def test_push_wrong_credentials_git():
 
    assert 0
 
        #push is ok and repo is now unlocked
 
        stdout, stderr = _add_files_and_push('hg', DEST)
 
        assert ('remote: Released lock on repo `%s`' % HG_REPO) in stdout
 
        #we need to cleanup the Session Here !
 
        Session.remove()
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        assert r.locked == [None, None]
 

	
 

	
 
def test_push_back_to_wrong_url_hg():
 
    assert 0
 

	
 

	
 
def test_push_back_to_wrong_url_git():
 
    assert 0
 

	
 

	
 
#TODO: write all locking tests
 
#TODO: fix me ! somehow during tests hooks don't get called on GIT
 
#    def test_push_unlocks_repository_git(self):
 
#        # enable locking
 
#        r = Repository.get_by_repo_name(GIT_REPO)
 
#        r.enable_locking = True
 
#        Session().add(r)
 
#        Session().commit()
 
#        #clone some temp
 
#        DEST = _get_tmp_dir()
 
#        clone_url = _construct_url(GIT_REPO, dest=DEST)
 
#        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
#
 
#        #check for lock repo after clone
 
#        r = Repository.get_by_repo_name(GIT_REPO)
 
#        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
 
#
 
#        #push is ok and repo is now unlocked
 
#        stdout, stderr = _add_files_and_push('git', DEST)
 
#        #assert ('remote: Released lock on repo `%s`' % GIT_REPO) in stdout
 
#        #we need to cleanup the Session Here !
 
#        Session.remove()
 
#        r = Repository.get_by_repo_name(GIT_REPO)
 
#        assert r.locked == [None, None]
0 comments (0 inline, 0 general)