Changeset - 438876d818d3
[Not reviewed]
default
0 3 0
domruf - 8 years ago 2018-01-11 19:59:10
dominikruf@gmail.com
tests: git changeset authors need to have the format 'username <user@example.com>'

New verions of dulwich caused tests like test_compare_forks_on_branch_extra_commits_git
to fail because of this.
Since this is fixed now, re-allow dulwich versions 0.18.6 and newer.
3 files changed with 4 insertions and 4 deletions:
0 comments (0 inline, 0 general)
kallithea/tests/fixture.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
"""
 
Helpers for fixture generation
 
"""
 

	
 
import logging
 
import os
 
import shutil
 
import tarfile
 
from os.path import dirname
 

	
 
import mock
 
from tg import request
 
from tg.util.webtest import test_context
 

	
 
from kallithea.model.db import Repository, User, RepoGroup, UserGroup, Gist, ChangesetStatus
 
from kallithea.model.meta import Session
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.user import UserModel
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.user_group import UserGroupModel
 
from kallithea.model.gist import GistModel
 
from kallithea.model.scm import ScmModel
 
from kallithea.model.comment import ChangesetCommentsModel
 
from kallithea.model.changeset_status import ChangesetStatusModel
 
from kallithea.model.pull_request import CreatePullRequestAction#, CreatePullRequestIterationAction, PullRequestModel
 
from kallithea.lib import helpers
 
from kallithea.lib.auth import AuthUser
 
from kallithea.lib.db_manage import DbManage
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.tests.base import invalidate_all_caches, GIT_REPO, HG_REPO, \
 
    TESTS_TMP_PATH, TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN
 
    TESTS_TMP_PATH, TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN, TEST_USER_ADMIN_EMAIL
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 
FIXTURES = os.path.join(dirname(dirname(os.path.abspath(__file__))), 'tests', 'fixtures')
 

	
 

	
 
def error_function(*args, **kwargs):
 
    raise Exception('Total Crash !')
 

	
 

	
 
