Changeset - 1a7611b9730e
[Not reviewed]
default
0 8 0
domruf - 10 years ago 2016-02-02 21:45:29
dominikruf@gmail.com
replace absolute /tmp paths to tempfile.gettempdir()
8 files changed with 45 insertions and 40 deletions:
0 comments (0 inline, 0 general)
kallithea/bin/kallithea_backup.py
Show inline comments
 
@@ -19,44 +19,44 @@ Repositories backup manager, it allows t
 
repositories and send it to backup server using RSA key via ssh.
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Feb 28, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import os
 
import sys
 

	
 
import logging
 
import tarfile
 
import datetime
 
import subprocess
 
import tempfile
 

	
 
logging.basicConfig(level=logging.DEBUG,
 
                    format="%(asctime)s %(levelname)-5.5s %(message)s")
 

	
 

	
 
class BackupManager(object):
 
    def __init__(self, repos_location, rsa_key, backup_server):
 
        today = datetime.datetime.now().weekday() + 1
 
        self.backup_file_name = "repos.%s.tar.gz" % today
 

	
 
        self.id_rsa_path = self.get_id_rsa(rsa_key)
 
        self.repos_path = self.get_repos_path(repos_location)
 
        self.backup_server = backup_server
 

	
 
        self.backup_file_path = '/tmp'
 
        self.backup_file_path = tempfile.gettempdir()
 

	
 
        logging.info('starting backup for %s', self.repos_path)
 
        logging.info('backup target %s', self.backup_file_path)
 

	
 
    def get_id_rsa(self, rsa_key):
 
        if not os.path.isfile(rsa_key):
 
            logging.error('Could not load id_rsa key file in %s', rsa_key)
 
            sys.exit()
 
        return rsa_key
 

	
 
    def get_repos_path(self, path):
 
        if not os.path.isdir(path):
kallithea/tests/__init__.py
Show inline comments
 
@@ -71,25 +71,25 @@ __all__ = [
 
    'TEST_HG_REPO_CLONE', 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO',
 
    'TEST_GIT_REPO_CLONE', 'TEST_GIT_REPO_PULL', 'HG_REMOTE_REPO',
 
    'GIT_REMOTE_REPO', 'SCM_TESTS', 'remove_all_notifications',
 
]
 

	
 
# Invoke websetup with the current config file
 
# SetupCommand('setup-app').run([config_file])
 

	
 
environ = {}
 

	
 
#SOME GLOBALS FOR TESTS
 

	
 
TESTS_TMP_PATH = jn('/', 'tmp', 'rc_test_%s' % _RandomNameSequence().next())
 
TESTS_TMP_PATH = jn(tempfile.gettempdir(), 'rc_test_%s' % _RandomNameSequence().next())
 
TEST_USER_ADMIN_LOGIN = 'test_admin'
 
TEST_USER_ADMIN_PASS = 'test12'
 
TEST_USER_ADMIN_EMAIL = 'test_admin@example.com'
 

	
 
TEST_USER_REGULAR_LOGIN = 'test_regular'
 
TEST_USER_REGULAR_PASS = 'test12'
 
TEST_USER_REGULAR_EMAIL = 'test_regular@example.com'
 

	
 
TEST_USER_REGULAR2_LOGIN = 'test_regular2'
 
TEST_USER_REGULAR2_PASS = 'test12'
 
TEST_USER_REGULAR2_EMAIL = 'test_regular2@example.com'
 

	
kallithea/tests/functional/test_admin_settings.py
Show inline comments
 
# -*- coding: utf-8 -*-
 

	
 
import tempfile
 

	
 
from kallithea.model.db import Setting, Ui
 
from kallithea.tests import *
 
from kallithea.tests.fixture import Fixture
 

	
 
fixture = Fixture()
 

	
 

	
 
class TestAdminSettingsController(TestControllerPytest):
 

	
 
    def test_index_main(self):
 
        self.log_user()
 
        response = self.app.get(url('admin_settings'))
 
