diff --git a/rhodecode/tests/__init__.py b/rhodecode/tests/__init__.py
--- a/rhodecode/tests/__init__.py
+++ b/rhodecode/tests/__init__.py
@@ -41,12 +41,13 @@ log = logging.getLogger(__name__)
+# Names made available to test modules by ``from rhodecode.tests import *``.
__all__ = [
'parameterized', 'environ', 'url', 'get_new_dir', 'TestController',
'TESTS_TMP_PATH', 'HG_REPO', 'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO',
- 'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_REGULAR_LOGIN',
- 'TEST_USER_REGULAR_PASS', 'TEST_USER_REGULAR_EMAIL',
- 'TEST_USER_REGULAR2_LOGIN', 'TEST_USER_REGULAR2_PASS',
- 'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO', 'TEST_HG_REPO_CLONE',
- 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO', 'TEST_GIT_REPO_CLONE',
- 'TEST_GIT_REPO_PULL', 'HG_REMOTE_REPO', 'GIT_REMOTE_REPO', 'SCM_TESTS',
+ 'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_ADMIN_PASS',
+ 'TEST_USER_REGULAR_LOGIN', 'TEST_USER_REGULAR_PASS',
+ 'TEST_USER_REGULAR_EMAIL', 'TEST_USER_REGULAR2_LOGIN',
+ 'TEST_USER_REGULAR2_PASS', 'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO',
+ 'TEST_HG_REPO_CLONE', 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO',
+ 'TEST_GIT_REPO_CLONE', 'TEST_GIT_REPO_PULL', 'HG_REMOTE_REPO',
+ 'GIT_REMOTE_REPO', 'SCM_TESTS',
]
# Invoke websetup with the current config file
diff --git a/rhodecode/tests/scripts/test_scm_operations.py b/rhodecode/tests/scripts/test_scm_operations.py
--- a/rhodecode/tests/scripts/test_scm_operations.py
+++ b/rhodecode/tests/scripts/test_scm_operations.py
@@ -1,9 +1,12 @@
# -*- coding: utf-8 -*-
"""
- rhodecode.tests.test_hg_operations
- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ rhodecode.tests.test_scm_operations
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
- Test suite for making push/pull operations
+ Test suite for making push/pull operations.
+ Run using::
+
+ RC_WHOOSH_TEST_DISABLE=1 nosetests rhodecode/tests/scripts/test_scm_operations.py
:created_on: Dec 30, 2010
:author: marcink
@@ -24,47 +27,19 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
-import time
-import sys
-import shutil
-import logging
-
+import tempfile
from os.path import join as jn
from os.path import dirname as dn
from tempfile import _RandomNameSequence
from subprocess import Popen, PIPE
-from paste.deploy import appconfig
-from pylons import config
-from sqlalchemy import engine_from_config
-
-from rhodecode.lib.utils import add_cache
-from rhodecode.model import init_model
-from rhodecode.model import meta
+from rhodecode.tests import *
from rhodecode.model.db import User, Repository, UserLog
-from rhodecode.lib.auth import get_crypt_password
-
-from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
-from rhodecode.config.environment import load_environment
-
-rel_path = dn(dn(dn(os.path.abspath(__file__))))
+from rhodecode.model.meta import Session
-conf = appconfig('config:%s' % sys.argv[1], relative_to=rel_path)
-load_environment(conf.global_conf, conf.local_conf)
-
-add_cache(conf)
-
-USER = 'test_admin'
-PASS = 'test12'
-HOST = '127.0.0.1:5000'
-DEBUG = False
-print 'DEBUG:', DEBUG
-log = logging.getLogger(__name__)
-
-engine = engine_from_config(conf, 'sqlalchemy.db1.')
-init_model(engine)
-sa = meta.Session()
+# When set, Command.execute() prints every command line it is about to run.
+DEBUG = True
+# host:port that _construct_url() puts into the clone/push URLs
+HOST = '127.0.0.1:5000' # test host
class Command(object):
@@ -73,13 +48,13 @@ class Command(object):
self.cwd = cwd
def execute(self, cmd, *args):
- """Runs command on the system with given ``args``.
+ """
+ Runs command on the system with given ``args``.
"""
command = cmd + ' ' + ' '.join(args)
- log.debug('Executing %s' % command)
if DEBUG:
- print command
+ print '*** CMD %s ***' % command
p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
stdout, stderr = p.communicate()
if DEBUG:
@@ -87,324 +62,181 @@ class Command(object):
return stdout, stderr
-def test_wrapp(func):
-
- def __wrapp(*args, **kwargs):
- print '>>>%s' % func.__name__
- try:
- res = func(*args, **kwargs)
- except Exception, e:
- print ('###############\n-'
- '--%s failed %s--\n'
- '###############\n' % (func.__name__, e))
- sys.exit()
- print '++OK++'
- return res
- return __wrapp
+def _get_tmp_dir():
+    """
+    Create a new temporary directory and return its path.
+
+    The directory name starts with ``rc_integration_test``.
+    """
+    tmp_path = tempfile.mkdtemp(prefix='rc_integration_test')
+    return tmp_path
-def create_test_user(force=True):
- print '\tcreating test user'
-
- user = User.get_by_username(USER)
-
- if force and user is not None:
- print '\tremoving current user'
- for repo in Repository.query().filter(Repository.user == user).all():
- sa.delete(repo)
- sa.delete(user)
- sa.commit()
-
- if user is None or force:
- print '\tcreating new one'
- new_usr = User()
- new_usr.username = USER
- new_usr.password = get_crypt_password(PASS)
- new_usr.email = 'mail@mail.com'
- new_usr.name = 'test'
- new_usr.lastname = 'lasttestname'
- new_usr.active = True
- new_usr.admin = True
- sa.add(new_usr)
- sa.commit()
-
- print '\tdone'
-
-
-def create_test_repo(force=True):
- from rhodecode.model.repo import RepoModel
-
- user = User.get_by_username(USER)
- if user is None:
- raise Exception('user not found')
-
- repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
-
- if repo is None:
- print '\trepo not found creating'
-
- form_data = {'repo_name':HG_REPO,
- 'repo_type':'hg',
- 'private':False,
- 'clone_uri':'' }
- rm = RepoModel(sa)
- rm.base_path = '/home/hg'
- rm.create(form_data, user)
+def _construct_url(repo, dest=None, **kwargs):
+    """
+    Build the ``<url> <dest>`` argument string for a clone/push command.
+
+    :param repo: repository name used as the path part of the URL
+    :param dest: destination placed after the URL; when ``None`` a fresh
+        temporary directory is created and used
+    :param kwargs: overrides for the ``user``, ``passwd``, ``host``,
+        ``cloned_repo`` and ``dest`` template values
+    :returns: ``http://user:passwd@host/repo dest`` when both user and
+        passwd are non-empty, otherwise ``http://host/repo dest``
+    """
+    if dest is None:
+        # make temp clone
+        dest = _get_tmp_dir()
+    params = {
+        'user': TEST_USER_ADMIN_LOGIN,
+        'passwd': TEST_USER_ADMIN_PASS,
+        'host': HOST,
+        'cloned_repo': repo,
+        'dest': dest,
+    }
+    params.update(**kwargs)
+    if params['user'] and params['passwd']:
+        _url = 'http://%(user)s:%(passwd)s@%(host)s/%(cloned_repo)s %(dest)s' % params
+    else:
+        # no usable credentials: anonymous URL. The host key needs its
+        # leading ``%`` or the literal text "(host)s" ends up in the URL.
+        _url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % params
+    return _url
def set_anonymous_access(enable=True):
+ # Set the ``active`` flag of the User.DEFAULT_USER account to ``enable``
+ # and commit the change; raises Exception when a fresh lookup of that
+ # user does not show the requested value.
- user = User.get_by_username('default')
+ user = User.get_by_username(User.DEFAULT_USER)
user.active = enable
- sa.add(user)
- sa.commit()
+ Session().add(user)
+ Session().commit()
print '\tanonymous access is now:', enable
+ # read the user back to verify that the flag really changed
- if enable != User.get_by_username('default').active:
+ if enable != User.get_by_username(User.DEFAULT_USER).active:
raise Exception('Cannot set anonymous access')
-def get_anonymous_access():
- user = User.get_by_username('default')
- return user.active
+def setup_module():
+    """
+    Turn anonymous access off for this test module.
+    """
+    set_anonymous_access(enable=False)
+
+
+def test_clone_hg_repo_by_admin():
+    # Clone the hg repository using the default (admin) credentials.
+    hg_url = _construct_url(HG_REPO)
+    out, err = Command('/tmp').execute('hg clone', hg_url)
+
+    # each of these progress messages has to show up on stdout
+    expected_messages = (
+        'requesting all changes',
+        'adding changesets',
+        'adding manifests',
+        'adding file changes',
+    )
+    for message in expected_messages:
+        assert message in out
+
+    # and nothing may be written to stderr
+    assert err == ''
-#==============================================================================
-# TESTS
-#==============================================================================
-@test_wrapp
-def test_clone_with_credentials(no_errors=False):
- cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
-
- try:
- shutil.rmtree(path, ignore_errors=True)
- os.makedirs(path)
- #print 'made dirs %s' % jn(path)
- except OSError:
- raise
+def test_clone_git_repo_by_admin():
+ # Clone the git repository using the default (admin) credentials.
+ clone_url = _construct_url(GIT_REPO)
+ stdout, stderr = Command('/tmp').execute('git clone', clone_url)
- print '\tchecking if anonymous access is enabled'
- anonymous_access = get_anonymous_access()
- if anonymous_access:
- print '\tenabled, disabling it '
- set_anonymous_access(enable=False)
-
- clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
- {'user':USER,
- 'pass':PASS,
- 'host':HOST,
- 'cloned_repo':HG_REPO,
- 'dest':path}
-
- stdout, stderr = Command(cwd).execute('hg clone', clone_url)
-
- if no_errors is False:
- assert """adding file changes""" in stdout, 'no messages about cloning'
- assert """abort""" not in stderr , 'got error from clone'
+ # NOTE(review): the git push test in this file finds git's status output
+ # on stderr; which stream carries 'Cloning into' may depend on the git
+ # version -- confirm against the git used to run the suite
+ assert 'Cloning into' in stdout
+ assert stderr == ''
-@test_wrapp
-def test_clone_anonymous():
- cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
+def test_clone_wrong_credentials_hg():
+    # An hg clone with a wrong password must be refused.
+    bad_url = _construct_url(HG_REPO, passwd='bad!')
+    runner = Command('/tmp')
+    _out, err = runner.execute('hg clone', bad_url)
+    assert 'abort: authorization failed' in err
- try:
- shutil.rmtree(path, ignore_errors=True)
- os.makedirs(path)
- #print 'made dirs %s' % jn(path)
- except OSError:
- raise
+
+def test_clone_wrong_credentials_git():
+    # A git clone with a wrong password must be refused.
+    bad_url = _construct_url(GIT_REPO, passwd='bad!')
+    runner = Command('/tmp')
+    _out, err = runner.execute('git clone', bad_url)
+    assert 'fatal: Authentication failed' in err
- print '\tchecking if anonymous access is enabled'
- anonymous_access = get_anonymous_access()
- if not anonymous_access:
- print '\tnot enabled, enabling it '
- set_anonymous_access(enable=True)
+def test_clone_git_dir_as_hg():
+    # ``hg clone`` pointed at the git repository is expected to get a 404.
+    git_repo_url = _construct_url(GIT_REPO)
+    runner = Command('/tmp')
+    _out, err = runner.execute('hg clone', git_repo_url)
+    assert 'HTTP Error 404: Not Found' in err
+
- clone_url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % \
- {'user':USER,
- 'pass':PASS,
- 'host':HOST,
- 'cloned_repo':HG_REPO,
- 'dest':path}
+def test_clone_hg_repo_as_git():
+    # ``git clone`` pointed at the hg repository is expected to fail with
+    # git's "not found" message.
+    hg_repo_url = _construct_url(HG_REPO)
+    runner = Command('/tmp')
+    _out, err = runner.execute('git clone', hg_repo_url)
+    assert 'not found: did you run git update-server-info on the server' in err
- stdout, stderr = Command(cwd).execute('hg clone', clone_url)
- assert """adding file changes""" in stdout, 'no messages about cloning'
- assert """abort""" not in stderr , 'got error from clone'
-
- #disable if it was enabled
- if not anonymous_access:
- print '\tdisabling anonymous access'
- set_anonymous_access(enable=False)
+def test_clone_non_existing_path_hg():
+    # ``hg clone`` of a repository name that does not exist must give a 404.
+    missing_url = _construct_url('trololo')
+    runner = Command('/tmp')
+    _out, err = runner.execute('hg clone', missing_url)
+    assert 'HTTP Error 404: Not Found' in err
-@test_wrapp
-def test_clone_wrong_credentials():
- cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
-
- try:
- shutil.rmtree(path, ignore_errors=True)
- os.makedirs(path)
- #print 'made dirs %s' % jn(path)
- except OSError:
- raise
-
- print '\tchecking if anonymous access is enabled'
- anonymous_access = get_anonymous_access()
- if anonymous_access:
- print '\tenabled, disabling it '
- set_anonymous_access(enable=False)
-
- clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
- {'user':USER + 'error',
- 'pass':PASS,
- 'host':HOST,
- 'cloned_repo':HG_REPO,
- 'dest':path}
-
- stdout, stderr = Command(cwd).execute('hg clone', clone_url)
-
- if not """abort: authorization failed""" in stderr:
- raise Exception('Failure')
+def test_clone_non_existing_path_git():
+    # ``git clone`` of a repository name that does not exist must fail with
+    # git's "not found" message.
+    missing_url = _construct_url('trololo')
+    runner = Command('/tmp')
+    _out, err = runner.execute('git clone', missing_url)
+    assert 'not found: did you run git update-server-info on the server' in err
-@test_wrapp
-def test_pull():
- pass
-
-
-@test_wrapp
-def test_push_modify_file(f_name='setup.py'):
- cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
- modified_file = jn(TESTS_TMP_PATH, HG_REPO, f_name)
- for i in xrange(5):
- cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
- Command(cwd).execute(cmd)
+def test_push_new_file_hg():
+ # Clone the hg repo into a fresh temp dir, commit three changes to a new
+ # file there and push them back.
+ DEST = _get_tmp_dir()
+ clone_url = _construct_url(HG_REPO, dest=DEST)
+ stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
- cmd = """hg ci -m 'changed file %s' %s """ % (i, modified_file)
- Command(cwd).execute(cmd)
-
- Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO))
-
-
-@test_wrapp
-def test_push_new_file(commits=15, with_clone=True):
-
- if with_clone:
- test_clone_with_credentials(no_errors=True)
-
- cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
+ # commit some stuff into this repo
+ cwd = path = jn(DEST)
added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next())
-
Command(cwd).execute('touch %s' % added_file)
-
Command(cwd).execute('hg add %s' % added_file)
- for i in xrange(commits):
+ for i in xrange(3):
cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
Command(cwd).execute(cmd)
- cmd = """hg ci -m 'commited new %s' -u '%s' %s """ % (i,
- 'Marcin Kuźminski ',
- added_file)
+ cmd = """hg ci -m 'commited new %s' -u '%s' %s """ % (
+ i,
+ 'Marcin Kuźminski ',
+ added_file
+ )
Command(cwd).execute(cmd)
+ # PUSH it back
+ # dest='' leaves only the repository URL in the string handed to push
+ clone_url = _construct_url(HG_REPO, dest='')
+ stdout, stderr = Command(cwd).execute('hg push --verbose', clone_url)
- push_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
- {'user':USER,
- 'pass':PASS,
- 'host':HOST,
- 'cloned_repo':HG_REPO,
- 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
-
- Command(cwd).execute('hg push --verbose --debug %s' % push_url)
+ # the push output on stdout must contain each of these fragments
+ assert 'pushing to' in stdout
+ assert 'Repository size' in stdout
+ assert 'Last revision is now' in stdout
-@test_wrapp
-def test_push_wrong_credentials():
- cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
- clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
- {'user':USER + 'xxx',
- 'pass':PASS,
- 'host':HOST,
- 'cloned_repo':HG_REPO,
- 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
-
- modified_file = jn(TESTS_TMP_PATH, HG_REPO, 'setup.py')
- for i in xrange(5):
- cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
- Command(cwd).execute(cmd)
-
- cmd = """hg ci -m 'commited %s' %s """ % (i, modified_file)
- Command(cwd).execute(cmd)
+def test_push_new_file_git():
+ # Clone the git repo into a fresh temp dir, commit three changes to a new
+ # file there and push them back.
+ DEST = _get_tmp_dir()
+ clone_url = _construct_url(GIT_REPO, dest=DEST)
+ stdout, stderr = Command('/tmp').execute('git clone', clone_url)
- Command(cwd).execute('hg push %s' % clone_url)
-
-
-@test_wrapp
-def test_push_wrong_path():
- cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
- added_file = jn(path, 'somefile.py')
+ # commit some stuff into this repo
+ cwd = path = jn(DEST)
+ added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next())
+ Command(cwd).execute('touch %s' % added_file)
+ Command(cwd).execute('git add %s' % added_file)
- try:
- shutil.rmtree(path, ignore_errors=True)
- os.makedirs(path)
- print '\tmade dirs %s' % jn(path)
- except OSError:
- raise
-
- Command(cwd).execute("""echo '' > %s""" % added_file)
- Command(cwd).execute("""hg init %s""" % path)
- Command(cwd).execute("""hg add %s""" % added_file)
-
- for i in xrange(2):
+ for i in xrange(3):
cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
Command(cwd).execute(cmd)
- cmd = """hg ci -m 'commited new %s' %s """ % (i, added_file)
+ # use the real ``commit`` sub-command: ``ci`` exists in git only as a
+ # user-defined alias, so relying on it ties the test to local git config
+ # NOTE(review): git documents --author as 'Name <email>'; the value below
+ # has no email part -- confirm it is what was intended
+ cmd = """git commit -m 'commited new %s' --author '%s' %s """ % (
+ i,
+ 'Marcin Kuźminski ',
+ added_file
+ )
Command(cwd).execute(cmd)
+ # PUSH it back
+ # dest='' leaves only the repository URL in the string handed to push
+ clone_url = _construct_url(GIT_REPO, dest='')
+ stdout, stderr = Command(cwd).execute('git push --verbose', clone_url)
- clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
- {'user':USER,
- 'pass':PASS,
- 'host':HOST,
- 'cloned_repo':HG_REPO + '_error',
- 'dest':jn(TESTS_TMP_PATH, HG_REPO)}
-
- stdout, stderr = Command(cwd).execute('hg push %s' % clone_url)
- if not """abort: HTTP Error 403: Forbidden""" in stderr:
- raise Exception('Failure')
+ #WTF git stderr ?!
+ assert 'master -> master' in stderr
-@test_wrapp
-def get_logs():
- return UserLog.query().all()
+def test_push_modify_existing_file_hg():
+ # No test logic yet; ``assert 0`` makes this fail unconditionally.
+ assert 0
-@test_wrapp
-def test_logs(initial):
- logs = UserLog.query().all()
- operations = 4
- if len(initial) + operations != len(logs):
- raise Exception("missing number of logs initial:%s vs current:%s" % \
- (len(initial), len(logs)))
+def test_push_modify_existing_file_git():
+ # No test logic yet; ``assert 0`` makes this fail unconditionally.
+ assert 0
+
+
+def test_push_wrong_credentials_hg():
+ # No test logic yet; ``assert 0`` makes this fail unconditionally.
+ assert 0
-if __name__ == '__main__':
- create_test_user(force=False)
- create_test_repo()
+def test_push_wrong_credentials_git():
+ # No test logic yet; ``assert 0`` makes this fail unconditionally.
+ assert 0
+
+
+def test_push_back_to_wrong_url_hg():
+ # No test logic yet; ``assert 0`` makes this fail unconditionally.
+ assert 0
- initial_logs = get_logs()
- print 'initial activity logs: %s' % len(initial_logs)
- s = time.time()
- #test_push_modify_file()
- test_clone_with_credentials()
- test_clone_wrong_credentials()
- test_push_new_file(commits=2, with_clone=True)
-
- test_clone_anonymous()
- test_push_wrong_path()
+def test_push_back_to_wrong_url_git():
+ # No test logic yet; ``assert 0`` makes this fail unconditionally.
+ assert 0
- test_push_wrong_credentials()
- test_logs(initial_logs)
- print 'finished ok in %.3f' % (time.time() - s)
+#TODO: write all locking tests