class Fixture(object):
 

	
 
    def __init__(self):
 
        pass
 

	
 
    def anon_access(self, status):
 
        """
 
        Context manager for controlling anonymous access.
 
        Anon access will be set and committed, but restored again when exiting the block.
 

	
 
        Usage:
 

	
 
        fixture = Fixture()
 
        with fixture.anon_access(False):
 
            stuff
 
        """
 

	
 
        class context(object):
 
            def __enter__(self):
 
                anon = User.get_default_user()
 
                self._before = anon.active
 
                anon.active = status
 
                Session().commit()
 
                invalidate_all_caches()
 

	
 
            def __exit__(self, exc_type, exc_val, exc_tb):
 
                anon = User.get_default_user()
 
                anon.active = self._before
 
                Session().commit()
 

	
 
        return context()
 

	
 
    def _get_repo_create_params(self, **custom):
 
        """Return form values to be validated through RepoForm"""
 
        defs = dict(
 
            repo_name=None,
 
            repo_type='hg',
 
            clone_uri='',
 
            repo_group=u'-1',
 
            repo_description=u'DESC',
 
            repo_private=False,
 
            repo_landing_rev='rev:tip',
 
            repo_copy_permissions=False,
 
            repo_state=Repository.STATE_CREATED,
 
        )
 
        defs.update(custom)
 
        if 'repo_name_full' not in custom:
 
            defs.update({'repo_name_full': defs['repo_name']})
 

	
 
        # fix the repo name if passed as repo_name_full
 
        if defs['repo_name']:
 
            defs['repo_name'] = defs['repo_name'].split('/')[-1]
 

	
 
        return defs
 

	
 
    def _get_repo_group_create_params(self, **custom):
 
        """Return form values to be validated through RepoGroupForm"""
 
        defs = dict(
 
            group_name=None,
 
            group_description=u'DESC',
 
            parent_group_id=u'-1',
 
            perms_updates=[],
 
            perms_new=[],
 
            enable_locking=False,
 
            recursive=False
 
        )
 
        defs.update(custom)
 

	
 
        return defs
 

	
 
    def _get_user_create_params(self, name, **custom):
 
        defs = dict(
 
            username=name,
 
            password='qweqwe',
 
            email='%s+test@example.com' % name,
 
            firstname=u'TestUser',
 
            lastname=u'Test',
 
            active=True,
 
            admin=False,
 
            extern_type='internal',
 
            extern_name=None
 
        )
 
        defs.update(custom)
 

	
 
        return defs
 

	
 
    def _get_user_group_create_params(self, name, **custom):
 
        defs = dict(
 
            users_group_name=name,
 
            user_group_description=u'DESC',
 
            users_group_active=True,
 
            user_group_data={},
 
        )
 
        defs.update(custom)
 

	
 
        return defs
 

	
 
    def create_repo(self, name, repo_group=None, **kwargs):
 
        if 'skip_if_exists' in kwargs:
 
            del kwargs['skip_if_exists']
 
            r = Repository.get_by_repo_name(name)
 
            if r:
 
                return r
 

	
 
        if isinstance(repo_group, RepoGroup):
 
            repo_group = repo_group.group_id
 

	
 
        form_data = self._get_repo_create_params(repo_name=name, **kwargs)
 
        form_data['repo_group'] = repo_group # patch form dict so it can be used directly by model
 
        cur_user = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
 
        RepoModel().create(form_data, cur_user)
 
        Session().commit()
 
        ScmModel().mark_for_invalidation(name)
 
        return Repository.get_by_repo_name(name)
 

	
 
    def create_fork(self, repo_to_fork, fork_name, **kwargs):
 
        repo_to_fork = Repository.get_by_repo_name(repo_to_fork)
 

	
 
        form_data = self._get_repo_create_params(repo_name=fork_name,
 
                                            fork_parent_id=repo_to_fork,
 
                                            repo_type=repo_to_fork.repo_type,
 
                                            **kwargs)
 
        # patch form dict so it can be used directly by model
 
        form_data['description'] = form_data['repo_description']
 
        form_data['private'] = form_data['repo_private']
 
        form_data['landing_rev'] = form_data['repo_landing_rev']
 

	
 
        owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
 
        RepoModel().create_fork(form_data, cur_user=owner)
 
        Session().commit()
 
        ScmModel().mark_for_invalidation(fork_name)
 
        r = Repository.get_by_repo_name(fork_name)
 
        assert r
 
        return r
 

	
 
    def destroy_repo(self, repo_name, **kwargs):
 
        RepoModel().delete(repo_name, **kwargs)
 
        Session().commit()
 

	
 
    def create_repo_group(self, name, parent_group_id=None, **kwargs):
 
        if 'skip_if_exists' in kwargs:
 
            del kwargs['skip_if_exists']
 
            gr = RepoGroup.get_by_group_name(group_name=name)
 
            if gr:
 
                return gr
 
        form_data = self._get_repo_group_create_params(group_name=name, **kwargs)
 
        gr = RepoGroupModel().create(
 
            group_name=form_data['group_name'],
 
            group_description=form_data['group_name'],
 
            parent=parent_group_id,
 
            owner=kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN),
 
            )
 
        Session().commit()
 
        gr = RepoGroup.get_by_group_name(gr.group_name)
 
        return gr
 

	
 
    def destroy_repo_group(self, repogroupid):
 
        RepoGroupModel().delete(repogroupid)
 
        Session().commit()
 

	
 
    def create_user(self, name, **kwargs):
 
        if 'skip_if_exists' in kwargs:
 
            del kwargs['skip_if_exists']
 
            user = User.get_by_username(name)
 
            if user:
 
                return user
 
        form_data = self._get_user_create_params(name, **kwargs)
 
        user = UserModel().create(form_data)
 
        Session().commit()
 
        user = User.get_by_username(user.username)
 
        return user
 

	
 
    def destroy_user(self, userid):
 
        UserModel().delete(userid)
 
        Session().commit()
 

	
 
    def create_user_group(self, name, **kwargs):
 
        if 'skip_if_exists' in kwargs:
 
            del kwargs['skip_if_exists']
 
            gr = UserGroup.get_by_group_name(group_name=name)
 
            if gr:
 
                return gr
 
        form_data = self._get_user_group_create_params(name, **kwargs)
 
        owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
 
        user_group = UserGroupModel().create(
 
            name=form_data['users_group_name'],
 
            description=form_data['user_group_description'],
 
            owner=owner, active=form_data['users_group_active'],
 
            group_data=form_data['user_group_data'])
 
        Session().commit()
 
        user_group = UserGroup.get_by_group_name(user_group.users_group_name)
 
        return user_group
 

	
 
    def destroy_user_group(self, usergroupid):
 
        UserGroupModel().delete(user_group=usergroupid, force=True)
 
        Session().commit()
 

	
 
    def create_gist(self, **kwargs):
 
        form_data = {
 
            'description': u'new-gist',
 
            'owner': TEST_USER_ADMIN_LOGIN,
 
            'gist_type': Gist.GIST_PUBLIC,
 
            'lifetime': -1,
 
            'gist_mapping': {'filename1.txt': {'content': 'hello world'}}
 
        }
 
        form_data.update(kwargs)
 
        gist = GistModel().create(
 
            description=form_data['description'], owner=form_data['owner'],
 
            gist_mapping=form_data['gist_mapping'], gist_type=form_data['gist_type'],
 
            lifetime=form_data['lifetime']
 
        )
 
        Session().commit()
 

	
 
        return gist
 

	
 
    def destroy_gists(self, gistid=None):
 
        for g in Gist.query():
 
            if gistid:
 
                if gistid == g.gist_access_id:
 
                    GistModel().delete(g)
 
            else:
 
                GistModel().delete(g)
 
        Session().commit()
 

	
 
    def load_resource(self, resource_name, strip=True):
 
        with open(os.path.join(FIXTURES, resource_name), 'rb') as f:
 
            source = f.read()
 
            if strip:
 
                source = source.strip()
 

	
 
        return source
 

	
 
    def commit_change(self, repo, filename, content, message, vcs_type,
 
                      parent=None, newfile=False, author=None):
 
        repo = Repository.get_by_repo_name(repo)
 
        _cs = parent
 
        if parent is None:
 
            _cs = EmptyChangeset(alias=vcs_type)
 
        if author is None:
 
            author = TEST_USER_ADMIN_LOGIN
 
            author = '%s <%s>' % (TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_EMAIL)
 

	
 
        if newfile:
 
            nodes = {
 
                filename: {
 
                    'content': content
 
                }
 
            }
 
            cs = ScmModel().create_nodes(
 
                user=TEST_USER_ADMIN_LOGIN, repo=repo,
 
                message=message,
 
                nodes=nodes,
 
                parent_cs=_cs,
 
                author=author,
 
            )
 
        else:
 
            cs = ScmModel().commit_change(
 
                repo=repo.scm_instance, repo_name=repo.repo_name,
 
                cs=parent, user=TEST_USER_ADMIN_LOGIN,
 
                author=author,
 
                message=message,
 
                content=content,
 
                f_path=filename
 
            )
 
        return cs
 

	
 
    def review_changeset(self, repo, revision, status, author=TEST_USER_ADMIN_LOGIN):
 
        comment = ChangesetCommentsModel().create(u"review comment", repo, author, revision=revision, send_email=False)
 
        csm = ChangesetStatusModel().set_status(repo, ChangesetStatus.STATUS_APPROVED, author, comment, revision=revision)
 
        Session().commit()
 
        return csm
 

	
 
    def create_pullrequest(self, testcontroller, repo_name, pr_src_rev, pr_dst_rev, title=u'title'):
 
        org_ref = 'branch:stable:%s' % pr_src_rev
 
        other_ref = 'branch:default:%s' % pr_dst_rev
 
        with test_context(testcontroller.app): # needed to be able to mock request user
 
            org_repo = other_repo = Repository.get_by_repo_name(repo_name)
 
            owner_user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
            reviewers = [User.get_by_username(TEST_USER_REGULAR_LOGIN)]
 
            request.authuser = request.user = AuthUser(dbuser=owner_user)
 
            # creating a PR sends a message with an absolute URL - without routing that requires mocking
 
            with mock.patch.object(helpers, 'url', (lambda arg, qualified=False, **kwargs: ('https://localhost' if qualified else '') + '/fake/' + arg)):
 
                cmd = CreatePullRequestAction(org_repo, other_repo, org_ref, other_ref, title, u'No description', owner_user, reviewers)
 
                pull_request = cmd.execute()
 
            Session().commit()
 
        return pull_request.pull_request_id
 

	
 

	
 
