Changeset - 8a9d93838882
[Not reviewed]
stable
0 3 0
domruf - 10 years ago 2016-04-19 19:53:16
dominikruf@gmail.com
vcs: fix repo size calculation

tip.walk already 'dives into' the dirs - don't do that twice.
3 files changed with 2 insertions and 5 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/vcs/backends/base.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
"""
 
    vcs.backends.base
 
    ~~~~~~~~~~~~~~~~~
 

	
 
    Base for all available scm backends
 

	
 
    :created_on: Apr 8, 2010
 
    :copyright: (c) 2010-2011 by Marcin Kuzminski, Lukasz Balcerzak.
 
"""
 

	
 
import datetime
 
import itertools
 

	
 
from kallithea.lib.vcs.utils import author_name, author_email, safe_unicode
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.lib.vcs.utils.helpers import get_dict_for_attrs
 
from kallithea.lib.vcs.conf import settings
 

	
 
from kallithea.lib.vcs.exceptions import (
 
    ChangesetError, EmptyRepositoryError, NodeAlreadyAddedError,
 
    NodeAlreadyChangedError, NodeAlreadyExistsError, NodeAlreadyRemovedError,
 
    NodeDoesNotExistError, NodeNotChangedError, RepositoryError
 
)
 

	
 

	
 
class BaseRepository(object):
 
    """
 
    Base Repository for final backends
 

	
 
    **Attributes**
 

	
 
        ``DEFAULT_BRANCH_NAME``
 
            name of default branch (i.e. "trunk" for svn, "master" for git etc.
 

	
 
        ``scm``
 
            alias of scm, i.e. *git* or *hg*
 

	
 
        ``repo``
 
            object from external api
 

	
 
        ``revisions``
 
            list of all available revisions' ids, in ascending order
 

	
 
        ``changesets``
 
            storage dict caching returned changesets
 

	
 
        ``path``
 
            absolute path to the repository
 

	
 
        ``branches``
 
            branches as list of changesets
 

	
 
        ``tags``
 
            tags as list of changesets
 
    """
 
    scm = None
 
    DEFAULT_BRANCH_NAME = None
 
    EMPTY_CHANGESET = '0' * 40
 

	
 
    def __init__(self, repo_path, create=False, **kwargs):
 
        """
 
        Initializes repository. Raises RepositoryError if repository could
 
        not be find at the given ``repo_path`` or directory at ``repo_path``
 
        exists and ``create`` is set to True.
 

	
 
        :param repo_path: local path of the repository
 
        :param create=False: if set to True, would try to create repository.
 
        :param src_url=None: if set, should be proper url from which repository
 
          would be cloned; requires ``create`` parameter to be set to True -
 
          raises RepositoryError if src_url is set and create evaluates to
 
          False
 
        """
 
        raise NotImplementedError
 

	
 
    def __str__(self):
 
        return '<%s at %s>' % (self.__class__.__name__, self.path)
 

	
 
    def __repr__(self):
 
        return self.__str__()
 

	
 
    def __len__(self):
 
        return self.count()
 

	
 
    def __eq__(self, other):
 
        same_instance = isinstance(other, self.__class__)
 
        return same_instance and getattr(other, 'path', None) == self.path
 

	
 
    def __ne__(self, other):
 
        return not self.__eq__(other)
 

	
 
    @LazyProperty
 
    def alias(self):
 
        for k, v in settings.BACKENDS.items():
 
            if v.split('.')[-1] == str(self.__class__.__name__):
 
                return k
 

	
 
    @LazyProperty
 
    def name(self):
 
        raise NotImplementedError
 

	
 
    @property
 
    def name_unicode(self):
 
        return safe_unicode(self.name)
 

	
 
    @LazyProperty
 
    def owner(self):
 
        raise NotImplementedError
 

	
 
    @LazyProperty
 
    def description(self):
 
        raise NotImplementedError
 

	
 
    @LazyProperty
 
    def size(self):
 
        """
 
        Returns combined size in bytes for all repository files
 
        """
 

	
 
        size = 0
 
        try:
 
            tip = self.get_changeset()
 
            for topnode, dirs, files in tip.walk('/'):
 
                for f in files:
 
                    size += tip.get_file_size(f.path)
 
                for dir in dirs:
 
                    for f in files:
 
                        size += tip.get_file_size(f.path)
 

	
 
        except RepositoryError as e:
 
            pass
 
        return size
 

	
 
    def is_valid(self):
 
        """
 
        Validates repository.
 
        """
 
        raise NotImplementedError
 

	
 
    def is_empty(self):
 
        return self._empty
 

	
 
    def get_last_change(self):
 
        self.get_changesets()
 

	
 
    #==========================================================================
 
    # CHANGESETS
 
    #==========================================================================
 

	
 
    def get_changeset(self, revision=None):
 
        """
 
        Returns instance of ``Changeset`` class. If ``revision`` is None, most
 
        recent changeset is returned.
 

	
 
        :raises ``EmptyRepositoryError``: if there are no revisions
 
        """
 
        raise NotImplementedError
 

	
 
    def __iter__(self):
 
        """
 
        Allows Repository objects to be iterated.
 

	
 
        *Requires* implementation of ``__getitem__`` method.
 
        """
 
        for revision in self.revisions:
 
            yield self.get_changeset(revision)
 

	
 
    def get_changesets(self, start=None, end=None, start_date=None,
 
                       end_date=None, branch_name=None, reverse=False):
 
        """
 
        Returns iterator of ``MercurialChangeset`` objects from start to end
 
        not inclusive This should behave just like a list, ie. end is not
 
        inclusive
 

	
 
        :param start: None or str
 
        :param end: None or str
 
        :param start_date:
 
        :param end_date:
 
        :param branch_name:
 
        :param reversed:
 
        """
 
        raise NotImplementedError
 

	
 
    def __getslice__(self, i, j):
 
        """
 
        Returns a iterator of sliced repository
 
        """
 
        for rev in self.revisions[i:j]:
 
            yield self.get_changeset(rev)
 

	
 
    def __getitem__(self, key):
 
        return self.get_changeset(key)
 

	
 
    def count(self):
 
        return len(self.revisions)
 

	
 
    def tag(self, name, user, revision=None, message=None, date=None, **opts):
 
        """
 
        Creates and returns a tag for the given ``revision``.
 

	
 
        :param name: name for new tag
 
        :param user: full username, i.e.: "Joe Doe <joe.doe@example.com>"
 
        :param revision: changeset id for which new tag would be created
 
        :param message: message of the tag's commit
 
        :param date: date of tag's commit
 

	
 
        :raises TagAlreadyExistError: if tag with same name already exists
 
        """
 
        raise NotImplementedError
 

	
 
    def remove_tag(self, name, user, message=None, date=None):
 
        """
 
        Removes tag with the given ``name``.
 

	
 
        :param name: name of the tag to be removed
 
        :param user: full username, i.e.: "Joe Doe <joe.doe@example.com>"
 
        :param message: message of the tag's removal commit
 
        :param date: date of tag's removal commit
 

	
 
        :raises TagDoesNotExistError: if tag with given name does not exists
 
        """
 
        raise NotImplementedError
 

	
 
    def get_diff(self, rev1, rev2, path=None, ignore_whitespace=False,
 
            context=3):
 
        """
 
        Returns (git like) *diff*, as plain text. Shows changes introduced by
 
        ``rev2`` since ``rev1``.
 

	
 
        :param rev1: Entry point from which diff is shown. Can be
 
          ``self.EMPTY_CHANGESET`` - in this case, patch showing all
 
          the changes since empty state of the repository until ``rev2``
 
        :param rev2: Until which revision changes should be shown.
 
        :param ignore_whitespace: If set to ``True``, would not show whitespace
 
          changes. Defaults to ``False``.
 
        :param context: How many lines before/after changed lines should be
 
          shown. Defaults to ``3``.
 
        """
 
        raise NotImplementedError
 

	
 
    # ========== #
 
    # COMMIT API #
 
    # ========== #
 

	
 
    @LazyProperty
 
    def in_memory_changeset(self):
 
        """
 
        Returns ``InMemoryChangeset`` object for this repository.
 
        """
 
        raise NotImplementedError
 

	
 
    def add(self, filenode, **kwargs):
 
        """
 
        Commit api function that will add given ``FileNode`` into this
 
        repository.
 

	
 
        :raises ``NodeAlreadyExistsError``: if there is a file with same path
 
          already in repository
 
        :raises ``NodeAlreadyAddedError``: if given node is already marked as
 
          *added*
 
        """
 
        raise NotImplementedError
 

	
 
    def remove(self, filenode, **kwargs):
 
        """
 
        Commit api function that will remove given ``FileNode`` into this
 
        repository.
 

	
 
        :raises ``EmptyRepositoryError``: if there are no changesets yet
 
        :raises ``NodeDoesNotExistError``: if there is no file with given path
 
        """
 
        raise NotImplementedError
 

	
 
    def commit(self, message, **kwargs):
 
        """
 
        Persists current changes made on this repository and returns newly
 
        created changeset.
 

	
 
        :raises ``NothingChangedError``: if no changes has been made
 
        """
 
        raise NotImplementedError
 

	
 
    def get_state(self):
 
        """
 
        Returns dictionary with ``added``, ``changed`` and ``removed`` lists
 
        containing ``FileNode`` objects.
 
        """
 
        raise NotImplementedError
 

	
 
    def get_config_value(self, section, name, config_file=None):
 
        """
 
        Returns configuration value for a given [``section``] and ``name``.
 

	
 
        :param section: Section we want to retrieve value from
 
        :param name: Name of configuration we want to retrieve
 
        :param config_file: A path to file which should be used to retrieve
 
          configuration from (might also be a list of file paths)
 
        """
 
        raise NotImplementedError
 

	
 
    def get_user_name(self, config_file=None):
 
        """
 
        Returns user's name from global configuration file.
 

	
 
        :param config_file: A path to file which should be used to retrieve
 
          configuration from (might also be a list of file paths)
 
        """
 
        raise NotImplementedError
 

	
 
    def get_user_email(self, config_file=None):
 
        """
 
        Returns user's email from global configuration file.
 

	
 
        :param config_file: A path to file which should be used to retrieve
 
          configuration from (might also be a list of file paths)
 
        """
 
        raise NotImplementedError
 

	
 
    # =========== #
 
    # WORKDIR API #
 
    # =========== #
 

	
 
    @LazyProperty
 
    def workdir(self):
 
        """
 
        Returns ``Workdir`` instance for this repository.
 
        """
 
        raise NotImplementedError
 

	
 

	
 
