Changeset - 6341084b7a2f
[Not reviewed]
beta
0 2 0
Marcin Kuzminski - 13 years ago 2012-08-22 02:14:27
marcin@python-works.com
rewrote test_scm_operations, now run by nosetests
2 files changed with 145 insertions and 312 deletions:
0 comments (0 inline, 0 general)
rhodecode/tests/__init__.py
Show inline comments
 
"""Pylons application test package
 

	
 
This package assumes the Pylons environment is already loaded, such as
 
when this script is imported from the `nosetests --with-pylons=test.ini`
 
command.
 

	
 
This module initializes the application via ``websetup`` (`paster
 
setup-app`) and provides the base testing objects.
 
"""
 
import os
 
import time
 
import logging
 
import datetime
 
import hashlib
 
import tempfile
 
from os.path import join as jn
 

	
 
from unittest import TestCase
 
from tempfile import _RandomNameSequence
 

	
 
from paste.deploy import loadapp
 
from paste.script.appinstall import SetupCommand
 
from pylons import config, url
 
from routes.util import URLGenerator
 
from webtest import TestApp
 

	
 
from rhodecode import is_windows
 
from rhodecode.model.meta import Session
 
from rhodecode.model.db import User
 
from rhodecode.tests.nose_parametrized import parameterized
 

	
 
import pylons.test
 

	
 

	
 
os.environ['TZ'] = 'UTC'
 
if not is_windows:
 
    time.tzset()
 

	
 
log = logging.getLogger(__name__)
 

	
 
__all__ = [
 
    'parameterized', 'environ', 'url', 'get_new_dir', 'TestController',
 
    'TESTS_TMP_PATH', 'HG_REPO', 'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO',
 
    'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_REGULAR_LOGIN',
 
    'TEST_USER_REGULAR_PASS', 'TEST_USER_REGULAR_EMAIL',
 
    'TEST_USER_REGULAR2_LOGIN', 'TEST_USER_REGULAR2_PASS',
 
    'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO', 'TEST_HG_REPO_CLONE',
 
    'TEST_HG_REPO_PULL', 'TEST_GIT_REPO', 'TEST_GIT_REPO_CLONE',
 
    'TEST_GIT_REPO_PULL', 'HG_REMOTE_REPO', 'GIT_REMOTE_REPO', 'SCM_TESTS',
 
    'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_ADMIN_PASS',
 
    'TEST_USER_REGULAR_LOGIN', 'TEST_USER_REGULAR_PASS',
 
    'TEST_USER_REGULAR_EMAIL', 'TEST_USER_REGULAR2_LOGIN',
 
    'TEST_USER_REGULAR2_PASS', 'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO',
 
    'TEST_HG_REPO_CLONE', 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO',
 
    'TEST_GIT_REPO_CLONE', 'TEST_GIT_REPO_PULL', 'HG_REMOTE_REPO',
 
    'GIT_REMOTE_REPO', 'SCM_TESTS',
 
]
 

	
 
# Invoke websetup with the current config file
 
# SetupCommand('setup-app').run([config_file])
 

	
 
##RUNNING DESIRED TESTS
 
# nosetests -x rhodecode.tests.functional.test_admin_settings:TestSettingsController.test_my_account
 
# nosetests --pdb --pdb-failures
 
# nosetests --with-coverage --cover-package=rhodecode.model.validators rhodecode.tests.test_validators
 
environ = {}
 

	
 
#SOME GLOBALS FOR TESTS
 

	
 
TESTS_TMP_PATH = jn('/', 'tmp', 'rc_test_%s' % _RandomNameSequence().next())
 
TEST_USER_ADMIN_LOGIN = 'test_admin'
 
TEST_USER_ADMIN_PASS = 'test12'
 
TEST_USER_ADMIN_EMAIL = 'test_admin@mail.com'
 

	
 
TEST_USER_REGULAR_LOGIN = 'test_regular'
 
TEST_USER_REGULAR_PASS = 'test12'
 
TEST_USER_REGULAR_EMAIL = 'test_regular@mail.com'
 

	
 
TEST_USER_REGULAR2_LOGIN = 'test_regular2'
 
TEST_USER_REGULAR2_PASS = 'test12'
 
TEST_USER_REGULAR2_EMAIL = 'test_regular2@mail.com'
 

	
 
