diff --git a/rhodecode/tests/test_hg_operations.py b/rhodecode/tests/test_hg_operations.py --- a/rhodecode/tests/test_hg_operations.py +++ b/rhodecode/tests/test_hg_operations.py @@ -6,13 +6,28 @@ Test suite for making push/pull operations :created_on: Dec 30, 2010 - :copyright: (c) 2010 by marcink. - :license: LICENSE_NAME, see LICENSE_FILE for more details. + :copyright: (C) 2009-2011 Marcin Kuzminski + :license: GPLv3, see COPYING for more details. """ +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . import os +import time +import sys import shutil import logging + from os.path import join as jn from os.path import dirname as dn @@ -26,7 +41,7 @@ from sqlalchemy import engine_from_confi from rhodecode.lib.utils import add_cache from rhodecode.model import init_model from rhodecode.model import meta -from rhodecode.model.db import User, Repository +from rhodecode.model.db import User, Repository, UserLog from rhodecode.lib.auth import get_crypt_password from rhodecode.tests import TESTS_TMP_PATH, NEW_HG_REPO, HG_REPO @@ -41,7 +56,8 @@ add_cache(conf) USER = 'test_admin' PASS = 'test12' HOST = '127.0.0.1:5000' -DEBUG = True +DEBUG = True if sys.argv[1:] else False +print 'DEBUG:', DEBUG log = logging.getLogger(__name__) @@ -64,28 +80,44 @@ class Command(object): print stdout, stderr return stdout, stderr + +def test_wrapp(func): + + def __wrapp(*args, **kwargs): + print '>>>%s' % func.__name__ + try: + res = func(*args, **kwargs) + except Exception, e: + print ('###############\n-' + '--%s failed %s--\n' + '###############\n' % (func.__name__, e)) + sys.exit() + print '++OK++' + return res + return __wrapp + def get_session(): engine = engine_from_config(conf, 'sqlalchemy.db1.') init_model(engine) - sa = meta.Session() + sa = meta.Session return sa def create_test_user(force=True): - print 'creating test user' + print '\tcreating test user' sa = get_session() user = sa.query(User).filter(User.username == USER).scalar() if force and user is not None: - print 'removing current user' + print '\tremoving current user' for repo in sa.query(Repository).filter(Repository.user == user).all(): sa.delete(repo) sa.delete(user) sa.commit() if user is None or force: - print 'creating new one' + print '\tcreating new one' new_usr = User() new_usr.username = USER new_usr.password = get_crypt_password(PASS) @@ -97,7 +129,7 @@ def create_test_user(force=True): sa.add(new_usr) sa.commit() - print 'done' + print '\tdone' def create_test_repo(force=True): @@ -112,7 +144,7 @@ def create_test_repo(force=True): repo = sa.query(Repository).filter(Repository.repo_name == HG_REPO).scalar() if repo is None: - print 'repo not found creating' + print '\trepo not found creating' form_data = {'repo_name':HG_REPO, 'repo_type':'hg', @@ -126,19 +158,27 @@ def create_test_repo(force=True): def set_anonymous_access(enable=True): sa = get_session() user = sa.query(User).filter(User.username == 'default').one() + sa.expire(user) user.active = enable sa.add(user) sa.commit() + sa.remove() + import time;time.sleep(3) + print '\tanonymous access is now:', enable + def get_anonymous_access(): sa = get_session() - return sa.query(User).filter(User.username == 'default').one().active + obj1 = sa.query(User).filter(User.username == 'default').one() + sa.expire(obj1) + return obj1.active #============================================================================== # TESTS #============================================================================== -def test_clone(no_errors=False): +@test_wrapp +def test_clone_with_credentials(no_errors=False): cwd = path = jn(TESTS_TMP_PATH, HG_REPO) try: @@ -148,6 +188,12 @@ def test_clone(no_errors=False): except OSError: raise + print '\tchecking if anonymous access is enabled' + anonymous_access = get_anonymous_access() + if anonymous_access: + print '\tenabled, disabling it ' + set_anonymous_access(enable=False) + time.sleep(1) clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \ {'user':USER, @@ -163,8 +209,8 @@ def test_clone(no_errors=False): assert """abort""" not in stderr , 'got error from clone' - -def test_clone_anonymous_ok(): +@test_wrapp +def test_clone_anonymous(): cwd = path = jn(TESTS_TMP_PATH, HG_REPO) try: @@ -175,11 +221,12 @@ def test_clone_anonymous_ok(): raise - print 'checking if anonymous access is enabled' + print '\tchecking if anonymous access is enabled' anonymous_access = get_anonymous_access() if not anonymous_access: - print 'not enabled, enabling it ' + print '\tnot enabled, enabling it ' set_anonymous_access(enable=True) + time.sleep(1) clone_url = 'http://%(host)s/%(cloned_repo)s %(dest)s' % \ {'user':USER, @@ -189,18 +236,16 @@ def test_clone_anonymous_ok(): 'dest':path} stdout, stderr = Command(cwd).execute('hg clone', clone_url) - print stdout, stderr - assert """adding file changes""" in stdout, 'no messages about cloning' assert """abort""" not in stderr , 'got error from clone' #disable if it was enabled if not anonymous_access: - print 'disabling anonymous access' + print '\tdisabling anonymous access' set_anonymous_access(enable=False) - +@test_wrapp def test_clone_wrong_credentials(): cwd = path = jn(TESTS_TMP_PATH, HG_REPO) @@ -211,6 +256,11 @@ def test_clone_wrong_credentials(): except OSError: raise + print '\tchecking if anonymous access is enabled' + anonymous_access = get_anonymous_access() + if anonymous_access: + print '\tenabled, disabling it ' + set_anonymous_access(enable=False) clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s %(dest)s' % \ {'user':USER + 'error', @@ -221,12 +271,14 @@ def test_clone_wrong_credentials(): stdout, stderr = Command(cwd).execute('hg clone', clone_url) - assert """abort: authorization failed""" in stderr , 'no error from wrong credentials' + if not """abort: authorization failed""" in stderr: + raise Exception('Failure') - +@test_wrapp def test_pull(): pass +@test_wrapp def test_push_modify_file(f_name='setup.py'): cwd = path = jn(TESTS_TMP_PATH, HG_REPO) modified_file = jn(TESTS_TMP_PATH, HG_REPO, f_name) @@ -239,10 +291,11 @@ def test_push_modify_file(f_name='setup. Command(cwd).execute('hg push %s' % jn(TESTS_TMP_PATH, HG_REPO)) +@test_wrapp def test_push_new_file(commits=15, with_clone=True): if with_clone: - test_clone(no_errors=True) + test_clone_with_credentials(no_errors=True) cwd = path = jn(TESTS_TMP_PATH, HG_REPO) added_file = jn(path, '%ssetupążźć.py' % _RandomNameSequence().next()) @@ -269,6 +322,7 @@ def test_push_new_file(commits=15, with_ Command(cwd).execute('hg push --verbose --debug %s' % push_url) +@test_wrapp def test_push_wrong_credentials(): cwd = path = jn(TESTS_TMP_PATH, HG_REPO) clone_url = 'http://%(user)s:%(pass)s@%(host)s/%(cloned_repo)s' % \ @@ -288,6 +342,7 @@ def test_push_wrong_credentials(): Command(cwd).execute('hg push %s' % clone_url) +@test_wrapp def test_push_wrong_path(): cwd = path = jn(TESTS_TMP_PATH, HG_REPO) added_file = jn(path, 'somefile.py') @@ -295,7 +350,7 @@ def test_push_wrong_path(): try: shutil.rmtree(path, ignore_errors=True) os.makedirs(path) - print 'made dirs %s' % jn(path) + print '\tmade dirs %s' % jn(path) except OSError: raise @@ -318,20 +373,40 @@ def test_push_wrong_path(): 'dest':jn(TESTS_TMP_PATH, HG_REPO)} stdout, stderr = Command(cwd).execute('hg push %s' % clone_url) - assert """abort: HTTP Error 403: Forbidden""" in stderr + if not """abort: HTTP Error 403: Forbidden""" in stderr: + raise Exception('Failure') + +@test_wrapp +def get_logs(): + sa = get_session() + return len(sa.query(UserLog).all()) + +@test_wrapp +def test_logs(initial): + sa = get_session() + logs = sa.query(UserLog).all() + operations = 7 + if initial + operations != len(logs): + raise Exception("missing number of logs %s vs %s" % (initial, len(logs))) if __name__ == '__main__': create_test_user(force=False) create_test_repo() - #test_push_modify_file() - #test_clone() - #test_clone_anonymous_ok() + + initial_logs = get_logs() - #test_clone_wrong_credentials() +# test_push_modify_file() + test_clone_with_credentials() + test_clone_wrong_credentials() - test_pull() + test_push_new_file(commits=2, with_clone=True) - #test_push_wrong_path() - #test_push_wrong_credentials() + test_clone_anonymous() + test_push_wrong_path() + + + test_push_wrong_credentials() + + test_logs(initial_logs)