class BaseChangeset(object):
 
    """
 
    Each backend should implement it's changeset representation.
 

	
 
    **Attributes**
 

	
 
        ``repository``
 
            repository object within which changeset exists
 

	
 
        ``id``
 
            may be ``raw_id`` or i.e. for mercurial's tip just ``tip``
 

	
 
        ``raw_id``
 
            raw changeset representation (i.e. full 40 length sha for git
 
            backend)
 

	
 
        ``short_id``
 
            shortened (if apply) version of ``raw_id``; it would be simple
 
            shortcut for ``raw_id[:12]`` for git/mercurial backends or same
 
            as ``raw_id`` for subversion
 

	
 
        ``revision``
 
            revision number as integer
 

	
 
        ``files``
 
            list of ``FileNode`` (``Node`` with NodeKind.FILE) objects
 

	
 
        ``dirs``
 
            list of ``DirNode`` (``Node`` with NodeKind.DIR) objects
 

	
 
        ``nodes``
 
            combined list of ``Node`` objects
 

	
 
        ``author``
 
            author of the changeset, as unicode
 

	
 
        ``message``
 
            message of the changeset, as unicode
 

	
 
        ``parents``
 
            list of parent changesets
 

	
 
        ``last``
 
            ``True`` if this is last changeset in repository, ``False``
 
            otherwise; trying to access this attribute while there is no
 
            changesets would raise ``EmptyRepositoryError``
 
    """
 
    def __str__(self):
 
        return '<%s at %s:%s>' % (self.__class__.__name__, self.revision,
 
            self.short_id)
 

	
 
    def __repr__(self):
 
        return self.__str__()
 

	
 
    def __unicode__(self):
 
        return u'%s:%s' % (self.revision, self.short_id)
 

	
 
    def __eq__(self, other):
 
        return self.raw_id == other.raw_id
 

	
 
    def __json__(self):
 
        return dict(
 
            short_id=self.short_id,
 
            raw_id=self.raw_id,
 
            revision=self.revision,
 
            message=self.message,
 
            date=self.date,
 
            author=self.author,
 
        )
 

	
 
    @LazyProperty
 
    def last(self):
 
        if self.repository is None:
 
            raise ChangesetError("Cannot check if it's most recent revision")
 
        return self.raw_id == self.repository.revisions[-1]
 

	
 
    @LazyProperty
 
    def parents(self):
 
        """
 
        Returns list of parents changesets.
 
        """
 
        raise NotImplementedError
 

	
 
    @LazyProperty
 
    def children(self):
 
        """
 
        Returns list of children changesets.
 
        """
 
        raise NotImplementedError
 

	
 
    @LazyProperty
 
    def id(self):
 
        """
 
        Returns string identifying this changeset.
 
        """
 
        raise NotImplementedError
 

	
 
    @LazyProperty
 
    def raw_id(self):
 
        """
 
        Returns raw string identifying this changeset.
 
        """
 
        raise NotImplementedError
 

	
 
    @LazyProperty
 
    def short_id(self):
 
        """
 
        Returns shortened version of ``raw_id`` attribute, as string,
 
        identifying this changeset, useful for web representation.
 
        """
 
        raise NotImplementedError
 

	
 
    @LazyProperty
 
    def revision(self):
 
        """
 
        Returns integer identifying this changeset.
 

	
 
        """
 
        raise NotImplementedError
 

	
 
    @LazyProperty
 
    def committer(self):
 
        """
 
        Returns Committer for given commit
 
        """
 

	
 
        raise NotImplementedError
 

	
 
    @LazyProperty
 
    def committer_name(self):
 
        """
 
        Returns Author name for given commit
 
        """
 

	
 
        return author_name(self.committer)
 

	
 
    @LazyProperty
 
    def committer_email(self):
 
        """
 
        Returns Author email address for given commit
 
        """
 

	
 
        return author_email(self.committer)
 

	
 
    @LazyProperty
 
    def author(self):
 
        """
 
        Returns Author for given commit
 
        """
 

	
 
        raise NotImplementedError
 

	
 
    @LazyProperty
 
    def author_name(self):
 
        """
 
        Returns Author name for given commit
 
        """
 

	
 
        return author_name(self.author)
 

	
 
    @LazyProperty
 
    def author_email(self):
 
        """
 
        Returns Author email address for given commit
 
        """
 

	
 
        return author_email(self.author)
 

	
 
    def get_file_mode(self, path):
 
        """
 
        Returns stat mode of the file at the given ``path``.
 
        """
 
        raise NotImplementedError
 

	
 
    def get_file_content(self, path):
 
        """
 
        Returns content of the file at the given ``path``.
 
        """
 
        raise NotImplementedError
 

	
 
    def get_file_size(self, path):
 
        """
 
        Returns size of the file at the given ``path``.
 
        """
 
        raise NotImplementedError
 

	
 
    def get_file_changeset(self, path):
 
        """
 
        Returns last commit of the file at the given ``path``.
 
        """
 
        raise NotImplementedError
 

	
 
    def get_file_history(self, path):
 
        """
 
        Returns history of file as reversed list of ``Changeset`` objects for
 
        which file at given ``path`` has been modified.
 
        """
 
        raise NotImplementedError
 

	
 
    def get_nodes(self, path):
 
        """
 
        Returns combined ``DirNode`` and ``FileNode`` objects list representing
 
        state of changeset at the given ``path``.
 

	
 
        :raises ``ChangesetError``: if node at the given ``path`` is not
 
          instance of ``DirNode``
 
        """
 
        raise NotImplementedError
 

	
 
    def get_node(self, path):
 
        """
 
        Returns ``Node`` object from the given ``path``.
 

	
 
        :raises ``NodeDoesNotExistError``: if there is no node at the given
 
          ``path``
 
        """
 
        raise NotImplementedError
 

	
 
    def fill_archive(self, stream=None, kind='tgz', prefix=None):
 
        """
 
        Fills up given stream.
 

	
 
        :param stream: file like object.
 
        :param kind: one of following: ``zip``, ``tar``, ``tgz``
 
            or ``tbz2``. Default: ``tgz``.
 
        :param prefix: name of root directory in archive.
 
            Default is repository name and changeset's raw_id joined with dash.
 

	
 
            repo-tip.<kind>
 
        """
 

	
 
        raise NotImplementedError
 

	
 
    def get_chunked_archive(self, **kwargs):
 
        """
 
        Returns iterable archive. Tiny wrapper around ``fill_archive`` method.
 

	
 
        :param chunk_size: extra parameter which controls size of returned
 
            chunks. Default:8k.
 
        """
 

	
 
        chunk_size = kwargs.pop('chunk_size', 8192)
 
        stream = kwargs.get('stream')
 
        self.fill_archive(**kwargs)
 
        while True:
 
            data = stream.read(chunk_size)
 
            if not data:
 
                break
 
            yield data
 

	
 
    @LazyProperty
 
    def root(self):
 
        """
 
        Returns ``RootNode`` object for this changeset.
 
        """
 
        return self.get_node('')
 

	
 
    def next(self, branch=None):
 
        """
 
        Returns next changeset from current, if branch is gives it will return
 
        next changeset belonging to this branch
 

	
 
        :param branch: show changesets within the given named branch
 
        """
 
        raise NotImplementedError
 

	
 
    def prev(self, branch=None):
 
        """
 
        Returns previous changeset from current, if branch is gives it will
 
        return previous changeset belonging to this branch
 

	
 
        :param branch: show changesets within the given named branch
 
        """
 
        raise NotImplementedError
 

	
 
    @LazyProperty
 
    def added(self):
 
        """
 
        Returns list of added ``FileNode`` objects.
 
        """
 
        raise NotImplementedError
 

	
 
    @LazyProperty
 
    def changed(self):
 
        """
 
        Returns list of modified ``FileNode`` objects.
 
        """
 
        raise NotImplementedError
 

	
 
    @LazyProperty
 
    def removed(self):
 
        """
 
        Returns list of removed ``FileNode`` objects.
 
        """
 
        raise NotImplementedError
 

	
 
    @LazyProperty
 
    def size(self):
 
        """
 
        Returns total number of bytes from contents of all filenodes.
 
        """
 
        return sum((node.size for node in self.get_filenodes_generator()))
 

	
 
    def walk(self, topurl=''):
 
        """
 
        Similar to os.walk method. Instead of filesystem it walks through
 
        changeset starting at given ``topurl``.  Returns generator of tuples
 
        (topnode, dirnodes, filenodes).
 
        """
 
        topnode = self.get_node(topurl)
 
        yield (topnode, topnode.dirs, topnode.files)
 
        for dirnode in topnode.dirs:
 
            for tup in self.walk(dirnode.path):
 
                yield tup
 

	
 
    def get_filenodes_generator(self):
 
        """
 
        Returns generator that yields *all* file nodes.
 
        """
 
        for topnode, dirs, files in self.walk():
 
            for node in files:
 
                yield node
 

	
 
    def as_dict(self):
 
        """
 
        Returns dictionary with changeset's attributes and their values.
 
        """
 
        data = get_dict_for_attrs(self, ['id', 'raw_id', 'short_id',
 
            'revision', 'date', 'message'])
 
        data['author'] = {'name': self.author_name, 'email': self.author_email}
 
        data['added'] = [node.path for node in self.added]
 
        data['changed'] = [node.path for node in self.changed]
 
        data['removed'] = [node.path for node in self.removed]
 
        return data
 

	
 
    @LazyProperty
 
    def closesbranch(self):
 
        return False
 

	
 
    @LazyProperty
 
    def obsolete(self):
 
        return False
 

	
 
class BaseWorkdir(object):
 
    """
 
    Working directory representation of single repository.
 

	
 
    :attribute: repository: repository object of working directory
 
    """
 

	
 
    def __init__(self, repository):
 
        self.repository = repository
 

	
 
    def get_branch(self):
 
        """
 
        Returns name of current branch.
 
        """
 
        raise NotImplementedError
 

	
 
    def get_changeset(self):
 
        """
 
        Returns current changeset.
 
        """
 
        raise NotImplementedError
 

	
 
    def get_added(self):
 
        """
 
        Returns list of ``FileNode`` objects marked as *new* in working
 
        directory.
 
        """
 
        raise NotImplementedError
 

	
 
    def get_changed(self):
 
        """
 
        Returns list of ``FileNode`` objects *changed* in working directory.
 
        """
 
        raise NotImplementedError
 

	
 
    def get_removed(self):
 
        """
 
        Returns list of ``RemovedFileNode`` objects marked as *removed* in
 
        working directory.
 
        """
 
        raise NotImplementedError
 

	
 
    def get_untracked(self):
 
        """
 
        Returns list of ``FileNode`` objects which are present within working
 
        directory however are not tracked by repository.
 
        """
 
        raise NotImplementedError
 

	
 
    def get_status(self):
 
        """
 
        Returns dict with ``added``, ``changed``, ``removed`` and ``untracked``
 
        lists.
 
        """
 
        raise NotImplementedError
 

	
 
    def commit(self, message, **kwargs):
 
        """
 
        Commits local (from working directory) changes and returns newly
 
        created
 
        ``Changeset``. Updates repository's ``revisions`` list.
 

	
 
        :raises ``CommitError``: if any error occurs while committing
 
        """
 
        raise NotImplementedError
 

	
 
    def update(self, revision=None):
 
        """
 
        Fetches content of the given revision and populates it within working
 
        directory.
 
        """
 
        raise NotImplementedError
 

	
 
    def checkout_branch(self, branch=None):
 
        """
 
        Checks out ``branch`` or the backend's default branch.
 

	
 
        Raises ``BranchDoesNotExistError`` if the branch does not exist.
 
        """
 
        raise NotImplementedError
 

	
 

	
 