@@ -28,49 +30,49 @@ class TestAdminSettingsController(TestCo
 
    def test_index_email(self):
 
        self.log_user()
 
        response = self.app.get(url('admin_settings_email'))
 

	
 
    def test_index_hooks(self):
 
        self.log_user()
 
        response = self.app.get(url('admin_settings_hooks'))
 

	
 
    def test_create_custom_hook(self):
 
        self.log_user()
 
        response = self.app.post(url('admin_settings_hooks'),
 
                                params=dict(new_hook_ui_key='test_hooks_1',
 
                                            new_hook_ui_value='cd /tmp',
 
                                            new_hook_ui_value='cd %s' % tempfile.gettempdir(),
 
                                            _authentication_token=self.authentication_token()))
 

	
 
        response = response.follow()
 
        response.mustcontain('test_hooks_1')
 
        response.mustcontain('cd /tmp')
 
        response.mustcontain('cd %s' % tempfile.gettempdir())
 

	
 
    def test_create_custom_hook_delete(self):
 
        self.log_user()
 
        response = self.app.post(url('admin_settings_hooks'),
 
                                params=dict(new_hook_ui_key='test_hooks_2',
 
                                            new_hook_ui_value='cd /tmp2',
 
                                            new_hook_ui_value='cd %s2' % tempfile.gettempdir(),
 
                                            _authentication_token=self.authentication_token()))
 

	
 
        response = response.follow()
 
        response.mustcontain('test_hooks_2')
 
        response.mustcontain('cd /tmp2')
 
        response.mustcontain('cd %s2' % tempfile.gettempdir())
 

	
 
        hook_id = Ui.get_by_key('hooks', 'test_hooks_2').ui_id
 
        ## delete
 
        self.app.post(url('admin_settings_hooks'),
 
                        params=dict(hook_id=hook_id, _authentication_token=self.authentication_token()))
 
        response = self.app.get(url('admin_settings_hooks'))
 
        response.mustcontain(no=['test_hooks_2'])
 
        response.mustcontain(no=['cd /tmp2'])
 
        response.mustcontain(no=['cd %s2' % tempfile.gettempdir()])
 

	
 
    def test_index_search(self):
 
        self.log_user()
 
        response = self.app.get(url('admin_settings_search'))
 

	
 
    def test_index_system(self):
 
        self.log_user()
 
        response = self.app.get(url('admin_settings_system'))
 

	
 
    def test_ga_code_active(self):
 
        self.log_user()
 
        old_title = 'Kallithea'
kallithea/tests/other/manual_test_vcs_operations.py
Show inline comments
 
@@ -179,250 +179,250 @@ class TestVCSOperations(BaseTestCase):
 
        r.enable_locking = False
 
        Session().add(r)
 
        Session().commit()
 

	
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        Repository.unlock(r)
 
        r.enable_locking = False
 
        Session().add(r)
 
        Session().commit()
 

	
 
    def test_clone_hg_repo_by_admin(self):
 
        clone_url = _construct_url(HG_REPO)
 
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 

	
 
        assert 'requesting all changes' in stdout
 
        assert 'adding changesets' in stdout
 
        assert 'adding manifests' in stdout
 
        assert 'adding file changes' in stdout
 

	
 
        assert stderr == ''
 

	
 
    def test_clone_git_repo_by_admin(self):
 
        clone_url = _construct_url(GIT_REPO)
 
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 

	
 
        assert 'Cloning into' in stdout + stderr
 
        assert stderr == '' or stdout == ''
 

	
 
    def test_clone_wrong_credentials_hg(self):
 
        clone_url = _construct_url(HG_REPO, passwd='bad!')
 
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 
        assert 'abort: authorization failed' in stderr
 

	
 
    def test_clone_wrong_credentials_git(self):
 
        clone_url = _construct_url(GIT_REPO, passwd='bad!')
 
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 
        assert 'fatal: Authentication failed' in stderr
 

	
 
    def test_clone_git_dir_as_hg(self):
 
        clone_url = _construct_url(GIT_REPO)
 
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 
        assert 'HTTP Error 404: Not Found' in stderr
 

	
 
    def test_clone_hg_repo_as_git(self):
 
        clone_url = _construct_url(HG_REPO)
 
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 
        assert 'not found' in stderr
 

	
 
    def test_clone_non_existing_path_hg(self):
 
        clone_url = _construct_url('trololo')
 
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 
        assert 'HTTP Error 404: Not Found' in stderr
 

	
 
    def test_clone_non_existing_path_git(self):
 
        clone_url = _construct_url('trololo')
 
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 
        assert 'not found' in stderr
 

	
 
    def test_push_new_file_hg(self):
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(HG_REPO, dest=DEST)
 
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 

	
 
        stdout, stderr = _add_files_and_push('hg', DEST)
 

	
 
        assert 'pushing to' in stdout
 
        assert 'Repository size' in stdout
 
        assert 'Last revision is now' in stdout
 

	
 
    def test_push_new_file_git(self):
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(GIT_REPO, dest=DEST)
 
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 

	
 
        # commit some stuff into this repo
 
        stdout, stderr = _add_files_and_push('git', DEST)
 

	
 
        print [(x.repo_full_path,x.repo_path) for x in Repository.get_all()]
 
        _check_proper_git_push(stdout, stderr)
 

	
 
    def test_push_invalidates_cache_hg(self):
 
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
 
                                               ==HG_REPO).scalar()
 
        if not key:
 
            key = CacheInvalidation(HG_REPO, HG_REPO)
 

	
 
        key.cache_active = True
 
        Session().add(key)
 
        Session().commit()
 

	
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(HG_REPO, dest=DEST)
 
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 

	
 
        stdout, stderr = _add_files_and_push('hg', DEST, files_no=1)
 

	
 
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
 
                                               ==HG_REPO).all()
 
        self.assertEqual(key, [])
 

	
 
    def test_push_invalidates_cache_git(self):
 
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
 
                                               ==GIT_REPO).scalar()
 
        if not key:
 
            key = CacheInvalidation(GIT_REPO, GIT_REPO)
 

	
 
        key.cache_active = True
 
        Session().add(key)
 
        Session().commit()
 

	
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(GIT_REPO, dest=DEST)
 
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 

	
 
        # commit some stuff into this repo
 
        stdout, stderr = _add_files_and_push('git', DEST, files_no=1)
 
        _check_proper_git_push(stdout, stderr)
 

	
 
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
 
                                               ==GIT_REPO).all()
 
        self.assertEqual(key, [])
 

	
 
    def test_push_wrong_credentials_hg(self):
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(HG_REPO, dest=DEST)
 
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 

	
 
        stdout, stderr = _add_files_and_push('hg', DEST, user='bad',
 
                                             passwd='name')
 

	
 
        assert 'abort: authorization failed' in stderr
 

	
 
    def test_push_wrong_credentials_git(self):
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(GIT_REPO, dest=DEST)
 
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 

	
 
        stdout, stderr = _add_files_and_push('git', DEST, user='bad',
 
                                             passwd='name')
 

	
 
        assert 'fatal: Authentication failed' in stderr
 

	
 
    def test_push_back_to_wrong_url_hg(self):
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(HG_REPO, dest=DEST)
 
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 

	
 
        stdout, stderr = _add_files_and_push('hg', DEST,
 
                                    clone_url='http://%s/tmp' % HOST)
 

	
 
        assert 'HTTP Error 404: Not Found' in stderr
 

	
 
    def test_push_back_to_wrong_url_git(self):
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(GIT_REPO, dest=DEST)
 
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 

	
 
        stdout, stderr = _add_files_and_push('git', DEST,
 
                                    clone_url='http://%s/tmp' % HOST)
 

	
 
        assert 'not found' in stderr
 

	
 
    def test_clone_and_create_lock_hg(self):
 
        # enable locking
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        r.enable_locking = True
 
        Session().add(r)
 
        Session().commit()
 
        # clone
 
        clone_url = _construct_url(HG_REPO)
 
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 

	
 
        #check if lock was made
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
 

	
 
    def test_clone_and_create_lock_git(self):
 
        # enable locking
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
        r.enable_locking = True
 
        Session().add(r)
 
        Session().commit()
 
        # clone
 
        clone_url = _construct_url(GIT_REPO)
 
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 

	
 
        #check if lock was made
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
 

	
 
    def test_clone_after_repo_was_locked_hg(self):
 
        #lock repo
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
 
        #pull fails since repo is locked
 
        clone_url = _construct_url(HG_REPO)
 
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 
        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
 
                % (HG_REPO, TEST_USER_ADMIN_LOGIN))
 
        assert msg in stderr
 

	
 
    def test_clone_after_repo_was_locked_git(self):
 
        #lock repo
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
 
        #pull fails since repo is locked
 
        clone_url = _construct_url(GIT_REPO)
 
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 
        msg = ("""The requested URL returned error: 423""")
 
        assert msg in stderr
 

	
 
    def test_push_on_locked_repo_by_other_user_hg(self):
 
        #clone some temp
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(HG_REPO, dest=DEST)
 
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 

	
 
        #lock repo
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        # let this user actually push !
 
        RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
 
                                          perm='repository.write')
 
        Session().commit()
 
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
 

	
 
        #push fails repo is locked by other user !
 
        stdout, stderr = _add_files_and_push('hg', DEST,
 
                                             user=TEST_USER_REGULAR_LOGIN,
 
                                             passwd=TEST_USER_REGULAR_PASS)
 
        msg = ("""abort: HTTP Error 423: Repository `%s` locked by user `%s`"""
 
                % (HG_REPO, TEST_USER_ADMIN_LOGIN))
 
        assert msg in stderr
 

	
 
    def test_push_on_locked_repo_by_other_user_git(self):
 
        #clone some temp
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(GIT_REPO, dest=DEST)
 
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 

	
 
        #lock repo
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
        # let this user actually push !
 
        RepoModel().grant_user_permission(repo=r, user=TEST_USER_REGULAR_LOGIN,
 
                                          perm='repository.write')
 
        Session().commit()
 
        Repository.lock(r, User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id)
 

	
 
        #push fails repo is locked by other user !
 
        stdout, stderr = _add_files_and_push('git', DEST,
 
                                             user=TEST_USER_REGULAR_LOGIN,
 
@@ -438,99 +438,99 @@ class TestVCSOperations(BaseTestCase):
 
        #msg = "405 Method Not Allowed"
 
        #assert msg in stderr
 

	
 
    def test_push_unlocks_repository_hg(self):
 
        # enable locking
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        r.enable_locking = True
 
        Session().add(r)
 
        Session().commit()
 
        #clone some temp
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(HG_REPO, dest=DEST)
 
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 

	
 
        #check for lock repo after clone
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        uid = User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
 
        assert r.locked[0] == uid
 

	
 
        #push is ok and repo is now unlocked
 
        stdout, stderr = _add_files_and_push('hg', DEST)
 
        assert ('remote: Released lock on repo `%s`' % HG_REPO) in stdout
 
        #we need to cleanup the Session Here !
 
        Session.remove()
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        assert r.locked == [None, None]
 

	
 
    #TODO: fix me ! somehow during tests hooks don't get called on Git
 
    def test_push_unlocks_repository_git(self):
 
        # enable locking
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
        r.enable_locking = True
 
        Session().add(r)
 
        Session().commit()
 
        #clone some temp
 
        DEST = _get_tmp_dir()
 
        clone_url = _construct_url(GIT_REPO, dest=DEST)
 
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 

	
 
        #check for lock repo after clone
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
        assert r.locked[0] == User.get_by_username(TEST_USER_ADMIN_LOGIN).user_id
 

	
 
        #push is ok and repo is now unlocked
 
        stdout, stderr = _add_files_and_push('git', DEST)
 
        _check_proper_git_push(stdout, stderr)
 

	
 
        #assert ('remote: Released lock on repo `%s`' % GIT_REPO) in stdout
 
        #we need to cleanup the Session Here !
 
        Session.remove()
 
        r = Repository.get_by_repo_name(GIT_REPO)
 
        assert r.locked == [None, None]
 

	
 
    def test_ip_restriction_hg(self):
 
        user_model = UserModel()
 
        try:
 
            user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
 
            Session().commit()
 
            clone_url = _construct_url(HG_REPO)
 
            stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
            stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 
            assert 'abort: HTTP Error 403: Forbidden' in stderr
 
        finally:
 
            #release IP restrictions
 
            for ip in UserIpMap.getAll():
 
                UserIpMap.delete(ip.ip_id)
 
            Session().commit()
 

	
 
        time.sleep(2)
 
        clone_url = _construct_url(HG_REPO)
 
        stdout, stderr = Command('/tmp').execute('hg clone', clone_url)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('hg clone', clone_url)
 

	
 
        assert 'requesting all changes' in stdout
 
        assert 'adding changesets' in stdout
 
        assert 'adding manifests' in stdout
 
        assert 'adding file changes' in stdout
 

	
 
        assert stderr == ''
 

	
 
    def test_ip_restriction_git(self):
 
        user_model = UserModel()
 
        try:
 
            user_model.add_extra_ip(TEST_USER_ADMIN_LOGIN, '10.10.10.10/32')
 
            Session().commit()
 
            clone_url = _construct_url(GIT_REPO)
 
            stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
            stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 
            # The message apparently changed in Git 1.8.3, so match it loosely.
 
            assert re.search(r'\b403\b', stderr)
 
        finally:
 
            #release IP restrictions
 
            for ip in UserIpMap.getAll():
 
                UserIpMap.delete(ip.ip_id)
 
            Session().commit()
 

	
 
        time.sleep(2)
 
        clone_url = _construct_url(GIT_REPO)
 
        stdout, stderr = Command('/tmp').execute('git clone', clone_url)
 
        stdout, stderr = Command(tempfile.gettempdir()).execute('git clone', clone_url)
 

	
 
        assert 'Cloning into' in stdout + stderr
 
        assert stderr == '' or stdout == ''
kallithea/tests/scripts/manual_test_crawler.py
Show inline comments
 
@@ -28,24 +28,25 @@ Original author and date, and relevant c
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import cookielib
 
import urllib
 
import urllib2
 
import time
 
import os
 
import sys
 
import tempfile
 
from os.path import join as jn
 
from os.path import dirname as dn
 

	
 
__here__ = os.path.abspath(__file__)
 
__root__ = dn(dn(dn(__here__)))
 
sys.path.append(__root__)
 

	
 
from kallithea.lib import vcs
 
from kallithea.lib.compat import OrderedSet
 
from kallithea.lib.vcs.exceptions import RepositoryError
 

	
 
PASES = 3
 
@@ -60,25 +61,25 @@ if not BASE_URI.endswith('/'):
 
    BASE_URI += '/'
 

	
 
print 'Crawling @ %s' % BASE_URI
 
BASE_URI += '%s'
 
PROJECT_PATH = jn('/', 'home', 'username', 'repos')
 
PROJECTS = [
 
    #'linux-magx-pbranch',
 
    'CPython',
 
    'kallithea',
 
]
 

	
 

	
 
cj = cookielib.FileCookieJar('/tmp/rc_test_cookie.txt')
 
cj = cookielib.FileCookieJar(jn(tempfile.gettempdir(), 'rc_test_cookie.txt'))
 
o = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))
 
