Changeset - 4abfb1afd9f5
[Not reviewed]
beta
0 2 0
Marcin Kuzminski - 13 years ago 2012-10-27 15:07:01
marcin@python-works.com
fixes #630 git statistics do too much work making them slow.
added/changed/removed methods didn't use generator methods to return the list of changes
- added test for that case
2 files changed with 13 insertions and 5 deletions:
0 comments (0 inline, 0 general)
rhodecode/lib/vcs/backends/git/changeset.py
Show inline comments
 
import re
 
from itertools import chain
 
from dulwich import objects
 
from subprocess import Popen, PIPE
 
from rhodecode.lib.vcs.conf import settings
 
from rhodecode.lib.vcs.exceptions import RepositoryError
 
from rhodecode.lib.vcs.exceptions import ChangesetError
 
from rhodecode.lib.vcs.exceptions import NodeDoesNotExistError
 
from rhodecode.lib.vcs.exceptions import VCSError
 
from rhodecode.lib.vcs.exceptions import ChangesetDoesNotExistError
 
from rhodecode.lib.vcs.exceptions import ImproperArchiveTypeError
 
from rhodecode.lib.vcs.backends.base import BaseChangeset, EmptyChangeset
 
from rhodecode.lib.vcs.nodes import FileNode, DirNode, NodeKind, RootNode, \
 
    RemovedFileNode, SubModuleNode
 
    RemovedFileNode, SubModuleNode, ChangedFileNodesGenerator,\
 
    AddedFileNodesGenerator, RemovedFileNodesGenerator
 
from rhodecode.lib.vcs.utils import safe_unicode
 
from rhodecode.lib.vcs.utils import date_fromtimestamp
 
from rhodecode.lib.vcs.utils.lazy import LazyProperty
 

	
 

	
 