class BaseInMemoryChangeset(object):
 
    """
 
    Represents differences between repository's state (most recent head) and
 
    changes made *in place*.
 

	
 
    **Attributes**
 

	
 
        ``repository``
 
            repository object for this in-memory-changeset
 

	
 
        ``added``
 
            list of ``FileNode`` objects marked as *added*
 

	
 
        ``changed``
 
            list of ``FileNode`` objects marked as *changed*
 

	
 
        ``removed``
 
            list of ``FileNode`` or ``RemovedFileNode`` objects marked to be
 
            *removed*
 

	
 
        ``parents``
 
            list of ``Changeset`` representing parents of in-memory changeset.
 
            Should always be 2-element sequence.
 

	
 
    """
 

	
 
    def __init__(self, repository):
 
        self.repository = repository
 
        self.added = []
 
        self.changed = []
 
        self.removed = []
 
        self.parents = []
 

	
 
    def add(self, *filenodes):
 
        """
 
        Marks given ``FileNode`` objects as *to be committed*.
 

	
 
        :raises ``NodeAlreadyExistsError``: if node with same path exists at
 
          latest changeset
 
        :raises ``NodeAlreadyAddedError``: if node with same path is already
 
          marked as *added*
 
        """
 
        # Check if not already marked as *added* first
 
        for node in filenodes:
 
            if node.path in (n.path for n in self.added):
 
                raise NodeAlreadyAddedError("Such FileNode %s is already "
 
                    "marked for addition" % node.path)
 
        for node in filenodes:
 
            self.added.append(node)
 

	
 
    def change(self, *filenodes):
 
        """
 
        Marks given ``FileNode`` objects to be *changed* in next commit.
 

	
 
        :raises ``EmptyRepositoryError``: if there are no changesets yet
 
        :raises ``NodeAlreadyExistsError``: if node with same path is already
 
          marked to be *changed*
 
        :raises ``NodeAlreadyRemovedError``: if node with same path is already
 
          marked to be *removed*
 
        :raises ``NodeDoesNotExistError``: if node doesn't exist in latest
 
          changeset
 
        :raises ``NodeNotChangedError``: if node hasn't really be changed
 
        """
 
        for node in filenodes:
 
            if node.path in (n.path for n in self.removed):
 
                raise NodeAlreadyRemovedError("Node at %s is already marked "
 
                    "as removed" % node.path)
 
        try:
 
            self.repository.get_changeset()
 
        except EmptyRepositoryError:
 
            raise EmptyRepositoryError("Nothing to change - try to *add* new "
 
                "nodes rather than changing them")
 
        for node in filenodes:
 
            if node.path in (n.path for n in self.changed):
 
                raise NodeAlreadyChangedError("Node at '%s' is already "
 
                    "marked as changed" % node.path)
 
            self.changed.append(node)
 

	
 
    def remove(self, *filenodes):
 
        """
 
        Marks given ``FileNode`` (or ``RemovedFileNode``) objects to be
 
        *removed* in next commit.
 

	
 
        :raises ``NodeAlreadyRemovedError``: if node has been already marked to
 
          be *removed*
 
        :raises ``NodeAlreadyChangedError``: if node has been already marked to
 
          be *changed*
 
        """
 
        for node in filenodes:
 
            if node.path in (n.path for n in self.removed):
 
                raise NodeAlreadyRemovedError("Node is already marked to "
 
                    "for removal at %s" % node.path)
 
            if node.path in (n.path for n in self.changed):
 
                raise NodeAlreadyChangedError("Node is already marked to "
 
                    "be changed at %s" % node.path)
 
            # We only mark node as *removed* - real removal is done by
 
            # commit method
 
            self.removed.append(node)
 

	
 
    def reset(self):
 
        """
 
        Resets this instance to initial state (cleans ``added``, ``changed``
 
        and ``removed`` lists).
 
        """
 
        self.added = []
 
        self.changed = []
 
        self.removed = []
 
        self.parents = []
 

	
 
    def get_ipaths(self):
 
        """
 
        Returns generator of paths from nodes marked as added, changed or
 
        removed.
 
        """
 
        for node in itertools.chain(self.added, self.changed, self.removed):
 
            yield node.path
 

	
 
    def get_paths(self):
 
        """
 
        Returns list of paths from nodes marked as added, changed or removed.
 
        """
 
        return list(self.get_ipaths())
 

	
 
    def check_integrity(self, parents=None):
 
        """
 
        Checks in-memory changeset's integrity. Also, sets parents if not
 
        already set.
 

	
 
        :raises CommitError: if any error occurs (i.e.
 
          ``NodeDoesNotExistError``).
 
        """
 
        if not self.parents:
 
            parents = parents or []
 
            if len(parents) == 0:
 
                try:
 
                    parents = [self.repository.get_changeset(), None]
 
                except EmptyRepositoryError:
 
                    parents = [None, None]
 
            elif len(parents) == 1:
 
                parents += [None]
 
            self.parents = parents
kallithea/tests/vcs/test_git.py
Show inline comments
 

	
 
import os
 
import sys
 
import mock
 
import datetime
 
import urllib2
 
from kallithea.lib.vcs.backends.git import GitRepository, GitChangeset
 
from kallithea.lib.vcs.exceptions import RepositoryError, VCSError, NodeDoesNotExistError
 
from kallithea.lib.vcs.nodes import NodeKind, FileNode, DirNode, NodeState
 
from kallithea.lib.vcs.utils.compat import unittest
 
from kallithea.model.scm import ScmModel
 
from kallithea.tests.vcs.base import _BackendTestMixin
 
from kallithea.tests.vcs.conf import TEST_GIT_REPO, TEST_GIT_REPO_CLONE, get_new_dir
 

	
 

	
 
class GitRepositoryTest(unittest.TestCase):
 

	
 
    def __check_for_existing_repo(self):
 
        if os.path.exists(TEST_GIT_REPO_CLONE):
 
            self.fail('Cannot test git clone repo as location %s already '
 
                      'exists. You should manually remove it first.'
 
                      % TEST_GIT_REPO_CLONE)
 

	
 
    def setUp(self):
 
        self.repo = GitRepository(TEST_GIT_REPO)
 

	
 
    def test_wrong_repo_path(self):
 
        wrong_repo_path = '/tmp/errorrepo'
 
        self.assertRaises(RepositoryError, GitRepository, wrong_repo_path)
 

	
 
    def test_git_cmd_injection(self):
 
        repo_inject_path = TEST_GIT_REPO + '; echo "Cake";'
 
        with self.assertRaises(urllib2.URLError):
 
            # Should fail because URL will contain the parts after ; too
 
            urlerror_fail_repo = GitRepository(get_new_dir('injection-repo'), src_url=repo_inject_path, update_after_clone=True, create=True)
 

	
 
        with self.assertRaises(RepositoryError):
 
            # Should fail on direct clone call, which as of this writing does not happen outside of class
 
            clone_fail_repo = GitRepository(get_new_dir('injection-repo'), create=True)
 
            clone_fail_repo.clone(repo_inject_path, update_after_clone=True,)
 

	
 
        # Verify correct quoting of evil characters that should work on posix file systems
 
        if sys.platform == 'win32':
 
            # windows does not allow '"' in dir names
 
            tricky_path = get_new_dir("tricky-path-repo-$'`")
 
        else:
 
            tricky_path = get_new_dir("tricky-path-repo-$'\"`")
 
        successfully_cloned = GitRepository(tricky_path, src_url=TEST_GIT_REPO, update_after_clone=True, create=True)
 
        # Repo should have been created
 
        self.assertFalse(successfully_cloned._repo.bare)
 

	
 
        if sys.platform == 'win32':
 
            # windows does not allow '"' in dir names
 
            tricky_path_2 = get_new_dir("tricky-path-2-repo-$'`")
 
        else:
 
            tricky_path_2 = get_new_dir("tricky-path-2-repo-$'\"`")
 
        successfully_cloned2 = GitRepository(tricky_path_2, src_url=tricky_path, bare=True, create=True)
 
        # Repo should have been created and thus used correct quoting for clone
 
        self.assertTrue(successfully_cloned2._repo.bare)
 

	
 
        # Should pass because URL has been properly quoted
 
        successfully_cloned.pull(tricky_path_2)
 
        successfully_cloned2.fetch(tricky_path)
 

	
 
    def test_repo_create_with_spaces_in_path(self):
 
        repo_path = get_new_dir("path with spaces")
 
        repo = GitRepository(repo_path, src_url=None, bare=True, create=True)
 
        # Repo should have been created
 
        self.assertTrue(repo._repo.bare)
 

	
 
    def test_repo_clone(self):
 
        self.__check_for_existing_repo()
 
        repo = GitRepository(TEST_GIT_REPO)
 
        repo_clone = GitRepository(TEST_GIT_REPO_CLONE,
 
            src_url=TEST_GIT_REPO, create=True, update_after_clone=True)
 
        self.assertEqual(len(repo.revisions), len(repo_clone.revisions))
 
        # Checking hashes of changesets should be enough
 
        for changeset in repo.get_changesets():
 
            raw_id = changeset.raw_id
 
            self.assertEqual(raw_id, repo_clone.get_changeset(raw_id).raw_id)
 

	
 
    def test_repo_clone_with_spaces_in_path(self):
 
        repo_path = get_new_dir("path with spaces")
 
        successfully_cloned = GitRepository(repo_path, src_url=TEST_GIT_REPO, update_after_clone=True, create=True)
 
        # Repo should have been created
 
        self.assertFalse(successfully_cloned._repo.bare)
 

	
 
        successfully_cloned.pull(TEST_GIT_REPO)
 
        self.repo.fetch(repo_path)
 

	
 
    def test_repo_clone_without_create(self):
 
        self.assertRaises(RepositoryError, GitRepository,
 
            TEST_GIT_REPO_CLONE + '_wo_create', src_url=TEST_GIT_REPO)
 

	
 
    def test_repo_clone_with_update(self):
 
        repo = GitRepository(TEST_GIT_REPO)
 
        clone_path = TEST_GIT_REPO_CLONE + '_with_update'
 
        repo_clone = GitRepository(clone_path,
 
            create=True, src_url=TEST_GIT_REPO, update_after_clone=True)
 
        self.assertEqual(len(repo.revisions), len(repo_clone.revisions))
 

	
 
        #check if current workdir was updated
 
        fpath = os.path.join(clone_path, 'MANIFEST.in')
 
        self.assertEqual(True, os.path.isfile(fpath),
 
            'Repo was cloned and updated but file %s could not be found'
 
            % fpath)
 

	
 
    def test_repo_clone_without_update(self):
 
        repo = GitRepository(TEST_GIT_REPO)
 
        clone_path = TEST_GIT_REPO_CLONE + '_without_update'
 
        repo_clone = GitRepository(clone_path,
 
            create=True, src_url=TEST_GIT_REPO, update_after_clone=False)
 
        self.assertEqual(len(repo.revisions), len(repo_clone.revisions))
 
        #check if current workdir was *NOT* updated
 
        fpath = os.path.join(clone_path, 'MANIFEST.in')
 
        # Make sure it's not bare repo
 
        self.assertFalse(repo_clone._repo.bare)
 
        self.assertEqual(False, os.path.isfile(fpath),
 
            'Repo was cloned and updated but file %s was found'
 
            % fpath)
 

	
 
    def test_repo_clone_into_bare_repo(self):
 
        repo = GitRepository(TEST_GIT_REPO)
 
        clone_path = TEST_GIT_REPO_CLONE + '_bare.git'
 
        repo_clone = GitRepository(clone_path, create=True,
 
            src_url=repo.path, bare=True)
 
        self.assertTrue(repo_clone._repo.bare)
 

	
 
    def test_create_repo_is_not_bare_by_default(self):
 
        repo = GitRepository(get_new_dir('not-bare-by-default'), create=True)
 
        self.assertFalse(repo._repo.bare)
 

	
 
    def test_create_bare_repo(self):
 
        repo = GitRepository(get_new_dir('bare-repo'), create=True, bare=True)
 
        self.assertTrue(repo._repo.bare)
 

	
 
    def test_revisions(self):
 
        # there are 112 revisions (by now)
 
        # so we can assume they would be available from now on
 
        subset = set([
 
            'c1214f7e79e02fc37156ff215cd71275450cffc3',
 
            '38b5fe81f109cb111f549bfe9bb6b267e10bc557',
 
            'fa6600f6848800641328adbf7811fd2372c02ab2',
 
            '102607b09cdd60e2793929c4f90478be29f85a17',
 
            '49d3fd156b6f7db46313fac355dca1a0b94a0017',
 
            '2d1028c054665b962fa3d307adfc923ddd528038',
 
            'd7e0d30fbcae12c90680eb095a4f5f02505ce501',
 
            'ff7ca51e58c505fec0dd2491de52c622bb7a806b',
 
            'dd80b0f6cf5052f17cc738c2951c4f2070200d7f',
 
            '8430a588b43b5d6da365400117c89400326e7992',
 
            'd955cd312c17b02143c04fa1099a352b04368118',
 
            'f67b87e5c629c2ee0ba58f85197e423ff28d735b',
 
            'add63e382e4aabc9e1afdc4bdc24506c269b7618',
 
            'f298fe1189f1b69779a4423f40b48edf92a703fc',
 
            'bd9b619eb41994cac43d67cf4ccc8399c1125808',
 
            '6e125e7c890379446e98980d8ed60fba87d0f6d1',
 
            'd4a54db9f745dfeba6933bf5b1e79e15d0af20bd',
 
            '0b05e4ed56c802098dfc813cbe779b2f49e92500',
 
            '191caa5b2c81ed17c0794bf7bb9958f4dcb0b87e',
 
            '45223f8f114c64bf4d6f853e3c35a369a6305520',
 
            'ca1eb7957a54bce53b12d1a51b13452f95bc7c7e',
 
            'f5ea29fc42ef67a2a5a7aecff10e1566699acd68',
 
            '27d48942240f5b91dfda77accd2caac94708cc7d',
 
            '622f0eb0bafd619d2560c26f80f09e3b0b0d78af',
 
            'e686b958768ee96af8029fe19c6050b1a8dd3b2b'])
 
        self.assertTrue(subset.issubset(set(self.repo.revisions)))
 

	
 

	
 

	
 
    def test_slicing(self):
 
        #4 1 5 10 95
 
        for sfrom, sto, size in [(0, 4, 4), (1, 2, 1), (10, 15, 5),
 
                                 (10, 20, 10), (5, 100, 95)]:
 
            revs = list(self.repo[sfrom:sto])
 
            self.assertEqual(len(revs), size)
 
            self.assertEqual(revs[0], self.repo.get_changeset(sfrom))
 
            self.assertEqual(revs[-1], self.repo.get_changeset(sto - 1))
 

	
 

	
 
    def test_branches(self):
 
        # TODO: Need more tests here
 
        # Removed (those are 'remotes' branches for cloned repo)
 
        #self.assertTrue('master' in self.repo.branches)
 
        #self.assertTrue('gittree' in self.repo.branches)
 
        #self.assertTrue('web-branch' in self.repo.branches)
 
        for name, id in self.repo.branches.items():
 
            self.assertTrue(isinstance(
 
                self.repo.get_changeset(id), GitChangeset))
 

	
 
    def test_tags(self):
 
        # TODO: Need more tests here
 
        self.assertTrue('v0.1.1' in self.repo.tags)
 
        self.assertTrue('v0.1.2' in self.repo.tags)
 
        for name, id in self.repo.tags.items():
 
            self.assertTrue(isinstance(
 
                self.repo.get_changeset(id), GitChangeset))
 

	
 
    def _test_single_changeset_cache(self, revision):
 
        chset = self.repo.get_changeset(revision)
 
        self.assertTrue(revision in self.repo.changesets)
 
        self.assertTrue(chset is self.repo.changesets[revision])
 

	
 
    def test_initial_changeset(self):
 
        id = self.repo.revisions[0]
 
        init_chset = self.repo.get_changeset(id)
 
        self.assertEqual(init_chset.message, 'initial import\n')
 
        self.assertEqual(init_chset.author,
 
            'Marcin Kuzminski <marcin@python-blog.com>')
 
        for path in ('vcs/__init__.py',
 
                     'vcs/backends/BaseRepository.py',
 
                     'vcs/backends/__init__.py'):
 
            self.assertTrue(isinstance(init_chset.get_node(path), FileNode))
 
        for path in ('', 'vcs', 'vcs/backends'):
 
            self.assertTrue(isinstance(init_chset.get_node(path), DirNode))
 

	
 
        self.assertRaises(NodeDoesNotExistError, init_chset.get_node, path='foobar')
 

	
 
        node = init_chset.get_node('vcs/')
 
        self.assertTrue(hasattr(node, 'kind'))
 
        self.assertEqual(node.kind, NodeKind.DIR)
 

	
 
        node = init_chset.get_node('vcs')
 
        self.assertTrue(hasattr(node, 'kind'))
 
        self.assertEqual(node.kind, NodeKind.DIR)
 

	
 
        node = init_chset.get_node('vcs/__init__.py')
 
        self.assertTrue(hasattr(node, 'kind'))
 
        self.assertEqual(node.kind, NodeKind.FILE)
 

	
 
    def test_not_existing_changeset(self):
 
        self.assertRaises(RepositoryError, self.repo.get_changeset,
 
            'f' * 40)
 

	
 
    def test_changeset10(self):
 

	
 
        chset10 = self.repo.get_changeset(self.repo.revisions[9])
 
        README = """===
 
VCS
 
===
 

	
 
Various Version Control System management abstraction layer for Python.
 

	
 
Introduction
 
------------
 

	
 
TODO: To be written...
 

	
 
"""
 
        node = chset10.get_node('README.rst')
 
        self.assertEqual(node.kind, NodeKind.FILE)
 
        self.assertEqual(node.content, README)
 

	
 

	
 