o.addheaders = [
 
    ('User-agent', 'kallithea-crawler'),
 
    ('Accept-Language', 'en - us, en;q = 0.5')
 
]
 

	
 
urllib2.install_opener(o)
 

	
 

	
 
def _get_repo(proj):
 
    if isinstance(proj, basestring):
 
        repo = vcs.get_repo(jn(PROJECT_PATH, proj))
kallithea/tests/vcs/conf.py
Show inline comments
 
@@ -13,25 +13,25 @@ from os.path import join as jn
 
__all__ = (
 
    'TEST_HG_REPO', 'TEST_GIT_REPO', 'HG_REMOTE_REPO', 'GIT_REMOTE_REPO',
 
    'SCM_TESTS',
 
)
 

	
 
SCM_TESTS = ['hg', 'git']
 
uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
 

	
 
THIS = os.path.abspath(os.path.dirname(__file__))
 

	
 
GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
 

	
 
TEST_TMP_PATH = os.environ.get('VCS_TEST_ROOT', '/tmp')
 
TEST_TMP_PATH = os.environ.get('VCS_TEST_ROOT', tempfile.gettempdir())
 
TEST_GIT_REPO = os.environ.get('VCS_TEST_GIT_REPO',
 
                              jn(TEST_TMP_PATH, 'vcs-git'))
 