#==============================================================================
 
# Global test environment setup
 
#==============================================================================
 

	
 
def create_test_env(repos_test_path, config):
 
    """
 
    Makes a fresh database and
 
    install test repository into tmp dir
 
    """
 

	
 
    # PART ONE create db
 
    dbconf = config['sqlalchemy.url']
 
    log.debug('making test db %s', dbconf)
 

	
 
    # create test dir if it doesn't exist
 
    if not os.path.isdir(repos_test_path):
 
        log.debug('Creating testdir %s', repos_test_path)
 
        os.makedirs(repos_test_path)
 

	
 
    dbmanage = DbManage(dbconf=dbconf, root=config['here'],
 
                        tests=True)
 
    dbmanage.create_tables(override=True)
 
    # for tests dynamically set new root paths based on generated content
 
    dbmanage.create_settings(dbmanage.config_prompt(repos_test_path))
 
    dbmanage.create_default_user()
 
    dbmanage.admin_prompt()
 
    dbmanage.create_permissions()
 
    dbmanage.populate_default_permissions()
 
    Session().commit()
 
    # PART TWO make test repo
 
    log.debug('making test vcs repositories')
 

	
 
    idx_path = config['index_dir']
 
    data_path = config['cache_dir']
 

	
 
    # clean index and data
 
    if idx_path and os.path.exists(idx_path):
 
        log.debug('remove %s', idx_path)
 
        shutil.rmtree(idx_path)
 

	
 
    if data_path and os.path.exists(data_path):
 
        log.debug('remove %s', data_path)
 
        shutil.rmtree(data_path)
 

	
 
    # CREATE DEFAULT TEST REPOS
 
    tar = tarfile.open(os.path.join(FIXTURES, 'vcs_test_hg.tar.gz'))
 
    tar.extractall(os.path.join(TESTS_TMP_PATH, HG_REPO))
 
    tar.close()
 

	
 
    tar = tarfile.open(os.path.join(FIXTURES, 'vcs_test_git.tar.gz'))
 
    tar.extractall(os.path.join(TESTS_TMP_PATH, GIT_REPO))
 
    tar.close()
 

	
 
    # LOAD VCS test stuff
 
    from kallithea.tests.vcs import setup_package
 
    setup_package()
 

	
 

	
 
def create_test_index(repo_location, config, full_index):
 
    """
 
    Makes default test index
 
    """
 

	
 
    from kallithea.lib.indexers.daemon import WhooshIndexingDaemon
 
    from kallithea.lib.pidlock import DaemonLock, LockHeld
 

	
 
    index_location = os.path.join(config['index_dir'])
 
    if not os.path.exists(index_location):
 
        os.makedirs(index_location)
 

	
 
    l = DaemonLock(os.path.join(index_location, 'make_index.lock'))
 
    WhooshIndexingDaemon(index_location=index_location,
 
                         repo_location=repo_location) \
 
        .run(full_index=full_index)
 
    l.release()