class GitChangesetTest(unittest.TestCase):
 

	
 
    def setUp(self):
 
        self.repo = GitRepository(TEST_GIT_REPO)
 

	
 
    def test_default_changeset(self):
 
        tip = self.repo.get_changeset()
 
        self.assertEqual(tip, self.repo.get_changeset(None))
 
        self.assertEqual(tip, self.repo.get_changeset('tip'))
 

	
 
    def test_root_node(self):
 
        tip = self.repo.get_changeset()
 
        self.assertTrue(tip.root is tip.get_node(''))
 

	
 
    def test_lazy_fetch(self):
 
        """
 
        Test if changeset's nodes expands and are cached as we walk through
 
        the revision. This test is somewhat hard to write as order of tests
 
        is a key here. Written by running command after command in a shell.
 
        """
 
        hex = '2a13f185e4525f9d4b59882791a2d397b90d5ddc'
 
        self.assertTrue(hex in self.repo.revisions)
 
        chset = self.repo.get_changeset(hex)
 
        self.assertTrue(len(chset.nodes) == 0)
 
        root = chset.root
 
        self.assertTrue(len(chset.nodes) == 1)
 
        self.assertTrue(len(root.nodes) == 8)
 
        # accessing root.nodes updates chset.nodes
 
        self.assertTrue(len(chset.nodes) == 9)
 

	
 
        docs = root.get_node('docs')
 
        # we haven't yet accessed anything new as docs dir was already cached
 
        self.assertTrue(len(chset.nodes) == 9)
 
        self.assertTrue(len(docs.nodes) == 8)
 
        # accessing docs.nodes updates chset.nodes
 
        self.assertTrue(len(chset.nodes) == 17)
 

	
 
        self.assertTrue(docs is chset.get_node('docs'))
 
        self.assertTrue(docs is root.nodes[0])
 
        self.assertTrue(docs is root.dirs[0])
 
        self.assertTrue(docs is chset.get_node('docs'))
 

	
 
    def test_nodes_with_changeset(self):
 
        hex = '2a13f185e4525f9d4b59882791a2d397b90d5ddc'
 
        chset = self.repo.get_changeset(hex)
 
        root = chset.root
 
        docs = root.get_node('docs')
 
        self.assertTrue(docs is chset.get_node('docs'))
 
        api = docs.get_node('api')
 
        self.assertTrue(api is chset.get_node('docs/api'))
 
        index = api.get_node('index.rst')
 
        self.assertTrue(index is chset.get_node('docs/api/index.rst'))
 
        self.assertTrue(index is chset.get_node('docs')\
 
            .get_node('api')\
 
            .get_node('index.rst'))
 

	
 
    def test_branch_and_tags(self):
 
        """
 
        rev0 = self.repo.revisions[0]
 
        chset0 = self.repo.get_changeset(rev0)
 
        self.assertEqual(chset0.branch, 'master')
 
        self.assertEqual(chset0.tags, [])
 

	
 
        rev10 = self.repo.revisions[10]
 
        chset10 = self.repo.get_changeset(rev10)
 
        self.assertEqual(chset10.branch, 'master')
 
        self.assertEqual(chset10.tags, [])
 

	
 
        rev44 = self.repo.revisions[44]
 
        chset44 = self.repo.get_changeset(rev44)
 
        self.assertEqual(chset44.branch, 'web-branch')
 

	
 
        tip = self.repo.get_changeset('tip')
 
        self.assertTrue('tip' in tip.tags)
 
        """
 
        # Those tests would fail - branches are now going
 
        # to be changed at main API in order to support git backend
 
        pass
 

	
 
    def _test_slices(self, limit, offset):
 
        count = self.repo.count()
 
        changesets = self.repo.get_changesets(limit=limit, offset=offset)
 
        idx = 0
 
        for changeset in changesets:
 
            rev = offset + idx
 
            idx += 1
 
            rev_id = self.repo.revisions[rev]
 
            if idx > limit:
 
                self.fail("Exceeded limit already (getting revision %s, "
 
                    "there are %s total revisions, offset=%s, limit=%s)"
 
                    % (rev_id, count, offset, limit))
 
            self.assertEqual(changeset, self.repo.get_changeset(rev_id))
 
        result = list(self.repo.get_changesets(limit=limit, offset=offset))
 
        start = offset
 
        end = limit and offset + limit or None
 
        sliced = list(self.repo[start:end])
 
        self.failUnlessEqual(result, sliced,
 
            msg="Comparison failed for limit=%s, offset=%s"
 
            "(get_changeset returned: %s and sliced: %s"
 
            % (limit, offset, result, sliced))
 

	
 
    def _test_file_size(self, revision, path, size):
 
        node = self.repo.get_changeset(revision).get_node(path)
 
        self.assertTrue(node.is_file())
 
        self.assertEqual(node.size, size)
 

	
 
    def test_file_size(self):
 
        to_check = (
 
            ('c1214f7e79e02fc37156ff215cd71275450cffc3',
 
                'vcs/backends/BaseRepository.py', 502),
 
            ('d7e0d30fbcae12c90680eb095a4f5f02505ce501',
 
                'vcs/backends/hg.py', 854),
 
            ('6e125e7c890379446e98980d8ed60fba87d0f6d1',
 
                'setup.py', 1068),
 

	
 
            ('d955cd312c17b02143c04fa1099a352b04368118',
 
                'vcs/backends/base.py', 2921),
 
            ('ca1eb7957a54bce53b12d1a51b13452f95bc7c7e',
 
                'vcs/backends/base.py', 3936),
 
            ('f50f42baeed5af6518ef4b0cb2f1423f3851a941',
 
                'vcs/backends/base.py', 6189),
 
        )
 
        for revision, path, size in to_check:
 
            self._test_file_size(revision, path, size)
 

	
 
    def _test_dir_size(self, revision, path, size):
 
        node = self.repo.get_changeset(revision).get_node(path)
 
        self.assertEqual(node.size, size)
 

	
 
    def test_dir_size(self):
 
        to_check = (
 
            ('5f2c6ee195929b0be80749243c18121c9864a3b3', '/', 674076),
 
            ('7ab37bc680b4aa72c34d07b230c866c28e9fc204', '/', 674049),
 
            ('6892503fb8f2a552cef5f4d4cc2cdbd13ae1cd2f', '/', 671830),
 
        )
 
        for revision, path, size in to_check:
 
            self._test_dir_size(revision, path, size)
 

	
 
    def test_repo_size(self):
 
        self.assertEqual(self.repo.size, 1022026) # FIXME
 
        self.assertEqual(self.repo.size, 674076)
 

	
 
    def test_file_history(self):
 
        # we can only check if those revisions are present in the history
 
        # as we cannot update this test every time file is changed
 
        files = {
 
            'setup.py': [
 
                '54386793436c938cff89326944d4c2702340037d',
 
                '51d254f0ecf5df2ce50c0b115741f4cf13985dab',
 
                '998ed409c795fec2012b1c0ca054d99888b22090',
 
                '5e0eb4c47f56564395f76333f319d26c79e2fb09',
 
                '0115510b70c7229dbc5dc49036b32e7d91d23acd',
 
                '7cb3fd1b6d8c20ba89e2264f1c8baebc8a52d36e',
 
                '2a13f185e4525f9d4b59882791a2d397b90d5ddc',
 
                '191caa5b2c81ed17c0794bf7bb9958f4dcb0b87e',
 
                'ff7ca51e58c505fec0dd2491de52c622bb7a806b',
 
            ],
 
            'vcs/nodes.py': [
 
                '33fa3223355104431402a888fa77a4e9956feb3e',
 
                'fa014c12c26d10ba682fadb78f2a11c24c8118e1',
 
                'e686b958768ee96af8029fe19c6050b1a8dd3b2b',
 
                'ab5721ca0a081f26bf43d9051e615af2cc99952f',
 
                'c877b68d18e792a66b7f4c529ea02c8f80801542',
 
                '4313566d2e417cb382948f8d9d7c765330356054',
 
                '6c2303a793671e807d1cfc70134c9ca0767d98c2',
 
                '54386793436c938cff89326944d4c2702340037d',
 
                '54000345d2e78b03a99d561399e8e548de3f3203',
 
                '1c6b3677b37ea064cb4b51714d8f7498f93f4b2b',
 
                '2d03ca750a44440fb5ea8b751176d1f36f8e8f46',
 
                '2a08b128c206db48c2f0b8f70df060e6db0ae4f8',
 
                '30c26513ff1eb8e5ce0e1c6b477ee5dc50e2f34b',
 
                'ac71e9503c2ca95542839af0ce7b64011b72ea7c',
 
                '12669288fd13adba2a9b7dd5b870cc23ffab92d2',
 
                '5a0c84f3e6fe3473e4c8427199d5a6fc71a9b382',
 
                '12f2f5e2b38e6ff3fbdb5d722efed9aa72ecb0d5',
 
                '5eab1222a7cd4bfcbabc218ca6d04276d4e27378',
 
                'f50f42baeed5af6518ef4b0cb2f1423f3851a941',
 
                'd7e390a45f6aa96f04f5e7f583ad4f867431aa25',
 
                'f15c21f97864b4f071cddfbf2750ec2e23859414',
 
                'e906ef056cf539a4e4e5fc8003eaf7cf14dd8ade',
 
                'ea2b108b48aa8f8c9c4a941f66c1a03315ca1c3b',
 
                '84dec09632a4458f79f50ddbbd155506c460b4f9',
 
                '0115510b70c7229dbc5dc49036b32e7d91d23acd',
 
                '2a13f185e4525f9d4b59882791a2d397b90d5ddc',
 
                '3bf1c5868e570e39569d094f922d33ced2fa3b2b',
 
                'b8d04012574729d2c29886e53b1a43ef16dd00a1',
 
                '6970b057cffe4aab0a792aa634c89f4bebf01441',
 
                'dd80b0f6cf5052f17cc738c2951c4f2070200d7f',
 
                'ff7ca51e58c505fec0dd2491de52c622bb7a806b',
 
            ],
 
            'vcs/backends/git.py': [
 
                '4cf116ad5a457530381135e2f4c453e68a1b0105',
 
                '9a751d84d8e9408e736329767387f41b36935153',
 
                'cb681fb539c3faaedbcdf5ca71ca413425c18f01',
 
                '428f81bb652bcba8d631bce926e8834ff49bdcc6',
 
                '180ab15aebf26f98f714d8c68715e0f05fa6e1c7',
 
                '2b8e07312a2e89e92b90426ab97f349f4bce2a3a',
 
                '50e08c506174d8645a4bb517dd122ac946a0f3bf',
 
                '54000345d2e78b03a99d561399e8e548de3f3203',
 
            ],
 
        }
 
        for path, revs in files.items():
 
            node = self.repo.get_changeset(revs[0]).get_node(path)
 
            node_revs = [chset.raw_id for chset in node.history]
 
            self.assertTrue(set(revs).issubset(set(node_revs)),
 
                "We assumed that %s is subset of revisions for which file %s "
 
                "has been changed, and history of that node returned: %s"
 
                % (revs, path, node_revs))
 

	
 
    def test_file_annotate(self):
 
        files = {
 
            'vcs/backends/__init__.py': {
 
                'c1214f7e79e02fc37156ff215cd71275450cffc3': {
 
                    'lines_no': 1,
 
                    'changesets': [
 
                        'c1214f7e79e02fc37156ff215cd71275450cffc3',
 
                    ],
 
                },
 
                '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647': {
 
                    'lines_no': 21,
 
                    'changesets': [
 
                        '49d3fd156b6f7db46313fac355dca1a0b94a0017',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                    ],
 
                },
 
                'e29b67bd158580fc90fc5e9111240b90e6e86064': {
 
                    'lines_no': 32,
 
                    'changesets': [
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '5eab1222a7cd4bfcbabc218ca6d04276d4e27378',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '54000345d2e78b03a99d561399e8e548de3f3203',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '78c3f0c23b7ee935ec276acb8b8212444c33c396',
 
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
 
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
 
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
 
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
 
                        '2a13f185e4525f9d4b59882791a2d397b90d5ddc',
 
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
 
                        '78c3f0c23b7ee935ec276acb8b8212444c33c396',
 
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
 
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
 
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
 
                        '992f38217b979d0b0987d0bae3cc26dac85d9b19',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                        '16fba1ae9334d79b66d7afed2c2dfbfa2ae53647',
 
                    ],
 
                },
 
            },
 
        }
 

	
 
        for fname, revision_dict in files.items():
 
            for rev, data in revision_dict.items():
 
                cs = self.repo.get_changeset(rev)
 

	
 
                l1_1 = [x[1] for x in cs.get_file_annotate(fname)]
 
                l1_2 = [x[2]().raw_id for x in cs.get_file_annotate(fname)]
 
                self.assertEqual(l1_1, l1_2)
 
                l1 = l1_1
 
                l2 = files[fname][rev]['changesets']
 
                self.assertTrue(l1 == l2 , "The lists of revision for %s@rev %s"
 
                                "from annotation list should match each other, "
 
                                "got \n%s \nvs \n%s " % (fname, rev, l1, l2))
 

	
 
    def test_files_state(self):
 
        """
 
        Tests state of FileNodes.
 
        """
 
        node = self.repo\
 
            .get_changeset('e6ea6d16e2f26250124a1f4b4fe37a912f9d86a0')\
 
            .get_node('vcs/utils/diffs.py')
 
        self.assertTrue(node.state, NodeState.ADDED)
 
        self.assertTrue(node.added)
 
        self.assertFalse(node.changed)
 
        self.assertFalse(node.not_changed)
 
        self.assertFalse(node.removed)
 

	
 
        node = self.repo\
 
            .get_changeset('33fa3223355104431402a888fa77a4e9956feb3e')\
 
            .get_node('.hgignore')
 
        self.assertTrue(node.state, NodeState.CHANGED)
 
        self.assertFalse(node.added)
 
        self.assertTrue(node.changed)
 
        self.assertFalse(node.not_changed)
 
        self.assertFalse(node.removed)
 

	
 
        node = self.repo\
 
            .get_changeset('e29b67bd158580fc90fc5e9111240b90e6e86064')\
 
            .get_node('setup.py')
 
        self.assertTrue(node.state, NodeState.NOT_CHANGED)
 
        self.assertFalse(node.added)
 
        self.assertFalse(node.changed)
 
        self.assertTrue(node.not_changed)
 
        self.assertFalse(node.removed)
 

	
 
        # If node has REMOVED state then trying to fetch it would raise
 
        # ChangesetError exception
 
        chset = self.repo.get_changeset(
 
            'fa6600f6848800641328adbf7811fd2372c02ab2')
 
        path = 'vcs/backends/BaseRepository.py'
 
        self.assertRaises(NodeDoesNotExistError, chset.get_node, path)
 
        # but it would be one of ``removed`` (changeset's attribute)
 
        self.assertTrue(path in [rf.path for rf in chset.removed])
 

	
 
        chset = self.repo.get_changeset(
 
            '54386793436c938cff89326944d4c2702340037d')
 
        changed = ['setup.py', 'tests/test_nodes.py', 'vcs/backends/hg.py',
 
            'vcs/nodes.py']
 
        self.assertEqual(set(changed), set([f.path for f in chset.changed]))
 

	
 
    def test_commit_message_is_unicode(self):
 
        for cs in self.repo:
 
            self.assertEqual(type(cs.message), unicode)
 

	
 
    def test_changeset_author_is_unicode(self):
 
        for cs in self.repo:
 
            self.assertEqual(type(cs.author), unicode)
 

	
 
    def test_repo_files_content_is_unicode(self):
 
        changeset = self.repo.get_changeset()
 
        for node in changeset.get_node('/'):
 
            if node.is_file():
 
                self.assertEqual(type(node.content), unicode)
 

	
 
    def test_wrong_path(self):
 
        # There is 'setup.py' in the root dir but not there:
 
        path = 'foo/bar/setup.py'
 
        tip = self.repo.get_changeset()
 
        self.assertRaises(VCSError, tip.get_node, path)
 

	
 
    def test_author_email(self):
 
        self.assertEqual('marcin@python-blog.com',
 
          self.repo.get_changeset('c1214f7e79e02fc37156ff215cd71275450cffc3')\
 
          .author_email)
 
        self.assertEqual('lukasz.balcerzak@python-center.pl',
 
          self.repo.get_changeset('ff7ca51e58c505fec0dd2491de52c622bb7a806b')\
 
          .author_email)
 
        self.assertEqual('none@none',
 
          self.repo.get_changeset('8430a588b43b5d6da365400117c89400326e7992')\
 
          .author_email)
 

	
 
    def test_author_username(self):
 
        self.assertEqual('Marcin Kuzminski',
 
          self.repo.get_changeset('c1214f7e79e02fc37156ff215cd71275450cffc3')\
 
          .author_name)
 
        self.assertEqual('Lukasz Balcerzak',
 
          self.repo.get_changeset('ff7ca51e58c505fec0dd2491de52c622bb7a806b')\
 
          .author_name)
 
        self.assertEqual('marcink',
 
          self.repo.get_changeset('8430a588b43b5d6da365400117c89400326e7992')\
 
          .author_name)
 

	
 

	
 