HG_REPO = 'vcs_test_hg'
 
GIT_REPO = 'vcs_test_git'
 

	
 
NEW_HG_REPO = 'vcs_test_hg_new'
 
NEW_GIT_REPO = 'vcs_test_git_new'
 

	
 
HG_FORK = 'vcs_test_hg_fork'
 
GIT_FORK = 'vcs_test_git_fork'
 

	
 
## VCS
 
SCM_TESTS = ['hg', 'git']
 
uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
 

	
 
GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
 

	
 
TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
 
TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)
 
TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)
 

	
 

	
 
HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs'
 

	
 
TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO)
 
TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix)
 
TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix)
 

	
 
TEST_DIR = tempfile.gettempdir()
 
TEST_REPO_PREFIX = 'vcs-test'
 

	
 
# cached repos if any !
 
# comment out to get some other repos from bb or github
 
GIT_REMOTE_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
 
HG_REMOTE_REPO = jn(TESTS_TMP_PATH, HG_REPO)
 

	
 

	
 
def get_new_dir(title):
 
    """
 
    Returns always new directory path.
 
    """
 
    from rhodecode.tests.vcs.utils import get_normalized_path
 
    name = TEST_REPO_PREFIX
 
    if title:
 
        name = '-'.join((name, title))
 
    hex = hashlib.sha1(str(time.time())).hexdigest()
 
    name = '-'.join((name, hex))
 
    path = os.path.join(TEST_DIR, name)
 
    return get_normalized_path(path)
 

	
 

	
 
class TestController(TestCase):
 

	
 
    def __init__(self, *args, **kwargs):
 
        wsgiapp = pylons.test.pylonsapp
 
        config = wsgiapp.config
 

	
 
        self.app = TestApp(wsgiapp)
 
        url._push_object(URLGenerator(config['routes.map'], environ))
 
        self.Session = Session
 
        self.index_location = config['app_conf']['index_dir']
 
        TestCase.__init__(self, *args, **kwargs)
 

	
 
    def log_user(self, username=TEST_USER_ADMIN_LOGIN,
 
                 password=TEST_USER_ADMIN_PASS):
 
        self._logged_username = username
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username': username,
 
                                  'password': password})
 

	
 
        if 'invalid user name' in response.body:
 
            self.fail('could not login using %s %s' % (username, password))
 

	
 
        self.assertEqual(response.status, '302 Found')
 
        ses = response.session['rhodecode_user']
 
        self.assertEqual(ses.get('username'), username)
 
        response = response.follow()
 
        self.assertEqual(ses.get('is_authenticated'), True)
 

	
 
        return response.session['rhodecode_user']
 

	
 
    def _get_logged_user(self):
 
        return User.get_by_username(self._logged_username)
 

	
 
    def checkSessionFlash(self, response, msg):
 
        self.assertTrue('flash' in response.session)
 
        if not msg in response.session['flash'][0][1]:
 
            self.fail(
 
                'msg `%s` not found in session flash: got `%s` instead' % (
 
                      msg, response.session['flash'])
 
            )
rhodecode/tests/scripts/test_scm_operations.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
"""
 
    rhodecode.tests.test_hg_operations
 
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
    rhodecode.tests.test_scm_operations
 
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
    Test suite for making push/pull operations
 
    Test suite for making push/pull operations.
 
    Run using::
 

	
 
     RC_WHOOSH_TEST_DISABLE=1 nosetests rhodecode/tests/scripts/test_scm_operations.py
 

	
 
    :created_on: Dec 30, 2010
 
    :author: marcink
 
    :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
 
    :license: GPLv3, see COPYING for more details.
 
"""
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import os
 
import time
 
import sys
 
import shutil
 
import logging
 

	
 
import tempfile
 
from os.path import join as jn
 
from os.path import dirname as dn
 

	
 
from tempfile import _RandomNameSequence
 
from subprocess import Popen, PIPE
 

	
 
from paste.deploy import appconfig
 
from pylons import config
 
from sqlalchemy import engine_from_config
 

	
 
from rhodecode.lib.utils import add_cache
 
from rhodecode.model import init_model
 
from rhodecode.model import meta
 
from rhodecode.tests import *
 
from rhodecode.model.db import User, Repository, UserLog
 
from rhodecode.lib.auth import get_crypt_password
 

	
 
from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO
 
