Changeset - 402a96fcfa22
rhodecode/lib/utils.py
Show inline comments
 
@@ -590,24 +590,28 @@ def create_test_env(repos_test_path, con
 
        shutil.rmtree(idx_path)
 

	
 
    if data_path and os.path.exists(data_path):
 
        log.debug('remove %s' % data_path)
 
        shutil.rmtree(data_path)
 

	
 
    #CREATE DEFAULT HG REPOSITORY
 
    cur_dir = dn(dn(abspath(__file__)))
 
    tar = tarfile.open(jn(cur_dir, 'tests', "vcs_test_hg.tar.gz"))
 
    tar.extractall(jn(TESTS_TMP_PATH, HG_REPO))
 
    tar.close()
 

	
 
    #LOAD VCS test stuff
 
    from rhodecode.tests.vcs import setup_package
 
    setup_package()
 

	
 

	
 
#==============================================================================
 
# PASTER COMMANDS
 
#==============================================================================
 
class BasePasterCommand(Command):
 
    """
 
    Abstract Base Class for paster commands.
 

	
 
    The celery commands are somewhat aggressive about loading
 
    celery.conf, and since our module sets the `CELERY_LOADER`
 
    environment variable to our loader, we have to bootstrap a bit and
 
    make sure we've had a chance to load the pylons config off of the
rhodecode/lib/vcs/backends/hg/inmemory.py
Show inline comments
 
@@ -23,25 +23,26 @@ class MercurialInMemoryChangeset(BaseInM
 
        :param date: ``datetime.datetime`` instance. Defaults to
 
          ``datetime.datetime.now()``.
 
        :param branch: branch name, as string. If none given, default backend's
 
          branch would be used.
 

	
 
        :raises ``CommitError``: if any error occurs while committing
 
        """
 
        self.check_integrity(parents)
 

	
 
        from .repository import MercurialRepository
 
        if not isinstance(message, unicode) or not isinstance(author, unicode):
 
            raise RepositoryError('Given message and author needs to be '
 
                                  'an <unicode> instance')
 
                                  'an <unicode> instance got %r & %r instead'
 
                                  % (type(message), type(author)))
 

	
 
        if branch is None:
 
            branch = MercurialRepository.DEFAULT_BRANCH_NAME
 
        kwargs['branch'] = branch
 

	
 
        def filectxfn(_repo, memctx, path):
 
            """
 
            Marks given path as added/changed/removed in a given _repo. This is
 
            for internal mercurial commit function.
 
            """
 

	
 
            # check if this path is removed
rhodecode/lib/vcs/nodes.py
Show inline comments
 
@@ -413,25 +413,25 @@ class FileNode(Node):
 
    def extension(self):
 
        """Returns filenode extension"""
 
        return self.name.split('.')[-1]
 

	
 
    def is_executable(self):
 
        """
 
        Returns ``True`` if file has executable flag turned on.
 
        """
 
        return bool(self.mode & stat.S_IXUSR)
 

	
 
    def __repr__(self):
 
        return '<%s %r @ %s>' % (self.__class__.__name__, self.path,
 
                                 self.changeset.short_id)
 
                                 getattr(self.changeset, 'short_id', ''))
 

	
 

	
 
class RemovedFileNode(FileNode):
 
    """
 
    Dummy FileNode class - trying to access any public attribute except path,
 
    name, kind or state (or methods/attributes checking those two) would raise
 
    RemovedFileNodeError.
 
    """
 
    ALLOWED_ATTRIBUTES = ['name', 'path', 'state', 'is_root', 'is_file',
 
        'is_dir', 'kind', 'added', 'changed', 'not_changed', 'removed']
 

	
 
    def __init__(self, path):
 
@@ -548,25 +548,25 @@ class DirNode(Node):
 

	
 
    @LazyProperty
 
    def size(self):
 
        size = 0
 
        for root, dirs, files in self.changeset.walk(self.path):
 
            for f in files:
 
                size += f.size
 

	
 
        return size
 

	
 
    def __repr__(self):
 
        return '<%s %r @ %s>' % (self.__class__.__name__, self.path,
 
                                 self.changeset.short_id)
 
                                 getattr(self.changeset, 'short_id', ''))
 

	
 

	
 
class RootNode(DirNode):
 
    """
 
    DirNode being the root node of the repository.
 
    """
 

	
 
    def __init__(self, nodes=(), changeset=None):
 
        super(RootNode, self).__init__(path='', nodes=nodes,
 
            changeset=changeset)
 

	
 
    def __repr__(self):
 
@@ -582,25 +582,25 @@ class SubModuleNode(Node):
 

	
 
    def __init__(self, name, url=None, changeset=None, alias=None):
 
        self.path = name
 
        self.kind = NodeKind.SUBMODULE
 
        self.alias = alias
 
        # we have to use emptyChangeset here since this can point to svn/git/hg
 
        # submodules we cannot get from repository
 
        self.changeset = EmptyChangeset(str(changeset), alias=alias)
 
        self.url = url or self._extract_submodule_url()
 

	
 
    def __repr__(self):
 
        return '<%s %r @ %s>' % (self.__class__.__name__, self.path,
 
                                 self.changeset.short_id)
 
                                 getattr(self.changeset, 'short_id', ''))
 

	
 
    def _extract_submodule_url(self):
 
        if self.alias == 'git':
 
            #TODO: find a way to parse gits submodule file and extract the
 
            # linking URL
 
            return self.path
 
        if self.alias == 'hg':
 
            return self.path
 

	
 
    @LazyProperty
 
    def name(self):
 
        """
rhodecode/tests/__init__.py
Show inline comments
 
"""Pylons application test package
 

	
 
This package assumes the Pylons environment is already loaded, such as
 
when this script is imported from the `nosetests --with-pylons=test.ini`
 
command.
 

	
 
This module initializes the application via ``websetup`` (`paster
 
setup-app`) and provides the base testing objects.
 
"""
 
import os
 
import time
 
import logging
 
import datetime
 
import hashlib
 
import tempfile
 
from os.path import join as jn
 

	
 
from unittest import TestCase
 
from tempfile import _RandomNameSequence
 

	
 
from paste.deploy import loadapp
 
from paste.script.appinstall import SetupCommand
 
from pylons import config, url
 
from routes.util import URLGenerator
 
from webtest import TestApp
 

	
 
from rhodecode import is_windows
 
from rhodecode.model.meta import Session
 
from rhodecode.model.db import User
 

	
 
import pylons.test
 

	
 

	
 
os.environ['TZ'] = 'UTC'
 
if not is_windows:
 
    time.tzset()
 

	
 
log = logging.getLogger(__name__)
 

	
 
__all__ = [
 
    'environ', 'url', 'TestController', 'TESTS_TMP_PATH', 'HG_REPO',
 
    'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO', 'HG_FORK', 'GIT_FORK',
 
    'TEST_USER_ADMIN_LOGIN', 'TEST_USER_REGULAR_LOGIN', 'TEST_USER_REGULAR_PASS',
 
    'TEST_USER_REGULAR_EMAIL', 'TEST_USER_REGULAR2_LOGIN',
 
    'TEST_USER_REGULAR2_PASS', 'TEST_USER_REGULAR2_EMAIL'
 
    'TEST_USER_ADMIN_LOGIN', 'TEST_USER_REGULAR_LOGIN',
 
    'TEST_USER_REGULAR_PASS', 'TEST_USER_REGULAR_EMAIL',
 
    'TEST_USER_REGULAR2_LOGIN', 'TEST_USER_REGULAR2_PASS',
 
    'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO', 'TEST_GIT_REPO',
 
    'HG_REMOTE_REPO', 'GIT_REMOTE_REPO', 'SCM_TESTS',
 
]
 

	
 
# Invoke websetup with the current config file
 
# SetupCommand('setup-app').run([config_file])
 

	
 
##RUNNING DESIRED TESTS
 
# nosetests -x rhodecode.tests.functional.test_admin_settings:TestSettingsController.test_my_account
 
# nosetests --pdb --pdb-failures
 
environ = {}
 

	
 
#SOME GLOBALS FOR TESTS
 

	
 
@@ -64,43 +70,83 @@ TEST_USER_REGULAR2_LOGIN = 'test_regular
 
TEST_USER_REGULAR2_PASS = 'test12'
 
TEST_USER_REGULAR2_EMAIL = 'test_regular2@mail.com'
 

	
 
HG_REPO = 'vcs_test_hg'
 
GIT_REPO = 'vcs_test_git'
 

	
 
NEW_HG_REPO = 'vcs_test_hg_new'
 
NEW_GIT_REPO = 'vcs_test_git_new'
 

	
 
HG_FORK = 'vcs_test_hg_fork'
 
GIT_FORK = 'vcs_test_git_fork'
 

	
 
## VCS
 
SCM_TESTS = ['hg', 'git']
 
uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
 

	
 
THIS = os.path.abspath(os.path.dirname(__file__))
 

	
 
GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
 
TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
 
TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)
 
TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)
 

	
 

	
 
HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs'
 
TEST_HG_REPO = jn(TESTS_TMP_PATH, 'vcs-hg')
 
TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix)
 
TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix)
 

	
 
TEST_DIR = tempfile.gettempdir()
 
TEST_REPO_PREFIX = 'vcs-test'
 

	
 

	
 
def get_new_dir(title):
 
    """
 
    Returns always new directory path.
 
    """
 
    from rhodecode.tests.vcs.utils import get_normalized_path
 
    name = TEST_REPO_PREFIX
 
    if title:
 
        name = '-'.join((name, title))
 
    hex = hashlib.sha1(str(time.time())).hexdigest()
 
    name = '-'.join((name, hex))
 
    path = os.path.join(TEST_DIR, name)
 
    return get_normalized_path(path)
 

	
 

	
 
PACKAGE_DIR = os.path.abspath(os.path.join(
 
    os.path.dirname(__file__), '..'))
 

	
 
TEST_USER_CONFIG_FILE = jn(THIS, 'aconfig')
 

	
 

	
 
class TestController(TestCase):
 

	
 
    def __init__(self, *args, **kwargs):
 
        wsgiapp = pylons.test.pylonsapp
 
        config = wsgiapp.config
 

	
 
        self.app = TestApp(wsgiapp)
 
        url._push_object(URLGenerator(config['routes.map'], environ))
 
        self.Session = Session
 
        self.index_location = config['app_conf']['index_dir']
 
        TestCase.__init__(self, *args, **kwargs)
 

	
 
    def log_user(self, username=TEST_USER_ADMIN_LOGIN,
 
                 password=TEST_USER_ADMIN_PASS):
 
        self._logged_username = username
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username':username,
 
                                  'password':password})
 
                                 {'username': username,
 
                                  'password': password})
 

	
 
        if 'invalid user name' in response.body:
 
            self.fail('could not login using %s %s' % (username, password))
 

	
 
        self.assertEqual(response.status, '302 Found')
 
        ses = response.session['rhodecode_user']
 
        self.assertEqual(ses.get('username'), username)
 
        response = response.follow()
 
        self.assertEqual(ses.get('is_authenticated'), True)
 

	
 
        return response.session['rhodecode_user']
 

	
rhodecode/tests/functional/test_files.py
Show inline comments
 
@@ -177,24 +177,32 @@ class TestFilesController(TestController
 
<option value="2e6a2bf9356ca56df08807f4ad86d480da72a8f4">web</option>
 
</optgroup>
 
<optgroup label="Tags">
 
<option selected="selected" value="27cd5cce30c96924232dffcd24178a07ffeb5dfc">tip</option>
 
<option value="fd4bdb5e9b2a29b4393a4ac6caef48c17ee1a200">0.1.4</option>
 
<option value="17544fbfcd33ffb439e2b728b5d526b1ef30bfcf">0.1.3</option>
 
<option value="a7e60bff65d57ac3a1a1ce3b12a70f8a9e8a7720">0.1.2</option>
 
<option value="eb3a60fc964309c1a318b8dfe26aa2d1586c85ae">0.1.1</option>
 
</optgroup>""")
 

	
 
        response.mustcontain("""<span style="text-transform: uppercase;"><a href="#">branch: default</a></span>""")
 

	
 
    def test_file_annotation_git(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='index',
 
                                    repo_name=GIT_REPO,
 
                                    revision='master',
 
                                    f_path='vcs/nodes.py',
 
                                    annotate=True))
 

	
 
    def test_archival(self):
 
        self.log_user()
 

	
 
        for arch_ext, info in ARCHIVE_SPECS.items():
 
            short = '27cd5cce30c9%s' % arch_ext
 
            fname = '27cd5cce30c96924232dffcd24178a07ffeb5dfc%s' % arch_ext
 
            filename = '%s-%s' % (HG_REPO, short)
 
            response = self.app.get(url(controller='files',
 
                                        action='archivefile',
 
                                        repo_name=HG_REPO,
 
                                        fname=fname))
 

	
rhodecode/tests/functional/test_search.py
Show inline comments
 
from rhodecode.tests import *
 
import os
 
from nose.plugins.skip import SkipTest
 

	
 

	
 
class TestSearchController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='search', action='index'))
 

	
 
        self.assertTrue('class="small" id="q" name="q" type="text"' in
 
                        response.body)
 
        # Test response...
 

	
 
    def test_empty_search(self):
 
        if os.path.isdir(self.index_location):
 
            raise SkipTest('skipped due to existing index')
 
        else:
 
            self.log_user()
 
            response = self.app.get(url(controller='search', action='index'),
 
                                    {'q':HG_REPO})
 
                                    {'q': HG_REPO})
 
            self.assertTrue('There is no index to search in. '
 
                            'Please run whoosh indexer' in response.body)
 

	
 
    def test_normal_search(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='search', action='index'),
 
                                {'q':'def repo'})
 
        self.assertTrue('10 results' in response.body)
 
        self.assertTrue('Permission denied' not in response.body)
 
                                {'q': 'def repo'})
 
        response.mustcontain('10 results')
 
        response.mustcontain('Permission denied')
 

	
 
    def test_repo_search(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='search', action='index'),
 
                                {'q':'repository:%s def test' % HG_REPO})
 
        self.assertTrue('4 results' in response.body)
 
        self.assertTrue('Permission denied' not in response.body)
 
                                {'q': 'repository:%s def test' % HG_REPO})
 

	
 
        response.mustcontain('4 results')
 
        response.mustcontain('Permission denied')
rhodecode/tests/functional/test_summary.py
Show inline comments
 
@@ -6,61 +6,99 @@ from rhodecode.lib.utils import invalida
 
class TestSummaryController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        ID = Repository.get_by_repo_name(HG_REPO).repo_id
 
        response = self.app.get(url(controller='summary',
 
                                    action='index',
 
                                    repo_name=HG_REPO))
 

	
 
        #repo type
 
        response.mustcontain(
 
            """<img style="margin-bottom:2px" class="icon" """
 
            """title="Mercurial repository" alt="Mercurial """
 
            """repository" src="/images/icons/hgicon.png"/>"""
 
            """title="Mercurial repository" alt="Mercurial repository" """
 
            """src="/images/icons/hgicon.png"/>"""
 
        )
 
        response.mustcontain(
 
            """<img style="margin-bottom:2px" class="icon" """
 
            """title="public repository" alt="public """
 
            """repository" src="/images/icons/lock_open.png"/>"""
 
        )
 

	
 
        #codes stats
 
        self._enable_stats()
 

	
 
        invalidate_cache('get_repo_cached_%s' % HG_REPO)
 
        response = self.app.get(url(controller='summary', action='index',
 
                                    repo_name=HG_REPO))
 
        response.mustcontain(
 
            """var data = [["py", {"count": 42, "desc": ["Python"]}], """
 
            """["rst", {"count": 11, "desc": ["Rst"]}], """
 
            """["sh", {"count": 2, "desc": ["Bash"]}], """
 
            """["makefile", {"count": 1, "desc": ["Makefile", "Makefile"]}],"""
 
            """ ["cfg", {"count": 1, "desc": ["Ini"]}], """
 
            """["css", {"count": 1, "desc": ["Css"]}], """
 
            """["bat", {"count": 1, "desc": ["Batch"]}]];"""
 
        )
 

	
 
        # clone url...
 
        response.mustcontain("""<input style="width:80%;margin-left:105px" type="text" id="clone_url" readonly="readonly" value="http://test_admin@localhost:80/vcs_test_hg"/>""")
 
        response.mustcontain("""<input style="display:none;width:80%;margin-left:105px" type="text" id="clone_url_id" readonly="readonly" value="http://test_admin@localhost:80/_1"/>""")
 
        response.mustcontain("""<input style="width:80%%;margin-left:105px" type="text" id="clone_url" readonly="readonly" value="http://test_admin@localhost:80/%s"/>""" % HG_REPO)
 
        response.mustcontain("""<input style="display:none;width:80%%;margin-left:105px" type="text" id="clone_url_id" readonly="readonly" value="http://test_admin@localhost:80/_%s"/>""" % ID)
 

	
 
    def test_index_git(self):
 
        self.log_user()
 
        ID = Repository.get_by_repo_name(GIT_REPO).repo_id
 
        response = self.app.get(url(controller='summary',
 
                                    action='index',
 
                                    repo_name=GIT_REPO))
 

	
 
    def test_index_by_id(self):
 
        #repo type
 
        response.mustcontain(
 
            """<img style="margin-bottom:2px" class="icon" """
 
            """title="Git repository" alt="Git repository" """
 
            """src="/images/icons/giticon.png"/>"""
 
        )
 
        response.mustcontain(
 
            """<img style="margin-bottom:2px" class="icon" """
 
            """title="public repository" alt="public """
 
            """repository" src="/images/icons/lock_open.png"/>"""
 
        )
 

	
 
        # clone url...
 
        response.mustcontain("""<input style="width:80%%;margin-left:105px" type="text" id="clone_url" readonly="readonly" value="http://test_admin@localhost:80/%s"/>""" % GIT_REPO)
 
        response.mustcontain("""<input style="display:none;width:80%%;margin-left:105px" type="text" id="clone_url_id" readonly="readonly" value="http://test_admin@localhost:80/_%s"/>""" % ID)
 

	
 
    def test_index_by_id_hg(self):
 
        self.log_user()
 
        ID = Repository.get_by_repo_name(HG_REPO).repo_id
 
        response = self.app.get(url(controller='summary',
 
                                    action='index',
 
                                    repo_name='_%s' % ID))
 

	
 
        #repo type
 
        response.mustcontain("""<img style="margin-bottom:2px" class="icon" """
 
                        """title="Mercurial repository" alt="Mercurial """
 
                        """repository" src="/images/icons/hgicon.png"/>""")
 
        response.mustcontain("""<img style="margin-bottom:2px" class="icon" """
 
                        """title="public repository" alt="public """
 
                        """repository" src="/images/icons/lock_open.png"/>""")
 

	
 
    def test_index_by_id_git(self):
 
        self.log_user()
 
        ID = Repository.get_by_repo_name(GIT_REPO).repo_id
 
        response = self.app.get(url(controller='summary',
 
                                    action='index',
 
                                    repo_name='_%s' % ID))
 

	
 
        #repo type
 
        response.mustcontain("""<img style="margin-bottom:2px" class="icon" """
 
                        """title="Git repository" alt="Git """
 
                        """repository" src="/images/icons/hgicon.png"/>""")
 
        response.mustcontain("""<img style="margin-bottom:2px" class="icon" """
 
                        """title="public repository" alt="public """
 
                        """repository" src="/images/icons/lock_open.png"/>""")
 

	
 
    def _enable_stats(self):
 
        r = Repository.get_by_repo_name(HG_REPO)
 
        r.enable_statistics = True
 
        self.Session.add(r)
 
        self.Session.commit()
rhodecode/tests/vcs/__init__.py
Show inline comments
 
new file 100644
 
"""
 
Unit tests for vcs_ library.
 

	
 
In order to run tests we need to prepare our environment first. Tests would be
 
run for each engine listed at ``conf.SCM_TESTS`` - keys are aliases from
 
``vcs.backends.BACKENDS``.
 

	
 
For each SCM we run tests for, we need some repository. We would use
 
repositories location from system environment variables or test suite defaults
 
- see ``conf`` module for more detail. We simply try to check if repository at
 
certain location exists, if not we would try to fetch them. At ``test_vcs`` or
 
``test_common`` we run unit tests common for each repository type and for
 
example specific mercurial tests are located at ``test_hg`` module.
 

	
 
Oh, and tests are run with ``unittest.collector`` wrapped by ``collector``
 
function at ``tests/__init__.py``.
 

	
 
.. _vcs: http://bitbucket.org/marcinkuzminski/vcs
 
.. _unittest: http://pypi.python.org/pypi/unittest
 

	
 
"""
 
import os
 
from rhodecode.lib import vcs
 
from rhodecode.lib.vcs.utils.compat import unittest
 
from rhodecode.tests import *
 
from utils import VCSTestError, SCMFetcher
 

	
 

	
 
def setup_package():
 
    """
 
    Prepares whole package for tests which mainly means it would try to fetch
 
    test repositories or use already existing ones.
 
    """
 
    fetchers = {
 
        'hg': {
 
            'alias': 'hg',
 
            'test_repo_path': TEST_HG_REPO,
 
            'remote_repo': HG_REMOTE_REPO,
 
            'clone_cmd': 'hg clone',
 
        },
 
        'git': {
 
            'alias': 'git',
 
            'test_repo_path': TEST_GIT_REPO,
 
            'remote_repo': GIT_REMOTE_REPO,
 
            'clone_cmd': 'git clone --bare',
 
        },
 
    }
 
    try:
 
        for scm, fetcher_info in fetchers.items():
 
            fetcher = SCMFetcher(**fetcher_info)
 
            fetcher.setup()
 
    except VCSTestError, err:
 
        raise RuntimeError(str(err))
 

	
 