class GitSpecificTest(unittest.TestCase):
 

	
 
    def test_error_is_raised_for_added_if_diff_name_status_is_wrong(self):
 
        repo = mock.MagicMock()
 
        changeset = GitChangeset(repo, 'foobar')
 
        changeset._diff_name_status = 'foobar'
 
        with self.assertRaises(VCSError):
 
            changeset.added
 

	
 
    def test_error_is_raised_for_changed_if_diff_name_status_is_wrong(self):
 
        repo = mock.MagicMock()
 
        changeset = GitChangeset(repo, 'foobar')
 
        changeset._diff_name_status = 'foobar'
 
        with self.assertRaises(VCSError):
 
            changeset.added
 

	
 
    def test_error_is_raised_for_removed_if_diff_name_status_is_wrong(self):
 
        repo = mock.MagicMock()
 
        changeset = GitChangeset(repo, 'foobar')
 
        changeset._diff_name_status = 'foobar'
 
        with self.assertRaises(VCSError):
 
            changeset.added
 

	
 

	
 
class GitSpecificWithRepoTest(_BackendTestMixin, unittest.TestCase):
 
    backend_alias = 'git'
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        return [
 
            {
 
                'message': 'Initial',
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 20),
 
                'added': [
 
                    FileNode('foobar/static/js/admin/base.js', content='base'),
 
                    FileNode('foobar/static/admin', content='admin',
 
                        mode=0120000), # this is a link
 
                    FileNode('foo', content='foo'),
 
                ],
 
            },
 
            {
 
                'message': 'Second',
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 22),
 
                'added': [
 
                    FileNode('foo2', content='foo2'),
 
                ],
 
            },
 
        ]
 

	
 
    def test_paths_slow_traversing(self):
 
        cs = self.repo.get_changeset()
 
        self.assertEqual(cs.get_node('foobar').get_node('static').get_node('js')
 
            .get_node('admin').get_node('base.js').content, 'base')
 

	
 
    def test_paths_fast_traversing(self):
 
        cs = self.repo.get_changeset()
 
        self.assertEqual(cs.get_node('foobar/static/js/admin/base.js').content,
 
            'base')
 

	
 
    def test_workdir_get_branch(self):
 
        self.repo.run_git_command(['checkout', '-b', 'production'])
 
        # Regression test: one of following would fail if we don't check
 
        # .git/HEAD file
 
        self.repo.run_git_command(['checkout', 'production'])
 
        self.assertEqual(self.repo.workdir.get_branch(), 'production')
 
        self.repo.run_git_command(['checkout', 'master'])
 
        self.assertEqual(self.repo.workdir.get_branch(), 'master')
 

	
 
    def test_get_diff_runs_git_command_with_hashes(self):
 
        self.repo.run_git_command = mock.Mock(return_value=['', ''])
 
        self.repo.get_diff(0, 1)
 
        self.repo.run_git_command.assert_called_once_with(
 
            ['diff', '-U3', '--full-index', '--binary', '-p', '-M', '--abbrev=40',
 
             self.repo._get_revision(0), self.repo._get_revision(1)])
 

	
 
    def test_get_diff_runs_git_command_with_str_hashes(self):
 
        self.repo.run_git_command = mock.Mock(return_value=['', ''])
 
        self.repo.get_diff(self.repo.EMPTY_CHANGESET, 1)
 
        self.repo.run_git_command.assert_called_once_with(
 
            ['show', '-U3', '--full-index', '--binary', '-p', '-M', '--abbrev=40',
 
             self.repo._get_revision(1)])
 

	
 
    def test_get_diff_runs_git_command_with_path_if_its_given(self):
 
        self.repo.run_git_command = mock.Mock(return_value=['', ''])
 
        self.repo.get_diff(0, 1, 'foo')
 
        self.repo.run_git_command.assert_called_once_with(
 
            ['diff', '-U3', '--full-index', '--binary', '-p', '-M', '--abbrev=40',
 
             self.repo._get_revision(0), self.repo._get_revision(1), '--', 'foo'])
 

	
 

	
 