TEST_GIT_REPO_CLONE = os.environ.get('VCS_TEST_GIT_REPO_CLONE',
 
                            jn(TEST_TMP_PATH, 'vcsgitclone%s' % uniq_suffix))
 
TEST_GIT_REPO_PULL = os.environ.get('VCS_TEST_GIT_REPO_PULL',
 
                            jn(TEST_TMP_PATH, 'vcsgitpull%s' % uniq_suffix))
 

	
 
HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs'
 
TEST_HG_REPO = os.environ.get('VCS_TEST_HG_REPO',
 
                              jn(TEST_TMP_PATH, 'vcs-hg'))
 
TEST_HG_REPO_CLONE = os.environ.get('VCS_TEST_HG_REPO_CLONE',
 
                              jn(TEST_TMP_PATH, 'vcshgclone%s' % uniq_suffix))
kallithea/tests/vcs/test_git.py
Show inline comments
 

	
 
import os
 
import sys
 
import mock
 
import datetime
 
import urllib2
 
import tempfile
 

	
 
import pytest
 

	
 
from kallithea.lib.vcs.backends.git import GitRepository, GitChangeset
 
from kallithea.lib.vcs.exceptions import RepositoryError, VCSError, NodeDoesNotExistError
 
from kallithea.lib.vcs.nodes import NodeKind, FileNode, DirNode, NodeState
 