from rhodecode.config.environment import load_environment
 

	
 
rel_path = dn(dn(dn(os.path.abspath(__file__))))
 
from rhodecode.model.meta import Session
 

	
 
conf = appconfig('config:%s' % sys.argv[1], relative_to=rel_path)
 
load_environment(conf.global_conf, conf.local_conf)
 

	
 
add_cache(conf)
 

	
 
USER = 'test_admin'
 
PASS = 'test12'
 
HOST = '127.0.0.1:5000'
 
DEBUG = False
 
print 'DEBUG:', DEBUG
 
log = logging.getLogger(__name__)
 

	
 
engine = engine_from_config(conf, 'sqlalchemy.db1.')
 
init_model(engine)
 
sa = meta.Session()
 
DEBUG = True
 
HOST = '127.0.0.1:5000'  # test host
 

	
 

	
 
class Command(object):
 

	
 
    def __init__(self, cwd):
 
        self.cwd = cwd
 

	
 
    def execute(self, cmd, *args):
 
        """Runs command on the system with given ``args``.
 
        """
 
        Runs command on the system with given ``args``.
 
        """
 

	
 
        command = cmd + ' ' + ' '.join(args)
 
        log.debug('Executing %s' % command)
 
        if DEBUG:
 
            print command
 
            print '*** CMD %s ***' % command
 
        p = Popen(command, shell=True, stdout=PIPE, stderr=PIPE, cwd=self.cwd)
 
        stdout, stderr = p.communicate()
 
        if DEBUG:
 
            print stdout, stderr
 
        return stdout, stderr
 

	
 

	
 
def test_wrapp(func):
 

	
 
    def __wrapp(*args, **kwargs):
 
        print '>>>%s' % func.__name__
 
        try:
 
            res = func(*args, **kwargs)
 
        except Exception, e:
 
            print ('###############\n-'
 
                   '--%s failed %s--\n'
 
                   '###############\n' % (func.__name__, e))
 
            sys.exit()
 
        print '++OK++'
 
        return res
 
    return __wrapp
 
def _get_tmp_dir():
 
    return tempfile.mkdtemp(prefix='rc_integration_test')
 

	
 

	
 
def create_test_user(force=True):
 
    print '\tcreating test user'
 

	
 
    user = User.get_by_username(USER)
 

	
 
    if force and user is not None:
 
        print '\tremoving current user'
 
        for repo in Repository.query().filter(Repository.user == user).all():
 
            sa.delete(repo)
 
        sa.delete(user)
 
        sa.commit()
 

	
 
    if user is None or force:
 
        print '\tcreating new one'
 
        new_usr = User()
 
        new_usr.username = USER
 
        new_usr.password = get_crypt_password(PASS)
 
        new_usr.email = 'mail@mail.com'
 
        new_usr.name = 'test'
 
        new_usr.lastname = 'lasttestname'
 
        new_usr.active = True
 
        new_usr.admin = True
 
        sa.add(new_usr)
 
        sa.commit()
 

	
 
    print '\tdone'
 

	
 

	
 
def create_test_repo(force=True):
 
    from rhodecode.model.repo import RepoModel
 

	
 
    user = User.get_by_username(USER)
 
    if user is None:
 
        raise Exception('user not found')
 

	
 
    repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar()
 

	
 
    if repo is None:
 
        print '\trepo not found creating'
 

	
 
        form_data = {'repo_name':HG_REPO,
 
                     'repo_type':'hg',
 
                     'private':False,
 
                     'clone_uri':'' }
 
        rm = RepoModel(sa)
 
        rm.base_path = '/home/hg'
 
        rm.create(form_data, user)
 
def _construct_url(repo, dest=None, **kwargs):
 
    if dest is None:
 
        #make temp clone
 
        dest = _get_tmp_dir()
 
    params = {
 
        'user': TEST_USER_ADMIN_LOGIN,
 
        'passwd': TEST_USER_ADMIN_PASS,
 
        'host': HOST,
 
        'cloned_repo': repo,
 
        'dest': dest
 
    }
 
    params.update(**kwargs)
 
    if params['user'] and params['passwd']:
 
        _url = 'http://%(user)s:%(passwd)s@%(host)s/%(cloned_repo)s %(dest)s' % params
 
    else:
 
        _url = 'http://(host)s/%(cloned_repo)s %(dest)s' % params
 
    return _url
 

	
 

	
 
def set_anonymous_access(enable=True):
 
    user = User.get_by_username('default')
 
    user = User.get_by_username(User.DEFAULT_USER)
 
    user.active = enable
 
    sa.add(user)
 
    sa.commit()
 
    Session().add(user)
 
    Session().commit()
 
    print '\tanonymous access is now:', enable
 
    if enable != User.get_by_username('default').active:
 
    if enable != User.get_by_username(User.DEFAULT_USER).active:
 
        raise Exception('Cannot set anonymous access')
 

	
 

	
 
def get_anonymous_access():
 
    user = User.get_by_username('default')
 
    return user.active
 
def setup_module():
 
    #DISABLE ANONYMOUS ACCESS
 
    set_anonymous_access(False)
 

	
 

	
 
def test_clone_hg_repo_by_admin():
 
    clone_url = _construct_url(HG_REPO)
 
    stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 

	
 
    assert 'requesting all changes' in stdout
 
    assert 'adding changesets' in stdout
 
    assert 'adding manifests' in stdout
 
    assert 'adding file changes' in stdout
 

	
 
    assert stderr == ''
 

	
 

	
 
#==============================================================================
 
# TESTS
 
#==============================================================================
 
@test_wrapp
 
def test_clone_with_credentials(no_errors=False):
 
    cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
 

	
 
    try:
 
        shutil.rmtree(path, ignore_errors=True)
 
        os.makedirs(path)
 
        #print 'made dirs %s' % jn(path)
 
    except OSError:
 
        raise
 
def test_clone_git_repo_by_admin():
 
    clone_url = _construct_url(GIT_REPO)
 
    stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 

	
 
    print '\tchecking if anonymous access is enabled'
 
    anonymous_access = get_anonymous_access()
 
    if anonymous_access:
 
        print '\tenabled, disabling it '
 
        set_anonymous_access(enable=False)
 

	
 
    clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
 
                  {'user':USER,
 
                   'pass':PASS,
 
                   'host':HOST,
 
                   'cloned_repo':HG_REPO,
 
                   'dest':path}
 

	
 
    stdout, stderr = Command(cwd).execute('hg clone', clone_url)
 

	
 
    if no_errors is False:
 
        assert """adding file changes""" in stdout, 'no messages about cloning'
 
        assert """abort""" not in stderr , 'got error from clone'
 
    assert 'Cloning into' in stdout
 
    assert stderr == ''
 

	
 

	
 
@test_wrapp
 
def test_clone_anonymous():
 
    cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
 
def test_clone_wrong_credentials_hg():
 
    clone_url = _construct_url(HG_REPO, passwd='bad!')
 
    stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
    assert 'abort: authorization failed' in stderr
 

	
 
    try:
 
        shutil.rmtree(path, ignore_errors=True)
 
        os.makedirs(path)
 
        #print 'made dirs %s' % jn(path)
 
    except OSError:
 
        raise
 

	
 
def test_clone_wrong_credentials_git():
 
    clone_url = _construct_url(GIT_REPO, passwd='bad!')
 
    stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
    assert 'fatal: Authentication failed' in stderr
 

	
 

	
 
    print '\tchecking if anonymous access is enabled'
 
    anonymous_access = get_anonymous_access()
 
    if not anonymous_access:
 
        print '\tnot enabled, enabling it '
 
        set_anonymous_access(enable=True)
 
def test_clone_git_dir_as_hg():
 
    clone_url = _construct_url(GIT_REPO)
 
    stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
    assert 'HTTP Error 404: Not Found' in stderr
 

	
 

	
 
    clone_url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % \
 
                  {'user':USER,
 
                   'pass':PASS,
 
                   'host':HOST,
 
                   'cloned_repo':HG_REPO,
 
                   'dest':path}
 
def test_clone_hg_repo_as_git():
 
    clone_url = _construct_url(HG_REPO)
 
    stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
    assert 'not found: did you run git update-server-info on the server' in stderr
 

	
 
    stdout, stderr = Command(cwd).execute('hg clone', clone_url)
 

	
 
    assert """adding file changes""" in stdout, 'no messages about cloning'
 
    assert """abort""" not in stderr , 'got error from clone'
 

	
 
    #disable if it was enabled
 
    if not anonymous_access:
 
        print '\tdisabling anonymous access'
 
        set_anonymous_access(enable=False)
 
def test_clone_non_existing_path_hg():
 
    clone_url = _construct_url('trololo')
 
    stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
    assert 'HTTP Error 404: Not Found' in stderr
 

	
 

	
 
@test_wrapp
 
def test_clone_wrong_credentials():
 
    cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
 

	
 
    try:
 
        shutil.rmtree(path, ignore_errors=True)
 
        os.makedirs(path)
 
        #print 'made dirs %s' % jn(path)
 
    except OSError:
 
        raise
 

	
 
    print '\tchecking if anonymous access is enabled'
 
    anonymous_access = get_anonymous_access()
 
    if anonymous_access:
 
        print '\tenabled, disabling it '
 
        set_anonymous_access(enable=False)
 

	
 
    clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \
 
                  {'user':USER + 'error',
 
                   'pass':PASS,
 
                   'host':HOST,
 
                   'cloned_repo':HG_REPO,
 
                   'dest':path}
 

	
 
    stdout, stderr = Command(cwd).execute('hg clone', clone_url)
 

	
 
    if not """abort: authorization failed"""  in stderr:
 
        raise Exception('Failure')
 
def test_clone_non_existing_path_git():
 
    clone_url = _construct_url('trololo')
 
    stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
    assert 'not found: did you run git update-server-info on the server' in stderr
 

	
 

	
 
@test_wrapp
 
def test_pull():
 
    pass
 

	
 

	
 
@test_wrapp
 
def test_push_modify_file(f_name='setup.py'):
 
    cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
 
    modified_file = jn(TESTS_TMP_PATH, HG_REPO, f_name)
 
    for i in xrange(5):
 
        cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
 
        Command(cwd).execute(cmd)
 
def test_push_new_file_hg():
 
    DEST = _get_tmp_dir()
 
    clone_url = _construct_url(HG_REPO, dest=DEST)
 
    stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 

	
 
        cmd = """hg ci -m 'changed file %s' %s """ % (i, modified_file)
 
        Command(cwd).execute(cmd)
 

	
 
    Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO))
 

	
 

	
 
@test_wrapp
 
def test_push_new_file(commits=15, with_clone=True):
 

	
 
    if with_clone:
 
        test_clone_with_credentials(no_errors=True)
 

	
 
    cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
 
    # commit some stuff into this repo
 
    cwd = path = jn(DEST)
 
    added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next())
 

	
 
    Command(cwd).execute('touch %s' % added_file)
 

	
 
    Command(cwd).execute('hg add %s' % added_file)
 

	
 
    for i in xrange(commits):
 
    for i in xrange(3):
 
        cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
 
        Command(cwd).execute(cmd)
 

	
 
        cmd = """hg ci -m 'commited new %s' -u '%s' %s """ % (i,
 
                                'Marcin Kuźminski <marcin@python-blog.com>',
 
                                added_file)
 
        cmd = """hg ci -m 'commited new %s' -u '%s' %s """ % (
 
                i,
 
                'Marcin Kuźminski <marcin@python-blog.com>',
 
                added_file
 
        )
 
        Command(cwd).execute(cmd)
 
    # PUSH it back
 
    clone_url = _construct_url(HG_REPO, dest='')
 
    stdout, stderr = Command(cwd).execute('hg push --verbose', clone_url)
 

	
 
    push_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
 
                  {'user':USER,
 
                   'pass':PASS,
 
                   'host':HOST,
 
                   'cloned_repo':HG_REPO,
 
                   'dest':jn(TESTS_TMP_PATH, HG_REPO)}
 

	
 
    Command(cwd).execute('hg push --verbose --debug %s' % push_url)
 
    assert 'pushing to' in stdout
 
    assert 'Repository size' in stdout
 
    assert 'Last revision is now' in stdout
 

	
 

	
 
@test_wrapp
 
def test_push_wrong_credentials():
 
    cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
 
    clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
 
                  {'user':USER + 'xxx',
 
                   'pass':PASS,
 
                   'host':HOST,
 
                   'cloned_repo':HG_REPO,
 
                   'dest':jn(TESTS_TMP_PATH, HG_REPO)}
 

	
 
    modified_file = jn(TESTS_TMP_PATH, HG_REPO, 'setup.py')
 
    for i in xrange(5):
 
        cmd = """echo 'added_line%s' >> %s""" % (i, modified_file)
 
        Command(cwd).execute(cmd)
 

	
 
        cmd = """hg ci -m 'commited %s' %s """ % (i, modified_file)
 
        Command(cwd).execute(cmd)
 
def test_push_new_file_git():
 
    DEST = _get_tmp_dir()
 
    clone_url = _construct_url(GIT_REPO, dest=DEST)
 
    stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 

	
 
    Command(cwd).execute('hg push %s' % clone_url)
 

	
 

	
 
@test_wrapp
 
def test_push_wrong_path():
 
    cwd = path = jn(TESTS_TMP_PATH, HG_REPO)
 
    added_file = jn(path, 'somefile.py')
 
    # commit some stuff into this repo
 
    cwd = path = jn(DEST)
 
    added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next())
 
    Command(cwd).execute('touch %s' % added_file)
 
    Command(cwd).execute('git add %s' % added_file)
 

	
 
    try:
 
        shutil.rmtree(path, ignore_errors=True)
 
        os.makedirs(path)
 
        print '\tmade dirs %s' % jn(path)
 
    except OSError:
 
        raise
 

	
 
    Command(cwd).execute("""echo '' > %s""" % added_file)
 
    Command(cwd).execute("""hg init %s""" % path)
 
    Command(cwd).execute("""hg add %s""" % added_file)
 

	
 
    for i in xrange(2):
 
    for i in xrange(3):
 
        cmd = """echo 'added_line%s' >> %s""" % (i, added_file)
 
        Command(cwd).execute(cmd)
 

	
 
        cmd = """hg ci -m 'commited new %s' %s """ % (i, added_file)
 
        cmd = """git ci -m 'commited new %s' --author '%s' %s """ % (
 
                i,
 
                'Marcin Kuźminski <marcin@python-blog.com>',
 
                added_file
 
        )
 
        Command(cwd).execute(cmd)
 
    # PUSH it back
 
    clone_url = _construct_url(GIT_REPO, dest='')
 
    stdout, stderr = Command(cwd).execute('git push --verbose', clone_url)
 

	
 
    clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \
 
                  {'user':USER,
 
                   'pass':PASS,
 
                   'host':HOST,
 
                   'cloned_repo':HG_REPO + '_error',
 
                   'dest':jn(TESTS_TMP_PATH, HG_REPO)}
 

	
 
    stdout, stderr = Command(cwd).execute('hg push %s' % clone_url)
 
    if not """abort: HTTP Error 403: Forbidden"""  in stderr:
 
        raise Exception('Failure')
 
    #WTF git stderr ?!
 
    assert 'master -> master' in stderr
 

	
 

	
 
@test_wrapp
 
def get_logs():
 
    return UserLog.query().all()
 
def test_push_modify_existing_file_hg():
 
    assert 0
 

	
 

	
 
@test_wrapp
 
def test_logs(initial):
 
    logs = UserLog.query().all()
 
    operations = 4
 
    if len(initial) + operations != len(logs):
 
        raise Exception("missing number of logs initial:%s vs current:%s" % \
 
                            (len(initial), len(logs)))
 
def test_push_modify_existing_file_git():
 
    assert 0
 

	
 

	
 
def test_push_wrong_credentials_hg():
 
    assert 0
 

	
 

	
 
if __name__ == '__main__':
 
    create_test_user(force=False)
 
    create_test_repo()
 
def test_push_wrong_credentials_git():
 
    assert 0
 

	
 

	
 
def test_push_back_to_wrong_url_hg():
 
    assert 0
 

	
 
    initial_logs = get_logs()
 
    print 'initial activity logs: %s' % len(initial_logs)
 
    s = time.time()
 
    #test_push_modify_file()
 
    test_clone_with_credentials()
 
    test_clone_wrong_credentials()
 

	
 
    test_push_new_file(commits=2, with_clone=True)
 

	
 
    test_clone_anonymous()
 
    test_push_wrong_path()
 
def test_push_back_to_wrong_url_git():
 
    assert 0
 

	
 
    test_push_wrong_credentials()
 

	
 
    test_logs(initial_logs)
 
    print 'finished ok in %.3f' % (time.time() - s)
 
#TODO: write all locking tests
0 comments (0 inline, 0 general)