class GitChangeset(BaseChangeset):
 
    """
 
    Represents state of the repository at single revision.
 
    """
 

	
 
    def __init__(self, repository, revision):
 
        self._stat_modes = {}
 
        self.repository = repository
 

	
 
        try:
 
            commit = self.repository._repo.get_object(revision)
 
            if isinstance(commit, objects.Tag):
 
                revision = commit.object[1]
 
                commit = self.repository._repo.get_object(commit.object[1])
 
        except KeyError:
 
            raise RepositoryError("Cannot get object with id %s" % revision)
 
        self.raw_id = revision
 
        self.id = self.raw_id
 
        self.short_id = self.raw_id[:12]
 
        self._commit = commit
 

	
 
        self._tree_id = commit.tree
 
        self._commiter_property = 'committer'
 
        self._date_property = 'commit_time'
 
        self._date_tz_property = 'commit_timezone'
 
        self.revision = repository.revisions.index(revision)
 

	
 
        self.message = safe_unicode(commit.message)
 
        #self.branch = None
 
        self.tags = []
 
        self.nodes = {}
 
        self._paths = {}
 

	
 
    @LazyProperty
 
    def author(self):
 
        return safe_unicode(getattr(self._commit, self._commiter_property))
 

	
 
    @LazyProperty
 
    def date(self):
 
        return date_fromtimestamp(getattr(self._commit, self._date_property),
 
                                  getattr(self._commit, self._date_tz_property))
 

	
 
    @LazyProperty
 
    def _timestamp(self):
 
        return getattr(self._commit, self._date_property)
 

	
 
    @LazyProperty
 
    def status(self):
 
        """
 
        Returns modified, added, removed, deleted files for current changeset
 
        """
 
        return self.changed, self.added, self.removed
 

	
 
    @LazyProperty
 
    def branch(self):
 

	
 
        heads = self.repository._heads(reverse=False)
 

	
 
        ref = heads.get(self.raw_id)
 
        if ref:
 
            return safe_unicode(ref)
 

	
 
    def _fix_path(self, path):
 
        """
 
        Paths are stored without trailing slash so we need to get rid off it if
 
        needed.
 
        """
 
        if path.endswith('/'):
 
            path = path.rstrip('/')
 
        return path
 

	
 
    def _get_id_for_path(self, path):
 

	
 
        # FIXME: Please, spare a couple of minutes and make those codes cleaner;
 
        if not path in self._paths:
 
            path = path.strip('/')
 
            # set root tree
 
            tree = self.repository._repo[self._tree_id]
 
            if path == '':
 
                self._paths[''] = tree.id
 
                return tree.id
 
            splitted = path.split('/')
 
            dirs, name = splitted[:-1], splitted[-1]
 
            curdir = ''
 

	
 
            # initially extract things from root dir
 
            for item, stat, id in tree.iteritems():
 
                if curdir:
 
                    name = '/'.join((curdir, item))
 
                else:
 
                    name = item
 
                self._paths[name] = id
 
                self._stat_modes[name] = stat
 

	
 
            for dir in dirs:
 
                if curdir:
 
                    curdir = '/'.join((curdir, dir))
 
                else:
 
                    curdir = dir
 
                dir_id = None
 
                for item, stat, id in tree.iteritems():
 
                    if dir == item:
 
                        dir_id = id
 
                if dir_id:
 
                    # Update tree
 
                    tree = self.repository._repo[dir_id]
 
                    if not isinstance(tree, objects.Tree):
 
                        raise ChangesetError('%s is not a directory' % curdir)
 
                else:
 
                    raise ChangesetError('%s have not been found' % curdir)
 

	
 
                # cache all items from the given traversed tree
 
                for item, stat, id in tree.iteritems():
 
                    if curdir:
 
                        name = '/'.join((curdir, item))
 
                    else:
 
                        name = item
 
                    self._paths[name] = id
 
                    self._stat_modes[name] = stat
 
            if not path in self._paths:
 
                raise NodeDoesNotExistError("There is no file nor directory "
 
                    "at the given path %r at revision %r"
 
                    % (path, self.short_id))
 
        return self._paths[path]
 

	
 
    def _get_kind(self, path):
 
        obj = self.repository._repo[self._get_id_for_path(path)]
 
        if isinstance(obj, objects.Blob):
 
            return NodeKind.FILE
 
        elif isinstance(obj, objects.Tree):
 
            return NodeKind.DIR
 

	
 
    def _get_file_nodes(self):
 
        return chain(*(t[2] for t in self.walk()))
 

	
 
    @LazyProperty
 
    def parents(self):
 
        """
 
        Returns list of parents changesets.
 
        """
 
        return [self.repository.get_changeset(parent)
 
                for parent in self._commit.parents]
 

	
 
    def next(self, branch=None):
 

	
 
        if branch and self.branch != branch:
 
            raise VCSError('Branch option used on changeset not belonging '
 
                           'to that branch')
 

	
 
        def _next(changeset, branch):
 
            try:
 
                next_ = changeset.revision + 1
 
                next_rev = changeset.repository.revisions[next_]
 
            except IndexError:
 
                raise ChangesetDoesNotExistError
 
            cs = changeset.repository.get_changeset(next_rev)
 

	
 
            if branch and branch != cs.branch:
 
                return _next(cs, branch)
 

	
 
            return cs
 

	
 
        return _next(self, branch)
 

	
 
    def prev(self, branch=None):
 
        if branch and self.branch != branch:
 
            raise VCSError('Branch option used on changeset not belonging '
 
                           'to that branch')
 

	
 
        def _prev(changeset, branch):
 
            try:
 
                prev_ = changeset.revision - 1
 
                if prev_ < 0:
 
                    raise IndexError
 
                prev_rev = changeset.repository.revisions[prev_]
 
            except IndexError:
 
                raise ChangesetDoesNotExistError
 

	
 
            cs = changeset.repository.get_changeset(prev_rev)
 

	
 
            if branch and branch != cs.branch:
 
                return _prev(cs, branch)
 

	
 
            return cs
 

	
 
        return _prev(self, branch)
 

	
 
    def diff(self, ignore_whitespace=True, context=3):
 
        rev1 = self.parents[0] if self.parents else self.repository.EMPTY_CHANGESET
 
        rev2 = self
 
        return ''.join(self.repository.get_diff(rev1, rev2,
 
                                    ignore_whitespace=ignore_whitespace,
 
                                    context=context))
 

	
 
    def get_file_mode(self, path):
 
        """
 
        Returns stat mode of the file at the given ``path``.
 
        """
 
        # ensure path is traversed
 
        self._get_id_for_path(path)
 
        return self._stat_modes[path]
 

	
 
    def get_file_content(self, path):
 
        """
 
        Returns content of the file at given ``path``.
 
        """
 
        id = self._get_id_for_path(path)
 
        blob = self.repository._repo[id]
 
        return blob.as_pretty_string()
 

	
 
    def get_file_size(self, path):
 
        """
 
        Returns size of the file at given ``path``.
 
        """
 
        id = self._get_id_for_path(path)
 
        blob = self.repository._repo[id]
 
        return blob.raw_length()
 

	
 
    def get_file_changeset(self, path):
 
        """
 
        Returns last commit of the file at the given ``path``.
 
        """
 
        node = self.get_node(path)
 
        return node.history[0]
 

	
 
    def get_file_history(self, path):
 
        """
 
        Returns history of file as reversed list of ``Changeset`` objects for
 
        which file at given ``path`` has been modified.
 

	
 
        TODO: This function now uses os underlying 'git' and 'grep' commands
 
        which is generally not good. Should be replaced with algorithm
 
        iterating commits.
 
        """
 
        cmd = 'log --pretty="format: %%H" -s -p %s -- "%s"' % (
 
                  self.id, path
 
               )
 
        so, se = self.repository.run_git_command(cmd)
 
        ids = re.findall(r'[0-9a-fA-F]{40}', so)
 
        return [self.repository.get_changeset(id) for id in ids]
 

	
 
    def get_file_annotate(self, path):
 
        """
 
        Returns a list of three element tuples with lineno,changeset and line
 

	
 
        TODO: This function now uses os underlying 'git' command which is
 
        generally not good. Should be replaced with algorithm iterating
 
        commits.
 
        """
 
        cmd = 'blame -l --root -r %s -- "%s"' % (self.id, path)
 
        # -l     ==> outputs long shas (and we need all 40 characters)
 
        # --root ==> doesn't put '^' character for bounderies
 
        # -r sha ==> blames for the given revision
 
        so, se = self.repository.run_git_command(cmd)
 

	
 
        annotate = []
 
        for i, blame_line in enumerate(so.split('\n')[:-1]):
 
            ln_no = i + 1
 
            id, line = re.split(r' ', blame_line, 1)
 
            annotate.append((ln_no, self.repository.get_changeset(id), line))
 
        return annotate
 

	
 
    def fill_archive(self, stream=None, kind='tgz', prefix=None,
 
                     subrepos=False):
 
        """
 
        Fills up given stream.
 

	
 
        :param stream: file like object.
 
        :param kind: one of following: ``zip``, ``tgz`` or ``tbz2``.
 
            Default: ``tgz``.
 
        :param prefix: name of root directory in archive.
 
            Default is repository name and changeset's raw_id joined with dash
 
            (``repo-tip.<KIND>``).
 
        :param subrepos: include subrepos in this archive.
 

	
 
        :raise ImproperArchiveTypeError: If given kind is wrong.
 
        :raise VcsError: If given stream is None
 

	
 
        """
 
        allowed_kinds = settings.ARCHIVE_SPECS.keys()
 
        if kind not in allowed_kinds:
 
            raise ImproperArchiveTypeError('Archive kind not supported use one'
 
                'of %s', allowed_kinds)
 

	
 
        if prefix is None:
 
            prefix = '%s-%s' % (self.repository.name, self.short_id)
 
        elif prefix.startswith('/'):
 
            raise VCSError("Prefix cannot start with leading slash")
 
        elif prefix.strip() == '':
 
            raise VCSError("Prefix cannot be empty")
 

	
 
        if kind == 'zip':
 
            frmt = 'zip'
 
        else:
 
            frmt = 'tar'
 
        cmd = 'git archive --format=%s --prefix=%s/ %s' % (frmt, prefix,
 
            self.raw_id)
 
        if kind == 'tgz':
 
            cmd += ' | gzip -9'
 
        elif kind == 'tbz2':
 
            cmd += ' | bzip2 -9'
 

	
 
        if stream is None:
 
            raise VCSError('You need to pass in a valid stream for filling'
 
                           ' with archival data')
 
        popen = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True,
 
            cwd=self.repository.path)
 

	
 
        buffer_size = 1024 * 8
 
        chunk = popen.stdout.read(buffer_size)
 
        while chunk:
 
            stream.write(chunk)
 
            chunk = popen.stdout.read(buffer_size)
 
        # Make sure all descriptors would be read
 
        popen.communicate()
 

	
 
    def get_nodes(self, path):
 
        if self._get_kind(path) != NodeKind.DIR:
 
            raise ChangesetError("Directory does not exist for revision %r at "
 
                " %r" % (self.revision, path))
 
        path = self._fix_path(path)
 
        id = self._get_id_for_path(path)
 
        tree = self.repository._repo[id]
 
        dirnodes = []
 
        filenodes = []
 
        als = self.repository.alias
 
        for name, stat, id in tree.iteritems():
 
            if objects.S_ISGITLINK(stat):
 
                dirnodes.append(SubModuleNode(name, url=None, changeset=id,
 
                                              alias=als))
 
                continue
 

	
 
            obj = self.repository._repo.get_object(id)
 
            if path != '':
 
                obj_path = '/'.join((path, name))
 
            else:
 
                obj_path = name
 
            if obj_path not in self._stat_modes:
 
                self._stat_modes[obj_path] = stat
 
            if isinstance(obj, objects.Tree):
 
                dirnodes.append(DirNode(obj_path, changeset=self))
 
            elif isinstance(obj, objects.Blob):
 
                filenodes.append(FileNode(obj_path, changeset=self, mode=stat))
 
            else:
 
                raise ChangesetError("Requested object should be Tree "
 
                                     "or Blob, is %r" % type(obj))
 
        nodes = dirnodes + filenodes
 
        for node in nodes:
 
            if not node.path in self.nodes:
 
                self.nodes[node.path] = node
 
        nodes.sort()
 
        return nodes
 

	
 
    def get_node(self, path):
 
        if isinstance(path, unicode):
 
            path = path.encode('utf-8')
 
        path = self._fix_path(path)
 
        if not path in self.nodes:
 
            try:
 
                id_ = self._get_id_for_path(path)
 
            except ChangesetError:
 
                raise NodeDoesNotExistError("Cannot find one of parents' "
 
                    "directories for a given path: %s" % path)
 

	
 
            _GL = lambda m: m and objects.S_ISGITLINK(m)
 
            if _GL(self._stat_modes.get(path)):
 
                node = SubModuleNode(path, url=None, changeset=id_,
 
                                     alias=self.repository.alias)
 
            else:
 
                obj = self.repository._repo.get_object(id_)
 

	
 
                if isinstance(obj, objects.Tree):
 
                    if path == '':
 
                        node = RootNode(changeset=self)
 
                    else:
 
                        node = DirNode(path, changeset=self)
 
                    node._tree = obj
 
                elif isinstance(obj, objects.Blob):
 
                    node = FileNode(path, changeset=self)
 
                    node._blob = obj
 
                else:
 
                    raise NodeDoesNotExistError("There is no file nor directory "
 
                        "at the given path %r at revision %r"
 
                        % (path, self.short_id))
 
            # cache node
 
            self.nodes[path] = node
 
        return self.nodes[path]
 

	
 
    @LazyProperty
 
    def affected_files(self):
 
        """
 
        Get's a fast accessible file changes for given changeset
 
        """
 
        a, m, d = self._changes_cache
 
        return list(a.union(m).union(d))
 

	
 
    @LazyProperty
 
    def _diff_name_status(self):
 
        output = []
 
        for parent in self.parents:
 
            cmd = 'diff --name-status %s %s --encoding=utf8' % (parent.raw_id,
 
                                                                self.raw_id)
 
            so, se = self.repository.run_git_command(cmd)
 
            output.append(so.strip())
 
        return '\n'.join(output)
 

	
 
    @LazyProperty
 
    def _changes_cache(self):
 
        added = set()
 
        modified = set()
 
        deleted = set()
 
        _r = self.repository._repo
 

	
 
        parents = self.parents
 
        if not self.parents:
 
            parents = [EmptyChangeset()]
 
        for parent in parents:
 
            if isinstance(parent, EmptyChangeset):
 
                oid = None
 
            else:
 
                oid = _r[parent.raw_id].tree
 
            changes = _r.object_store.tree_changes(oid, _r[self.raw_id].tree)
 
            for (oldpath, newpath), (_, _), (_, _) in changes:
 
                if newpath and oldpath:
 
                    modified.add(newpath)
 
                elif newpath and not oldpath:
 
                    added.add(newpath)
 
                elif not newpath and oldpath:
 
                    deleted.add(oldpath)
 
        return added, modified, deleted
 

	
 
    def _get_paths_for_status(self, status):
 
        """
 
        Returns sorted list of paths for given ``status``.
 

	
 
        :param status: one of: *added*, *modified* or *deleted*
 
        """
 
        a, m, d = self._changes_cache
 
        return sorted({
 
            'added': list(a),
 
            'modified': list(m),
 
            'deleted': list(d)}[status]
 
        )
 

	
 
    @LazyProperty
 
    def added(self):
 
        """
 
        Returns list of added ``FileNode`` objects.
 
        """
 
        if not self.parents:
 
            return list(self._get_file_nodes())
 
        return [self.get_node(path) for path in self._get_paths_for_status('added')]
 
        return AddedFileNodesGenerator([n for n in
 
                                self._get_paths_for_status('added')], self)
 

	
 
    @LazyProperty
 
    def changed(self):
 
        """
 
        Returns list of modified ``FileNode`` objects.
 
        """
 
        if not self.parents:
 
            return []
 
        return [self.get_node(path) for path in self._get_paths_for_status('modified')]
 
        return ChangedFileNodesGenerator([n for n in
 
                                self._get_paths_for_status('modified')], self)
 

	
 
    @LazyProperty
 
    def removed(self):
 
        """
 
        Returns list of removed ``FileNode`` objects.
 
        """
 
        if not self.parents:
 
            return []
 
        return [RemovedFileNode(path) for path in self._get_paths_for_status('deleted')]
 
        return RemovedFileNodesGenerator([n for n in
 
                                self._get_paths_for_status('deleted')], self)