from kallithea.lib.vcs.utils.compat import unittest
 
from kallithea.model.scm import ScmModel
 
from kallithea.tests.vcs.base import _BackendTestMixin
 
from kallithea.tests.vcs.conf import TEST_GIT_REPO, TEST_GIT_REPO_CLONE, get_new_dir
 

	
 

	
 
class GitRepositoryTest(unittest.TestCase):
 

	
 
    def __check_for_existing_repo(self):
 
        if os.path.exists(TEST_GIT_REPO_CLONE):
 
            pytest.fail('Cannot test git clone repo as location %s already '
 
                      'exists. You should manually remove it first.'
 
                      % TEST_GIT_REPO_CLONE)
 

	
 
    def setUp(self):
 
        self.repo = GitRepository(TEST_GIT_REPO)
 

	
 
    def test_wrong_repo_path(self):
 
        wrong_repo_path = '/tmp/errorrepo'
 
        wrong_repo_path = os.path.join(tempfile.gettempdir(), 'errorrepo')
 
        self.assertRaises(RepositoryError, GitRepository, wrong_repo_path)
 

	
 
    def test_git_cmd_injection(self):
 
        repo_inject_path = TEST_GIT_REPO + '; echo "Cake";'
 
        with self.assertRaises(urllib2.URLError):
 
            # Should fail because URL will contain the parts after ; too
 
            urlerror_fail_repo = GitRepository(get_new_dir('injection-repo'), src_url=repo_inject_path, update_after_clone=True, create=True)
 

	
 
        with self.assertRaises(RepositoryError):
 
            # Should fail on direct clone call, which as of this writing does not happen outside of class
 
            clone_fail_repo = GitRepository(get_new_dir('injection-repo'), create=True)
 
            clone_fail_repo.clone(repo_inject_path, update_after_clone=True,)
