Changeset - 42981614c624
[Not reviewed]
beta
0 3 0
Marcin Kuzminski - 13 years ago 2013-05-10 14:35:02
marcin@python-works.com
vcs: fixed issues with calling get_changesets method doesn't
throws EmptyRepositoryError when called on empty repos
3 files changed with 30 insertions and 5 deletions:
0 comments (0 inline, 0 general)
rhodecode/lib/vcs/backends/git/repository.py
Show inline comments
 
@@ -60,48 +60,60 @@ class GitRepository(BaseRepository):
 

	
 
        self.path = abspath(repo_path)
 
        repo = self._get_repo(create, src_url, update_after_clone, bare)
 
        self.bare = repo.bare
 

	
 
    @property
 
    def _config_files(self):
 
        return [
 
            self.bare and abspath(self.path, 'config')
 
                      or abspath(self.path, '.git', 'config'),
 
             abspath(get_user_home(), '.gitconfig'),
 
         ]
 

	
 
    @property
 
    def _repo(self):
 
        return Repo(self.path)
 

	
 
    @property
 
    def head(self):
 
        try:
 
            return self._repo.head()
 
        except KeyError:
 
            return None
 

	
 
    @property
 
    def _empty(self):
 
        """
 
        Checks if repository is empty ie. without any changesets
 
        """
 

	
 
        try:
 
            self.revisions[0]
 
        except (KeyError, IndexError):
 
            return True
 
        return False
 

	
 
    @LazyProperty
 
    def revisions(self):
 
        """
 
        Returns list of revisions' ids, in ascending order.  Being lazy
 
        attribute allows external tools to inject shas from cache.
 
        """
 
        return self._get_all_revisions()
 

	
 
    @classmethod
 
    def _run_git_command(cls, cmd, **opts):
 
        """
 
        Runs given ``cmd`` as git command and returns tuple
 
        (stdout, stderr).
 

	
 
        :param cmd: git command to be executed
 
        :param opts: env options to pass into Subprocess command
 
        """
 

	
 
        if '_bare' in opts:
 
            _copts = []
 
            del opts['_bare']
 
        else:
 
            _copts = ['-c', 'core.quotepath=false', ]
 
        safe_call = False
 
@@ -229,51 +241,49 @@ class GitRepository(BaseRepository):
 

	
 
        rev_filter = _git_path = settings.GIT_REV_FILTER
 
        cmd = 'rev-list %s --reverse --date-order' % (rev_filter)
 
        try:
 
            so, se = self.run_git_command(cmd)
 
        except RepositoryError:
 
            # Can be raised for empty repositories
 
            return []
 
        return so.splitlines()
 

	
 
    def _get_all_revisions2(self):
 
        #alternate implementation using dulwich
 
        includes = [x[1][0] for x in self._parsed_refs.iteritems()
 
                    if x[1][1] != 'T']
 
        return [c.commit.id for c in self._repo.get_walker(include=includes)]
 

	
 
    def _get_revision(self, revision):
 
        """
 
        For git backend we always return integer here. This way we ensure
 
        that changset's revision attribute would become integer.
 
        """
 

	
 
        is_null = lambda o: len(o) == revision.count('0')
 

	
 
        try:
 
            self.revisions[0]
 
        except (KeyError, IndexError):
 
        if self._empty:
 
            raise EmptyRepositoryError("There are no changesets yet")
 

	
 
        if revision in (None, '', 'tip', 'HEAD', 'head', -1):
 
            return self.revisions[-1]
 

	
 
        is_bstr = isinstance(revision, (str, unicode))
 
        if ((is_bstr and revision.isdigit() and len(revision) < 12)
 
            or isinstance(revision, int) or is_null(revision)):
 
            try:
 
                revision = self.revisions[int(revision)]
 
            except Exception:
 
                raise ChangesetDoesNotExistError("Revision %s does not exist "
 
                    "for this repository" % (revision))
 

	
 
        elif is_bstr:
 
            # get by branch/tag name
 
            _ref_revision = self._parsed_refs.get(revision)
 
            if _ref_revision:  # and _ref_revision[1] in ['H', 'RH', 'T']:
 
                return _ref_revision[0]
 

	
 
            _tags_shas = self.tags.values()
 
            # maybe it's a tag ? we don't have them in self.revisions
 
            if revision in _tags_shas:
 
                return _tags_shas[_tags_shas.index(revision)]
 