kallithea/tests/vcs/test_inmemchangesets.py
Show inline comments
 
# encoding: utf8
 
"""
 
Tests so called "in memory changesets" commit API of vcs.
 
"""
 

	
 
import time
 
import datetime
 

	
 
from kallithea.lib import vcs
 
from kallithea.tests.vcs.conf import SCM_TESTS, get_new_dir
 
from kallithea.lib.vcs.exceptions import EmptyRepositoryError
 
from kallithea.lib.vcs.exceptions import NodeAlreadyAddedError
 
from kallithea.lib.vcs.exceptions import NodeAlreadyExistsError
 
from kallithea.lib.vcs.exceptions import NodeAlreadyRemovedError
 
from kallithea.lib.vcs.exceptions import NodeAlreadyChangedError
 
from kallithea.lib.vcs.exceptions import NodeDoesNotExistError
 
from kallithea.lib.vcs.exceptions import NodeNotChangedError
 
from kallithea.lib.vcs.nodes import DirNode
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.lib.vcs.utils.compat import unittest
 
from kallithea.lib.vcs.utils import safe_unicode
 

	
 

	
 
class InMemoryChangesetTestMixin(object):
 
    """
 
    This is a backend independent test case class which should be created
 
    with ``type`` method.
 

	
 
    It is required to set following attributes at subclass:
 

	
 
    - ``backend_alias``: alias of used backend (see ``vcs.BACKENDS``)
 
    - ``repo_path``: path to the repository which would be created for set of
 
      tests
 
    """
 

	
 
    def get_backend(self):
 
        return vcs.get_backend(self.backend_alias)
 

	
 
    def setUp(self):
 
        Backend = self.get_backend()
 
        self.repo_path = get_new_dir(str(time.time()))
 
        self.repo = Backend(self.repo_path, create=True)
 
        self.imc = self.repo.in_memory_changeset
 
        self.nodes = [
 
            FileNode('foobar', content='Foo & bar'),
 
            FileNode('foobar2', content='Foo & bar, doubled!'),
 
            FileNode('foo bar with spaces', content=''),
 
            FileNode('foo/bar/baz', content='Inside'),
 
            FileNode('foo/bar/file.bin', content='\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00;\x00\x03\x00\xfe\xff\t\x00\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x1a\x00\x00\x00\x00\x00\x00\x00\x00\x10\x00\x00\x18\x00\x00\x00\x01\x00\x00\x00\xfe\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff'),
 
        ]
 

	
 
    def test_add(self):
 
        rev_count = len(self.repo.revisions)
 
        to_add = [FileNode(node.path, content=node.content)
 
            for node in self.nodes]
 
        for node in to_add:
 
            self.imc.add(node)
 
        message = u'Added: %s' % ', '.join((node.path for node in self.nodes))
 
        author = unicode(self.__class__)
 
        changeset = self.imc.commit(message=message, author=author)
 

	
 
        newtip = self.repo.get_changeset()
 
        self.assertEqual(changeset, newtip)
 
        self.assertEqual(rev_count + 1, len(self.repo.revisions))
 
        self.assertEqual(newtip.message, message)
 
        self.assertEqual(newtip.author, author)
 
        self.assertTrue(not any((self.imc.added, self.imc.changed,
 
            self.imc.removed)))
 
        for node in to_add:
 
            self.assertEqual(newtip.get_node(node.path).content, node.content)
 

	
 
    def test_add_in_bulk(self):
 
        rev_count = len(self.repo.revisions)
 
        to_add = [FileNode(node.path, content=node.content)
 
            for node in self.nodes]
 
        self.imc.add(*to_add)
 
        message = u'Added: %s' % ', '.join((node.path for node in self.nodes))
 
        author = unicode(self.__class__)
 
        changeset = self.imc.commit(message=message, author=author)
 

	
 
        newtip = self.repo.get_changeset()
 
        self.assertEqual(changeset, newtip)
 
        self.assertEqual(rev_count + 1, len(self.repo.revisions))
 
        self.assertEqual(newtip.message, message)
 
        self.assertEqual(newtip.author, author)
 
        self.assertTrue(not any((self.imc.added, self.imc.changed,
 
            self.imc.removed)))
 
        for node in to_add:
 
            self.assertEqual(newtip.get_node(node.path).content, node.content)
 

	
 
    def test_add_actually_adds_all_nodes_at_second_commit_too(self):
 
        self.imc.add(FileNode('foo/bar/image.png', content='\0'))
 
        self.imc.add(FileNode('foo/README.txt', content='readme!'))
 
        changeset = self.imc.commit(u'Initial', u'joe.doe@example.com')
 
        self.assertTrue(isinstance(changeset.get_node('foo'), DirNode))
 
        self.assertTrue(isinstance(changeset.get_node('foo/bar'), DirNode))
 
        self.assertEqual(changeset.get_node('foo/bar/image.png').content, '\0')
 
        self.assertEqual(changeset.get_node('foo/README.txt').content, 'readme!')
 

	
 
        # commit some more files again
 
        to_add = [
 
            FileNode('foo/bar/foobaz/bar', content='foo'),
 
            FileNode('foo/bar/another/bar', content='foo'),
 
            FileNode('foo/baz.txt', content='foo'),
 
            FileNode('foobar/foobaz/file', content='foo'),
 
            FileNode('foobar/barbaz', content='foo'),
 
        ]
 
        self.imc.add(*to_add)
 
        changeset = self.imc.commit(u'Another', u'joe.doe@example.com')
 
        self.assertEqual(changeset.get_node('foo/bar/foobaz/bar').content, 'foo')
 
        self.assertEqual(changeset.get_node('foo/bar/another/bar').content, 'foo')
 
        self.assertEqual(changeset.get_node('foo/baz.txt').content, 'foo')
 
        self.assertEqual(changeset.get_node('foobar/foobaz/file').content, 'foo')
 
        self.assertEqual(changeset.get_node('foobar/barbaz').content, 'foo')
 

	
 
    def test_add_non_ascii_files(self):
 
        rev_count = len(self.repo.revisions)
 
        to_add = [
 
            FileNode('żółwik/zwierzątko', content='ćććć'),
 
            FileNode(u'żółwik/zwierzątko_uni', content=u'ćććć'),
 
        ]
 
        for node in to_add:
 
            self.imc.add(node)
 
        message = u'Added: %s' % ', '.join((node.path for node in self.nodes))
 
        author = unicode(self.__class__)
 
        changeset = self.imc.commit(message=message, author=author)
 

	
 
        newtip = self.repo.get_changeset()
 
        self.assertEqual(changeset, newtip)
 
        self.assertEqual(rev_count + 1, len(self.repo.revisions))
 
        self.assertEqual(newtip.message, message)
 
        self.assertEqual(newtip.author, author)
 
        self.assertTrue(not any((self.imc.added, self.imc.changed,
 
            self.imc.removed)))
 
        for node in to_add:
 
            self.assertEqual(newtip.get_node(node.path).content, node.content)
 

	
 
    def test_add_raise_already_added(self):
 
        node = FileNode('foobar', content='baz')
 
        self.imc.add(node)
 
        self.assertRaises(NodeAlreadyAddedError, self.imc.add, node)
 

	
 
    def test_check_integrity_raise_already_exist(self):
 
        node = FileNode('foobar', content='baz')
 
        self.imc.add(node)
 
        self.imc.commit(message=u'Added foobar', author=unicode(self))
 
        self.imc.add(node)
 
        self.assertRaises(NodeAlreadyExistsError, self.imc.commit,
 
            message='new message',
 
            author=str(self))
 

	
 
    def test_change(self):
 
        self.imc.add(FileNode('foo/bar/baz', content='foo'))
 
        self.imc.add(FileNode('foo/fbar', content='foobar'))
 
        tip = self.imc.commit(u'Initial', u'joe.doe@example.com')
 

	
 
        # Change node's content
 
        node = FileNode('foo/bar/baz', content='My **changed** content')
 
        self.imc.change(node)
 
        self.imc.commit(u'Changed %s' % node.path, u'joe.doe@example.com')
 

	
 
        newtip = self.repo.get_changeset()
 
        self.assertNotEqual(tip, newtip)
 
        self.assertNotEqual(tip.id, newtip.id)
 
        self.assertEqual(newtip.get_node('foo/bar/baz').content,
 
                        'My **changed** content')
 

	
 
    def test_change_non_ascii(self):
 
        to_add = [
 
            FileNode('żółwik/zwierzątko', content='ćććć'),
 
            FileNode(u'żółwik/zwierzątko_uni', content=u'ćććć'),
 
        ]
 
        for node in to_add:
 
            self.imc.add(node)
 

	
 
        tip = self.imc.commit(u'Initial', u'joe.doe@example.com')
 

	
 
        # Change node's content
 
        node = FileNode('żółwik/zwierzątko', content='My **changed** content')
 
        self.imc.change(node)
 
        self.imc.commit(u'Changed %s' % safe_unicode(node.path),
 
                        u'joe.doe@example.com')
 

	
 
        node = FileNode(u'żółwik/zwierzątko_uni', content=u'My **changed** content')
 
        self.imc.change(node)
 
        self.imc.commit(u'Changed %s' % safe_unicode(node.path),
 
                        u'joe.doe@example.com')
 

	
 
        newtip = self.repo.get_changeset()
 
        self.assertNotEqual(tip, newtip)
 
        self.assertNotEqual(tip.id, newtip.id)
 

	
 
        self.assertEqual(newtip.get_node('żółwik/zwierzątko').content,
 
                         'My **changed** content')
 
        self.assertEqual(newtip.get_node('żółwik/zwierzątko_uni').content,
 
                         'My **changed** content')
 

	
 
    def test_change_raise_empty_repository(self):
 
        node = FileNode('foobar')
 
        self.assertRaises(EmptyRepositoryError, self.imc.change, node)
 

	
 
    def test_check_integrity_change_raise_node_does_not_exist(self):
 
        node = FileNode('foobar', content='baz')
 
        self.imc.add(node)
 
        self.imc.commit(message=u'Added foobar', author=unicode(self))
 
        node = FileNode('not-foobar', content='')
 
        self.imc.change(node)
 
        self.assertRaises(NodeDoesNotExistError, self.imc.commit,
 
            message='Changed not existing node',
 
            author=str(self))
 

	
 
    def test_change_raise_node_already_changed(self):
 
        node = FileNode('foobar', content='baz')
 
        self.imc.add(node)
 
        self.imc.commit(message=u'Added foobar', author=unicode(self))
 
        node = FileNode('foobar', content='more baz')
 
        self.imc.change(node)
 
        self.assertRaises(NodeAlreadyChangedError, self.imc.change, node)
 

	
 
    def test_check_integrity_change_raise_node_not_changed(self):
 
        self.test_add()  # Performs first commit
 

	
 
        node = FileNode(self.nodes[0].path, content=self.nodes[0].content)
 
        self.imc.change(node)
 
        self.assertRaises(NodeNotChangedError, self.imc.commit,
 
            message=u'Trying to mark node as changed without touching it',
 
            author=unicode(self))
 

	
 
    def test_change_raise_node_already_removed(self):
 
        node = FileNode('foobar', content='baz')
 
        self.imc.add(node)
 
        self.imc.commit(message=u'Added foobar', author=unicode(self))
 
        self.imc.remove(FileNode('foobar'))
 
        self.assertRaises(NodeAlreadyRemovedError, self.imc.change, node)
 

	
 
    def test_remove(self):
 
        self.test_add()  # Performs first commit
 

	
 
        tip = self.repo.get_changeset()
 
        node = self.nodes[0]
 
        self.assertEqual(node.content, tip.get_node(node.path).content)
 
        self.imc.remove(node)
 
        self.imc.commit(message=u'Removed %s' % node.path, author=unicode(self))
 

	
 
        newtip = self.repo.get_changeset()
 
        self.assertNotEqual(tip, newtip)
 
        self.assertNotEqual(tip.id, newtip.id)
 
        self.assertRaises(NodeDoesNotExistError, newtip.get_node, node.path)
 

	
 
    def test_remove_last_file_from_directory(self):
 
        node = FileNode('omg/qwe/foo/bar', content='foobar')
 
        self.imc.add(node)
 
        self.imc.commit(u'added', u'joe doe')
 

	
 
        self.imc.remove(node)
 
        tip = self.imc.commit(u'removed', u'joe doe')
 
        self.assertRaises(NodeDoesNotExistError, tip.get_node, 'omg/qwe/foo/bar')
 

	
 
    def test_remove_raise_node_does_not_exist(self):
 
        self.imc.remove(self.nodes[0])
 
        self.assertRaises(NodeDoesNotExistError, self.imc.commit,
 
            message='Trying to remove node at empty repository',
 
            author=str(self))
 

	
 
    def test_check_integrity_remove_raise_node_does_not_exist(self):
 
        self.test_add()  # Performs first commit
 

	
 
        node = FileNode('no-such-file')
 
        self.imc.remove(node)
 
        self.assertRaises(NodeDoesNotExistError, self.imc.commit,
 
            message=u'Trying to remove not existing node',
 
            author=unicode(self))
 

	
 
    def test_remove_raise_node_already_removed(self):
 
        self.test_add() # Performs first commit
 

	
 
        node = FileNode(self.nodes[0].path)
 
        self.imc.remove(node)
 
        self.assertRaises(NodeAlreadyRemovedError, self.imc.remove, node)
 

	
 
    def test_remove_raise_node_already_changed(self):
 
        self.test_add()  # Performs first commit
 

	
 
        node = FileNode(self.nodes[0].path, content='Bending time')
 
        self.imc.change(node)
 
        self.assertRaises(NodeAlreadyChangedError, self.imc.remove, node)
 

	
 
    def test_reset(self):
 
        self.imc.add(FileNode('foo', content='bar'))
 
        #self.imc.change(FileNode('baz', content='new'))
 
        #self.imc.remove(FileNode('qwe'))
 
        self.imc.reset()
 
        self.assertTrue(not any((self.imc.added, self.imc.changed,
 
            self.imc.removed)))
 

	
 
    def test_multiple_commits(self):
 
        N = 3  # number of commits to perform
 
        last = None
 
        for x in xrange(N):
 
            fname = 'file%s' % str(x).rjust(5, '0')
 
            content = 'foobar\n' * x
 
            node = FileNode(fname, content=content)
 
            self.imc.add(node)
 
            commit = self.imc.commit(u"Commit no. %s" % (x + 1), author=u'vcs')
 
            self.assertTrue(last != commit)
 
            last = commit
 

	
 
        # Check commit number for same repo
 
        self.assertEqual(len(self.repo.revisions), N)
 

	
 
        # Check commit number for recreated repo
 
        backend = self.get_backend()
 
        repo = backend(self.repo_path)
 
        self.assertEqual(len(repo.revisions), N)
 

	
 
    def test_date_attr(self):
 
        node = FileNode('foobar.txt', content='Foobared!')
 
        self.imc.add(node)
 
        date = datetime.datetime(1985, 1, 30, 1, 45)
 
        commit = self.imc.commit(u"Committed at time when I was born ;-)",
 
            author=u'lb', date=date)
 
            author=u'lb <lb@example.com>', date=date)
 

	
 
        self.assertEqual(commit.date, date)
 

	
 

	
 