start_dir = os.path.abspath(os.path.dirname(__file__))
 
unittest.defaultTestLoader.discover(start_dir)
rhodecode/tests/vcs/aconfig
Show inline comments
 
new file 100644
 
[user]
 
name = Foo Bar
 
email = foo.bar@example.com
 

	
 
[ui]
 
username = Foo Bar foo.bar@example.com
 

	
 
[universal]
 
foo = bar
 

	
rhodecode/tests/vcs/base.py
Show inline comments
 
new file 100644
 
"""
 
Module providing backend independent mixin class. It requires that
 
InMemoryChangeset class is working properly at backend class.
 
"""
 
import os
 
from rhodecode.lib import vcs
 
import time
 
import shutil
 
import datetime
 
from rhodecode.lib.vcs.utils.compat import unittest
 

	
 
from conf import SCM_TESTS, get_new_dir
 

	
 
from rhodecode.lib.vcs.nodes import FileNode
 

	
 

	
 
class BackendTestMixin(object):
 
    """
 
    This is a backend independent test case class which should be created
 
    with ``type`` method.
 

	
 
    It is required to set following attributes at subclass:
 

	
 
    - ``backend_alias``: alias of used backend (see ``vcs.BACKENDS``)
 
    - ``repo_path``: path to the repository which would be created for set of
 
      tests
 
    - ``recreate_repo_per_test``: If set to ``False``, repo would NOT be created
 
      before every single test. Defaults to ``True``.
 
    """
 
    recreate_repo_per_test = True
 

	
 
    @classmethod
 
    def get_backend(cls):
 
        return vcs.get_backend(cls.backend_alias)
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        commits = [
 
            {
 
                'message': u'Initial commit',
 
                'author': u'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 20),
 
                'added': [
 
                    FileNode('foobar', content='Foobar'),
 
                    FileNode('foobar2', content='Foobar II'),
 
                    FileNode('foo/bar/baz', content='baz here!'),
 
                ],
 
            },
 
            {
 
                'message': u'Changes...',
 
                'author': u'Jane Doe <jane.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 21),
 
                'added': [
 
                    FileNode('some/new.txt', content='news...'),
 
                ],
 
                'changed': [
 
                    FileNode('foobar', 'Foobar I'),
 
                ],
 
                'removed': [],
 
            },
 
        ]
 
        return commits
 

	
 
    @classmethod
 
    def setUpClass(cls):
 
        Backend = cls.get_backend()
 
        cls.backend_class = Backend
 
        cls.repo_path = get_new_dir(str(time.time()))
 
        cls.repo = Backend(cls.repo_path, create=True)
 
        cls.imc = cls.repo.in_memory_changeset
 

	
 
        for commit in cls._get_commits():
 
            for node in commit.get('added', []):
 
                cls.imc.add(FileNode(node.path, content=node.content))
 
            for node in commit.get('changed', []):
 
                cls.imc.change(FileNode(node.path, content=node.content))
 
            for node in commit.get('removed', []):
 
                cls.imc.remove(FileNode(node.path))
 
            
 
            cls.tip = cls.imc.commit(message=unicode(commit['message']),
 
                                     author=unicode(commit['author']),
 
                                     date=commit['date'])
 

	
 
    @classmethod
 
    def tearDownClass(cls):
 
        if not getattr(cls, 'recreate_repo_per_test', False) and \
 
            'VCS_REMOVE_TEST_DIRS' in os.environ:
 
            shutil.rmtree(cls.repo_path)
 

	
 
    def setUp(self):
 
        if getattr(self, 'recreate_repo_per_test', False):
 
            self.__class__.setUpClass()
 

	
 
    def tearDown(self):
 
        if getattr(self, 'recreate_repo_per_test', False) and \
 
            'VCS_REMOVE_TEST_DIRS' in os.environ:
 
            shutil.rmtree(self.repo_path)
 

	
 

	
 
# For each backend create test case class
 
for alias in SCM_TESTS:
 
    attrs = {
 
        'backend_alias': alias,
 
    }
 
    cls_name = ''.join(('%s base backend test' % alias).title().split())
 
    bases = (BackendTestMixin, unittest.TestCase)
 
    globals()[cls_name] = type(cls_name, bases, attrs)
 

	
 

	
 
if __name__ == '__main__':
 
    unittest.main()
rhodecode/tests/vcs/conf.py
Show inline comments
 
new file 100644
 
"""
 
Unit tests configuration module for vcs.
 
"""
 
import os
 
import time
 
import hashlib
 
import tempfile
 
import datetime
 

	
 
from utils import get_normalized_path
 
from os.path import join as jn
 

	
 
__all__ = (
 
    'TEST_HG_REPO', 'TEST_GIT_REPO', 'HG_REMOTE_REPO', 'GIT_REMOTE_REPO',
 
    'SCM_TESTS',
 
)
 

	
 
SCM_TESTS = ['hg', 'git']
 
uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
 

	
 
THIS = os.path.abspath(os.path.dirname(__file__))
 

	
 
GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
 

	
 
TEST_TMP_PATH = os.environ.get('VCS_TEST_ROOT', '/tmp')
 
TEST_GIT_REPO = os.environ.get('VCS_TEST_GIT_REPO',
 
                              jn(TEST_TMP_PATH, 'vcs-git'))
 
TEST_GIT_REPO_CLONE = os.environ.get('VCS_TEST_GIT_REPO_CLONE',
 
                            jn(TEST_TMP_PATH, 'vcsgitclone%s' % uniq_suffix))
 
TEST_GIT_REPO_PULL = os.environ.get('VCS_TEST_GIT_REPO_PULL',
 
                            jn(TEST_TMP_PATH, 'vcsgitpull%s' % uniq_suffix))
 

	
 
HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs'
 
TEST_HG_REPO = os.environ.get('VCS_TEST_HG_REPO',
 
                              jn(TEST_TMP_PATH, 'vcs-hg'))
 
TEST_HG_REPO_CLONE = os.environ.get('VCS_TEST_HG_REPO_CLONE',
 
                              jn(TEST_TMP_PATH, 'vcshgclone%s' % uniq_suffix))
 
TEST_HG_REPO_PULL = os.environ.get('VCS_TEST_HG_REPO_PULL',
 
                              jn(TEST_TMP_PATH, 'vcshgpull%s' % uniq_suffix))
 

	
 
TEST_DIR = os.environ.get('VCS_TEST_ROOT', tempfile.gettempdir())
 
TEST_REPO_PREFIX = 'vcs-test'
 

	
 

	
 
def get_new_dir(title):
 
    """
 
    Returns always new directory path.
 
    """
 
    name = TEST_REPO_PREFIX
 
    if title:
 
        name = '-'.join((name, title))
 
    hex = hashlib.sha1(str(time.time())).hexdigest()
 
    name = '-'.join((name, hex))
 
    path = os.path.join(TEST_DIR, name)
 
    return get_normalized_path(path)
 

	
 

	
 
PACKAGE_DIR = os.path.abspath(os.path.join(
 
    os.path.dirname(__file__), '..'))
 

	
 
TEST_USER_CONFIG_FILE = jn(THIS, 'aconfig')
rhodecode/tests/vcs/test_archives.py
Show inline comments
 
new file 100644
 
from __future__ import with_statement
 

	
 
import os
 
import tarfile
 
import zipfile
 
import datetime
 
import tempfile
 
import StringIO
 
from base import BackendTestMixin
 
from conf import SCM_TESTS
 
from rhodecode.lib.vcs.exceptions import VCSError
 
from rhodecode.lib.vcs.nodes import FileNode
 
from rhodecode.lib.vcs.utils.compat import unittest
 

	
 

	
 
class ArchivesTestCaseMixin(BackendTestMixin):
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        start_date = datetime.datetime(2010, 1, 1, 20)
 
        for x in xrange(5):
 
            yield {
 
                'message': 'Commit %d' % x,
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': start_date + datetime.timedelta(hours=12 * x),
 
                'added': [
 
                    FileNode('%d/file_%d.txt' % (x, x),
 
                        content='Foobar %d' % x),
 
                ],
 
            }
 

	
 
    def test_archive_zip(self):
 
        path = tempfile.mkstemp()[1]
 
        with open(path, 'wb') as f:
 
            self.tip.fill_archive(stream=f, kind='zip', prefix='repo')
 
        out = zipfile.ZipFile(path)
 

	
 
        for x in xrange(5):
 
            node_path = '%d/file_%d.txt' % (x, x)
 
            decompressed = StringIO.StringIO()
 
            decompressed.write(out.read('repo/' + node_path))
 
            self.assertEqual(
 
                decompressed.getvalue(),
 
                self.tip.get_node(node_path).content)
 

	
 
    def test_archive_tgz(self):
 
        path = tempfile.mkstemp()[1]
 
        with open(path, 'wb') as f:
 
            self.tip.fill_archive(stream=f, kind='tgz', prefix='repo')
 
        outdir = tempfile.mkdtemp()
 

	
 
        outfile = tarfile.open(path, 'r|gz')
 
        outfile.extractall(outdir)
 

	
 
        for x in xrange(5):
 
            node_path = '%d/file_%d.txt' % (x, x)
 
            self.assertEqual(
 
                open(os.path.join(outdir, 'repo/' + node_path)).read(),
 
                self.tip.get_node(node_path).content)
 

	
 
    def test_archive_tbz2(self):
 
        path = tempfile.mkstemp()[1]
 
        with open(path, 'w+b') as f:
 
            self.tip.fill_archive(stream=f, kind='tbz2', prefix='repo')
 
        outdir = tempfile.mkdtemp()
 

	
 
        outfile = tarfile.open(path, 'r|bz2')
 
        outfile.extractall(outdir)
 

	
 
        for x in xrange(5):
 
            node_path = '%d/file_%d.txt' % (x, x)
 
            self.assertEqual(
 
                open(os.path.join(outdir, 'repo/' + node_path)).read(),
 
                self.tip.get_node(node_path).content)
 

	
 
    def test_archive_default_stream(self):
 
        tmppath = tempfile.mkstemp()[1]
 
        with open(tmppath, 'w') as stream:
 
            self.tip.fill_archive(stream=stream)
 
        mystream = StringIO.StringIO()
 
        self.tip.fill_archive(stream=mystream)
 
        mystream.seek(0)
 
        with open(tmppath, 'r') as f:
 
            self.assertEqual(f.read(), mystream.read())
 

	
 
    def test_archive_wrong_kind(self):
 
        with self.assertRaises(VCSError):
 
            self.tip.fill_archive(kind='wrong kind')
 

	
 
    def test_archive_empty_prefix(self):
 
        with self.assertRaises(VCSError):
 
            self.tip.fill_archive(prefix='')
 

	
 
    def test_archive_prefix_with_leading_slash(self):
 
        with self.assertRaises(VCSError):
 
            self.tip.fill_archive(prefix='/any')
 

	
 
# For each backend create test case class
 
for alias in SCM_TESTS:
 
    attrs = {
 
        'backend_alias': alias,
 
    }
 
    cls_name = ''.join(('%s archive test' % alias).title().split())
 
    bases = (ArchivesTestCaseMixin, unittest.TestCase)
 
    globals()[cls_name] = type(cls_name, bases, attrs)
 

	
 
if __name__ == '__main__':
 
    unittest.main()
rhodecode/tests/vcs/test_branches.py
Show inline comments
 
new file 100644
 
from __future__ import with_statement
 

	
 
from rhodecode.lib import vcs
 
import datetime
 
from rhodecode.lib.vcs.utils.compat import unittest
 

	
 
from base import BackendTestMixin
 
from conf import SCM_TESTS
 

	
 
from rhodecode.lib.vcs.nodes import FileNode
 

	
 

	
 