@@ -471,48 +481,53 @@ class GitRepository(BaseRepository):
 
        """
 
        Returns iterator of ``GitChangeset`` objects from start to end (both
 
        are inclusive), in ascending date order (unless ``reverse`` is set).
 

	
 
        :param start: changeset ID, as str; first returned changeset
 
        :param end: changeset ID, as str; last returned changeset
 
        :param start_date: if specified, changesets with commit date less than
 
          ``start_date`` would be filtered out from returned set
 
        :param end_date: if specified, changesets with commit date greater than
 
          ``end_date`` would be filtered out from returned set
 
        :param branch_name: if specified, changesets not reachable from given
 
          branch would be filtered out from returned set
 
        :param reverse: if ``True``, returned generator would be reversed
 
          (meaning that returned changesets would have descending date order)
 

	
 
        :raise BranchDoesNotExistError: If given ``branch_name`` does not
 
            exist.
 
        :raise ChangesetDoesNotExistError: If changeset for given ``start`` or
 
          ``end`` could not be found.
 

	
 
        """
 
        if branch_name and branch_name not in self.branches:
 
            raise BranchDoesNotExistError("Branch '%s' not found" \
 
                                          % branch_name)
 
        # actually we should check now if it's not an empty repo to not spaw
 
        # subprocess commands
 
        if self._empty:
 
            raise EmptyRepositoryError("There are no changesets yet")
 

	
 
        # %H at format means (full) commit hash, initial hashes are retrieved
 
        # in ascending date order
 
        cmd_template = 'log --date-order --reverse --pretty=format:"%H"'
 
        cmd_params = {}
 
        if start_date:
 
            cmd_template += ' --since "$since"'
 
            cmd_params['since'] = start_date.strftime('%m/%d/%y %H:%M:%S')
 
        if end_date:
 
            cmd_template += ' --until "$until"'
 
            cmd_params['until'] = end_date.strftime('%m/%d/%y %H:%M:%S')
 
        if branch_name:
 
            cmd_template += ' $branch_name'
 
            cmd_params['branch_name'] = branch_name
 
        else:
 
            rev_filter = _git_path = settings.GIT_REV_FILTER
 
            cmd_template += ' %s' % (rev_filter)
 

	
 
        cmd = string.Template(cmd_template).safe_substitute(**cmd_params)
 
        revs = self.run_git_command(cmd)[0].splitlines()
 
        start_pos = 0
 
        end_pos = len(revs)
 
        if start:
 
            _start = self._get_revision(start)
 
            try:
rhodecode/lib/vcs/backends/hg/repository.py
Show inline comments
 