class BackendBaseTestCase(unittest.TestCase):
 
    """
 
    Base test class for tests which requires repository.
 
    """
 
    backend_alias = 'hg'
 
    commits = [
 
        {
 
            'message': 'Initial commit',
 
            'author': 'Joe Doe <joe.doe@example.com>',
 
            'date': datetime.datetime(2010, 1, 1, 20),
 
            'added': [
 
                FileNode('foobar', content='Foobar'),
 
                FileNode('foobar2', content='Foobar II'),
 
                FileNode('foo/bar/baz', content='baz here!'),
 
            ],
 
        },
 
    ]
 

	
 
    def get_backend(self):
 
        return vcs.get_backend(self.backend_alias)
 

	
 
    def get_commits(self):
 
        """
 
        Returns list of commits which builds repository for each tests.
 
        """
 
        if hasattr(self, 'commits'):
 
            return self.commits
 

	
 
    def get_new_repo_path(self):
 
        """
 
        Returns newly created repository's directory.
 
        """
 
        backend = self.get_backend()
 
        key = '%s-%s' % (self.backend_alias, str(time.time()))
 
        repo_path = get_new_dir(key)
 
        return repo_path
 

	
 
    def setUp(self):
 
        Backend = self.get_backend()
 
        self.backend_class = Backend
 
        self.repo_path = self.get_new_repo_path()
 
        self.repo = Backend(self.repo_path, create=True)
 
        self.imc = self.repo.in_memory_changeset
 

	
 
        for commit in self.get_commits():
 
            for node in commit.get('added', []):
 
                self.imc.add(FileNode(node.path, content=node.content))
 
            for node in commit.get('changed', []):
 
                self.imc.change(FileNode(node.path, content=node.content))
 
            for node in commit.get('removed', []):
 
                self.imc.remove(FileNode(node.path))
 
            self.imc.commit(message=unicode(commit['message']),
 
                            author=unicode(commit['author']),
 
                date=commit['date'])
 

	
 
        self.tip = self.repo.get_changeset()
 

	
 

	
 