rhodecode/tests/vcs/test_changesets.py
Show inline comments
 
from __future__ import with_statement
 

	
 
from rhodecode.lib import vcs
 
import datetime
 
from base import BackendTestMixin
 
from conf import SCM_TESTS
 
from rhodecode.lib.vcs.backends.base import BaseChangeset
 
from rhodecode.lib.vcs.nodes import FileNode
 
from rhodecode.lib.vcs.nodes import FileNode, AddedFileNodesGenerator,\
 
    ChangedFileNodesGenerator, RemovedFileNodesGenerator
 
from rhodecode.lib.vcs.exceptions import BranchDoesNotExistError
 
from rhodecode.lib.vcs.exceptions import ChangesetDoesNotExistError
 
from rhodecode.lib.vcs.exceptions import RepositoryError
 
from rhodecode.lib.vcs.utils.compat import unittest
 

	
 

	
 
class TestBaseChangeset(unittest.TestCase):
 

	
 
    def test_as_dict(self):
 
        changeset = BaseChangeset()
 
        changeset.id = 'ID'
 
        changeset.raw_id = 'RAW_ID'
 
        changeset.short_id = 'SHORT_ID'
 
        changeset.revision = 1009
 
        changeset.date = datetime.datetime(2011, 1, 30, 1, 45)
 
        changeset.message = 'Message of a commit'
 
        changeset.author = 'Joe Doe <joe.doe@example.com>'
 
        changeset.added = [FileNode('foo/bar/baz'), FileNode('foobar')]
 
        changeset.changed = []
 
        changeset.removed = []
 
        self.assertEqual(changeset.as_dict(), {
 
            'id': 'ID',
 
            'raw_id': 'RAW_ID',
 
            'short_id': 'SHORT_ID',
 
            'revision': 1009,
 
            'date': datetime.datetime(2011, 1, 30, 1, 45),
 
            'message': 'Message of a commit',
 
            'author': {
 
                'name': 'Joe Doe',
 
                'email': 'joe.doe@example.com',
 
            },
 
            'added': ['foo/bar/baz', 'foobar'],
 
            'changed': [],
 
            'removed': [],
 
        })
 

	
 