class GitRegressionTest(_BackendTestMixin, unittest.TestCase):
 
    backend_alias = 'git'
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        return [
 
            {
 
                'message': 'Initial',
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 20),
 
                'added': [
 
                    FileNode('bot/__init__.py', content='base'),
 
                    FileNode('bot/templates/404.html', content='base'),
 
                    FileNode('bot/templates/500.html', content='base'),
 
                ],
 
            },
 
            {
 
                'message': 'Second',
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 22),
 
                'added': [
 
                    FileNode('bot/build/migrations/1.py', content='foo2'),
 
                    FileNode('bot/build/migrations/2.py', content='foo2'),
 
                    FileNode('bot/build/static/templates/f.html', content='foo2'),
 
                    FileNode('bot/build/static/templates/f1.html', content='foo2'),
 
                    FileNode('bot/build/templates/err.html', content='foo2'),
 
                    FileNode('bot/build/templates/err2.html', content='foo2'),
 
                ],
 
            },
 
        ]
 

	
 
    def test_similar_paths(self):
 
        cs = self.repo.get_changeset()
 
        paths = lambda *n:[x.path for x in n]
 
        self.assertEqual(paths(*cs.get_nodes('bot')), ['bot/build', 'bot/templates', 'bot/__init__.py'])
 
        self.assertEqual(paths(*cs.get_nodes('bot/build')), ['bot/build/migrations', 'bot/build/static', 'bot/build/templates'])
 
        self.assertEqual(paths(*cs.get_nodes('bot/build/static')), ['bot/build/static/templates'])
 
        # this get_nodes below causes troubles !
 
        self.assertEqual(paths(*cs.get_nodes('bot/build/static/templates')), ['bot/build/static/templates/f.html', 'bot/build/static/templates/f1.html'])
 
        self.assertEqual(paths(*cs.get_nodes('bot/build/templates')), ['bot/build/templates/err.html', 'bot/build/templates/err2.html'])
 
        self.assertEqual(paths(*cs.get_nodes('bot/templates/')), ['bot/templates/404.html', 'bot/templates/500.html'])
 

	
 

	
 
class GitHooksTest(unittest.TestCase):
 
    """
 
    Tests related to hook functionality of Git repositories.
 
    """
 

	
 
    def setUp(self):
 
        # For each run we want a fresh repo.
 
        self.repo_directory = get_new_dir("githookrepo")
 
        self.repo = GitRepository(self.repo_directory, create=True)
 

	
 
        # Create a dictionary where keys are hook names, and values are paths to
 
        # them. Deduplicates code in tests a bit.
 
        self.hook_directory = self.repo.get_hook_location()
 
        self.kallithea_hooks = {h: os.path.join(self.hook_directory, h) for h in ("pre-receive", "post-receive")}
 

	
 
    def test_hooks_created_if_missing(self):
 
        """
 
        Tests if hooks are installed in repository if they are missing.
 
        """
 

	
 
        for hook, hook_path in self.kallithea_hooks.iteritems():
 
            if os.path.exists(hook_path):
 
                os.remove(hook_path)
 

	
 
        ScmModel().install_git_hooks(repo=self.repo)
 

	
 
        for hook, hook_path in self.kallithea_hooks.iteritems():
 
            self.assertTrue(os.path.exists(hook_path))
 

	
 
    def test_kallithea_hooks_updated(self):
 
        """
 
        Tests if hooks are updated if they are Kallithea hooks already.
 
        """
 

	
 
        for hook, hook_path in self.kallithea_hooks.iteritems():
 
            with open(hook_path, "w") as f:
 
                f.write("KALLITHEA_HOOK_VER=0.0.0\nJUST_BOGUS")
 

	
 
        ScmModel().install_git_hooks(repo=self.repo)
 

	
 
        for hook, hook_path in self.kallithea_hooks.iteritems():
 
            with open(hook_path) as f:
 
                self.assertNotIn("JUST_BOGUS", f.read())
 

	
 
    def test_custom_hooks_untouched(self):
 
        """
 
        Tests if hooks are left untouched if they are not Kallithea hooks.
 
        """
 

	
 
        for hook, hook_path in self.kallithea_hooks.iteritems():
 
            with open(hook_path, "w") as f:
 
                f.write("#!/bin/bash\n#CUSTOM_HOOK")
 

	
 
        ScmModel().install_git_hooks(repo=self.repo)
 

	
 
        for hook, hook_path in self.kallithea_hooks.iteritems():
 
            with open(hook_path) as f:
 
                self.assertIn("CUSTOM_HOOK", f.read())
 

	
 
    def test_custom_hooks_forced_update(self):
 
        """
 
        Tests if hooks are forcefully updated even though they are custom hooks.
 
        """
 

	
 
        for hook, hook_path in self.kallithea_hooks.iteritems():
 
            with open(hook_path, "w") as f:
 
                f.write("#!/bin/bash\n#CUSTOM_HOOK")
 

	
 
        ScmModel().install_git_hooks(repo=self.repo, force_create=True)
 

	
 
        for hook, hook_path in self.kallithea_hooks.iteritems():
 
            with open(hook_path) as f:
 
                self.assertIn("KALLITHEA_HOOK_VER", f.read())
 

	
 

	
 
if __name__ == '__main__':
 
    unittest.main()
kallithea/tests/vcs/test_hg.py
Show inline comments
 

	
 
import os
 
from kallithea.lib.vcs.backends.hg import MercurialRepository, MercurialChangeset
 
from kallithea.lib.vcs.exceptions import RepositoryError, VCSError, NodeDoesNotExistError
 
from kallithea.lib.vcs.nodes import NodeKind, NodeState
 
from kallithea.tests.vcs.conf import TEST_HG_REPO, TEST_HG_REPO_CLONE, \
 
    TEST_HG_REPO_PULL
 
from kallithea.lib.vcs.utils.compat import unittest
 

	
 

	
 
# Use only clean mercurial's ui
 
from kallithea.lib.vcs.utils.hgcompat import mercurial
 
mercurial.scmutil.rcpath()
 
if mercurial.scmutil._rcpath:
 
    mercurial.scmutil._rcpath = mercurial.scmutil._rcpath[:1]
 

	
 

	
 