# For each backend create test case class
 
for alias in SCM_TESTS:
 
    attrs = {
 
        'backend_alias': alias,
 
    }
 
    cls_name = ''.join(('%s in memory changeset test' % alias).title().split())
 
    bases = (InMemoryChangesetTestMixin, unittest.TestCase)
 
    globals()[cls_name] = type(cls_name, bases, attrs)
 

	
 

	
 
if __name__ == '__main__':
 
    unittest.main()
setup.py
Show inline comments
 
#!/usr/bin/env python2
 
# -*- coding: utf-8 -*-
 
import os
 
import sys
 
import platform
 

	
 
if sys.version_info < (2, 6) or sys.version_info >= (3,):
 
    raise Exception('Kallithea requires python 2.6 or 2.7')
 

	
 

	
 
here = os.path.abspath(os.path.dirname(__file__))
 

	
 

	
 
def _get_meta_var(name, data, callback_handler=None):
 
    import re
 
    matches = re.compile(r'(?:%s)\s*=\s*(.*)' % name).search(data)
 
    if matches:
 
        if not callable(callback_handler):
 
            callback_handler = lambda v: v
 

	
 
        return callback_handler(eval(matches.groups()[0]))
 

	
 
_meta = open(os.path.join(here, 'kallithea', '__init__.py'), 'rb')
 
