Changeset - b24b1f0fa505
[Not reviewed]
beta
0 1 0
Marcin Kuzminski - 13 years ago 2012-07-02 21:13:44
marcin@python-works.com
Get tags and branches using _parsed_refs
1 file changed with 13 insertions and 10 deletions:
0 comments (0 inline, 0 general)
rhodecode/lib/vcs/backends/git/repository.py
Show inline comments
 
@@ -113,105 +113,109 @@ class GitRepository(BaseRepository):
 
            p = Popen(cmd, **opts)
 
        except OSError, err:
 
            raise RepositoryError("Couldn't run git command (%s).\n"
 
                "Original error was:%s" % (cmd, err))
 
        so, se = p.communicate()
 
        if not se.startswith("fatal: bad default revision 'HEAD'") and \
 
            p.returncode != 0:
 
            raise RepositoryError("Couldn't run git command (%s).\n"
 
                "stderr:\n%s" % (cmd, se))
 
        return so, se
 

	
 
    def _check_url(self, url):
 
        """
 
        Functon will check given url and try to verify if it's a valid
 
        link. Sometimes it may happened that mercurial will issue basic
 
        auth request that can cause whole API to hang when used from python
 
        or other external calls.
 

	
 
        On failures it'll raise urllib2.HTTPError
 
        """
 

	
 
        #TODO: implement this
 
        pass
 

	
 
    def _get_repo(self, create, src_url=None, update_after_clone=False,
 
            bare=False):
 
        if create and os.path.exists(self.path):
 
            raise RepositoryError("Location already exist")
 
        if src_url and not create:
 
            raise RepositoryError("Create should be set to True if src_url is "
 
                                  "given (clone operation creates repository)")
 
        try:
 
            if create and src_url:
 
                self._check_url(src_url)
 
                self.clone(src_url, update_after_clone, bare)
 
                return Repo(self.path)
 
            elif create:
 
                os.mkdir(self.path)
 
                if bare:
 
                    return Repo.init_bare(self.path)
 
                else:
 
                    return Repo.init(self.path)
 
            else:
 
                return Repo(self.path)
 
        except (NotGitRepository, OSError), err:
 
            raise RepositoryError(err)
 

	
 
    def _get_all_revisions(self):
 
        cmd = 'rev-list --all --date-order'
 
        cmd = 'rev-list --all --reverse --date-order'
 
        try:
 
            so, se = self.run_git_command(cmd)
 
        except RepositoryError:
 
            # Can be raised for empty repositories
 
            return []
 
        revisions = so.splitlines()
 
        revisions.reverse()
 
        return revisions
 
        return so.splitlines()
 

	
 
    def _get_all_revisions2(self):
 
        #alternate implementation using dulwich
 
        includes = [x[1][0] for x in self._parsed_refs.iteritems()
 
                    if x[1][1] != 'T']
 
        return [c.commit.id for c in self._repo.get_walker(include=includes)]
 

	
 
    def _get_revision(self, revision):
 
        """
 
        For git backend we always return integer here. This way we ensure
 
        that changset's revision attribute would become integer.
 
        """
 
        pattern = re.compile(r'^[[0-9a-fA-F]{12}|[0-9a-fA-F]{40}]$')
 
        is_bstr = lambda o: isinstance(o, (str, unicode))
 
        is_null = lambda o: len(o) == revision.count('0')
 

	
 
        if len(self.revisions) == 0:
 
            raise EmptyRepositoryError("There are no changesets yet")
 

	
 
        if revision in (None, '', 'tip', 'HEAD', 'head', -1):
 
            revision = self.revisions[-1]
 

	
 
        if ((is_bstr(revision) and revision.isdigit() and len(revision) < 12)
 
            or isinstance(revision, int) or is_null(revision)):
 
            try:
 
                revision = self.revisions[int(revision)]
 
            except:
 
                raise ChangesetDoesNotExistError("Revision %r does not exist "
 
                    "for this repository %s" % (revision, self))
 

	
 
        elif is_bstr(revision):
 
            _ref_revision = self._parsed_refs.get(revision)
 
            if _ref_revision:  # and _ref_revision[1] in ['H', 'RH', 'T']:
 
                return _ref_revision[0]
 

	
 
            elif not pattern.match(revision) or revision not in self.revisions:
 
                raise ChangesetDoesNotExistError("Revision %r does not exist "
 
                    "for this repository %s" % (revision, self))
 

	
 
        # Ensure we return full id
 
        if not pattern.match(str(revision)):
 
            raise ChangesetDoesNotExistError("Given revision %r not recognized"
 
                % revision)
 
        return revision
 

	
 
    def _get_archives(self, archive_name='tip'):
 

	
 
        for i in [('zip', '.zip'), ('gz', '.tar.gz'), ('bz2', '.tar.bz2')]:
 
                yield {"type": i[0], "extension": i[1], "node": archive_name}
 

	
 
    def _get_url(self, url):
 
        """
 
        Returns normalized url. If schema is not given, would fall to
 
        filesystem (``file:///``) schema.
 
@@ -219,113 +223,112 @@ class GitRepository(BaseRepository):
 
        url = str(url)
 
        if url != 'default' and not '://' in url:
 
            url = ':///'.join(('file', url))
 
        return url
 

	
 
    @LazyProperty
 
    def name(self):
 
        return os.path.basename(self.path)
 

	
 
    @LazyProperty
 
    def last_change(self):
 
        """
 
        Returns last change made on this repository as datetime object
 
        """
 
        return date_fromtimestamp(self._get_mtime(), makedate()[1])
 

	
 
    def _get_mtime(self):
 
        try:
 
            return time.mktime(self.get_changeset().date.timetuple())
 
        except RepositoryError:
 
            idx_loc = '' if self.bare else '.git'
 
            # fallback to filesystem
 
            in_path = os.path.join(self.path, idx_loc, "index")
 
            he_path = os.path.join(self.path, idx_loc, "HEAD")
 
            if os.path.exists(in_path):
 
                return os.stat(in_path).st_mtime
 
            else:
 
                return os.stat(he_path).st_mtime
 

	
 
    @LazyProperty
 
    def description(self):
 
        idx_loc = '' if self.bare else '.git'
 
        undefined_description = u'unknown'
 
        description_path = os.path.join(self.path, idx_loc, 'description')
 
        if os.path.isfile(description_path):
 
            return safe_unicode(open(description_path).read())
 
        else:
 
            return undefined_description
 

	
 
    @LazyProperty
 
    def contact(self):
 
        undefined_contact = u'Unknown'
 
        return undefined_contact
 

	
 
    @property
 
    def branches(self):
 
        if not self.revisions:
 
            return {}
 
        refs = self._repo.refs.as_dict()
 
        sortkey = lambda ctx: ctx[0]
 
        _branches = [('/'.join(ref.split('/')[2:]), head)
 
            for ref, head in refs.items()
 
            if ref.startswith('refs/heads/') and not ref.endswith('/HEAD')]
 
        _branches = [(x[0], x[1][0])
 
                     for x in self._parsed_refs.iteritems() if x[1][1] == 'H']
 
        return OrderedDict(sorted(_branches, key=sortkey, reverse=False))
 

	
 
    @LazyProperty
 
    def tags(self):
 
        return self._get_tags()
 

	
 
    def _get_tags(self):
 
        if not self.revisions:
 
            return {}
 

	
 
        sortkey = lambda ctx: ctx[0]
 
        _tags = [('/'.join(ref.split('/')[2:]), head) for ref, head in
 
            self._repo.get_refs().items() if ref.startswith('refs/tags/')]
 
        _tags = [(x[0], x[1][0])
 
                 for x in self._parsed_refs.iteritems() if x[1][1] == 'T']
 
        return OrderedDict(sorted(_tags, key=sortkey, reverse=True))
 

	
 
    def tag(self, name, user, revision=None, message=None, date=None,
 
            **kwargs):
 
        """
 
        Creates and returns a tag for the given ``revision``.
 

	
 
        :param name: name for new tag
 
        :param user: full username, i.e.: "Joe Doe <joe.doe@example.com>"
 
        :param revision: changeset id for which new tag would be created
 
        :param message: message of the tag's commit
 
        :param date: date of tag's commit
 

	
 
        :raises TagAlreadyExistError: if tag with same name already exists
 
        """
 
        if name in self.tags:
 
            raise TagAlreadyExistError("Tag %s already exists" % name)
 
        changeset = self.get_changeset(revision)
 
        message = message or "Added tag %s for commit %s" % (name,
 
            changeset.raw_id)
 
        self._repo.refs["refs/tags/%s" % name] = changeset._commit.id
 

	
 
        self.tags = self._get_tags()
 
        return changeset
 

	
 
    def remove_tag(self, name, user, message=None, date=None):
 
        """
 
        Removes tag with the given ``name``.
 

	
 
        :param name: name of the tag to be removed
 
        :param user: full username, i.e.: "Joe Doe <joe.doe@example.com>"
 
        :param message: message of the tag's removal commit
 
        :param date: date of tag's removal commit
 

	
 
        :raises TagDoesNotExistError: if tag with given name does not exists
 
        """
 
        if name not in self.tags:
 
            raise TagDoesNotExistError("Tag %s does not exist" % name)
 
        tagpath = posixpath.join(self._repo.refs.path, 'refs', 'tags', name)
 
        try:
 
            os.remove(tagpath)
 
            self.tags = self._get_tags()
 
        except OSError, e:
 
            raise RepositoryError(e.strerror)
 

	
 
    @LazyProperty
 
    def _parsed_refs(self):
 
        refs = self._repo.get_refs()
0 comments (0 inline, 0 general)