kallithea/tests/vcs/test_hg.py
Show inline comments
 

	
 
import os
 

	
 
import pytest
 

	
 
import tempfile
 
from kallithea.lib.utils2 import safe_str
 
from kallithea.lib.vcs.backends.hg import MercurialRepository, MercurialChangeset
 
from kallithea.lib.vcs.exceptions import RepositoryError, VCSError, NodeDoesNotExistError
 
from kallithea.lib.vcs.nodes import NodeKind, NodeState
 
from kallithea.tests.vcs.conf import TEST_HG_REPO, TEST_HG_REPO_CLONE, \
 
    TEST_HG_REPO_PULL
 
from kallithea.lib.vcs.utils.compat import unittest
 

	
 

	
 
# Use only clean mercurial's ui
 
from kallithea.lib.vcs.utils.hgcompat import mercurial
 
mercurial.scmutil.rcpath()
 
@@ -22,25 +23,25 @@ if mercurial.scmutil._rcpath:
 
class MercurialRepositoryTest(unittest.TestCase):
 

	
 
    def __check_for_existing_repo(self):
 
        if os.path.exists(TEST_HG_REPO_CLONE):
 
            pytest.fail('Cannot test mercurial clone repo as location %s already '
 
                      'exists. You should manually remove it first.'
 
                      % TEST_HG_REPO_CLONE)
 

	
 
    def setUp(self):
 
        self.repo = MercurialRepository(safe_str(TEST_HG_REPO))
 

	
 
    def test_wrong_repo_path(self):
 
        wrong_repo_path = '/tmp/errorrepo'
 
        wrong_repo_path = os.path.join(tempfile.gettempdir(), 'errorrepo')
 
        self.assertRaises(RepositoryError, MercurialRepository, wrong_repo_path)
 

	
 
    def test_unicode_path_repo(self):
 
        self.assertRaises(VCSError,lambda:MercurialRepository(u'iShouldFail'))
 

	
 
    def test_repo_clone(self):
 
        self.__check_for_existing_repo()
 
        repo = MercurialRepository(safe_str(TEST_HG_REPO))
 
        repo_clone = MercurialRepository(TEST_HG_REPO_CLONE,
 
            src_url=TEST_HG_REPO, update_after_clone=True)
 
        self.assertEqual(len(repo.revisions), len(repo_clone.revisions))
 
        # Checking hashes of changesets should be enough
0 comments (0 inline, 0 general)