_metadata = _meta.read()
 
_meta.close()
 

	
 
callback = lambda V: ('.'.join(map(str, V[:3])) + '.'.join(V[3:]))
 
__version__ = _get_meta_var('VERSION', _metadata, callback)
 
__license__ = _get_meta_var('__license__', _metadata)
 
__author__ = _get_meta_var('__author__', _metadata)
 
__url__ = _get_meta_var('__url__', _metadata)
 
# defines current platform
 
__platform__ = platform.system()
 

	
 
is_windows = __platform__ in ['Windows']
 

	
 
requirements = [
 
    "alembic>=0.8.0,<0.9",
 
    "GearBox<1",
 
    "waitress>=0.8.8,<1.0",
 
    "webob>=1.7,<2",
 
    "backlash >= 0.1.2, < 1.0.0",
 
    "TurboGears2 >= 2.3.10, < 3.0.0",
 
    "tgext.routes >= 0.2.0, < 1.0.0",
 
    "Beaker>=1.7.0,<2",
 
    "WebHelpers==1.3",
 
    "formencode>=1.2.4,<=1.2.6",
 
    "SQLAlchemy>=1.1,<1.2",
 
    "Mako>=0.9.0,<=1.0.0",
 
    "pygments>=1.5",
 
    "whoosh>=2.5.0,<=2.5.7",
 
    "celery>=3.1,<3.2",
 
    "babel>=0.9.6,<2.4",
 
    "python-dateutil>=1.5.0,<2.0.0",
 
    "markdown==2.2.1",
 
    "docutils>=0.8.1",
 
    "URLObject==2.3.4",
 
    "Routes==1.13",
 
    "dulwich>=0.14.1,<0.18.6", # temporary workaround for not using 0.18.6 which cause test failure ... and perhaps also real failures
 
    "dulwich>=0.14.1",
 
    "mercurial>=2.9,<4.5",
 
    "decorator >= 3.3.2",
 
    "Paste >= 2.0.3, < 3.0",
 
]
 

	
 