class ChangesetsWithCommitsTestCaseixin(BackendTestMixin):
 
    recreate_repo_per_test = True
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        start_date = datetime.datetime(2010, 1, 1, 20)
 
        for x in xrange(5):
 
            yield {
 
                'message': 'Commit %d' % x,
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': start_date + datetime.timedelta(hours=12 * x),
 
                'added': [
 
                    FileNode('file_%d.txt' % x, content='Foobar %d' % x),
 
                ],
 
            }
 

	
 
    def test_new_branch(self):
 
        self.imc.add(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\n'))
 
        foobar_tip = self.imc.commit(
 
            message=u'New branch: foobar',
 
            author=u'joe',
 
            branch='foobar',
 
        )
 
        self.assertTrue('foobar' in self.repo.branches)
 
        self.assertEqual(foobar_tip.branch, 'foobar')
 
        # 'foobar' should be the only branch that contains the new commit
 
        self.assertNotEqual(*self.repo.branches.values())
 

	
 
    def test_new_head_in_default_branch(self):
 
        tip = self.repo.get_changeset()
 
        self.imc.add(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\n'))
 
        foobar_tip = self.imc.commit(
 
            message=u'New branch: foobar',
 
            author=u'joe',
 
            branch='foobar',
 
            parents=[tip],
 
        )
 
        self.imc.change(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\nand more...\n'))
 
        newtip = self.imc.commit(
 
            message=u'At default branch',
 
            author=u'joe',
 
            branch=foobar_tip.branch,
 
            parents=[foobar_tip],
 
        )
 

	
 
        newest_tip = self.imc.commit(
 
            message=u'Merged with %s' % foobar_tip.raw_id,
 
            author=u'joe',
 
            branch=self.backend_class.DEFAULT_BRANCH_NAME,
 
            parents=[newtip, foobar_tip],
 
        )
 

	
 
        self.assertEqual(newest_tip.branch,
 
            self.backend_class.DEFAULT_BRANCH_NAME)
 

	
 
    def test_get_changesets_respects_branch_name(self):
 
        tip = self.repo.get_changeset()
 
        self.imc.add(vcs.nodes.FileNode('docs/index.txt',
 
            content='Documentation\n'))
 
        doc_changeset = self.imc.commit(
 
            message=u'New branch: docs',
 
            author=u'joe',
 
            branch='docs',
 
        )
 
        self.imc.add(vcs.nodes.FileNode('newfile', content=''))
 
        self.imc.commit(
 
            message=u'Back in default branch',
 
            author=u'joe',
 
            parents=[tip],
 
        )
 
        default_branch_changesets = self.repo.get_changesets(
 
            branch_name=self.repo.DEFAULT_BRANCH_NAME)
 
        self.assertNotIn(doc_changeset, default_branch_changesets)
 

	
 
    def test_get_changeset_by_branch(self):
 
        for branch, sha in self.repo.branches.iteritems():
 
            self.assertEqual(sha, self.repo.get_changeset(branch).raw_id)
 

	
 
    def test_get_changeset_by_tag(self):
 
        for tag, sha in self.repo.tags.iteritems():
 
            self.assertEqual(sha, self.repo.get_changeset(tag).raw_id)
 

	
 

	
 
class ChangesetsTestCaseMixin(BackendTestMixin):
 
    recreate_repo_per_test = False
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        start_date = datetime.datetime(2010, 1, 1, 20)
 
        for x in xrange(5):
 
            yield {
 
                'message': u'Commit %d' % x,
 
                'author': u'Joe Doe <joe.doe@example.com>',
 
                'date': start_date + datetime.timedelta(hours=12 * x),
 
                'added': [
 
                    FileNode('file_%d.txt' % x, content='Foobar %d' % x),
 
                ],
 
            }
 

	
 
    def test_simple(self):
 
        tip = self.repo.get_changeset()
 
        self.assertEqual(tip.date, datetime.datetime(2010, 1, 3, 20))
 

	
 
    def test_get_changesets_is_ordered_by_date(self):
 
        changesets = list(self.repo.get_changesets())
 
        ordered_by_date = sorted(changesets,
 
            key=lambda cs: cs.date)
 
        self.assertItemsEqual(changesets, ordered_by_date)
 

	
 
    def test_get_changesets_respects_start(self):
 
        second_id = self.repo.revisions[1]
 
        changesets = list(self.repo.get_changesets(start=second_id))
 
        self.assertEqual(len(changesets), 4)
 

	
 
    def test_get_changesets_numerical_id_respects_start(self):
 
        second_id = 1
 
        changesets = list(self.repo.get_changesets(start=second_id))
 
        self.assertEqual(len(changesets), 4)
 

	
 
    def test_get_changesets_includes_start_changeset(self):
 
        second_id = self.repo.revisions[1]
 
        changesets = list(self.repo.get_changesets(start=second_id))
 
        self.assertEqual(changesets[0].raw_id, second_id)
 

	
 
    def test_get_changesets_respects_end(self):
 
        second_id = self.repo.revisions[1]
 
        changesets = list(self.repo.get_changesets(end=second_id))
 
        self.assertEqual(changesets[-1].raw_id, second_id)
 
        self.assertEqual(len(changesets), 2)
 

	
 
    def test_get_changesets_numerical_id_respects_end(self):
 
        second_id = 1
 
        changesets = list(self.repo.get_changesets(end=second_id))
 
        self.assertEqual(changesets.index(changesets[-1]), second_id)
 
        self.assertEqual(len(changesets), 2)
 

	
 
    def test_get_changesets_respects_both_start_and_end(self):
 
        second_id = self.repo.revisions[1]
 
        third_id = self.repo.revisions[2]
 
        changesets = list(self.repo.get_changesets(start=second_id,
 
            end=third_id))
 
        self.assertEqual(len(changesets), 2)
 

	
 
    def test_get_changesets_numerical_id_respects_both_start_and_end(self):
 
        changesets = list(self.repo.get_changesets(start=2, end=3))
 
        self.assertEqual(len(changesets), 2)
 

	
 
    def test_get_changesets_includes_end_changeset(self):
 
        second_id = self.repo.revisions[1]
 
        changesets = list(self.repo.get_changesets(end=second_id))
 
        self.assertEqual(changesets[-1].raw_id, second_id)
 

	
 
    def test_get_changesets_respects_start_date(self):
 
        start_date = datetime.datetime(2010, 2, 1)
 
        for cs in self.repo.get_changesets(start_date=start_date):
 
            self.assertGreaterEqual(cs.date, start_date)
 

	
 
    def test_get_changesets_respects_end_date(self):
 
        end_date = datetime.datetime(2010, 2, 1)
 
        for cs in self.repo.get_changesets(end_date=end_date):
 
            self.assertLessEqual(cs.date, end_date)
 

	
 
    def test_get_changesets_respects_reverse(self):
 
        changesets_id_list = [cs.raw_id for cs in
 
            self.repo.get_changesets(reverse=True)]
 
        self.assertItemsEqual(changesets_id_list, reversed(self.repo.revisions))
 

	
 
    def test_get_filenodes_generator(self):
 
        tip = self.repo.get_changeset()
 
        filepaths = [node.path for node in tip.get_filenodes_generator()]
 
        self.assertItemsEqual(filepaths, ['file_%d.txt' % x for x in xrange(5)])
 

	
 
    def test_size(self):
 
        tip = self.repo.get_changeset()
 
        size = 5 * len('Foobar N') # Size of 5 files
 
        self.assertEqual(tip.size, size)
 

	
 
    def test_author(self):
 
        tip = self.repo.get_changeset()
 
        self.assertEqual(tip.author, u'Joe Doe <joe.doe@example.com>')
 

	
 
    def test_author_name(self):
 
        tip = self.repo.get_changeset()
 
        self.assertEqual(tip.author_name, u'Joe Doe')
 

	
 
    def test_author_email(self):
 
        tip = self.repo.get_changeset()
 
        self.assertEqual(tip.author_email, u'joe.doe@example.com')
 

	
 
    def test_get_changesets_raise_changesetdoesnotexist_for_wrong_start(self):
 
        with self.assertRaises(ChangesetDoesNotExistError):
 
            list(self.repo.get_changesets(start='foobar'))
 

	
 
    def test_get_changesets_raise_changesetdoesnotexist_for_wrong_end(self):
 
        with self.assertRaises(ChangesetDoesNotExistError):
 
            list(self.repo.get_changesets(end='foobar'))
 

	
 
    def test_get_changesets_raise_branchdoesnotexist_for_wrong_branch_name(self):
 
        with self.assertRaises(BranchDoesNotExistError):
 
            list(self.repo.get_changesets(branch_name='foobar'))
 

	
 
    def test_get_changesets_raise_repositoryerror_for_wrong_start_end(self):
 
        start = self.repo.revisions[-1]
 
        end = self.repo.revisions[0]
 
        with self.assertRaises(RepositoryError):
 
            list(self.repo.get_changesets(start=start, end=end))
 

	
 
    def test_get_changesets_numerical_id_reversed(self):
 
        with self.assertRaises(RepositoryError):
 
            [x for x in self.repo.get_changesets(start=3, end=2)]
 

	
 
    def test_get_changesets_numerical_id_respects_both_start_and_end_last(self):
 
        with self.assertRaises(RepositoryError):
 
            last = len(self.repo.revisions)
 
            list(self.repo.get_changesets(start=last-1, end=last-2))
 

	
 
    def test_get_changesets_numerical_id_last_zero_error(self):
 
        with self.assertRaises(RepositoryError):
 
            last = len(self.repo.revisions)
 
            list(self.repo.get_changesets(start=last-1, end=0))
 

	
 

	
 
class ChangesetsChangesTestCaseMixin(BackendTestMixin):
 
    recreate_repo_per_test = False
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        return [
 
            {
 
                'message': u'Initial',
 
                'author': u'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 20),
 
                'added': [
 
                    FileNode('foo/bar', content='foo'),
 
                    FileNode('foobar', content='foo'),
 
                    FileNode('qwe', content='foo'),
 
                ],
 
            },
 
            {
 
                'message': u'Massive changes',
 
                'author': u'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 22),
 
                'added': [FileNode('fallout', content='War never changes')],
 
                'changed': [
 
                    FileNode('foo/bar', content='baz'),
 
                    FileNode('foobar', content='baz'),
 
                ],
 
                'removed': [FileNode('qwe')],
 
            },
 
        ]
 

	
 
    def test_initial_commit(self):
 
        changeset = self.repo.get_changeset(0)
 
        self.assertItemsEqual(changeset.added, [
 
            changeset.get_node('foo/bar'),
 
            changeset.get_node('foobar'),
 
            changeset.get_node('qwe'),
 
        ])
 
        self.assertItemsEqual(changeset.changed, [])
 
        self.assertItemsEqual(changeset.removed, [])
 

	
 
    def test_head_added(self):
 
        changeset = self.repo.get_changeset()
 
        self.assertTrue(isinstance(changeset.added, AddedFileNodesGenerator))
 
        self.assertItemsEqual(changeset.added, [
 
            changeset.get_node('fallout'),
 
        ])
 
        self.assertTrue(isinstance(changeset.changed, ChangedFileNodesGenerator))
 
        self.assertItemsEqual(changeset.changed, [
 
            changeset.get_node('foo/bar'),
 
            changeset.get_node('foobar'),
 
        ])
 
        self.assertTrue(isinstance(changeset.removed, RemovedFileNodesGenerator))
 
        self.assertEqual(len(changeset.removed), 1)
 
        self.assertEqual(list(changeset.removed)[0].path, 'qwe')
 

	
 

	
 
# For each backend create test case class
 
for alias in SCM_TESTS:
 
    attrs = {
 
        'backend_alias': alias,
 
    }
 
    # tests with additional commits
 
    cls_name = ''.join(('%s changesets with commits test' % alias).title().split())
 
    bases = (ChangesetsWithCommitsTestCaseixin, unittest.TestCase)
 
    globals()[cls_name] = type(cls_name, bases, attrs)
 

	
 
    # tests without additional commits
 
    cls_name = ''.join(('%s changesets test' % alias).title().split())
 
    bases = (ChangesetsTestCaseMixin, unittest.TestCase)
 
    globals()[cls_name] = type(cls_name, bases, attrs)
 

	
 
    # tests changes
 
    cls_name = ''.join(('%s changesets changes test' % alias).title().split())
 
    bases = (ChangesetsChangesTestCaseMixin, unittest.TestCase)
 
    globals()[cls_name] = type(cls_name, bases, attrs)
 

	
 

	
 
if __name__ == '__main__':
 
    unittest.main()
0 comments (0 inline, 0 general)