class MercurialRepositoryTest(unittest.TestCase):
 

	
 
    def __check_for_existing_repo(self):
 
        if os.path.exists(TEST_HG_REPO_CLONE):
 
            self.fail('Cannot test mercurial clone repo as location %s already '
 
                      'exists. You should manually remove it first.'
 
                      % TEST_HG_REPO_CLONE)
 

	
 
    def setUp(self):
 
        self.repo = MercurialRepository(TEST_HG_REPO)
 

	
 
    def test_wrong_repo_path(self):
 
        wrong_repo_path = '/tmp/errorrepo'
 
        self.assertRaises(RepositoryError, MercurialRepository, wrong_repo_path)
 

	
 
    def test_unicode_path_repo(self):
 
        self.assertRaises(VCSError,lambda:MercurialRepository(u'iShouldFail'))
 

	
 
    def test_repo_clone(self):
 
        self.__check_for_existing_repo()
 
        repo = MercurialRepository(TEST_HG_REPO)
 
        repo_clone = MercurialRepository(TEST_HG_REPO_CLONE,
 
            src_url=TEST_HG_REPO, update_after_clone=True)
 
        self.assertEqual(len(repo.revisions), len(repo_clone.revisions))
 
        # Checking hashes of changesets should be enough
 
        for changeset in repo.get_changesets():
 
            raw_id = changeset.raw_id
 
            self.assertEqual(raw_id, repo_clone.get_changeset(raw_id).raw_id)
 

	
 
    def test_repo_clone_with_update(self):
 
        repo = MercurialRepository(TEST_HG_REPO)
 
        repo_clone = MercurialRepository(TEST_HG_REPO_CLONE + '_w_update',
 
            src_url=TEST_HG_REPO, update_after_clone=True)
 
        self.assertEqual(len(repo.revisions), len(repo_clone.revisions))
 

	
 
        #check if current workdir was updated
 
        self.assertEqual(os.path.isfile(os.path.join(TEST_HG_REPO_CLONE \
 
                                                    + '_w_update',
 
                                                    'MANIFEST.in')), True,)
 

	
 
    def test_repo_clone_without_update(self):
 
        repo = MercurialRepository(TEST_HG_REPO)
 
        repo_clone = MercurialRepository(TEST_HG_REPO_CLONE + '_wo_update',
 
            src_url=TEST_HG_REPO, update_after_clone=False)
 
        self.assertEqual(len(repo.revisions), len(repo_clone.revisions))
 
        self.assertEqual(os.path.isfile(os.path.join(TEST_HG_REPO_CLONE \
 
                                                    + '_wo_update',
 
                                                    'MANIFEST.in')), False,)
 

	
 
    def test_pull(self):
 
        if os.path.exists(TEST_HG_REPO_PULL):
 
            self.fail('Cannot test mercurial pull command as location %s '
 
                      'already exists. You should manually remove it first'
 
                      % TEST_HG_REPO_PULL)
 
        repo_new = MercurialRepository(TEST_HG_REPO_PULL, create=True)
 
        self.assertTrue(len(self.repo.revisions) > len(repo_new.revisions))
 

	
 
        repo_new.pull(self.repo.path)
 
        repo_new = MercurialRepository(TEST_HG_REPO_PULL)
 
        self.assertTrue(len(self.repo.revisions) == len(repo_new.revisions))
 

	
 
    def test_revisions(self):
 
        # there are 21 revisions at bitbucket now
 
        # so we can assume they would be available from now on
 
        subset = set(['b986218ba1c9b0d6a259fac9b050b1724ed8e545',
 
                 '3d8f361e72ab303da48d799ff1ac40d5ac37c67e',
 
                 '6cba7170863a2411822803fa77a0a264f1310b35',
 
                 '56349e29c2af3ac913b28bde9a2c6154436e615b',
 
                 '2dda4e345facb0ccff1a191052dd1606dba6781d',
 
                 '6fff84722075f1607a30f436523403845f84cd9e',
 
                 '7d4bc8ec6be56c0f10425afb40b6fc315a4c25e7',
 
                 '3803844fdbd3b711175fc3da9bdacfcd6d29a6fb',
 
                 'dc5d2c0661b61928834a785d3e64a3f80d3aad9c',
 
                 'be90031137367893f1c406e0a8683010fd115b79',
 
                 'db8e58be770518cbb2b1cdfa69146e47cd481481',
 
                 '84478366594b424af694a6c784cb991a16b87c21',
 
                 '17f8e105dddb9f339600389c6dc7175d395a535c',
 
                 '20a662e756499bde3095ffc9bc0643d1def2d0eb',
 
                 '2e319b85e70a707bba0beff866d9f9de032aa4f9',
 
                 '786facd2c61deb9cf91e9534735124fb8fc11842',
 
                 '94593d2128d38210a2fcd1aabff6dda0d6d9edf8',
 
                 'aa6a0de05b7612707db567078e130a6cd114a9a7',
 
                 'eada5a770da98ab0dd7325e29d00e0714f228d09'
 
                ])
 
        self.assertTrue(subset.issubset(set(self.repo.revisions)))
 

	
 

	
 
        # check if we have the proper order of revisions
 
        org = ['b986218ba1c9b0d6a259fac9b050b1724ed8e545',
 
                '3d8f361e72ab303da48d799ff1ac40d5ac37c67e',
 
                '6cba7170863a2411822803fa77a0a264f1310b35',
 
                '56349e29c2af3ac913b28bde9a2c6154436e615b',
 
                '2dda4e345facb0ccff1a191052dd1606dba6781d',
 
                '6fff84722075f1607a30f436523403845f84cd9e',
 
                '7d4bc8ec6be56c0f10425afb40b6fc315a4c25e7',
 
                '3803844fdbd3b711175fc3da9bdacfcd6d29a6fb',
 
                'dc5d2c0661b61928834a785d3e64a3f80d3aad9c',
 
                'be90031137367893f1c406e0a8683010fd115b79',
 
                'db8e58be770518cbb2b1cdfa69146e47cd481481',
 
                '84478366594b424af694a6c784cb991a16b87c21',
 
                '17f8e105dddb9f339600389c6dc7175d395a535c',
 
                '20a662e756499bde3095ffc9bc0643d1def2d0eb',
 
                '2e319b85e70a707bba0beff866d9f9de032aa4f9',
 
                '786facd2c61deb9cf91e9534735124fb8fc11842',
 
                '94593d2128d38210a2fcd1aabff6dda0d6d9edf8',
 
                'aa6a0de05b7612707db567078e130a6cd114a9a7',
 
                'eada5a770da98ab0dd7325e29d00e0714f228d09',
 
                '2c1885c735575ca478bf9e17b0029dca68824458',
 
                'd9bcd465040bf869799b09ad732c04e0eea99fe9',
 
                '469e9c847fe1f6f7a697b8b25b4bc5b48780c1a7',
 
                '4fb8326d78e5120da2c7468dcf7098997be385da',
 
                '62b4a097164940bd66030c4db51687f3ec035eed',
 
                '536c1a19428381cfea92ac44985304f6a8049569',
 
                '965e8ab3c44b070cdaa5bf727ddef0ada980ecc4',
 
                '9bb326a04ae5d98d437dece54be04f830cf1edd9',
 
                'f8940bcb890a98c4702319fbe36db75ea309b475',
 
                'ff5ab059786ebc7411e559a2cc309dfae3625a3b',
 
                '6b6ad5f82ad5bb6190037671bd254bd4e1f4bf08',
 
                'ee87846a61c12153b51543bf860e1026c6d3dcba', ]
 
        self.assertEqual(org, self.repo.revisions[:31])
 

	
 
    def test_iter_slice(self):
 
        sliced = list(self.repo[:10])
 
        itered = list(self.repo)[:10]
 
        self.assertEqual(sliced, itered)
 

	
 
    def test_slicing(self):
 
        #4 1 5 10 95
 
        for sfrom, sto, size in [(0, 4, 4), (1, 2, 1), (10, 15, 5),
 
                                 (10, 20, 10), (5, 100, 95)]:
 
            revs = list(self.repo[sfrom:sto])
 
            self.assertEqual(len(revs), size)
 
            self.assertEqual(revs[0], self.repo.get_changeset(sfrom))
 
            self.assertEqual(revs[-1], self.repo.get_changeset(sto - 1))
 

	
 
    def test_branches(self):
 
        # TODO: Need more tests here
 

	
 
        #active branches
 
        self.assertTrue('default' in self.repo.branches)
 
        self.assertTrue('stable' in self.repo.branches)
 

	
 
        # closed
 
        self.assertTrue('git' in self.repo._get_branches(closed=True))
 
        self.assertTrue('web' in self.repo._get_branches(closed=True))
 

	
 
        for name, id in self.repo.branches.items():
 
            self.assertTrue(isinstance(
 
                self.repo.get_changeset(id), MercurialChangeset))
 

	
 
    def test_tip_in_tags(self):
 
        # tip is always a tag
 
        self.assertIn('tip', self.repo.tags)
 

	
 
    def test_tip_changeset_in_tags(self):
 
        tip = self.repo.get_changeset()
 
        self.assertEqual(self.repo.tags['tip'], tip.raw_id)
 

	
 
    def test_initial_changeset(self):
 

	
 
        init_chset = self.repo.get_changeset(0)
 
        self.assertEqual(init_chset.message, 'initial import')
 
        self.assertEqual(init_chset.author,
 
            'Marcin Kuzminski <marcin@python-blog.com>')
 
        self.assertEqual(sorted(init_chset._file_paths),
 
            sorted([
 
                'vcs/__init__.py',
 
                'vcs/backends/BaseRepository.py',
 
                'vcs/backends/__init__.py',
 
            ])
 
        )
 
        self.assertEqual(sorted(init_chset._dir_paths),
 
            sorted(['', 'vcs', 'vcs/backends']))
 

	
 
        self.assertRaises(NodeDoesNotExistError, init_chset.get_node, path='foobar')
 

	
 
        node = init_chset.get_node('vcs/')
 
        self.assertTrue(hasattr(node, 'kind'))
 
        self.assertEqual(node.kind, NodeKind.DIR)
 

	
 
        node = init_chset.get_node('vcs')
 
        self.assertTrue(hasattr(node, 'kind'))
 
        self.assertEqual(node.kind, NodeKind.DIR)
 

	
 
        node = init_chset.get_node('vcs/__init__.py')
 
        self.assertTrue(hasattr(node, 'kind'))
 
        self.assertEqual(node.kind, NodeKind.FILE)
 

	
 
    def test_not_existing_changeset(self):
 
        #rawid
 
        self.assertRaises(RepositoryError, self.repo.get_changeset,
 
            'abcd' * 10)
 
        #shortid
 
        self.assertRaises(RepositoryError, self.repo.get_changeset,
 
            'erro' * 4)
 
        #numeric
 
        self.assertRaises(RepositoryError, self.repo.get_changeset,
 
            self.repo.count() + 1)
 

	
 

	
 
        # Small chance we ever get to this one
 
        revision = pow(2, 30)
 
        self.assertRaises(RepositoryError, self.repo.get_changeset, revision)
 

	
 
    def test_changeset10(self):
 

	
 
        chset10 = self.repo.get_changeset(10)
 
        README = """===
 
VCS
 
===
 

	
 
Various Version Control System management abstraction layer for Python.
 

	
 
Introduction
 
------------
 

	
 
TODO: To be written...
 

	
 
"""
 
        node = chset10.get_node('README.rst')
 
        self.assertEqual(node.kind, NodeKind.FILE)
 
        self.assertEqual(node.content, README)
 

	
 

	
 