if sys.version_info < (2, 7):
 
    requirements.append("importlib==1.0.1")
 
    requirements.append("argparse")
 

	
 
if not is_windows:
 
    requirements.append("bcrypt>=3.1.0")
 

	
 
dependency_links = [
 
]
 

	
 
classifiers = [
 
    'Development Status :: 4 - Beta',
 
    'Environment :: Web Environment',
 
    'Framework :: Pylons',
 
    'Intended Audience :: Developers',
 
    'License :: OSI Approved :: GNU General Public License (GPL)',
 
    'Operating System :: OS Independent',
 
    'Programming Language :: Python',
 
    'Programming Language :: Python :: 2.6',
 
    'Programming Language :: Python :: 2.7',
 
    'Topic :: Software Development :: Version Control',
 
]
 

	
 

	
 
# additional files from project that goes somewhere in the filesystem
 
# relative to sys.prefix
 
data_files = []
 

	
 
description = ('Kallithea is a fast and powerful management tool '
 
               'for Mercurial and Git with a built in push/pull server, '
 
               'full text search and code-review.')
 

	
 
keywords = ' '.join([
 
    'kallithea', 'mercurial', 'git', 'code review',
 
    'repo groups', 'ldap', 'repository management', 'hgweb replacement',
 
    'hgwebdir', 'gitweb replacement', 'serving hgweb',
 
])
 

	
 
# long description
 
README_FILE = 'README.rst'
 
try:
 
    long_description = open(README_FILE).read()
 
except IOError as err:
 
    sys.stderr.write(
 
        "[WARNING] Cannot find file specified as long_description (%s)\n"
 
        % README_FILE
 
    )
 
    long_description = description
 

	
 
import setuptools
 

	
 
# monkey patch setuptools to use distutils owner/group functionality
 
from setuptools.command import sdist
 
sdist_org = sdist.sdist
 
class sdist_new(sdist_org):
 
    def initialize_options(self):
 
        sdist_org.initialize_options(self)
 
        self.owner = self.group = 'root'
 
sdist.sdist = sdist_new
 

	
 
packages = setuptools.find_packages(exclude=['ez_setup'])
 

	
 
setuptools.setup(
 
    name='Kallithea',
 
    version=__version__,
 
    description=description,
 
    long_description=long_description,
 
    keywords=keywords,
 
    license=__license__,
 
    author=__author__,
 
    author_email='kallithea@sfconservancy.org',
 
    dependency_links=dependency_links,
 
    url=__url__,
 
    install_requires=requirements,
 
    classifiers=classifiers,
 
    data_files=data_files,
 
    packages=packages,
 
    include_package_data=True,
 
    message_extractors={'kallithea': [
 
            ('**.py', 'python', None),
 
            ('templates/**.mako', 'mako', {'input_encoding': 'utf-8'}),
 
            ('templates/**.html', 'mako', {'input_encoding': 'utf-8'}),
 
            ('public/**', 'ignore', None)]},
 
    zip_safe=False,
 
    entry_points="""
 
    [console_scripts]
 
    kallithea-api =    kallithea.bin.kallithea_api:main
 
    kallithea-gist =   kallithea.bin.kallithea_gist:main
 
    kallithea-config = kallithea.bin.kallithea_config:main
 

	
 
    [paste.app_factory]
 
    main = kallithea.config.middleware:make_app
 

	
 
    [gearbox.commands]
 
    make-config=kallithea.lib.paster_commands.make_config:Command
 
    setup-db=kallithea.lib.paster_commands.setup_db:Command
 
    cleanup-repos=kallithea.lib.paster_commands.cleanup:Command
 
    update-repoinfo=kallithea.lib.paster_commands.update_repoinfo:Command
 
    make-rcext=kallithea.lib.paster_commands.make_rcextensions:Command
 
    repo-scan=kallithea.lib.paster_commands.repo_scan:Command
 
    cache-keys=kallithea.lib.paster_commands.cache_keys:Command
 
    ishell=kallithea.lib.paster_commands.ishell:Command
 
    make-index=kallithea.lib.paster_commands.make_index:Command
 
    upgrade-db=kallithea.lib.dbmigrate:UpgradeDb
 
    celeryd=kallithea.lib.paster_commands.celeryd:Command
 
    install-iis=kallithea.lib.paster_commands.install_iis:Command
 
    """,
 
)
0 comments (0 inline, 0 general)