@@ -57,49 +57,49 @@ class MercurialRepository(BaseRepository
 
        ``repo_path``.
 

	
 
        :param repo_path: local path of the repository
 
        :param create=False: if set to True, would try to create repository if
 
           it does not exist rather than raising exception
 
        :param baseui=None: user data
 
        :param src_url=None: would try to clone repository from given location
 
        :param update_after_clone=False: sets update of working copy after
 
          making a clone
 
        """
 

	
 
        if not isinstance(repo_path, str):
 
            raise VCSError('Mercurial backend requires repository path to '
 
                           'be instance of <str> got %s instead' %
 
                           type(repo_path))
 

	
 
        self.path = abspath(repo_path)
 
        self.baseui = baseui or ui.ui()
 
        # We've set path and ui, now we can set _repo itself
 
        self._repo = self._get_repo(create, src_url, update_after_clone)
 

	
 
    @property
 
    def _empty(self):
 
        """
 
        Checks if repository is empty without any changesets
 
        Checks if repository is empty ie. without any changesets
 
        """
 
        # TODO: Following raises errors when using InMemoryChangeset...
 
        # return len(self._repo.changelog) == 0
 
        return len(self.revisions) == 0
 

	
 
    @LazyProperty
 
    def revisions(self):
 
        """
 
        Returns list of revisions' ids, in ascending order.  Being lazy
 
        attribute allows external tools to inject shas from cache.
 
        """
 
        return self._get_all_revisions()
 

	
 
    @LazyProperty
 
    def name(self):
 
        return os.path.basename(self.path)
 

	
 
    @LazyProperty
 
    def branches(self):
 
        return self._get_branches()
 

	
 
    @LazyProperty
 
    def allbranches(self):
 
        """
rhodecode/tests/vcs/test_changesets.py
Show inline comments
 
from __future__ import with_statement
 

	
 
import time
 
import datetime
 
from rhodecode.lib import vcs
 
from rhodecode.tests.vcs.base import BackendTestMixin
 
from rhodecode.tests.vcs.conf import SCM_TESTS
 

	
 
from rhodecode.lib.vcs.backends.base import BaseChangeset
 
from rhodecode.lib.vcs.nodes import (
 
    FileNode, AddedFileNodesGenerator,
 
    ChangedFileNodesGenerator, RemovedFileNodesGenerator
 
)
 
from rhodecode.lib.vcs.exceptions import (
 
    BranchDoesNotExistError, ChangesetDoesNotExistError,
 
    RepositoryError
 
    RepositoryError, EmptyRepositoryError
 
)
 
from rhodecode.lib.vcs.utils.compat import unittest
 
from rhodecode.tests.vcs.conf import get_new_dir
 

	
 

	
 
class TestBaseChangeset(unittest.TestCase):
 

	
 
    def test_as_dict(self):
 
        changeset = BaseChangeset()
 
        changeset.id = 'ID'
 
        changeset.raw_id = 'RAW_ID'
 
        changeset.short_id = 'SHORT_ID'
 
        changeset.revision = 1009
 
        changeset.date = datetime.datetime(2011, 1, 30, 1, 45)
 
        changeset.message = 'Message of a commit'
 
        changeset.author = 'Joe Doe <joe.doe@example.com>'
 
        changeset.added = [FileNode('foo/bar/baz'), FileNode('foobar')]
 
        changeset.changed = []
 
        changeset.removed = []
 
        self.assertEqual(changeset.as_dict(), {
 
            'id': 'ID',
 
            'raw_id': 'RAW_ID',
 
            'short_id': 'SHORT_ID',
 
            'revision': 1009,
 
            'date': datetime.datetime(2011, 1, 30, 1, 45),
 
            'message': 'Message of a commit',
 
            'author': {
 
@@ -176,48 +178,56 @@ class ChangesetsTestCaseMixin(BackendTes
 

	
 
    def test_get_changesets_respects_end(self):
 
        second_id = self.repo.revisions[1]
 
        changesets = list(self.repo.get_changesets(end=second_id))
 
        self.assertEqual(changesets[-1].raw_id, second_id)
 
        self.assertEqual(len(changesets), 2)
 

	
 
    def test_get_changesets_numerical_id_respects_end(self):
 
        second_id = 1
 
        changesets = list(self.repo.get_changesets(end=second_id))
 
        self.assertEqual(changesets.index(changesets[-1]), second_id)
 
        self.assertEqual(len(changesets), 2)
 

	
 
    def test_get_changesets_respects_both_start_and_end(self):
 
        second_id = self.repo.revisions[1]
 
        third_id = self.repo.revisions[2]
 
        changesets = list(self.repo.get_changesets(start=second_id,
 
            end=third_id))
 
        self.assertEqual(len(changesets), 2)
 

	
 
    def test_get_changesets_numerical_id_respects_both_start_and_end(self):
 
        changesets = list(self.repo.get_changesets(start=2, end=3))
 
        self.assertEqual(len(changesets), 2)
 

	
 
    def test_get_changesets_on_empty_repo_raises_EmptyRepository_error(self):
 
        Backend = self.get_backend()
 
        repo_path = get_new_dir(str(time.time()))
 
        repo = Backend(repo_path, create=True)
 

	
 
        with self.assertRaises(EmptyRepositoryError):
 
            list(repo.get_changesets(start='foobar'))
 

	
 
    def test_get_changesets_includes_end_changeset(self):
 
        second_id = self.repo.revisions[1]
 
        changesets = list(self.repo.get_changesets(end=second_id))
 
        self.assertEqual(changesets[-1].raw_id, second_id)
 

	
 
    def test_get_changesets_respects_start_date(self):
 
        start_date = datetime.datetime(2010, 2, 1)
 
        for cs in self.repo.get_changesets(start_date=start_date):
 
            self.assertGreaterEqual(cs.date, start_date)
 

	
 
    def test_get_changesets_respects_end_date(self):
 
        start_date = datetime.datetime(2010, 1, 1)
 
        end_date = datetime.datetime(2010, 2, 1)
 
        for cs in self.repo.get_changesets(start_date=start_date,
 
                                           end_date=end_date):
 
            self.assertGreaterEqual(cs.date, start_date)
 
            self.assertLessEqual(cs.date, end_date)
 

	
 
    def test_get_changesets_respects_start_date_and_end_date(self):
 
        end_date = datetime.datetime(2010, 2, 1)
 
        for cs in self.repo.get_changesets(end_date=end_date):
 
            self.assertLessEqual(cs.date, end_date)
 

	
 
    def test_get_changesets_respects_reverse(self):
0 comments (0 inline, 0 general)