class BranchesTestCaseMixin(BackendTestMixin):
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        commits = [
 
            {
 
                'message': 'Initial commit',
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 20),
 
                'added': [
 
                    FileNode('foobar', content='Foobar'),
 
                    FileNode('foobar2', content='Foobar II'),
 
                    FileNode('foo/bar/baz', content='baz here!'),
 
                ],
 
            },
 
            {
 
                'message': 'Changes...',
 
                'author': 'Jane Doe <jane.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 21),
 
                'added': [
 
                    FileNode('some/new.txt', content='news...'),
 
                ],
 
                'changed': [
 
                    FileNode('foobar', 'Foobar I'),
 
                ],
 
                'removed': [],
 
            },
 
        ]
 
        return commits
 

	
 
    def test_simple(self):
 
        tip = self.repo.get_changeset()
 
        self.assertEqual(tip.date, datetime.datetime(2010, 1, 1, 21))
 

	
 
    def test_new_branch(self):
 
        # This check must not be removed to ensure the 'branches' LazyProperty
 
        # gets hit *before* the new 'foobar' branch got created:
 
        self.assertFalse('foobar' in self.repo.branches)
 
        self.imc.add(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\n'))
 
        foobar_tip = self.imc.commit(
 
            message=u'New branch: foobar',
 
            author=u'joe',
 
            branch='foobar',
 
        )
 
        self.assertTrue('foobar' in self.repo.branches)
 
        self.assertEqual(foobar_tip.branch, 'foobar')
 

	
 
    def test_new_head(self):
 
        tip = self.repo.get_changeset()
 
        self.imc.add(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\n'))
 
        foobar_tip = self.imc.commit(
 
            message=u'New branch: foobar',
 
            author=u'joe',
 
            branch='foobar',
 
            parents=[tip],
 
        )
 
        self.imc.change(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\nand more...\n'))
 
        newtip = self.imc.commit(
 
            message=u'At default branch',
 
            author=u'joe',
 
            branch=foobar_tip.branch,
 
            parents=[foobar_tip],
 
        )
 

	
 
        newest_tip = self.imc.commit(
 
            message=u'Merged with %s' % foobar_tip.raw_id,
 
            author=u'joe',
 
            branch=self.backend_class.DEFAULT_BRANCH_NAME,
 
            parents=[newtip, foobar_tip],
 
        )
 

	
 
        self.assertEqual(newest_tip.branch,
 
            self.backend_class.DEFAULT_BRANCH_NAME)
 

	
 
    def test_branch_with_slash_in_name(self):
 
        self.imc.add(vcs.nodes.FileNode('extrafile', content='Some data\n'))
 
        self.imc.commit(u'Branch with a slash!', author=u'joe',
 
            branch='issue/123')
 
        self.assertTrue('issue/123' in self.repo.branches)
 

	
 
    def test_branch_with_slash_in_name_and_similar_without(self):
 
        self.imc.add(vcs.nodes.FileNode('extrafile', content='Some data\n'))
 
        self.imc.commit(u'Branch with a slash!', author=u'joe',
 
            branch='issue/123')
 
        self.imc.add(vcs.nodes.FileNode('extrafile II', content='Some data\n'))
 
        self.imc.commit(u'Branch without a slash...', author=u'joe',
 
            branch='123')
 
        self.assertIn('issue/123', self.repo.branches)
 
        self.assertIn('123', self.repo.branches)
 

	
 

	
 
# For each backend create test case class
 
for alias in SCM_TESTS:
 
    attrs = {
 
        'backend_alias': alias,
 
    }
 
    cls_name = ''.join(('%s branches test' % alias).title().split())
 
    bases = (BranchesTestCaseMixin, unittest.TestCase)
 
    globals()[cls_name] = type(cls_name, bases, attrs)
 

	
 

	
 
if __name__ == '__main__':
 
    unittest.main()
rhodecode/tests/vcs/test_changesets.py
Show inline comments
 
new file 100644
 
from __future__ import with_statement
 

	
 
from rhodecode.lib import vcs
 
import datetime
 
from base import BackendTestMixin
 
from conf import SCM_TESTS
 
from rhodecode.lib.vcs.backends.base import BaseChangeset
 
from rhodecode.lib.vcs.nodes import FileNode
 
from rhodecode.lib.vcs.exceptions import BranchDoesNotExistError
 
from rhodecode.lib.vcs.exceptions import ChangesetDoesNotExistError
 
from rhodecode.lib.vcs.exceptions import RepositoryError
 
from rhodecode.lib.vcs.utils.compat import unittest
 

	
 

	
 
class TestBaseChangeset(unittest.TestCase):
 

	
 
    def test_as_dict(self):
 
        changeset = BaseChangeset()
 
        changeset.id = 'ID'
 
        changeset.raw_id = 'RAW_ID'
 
        changeset.short_id = 'SHORT_ID'
 
        changeset.revision = 1009
 
        changeset.date = datetime.datetime(2011, 1, 30, 1, 45)
 
        changeset.message = 'Message of a commit'
 
        changeset.author = 'Joe Doe <joe.doe@example.com>'
 
        changeset.added = [FileNode('foo/bar/baz'), FileNode('foobar')]
 
        changeset.changed = []
 
        changeset.removed = []
 
        self.assertEqual(changeset.as_dict(), {
 
            'id': 'ID',
 
            'raw_id': 'RAW_ID',
 
            'short_id': 'SHORT_ID',
 
            'revision': 1009,
 
            'date': datetime.datetime(2011, 1, 30, 1, 45),
 
            'message': 'Message of a commit',
 
            'author': {
 
                'name': 'Joe Doe',
 
                'email': 'joe.doe@example.com',
 
            },
 
            'added': ['foo/bar/baz', 'foobar'],
 
            'changed': [],
 
            'removed': [],
 
        })
 

	
 
class ChangesetsWithCommitsTestCaseixin(BackendTestMixin):
 
    recreate_repo_per_test = True
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        start_date = datetime.datetime(2010, 1, 1, 20)
 
        for x in xrange(5):
 
            yield {
 
                'message': 'Commit %d' % x,
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': start_date + datetime.timedelta(hours=12 * x),
 
                'added': [
 
                    FileNode('file_%d.txt' % x, content='Foobar %d' % x),
 
                ],
 
            }
 

	
 
    def test_new_branch(self):
 
        self.imc.add(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\n'))
 
        foobar_tip = self.imc.commit(
 
            message=u'New branch: foobar',
 
            author=u'joe',
 
            branch='foobar',
 
        )
 
        self.assertTrue('foobar' in self.repo.branches)
 
        self.assertEqual(foobar_tip.branch, 'foobar')
 
        # 'foobar' should be the only branch that contains the new commit
 
        self.assertNotEqual(*self.repo.branches.values())
 

	
 
    def test_new_head_in_default_branch(self):
 
        tip = self.repo.get_changeset()
 
        self.imc.add(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\n'))
 
        foobar_tip = self.imc.commit(
 
            message=u'New branch: foobar',
 
            author=u'joe',
 
            branch='foobar',
 
            parents=[tip],
 
        )
 
        self.imc.change(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\nand more...\n'))
 
        newtip = self.imc.commit(
 
            message=u'At default branch',
 
            author=u'joe',
 
            branch=foobar_tip.branch,
 
            parents=[foobar_tip],
 
        )
 

	
 
        newest_tip = self.imc.commit(
 
            message=u'Merged with %s' % foobar_tip.raw_id,
 
            author=u'joe',
 
            branch=self.backend_class.DEFAULT_BRANCH_NAME,
 
            parents=[newtip, foobar_tip],
 
        )
 

	
 
        self.assertEqual(newest_tip.branch,
 
            self.backend_class.DEFAULT_BRANCH_NAME)
 

	
 
    def test_get_changesets_respects_branch_name(self):
 
        tip = self.repo.get_changeset()
 
        self.imc.add(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\n'))
 
        doc_changeset = self.imc.commit(
 
            message=u'New branch: docs',
 
            author=u'joe',
 
            branch='docs',
 
        )
 
        self.imc.add(vcs.nodes.FileNode('newfile', content=''))
 
        self.imc.commit(
 
            message=u'Back in default branch',
 
            author=u'joe',
 
            parents=[tip],
 
        )
 
        default_branch_changesets = self.repo.get_changesets(
 
            branch_name=self.repo.DEFAULT_BRANCH_NAME)
 
        self.assertNotIn(doc_changeset, default_branch_changesets)
 

	
 

	
 
class ChangesetsTestCaseMixin(BackendTestMixin):
 
    recreate_repo_per_test = False
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        start_date = datetime.datetime(2010, 1, 1, 20)
 
        for x in xrange(5):
 
            yield {
 
                'message': u'Commit %d' % x,
 
                'author': u'Joe Doe <joe.doe@example.com>',
 
                'date': start_date + datetime.timedelta(hours=12 * x),
 
                'added': [
 
                    FileNode('file_%d.txt' % x, content='Foobar %d' % x),
 
                ],
 
            }
 

	
 
    def test_simple(self):
 
        tip = self.repo.get_changeset()
 
        self.assertEqual(tip.date, datetime.datetime(2010, 1, 3, 20))
 

	
 
    def test_get_changesets_is_ordered_by_date(self):
 
        changesets = list(self.repo.get_changesets())
 
        ordered_by_date = sorted(changesets,
 
            key=lambda cs: cs.date)
 
        self.assertItemsEqual(changesets, ordered_by_date)
 

	
 
    def test_get_changesets_respects_start(self):
 
        second_id = self.repo.revisions[1]
 
        changesets = list(self.repo.get_changesets(start=second_id))
 
        self.assertEqual(len(changesets), 4)
 

	
 
    def test_get_changesets_numerical_id_respects_start(self):
 
        second_id = 1
 
        changesets = list(self.repo.get_changesets(start=second_id))
 
        self.assertEqual(len(changesets), 4)
 

	
 
    def test_get_changesets_includes_start_changeset(self):
 
        second_id = self.repo.revisions[1]
 
        changesets = list(self.repo.get_changesets(start=second_id))
 
        self.assertEqual(changesets[0].raw_id, second_id)
 

	
 
    def test_get_changesets_respects_end(self):
 
        second_id = self.repo.revisions[1]
 
        changesets = list(self.repo.get_changesets(end=second_id))
 
        self.assertEqual(changesets[-1].raw_id, second_id)
 
        self.assertEqual(len(changesets), 2)
 

	
 
    def test_get_changesets_numerical_id_respects_end(self):
 
        second_id = 1
 
        changesets = list(self.repo.get_changesets(end=second_id))
 
        self.assertEqual(changesets.index(changesets[-1]), second_id)
 
        self.assertEqual(len(changesets), 2)
 

	
 
    def test_get_changesets_respects_both_start_and_end(self):
 
        second_id = self.repo.revisions[1]
 
        third_id = self.repo.revisions[2]
 
        changesets = list(self.repo.get_changesets(start=second_id,
 
            end=third_id))
 
        self.assertEqual(len(changesets), 2)
 

	
 
    def test_get_changesets_numerical_id_respects_both_start_and_end(self):
 
        changesets = list(self.repo.get_changesets(start=2, end=3))
 
        self.assertEqual(len(changesets), 2)
 

	
 
    def test_get_changesets_includes_end_changeset(self):
 
        second_id = self.repo.revisions[1]
 
        changesets = list(self.repo.get_changesets(end=second_id))
 
        self.assertEqual(changesets[-1].raw_id, second_id)
 

	
 
    def test_get_changesets_respects_start_date(self):
 
        start_date = datetime.datetime(2010, 2, 1)
 
        for cs in self.repo.get_changesets(start_date=start_date):
 
            self.assertGreaterEqual(cs.date, start_date)
 

	
 
    def test_get_changesets_respects_end_date(self):
 
        end_date = datetime.datetime(2010, 2, 1)
 
        for cs in self.repo.get_changesets(end_date=end_date):
 
            self.assertLessEqual(cs.date, end_date)
 

	
 
    def test_get_changesets_respects_reverse(self):
 
        changesets_id_list = [cs.raw_id for cs in
 
            self.repo.get_changesets(reverse=True)]
 
        self.assertItemsEqual(changesets_id_list, reversed(self.repo.revisions))
 

	
 
    def test_get_filenodes_generator(self):
 
        tip = self.repo.get_changeset()
 
        filepaths = [node.path for node in tip.get_filenodes_generator()]
 
        self.assertItemsEqual(filepaths, ['file_%d.txt' % x for x in xrange(5)])
 

	
 
    def test_size(self):
 
        tip = self.repo.get_changeset()
 
        size = 5 * len('Foobar N') # Size of 5 files
 
        self.assertEqual(tip.size, size)
 

	
 
    def test_author(self):
 
        tip = self.repo.get_changeset()
 
        self.assertEqual(tip.author, u'Joe Doe <joe.doe@example.com>')
 

	
 
    def test_author_name(self):
 
        tip = self.repo.get_changeset()
 
        self.assertEqual(tip.author_name, u'Joe Doe')
 

	
 
    def test_author_email(self):
 
        tip = self.repo.get_changeset()
 
        self.assertEqual(tip.author_email, u'joe.doe@example.com')
 

	
 
    def test_get_changesets_raise_changesetdoesnotexist_for_wrong_start(self):
 
        with self.assertRaises(ChangesetDoesNotExistError):
 
            list(self.repo.get_changesets(start='foobar'))
 

	
 
    def test_get_changesets_raise_changesetdoesnotexist_for_wrong_end(self):
 
        with self.assertRaises(ChangesetDoesNotExistError):
 
            list(self.repo.get_changesets(end='foobar'))
 

	
 
    def test_get_changesets_raise_branchdoesnotexist_for_wrong_branch_name(self):
 
        with self.assertRaises(BranchDoesNotExistError):
 
            list(self.repo.get_changesets(branch_name='foobar'))
 

	
 
    def test_get_changesets_raise_repositoryerror_for_wrong_start_end(self):
 
        start = self.repo.revisions[-1]
 
        end = self.repo.revisions[0]
 
        with self.assertRaises(RepositoryError):
 
            list(self.repo.get_changesets(start=start, end=end))
 

	
 
    def test_get_changesets_numerical_id_reversed(self):
 
        with self.assertRaises(RepositoryError):
 
            [x for x in self.repo.get_changesets(start=3, end=2)]
 

	
 
    def test_get_changesets_numerical_id_respects_both_start_and_end_last(self):
 
        with self.assertRaises(RepositoryError):
 
            last = len(self.repo.revisions)
 
            list(self.repo.get_changesets(start=last-1, end=last-2))
 

	
 
    def test_get_changesets_numerical_id_last_zero_error(self):
 
        with self.assertRaises(RepositoryError):
 
            last = len(self.repo.revisions)
 
            list(self.repo.get_changesets(start=last-1, end=0))
 

	
 

	
 
class ChangesetsChangesTestCaseMixin(BackendTestMixin):
 
    recreate_repo_per_test = False
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        return [
 
            {
 
                'message': u'Initial',
 
                'author': u'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 20),
 
                'added': [
 
                    FileNode('foo/bar', content='foo'),
 
                    FileNode('foobar', content='foo'),
 
                    FileNode('qwe', content='foo'),
 
                ],
 
            },
 
            {
 
                'message': u'Massive changes',
 
                'author': u'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 22),
 
                'added': [FileNode('fallout', content='War never changes')],
 
                'changed': [
 
                    FileNode('foo/bar', content='baz'),
 
                    FileNode('foobar', content='baz'),
 
                ],
 
                'removed': [FileNode('qwe')],
 
            },
 
        ]
 

	
 
    def test_initial_commit(self):
 
        changeset = self.repo.get_changeset(0)
 
        self.assertItemsEqual(changeset.added, [
 
            changeset.get_node('foo/bar'),
 
            changeset.get_node('foobar'),
 
            changeset.get_node('qwe'),
 
        ])
 
        self.assertItemsEqual(changeset.changed, [])
 
        self.assertItemsEqual(changeset.removed, [])
 

	
 
    def test_head_added(self):
 
        changeset = self.repo.get_changeset()
 
        self.assertItemsEqual(changeset.added, [
 
            changeset.get_node('fallout'),
 
        ])
 
        self.assertItemsEqual(changeset.changed, [
 
            changeset.get_node('foo/bar'),
 
            changeset.get_node('foobar'),
 
        ])
 
        self.assertEqual(len(changeset.removed), 1)
 
        self.assertEqual(list(changeset.removed)[0].path, 'qwe')
 

	
 

	
 
# For each backend create test case class
 
for alias in SCM_TESTS:
 
    attrs = {
 
        'backend_alias': alias,
 
    }
 
    # tests with additional commits
 
    cls_name = ''.join(('%s changesets with commits test' % alias).title().split())
 
    bases = (ChangesetsWithCommitsTestCaseixin, unittest.TestCase)
 
    globals()[cls_name] = type(cls_name, bases, attrs)
 

	
 
    # tests without additional commits
 
    cls_name = ''.join(('%s changesets test' % alias).title().split())
 
    bases = (ChangesetsTestCaseMixin, unittest.TestCase)
 
    globals()[cls_name] = type(cls_name, bases, attrs)
 

	
 
    # tests changes
 
    cls_name = ''.join(('%s changesets changes test' % alias).title().split())
 
    bases = (ChangesetsChangesTestCaseMixin, unittest.TestCase)
 
    globals()[cls_name] = type(cls_name, bases, attrs)
 

	
 

	
 
if __name__ == '__main__':
 
    unittest.main()
rhodecode/tests/vcs/test_filenodes_unicode_path.py
Show inline comments
 
new file 100644
 
# encoding: utf8
 

	
 
from __future__ import with_statement
 

	
 
import datetime
 
from rhodecode.lib.vcs.nodes import FileNode
 
from rhodecode.lib.vcs.utils.compat import unittest
 
from test_inmemchangesets import BackendBaseTestCase
 
from conf import SCM_TESTS
 

	
 

	
 
class FileNodeUnicodePathTestsMixin(object):
 

	
 
    fname = 'ąśðąęłąć.txt'
 
    ufname = (fname).decode('utf-8')
 

	
 
    def get_commits(self):
 
        self.nodes = [
 
            FileNode(self.fname, content='Foobar'),
 
        ]
 

	
 
        commits = [
 
            {
 
                'message': 'Initial commit',
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 20),
 
                'added': self.nodes,
 
            },
 
        ]
 
        return commits
 

	
 
    def test_filenode_path(self):
 
        node = self.tip.get_node(self.fname)
 
        unode = self.tip.get_node(self.ufname)
 
        self.assertEqual(node, unode)
 

	
 

	
 
for alias in SCM_TESTS:
 
    attrs = {
 
        'backend_alias': alias,
 
    }
 
    cls_name = ''.join(('%s file node unicode path test' % alias).title()
 
        .split())
 
    bases = (FileNodeUnicodePathTestsMixin, BackendBaseTestCase)
 
    globals()[cls_name] = type(cls_name, bases, attrs)
 

	
 

	
 
if __name__ == '__main__':
 
    unittest.main()
rhodecode/tests/vcs/test_getitem.py
Show inline comments
 
new file 100644
 
from __future__ import with_statement
 

	
 
import datetime
 
from base import BackendTestMixin
 
from conf import SCM_TESTS
 
from rhodecode.lib.vcs.nodes import FileNode
 
from rhodecode.lib.vcs.utils.compat import unittest
 

	
 

	
 
class GetitemTestCaseMixin(BackendTestMixin):
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        start_date = datetime.datetime(2010, 1, 1, 20)
 
        for x in xrange(5):
 
            yield {
 
                'message': 'Commit %d' % x,
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': start_date + datetime.timedelta(hours=12 * x),
 
                'added': [
 
                    FileNode('file_%d.txt' % x, content='Foobar %d' % x),
 
                ],
 
            }
 

	
 
    def test__getitem__last_item_is_tip(self):
 
        self.assertEqual(self.repo[-1], self.repo.get_changeset())
 

	
 
    def test__getitem__returns_correct_items(self):
 
        changesets = [self.repo[x] for x in xrange(len(self.repo.revisions))]
 
        self.assertEqual(changesets, list(self.repo.get_changesets()))
 

	
 

	
 
# For each backend create test case class
 
for alias in SCM_TESTS:
 
    attrs = {
 
        'backend_alias': alias,
 
    }
 
    cls_name = ''.join(('%s getitem test' % alias).title().split())
 
    bases = (GetitemTestCaseMixin, unittest.TestCase)
 
    globals()[cls_name] = type(cls_name, bases, attrs)
 

	
 

	
 
if __name__ == '__main__':
 
    unittest.main()
rhodecode/tests/vcs/test_getslice.py
Show inline comments
 
new file 100644
 
from __future__ import with_statement
 

	
 
import datetime
 
from base import BackendTestMixin
 
from conf import SCM_TESTS
 
from rhodecode.lib.vcs.nodes import FileNode
 
from rhodecode.lib.vcs.utils.compat import unittest
 

	
 

	
 
class GetsliceTestCaseMixin(BackendTestMixin):
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        start_date = datetime.datetime(2010, 1, 1, 20)
 
        for x in xrange(5):
 
            yield {
 
                'message': 'Commit %d' % x,
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': start_date + datetime.timedelta(hours=12 * x),
 
                'added': [
 
                    FileNode('file_%d.txt' % x, content='Foobar %d' % x),
 
                ],
 
            }
 

	
 
    def test__getslice__last_item_is_tip(self):
 
        self.assertEqual(list(self.repo[-1:])[0], self.repo.get_changeset())
 

	
 
    def test__getslice__respects_start_index(self):
 
        self.assertEqual(list(self.repo[2:]),
 
            [self.repo.get_changeset(rev) for rev in self.repo.revisions[2:]])
 

	
 
    def test__getslice__respects_negative_start_index(self):
 
        self.assertEqual(list(self.repo[-2:]),
 
            [self.repo.get_changeset(rev) for rev in self.repo.revisions[-2:]])
 

	
 
    def test__getslice__respects_end_index(self):
 
        self.assertEqual(list(self.repo[:2]),
 
            [self.repo.get_changeset(rev) for rev in self.repo.revisions[:2]])
 

	
 
    def test__getslice__respects_negative_end_index(self):
 
        self.assertEqual(list(self.repo[:-2]),
 
            [self.repo.get_changeset(rev) for rev in self.repo.revisions[:-2]])
 

	
 

	
 
# For each backend create test case class
 
for alias in SCM_TESTS:
 
    attrs = {
 
        'backend_alias': alias,
 
    }
 
    cls_name = ''.join(('%s getslice test' % alias).title().split())
 
    bases = (GetsliceTestCaseMixin, unittest.TestCase)
 
    globals()[cls_name] = type(cls_name, bases, attrs)
 

	
 

	
 
if __name__ == '__main__':
 
    unittest.main()
rhodecode/tests/vcs/test_git.py
Show inline comments
 
new file 100644
 
from __future__ import with_statement
 

	
 
import os
 
import mock
 
import datetime
 
from rhodecode.lib.vcs.backends.git import GitRepository, GitChangeset
 
from rhodecode.lib.vcs.exceptions import RepositoryError, VCSError, NodeDoesNotExistError
 
from rhodecode.lib.vcs.nodes import NodeKind, FileNode, DirNode, NodeState
 
from rhodecode.lib.vcs.utils.compat import unittest
 
from rhodecode.tests.vcs.base import BackendTestMixin
 
from conf import TEST_GIT_REPO, TEST_GIT_REPO_CLONE, get_new_dir
 

	
 

	
 
class GitRepositoryTest(unittest.TestCase):
 

	
 
    def __check_for_existing_repo(self):
 
        if os.path.exists(TEST_GIT_REPO_CLONE):
 
            self.fail('Cannot test git clone repo as location %s already '
 
                      'exists. You should manually remove it first.'
 
                      % TEST_GIT_REPO_CLONE)
 

	
 
    def setUp(self):
 
        self.repo = GitRepository(TEST_GIT_REPO)
 

	
 
    def test_wrong_repo_path(self):
 
        wrong_repo_path = '/tmp/errorrepo'
 
        self.assertRaises(RepositoryError, GitRepository, wrong_repo_path)
 

	
 
    def test_repo_clone(self):
 
        self.__check_for_existing_repo()
 
        repo = GitRepository(TEST_GIT_REPO)
 
        repo_clone = GitRepository(TEST_GIT_REPO_CLONE,
 
            src_url=TEST_GIT_REPO, create=True, update_after_clone=True)
 
        self.assertEqual(len(repo.revisions), len(repo_clone.revisions))
 
        # Checking hashes of changesets should be enough
 
        for changeset in repo.get_changesets():
 
            raw_id = changeset.raw_id
 
            self.assertEqual(raw_id, repo_clone.get_changeset(raw_id).raw_id)
 

	
 
    def test_repo_clone_without_create(self):
 
        self.assertRaises(RepositoryError, GitRepository,
 
            TEST_GIT_REPO_CLONE + '_wo_create', src_url=TEST_GIT_REPO)
 

	
 
    def test_repo_clone_with_update(self):
 
        repo = GitRepository(TEST_GIT_REPO)
 
        clone_path = TEST_GIT_REPO_CLONE + '_with_update'
 
        repo_clone = GitRepository(clone_path,
 
            create=True, src_url=TEST_GIT_REPO, update_after_clone=True)
 
        self.assertEqual(len(repo.revisions), len(repo_clone.revisions))
 

	
 
        #check if current workdir was updated
 
        fpath = os.path.join(clone_path, 'MANIFEST.in')
 
        self.assertEqual(True, os.path.isfile(fpath),
 
            'Repo was cloned and updated but file %s could not be found'
 
            % fpath)
 

	
 
    def test_repo_clone_without_update(self):
 
        repo = GitRepository(TEST_GIT_REPO)
 
        clone_path = TEST_GIT_REPO_CLONE + '_without_update'
 
        repo_clone = GitRepository(clone_path,
 
            create=True, src_url=TEST_GIT_REPO, update_after_clone=False)
 
        self.assertEqual(len(repo.revisions), len(repo_clone.revisions))
 
        #check if current workdir was *NOT* updated
 
        fpath = os.path.join(clone_path, 'MANIFEST.in')
 
        # Make sure it's not bare repo
 
        self.assertFalse(repo_clone._repo.bare)
 
        self.assertEqual(False, os.path.isfile(fpath),
 
            'Repo was cloned and updated but file %s was found'
 
            % fpath)
 

	
 
    def test_repo_clone_into_bare_repo(self):
 
        repo = GitRepository(TEST_GIT_REPO)
 
        clone_path = TEST_GIT_REPO_CLONE + '_bare.git'
 
        repo_clone = GitRepository(clone_path, create=True,
 
            src_url=repo.path, bare=True)
 
        self.assertTrue(repo_clone._repo.bare)
 

	
 
    def test_create_repo_is_not_bare_by_default(self):
 
        repo = GitRepository(get_new_dir('not-bare-by-default'), create=True)
 
        self.assertFalse(repo._repo.bare)
 

	
 
    def test_create_bare_repo(self):
 
        repo = GitRepository(get_new_dir('bare-repo'), create=True, bare=True)
 
        self.assertTrue(repo._repo.bare)
 

	
 
    def test_revisions(self):
 
        # there are 112 revisions (by now)
 
        # so we can assume they would be available from now on
 
        subset = set([
 
            'c1214f7e79e02fc37156ff215cd71275450cffc3',
 
            '38b5fe81f109cb111f549bfe9bb6b267e10bc557',
 
            'fa6600f6848800641328adbf7811fd2372c02ab2',
 
            '102607b09cdd60e2793929c4f90478be29f85a17',
 
            '49d3fd156b6f7db46313fac355dca1a0b94a0017',
 
            '2d1028c054665b962fa3d307adfc923ddd528038',
 
            'd7e0d30fbcae12c90680eb095a4f5f02505ce501',
 
            'ff7ca51e58c505fec0dd2491de52c622bb7a806b',
 
            'dd80b0f6cf5052f17cc738c2951c4f2070200d7f',
 
            '8430a588b43b5d6da365400117c89400326e7992',
 
            'd955cd312c17b02143c04fa1099a352b04368118',
 
            'f67b87e5c629c2ee0ba58f85197e423ff28d735b',
 
            'add63e382e4aabc9e1afdc4bdc24506c269b7618',
 
            'f298fe1189f1b69779a4423f40b48edf92a703fc',
 
            'bd9b619eb41994cac43d67cf4ccc8399c1125808',
 
            '6e125e7c890379446e98980d8ed60fba87d0f6d1',
 
            'd4a54db9f745dfeba6933bf5b1e79e15d0af20bd',
 
            '0b05e4ed56c802098dfc813cbe779b2f49e92500',
 
            '191caa5b2c81ed17c0794bf7bb9958f4dcb0b87e',
 
            '45223f8f114c64bf4d6f853e3c35a369a6305520',
 
            'ca1eb7957a54bce53b12d1a51b13452f95bc7c7e',
 
            'f5ea29fc42ef67a2a5a7aecff10e1566699acd68',
 
            '27d48942240f5b91dfda77accd2caac94708cc7d',
 
            '622f0eb0bafd619d2560c26f80f09e3b0b0d78af',
 
            'e686b958768ee96af8029fe19c6050b1a8dd3b2b'])
 
        self.assertTrue(subset.issubset(set(self.repo.revisions)))
 

	
 

	
 

	
 
    def test_slicing(self):
 
        #4 1 5 10 95
 
        for sfrom, sto, size in [(0, 4, 4), (1, 2, 1), (10, 15, 5),
 
                                 (10, 20, 10), (5, 100, 95)]:
 
            revs = list(self.repo[sfrom:sto])
 
            self.assertEqual(len(revs), size)
 
            self.assertEqual(revs[0], self.repo.get_changeset(sfrom))
 
            self.assertEqual(revs[-1], self.repo.get_changeset(sto - 1))
 

	
 

	
 
    def test_branches(self):
 
        # TODO: Need more tests here
 
        # Removed (those are 'remotes' branches for cloned repo)
 
        #self.assertTrue('master' in self.repo.branches)
 
        #self.assertTrue('gittree' in self.repo.branches)
 
        #self.assertTrue('web-branch' in self.repo.branches)
 
        for name, id in self.repo.branches.items():
 
            self.assertTrue(isinstance(
 
                self.repo.get_changeset(id), GitChangeset))
 

	
 
    def test_tags(self):
 
        # TODO: Need more tests here
 
        self.assertTrue('v0.1.1' in self.repo.tags)
 
        self.assertTrue('v0.1.2' in self.repo.tags)
 
        for name, id in self.repo.tags.items():
 
            self.assertTrue(isinstance(
 
                self.repo.get_changeset(id), GitChangeset))
 

	
 
    def _test_single_changeset_cache(self, revision):
 
        chset = self.repo.get_changeset(revision)
 
        self.assertTrue(revision in self.repo.changesets)
 
        self.assertTrue(chset is self.repo.changesets[revision])
 

	
 
    def test_initial_changeset(self):
 
        id = self.repo.revisions[0]
 
        init_chset = self.repo.get_changeset(id)
 
        self.assertEqual(init_chset.message, 'initial import\n')
 
        self.assertEqual(init_chset.author,
 
            'Marcin Kuzminski <marcin@python-blog.com>')
 
        for path in ('vcs/__init__.py',
 
                     'vcs/backends/BaseRepository.py',
 
                     'vcs/backends/__init__.py'):
 
            self.assertTrue(isinstance(init_chset.get_node(path), FileNode))
 
        for path in ('', 'vcs', 'vcs/backends'):
 
            self.assertTrue(isinstance(init_chset.get_node(path), DirNode))
 

	
 
        self.assertRaises(NodeDoesNotExistError, init_chset.get_node, path='foobar')
 

	
 
        node = init_chset.get_node('vcs/')
 
        self.assertTrue(hasattr(node, 'kind'))
 
        self.assertEqual(node.kind, NodeKind.DIR)
 

	
 
        node = init_chset.get_node('vcs')
 
        self.assertTrue(hasattr(node, 'kind'))
 
        self.assertEqual(node.kind, NodeKind.DIR)
 

	
 
        node = init_chset.get_node('vcs/__init__.py')
 
        self.assertTrue(hasattr(node, 'kind'))
 
        self.assertEqual(node.kind, NodeKind.FILE)
 

	
 
    def test_not_existing_changeset(self):
 
        self.assertRaises(RepositoryError, self.repo.get_changeset,
 
            'f' * 40)
 

	
 
    def test_changeset10(self):
 

	
 
        chset10 = self.repo.get_changeset(self.repo.revisions[9])
 
        README = """===
 
VCS
 
===
 

	
 
Various Version Control System management abstraction layer for Python.
 

	
 
Introduction
 
------------
 

	
 
TODO: To be written...
 

	
 
"""
 
        node = chset10.get_node('README.rst')
 
        self.assertEqual(node.kind, NodeKind.FILE)
 
        self.assertEqual(node.content, README)
 

	
 

	
 
class GitChangesetTest(unittest.TestCase):
 

	
 
    def setUp(self):
 
        self.repo = GitRepository(TEST_GIT_REPO)
 

	
 
    def test_default_changeset(self):
 
        tip = self.repo.get_changeset()
 
        self.assertEqual(tip, self.repo.get_changeset(None))
 
        self.assertEqual(tip, self.repo.get_changeset('tip'))
 

	
 
    def test_root_node(self):
 
        tip = self.repo.get_changeset()
 
        self.assertTrue(tip.root is tip.get_node(''))
 

	
 
    def test_lazy_fetch(self):
 
        """
 
        Test if changeset's nodes expands and are cached as we walk through
 
        the revision. This test is somewhat hard to write as order of tests
 
        is a key here. Written by running command after command in a shell.
 
        """
 
        hex = '2a13f185e4525f9d4b59882791a2d397b90d5ddc'
 
        self.assertTrue(hex in self.repo.revisions)
 
        chset = self.repo.get_changeset(hex)
 
        self.assertTrue(len(chset.nodes) == 0)
 
        root = chset.root
 
        self.assertTrue(len(chset.nodes) == 1)
 
        self.assertTrue(len(root.nodes) == 8)
 
        # accessing root.nodes updates chset.nodes
 
        self.assertTrue(len(chset.nodes) == 9)
 

	
 
        docs = root.get_node('docs')
 
        # we haven't yet accessed anything new as docs dir was already cached
 
        self.assertTrue(len(chset.nodes) == 9)
 
        self.assertTrue(len(docs.nodes) == 8)
 
        # accessing docs.nodes updates chset.nodes
 
        self.assertTrue(len(chset.nodes) == 17)
 

	
 
        self.assertTrue(docs is chset.get_node('docs'))
 
        self.assertTrue(docs is root.nodes[0])
 
        self.assertTrue(docs is root.dirs[0])
 
        self.assertTrue(docs is chset.get_node('docs'))
 

	
 
    def test_nodes_with_changeset(self):
 
        hex = '2a13f185e4525f9d4b59882791a2d397b90d5ddc'
 
        chset = self.repo.get_changeset(hex)
 
        root = chset.root
 
        docs = root.get_node('docs')
 
        self.assertTrue(docs is chset.get_node('docs'))
 
        api = docs.get_node('api')
 
        self.assertTrue(api is chset.get_node('docs/api'))
 
        index = api.get_node('index.rst')
 
        self.assertTrue(index is chset.get_node('docs/api/index.rst'))
 
        self.assertTrue(index is chset.get_node('docs')\
 
            .get_node('api')\
 
            .get_node('index.rst'))
 

	
 
    def test_branch_and_tags(self):
 
        '''
 
        rev0 = self.repo.revisions[0]
 
        chset0 = self.repo.get_changeset(rev0)
 
        self.assertEqual(chset0.branch, 'master')
 
        self.assertEqual(chset0.tags, [])
 

	
 
        rev10 = self.repo.revisions[10]
 
        chset10 = self.repo.get_changeset(rev10)
 
        self.assertEqual(chset10.branch, 'master')
 
        self.assertEqual(chset10.tags, [])
 

	
 
        rev44 = self.repo.revisions[44]
 
        chset44 = self.repo.get_changeset(rev44)
 
        self.assertEqual(chset44.branch, 'web-branch')
 

	
 
        tip = self.repo.get_changeset('tip')
 
        self.assertTrue('tip' in tip.tags)
 
        '''
 
        # Those tests would fail - branches are now going
 
        # to be changed at main API in order to support git backend
 
        pass
 

	
 
    def _test_slices(self, limit, offset):
 
        count = self.repo.count()
 
        changesets = self.repo.get_changesets(limit=limit, offset=offset)
 
        idx = 0
 
        for changeset in changesets:
 
            rev = offset + idx
 
            idx += 1
 
            rev_id = self.repo.revisions[rev]
 
            if idx > limit:
 
                self.fail("Exceeded limit already (getting revision %s, "
 
                    "there are %s total revisions, offset=%s, limit=%s)"
 
                    % (rev_id, count, offset, limit))
 
            self.assertEqual(changeset, self.repo.get_changeset(rev_id))
 
        result = list(self.repo.get_changesets(limit=limit, offset=offset))
 
        start = offset
 
        end = limit and offset + limit or None
 
        sliced = list(self.repo[start:end])
 
        self.failUnlessEqual(result, sliced,
 
            msg="Comparison failed for limit=%s, offset=%s"
 
            "(get_changeset returned: %s and sliced: %s"
 
            % (limit, offset, result, sliced))
 

	
 
    def _test_file_size(self, revision, path, size):
 
        node = self.repo.get_changeset(revision).get_node(path)
 
        self.assertTrue(node.is_file())
 
        self.assertEqual(node.size, size)
 

	
 
    def test_file_size(self):
 
        to_check = (
 
            ('c1214f7e79e02fc37156ff215cd71275450cffc3',
 
                'vcs/backends/BaseRepository.py', 502),
 
            ('d7e0d30fbcae12c90680eb095a4f5f02505ce501',
 
                'vcs/backends/hg.py', 854),
 
            ('6e125e7c890379446e98980d8ed60fba87d0f6d1',
 
                'setup.py', 1068),
 

	
 
            ('d955cd312c17b02143c04fa1099a352b04368118',
 
                'vcs/backends/base.py', 2921),
 
            ('ca1eb7957a54bce53b12d1a51b13452f95bc7c7e',
 
                'vcs/backends/base.py', 3936),
 
            ('f50f42baeed5af6518ef4b0cb2f1423f3851a941',
 
                'vcs/backends/base.py', 6189),
 
        )
 
        for revision, path, size in to_check:
 
            self._test_file_size(revision, path, size)
 

	
 
    def test_file_history(self):
 
        # we can only check if those revisions are present in the history
 
        # as we cannot update this test every time file is changed
 
        files = {
 
            'setup.py': [
 
                '54386793436c938cff89326944d4c2702340037d',
 
                '51d254f0ecf5df2ce50c0b115741f4cf13985dab',
 
                '998ed409c795fec2012b1c0ca054d99888b22090',
 
                '5e0eb4c47f56564395f76333f319d26c79e2fb09',
 
                '0115510b70c7229dbc5dc49036b32e7d91d23acd',
 
                '7cb3fd1b6d8c20ba89e2264f1c8baebc8a52d36e',
 
                '2a13f185e4525f9d4b59882791a2d397b90d5ddc',
 
                '191caa5b2c81ed17c0794bf7bb9958f4dcb0b87e',
 
                'ff7ca51e58c505fec0dd2491de52c622bb7a806b',
 
            ],
 
            'vcs/nodes.py': [
 
                '33fa3223355104431402a888fa77a4e9956feb3e',
 
                'fa014c12c26d10ba682fadb78f2a11c24c8118e1',
 
                'e686b958768ee96af8029fe19c6050b1a8dd3b2b',
 
                'ab5721ca0a081f26bf43d9051e615af2cc99952f',
 
                'c877b68d18e792a66b7f4c529ea02c8f80801542',
 
                '4313566d2e417cb382948f8d9d7c765330356054',
 
                '6c2303a793671e807d1cfc70134c9ca0767d98c2',
 
                '54386793436c938cff89326944d4c2702340037d',
 
                '54000345d2e78b03a99d561399e8e548de3f3203',
 
                '1c6b3677b37ea064cb4b51714d8f7498f93f4b2b',
 
                '2d03ca750a44440fb5ea8b751176d1f36f8e8f46',
 
                '2a08b128c206db48c2f0b8f70df060e6db0ae4f8',
 
                '30c26513ff1eb8e5ce0e1c6b477ee5dc50e2f34b',
 
                'ac71e9503c2ca95542839af0ce7b64011b72ea7c',
 
                '12669288fd13adba2a9b7dd5b870cc23ffab92d2',
 
                '5a0c84f3e6fe3473e4c8427199d5a6fc71a9b382',
 
                '12f2f5e2b38e6ff3fbdb5d722efed9aa72ecb0d5',
 
                '5eab1222a7cd4bfcbabc218ca6d04276d4e27378',
 
                'f50f42baeed5af6518ef4b0cb2f1423f3851a941',
 
                'd7e390a45f6aa96f04f5e7f583ad4f867431aa25',
 
                'f15c21f97864b4f071cddfbf2750ec2e23859414',
 
                'e906ef056cf539a4e4e5fc8003eaf7cf14dd8ade',
 
                'ea2b108b48aa8f8c9c4a941f66c1a03315ca1c3b',
 
                '84dec09632a4458f79f50ddbbd155506c460b4f9',
 
                '0115510b70c7229dbc5dc49036b32e7d91d23acd',
 
                '2a13f185e4525f9d4b59882791a2d397b90d5ddc',
 
                '3bf1c5868e570e39569d094f922d33ced2fa3b2b',
 
                'b8d04012574729d2c29886e53b1a43ef16dd00a1',
 
                '6970b057cffe4aab0a792aa634c89f4bebf01441',
 
                'dd80b0f6cf5052f17cc738c2951c4f2070200d7f',
 
                'ff7ca51e58c505fec0dd2491de52c622bb7a806b',
 
            ],
 
            'vcs/backends/git.py': [
 
                '4cf116ad5a457530381135e2f4c453e68a1b0105',
 
                '9a751d84d8e9408e736329767387f41b36935153',
 
                'cb681fb539c3faaedbcdf5ca71ca413425c18f01',
 
                '428f81bb652bcba8d631bce926e8834ff49bdcc6',
 
                '180ab15aebf26f98f714d8c68715e0f05fa6e1c7',
 
                '2b8e07312a2e89e92b90426ab97f349f4bce2a3a',
 
                '50e08c506174d8645a4bb517dd122ac946a0f3bf',
 
                '54000345d2e78b03a99d561399e8e548de3f3203',
 
            ],
 
        }
 
        for path, revs in files.items():
 
            node = self.repo.get_changeset(revs[0]).get_node(path)
 
            node_revs = [chset.raw_id for chset in node.history]
 
            self.assertTrue(set(revs).issubset(set(node_revs)),
 
                "We assumed that %s is subset of revisions for which file %s "
 
                "has been changed, and history of that node returned: %s"
 
                % (revs, path, node_revs))
 

	
 
    def test_file_annotate(self):
 
        files = {
 
            'vcs/backends/__init__.py': {
 
                'c1214f7e79e02fc37156ff215cd71275450cffc3': {
 
                    'lines_no': 1,
 
                    'changesets': [
 
                        'c1214f7e79e02fc37156ff215cd71275450cffc3',
 
                    ],
 
                },
 
                '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647': {
 
                    'lines_no': 21,
 
                    'changesets': [
 
                        '49d3fd156b6f7db46313fac355dca1a0b94a0017',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                    ],
 
                },
 
                'e29b67bd158580fc90fc5e9111240b90e6e86064': {
 
                    'lines_no': 32,
 
                    'changesets': [
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '5eab1222a7cd4bfcbabc218ca6d04276d4e27378',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '54000345d2e78b03a99d561399e8e548de3f3203',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '78c3f0c23b7ee935ec276acb8b8212444c33c396',
 
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
 
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
 
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
 
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
 
                        '2a13f185e4525f9d4b59882791a2d397b90d5ddc',
 
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
 
                        '78c3f0c23b7ee935ec276acb8b8212444c33c396',
 
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
 
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
 
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
 
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                    ],
 
                },
 
            },
 
        }
 

	
 
        for fname, revision_dict in files.items():
 
            for rev, data in revision_dict.items():
 
                cs = self.repo.get_changeset(rev)
 
                ann = cs.get_file_annotate(fname)
 

	
 
                l1 = [x[1].raw_id for x in ann]
 
                l2 = files[fname][rev]['changesets']
 
                self.assertTrue(l1 == l2 , "The lists of revision for %s@rev %s"
 
                                "from annotation list should match each other, "
 
                                "got \n%s \nvs \n%s " % (fname, rev, l1, l2))
 

	
 
    def test_files_state(self):
 
        """
 
        Tests state of FileNodes.
 
        """
 
        node = self.repo\
 
            .get_changeset('e6ea6d16e2f26250124a1f4b4fe37a912f9d86a0')\
 
            .get_node('vcs/utils/diffs.py')
 
        self.assertTrue(node.state, NodeState.ADDED)
 
        self.assertTrue(node.added)
 
        self.assertFalse(node.changed)
 
        self.assertFalse(node.not_changed)
 
        self.assertFalse(node.removed)
 

	
 
        node = self.repo\
 
            .get_changeset('33fa3223355104431402a888fa77a4e9956feb3e')\
 
            .get_node('.hgignore')
 
        self.assertTrue(node.state, NodeState.CHANGED)
 
        self.assertFalse(node.added)
 
        self.assertTrue(node.changed)
 
        self.assertFalse(node.not_changed)
 
        self.assertFalse(node.removed)
 

	
 
        node = self.repo\
 
            .get_changeset('e29b67bd158580fc90fc5e9111240b90e6e86064')\
 
            .get_node('setup.py')
 
        self.assertTrue(node.state, NodeState.NOT_CHANGED)
 
        self.assertFalse(node.added)
 
        self.assertFalse(node.changed)
 
        self.assertTrue(node.not_changed)
 
        self.assertFalse(node.removed)
 

	
 
        # If node has REMOVED state then trying to fetch it would raise
 
        # ChangesetError exception
 
        chset = self.repo.get_changeset(
 
            'fa6600f6848800641328adbf7811fd2372c02ab2')
 
        path = 'vcs/backends/BaseRepository.py'
 
        self.assertRaises(NodeDoesNotExistError, chset.get_node, path)
 
        # but it would be one of ``removed`` (changeset's attribute)
 
        self.assertTrue(path in [rf.path for rf in chset.removed])
 

	
 
        chset = self.repo.get_changeset(
 
            '54386793436c938cff89326944d4c2702340037d')
 
        changed = ['setup.py', 'tests/test_nodes.py', 'vcs/backends/hg.py',
 
            'vcs/nodes.py']
 
        self.assertEqual(set(changed), set([f.path for f in chset.changed]))
 

	
 
    def test_commit_message_is_unicode(self):
 
        for cs in self.repo:
 
            self.assertEqual(type(cs.message), unicode)
 

	
 
    def test_changeset_author_is_unicode(self):
 
        for cs in self.repo:
 
            self.assertEqual(type(cs.author), unicode)
 

	
 
    def test_repo_files_content_is_unicode(self):
 
        changeset = self.repo.get_changeset()
 
        for node in changeset.get_node('/'):
 
            if node.is_file():
 
                self.assertEqual(type(node.content), unicode)
 

	
 
    def test_wrong_path(self):
 
        # There is 'setup.py' in the root dir but not there:
 
        path = 'foo/bar/setup.py'
 
        tip = self.repo.get_changeset()
 
        self.assertRaises(VCSError, tip.get_node, path)
 

	
 
    def test_author_email(self):
 
        self.assertEqual('marcin@python-blog.com',
 
          self.repo.get_changeset('c1214f7e79e02fc37156ff215cd71275450cffc3')\
 
          .author_email)
 
        self.assertEqual('lukasz.balcerzak@python-center.pl',
 
          self.repo.get_changeset('ff7ca51e58c505fec0dd2491de52c622bb7a806b')\
 
          .author_email)
 
        self.assertEqual('none@none',
 
          self.repo.get_changeset('8430a588b43b5d6da365400117c89400326e7992')\
 
          .author_email)
 

	
 
    def test_author_username(self):
 
        self.assertEqual('Marcin Kuzminski',
 
          self.repo.get_changeset('c1214f7e79e02fc37156ff215cd71275450cffc3')\
 
          .author_name)
 
        self.assertEqual('Lukasz Balcerzak',
 
          self.repo.get_changeset('ff7ca51e58c505fec0dd2491de52c622bb7a806b')\
 
          .author_name)
 
        self.assertEqual('marcink',
 
          self.repo.get_changeset('8430a588b43b5d6da365400117c89400326e7992')\
 
          .author_name)
 

	
 

	
 
class GitSpecificTest(unittest.TestCase):
 

	
 
    def test_error_is_raised_for_added_if_diff_name_status_is_wrong(self):
 
        repo = mock.MagicMock()
 
        changeset = GitChangeset(repo, 'foobar')
 
        changeset._diff_name_status = 'foobar'
 
        with self.assertRaises(VCSError):
 
            changeset.added
 

	
 
    def test_error_is_raised_for_changed_if_diff_name_status_is_wrong(self):
 
        repo = mock.MagicMock()
 
        changeset = GitChangeset(repo, 'foobar')
 
        changeset._diff_name_status = 'foobar'
 
        with self.assertRaises(VCSError):
 
            changeset.added
 

	
 
    def test_error_is_raised_for_removed_if_diff_name_status_is_wrong(self):
 
        repo = mock.MagicMock()
 
        changeset = GitChangeset(repo, 'foobar')
 
        changeset._diff_name_status = 'foobar'
 
        with self.assertRaises(VCSError):
 
            changeset.added
 

	
 

	
 
class GitSpecificWithRepoTest(BackendTestMixin, unittest.TestCase):
 
    backend_alias = 'git'
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        return [
 
            {
 
                'message': 'Initial',
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 20),
 
                'added': [
 
                    FileNode('foobar/static/js/admin/base.js', content='base'),
 
                    FileNode('foobar/static/admin', content='admin',
 
                        mode=0120000), # this is a link
 
                    FileNode('foo', content='foo'),
 
                ],
 
            },
 
            {
 
                'message': 'Second',
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 22),
 
                'added': [
 
                    FileNode('foo2', content='foo2'),
 
                ],
 
            },
 
        ]
 

	
 
    def test_paths_slow_traversing(self):
 
        cs = self.repo.get_changeset()
 
        self.assertEqual(cs.get_node('foobar').get_node('static').get_node('js')
 
            .get_node('admin').get_node('base.js').content, 'base')
 

	
 
    def test_paths_fast_traversing(self):
 
        cs = self.repo.get_changeset()
 
        self.assertEqual(cs.get_node('foobar/static/js/admin/base.js').content,
 
            'base')
 

	
 
    def test_workdir_get_branch(self):
 
        self.repo.run_git_command('checkout -b production')
 
        # Regression test: one of following would fail if we don't check
 
        # .git/HEAD file
 
        self.repo.run_git_command('checkout production')
 
        self.assertEqual(self.repo.workdir.get_branch(), 'production')
 
        self.repo.run_git_command('checkout master')
 
        self.assertEqual(self.repo.workdir.get_branch(), 'master')
 

	
 
    def test_get_diff_runs_git_command_with_hashes(self):
 
        self.repo.run_git_command = mock.Mock(return_value=['', ''])
 
        self.repo.get_diff(0, 1)
 
        self.repo.run_git_command.assert_called_once_with('diff -U%s %s %s' %
 
            (3, self.repo._get_revision(0), self.repo._get_revision(1)))
 

	
 
    def test_get_diff_runs_git_command_with_str_hashes(self):
 
        self.repo.run_git_command = mock.Mock(return_value=['', ''])
 
        self.repo.get_diff(self.repo.EMPTY_CHANGESET, 1)
 
        self.repo.run_git_command.assert_called_once_with('show -U%s %s' %
 
            (3, self.repo._get_revision(1)))
 

	
 
    def test_get_diff_runs_git_command_with_path_if_its_given(self):
 
        self.repo.run_git_command = mock.Mock(return_value=['', ''])
 
        self.repo.get_diff(0, 1, 'foo')
 
        self.repo.run_git_command.assert_called_once_with('diff -U%s %s %s -- "foo"'
 
            % (3, self.repo._get_revision(0), self.repo._get_revision(1)))
 

	
 

	
 
class GitRegressionTest(BackendTestMixin, unittest.TestCase):
 
    backend_alias = 'git'
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        return [
 
            {
 
                'message': 'Initial',
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 20),
 
                'added': [
 
                    FileNode('bot/__init__.py', content='base'),
 
                    FileNode('bot/templates/404.html', content='base'),
 
                    FileNode('bot/templates/500.html', content='base'),
 
                ],
 
            },
 
            {
 
                'message': 'Second',
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 22),
 
                'added': [
 
                    FileNode('bot/build/migrations/1.py', content='foo2'),
 
                    FileNode('bot/build/migrations/2.py', content='foo2'),
 
                    FileNode('bot/build/static/templates/f.html', content='foo2'),
 
                    FileNode('bot/build/static/templates/f1.html', content='foo2'),
 
                    FileNode('bot/build/templates/err.html', content='foo2'),
 
                    FileNode('bot/build/templates/err2.html', content='foo2'),
 
                ],
 
            },
 
        ]
 

	
 
    def test_similar_paths(self):
 
        cs = self.repo.get_changeset()
 
        paths = lambda *n:[x.path for x in n]
 
        self.assertEqual(paths(*cs.get_nodes('bot')), ['bot/build', 'bot/templates', 'bot/__init__.py'])
 
        self.assertEqual(paths(*cs.get_nodes('bot/build')), ['bot/build/migrations', 'bot/build/static', 'bot/build/templates'])
 
        self.assertEqual(paths(*cs.get_nodes('bot/build/static')), ['bot/build/static/templates'])
 
        # this get_nodes below causes troubles !
 
        self.assertEqual(paths(*cs.get_nodes('bot/build/static/templates')), ['bot/build/static/templates/f.html', 'bot/build/static/templates/f1.html'])
 
        self.assertEqual(paths(*cs.get_nodes('bot/build/templates')), ['bot/build/templates/err.html', 'bot/build/templates/err2.html'])
 
        self.assertEqual(paths(*cs.get_nodes('bot/templates/')), ['bot/templates/404.html', 'bot/templates/500.html'])
 

	
 
if __name__ == '__main__':
 
    unittest.main()
rhodecode/tests/vcs/test_hg.py
Show inline comments
 
new file 100644
 
from __future__ import with_statement
 

	
 
import os
 
from rhodecode.lib.vcs.backends.hg import MercurialRepository, MercurialChangeset
 
from rhodecode.lib.vcs.exceptions import RepositoryError, VCSError, NodeDoesNotExistError
 
from rhodecode.lib.vcs.nodes import NodeKind, NodeState
 
from conf import PACKAGE_DIR, TEST_HG_REPO, TEST_HG_REPO_CLONE, \
 
    TEST_HG_REPO_PULL
 
from rhodecode.lib.vcs.utils.compat import unittest
 

	
 

	
 
# Use only clean mercurial's ui
 
import mercurial.scmutil
 
mercurial.scmutil.rcpath()
 
if mercurial.scmutil._rcpath:
 
    mercurial.scmutil._rcpath = mercurial.scmutil._rcpath[:1]
 

	
 

	
 
class MercurialRepositoryTest(unittest.TestCase):
 

	
 
    def __check_for_existing_repo(self):
 
        if os.path.exists(TEST_HG_REPO_CLONE):
 
            self.fail('Cannot test mercurial clone repo as location %s already '
 
                      'exists. You should manually remove it first.'
 
                      % TEST_HG_REPO_CLONE)
 

	
 
    def setUp(self):
 
        self.repo = MercurialRepository(TEST_HG_REPO)
 

	
 
    def test_wrong_repo_path(self):
 
        wrong_repo_path = '/tmp/errorrepo'
 
        self.assertRaises(RepositoryError, MercurialRepository, wrong_repo_path)
 

	
 
    def test_unicode_path_repo(self):
 
        self.assertRaises(VCSError,lambda:MercurialRepository(u'iShouldFail'))
 

	
 
    def test_repo_clone(self):
 
        self.__check_for_existing_repo()
 
        repo = MercurialRepository(TEST_HG_REPO)
 
        repo_clone = MercurialRepository(TEST_HG_REPO_CLONE,
 
            src_url=TEST_HG_REPO, update_after_clone=True)
 
        self.assertEqual(len(repo.revisions), len(repo_clone.revisions))
 
        # Checking hashes of changesets should be enough
 
        for changeset in repo.get_changesets():
 
            raw_id = changeset.raw_id
 
            self.assertEqual(raw_id, repo_clone.get_changeset(raw_id).raw_id)
 

	
 
    def test_repo_clone_with_update(self):
 
        repo = MercurialRepository(TEST_HG_REPO)
 
        repo_clone = MercurialRepository(TEST_HG_REPO_CLONE + '_w_update',
 
            src_url=TEST_HG_REPO, update_after_clone=True)
 
        self.assertEqual(len(repo.revisions), len(repo_clone.revisions))
 

	
 
        #check if current workdir was updated
 
        self.assertEqual(os.path.isfile(os.path.join(TEST_HG_REPO_CLONE \
 
                                                    + '_w_update',
 
                                                    'MANIFEST.in')), True,)
 

	
 
    def test_repo_clone_without_update(self):
 
        repo = MercurialRepository(TEST_HG_REPO)
 
        repo_clone = MercurialRepository(TEST_HG_REPO_CLONE + '_wo_update',
 
            src_url=TEST_HG_REPO, update_after_clone=False)
 
        self.assertEqual(len(repo.revisions), len(repo_clone.revisions))
 
        self.assertEqual(os.path.isfile(os.path.join(TEST_HG_REPO_CLONE \
 
                                                    + '_wo_update',
 
                                                    'MANIFEST.in')), False,)
 

	
 
    def test_pull(self):
 
        if os.path.exists(TEST_HG_REPO_PULL):
 
            self.fail('Cannot test mercurial pull command as location %s '
 
                      'already exists. You should manually remove it first'
 
                      % TEST_HG_REPO_PULL)
 
        repo_new = MercurialRepository(TEST_HG_REPO_PULL, create=True)
 
        self.assertTrue(len(self.repo.revisions) > len(repo_new.revisions))
 

	
 
        repo_new.pull(self.repo.path)
 
        repo_new = MercurialRepository(TEST_HG_REPO_PULL)
 
        self.assertTrue(len(self.repo.revisions) == len(repo_new.revisions))
 

	
 
    def test_revisions(self):
 
        # there are 21 revisions at bitbucket now
 
        # so we can assume they would be available from now on
 
        subset = set(['b986218ba1c9b0d6a259fac9b050b1724ed8e545',
 
                 '3d8f361e72ab303da48d799ff1ac40d5ac37c67e',
 
                 '6cba7170863a2411822803fa77a0a264f1310b35',
 
                 '56349e29c2af3ac913b28bde9a2c6154436e615b',
 
                 '2dda4e345facb0ccff1a191052dd1606dba6781d',
 
                 '6fff84722075f1607a30f436523403845f84cd9e',
 
                 '7d4bc8ec6be56c0f10425afb40b6fc315a4c25e7',
 
                 '3803844fdbd3b711175fc3da9bdacfcd6d29a6fb',
 
                 'dc5d2c0661b61928834a785d3e64a3f80d3aad9c',
 
                 'be90031137367893f1c406e0a8683010fd115b79',
 
                 'db8e58be770518cbb2b1cdfa69146e47cd481481',
 
                 '84478366594b424af694a6c784cb991a16b87c21',
 
                 '17f8e105dddb9f339600389c6dc7175d395a535c',
 
                 '20a662e756499bde3095ffc9bc0643d1def2d0eb',
 
                 '2e319b85e70a707bba0beff866d9f9de032aa4f9',
 
                 '786facd2c61deb9cf91e9534735124fb8fc11842',
 
                 '94593d2128d38210a2fcd1aabff6dda0d6d9edf8',
 
                 'aa6a0de05b7612707db567078e130a6cd114a9a7',
 
                 'eada5a770da98ab0dd7325e29d00e0714f228d09'
 
                ])
 
        self.assertTrue(subset.issubset(set(self.repo.revisions)))
 

	
 

	
 
        # check if we have the proper order of revisions
 
        org = ['b986218ba1c9b0d6a259fac9b050b1724ed8e545',
 
                '3d8f361e72ab303da48d799ff1ac40d5ac37c67e',
 
                '6cba7170863a2411822803fa77a0a264f1310b35',
 
                '56349e29c2af3ac913b28bde9a2c6154436e615b',
 
                '2dda4e345facb0ccff1a191052dd1606dba6781d',
 
                '6fff84722075f1607a30f436523403845f84cd9e',
 
                '7d4bc8ec6be56c0f10425afb40b6fc315a4c25e7',
 
                '3803844fdbd3b711175fc3da9bdacfcd6d29a6fb',
 
                'dc5d2c0661b61928834a785d3e64a3f80d3aad9c',
 
                'be90031137367893f1c406e0a8683010fd115b79',
 
                'db8e58be770518cbb2b1cdfa69146e47cd481481',
 
                '84478366594b424af694a6c784cb991a16b87c21',
 
                '17f8e105dddb9f339600389c6dc7175d395a535c',
 
                '20a662e756499bde3095ffc9bc0643d1def2d0eb',
 
                '2e319b85e70a707bba0beff866d9f9de032aa4f9',
 
                '786facd2c61deb9cf91e9534735124fb8fc11842',
 
                '94593d2128d38210a2fcd1aabff6dda0d6d9edf8',
 
                'aa6a0de05b7612707db567078e130a6cd114a9a7',
 
                'eada5a770da98ab0dd7325e29d00e0714f228d09',
 
                '2c1885c735575ca478bf9e17b0029dca68824458',
 
                'd9bcd465040bf869799b09ad732c04e0eea99fe9',
 
                '469e9c847fe1f6f7a697b8b25b4bc5b48780c1a7',
 
                '4fb8326d78e5120da2c7468dcf7098997be385da',
 
                '62b4a097164940bd66030c4db51687f3ec035eed',
 
                '536c1a19428381cfea92ac44985304f6a8049569',
 
                '965e8ab3c44b070cdaa5bf727ddef0ada980ecc4',
 
                '9bb326a04ae5d98d437dece54be04f830cf1edd9',
 
                'f8940bcb890a98c4702319fbe36db75ea309b475',
 
                'ff5ab059786ebc7411e559a2cc309dfae3625a3b',
 
                '6b6ad5f82ad5bb6190037671bd254bd4e1f4bf08',
 
                'ee87846a61c12153b51543bf860e1026c6d3dcba', ]
 
        self.assertEqual(org, self.repo.revisions[:31])
 

	
 
    def test_iter_slice(self):
 
        sliced = list(self.repo[:10])
 
        itered = list(self.repo)[:10]
 
        self.assertEqual(sliced, itered)
 

	
 
    def test_slicing(self):
 
        #4 1 5 10 95
 
        for sfrom, sto, size in [(0, 4, 4), (1, 2, 1), (10, 15, 5),
 
                                 (10, 20, 10), (5, 100, 95)]:
 
            revs = list(self.repo[sfrom:sto])
 
            self.assertEqual(len(revs), size)
 
            self.assertEqual(revs[0], self.repo.get_changeset(sfrom))
 
            self.assertEqual(revs[-1], self.repo.get_changeset(sto - 1))
 

	
 
    def test_branches(self):
 
        # TODO: Need more tests here
 

	
 
        #active branches
 
        self.assertTrue('default' in self.repo.branches)
 

	
 
        #closed branches
 
        self.assertFalse('web' in self.repo.branches)
 
        self.assertFalse('git' in self.repo.branches)
 

	
 
        # closed
 
        self.assertTrue('workdir' in self.repo._get_branches(closed=True))
 
        self.assertTrue('webvcs' in self.repo._get_branches(closed=True))
 

	
 
        for name, id in self.repo.branches.items():
 
            self.assertTrue(isinstance(
 
                self.repo.get_changeset(id), MercurialChangeset))
 

	
 
    def test_tip_in_tags(self):
 
        # tip is always a tag
 
        self.assertIn('tip', self.repo.tags)
 

	
 
    def test_tip_changeset_in_tags(self):
 
        tip = self.repo.get_changeset()
 
        self.assertEqual(self.repo.tags['tip'], tip.raw_id)
 

	
 
    def test_initial_changeset(self):
 

	
 
        init_chset = self.repo.get_changeset(0)
 
        self.assertEqual(init_chset.message, 'initial import')
 
        self.assertEqual(init_chset.author,
 
            'Marcin Kuzminski <marcin@python-blog.com>')
 
        self.assertEqual(sorted(init_chset._file_paths),
 
            sorted([
 
                'vcs/__init__.py',
 
                'vcs/backends/BaseRepository.py',
 
                'vcs/backends/__init__.py',
 
            ])
 
        )
 
        self.assertEqual(sorted(init_chset._dir_paths),
 
            sorted(['', 'vcs', 'vcs/backends']))
 

	
 
        self.assertRaises(NodeDoesNotExistError, init_chset.get_node, path='foobar')
 

	
 
        node = init_chset.get_node('vcs/')
 
        self.assertTrue(hasattr(node, 'kind'))
 
        self.assertEqual(node.kind, NodeKind.DIR)
 

	
 
        node = init_chset.get_node('vcs')
 
        self.assertTrue(hasattr(node, 'kind'))
 
        self.assertEqual(node.kind, NodeKind.DIR)
 

	
 
        node = init_chset.get_node('vcs/__init__.py')
 
        self.assertTrue(hasattr(node, 'kind'))
 
        self.assertEqual(node.kind, NodeKind.FILE)
 

	
 
    def test_not_existing_changeset(self):
 
        #rawid
 
        self.assertRaises(RepositoryError, self.repo.get_changeset,
 
            'abcd' * 10)
 
        #shortid
 
        self.assertRaises(RepositoryError, self.repo.get_changeset,
 
            'erro' * 4)
 
        #numeric
 
        self.assertRaises(RepositoryError, self.repo.get_changeset,
 
            self.repo.count() + 1)
 

	
 

	
 
        # Small chance we ever get to this one
 
        revision = pow(2, 30)
 
        self.assertRaises(RepositoryError, self.repo.get_changeset, revision)
 

	
 
    def test_changeset10(self):
 

	
 
        chset10 = self.repo.get_changeset(10)
 
        README = """===
 
VCS
 
===
 

	
 
Various Version Control System management abstraction layer for Python.
 

	
 
Introduction
 
------------
 

	
 
TODO: To be written...
 

	
 
"""
 
        node = chset10.get_node('README.rst')
 
        self.assertEqual(node.kind, NodeKind.FILE)
 
        self.assertEqual(node.content, README)
 

	
 

	
 
class MercurialChangesetTest(unittest.TestCase):
 

	
 
    def setUp(self):
 
        self.repo = MercurialRepository(TEST_HG_REPO)
 

	
 
    def _test_equality(self, changeset):
 
        revision = changeset.revision
 
        self.assertEqual(changeset, self.repo.get_changeset(revision))
 

	
 
    def test_equality(self):
 
        self.setUp()
 
        revs = [0, 10, 20]
 
        changesets = [self.repo.get_changeset(rev) for rev in revs]
 
        for changeset in changesets:
 
            self._test_equality(changeset)
 

	
 
    def test_default_changeset(self):
 
        tip = self.repo.get_changeset('tip')
 
        self.assertEqual(tip, self.repo.get_changeset())
 
        self.assertEqual(tip, self.repo.get_changeset(revision=None))
 
        self.assertEqual(tip, list(self.repo[-1:])[0])
 

	
 
    def test_root_node(self):
 
        tip = self.repo.get_changeset('tip')
 
        self.assertTrue(tip.root is tip.get_node(''))
 

	
 
    def test_lazy_fetch(self):
 
        """
 
        Test if changeset's nodes expands and are cached as we walk through
 
        the revision. This test is somewhat hard to write as order of tests
 
        is a key here. Written by running command after command in a shell.
 
        """
 
        self.setUp()
 
        chset = self.repo.get_changeset(45)
 
        self.assertTrue(len(chset.nodes) == 0)
 
        root = chset.root
 
        self.assertTrue(len(chset.nodes) == 1)
 
        self.assertTrue(len(root.nodes) == 8)
 
        # accessing root.nodes updates chset.nodes
 
        self.assertTrue(len(chset.nodes) == 9)
 

	
 
        docs = root.get_node('docs')
 
        # we haven't yet accessed anything new as docs dir was already cached
 
        self.assertTrue(len(chset.nodes) == 9)
 
        self.assertTrue(len(docs.nodes) == 8)
 
        # accessing docs.nodes updates chset.nodes
 
        self.assertTrue(len(chset.nodes) == 17)
 

	
 
        self.assertTrue(docs is chset.get_node('docs'))
 
        self.assertTrue(docs is root.nodes[0])
 
        self.assertTrue(docs is root.dirs[0])
 
        self.assertTrue(docs is chset.get_node('docs'))
 

	
 
    def test_nodes_with_changeset(self):
 
        self.setUp()
 
        chset = self.repo.get_changeset(45)
 
        root = chset.root
 
        docs = root.get_node('docs')
 
        self.assertTrue(docs is chset.get_node('docs'))
 
        api = docs.get_node('api')
 
        self.assertTrue(api is chset.get_node('docs/api'))
 
        index = api.get_node('index.rst')
 
        self.assertTrue(index is chset.get_node('docs/api/index.rst'))
 
        self.assertTrue(index is chset.get_node('docs')\
 
            .get_node('api')\
 
            .get_node('index.rst'))
 

	
 
    def test_branch_and_tags(self):
 
        chset0 = self.repo.get_changeset(0)
 
        self.assertEqual(chset0.branch, 'default')
 
        self.assertEqual(chset0.tags, [])
 

	
 
        chset10 = self.repo.get_changeset(10)
 
        self.assertEqual(chset10.branch, 'default')
 
        self.assertEqual(chset10.tags, [])
 

	
 
        chset44 = self.repo.get_changeset(44)
 
        self.assertEqual(chset44.branch, 'web')
 

	
 
        tip = self.repo.get_changeset('tip')
 
        self.assertTrue('tip' in tip.tags)
 

	
 
    def _test_file_size(self, revision, path, size):
 
        node = self.repo.get_changeset(revision).get_node(path)
 
        self.assertTrue(node.is_file())
 
        self.assertEqual(node.size, size)
 

	
 
    def test_file_size(self):
 
        to_check = (
 
            (10, 'setup.py', 1068),
 
            (20, 'setup.py', 1106),
 
            (60, 'setup.py', 1074),
 

	
 
            (10, 'vcs/backends/base.py', 2921),
 
            (20, 'vcs/backends/base.py', 3936),
 
            (60, 'vcs/backends/base.py', 6189),
 
        )
 
        for revision, path, size in to_check:
 
            self._test_file_size(revision, path, size)
 

	
 
    def test_file_history(self):
 
        # we can only check if those revisions are present in the history
 
        # as we cannot update this test every time file is changed
 
        files = {
 
            'setup.py': [7, 18, 45, 46, 47, 69, 77],
 
            'vcs/nodes.py': [7, 8, 24, 26, 30, 45, 47, 49, 56, 57, 58, 59, 60,
 
                61, 73, 76],
 
            'vcs/backends/hg.py': [4, 5, 6, 11, 12, 13, 14, 15, 16, 21, 22, 23,
 
                26, 27, 28, 30, 31, 33, 35, 36, 37, 38, 39, 40, 41, 44, 45, 47,
 
                48, 49, 53, 54, 55, 58, 60, 61, 67, 68, 69, 70, 73, 77, 78, 79,
 
                82],
 
        }
 
        for path, revs in files.items():
 
            tip = self.repo.get_changeset(revs[-1])
 
            node = tip.get_node(path)
 
            node_revs = [chset.revision for chset in node.history]
 
            self.assertTrue(set(revs).issubset(set(node_revs)),
 
                "We assumed that %s is subset of revisions for which file %s "
 
                "has been changed, and history of that node returned: %s"
 
                % (revs, path, node_revs))
 

	
 
    def test_file_annotate(self):
 
        files = {
 
                 'vcs/backends/__init__.py':
 
                  {89: {'lines_no': 31,
 
                        'changesets': [32, 32, 61, 32, 32, 37, 32, 32, 32, 44,
 
                                       37, 37, 37, 37, 45, 37, 44, 37, 37, 37,
 
                                       32, 32, 32, 32, 37, 32, 37, 37, 32,
 
                                       32, 32]},
 
                   20: {'lines_no': 1,
 
                        'changesets': [4]},
 
                   55: {'lines_no': 31,
 
                        'changesets': [32, 32, 45, 32, 32, 37, 32, 32, 32, 44,
 
                                       37, 37, 37, 37, 45, 37, 44, 37, 37, 37,
 
                                       32, 32, 32, 32, 37, 32, 37, 37, 32,
 
                                       32, 32]}},
 
                 'vcs/exceptions.py':
 
                 {89: {'lines_no': 18,
 
                       'changesets': [16, 16, 16, 16, 16, 16, 16, 16, 16, 16,
 
                                      16, 16, 17, 16, 16, 18, 18, 18]},
 
                  20: {'lines_no': 18,
 
                       'changesets': [16, 16, 16, 16, 16, 16, 16, 16, 16, 16,
 
                                      16, 16, 17, 16, 16, 18, 18, 18]},
 
                  55: {'lines_no': 18, 'changesets': [16, 16, 16, 16, 16, 16,
 
                                                      16, 16, 16, 16, 16, 16,
 
                                                      17, 16, 16, 18, 18, 18]}},
 
                 'MANIFEST.in': {89: {'lines_no': 5,
 
                                      'changesets': [7, 7, 7, 71, 71]},
 
                                 20: {'lines_no': 3,
 
                                      'changesets': [7, 7, 7]},
 
                                 55: {'lines_no': 3,
 
                                     'changesets': [7, 7, 7]}}}
 

	
 

	
 
        for fname, revision_dict in files.items():
 
            for rev, data in revision_dict.items():
 
                cs = self.repo.get_changeset(rev)
 
                ann = cs.get_file_annotate(fname)
 

	
 
                l1 = [x[1].revision for x in ann]
 
                l2 = files[fname][rev]['changesets']
 
                self.assertTrue(l1 == l2 , "The lists of revision for %s@rev%s"
 
                                "from annotation list should match each other,"
 
                                "got \n%s \nvs \n%s " % (fname, rev, l1, l2))
 

	
 
    def test_changeset_state(self):
 
        """
 
        Tests which files have been added/changed/removed at particular revision
 
        """
 

	
 
        # rev 46ad32a4f974:
 
        # hg st --rev 46ad32a4f974
 
        #    changed: 13
 
        #    added:   20
 
        #    removed: 1
 
        changed = set(['.hgignore'
 
            , 'README.rst' , 'docs/conf.py' , 'docs/index.rst' , 'setup.py'
 
            , 'tests/test_hg.py' , 'tests/test_nodes.py' , 'vcs/__init__.py'
 
            , 'vcs/backends/__init__.py' , 'vcs/backends/base.py'
 
            , 'vcs/backends/hg.py' , 'vcs/nodes.py' , 'vcs/utils/__init__.py'])
 

	
 
        added = set(['docs/api/backends/hg.rst'
 
            , 'docs/api/backends/index.rst' , 'docs/api/index.rst'
 
            , 'docs/api/nodes.rst' , 'docs/api/web/index.rst'
 
            , 'docs/api/web/simplevcs.rst' , 'docs/installation.rst'
 
            , 'docs/quickstart.rst' , 'setup.cfg' , 'vcs/utils/baseui_config.py'
 
            , 'vcs/utils/web.py' , 'vcs/web/__init__.py' , 'vcs/web/exceptions.py'
 
            , 'vcs/web/simplevcs/__init__.py' , 'vcs/web/simplevcs/exceptions.py'
 
            , 'vcs/web/simplevcs/middleware.py' , 'vcs/web/simplevcs/models.py'
 
            , 'vcs/web/simplevcs/settings.py' , 'vcs/web/simplevcs/utils.py'
 
            , 'vcs/web/simplevcs/views.py'])
 

	
 
        removed = set(['docs/api.rst'])
 

	
 
        chset64 = self.repo.get_changeset('46ad32a4f974')
 
        self.assertEqual(set((node.path for node in chset64.added)), added)
 
        self.assertEqual(set((node.path for node in chset64.changed)), changed)
 
        self.assertEqual(set((node.path for node in chset64.removed)), removed)
 

	
 
        # rev b090f22d27d6:
 
        # hg st --rev b090f22d27d6
 
        #    changed: 13
 
        #    added:   20
 
        #    removed: 1
 
        chset88 = self.repo.get_changeset('b090f22d27d6')
 
        self.assertEqual(set((node.path for node in chset88.added)), set())
 
        self.assertEqual(set((node.path for node in chset88.changed)),
 
            set(['.hgignore']))
 
        self.assertEqual(set((node.path for node in chset88.removed)), set())
 
#
 
        # 85:
 
        #    added:   2 ['vcs/utils/diffs.py', 'vcs/web/simplevcs/views/diffs.py']
 
        #    changed: 4 ['vcs/web/simplevcs/models.py', ...]
 
        #    removed: 1 ['vcs/utils/web.py']
 
        chset85 = self.repo.get_changeset(85)
 
        self.assertEqual(set((node.path for node in chset85.added)), set([
 
            'vcs/utils/diffs.py',
 
            'vcs/web/simplevcs/views/diffs.py']))
 
        self.assertEqual(set((node.path for node in chset85.changed)), set([
 
            'vcs/web/simplevcs/models.py',
 
            'vcs/web/simplevcs/utils.py',
 
            'vcs/web/simplevcs/views/__init__.py',
 
            'vcs/web/simplevcs/views/repository.py',
 
            ]))
 
        self.assertEqual(set((node.path for node in chset85.removed)),
 
            set(['vcs/utils/web.py']))
 

	
 

	
 
    def test_files_state(self):
 
        """
 
        Tests state of FileNodes.
 
        """
 
        chset = self.repo.get_changeset(85)
 
        node = chset.get_node('vcs/utils/diffs.py')
 
        self.assertTrue(node.state, NodeState.ADDED)
 
        self.assertTrue(node.added)
 
        self.assertFalse(node.changed)
 
        self.assertFalse(node.not_changed)
 
        self.assertFalse(node.removed)
 

	
 
        chset = self.repo.get_changeset(88)
 
        node = chset.get_node('.hgignore')
 
        self.assertTrue(node.state, NodeState.CHANGED)
 
        self.assertFalse(node.added)
 
        self.assertTrue(node.changed)
 
        self.assertFalse(node.not_changed)
 
        self.assertFalse(node.removed)
 

	
 
        chset = self.repo.get_changeset(85)
 
        node = chset.get_node('setup.py')
 
        self.assertTrue(node.state, NodeState.NOT_CHANGED)
 
        self.assertFalse(node.added)
 
        self.assertFalse(node.changed)
 
        self.assertTrue(node.not_changed)
 
        self.assertFalse(node.removed)
 

	
 
        # If node has REMOVED state then trying to fetch it would raise
 
        # ChangesetError exception
 
        chset = self.repo.get_changeset(2)
 
        path = 'vcs/backends/BaseRepository.py'
 
        self.assertRaises(NodeDoesNotExistError, chset.get_node, path)
 
        # but it would be one of ``removed`` (changeset's attribute)
 
        self.assertTrue(path in [rf.path for rf in chset.removed])
 

	
 
    def test_commit_message_is_unicode(self):
 
        for cm in self.repo:
 
            self.assertEqual(type(cm.message), unicode)
 

	
 
    def test_changeset_author_is_unicode(self):
 
        for cm in self.repo:
 
            self.assertEqual(type(cm.author), unicode)
 

	
 
    def test_repo_files_content_is_unicode(self):
 
        test_changeset = self.repo.get_changeset(100)
 
        for node in test_changeset.get_node('/'):
 
            if node.is_file():
 
                self.assertEqual(type(node.content), unicode)
 

	
 
    def test_wrong_path(self):
 
        # There is 'setup.py' in the root dir but not there:
 
        path = 'foo/bar/setup.py'
 
        self.assertRaises(VCSError, self.repo.get_changeset().get_node, path)
 

	
 

	
 
    def test_archival_file(self):
 
        #TODO:
 
        pass
 

	
 
    def test_archival_as_generator(self):
 
        #TODO:
 
        pass
 

	
 
    def test_archival_wrong_kind(self):
 
        tip = self.repo.get_changeset()
 
        self.assertRaises(VCSError, tip.fill_archive, kind='error')
 

	
 
    def test_archival_empty_prefix(self):
 
        #TODO:
 
        pass
 

	
 

	
 
    def test_author_email(self):
 
        self.assertEqual('marcin@python-blog.com',
 
                         self.repo.get_changeset('b986218ba1c9').author_email)
 
        self.assertEqual('lukasz.balcerzak@python-center.pl',
 
                         self.repo.get_changeset('3803844fdbd3').author_email)
 
        self.assertEqual('',
 
                         self.repo.get_changeset('84478366594b').author_email)
 

	
 
    def test_author_username(self):
 
        self.assertEqual('Marcin Kuzminski',
 
                         self.repo.get_changeset('b986218ba1c9').author_name)
 
        self.assertEqual('Lukasz Balcerzak',
 
                         self.repo.get_changeset('3803844fdbd3').author_name)
 
        self.assertEqual('marcink',
 
                         self.repo.get_changeset('84478366594b').author_name)
rhodecode/tests/vcs/test_inmemchangesets.py
Show inline comments
 
new file 100644
 
"""
 
Tests so called "in memory changesets" commit API of vcs.
 
"""
 
from __future__ import with_statement
 

	
 
from rhodecode.lib import vcs
 
import time
 
import datetime
 
from conf import SCM_TESTS, get_new_dir
 
from rhodecode.lib.vcs.exceptions import EmptyRepositoryError
 
from rhodecode.lib.vcs.exceptions import NodeAlreadyAddedError
 
from rhodecode.lib.vcs.exceptions import NodeAlreadyExistsError
 
from rhodecode.lib.vcs.exceptions import NodeAlreadyRemovedError
 
from rhodecode.lib.vcs.exceptions import NodeAlreadyChangedError
 
from rhodecode.lib.vcs.exceptions import NodeDoesNotExistError
 
from rhodecode.lib.vcs.exceptions import NodeNotChangedError
 
from rhodecode.lib.vcs.nodes import DirNode
 
from rhodecode.lib.vcs.nodes import FileNode
 
from rhodecode.lib.vcs.utils.compat import unittest
 

	
 

	
 
class InMemoryChangesetTestMixin(object):
 
    """
 
    This is a backend independent test case class which should be created
 
    with ``type`` method.
 

	
 
    It is required to set following attributes at subclass:
 

	
 
    - ``backend_alias``: alias of used backend (see ``vcs.BACKENDS``)
 
    - ``repo_path``: path to the repository which would be created for set of
 
      tests
 
    """
 

	
 
    def get_backend(self):
 
        return vcs.get_backend(self.backend_alias)
 

	
 
    def setUp(self):
 
        Backend = self.get_backend()
 
        self.repo_path = get_new_dir(str(time.time()))
 
        self.repo = Backend(self.repo_path, create=True)
 
        self.imc = self.repo.in_memory_changeset
 
        self.nodes = [
 
            FileNode('foobar', content='Foo & bar'),
 
            FileNode('foobar2', content='Foo & bar, doubled!'),
 
            FileNode('foo bar with spaces', content=''),
 
            FileNode('foo/bar/baz', content='Inside'),
 
        ]
 

	
 
    def test_add(self):
 
        rev_count = len(self.repo.revisions)
 
        to_add = [FileNode(node.path, content=node.content)
 
            for node in self.nodes]
 
        for node in to_add:
 
            self.imc.add(node)
 
        message = u'Added: %s' % ', '.join((node.path for node in self.nodes))
 
        author = unicode(self.__class__)
 
        changeset = self.imc.commit(message=message, author=author)
 

	
 
        newtip = self.repo.get_changeset()
 
        self.assertEqual(changeset, newtip)
 
        self.assertEqual(rev_count + 1, len(self.repo.revisions))
 
        self.assertEqual(newtip.message, message)
 
        self.assertEqual(newtip.author, author)
 
        self.assertTrue(not any((self.imc.added, self.imc.changed,
 
            self.imc.removed)))
 
        for node in to_add:
 
            self.assertEqual(newtip.get_node(node.path).content, node.content)
 

	
 
    def test_add_in_bulk(self):
 
        rev_count = len(self.repo.revisions)
 
        to_add = [FileNode(node.path, content=node.content)
 
            for node in self.nodes]
 
        self.imc.add(*to_add)
 
        message = u'Added: %s' % ', '.join((node.path for node in self.nodes))
 
        author = unicode(self.__class__)
 
        changeset = self.imc.commit(message=message, author=author)
 

	
 
        newtip = self.repo.get_changeset()
 
        self.assertEqual(changeset, newtip)
 
        self.assertEqual(rev_count + 1, len(self.repo.revisions))
 
        self.assertEqual(newtip.message, message)
 
        self.assertEqual(newtip.author, author)
 
        self.assertTrue(not any((self.imc.added, self.imc.changed,
 
            self.imc.removed)))
 
        for node in to_add:
 
            self.assertEqual(newtip.get_node(node.path).content, node.content)
 

	
 
    def test_add_actually_adds_all_nodes_at_second_commit_too(self):
 
        self.imc.add(FileNode('foo/bar/image.png', content='\0'))
 
        self.imc.add(FileNode('foo/README.txt', content='readme!'))
 
        changeset = self.imc.commit(u'Initial', u'joe.doe@example.com')
 
        self.assertTrue(isinstance(changeset.get_node('foo'), DirNode))
 
        self.assertTrue(isinstance(changeset.get_node('foo/bar'), DirNode))
 
        self.assertEqual(changeset.get_node('foo/bar/image.png').content, '\0')
 
        self.assertEqual(changeset.get_node('foo/README.txt').content, 'readme!')
 

	
 
        # commit some more files again
 
        to_add = [
 
            FileNode('foo/bar/foobaz/bar', content='foo'),
 
            FileNode('foo/bar/another/bar', content='foo'),
 
            FileNode('foo/baz.txt', content='foo'),
 
            FileNode('foobar/foobaz/file', content='foo'),
 
            FileNode('foobar/barbaz', content='foo'),
 
        ]
 
        self.imc.add(*to_add)
 
        changeset = self.imc.commit(u'Another', u'joe.doe@example.com')
 
        self.assertEqual(changeset.get_node('foo/bar/foobaz/bar').content, 'foo')
 
        self.assertEqual(changeset.get_node('foo/bar/another/bar').content, 'foo')
 
        self.assertEqual(changeset.get_node('foo/baz.txt').content, 'foo')
 
        self.assertEqual(changeset.get_node('foobar/foobaz/file').content, 'foo')
 
        self.assertEqual(changeset.get_node('foobar/barbaz').content, 'foo')
 

	
 
    def test_add_raise_already_added(self):
 
        node = FileNode('foobar', content='baz')
 
        self.imc.add(node)
 
        self.assertRaises(NodeAlreadyAddedError, self.imc.add, node)
 

	
 
    def test_check_integrity_raise_already_exist(self):
 
        node = FileNode('foobar', content='baz')
 
        self.imc.add(node)
 
        self.imc.commit(message=u'Added foobar', author=unicode(self))
 
        self.imc.add(node)
 
        self.assertRaises(NodeAlreadyExistsError, self.imc.commit,
 
            message='new message',
 
            author=str(self))
 

	
 
    def test_change(self):
 
        self.imc.add(FileNode('foo/bar/baz', content='foo'))
 
        self.imc.add(FileNode('foo/fbar', content='foobar'))
 
        tip = self.imc.commit(u'Initial', u'joe.doe@example.com')
 

	
 
        # Change node's content
 
        node = FileNode('foo/bar/baz', content='My **changed** content')
 
        self.imc.change(node)
 
        self.imc.commit(u'Changed %s' % node.path, u'joe.doe@example.com')
 

	
 
        newtip = self.repo.get_changeset()
 
        self.assertNotEqual(tip, newtip)
 
        self.assertNotEqual(tip.id, newtip.id)
 
        self.assertEqual(newtip.get_node('foo/bar/baz').content,
 
            'My **changed** content')
 

	
 
    def test_change_raise_empty_repository(self):
 
        node = FileNode('foobar')
 
        self.assertRaises(EmptyRepositoryError, self.imc.change, node)
 

	
 
    def test_check_integrity_change_raise_node_does_not_exist(self):
 
        node = FileNode('foobar', content='baz')
 
        self.imc.add(node)
 
        self.imc.commit(message=u'Added foobar', author=unicode(self))
 
        node = FileNode('not-foobar', content='')
 
        self.imc.change(node)
 
        self.assertRaises(NodeDoesNotExistError, self.imc.commit,
 
            message='Changed not existing node',
 
            author=str(self))
 

	
 
    def test_change_raise_node_already_changed(self):
 
        node = FileNode('foobar', content='baz')
 
        self.imc.add(node)
 
        self.imc.commit(message=u'Added foobar', author=unicode(self))
 
        node = FileNode('foobar', content='more baz')
 
        self.imc.change(node)
 
        self.assertRaises(NodeAlreadyChangedError, self.imc.change, node)
 

	
 
    def test_check_integrity_change_raise_node_not_changed(self):
 
        self.test_add()  # Performs first commit
 

	
 
        node = FileNode(self.nodes[0].path, content=self.nodes[0].content)
 
        self.imc.change(node)
 
        self.assertRaises(NodeNotChangedError, self.imc.commit,
 
            message=u'Trying to mark node as changed without touching it',
 
            author=unicode(self))
 

	
 
    def test_change_raise_node_already_removed(self):
 
        node = FileNode('foobar', content='baz')
 
        self.imc.add(node)
 
        self.imc.commit(message=u'Added foobar', author=unicode(self))
 
        self.imc.remove(FileNode('foobar'))
 
        self.assertRaises(NodeAlreadyRemovedError, self.imc.change, node)
 

	
 
    def test_remove(self):
 
        self.test_add()  # Performs first commit
 

	
 
        tip = self.repo.get_changeset()
 
        node = self.nodes[0]
 
        self.assertEqual(node.content, tip.get_node(node.path).content)
 
        self.imc.remove(node)
 
        self.imc.commit(message=u'Removed %s' % node.path, author=unicode(self))
 

	
 
        newtip = self.repo.get_changeset()
 
        self.assertNotEqual(tip, newtip)
 
        self.assertNotEqual(tip.id, newtip.id)
 
        self.assertRaises(NodeDoesNotExistError, newtip.get_node, node.path)
 

	
 
    def test_remove_last_file_from_directory(self):
 
        node = FileNode('omg/qwe/foo/bar', content='foobar')
 
        self.imc.add(node)
 
        self.imc.commit(u'added', u'joe doe')
 

	
 
        self.imc.remove(node)
 
        tip = self.imc.commit(u'removed', u'joe doe')
 
        self.assertRaises(NodeDoesNotExistError, tip.get_node, 'omg/qwe/foo/bar')
 

	
 
    def test_remove_raise_node_does_not_exist(self):
 
        self.imc.remove(self.nodes[0])
 
        self.assertRaises(NodeDoesNotExistError, self.imc.commit,
 
            message='Trying to remove node at empty repository',
 
            author=str(self))
 

	
 
    def test_check_integrity_remove_raise_node_does_not_exist(self):
 
        self.test_add()  # Performs first commit
 

	
 
        node = FileNode('no-such-file')
 
        self.imc.remove(node)
 
        self.assertRaises(NodeDoesNotExistError, self.imc.commit,
 
            message=u'Trying to remove not existing node',
 
            author=unicode(self))
 

	
 
    def test_remove_raise_node_already_removed(self):
 
        self.test_add() # Performs first commit
 

	
 
        node = FileNode(self.nodes[0].path)
 
        self.imc.remove(node)
 
        self.assertRaises(NodeAlreadyRemovedError, self.imc.remove, node)
 

	
 
    def test_remove_raise_node_already_changed(self):
 
        self.test_add()  # Performs first commit
 

	
 
        node = FileNode(self.nodes[0].path, content='Bending time')
 
        self.imc.change(node)
 
        self.assertRaises(NodeAlreadyChangedError, self.imc.remove, node)
 

	
 
    def test_reset(self):
 
        self.imc.add(FileNode('foo', content='bar'))
 
        #self.imc.change(FileNode('baz', content='new'))
 
        #self.imc.remove(FileNode('qwe'))
 
        self.imc.reset()
 
        self.assertTrue(not any((self.imc.added, self.imc.changed,
 
            self.imc.removed)))
 

	
 
    def test_multiple_commits(self):
 
        N = 3  # number of commits to perform
 
        last = None
 
        for x in xrange(N):
 
            fname = 'file%s' % str(x).rjust(5, '0')
 
            content = 'foobar\n' * x
 
            node = FileNode(fname, content=content)
 
            self.imc.add(node)
 
            commit = self.imc.commit(u"Commit no. %s" % (x + 1), author=u'vcs')
 
            self.assertTrue(last != commit)
 
            last = commit
 

	
 
        # Check commit number for same repo
 
        self.assertEqual(len(self.repo.revisions), N)
 

	
 
        # Check commit number for recreated repo
 
        backend = self.get_backend()
 
        repo = backend(self.repo_path)
 
        self.assertEqual(len(repo.revisions), N)
 

	
 
    def test_date_attr(self):
 
        node = FileNode('foobar.txt', content='Foobared!')
 
        self.imc.add(node)
 
        date = datetime.datetime(1985, 1, 30, 1, 45)
 
        commit = self.imc.commit(u"Committed at time when I was born ;-)",
 
            author=u'lb', date=date)
 

	
 
        self.assertEqual(commit.date, date)
 

	
 

	
 
class BackendBaseTestCase(unittest.TestCase):
 
    """
 
    Base test class for tests which requires repository.
 
    """
 
    backend_alias = 'hg'
 
    commits = [
 
        {
 
            'message': 'Initial commit',
 
            'author': 'Joe Doe <joe.doe@example.com>',
 
            'date': datetime.datetime(2010, 1, 1, 20),
 
            'added': [
 
                FileNode('foobar', content='Foobar'),
 
                FileNode('foobar2', content='Foobar II'),
 
                FileNode('foo/bar/baz', content='baz here!'),
 
            ],
 
        },
 
    ]
 

	
 
    def get_backend(self):
 
        return vcs.get_backend(self.backend_alias)
 

	
 
    def get_commits(self):
 
        """
 
        Returns list of commits which builds repository for each tests.
 
        """
 
        if hasattr(self, 'commits'):
 
            return self.commits
 

	
 
    def get_new_repo_path(self):
 
        """
 
        Returns newly created repository's directory.
 
        """
 
        backend = self.get_backend()
 
        key = '%s-%s' % (backend.alias, str(time.time()))
 
        repo_path = get_new_dir(key)
 
        return repo_path
 

	
 
    def setUp(self):
 
        Backend = self.get_backend()
 
        self.backend_class = Backend
 
        self.repo_path = self.get_new_repo_path()
 
        self.repo = Backend(self.repo_path, create=True)
 
        self.imc = self.repo.in_memory_changeset
 

	
 
        for commit in self.get_commits():
 
            for node in commit.get('added', []):
 
                self.imc.add(FileNode(node.path, content=node.content))
 
            for node in commit.get('changed', []):
 
                self.imc.change(FileNode(node.path, content=node.content))
 
            for node in commit.get('removed', []):
 
                self.imc.remove(FileNode(node.path))
 
            self.imc.commit(message=unicode(commit['message']),
 
                            author=unicode(commit['author']),
 
                date=commit['date'])
 

	
 
        self.tip = self.repo.get_changeset()
 

	
 

	
 
# For each backend create test case class
 
for alias in SCM_TESTS:
 
    attrs = {
 
        'backend_alias': alias,
 
    }
 
    cls_name = ''.join(('%s in memory changeset test' % alias).title().split())
 
    bases = (InMemoryChangesetTestMixin, unittest.TestCase)
 
    globals()[cls_name] = type(cls_name, bases, attrs)
 

	
 

	
 
if __name__ == '__main__':
 
    unittest.main()
rhodecode/tests/vcs/test_nodes.py
Show inline comments
 
new file 100644
 
from __future__ import with_statement
 

	
 
import stat
 
from rhodecode.lib.vcs.nodes import DirNode
 
from rhodecode.lib.vcs.nodes import FileNode
 
from rhodecode.lib.vcs.nodes import Node
 
from rhodecode.lib.vcs.nodes import NodeError
 
from rhodecode.lib.vcs.nodes import NodeKind
 
from rhodecode.lib.vcs.utils.compat import unittest
 

	
 

	
 
class NodeBasicTest(unittest.TestCase):
 

	
 
    def test_init(self):
 
        """
 
        Cannot innitialize Node objects with path with slash at the beginning.
 
        """
 
        wrong_paths = (
 
            '/foo',
 
            '/foo/bar'
 
        )
 
        for path in wrong_paths:
 
            self.assertRaises(NodeError, Node, path, NodeKind.FILE)
 

	
 
        wrong_paths = (
 
            '/foo/',
 
            '/foo/bar/'
 
        )
 
        for path in wrong_paths:
 
            self.assertRaises(NodeError, Node, path, NodeKind.DIR)
 

	
 
    def test_name(self):
 
        node = Node('', NodeKind.DIR)
 
        self.assertEqual(node.name, '')
 

	
 
        node = Node('path', NodeKind.FILE)
 
        self.assertEqual(node.name, 'path')
 

	
 
        node = Node('path/', NodeKind.DIR)
 
        self.assertEqual(node.name, 'path')
 

	
 
        node = Node('some/path', NodeKind.FILE)
 
        self.assertEqual(node.name, 'path')
 

	
 
        node = Node('some/path/', NodeKind.DIR)
 
        self.assertEqual(node.name, 'path')
 

	
 
    def test_root_node(self):
 
        self.assertRaises(NodeError, Node, '', NodeKind.FILE)
 

	
 
    def test_kind_setter(self):
 
        node = Node('', NodeKind.DIR)
 
        self.assertRaises(NodeError, setattr, node, 'kind', NodeKind.FILE)
 

	
 
    def _test_parent_path(self, node_path, expected_parent_path):
 
        """
 
        Tests if node's parent path are properly computed.
 
        """
 
        node = Node(node_path, NodeKind.DIR)
 
        parent_path = node.get_parent_path()
 
        self.assertTrue(parent_path.endswith('/') or \
 
            node.is_root() and parent_path == '')
 
        self.assertEqual(parent_path, expected_parent_path,
 
            "Node's path is %r and parent path is %r but should be %r"
 
            % (node.path, parent_path, expected_parent_path))
 

	
 
    def test_parent_path(self):
 
        test_paths = (
 
            # (node_path, expected_parent_path)
 
            ('', ''),
 
            ('some/path/', 'some/'),
 
            ('some/longer/path/', 'some/longer/'),
 
        )
 
        for node_path, expected_parent_path in test_paths:
 
            self._test_parent_path(node_path, expected_parent_path)
 

	
 
    '''
 
    def _test_trailing_slash(self, path):
 
        if not path.endswith('/'):
 
            self.fail("Trailing slash tests needs paths to end with slash")
 
        for kind in NodeKind.FILE, NodeKind.DIR:
 
            self.assertRaises(NodeError, Node, path=path, kind=kind)
 

	
 
    def test_trailing_slash(self):
 
        for path in ('/', 'foo/', 'foo/bar/', 'foo/bar/biz/'):
 
            self._test_trailing_slash(path)
 
    '''
 

	
 
    def test_is_file(self):
 
        node = Node('any', NodeKind.FILE)
 
        self.assertTrue(node.is_file())
 

	
 
        node = FileNode('any')
 
        self.assertTrue(node.is_file())
 
        self.assertRaises(AttributeError, getattr, node, 'nodes')
 

	
 
    def test_is_dir(self):
 
        node = Node('any_dir', NodeKind.DIR)
 
        self.assertTrue(node.is_dir())
 

	
 
        node = DirNode('any_dir')
 

	
 
        self.assertTrue(node.is_dir())
 
        self.assertRaises(NodeError, getattr, node, 'content')
 

	
 
    def test_dir_node_iter(self):
 
        nodes = [
 
            DirNode('docs'),
 
            DirNode('tests'),
 
            FileNode('bar'),
 
            FileNode('foo'),
 
            FileNode('readme.txt'),
 
            FileNode('setup.py'),
 
        ]
 
        dirnode = DirNode('', nodes=nodes)
 
        for node in dirnode:
 
            node == dirnode.get_node(node.path)
 

	
 
    def test_node_state(self):
 
        """
 
        Without link to changeset nodes should raise NodeError.
 
        """
 
        node = FileNode('anything')
 
        self.assertRaises(NodeError, getattr, node, 'state')
 
        node = DirNode('anything')
 
        self.assertRaises(NodeError, getattr, node, 'state')
 

	
 
    def test_file_node_stat(self):
 
        node = FileNode('foobar', 'empty... almost')
 
        mode = node.mode  # default should be 0100644
 
        self.assertTrue(mode & stat.S_IRUSR)
 
        self.assertTrue(mode & stat.S_IWUSR)
 
        self.assertTrue(mode & stat.S_IRGRP)
 
        self.assertTrue(mode & stat.S_IROTH)
 
        self.assertFalse(mode & stat.S_IWGRP)
 
        self.assertFalse(mode & stat.S_IWOTH)
 
        self.assertFalse(mode & stat.S_IXUSR)
 
        self.assertFalse(mode & stat.S_IXGRP)
 
        self.assertFalse(mode & stat.S_IXOTH)
 

	
 
    def test_file_node_is_executable(self):
 
        node = FileNode('foobar', 'empty... almost', mode=0100755)
 
        self.assertTrue(node.is_executable())
 

	
 
        node = FileNode('foobar', 'empty... almost', mode=0100500)
 
        self.assertTrue(node.is_executable())
 

	
 
        node = FileNode('foobar', 'empty... almost', mode=0100644)
 
        self.assertFalse(node.is_executable())
 

	
 
    def test_mimetype(self):
 
        py_node = FileNode('test.py')
 
        tar_node = FileNode('test.tar.gz')
 

	
 
        ext = 'CustomExtension'
 

	
 
        my_node2 = FileNode('myfile2')
 
        my_node2._mimetype = [ext]
 

	
 
        my_node3 = FileNode('myfile3')
 
        my_node3._mimetype = [ext,ext]
 

	
 
        self.assertEqual(py_node.mimetype,'text/x-python')
 
        self.assertEqual(py_node.get_mimetype(),('text/x-python',None))
 

	
 
        self.assertEqual(tar_node.mimetype,'application/x-tar')
 
        self.assertEqual(tar_node.get_mimetype(),('application/x-tar','gzip'))
 

	
 
        self.assertRaises(NodeError,my_node2.get_mimetype)
 

	
 
        self.assertEqual(my_node3.mimetype,ext)
 
        self.assertEqual(my_node3.get_mimetype(),[ext,ext])
 

	
 
class NodeContentTest(unittest.TestCase):
 

	
 
    def test_if_binary(self):
 
        data = """\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x10\x00\x00\x00\x10\x08\x06\x00\x00\x00\x1f??a\x00\x00\x00\x04gAMA\x00\x00\xaf?7\x05\x8a?\x00\x00\x00\x19tEXtSoftware\x00Adobe ImageReadyq?e<\x00\x00\x025IDAT8?\xa5\x93?K\x94Q\x14\x87\x9f\xf7?Q\x1bs4?\x03\x9a\xa8?B\x02\x8b$\x10[U;i\x13?6h?&h[?"\x14j?\xa2M\x7fB\x14F\x9aQ?&\x842?\x0b\x89"\x82??!?\x9c!\x9c2l??{N\x8bW\x9dY\xb4\t/\x1c?=\x9b?}????\xa9*;9!?\x83\x91?[?\\v*?D\x04\'`EpNp\xa2X\'U?pVq"Sw.\x1e?\x08\x01D?jw????\xbc??7{|\x9b?\x89$\x01??W@\x15\x9c\x05q`Lt/\x97?\x94\xa1d?\x18~?\x18?\x18W[%\xb0?\x83??\x14\x88\x8dB?\xa6H\tL\tl\x19>/\x01`\xac\xabx?\x9cl\nx\xb0\x98\x07\x95\x88D$"q[\x19?d\x00(o\n\xa0??\x7f\xb9\xa4?\x1bF\x1f\x8e\xac\xa8?j??eUU}?.?\x9f\x8cE??x\x94??\r\xbdtoJU5"0N\x10U?\x00??V\t\x02\x9f\x81?U?\x00\x9eM\xae2?r\x9b7\x83\x82\x8aP3????.?&"?\xb7ZP \x0c<?O\xa5\t}\xb8?\x99\xa6?\x87?\x1di|/\xa0??0\xbe\x1fp?d&\x1a\xad\x95\x8a\x07?\t*\x10??b:?d?.\x13C\x8a?\x12\xbe\xbf\x8e?{???\x08?\x80\xa7\x13+d\x13>J?\x80\x15T\x95\x9a\x00??S\x8c\r?\xa1\x03\x07?\x96\x9b\xa7\xab=E??\xa4\xb3?\x19q??B\x91=\x8d??k?J\x0bV"??\xf7x?\xa1\x00?\\.\x87\x87???\x02F@D\x99],??\x10#?X\xb7=\xb9\x10?Z\x1by???cI??\x1ag?\x92\xbc?T?t[\x92\x81?<_\x17~\x92\x88?H%?\x10Q\x02\x9f\n\x81qQ\x0bm?\x1bX?\xb1AK\xa6\x9e\xb9?u\xb2?1\xbe|/\x92M@\xa2!F?\xa9>"\r<DT?>\x92\x8e?>\x9a9Qv\x127?a\xac?Y?8?:??]X???9\x80\xb7?u?\x0b#BZ\x8d=\x1d?p\x00\x00\x00\x00IEND\xaeB`\x82"""
 
        filenode = FileNode('calendar.png', content=data)
 
        self.assertTrue(filenode.is_binary)
 

	
 

	
 
if __name__ == '__main__':
 
    unittest.main()
rhodecode/tests/vcs/test_repository.py
Show inline comments
 
new file 100644
 
from __future__ import with_statement
 
import datetime
 
from base import BackendTestMixin
 
from conf import SCM_TESTS
 
from conf import TEST_USER_CONFIG_FILE
 
from rhodecode.lib.vcs.nodes import FileNode
 
from rhodecode.lib.vcs.utils.compat import unittest
 
from rhodecode.lib.vcs.exceptions import ChangesetDoesNotExistError
 

	
 

	
 
class RepositoryBaseTest(BackendTestMixin):
 
    recreate_repo_per_test = False
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        return super(RepositoryBaseTest, cls)._get_commits()[:1]
 

	
 
    def test_get_config_value(self):
 
        self.assertEqual(self.repo.get_config_value('universal', 'foo',
 
            TEST_USER_CONFIG_FILE), 'bar')
 

	
 
    def test_get_config_value_defaults_to_None(self):
 
        self.assertEqual(self.repo.get_config_value('universal', 'nonexist',
 
            TEST_USER_CONFIG_FILE), None)
 

	
 
    def test_get_user_name(self):
 
        self.assertEqual(self.repo.get_user_name(TEST_USER_CONFIG_FILE),
 
            'Foo Bar')
 

	
 
    def test_get_user_email(self):
 
        self.assertEqual(self.repo.get_user_email(TEST_USER_CONFIG_FILE),
 
            'foo.bar@example.com')
 

	
 

	
 

	
 
class RepositoryGetDiffTest(BackendTestMixin):
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        commits = [
 
            {
 
                'message': 'Initial commit',
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 20),
 
                'added': [
 
                    FileNode('foobar', content='foobar'),
 
                    FileNode('foobar2', content='foobar2'),
 
                ],
 
            },
 
            {
 
                'message': 'Changed foobar, added foobar3',
 
                'author': 'Jane Doe <jane.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 21),
 
                'added': [
 
                    FileNode('foobar3', content='foobar3'),
 
                ],
 
                'changed': [
 
                    FileNode('foobar', 'FOOBAR'),
 
                ],
 
            },
 
            {
 
                'message': 'Removed foobar, changed foobar3',
 
                'author': 'Jane Doe <jane.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 22),
 
                'changed': [
 
                    FileNode('foobar3', content='FOOBAR\nFOOBAR\nFOOBAR\n'),
 
                ],
 
                'removed': [FileNode('foobar')],
 
            },
 
        ]
 
        return commits
 

	
 
    def test_raise_for_wrong(self):
 
        with self.assertRaises(ChangesetDoesNotExistError):
 
            self.repo.get_diff('a' * 40, 'b' * 40)
 

	
 
class GitRepositoryGetDiffTest(RepositoryGetDiffTest, unittest.TestCase):
 
    backend_alias = 'git'
 

	
 
    def test_initial_commit_diff(self):
 
        initial_rev = self.repo.revisions[0]
 
        self.assertEqual(self.repo.get_diff(self.repo.EMPTY_CHANGESET, initial_rev), '''diff --git a/foobar b/foobar
 
new file mode 100644
 
index 0000000..f6ea049
 
--- /dev/null
 
+++ b/foobar
 
@@ -0,0 +1 @@
 
+foobar
 
\ No newline at end of file
 
diff --git a/foobar2 b/foobar2
 
new file mode 100644
 
index 0000000..e8c9d6b
 
--- /dev/null
 
+++ b/foobar2
 
@@ -0,0 +1 @@
 
+foobar2
 
\ No newline at end of file
 
''')
 

	
 
    def test_second_changeset_diff(self):
 
        revs = self.repo.revisions
 
        self.assertEqual(self.repo.get_diff(revs[0], revs[1]), '''diff --git a/foobar b/foobar
 
index f6ea049..389865b 100644
 
--- a/foobar
 
+++ b/foobar
 
@@ -1 +1 @@
 
-foobar
 
\ No newline at end of file
 
+FOOBAR
 
\ No newline at end of file
 
diff --git a/foobar3 b/foobar3
 
new file mode 100644
 
index 0000000..c11c37d
 
--- /dev/null
 
+++ b/foobar3
 
@@ -0,0 +1 @@
 
+foobar3
 
\ No newline at end of file
 
''')
 

	
 
    def test_third_changeset_diff(self):
 
        revs = self.repo.revisions
 
        self.assertEqual(self.repo.get_diff(revs[1], revs[2]), '''diff --git a/foobar b/foobar
 
deleted file mode 100644
 
index 389865b..0000000
 
--- a/foobar
 
+++ /dev/null
 
@@ -1 +0,0 @@
 
-FOOBAR
 
\ No newline at end of file
 
diff --git a/foobar3 b/foobar3
 
index c11c37d..f932447 100644
 
--- a/foobar3
 
+++ b/foobar3
 
@@ -1 +1,3 @@
 
-foobar3
 
\ No newline at end of file
 
+FOOBAR
 
+FOOBAR
 
+FOOBAR
 
''')
 

	
 

	
 
class HgRepositoryGetDiffTest(RepositoryGetDiffTest, unittest.TestCase):
 
    backend_alias = 'hg'
 

	
 
    def test_initial_commit_diff(self):
 
        initial_rev = self.repo.revisions[0]
 
        self.assertEqual(self.repo.get_diff(self.repo.EMPTY_CHANGESET, initial_rev), '''diff --git a/foobar b/foobar
 
new file mode 100755
 
--- /dev/null
 
+++ b/foobar
 
@@ -0,0 +1,1 @@
 
+foobar
 
\ No newline at end of file
 
diff --git a/foobar2 b/foobar2
 
new file mode 100755
 
--- /dev/null
 
+++ b/foobar2
 
@@ -0,0 +1,1 @@
 
+foobar2
 
\ No newline at end of file
 
''')
 

	
 
    def test_second_changeset_diff(self):
 
        revs = self.repo.revisions
 
        self.assertEqual(self.repo.get_diff(revs[0], revs[1]), '''diff --git a/foobar b/foobar
 
--- a/foobar
 
+++ b/foobar
 
@@ -1,1 +1,1 @@
 
-foobar
 
\ No newline at end of file
 
+FOOBAR
 
\ No newline at end of file
 
diff --git a/foobar3 b/foobar3
 
new file mode 100755
 
--- /dev/null
 
+++ b/foobar3
 
@@ -0,0 +1,1 @@
 
+foobar3
 
\ No newline at end of file
 
''')
 

	
 
    def test_third_changeset_diff(self):
 
        revs = self.repo.revisions
 
        self.assertEqual(self.repo.get_diff(revs[1], revs[2]), '''diff --git a/foobar b/foobar
 
deleted file mode 100755
 
--- a/foobar
 
+++ /dev/null
 
@@ -1,1 +0,0 @@
 
-FOOBAR
 
\ No newline at end of file
 
diff --git a/foobar3 b/foobar3
 
--- a/foobar3
 
+++ b/foobar3
 
@@ -1,1 +1,3 @@
 
-foobar3
 
\ No newline at end of file
 
+FOOBAR
 
+FOOBAR
 
+FOOBAR
 
''')
 

	
 

	
 
# For each backend create test case class
 
for alias in SCM_TESTS:
 
    attrs = {
 
        'backend_alias': alias,
 
    }
 
    cls_name = alias.capitalize() + RepositoryBaseTest.__name__
 
    bases = (RepositoryBaseTest, unittest.TestCase)
 
    globals()[cls_name] = type(cls_name, bases, attrs)
 

	
 
if __name__ == '__main__':
 
    unittest.main()
rhodecode/tests/vcs/test_tags.py
Show inline comments
 
new file 100644
 
from __future__ import with_statement
 

	
 
from base import BackendTestMixin
 
from conf import SCM_TESTS
 
from rhodecode.lib.vcs.exceptions import TagAlreadyExistError
 
from rhodecode.lib.vcs.exceptions import TagDoesNotExistError
 
from rhodecode.lib.vcs.utils.compat import unittest
 

	
 

	
 
class TagsTestCaseMixin(BackendTestMixin):
 

	
 
    def test_new_tag(self):
 
        tip = self.repo.get_changeset()
 
        tagsize = len(self.repo.tags)
 
        tag = self.repo.tag('last-commit', 'joe', tip.raw_id)
 

	
 
        self.assertEqual(len(self.repo.tags), tagsize + 1)
 
        for top, dirs, files in tip.walk():
 
            self.assertEqual(top, tag.get_node(top.path))
 

	
 
    def test_tag_already_exist(self):
 
        tip = self.repo.get_changeset()
 
        self.repo.tag('last-commit', 'joe', tip.raw_id)
 

	
 
        self.assertRaises(TagAlreadyExistError,
 
            self.repo.tag, 'last-commit', 'joe', tip.raw_id)
 

	
 
        chset = self.repo.get_changeset(0)
 
        self.assertRaises(TagAlreadyExistError,
 
            self.repo.tag, 'last-commit', 'jane', chset.raw_id)
 

	
 
    def test_remove_tag(self):
 
        tip = self.repo.get_changeset()
 
        self.repo.tag('last-commit', 'joe', tip.raw_id)
 
        tagsize = len(self.repo.tags)
 

	
 
        self.repo.remove_tag('last-commit', user='evil joe')
 
        self.assertEqual(len(self.repo.tags), tagsize - 1)
 

	
 
    def test_remove_tag_which_does_not_exist(self):
 
        self.assertRaises(TagDoesNotExistError,
 
            self.repo.remove_tag, 'last-commit', user='evil joe')
 

	
 
    def test_name_with_slash(self):
 
        self.repo.tag('19/10/11', 'joe')
 
        self.assertTrue('19/10/11' in self.repo.tags)
 
        self.repo.tag('11', 'joe')
 
        self.assertTrue('11' in self.repo.tags)
 

	
 
# For each backend create test case class
 
for alias in SCM_TESTS:
 
    attrs = {
 
        'backend_alias': alias,
 
    }
 
    cls_name = ''.join(('%s tags test' % alias).title().split())
 
    bases = (TagsTestCaseMixin, unittest.TestCase)
 
    globals()[cls_name] = type(cls_name, bases, attrs)
 

	
 

	
 
if __name__ == '__main__':
 
    unittest.main()
rhodecode/tests/vcs/test_utils.py
Show inline comments
 
new file 100644
 
from __future__ import with_statement
 

	
 
import os
 
import mock
 
import time
 
import shutil
 
import tempfile
 
import datetime
 
from rhodecode.lib.vcs.utils.compat import unittest
 
from rhodecode.lib.vcs.utils.paths import get_dirs_for_path
 
from rhodecode.lib.vcs.utils.helpers import get_dict_for_attrs
 
from rhodecode.lib.vcs.utils.helpers import get_scm
 
from rhodecode.lib.vcs.utils.helpers import get_scms_for_path
 
from rhodecode.lib.vcs.utils.helpers import get_total_seconds
 
from rhodecode.lib.vcs.utils.helpers import parse_changesets
 
from rhodecode.lib.vcs.utils.helpers import parse_datetime
 
from rhodecode.lib.vcs.utils import author_email, author_name
 
from rhodecode.lib.vcs.utils.paths import get_user_home
 
from rhodecode.lib.vcs.exceptions import VCSError
 

	
 
from conf import TEST_HG_REPO, TEST_GIT_REPO, TEST_TMP_PATH
 

	
 

	
 
class PathsTest(unittest.TestCase):
 

	
 
    def _test_get_dirs_for_path(self, path, expected):
 
        """
 
        Tests if get_dirs_for_path returns same as expected.
 
        """
 
        expected = sorted(expected)
 
        result = sorted(get_dirs_for_path(path))
 
        self.assertEqual(result, expected,
 
            msg="%s != %s which was expected result for path %s"
 
            % (result, expected, path))
 

	
 
    def test_get_dirs_for_path(self):
 
        path = 'foo/bar/baz/file'
 
        paths_and_results = (
 
            ('foo/bar/baz/file', ['foo', 'foo/bar', 'foo/bar/baz']),
 
            ('foo/bar/', ['foo', 'foo/bar']),
 
            ('foo/bar', ['foo']),
 
        )
 
        for path, expected in paths_and_results:
 
            self._test_get_dirs_for_path(path, expected)
 

	
 

	
 
    def test_get_scm(self):
 
        self.assertEqual(('hg', TEST_HG_REPO), get_scm(TEST_HG_REPO))
 
        self.assertEqual(('git', TEST_GIT_REPO), get_scm(TEST_GIT_REPO))
 

	
 
    def test_get_two_scms_for_path(self):
 
        multialias_repo_path = os.path.join(TEST_TMP_PATH, 'hg-git-repo-2')
 
        if os.path.isdir(multialias_repo_path):
 
            shutil.rmtree(multialias_repo_path)
 

	
 
        os.mkdir(multialias_repo_path)
 

	
 
        self.assertRaises(VCSError, get_scm, multialias_repo_path)
 

	
 
    def test_get_scm_error_path(self):
 
        self.assertRaises(VCSError, get_scm, 'err')
 

	
 
    def test_get_scms_for_path(self):
 
        dirpath = tempfile.gettempdir()
 
        new = os.path.join(dirpath, 'vcs-scms-for-path-%s' % time.time())
 
        os.mkdir(new)
 
        self.assertEqual(get_scms_for_path(new), [])
 

	
 
        os.mkdir(os.path.join(new, '.tux'))
 
        self.assertEqual(get_scms_for_path(new), [])
 

	
 
        os.mkdir(os.path.join(new, '.git'))
 
        self.assertEqual(set(get_scms_for_path(new)), set(['git']))
 

	
 
        os.mkdir(os.path.join(new, '.hg'))
 
        self.assertEqual(set(get_scms_for_path(new)), set(['git', 'hg']))
 

	
 

	
 
class TestParseChangesets(unittest.TestCase):
 

	
 
    def test_main_is_returned_correctly(self):
 
        self.assertEqual(parse_changesets('123456'), {
 
            'start': None,
 
            'main': '123456',
 
            'end': None,
 
        })
 

	
 
    def test_start_is_returned_correctly(self):
 
        self.assertEqual(parse_changesets('aaabbb..'), {
 
            'start': 'aaabbb',
 
            'main': None,
 
            'end': None,
 
        })
 

	
 
    def test_end_is_returned_correctly(self):
 
        self.assertEqual(parse_changesets('..cccddd'), {
 
            'start': None,
 
            'main': None,
 
            'end': 'cccddd',
 
        })
 

	
 
    def test_that_two_or_three_dots_are_allowed(self):
 
        text1 = 'a..b'
 
        text2 = 'a...b'
 
        self.assertEqual(parse_changesets(text1), parse_changesets(text2))
 

	
 
    def test_that_input_is_stripped_first(self):
 
        text1 = 'a..bb'
 
        text2 = '  a..bb\t\n\t '
 
        self.assertEqual(parse_changesets(text1), parse_changesets(text2))
 

	
 
    def test_that_exception_is_raised(self):
 
        text = '123456.789012' # single dot is not recognized
 
        with self.assertRaises(ValueError):
 
            parse_changesets(text)
 

	
 
    def test_non_alphanumeric_raises_exception(self):
 
        with self.assertRaises(ValueError):
 
            parse_changesets('aaa@bbb')
 

	
 

	
 
class TestParseDatetime(unittest.TestCase):
 

	
 
    def test_datetime_text(self):
 
        self.assertEqual(parse_datetime('2010-04-07 21:29:41'),
 
            datetime.datetime(2010, 4, 7, 21, 29, 41))
 

	
 
    def test_no_seconds(self):
 
        self.assertEqual(parse_datetime('2010-04-07 21:29'),
 
            datetime.datetime(2010, 4, 7, 21, 29))
 

	
 
    def test_date_only(self):
 
        self.assertEqual(parse_datetime('2010-04-07'),
 
            datetime.datetime(2010, 4, 7))
 

	
 
    def test_another_format(self):
 
        self.assertEqual(parse_datetime('04/07/10 21:29:41'),
 
            datetime.datetime(2010, 4, 7, 21, 29, 41))
 

	
 
    def test_now(self):
 
        self.assertTrue(parse_datetime('now') - datetime.datetime.now() <
 
            datetime.timedelta(seconds=1))
 

	
 
    def test_today(self):
 
        today = datetime.date.today()
 
        self.assertEqual(parse_datetime('today'),
 
            datetime.datetime(*today.timetuple()[:3]))
 

	
 
    def test_yesterday(self):
 
        yesterday = datetime.date.today() - datetime.timedelta(days=1)
 
        self.assertEqual(parse_datetime('yesterday'),
 
            datetime.datetime(*yesterday.timetuple()[:3]))
 

	
 
    def test_tomorrow(self):
 
        tomorrow = datetime.date.today() + datetime.timedelta(days=1)
 
        args = tomorrow.timetuple()[:3] + (23, 59, 59)
 
        self.assertEqual(parse_datetime('tomorrow'), datetime.datetime(*args))
 

	
 
    def test_days(self):
 
        timestamp = datetime.datetime.today() - datetime.timedelta(days=3)
 
        args = timestamp.timetuple()[:3] + (0, 0, 0, 0)
 
        expected = datetime.datetime(*args)
 
        self.assertEqual(parse_datetime('3d'), expected)
 
        self.assertEqual(parse_datetime('3 d'), expected)
 
        self.assertEqual(parse_datetime('3 day'), expected)
 
        self.assertEqual(parse_datetime('3 days'), expected)
 

	
 
    def test_weeks(self):
 
        timestamp = datetime.datetime.today() - datetime.timedelta(days=3 * 7)
 
        args = timestamp.timetuple()[:3] + (0, 0, 0, 0)
 
        expected = datetime.datetime(*args)
 
        self.assertEqual(parse_datetime('3w'), expected)
 
        self.assertEqual(parse_datetime('3 w'), expected)
 
        self.assertEqual(parse_datetime('3 week'), expected)
 
        self.assertEqual(parse_datetime('3 weeks'), expected)
 

	
 
    def test_mixed(self):
 
        timestamp = datetime.datetime.today() - datetime.timedelta(days=2 * 7 + 3)
 
        args = timestamp.timetuple()[:3] + (0, 0, 0, 0)
 
        expected = datetime.datetime(*args)
 
        self.assertEqual(parse_datetime('2w3d'), expected)
 
        self.assertEqual(parse_datetime('2w 3d'), expected)
 
        self.assertEqual(parse_datetime('2w 3 days'), expected)
 
        self.assertEqual(parse_datetime('2 weeks 3 days'), expected)
 

	
 

	
 
class TestAuthorExtractors(unittest.TestCase):
 
    TEST_AUTHORS = [('Marcin Kuzminski <marcin@python-works.com>',
 
                    ('Marcin Kuzminski', 'marcin@python-works.com')),
 
                  ('Marcin Kuzminski Spaces < marcin@python-works.com >',
 
                    ('Marcin Kuzminski Spaces', 'marcin@python-works.com')),
 
                  ('Marcin Kuzminski <marcin.kuzminski@python-works.com>',
 
                    ('Marcin Kuzminski', 'marcin.kuzminski@python-works.com')),
 
                  ('mrf RFC_SPEC <marcin+kuzminski@python-works.com>',
 
                    ('mrf RFC_SPEC', 'marcin+kuzminski@python-works.com')),
 
                  ('username <user@email.com>',
 
                    ('username', 'user@email.com')),
 
                  ('username <user@email.com',
 
                   ('username', 'user@email.com')),
 
                  ('broken missing@email.com',
 
                   ('broken', 'missing@email.com')),
 
                  ('<justemail@mail.com>',
 
                   ('', 'justemail@mail.com')),
 
                  ('justname',
 
                   ('justname', '')),
 
                  ('Mr Double Name withemail@email.com ',
 
                   ('Mr Double Name', 'withemail@email.com')),
 
                  ]
 

	
 
    def test_author_email(self):
 

	
 
        for test_str, result in self.TEST_AUTHORS:
 
            self.assertEqual(result[1], author_email(test_str))
 

	
 

	
 
    def test_author_name(self):
 

	
 
        for test_str, result in self.TEST_AUTHORS:
 
            self.assertEqual(result[0], author_name(test_str))
 

	
 

	
 
class TestGetDictForAttrs(unittest.TestCase):
 

	
 
    def test_returned_dict_has_expected_attrs(self):
 
        obj = mock.Mock()
 
        obj.NOT_INCLUDED = 'this key/value should not be included'
 
        obj.CONST = True
 
        obj.foo = 'aaa'
 
        obj.attrs = {'foo': 'bar'}
 
        obj.date = datetime.datetime(2010, 12, 31)
 
        obj.count = 1001
 

	
 
        self.assertEqual(get_dict_for_attrs(obj, ['CONST', 'foo', 'attrs',
 
            'date', 'count']), {
 
            'CONST': True,
 
            'foo': 'aaa',
 
            'attrs': {'foo': 'bar'},
 
            'date': datetime.datetime(2010, 12, 31),
 
            'count': 1001,
 
        })
 

	
 

	
 
class TestGetTotalSeconds(unittest.TestCase):
 

	
 
    def assertTotalSecondsEqual(self, timedelta, expected_seconds):
 
        result = get_total_seconds(timedelta)
 
        self.assertEqual(result, expected_seconds,
 
            "We computed %s seconds for %s but expected %s"
 
            % (result, timedelta, expected_seconds))
 

	
 
    def test_get_total_seconds_returns_proper_value(self):
 
        self.assertTotalSecondsEqual(datetime.timedelta(seconds=1001), 1001)
 

	
 
    def test_get_total_seconds_returns_proper_value_for_partial_seconds(self):
 
        self.assertTotalSecondsEqual(datetime.timedelta(seconds=50.65), 50.65)
 

	
 

	
 
class TestGetUserHome(unittest.TestCase):
 

	
 
    @mock.patch.object(os, 'environ', {})
 
    def test_defaults_to_none(self):
 
        self.assertEqual(get_user_home(), None)
 

	
 
    @mock.patch.object(os, 'environ', {'HOME': '/home/foobar'})
 
    def test_unix_like(self):
 
        self.assertEqual(get_user_home(), '/home/foobar')
 

	
 
    @mock.patch.object(os, 'environ', {'USERPROFILE': '/Users/foobar'})
 
    def test_windows_like(self):
 
        self.assertEqual(get_user_home(), '/Users/foobar')
 

	
 
    @mock.patch.object(os, 'environ', {'HOME': '/home/foobar',
 
        'USERPROFILE': '/Users/foobar'})
 
    def test_prefers_home_over_userprofile(self):
 
        self.assertEqual(get_user_home(), '/home/foobar')
 

	
 

	
 
if __name__ == '__main__':
 
    unittest.main()
rhodecode/tests/vcs/test_utils_filesize.py
Show inline comments
 
new file 100644
 
from __future__ import with_statement
 

	
 
from rhodecode.lib.vcs.utils.filesize import filesizeformat
 
from rhodecode.lib.vcs.utils.compat import unittest
 

	
 

	
 
class TestFilesizeformat(unittest.TestCase):
 

	
 
    def test_bytes(self):
 
        self.assertEqual(filesizeformat(10), '10 B')
 

	
 
    def test_kilobytes(self):
 
        self.assertEqual(filesizeformat(1024 * 2), '2 KB')
 

	
 
    def test_megabytes(self):
 
        self.assertEqual(filesizeformat(1024 * 1024 * 2.3), '2.3 MB')
 

	
 
    def test_gigabytes(self):
 
        self.assertEqual(filesizeformat(1024 * 1024 * 1024 * 12.92), '12.92 GB')
 

	
 
    def test_that_function_respects_sep_paramtere(self):
 
        self.assertEqual(filesizeformat(1, ''), '1B')
 

	
 

	
 
if __name__ == '__main__':
 
    unittest.main()
rhodecode/tests/vcs/test_vcs.py
Show inline comments
 
new file 100644
 
from __future__ import with_statement
 

	
 
from rhodecode.lib.vcs import VCSError, get_repo, get_backend
 
from rhodecode.lib.vcs.backends.hg import MercurialRepository
 
from rhodecode.lib.vcs.utils.compat import unittest
 
from conf import TEST_HG_REPO, TEST_GIT_REPO, TEST_TMP_PATH
 
import os
 
import shutil
 

	
 

	
 
class VCSTest(unittest.TestCase):
 
    """
 
    Tests for main module's methods.
 
    """
 

	
 
    def test_get_backend(self):
 
        hg = get_backend('hg')
 
        self.assertEqual(hg, MercurialRepository)
 

	
 
    def test_alias_detect_hg(self):
 
        alias = 'hg'
 
        path = TEST_HG_REPO
 
        backend = get_backend(alias)
 
        repo = backend(path)
 
        self.assertEqual('hg',repo.alias)
 

	
 
    def test_alias_detect_git(self):
 
        alias = 'git'
 
        path = TEST_GIT_REPO
 
        backend = get_backend(alias)
 
        repo = backend(path)
 
        self.assertEqual('git',repo.alias)
 

	
 
    def test_wrong_alias(self):
 
        alias = 'wrong_alias'
 
        self.assertRaises(VCSError, get_backend, alias)
 

	
 
    def test_get_repo(self):
 
        alias = 'hg'
 
        path = TEST_HG_REPO
 
        backend = get_backend(alias)
 
        repo = backend(path)
 

	
 
        self.assertEqual(repo.__class__, get_repo(path, alias).__class__)
 
        self.assertEqual(repo.path, get_repo(path, alias).path)
 

	
 
    def test_get_repo_autoalias_hg(self):
 
        alias = 'hg'
 
        path = TEST_HG_REPO
 
        backend = get_backend(alias)
 
        repo = backend(path)
 

	
 
        self.assertEqual(repo.__class__, get_repo(path).__class__)
 
        self.assertEqual(repo.path, get_repo(path).path)
 

	
 
    def test_get_repo_autoalias_git(self):
 
        alias = 'git'
 
        path = TEST_GIT_REPO
 
        backend = get_backend(alias)
 
        repo = backend(path)
 

	
 
        self.assertEqual(repo.__class__, get_repo(path).__class__)
 
        self.assertEqual(repo.path, get_repo(path).path)
 

	
 

	
 
    def test_get_repo_err(self):
 
        blank_repo_path = os.path.join(TEST_TMP_PATH, 'blank-error-repo')
 
        if os.path.isdir(blank_repo_path):
 
            shutil.rmtree(blank_repo_path)
 

	
 
        os.mkdir(blank_repo_path)
 
        self.assertRaises(VCSError, get_repo, blank_repo_path)
 
        self.assertRaises(VCSError, get_repo, blank_repo_path + 'non_existing')
 

	
 
    def test_get_repo_multialias(self):
 
        multialias_repo_path = os.path.join(TEST_TMP_PATH, 'hg-git-repo')
 
        if os.path.isdir(multialias_repo_path):
 
            shutil.rmtree(multialias_repo_path)
 

	
 
        os.mkdir(multialias_repo_path)
 

	
 
        os.mkdir(os.path.join(multialias_repo_path, '.git'))
 
        os.mkdir(os.path.join(multialias_repo_path, '.hg'))
 
        self.assertRaises(VCSError, get_repo, multialias_repo_path)
rhodecode/tests/vcs/test_workdirs.py
Show inline comments
 
new file 100644
 
from __future__ import with_statement
 

	
 
import datetime
 
from rhodecode.lib.vcs.nodes import FileNode
 
from rhodecode.lib.vcs.utils.compat import unittest
 
from base import BackendTestMixin
 
from conf import SCM_TESTS
 

	
 

	
 
class WorkdirTestCaseMixin(BackendTestMixin):
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        commits = [
 
            {
 
                'message': u'Initial commit',
 
                'author': u'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 20),
 
                'added': [
 
                    FileNode('foobar', content='Foobar'),
 
                    FileNode('foobar2', content='Foobar II'),
 
                    FileNode('foo/bar/baz', content='baz here!'),
 
                ],
 
            },
 
            {
 
                'message': u'Changes...',
 
                'author': u'Jane Doe <jane.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 21),
 
                'added': [
 
                    FileNode('some/new.txt', content='news...'),
 
                ],
 
                'changed': [
 
                    FileNode('foobar', 'Foobar I'),
 
                ],
 
                'removed': [],
 
            },
 
        ]
 
        return commits
 

	
 
    def test_get_branch_for_default_branch(self):
 
        self.assertEqual(self.repo.workdir.get_branch(),
 
            self.repo.DEFAULT_BRANCH_NAME)
 

	
 
    def test_get_branch_after_adding_one(self):
 
        self.imc.add(FileNode('docs/index.txt',
 
            content='Documentation\n'))
 
        self.imc.commit(
 
            message=u'New branch: foobar',
 
            author=u'joe',
 
            branch='foobar',
 
        )
 

	
 
    def test_get_changeset(self):
 
        self.imc.add(FileNode('docs/index.txt',
 
            content='Documentation\n'))
 
        head = self.imc.commit(
 
            message=u'New branch: foobar',
 
            author=u'joe',
 
            branch='foobar',
 
        )
 
        self.assertEqual(self.repo.workdir.get_changeset(), head)
 

	
 
    def test_checkout_branch(self):
 
        from rhodecode.lib.vcs.exceptions import BranchDoesNotExistError
 
        # first, 'foobranch' does not exist.
 
        self.assertRaises(BranchDoesNotExistError, self.repo.workdir.checkout_branch,
 
                          branch='foobranch')
 
        # create new branch 'foobranch'.
 
        self.imc.add(FileNode('file1', content='blah'))
 
        self.imc.commit(message=u'asd', author=u'john', branch='foobranch')
 
        # go back to the default branch
 
        self.repo.workdir.checkout_branch()
 
        self.assertEqual(self.repo.workdir.get_branch(), self.backend_class.DEFAULT_BRANCH_NAME)
 
        # checkout 'foobranch'
 
        self.repo.workdir.checkout_branch('foobranch')
 
        self.assertEqual(self.repo.workdir.get_branch(), 'foobranch')
 

	
 

	
 
# For each backend create test case class
 
for alias in SCM_TESTS:
 
    attrs = {
 
        'backend_alias': alias,
 
    }
 
    cls_name = ''.join(('%s branch test' % alias).title().split())
 
    bases = (WorkdirTestCaseMixin, unittest.TestCase)
 
    globals()[cls_name] = type(cls_name, bases, attrs)
 

	
 

	
 
if __name__ == '__main__':
 
    unittest.main()
rhodecode/tests/vcs/utils.py
Show inline comments
 
new file 100644
 
"""
 
Utilities for tests only. These are not or should not be used normally -
 
functions here are crafted as we don't want to use ``vcs`` to verify tests.
 
"""
 
import os
 
import re
 
import sys
 

	
 
from subprocess import Popen
 

	
 

	
 
class VCSTestError(Exception):
 
    pass
 

	
 

	
 
def run_command(cmd, args):
 
    """
 
    Runs command on the system with given ``args``.
 
    """
 
    command = ' '.join((cmd, args))
 
    p = Popen(command, shell=True)
 
    status = os.waitpid(p.pid, 0)[1]
 
    return status
 

	
 

	
 
def eprint(msg):
 
    """
 
    Prints given ``msg`` into sys.stderr as nose test runner hides all output
 
    from sys.stdout by default and if we want to pipe stream somewhere we don't
 
    need those verbose messages anyway.
 
    Appends line break.
 
    """
 
    sys.stderr.write(msg)
 
    sys.stderr.write('\n')
 

	
 

	
 
class SCMFetcher(object):
 

	
 
    def __init__(self, alias, test_repo_path, remote_repo, clone_cmd):
 
        """
 
        :param clone_cmd: command which would clone remote repository; pass
 
          only first bits - remote path and destination would be appended
 
          using ``remote_repo`` and ``test_repo_path``
 
        """
 
        self.alias = alias
 
        self.test_repo_path = test_repo_path
 
        self.remote_repo = remote_repo
 
        self.clone_cmd = clone_cmd
 

	
 
    def setup(self):
 
        if not os.path.isdir(self.test_repo_path):
 
            self.fetch_repo()
 

	
 
    def fetch_repo(self):
 
        """
 
        Tries to fetch repository from remote path.
 
        """
 
        remote = self.remote_repo
 
        eprint("Fetching repository %s into %s" % (remote, self.test_repo_path))
 
        run_command(self.clone_cmd,  '%s %s' % (remote, self.test_repo_path))
 

	
 

	
 
def get_normalized_path(path):
 
    """
 
    If given path exists, new path would be generated and returned. Otherwise
 
    same whats given is returned. Assumes that there would be no more than
 
    10000 same named files.
 
    """
 
    if os.path.exists(path):
 
        dir, basename = os.path.split(path)
 
        splitted_name = basename.split('.')
 
        if len(splitted_name) > 1:
 
            ext = splitted_name[-1]
 
        else:
 
            ext = None
 
        name = '.'.join(splitted_name[:-1])
 
        matcher = re.compile(r'^.*-(\d{5})$')
 
        start = 0
 
        m = matcher.match(name)
 
        if not m:
 
            # Haven't append number yet so return first
 
            newname = '%s-00000' % name
 
            newpath = os.path.join(dir, newname)
 
            if ext:
 
                newpath = '.'.join((newpath, ext))
 
            return get_normalized_path(newpath)
 
        else:
 
            start = int(m.group(1)[-5:]) + 1
 
            for x in xrange(start, 10000):
 
                newname = name[:-5] + str(x).rjust(5, '0')
 
                newpath = os.path.join(dir, newname)
 
                if ext:
 
                    newpath = '.'.join((newpath, ext))
 
                if not os.path.exists(newpath):
 
                    return newpath
 
        raise VCSTestError("Couldn't compute new path for %s" % path)
 
    return path
0 comments (0 inline, 0 general)