diff --git a/rhodecode/tests/other/test_vcs_operations.py b/rhodecode/tests/other/test_vcs_operations.py --- a/rhodecode/tests/other/test_vcs_operations.py +++ b/rhodecode/tests/other/test_vcs_operations.py @@ -1,19 +1,4 @@ # -*- coding: utf-8 -*- -""" - rhodecode.tests.test_scm_operations - ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - - Test suite for making push/pull operations. - Run using after doing paster serve test.ini:: - RC_WHOOSH_TEST_DISABLE=1 RC_NO_TMP_PATH=1 nosetests rhodecode/tests/scripts/test_vcs_operations.py - - You must have git > 1.8.1 for tests to work fine - - :created_on: Dec 30, 2010 - :author: marcink - :copyright: (C) 2010-2012 Marcin Kuzminski - :license: GPLv3, see COPYING for more details. -""" # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or @@ -26,20 +11,32 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . +""" +rhodecode.tests.test_scm_operations +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -import os +Test suite for making push/pull operations. +Run using after doing paster serve test.ini:: + RC_WHOOSH_TEST_DISABLE=1 RC_NO_TMP_PATH=1 nosetests rhodecode/tests/other/test_vcs_operations.py + +You must have git > 1.8.1 for tests to work fine + +:created_on: Dec 30, 2010 +:author: marcink +:copyright: (c) 2013 RhodeCode GmbH. +:license: GPLv3, see LICENSE for more details. + +""" + import tempfile -import unittest import time from os.path import join as jn -from os.path import dirname as dn from tempfile import _RandomNameSequence from subprocess import Popen, PIPE from rhodecode.tests import * -from rhodecode.model.db import User, Repository, UserLog, UserIpMap,\ - CacheInvalidation +from rhodecode.model.db import User, Repository, UserIpMap, CacheInvalidation from rhodecode.model.meta import Session from rhodecode.model.repo import RepoModel from rhodecode.model.user import UserModel @@ -115,11 +112,13 @@ def _add_files_and_push(vcs, DEST, **kwa i, author_str, added_file ) elif vcs == 'git': - cmd = """git commit -m 'commited new %s' --author '%s' %s """ % ( + cmd = """EMAIL="me@email.com" git commit -m 'commited new %s' --author '%s' %s """ % ( i, author_str, added_file ) Command(cwd).execute(cmd) + # PUSH it back + _REPO = None if vcs == 'hg': _REPO = HG_REPO elif vcs == 'git': @@ -129,6 +128,7 @@ def _add_files_and_push(vcs, DEST, **kwa clone_url = _construct_url(_REPO, **kwargs) if 'clone_url' in kwargs: clone_url = kwargs['clone_url'] + stdout = stderr = None if vcs == 'hg': stdout, stderr = Command(cwd).execute('hg push --verbose', clone_url) elif vcs == 'git': @@ -151,6 +151,15 @@ def set_anonymous_access(enable=True): # TESTS #============================================================================== + +def _check_proper_git_push(stdout, stderr): + #WTF GIT stderr is output ?! + assert 'fatal' not in stderr + assert 'rejected' not in stderr + assert 'Pushing to' in stderr + assert 'master -> master' in stderr + + class TestVCSOperations(BaseTestCase): @classmethod @@ -207,7 +216,7 @@ class TestVCSOperations(BaseTestCase): def test_clone_hg_repo_as_git(self): clone_url = _construct_url(HG_REPO) stdout, stderr = Command('/tmp').execute('git clone', clone_url) - assert 'not found:' in stderr + assert 'not found' in stderr def test_clone_non_existing_path_hg(self): clone_url = _construct_url('trololo') @@ -217,7 +226,7 @@ class TestVCSOperations(BaseTestCase): def test_clone_non_existing_path_git(self): clone_url = _construct_url('trololo') stdout, stderr = Command('/tmp').execute('git clone', clone_url) - assert 'not found:' in stderr + assert 'not found' in stderr def test_push_new_file_hg(self): DEST = _get_tmp_dir() @@ -238,12 +247,15 @@ class TestVCSOperations(BaseTestCase): # commit some stuff into this repo stdout, stderr = _add_files_and_push('git', DEST) - #WTF git stderr ?! - assert 'master -> master' in stderr + print [(x.repo_full_path,x.repo_path) for x in Repository.get_all()] + _check_proper_git_push(stdout, stderr) def test_push_invalidates_cache_hg(self): key = CacheInvalidation.query().filter(CacheInvalidation.cache_key - ==HG_REPO).one() + ==HG_REPO).scalar() + if not key: + key = CacheInvalidation(HG_REPO, HG_REPO) + key.cache_active = True Session().add(key) Session().commit() @@ -253,13 +265,17 @@ class TestVCSOperations(BaseTestCase): stdout, stderr = Command('/tmp').execute('hg clone', clone_url) stdout, stderr = _add_files_and_push('hg', DEST, files_no=1) + key = CacheInvalidation.query().filter(CacheInvalidation.cache_key ==HG_REPO).one() self.assertEqual(key.cache_active, False) def test_push_invalidates_cache_git(self): key = CacheInvalidation.query().filter(CacheInvalidation.cache_key - ==GIT_REPO).one() + ==GIT_REPO).scalar() + if not key: + key = CacheInvalidation(GIT_REPO, GIT_REPO) + key.cache_active = True Session().add(key) Session().commit() @@ -270,9 +286,11 @@ class TestVCSOperations(BaseTestCase): # commit some stuff into this repo stdout, stderr = _add_files_and_push('git', DEST, files_no=1) + _check_proper_git_push(stdout, stderr) key = CacheInvalidation.query().filter(CacheInvalidation.cache_key ==GIT_REPO).one() + print CacheInvalidation.get_all() self.assertEqual(key.cache_active, False) def test_push_wrong_credentials_hg(self): @@ -313,7 +331,7 @@ class TestVCSOperations(BaseTestCase): stdout, stderr = _add_files_and_push('git', DEST, clone_url='http://127.0.0.1:5000/tmp',) - assert 'not found:' in stderr + assert 'not found' in stderr def test_clone_and_create_lock_hg(self): # enable locking @@ -386,31 +404,34 @@ class TestVCSOperations(BaseTestCase): % (HG_REPO, TEST_USER_ADMIN_LOGIN)) assert msg in stderr -#TODO: fix me ! somehow during tests hooks don't get called on GIT -# def test_push_on_locked_repo_by_other_user_git(self): -# #clone some temp -# DEST = _get_tmp_dir() -# clone_url = _construct_url(GIT_REPO, dest=DEST) -# stdout, stderr = Command('/tmp').execute('git clone', clone_url) -# -# #lock repo -# r = Repository.get_by_repo_name(GIT_REPO) -# # let this user actually push ! -# RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN, -# perm='repository.write') -# Session().commit() -# Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id) -# -# #push fails repo is locked by other user ! -# stdout, stderr = _add_files_and_push('git', DEST, -# user=TEST_USER_REGULAR_LOGIN, -# passwd=TEST_USER_REGULAR_PASS) -# msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`""" -# % (GIT_REPO, TEST_USER_ADMIN_LOGIN)) -# #TODO: fix this somehow later on GIT, GIT is stupid and even if we throw -# # back 423 to it, it makes ANOTHER request and we fail there with 405 :/ -# msg = "405 Method Not Allowed" -# assert msg in stderr + def test_push_on_locked_repo_by_other_user_git(self): + #clone some temp + DEST = _get_tmp_dir() + clone_url = _construct_url(GIT_REPO, dest=DEST) + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + + #lock repo + r = Repository.get_by_repo_name(GIT_REPO) + # let this user actually push ! + RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN, + perm='repository.write') + Session().commit() + Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id) + + #push fails repo is locked by other user ! + stdout, stderr = _add_files_and_push('git', DEST, + user=TEST_USER_REGULAR_LOGIN, + passwd=TEST_USER_REGULAR_PASS) + err = 'Repository `%s` locked by user `%s`' % (GIT_REPO, TEST_USER_ADMIN_LOGIN) + assert err in stderr + + #TODO: fix this somehow later on GIT, GIT is stupid and even if we throw + #back 423 to it, it makes ANOTHER request and we fail there with 405 :/ + + msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`""" + % (GIT_REPO, TEST_USER_ADMIN_LOGIN)) + #msg = "405 Method Not Allowed" + #assert msg in stderr def test_push_unlocks_repository_hg(self): # enable locking @@ -425,7 +446,8 @@ class TestVCSOperations(BaseTestCase): #check for lock repo after clone r = Repository.get_by_repo_name(HG_REPO) - assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id + uid = User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id + assert r.locked[0] == uid #push is ok and repo is now unlocked stdout, stderr = _add_files_and_push('hg', DEST) @@ -435,29 +457,31 @@ class TestVCSOperations(BaseTestCase): r = Repository.get_by_repo_name(HG_REPO) assert r.locked == [None, None] -#TODO: fix me ! somehow during tests hooks don't get called on GIT -# def test_push_unlocks_repository_git(self): -# # enable locking -# r = Repository.get_by_repo_name(GIT_REPO) -# r.enable_locking = True -# Session().add(r) -# Session().commit() -# #clone some temp -# DEST = _get_tmp_dir() -# clone_url = _construct_url(GIT_REPO, dest=DEST) -# stdout, stderr = Command('/tmp').execute('git clone', clone_url) -# -# #check for lock repo after clone -# r = Repository.get_by_repo_name(GIT_REPO) -# assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id -# -# #push is ok and repo is now unlocked -# stdout, stderr = _add_files_and_push('git', DEST) -# #assert ('remote: Released lock on repo `%s`' % GIT_REPO) in stdout -# #we need to cleanup the Session Here ! -# Session.remove() -# r = Repository.get_by_repo_name(GIT_REPO) -# assert r.locked == [None, None] + #TODO: fix me ! somehow during tests hooks don't get called on GIT + def test_push_unlocks_repository_git(self): + # enable locking + r = Repository.get_by_repo_name(GIT_REPO) + r.enable_locking = True + Session().add(r) + Session().commit() + #clone some temp + DEST = _get_tmp_dir() + clone_url = _construct_url(GIT_REPO, dest=DEST) + stdout, stderr = Command('/tmp').execute('git clone', clone_url) + + #check for lock repo after clone + r = Repository.get_by_repo_name(GIT_REPO) + assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id + + #push is ok and repo is now unlocked + stdout, stderr = _add_files_and_push('git', DEST) + _check_proper_git_push(stdout, stderr) + + #assert ('remote: Released lock on repo `%s`' % GIT_REPO) in stdout + #we need to cleanup the Session Here ! + Session.remove() + r = Repository.get_by_repo_name(GIT_REPO) + assert r.locked == [None, None] def test_ip_restriction_hg(self): user_model = UserModel()