class MercurialChangesetTest(unittest.TestCase):
 

	
 
    def setUp(self):
 
        self.repo = MercurialRepository(TEST_HG_REPO)
 

	
 
    def _test_equality(self, changeset):
 
        revision = changeset.revision
 
        self.assertEqual(changeset, self.repo.get_changeset(revision))
 

	
 
    def test_equality(self):
 
        self.setUp()
 
        revs = [0, 10, 20]
 
        changesets = [self.repo.get_changeset(rev) for rev in revs]
 
        for changeset in changesets:
 
            self._test_equality(changeset)
 

	
 
    def test_default_changeset(self):
 
        tip = self.repo.get_changeset('tip')
 
        self.assertEqual(tip, self.repo.get_changeset())
 
        self.assertEqual(tip, self.repo.get_changeset(revision=None))
 
        self.assertEqual(tip, list(self.repo[-1:])[0])
 

	
 
    def test_root_node(self):
 
        tip = self.repo.get_changeset('tip')
 
        self.assertTrue(tip.root is tip.get_node(''))
 

	
 
    def test_lazy_fetch(self):
 
        """
 
        Test if changeset's nodes expands and are cached as we walk through
 
        the revision. This test is somewhat hard to write as order of tests
 
        is a key here. Written by running command after command in a shell.
 
        """
 
        self.setUp()
 
        chset = self.repo.get_changeset(45)
 
        self.assertTrue(len(chset.nodes) == 0)
 
        root = chset.root
 
        self.assertTrue(len(chset.nodes) == 1)
 
        self.assertTrue(len(root.nodes) == 8)
 
        # accessing root.nodes updates chset.nodes
 
        self.assertTrue(len(chset.nodes) == 9)
 

	
 
        docs = root.get_node('docs')
 
        # we haven't yet accessed anything new as docs dir was already cached
 
        self.assertTrue(len(chset.nodes) == 9)
 
        self.assertTrue(len(docs.nodes) == 8)
 
        # accessing docs.nodes updates chset.nodes
 
        self.assertTrue(len(chset.nodes) == 17)
 

	
 
        self.assertTrue(docs is chset.get_node('docs'))
 
        self.assertTrue(docs is root.nodes[0])
 
        self.assertTrue(docs is root.dirs[0])
 
        self.assertTrue(docs is chset.get_node('docs'))
 

	
 
    def test_nodes_with_changeset(self):
 
        self.setUp()
 
        chset = self.repo.get_changeset(45)
 
        root = chset.root
 
        docs = root.get_node('docs')
 
        self.assertTrue(docs is chset.get_node('docs'))
 
        api = docs.get_node('api')
 
        self.assertTrue(api is chset.get_node('docs/api'))
 
        index = api.get_node('index.rst')
 
        self.assertTrue(index is chset.get_node('docs/api/index.rst'))
 
        self.assertTrue(index is chset.get_node('docs')\
 
            .get_node('api')\
 
            .get_node('index.rst'))
 

	
 
    def test_branch_and_tags(self):
 
        chset0 = self.repo.get_changeset(0)
 
        self.assertEqual(chset0.branch, 'default')
 
        self.assertEqual(chset0.tags, [])
 

	
 
        chset10 = self.repo.get_changeset(10)
 
        self.assertEqual(chset10.branch, 'default')
 
        self.assertEqual(chset10.tags, [])
 

	
 
        chset44 = self.repo.get_changeset(44)
 
        self.assertEqual(chset44.branch, 'web')
 

	
 
        tip = self.repo.get_changeset('tip')
 
        self.assertTrue('tip' in tip.tags)
 

	
 
    def _test_file_size(self, revision, path, size):
 
        node = self.repo.get_changeset(revision).get_node(path)
 
        self.assertTrue(node.is_file())
 
        self.assertEqual(node.size, size)
 

	
 
    def test_file_size(self):
 
        to_check = (
 
            (10, 'setup.py', 1068),
 
            (20, 'setup.py', 1106),
 
            (60, 'setup.py', 1074),
 

	
 
            (10, 'vcs/backends/base.py', 2921),
 
            (20, 'vcs/backends/base.py', 3936),
 
            (60, 'vcs/backends/base.py', 6189),
 
        )
 
        for revision, path, size in to_check:
 
            self._test_file_size(revision, path, size)
 

	
 
    def _test_dir_size(self, revision, path, size):
 
        node = self.repo.get_changeset(revision).get_node(path)
 
        self.assertFalse(node.is_file())
 
        self.assertEqual(node.size, size)
 

	
 
    def test_dir_size(self):
 
        to_check = (
 
            ('96507bd11ecc', '/', 682421),
 
            ('a53d9201d4bc', '/', 682410),
 
            ('90243de06161', '/', 682006),
 
        )
 
        for revision, path, size in to_check:
 
            self._test_dir_size(revision, path, size)
 

	
 
    def test_repo_size(self):
 
        self.assertEqual(self.repo.size, 1042958) # FIXME
 
        self.assertEqual(self.repo.size, 682421)
 

	
 
    def test_file_history(self):
 
        # we can only check if those revisions are present in the history
 
        # as we cannot update this test every time file is changed
 
        files = {
 
            'setup.py': [7, 18, 45, 46, 47, 69, 77],
 
            'vcs/nodes.py': [7, 8, 24, 26, 30, 45, 47, 49, 56, 57, 58, 59, 60,
 
                61, 73, 76],
 
            'vcs/backends/hg.py': [4, 5, 6, 11, 12, 13, 14, 15, 16, 21, 22, 23,
 
                26, 27, 28, 30, 31, 33, 35, 36, 37, 38, 39, 40, 41, 44, 45, 47,
 
                48, 49, 53, 54, 55, 58, 60, 61, 67, 68, 69, 70, 73, 77, 78, 79,
 
                82],
 
        }
 
        for path, revs in files.items():
 
            tip = self.repo.get_changeset(revs[-1])
 
            node = tip.get_node(path)
 
            node_revs = [chset.revision for chset in node.history]
 
            self.assertTrue(set(revs).issubset(set(node_revs)),
 
                "We assumed that %s is subset of revisions for which file %s "
 
                "has been changed, and history of that node returned: %s"
 
                % (revs, path, node_revs))
 

	
 
    def test_file_annotate(self):
 
        files = {
 
                 'vcs/backends/__init__.py':
 
                  {89: {'lines_no': 31,
 
                        'changesets': [32, 32, 61, 32, 32, 37, 32, 32, 32, 44,
 
                                       37, 37, 37, 37, 45, 37, 44, 37, 37, 37,
 
                                       32, 32, 32, 32, 37, 32, 37, 37, 32,
 
                                       32, 32]},
 
                   20: {'lines_no': 1,
 
                        'changesets': [4]},
 
                   55: {'lines_no': 31,
 
                        'changesets': [32, 32, 45, 32, 32, 37, 32, 32, 32, 44,
 
                                       37, 37, 37, 37, 45, 37, 44, 37, 37, 37,
 
                                       32, 32, 32, 32, 37, 32, 37, 37, 32,
 
                                       32, 32]}},
 
                 'vcs/exceptions.py':
 
                 {89: {'lines_no': 18,
 
                       'changesets': [16, 16, 16, 16, 16, 16, 16, 16, 16, 16,
 
                                      16, 16, 17, 16, 16, 18, 18, 18]},
 
                  20: {'lines_no': 18,
 
                       'changesets': [16, 16, 16, 16, 16, 16, 16, 16, 16, 16,
 
                                      16, 16, 17, 16, 16, 18, 18, 18]},
 
                  55: {'lines_no': 18, 'changesets': [16, 16, 16, 16, 16, 16,
 
                                                      16, 16, 16, 16, 16, 16,
 
                                                      17, 16, 16, 18, 18, 18]}},
 
                 'MANIFEST.in': {89: {'lines_no': 5,
 
                                      'changesets': [7, 7, 7, 71, 71]},
 
                                 20: {'lines_no': 3,
 
                                      'changesets': [7, 7, 7]},
 
                                 55: {'lines_no': 3,
 
                                     'changesets': [7, 7, 7]}}}
 

	
 
        for fname, revision_dict in files.items():
 
            for rev, data in revision_dict.items():
 
                cs = self.repo.get_changeset(rev)
 
                l1_1 = [x[1] for x in cs.get_file_annotate(fname)]
 
                l1_2 = [x[2]().raw_id for x in cs.get_file_annotate(fname)]
 
                self.assertEqual(l1_1, l1_2)
 
                l1 = l1_2 = [x[2]().revision for x in cs.get_file_annotate(fname)]
 
                l2 = files[fname][rev]['changesets']
 
                self.assertTrue(l1 == l2 , "The lists of revision for %s@rev%s"
 
                                "from annotation list should match each other,"
 
                                "got \n%s \nvs \n%s " % (fname, rev, l1, l2))
 

	
 
    def test_changeset_state(self):
 
        """
 
        Tests which files have been added/changed/removed at particular revision
 
        """
 

	
 
        # rev 46ad32a4f974:
 
        # hg st --rev 46ad32a4f974
 
        #    changed: 13
 
        #    added:   20
 
        #    removed: 1
 
        changed = set(['.hgignore'
 
            , 'README.rst' , 'docs/conf.py' , 'docs/index.rst' , 'setup.py'
 
            , 'tests/test_hg.py' , 'tests/test_nodes.py' , 'vcs/__init__.py'
 
            , 'vcs/backends/__init__.py' , 'vcs/backends/base.py'
 
            , 'vcs/backends/hg.py' , 'vcs/nodes.py' , 'vcs/utils/__init__.py'])
 

	
 
        added = set(['docs/api/backends/hg.rst'
 
            , 'docs/api/backends/index.rst' , 'docs/api/index.rst'
 
            , 'docs/api/nodes.rst' , 'docs/api/web/index.rst'
 
            , 'docs/api/web/simplevcs.rst' , 'docs/installation.rst'
 
            , 'docs/quickstart.rst' , 'setup.cfg' , 'vcs/utils/baseui_config.py'
 
            , 'vcs/utils/web.py' , 'vcs/web/__init__.py' , 'vcs/web/exceptions.py'
 
            , 'vcs/web/simplevcs/__init__.py' , 'vcs/web/simplevcs/exceptions.py'
 
            , 'vcs/web/simplevcs/middleware.py' , 'vcs/web/simplevcs/models.py'
 
            , 'vcs/web/simplevcs/settings.py' , 'vcs/web/simplevcs/utils.py'
 
            , 'vcs/web/simplevcs/views.py'])
 

	
 
        removed = set(['docs/api.rst'])
 

	
 
        chset64 = self.repo.get_changeset('46ad32a4f974')
 
        self.assertEqual(set((node.path for node in chset64.added)), added)
 
        self.assertEqual(set((node.path for node in chset64.changed)), changed)
 
        self.assertEqual(set((node.path for node in chset64.removed)), removed)
 

	
 
        # rev b090f22d27d6:
 
        # hg st --rev b090f22d27d6
 
        #    changed: 13
 
        #    added:   20
 
        #    removed: 1
 
        chset88 = self.repo.get_changeset('b090f22d27d6')
 
        self.assertEqual(set((node.path for node in chset88.added)), set())
 
        self.assertEqual(set((node.path for node in chset88.changed)),
 
            set(['.hgignore']))
 
        self.assertEqual(set((node.path for node in chset88.removed)), set())
 
#
 
        # 85:
 
        #    added:   2 ['vcs/utils/diffs.py', 'vcs/web/simplevcs/views/diffs.py']
 
        #    changed: 4 ['vcs/web/simplevcs/models.py', ...]
 
        #    removed: 1 ['vcs/utils/web.py']
 
        chset85 = self.repo.get_changeset(85)
 
        self.assertEqual(set((node.path for node in chset85.added)), set([
 
            'vcs/utils/diffs.py',
 
            'vcs/web/simplevcs/views/diffs.py']))
 
        self.assertEqual(set((node.path for node in chset85.changed)), set([
 
            'vcs/web/simplevcs/models.py',
 
            'vcs/web/simplevcs/utils.py',
 
            'vcs/web/simplevcs/views/__init__.py',
 
            'vcs/web/simplevcs/views/repository.py',
 
            ]))
 
        self.assertEqual(set((node.path for node in chset85.removed)),
 
            set(['vcs/utils/web.py']))
 

	
 

	
 
    def test_files_state(self):
 
        """
 
        Tests state of FileNodes.
 
        """
 
        chset = self.repo.get_changeset(85)
 
        node = chset.get_node('vcs/utils/diffs.py')
 
        self.assertTrue(node.state, NodeState.ADDED)
 
        self.assertTrue(node.added)
 
        self.assertFalse(node.changed)
 
        self.assertFalse(node.not_changed)
 
        self.assertFalse(node.removed)
 

	
 
        chset = self.repo.get_changeset(88)
 
        node = chset.get_node('.hgignore')
 
        self.assertTrue(node.state, NodeState.CHANGED)
 
        self.assertFalse(node.added)
 
        self.assertTrue(node.changed)
 
        self.assertFalse(node.not_changed)
 
        self.assertFalse(node.removed)
 

	
 
        chset = self.repo.get_changeset(85)
 
        node = chset.get_node('setup.py')
 
        self.assertTrue(node.state, NodeState.NOT_CHANGED)
 
        self.assertFalse(node.added)
 
        self.assertFalse(node.changed)
 
        self.assertTrue(node.not_changed)
 
        self.assertFalse(node.removed)
 

	
 
        # If node has REMOVED state then trying to fetch it would raise
 
        # ChangesetError exception
 
        chset = self.repo.get_changeset(2)
 
        path = 'vcs/backends/BaseRepository.py'
 
        self.assertRaises(NodeDoesNotExistError, chset.get_node, path)
 
        # but it would be one of ``removed`` (changeset's attribute)
 
        self.assertTrue(path in [rf.path for rf in chset.removed])
 

	
 
    def test_commit_message_is_unicode(self):
 
        for cm in self.repo:
 
            self.assertEqual(type(cm.message), unicode)
 

	
 
    def test_changeset_author_is_unicode(self):
 
        for cm in self.repo:
 
            self.assertEqual(type(cm.author), unicode)
 

	
 
    def test_repo_files_content_is_unicode(self):
 
        test_changeset = self.repo.get_changeset(100)
 
        for node in test_changeset.get_node('/'):
 
            if node.is_file():
 
                self.assertEqual(type(node.content), unicode)
 

	
 
    def test_wrong_path(self):
 
        # There is 'setup.py' in the root dir but not there:
 
        path = 'foo/bar/setup.py'
 
        self.assertRaises(VCSError, self.repo.get_changeset().get_node, path)
 

	
 

	
 
    def test_archival_file(self):
 
        #TODO:
 
        pass
 

	
 
    def test_archival_as_generator(self):
 
        #TODO:
 
        pass
 

	
 
    def test_archival_wrong_kind(self):
 
        tip = self.repo.get_changeset()
 
        self.assertRaises(VCSError, tip.fill_archive, kind='error')
 

	
 
    def test_archival_empty_prefix(self):
 
        #TODO:
 
        pass
 

	
 

	
 
    def test_author_email(self):
 
        self.assertEqual('marcin@python-blog.com',
 
                         self.repo.get_changeset('b986218ba1c9').author_email)
 
        self.assertEqual('lukasz.balcerzak@python-center.pl',
 
                         self.repo.get_changeset('3803844fdbd3').author_email)
 
        self.assertEqual('',
 
                         self.repo.get_changeset('84478366594b').author_email)
 

	
 
    def test_author_username(self):
 
        self.assertEqual('Marcin Kuzminski',
 
                         self.repo.get_changeset('b986218ba1c9').author_name)
 
        self.assertEqual('Lukasz Balcerzak',
 
                         self.repo.get_changeset('3803844fdbd3').author_name)
 
        self.assertEqual('marcink',
 
                         self.repo.get_changeset('84478366594b').author_name)
0 comments (0 inline, 0 general)