Changeset - 182f46d62ab4
[Not reviewed]
default
0 3 0
Mads Kiilerich - 8 years ago 2017-05-30 02:59:45
mads@kiilerich.com
repository: fix crash when forking repositories with unicode names
3 files changed with 26 insertions and 5 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/vcs/backends/git/repository.py
Show inline comments
 
@@ -9,49 +9,49 @@
 
    :copyright: (c) 2010-2011 by Marcin Kuzminski, Lukasz Balcerzak.
 
"""
 

	
 
import os
 
import re
 
import time
 
import errno
 
import urllib
 
import urllib2
 
import logging
 
import posixpath
 

	
 
from dulwich.objects import Tag
 
from dulwich.repo import Repo, NotGitRepository
 
from dulwich.config import ConfigFile
 

	
 
from kallithea.lib.vcs import subprocessio
 
from kallithea.lib.vcs.backends.base import BaseRepository, CollectionGenerator
 
from kallithea.lib.vcs.conf import settings
 

	
 
from kallithea.lib.vcs.exceptions import (
 
    BranchDoesNotExistError, ChangesetDoesNotExistError, EmptyRepositoryError,
 
    RepositoryError, TagAlreadyExistError, TagDoesNotExistError
 
)
 
from kallithea.lib.vcs.utils import safe_unicode, makedate, date_fromtimestamp
 
from kallithea.lib.vcs.utils import safe_str, safe_unicode, makedate, date_fromtimestamp
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.lib.vcs.utils.ordered_dict import OrderedDict
 
from kallithea.lib.vcs.utils.paths import abspath, get_user_home
 

	
 
from kallithea.lib.vcs.utils.hgcompat import (
 
    hg_url, httpbasicauthhandler, httpdigestauthhandler
 
)
 

	
 
from .changeset import GitChangeset
 
from .inmemory import GitInMemoryChangeset
 
from .workdir import GitWorkdir
 

	
 
SHA_PATTERN = re.compile(r'^[[0-9a-fA-F]{12}|[0-9a-fA-F]{40}]$')
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class GitRepository(BaseRepository):
 
    """
 
    Git repository backend.
 
    """
 
    DEFAULT_BRANCH_NAME = 'master'
 
    scm = 'git'
 

	
 
@@ -307,49 +307,49 @@ class GitRepository(BaseRepository):
 

	
 
        # Ensure we return full id
 
        if not SHA_PATTERN.match(str(revision)):
 
            raise ChangesetDoesNotExistError("Given revision %s not recognized"
 
                % revision)
 
        return revision
 

	
 
    def get_ref_revision(self, ref_type, ref_name):
 
        """
 
        Returns ``MercurialChangeset`` object representing repository's
 
        changeset at the given ``revision``.
 
        """
 
        return self._get_revision(ref_name)
 

	
 
    def _get_archives(self, archive_name='tip'):
 

	
 
        for i in [('zip', '.zip'), ('gz', '.tar.gz'), ('bz2', '.tar.bz2')]:
 
                yield {"type": i[0], "extension": i[1], "node": archive_name}
 

	
 
    def _get_url(self, url):
 
        """
 
        Returns normalized url. If schema is not given, would fall to
 
        filesystem (``file:///``) schema.
 
        """
 
        url = str(url)
 
        url = safe_str(url)
 
        if url != 'default' and not '://' in url:
 
            url = ':///'.join(('file', url))
 
        return url
 

	
 
    def get_hook_location(self):
 
        """
 
        returns absolute path to location where hooks are stored
 
        """
 
        loc = os.path.join(self.path, 'hooks')
 
        if not self.bare:
 
            loc = os.path.join(self.path, '.git', 'hooks')
 
        return loc
 

	
 
    @LazyProperty
 
    def name(self):
 
        return os.path.basename(self.path)
 

	
 
    @LazyProperty
 
    def last_change(self):
 
        """
 
        Returns last change made on this repository as datetime object
 
        """
 
        return date_fromtimestamp(self._get_mtime(), makedate()[1])
 

	
kallithea/lib/vcs/backends/hg/repository.py
Show inline comments
 
@@ -328,49 +328,49 @@ class MercurialRepository(BaseRepository
 
        if not url_prefix: # skip svn+http://... (and git+... too)
 
            # now check if it's a proper hg repo
 
            try:
 
                httppeer(repoui or ui.ui(), url).lookup('tip')
 
            except Exception as e:
 
                raise urllib2.URLError(
 
                    "url [%s] does not look like an hg repo org_exc: %s"
 
                    % (cleaned_uri, e))
 

	
 
        return True
 

	
 
    def _get_repo(self, create, src_url=None, update_after_clone=False):
 
        """
 
        Function will check for mercurial repository in given path and return
 
        a localrepo object. If there is no repository in that path it will
 
        raise an exception unless ``create`` parameter is set to True - in
 
        that case repository would be created and returned.
 
        If ``src_url`` is given, would try to clone repository from the
 
        location at given clone_point. Additionally it'll make update to
 
        working copy accordingly to ``update_after_clone`` flag
 
        """
 

	
 
        try:
 
            if src_url:
 
                url = str(self._get_url(src_url))
 
                url = safe_str(self._get_url(src_url))
 
                opts = {}
 
                if not update_after_clone:
 
                    opts.update({'noupdate': True})
 
                MercurialRepository._check_url(url, self.baseui)
 
                clone(self.baseui, url, self.path, **opts)
 

	
 
                # Don't try to create if we've already cloned repo
 
                create = False
 
            return localrepository(self.baseui, self.path, create=create)
 
        except (Abort, RepoError) as err:
 
            if create:
 
                msg = "Cannot create repository at %s. Original error was %s" \
 
                    % (self.path, err)
 
            else:
 
                msg = "Not valid repository at %s. Original error was %s" \
 
                    % (self.path, err)
 
            raise RepositoryError(msg)
 

	
 
    @LazyProperty
 
    def in_memory_changeset(self):
 
        return MercurialInMemoryChangeset(self)
 

	
 
    @LazyProperty
 
    def description(self):
 
@@ -460,49 +460,49 @@ class MercurialRepository(BaseRepository
 
            except AttributeError:
 
                # removed in hg 3.2
 
                revision = revs[-1]
 
        else:
 
            # TODO: just report 'not found'?
 
            revision = ref_name
 

	
 
        return self._get_revision(revision)
 

	
 
    def _get_archives(self, archive_name='tip'):
 
        allowed = self.baseui.configlist("web", "allow_archive",
 
                                         untrusted=True)
 
        for i in [('zip', '.zip'), ('gz', '.tar.gz'), ('bz2', '.tar.bz2')]:
 
            if i[0] in allowed or self._repo.ui.configbool("web",
 
                                                           "allow" + i[0],
 
                                                           untrusted=True):
 
                yield {"type": i[0], "extension": i[1], "node": archive_name}
 

	
 
    def _get_url(self, url):
 
        """
 
        Returns normalized url. If schema is not given, would fall
 
        to filesystem
 
        (``file:///``) schema.
 
        """
 
        url = str(url)
 
        url = safe_str(url)
 
        if url != 'default' and not '://' in url:
 
            url = "file:" + urllib.pathname2url(url)
 
        return url
 

	
 
    def get_hook_location(self):
 
        """
 
        returns absolute path to location where hooks are stored
 
        """
 
        return os.path.join(self.path, '.hg', '.hgrc')
 

	
 
    def get_changeset(self, revision=None):
 
        """
 
        Returns ``MercurialChangeset`` object representing repository's
 
        changeset at the given ``revision``.
 
        """
 
        revision = self._get_revision(revision)
 
        changeset = MercurialChangeset(repository=self, revision=revision)
 
        return changeset
 

	
 
    def get_changesets(self, start=None, end=None, start_date=None,
 
                       end_date=None, branch_name=None, reverse=False):
 
        """
 
        Returns iterator of ``MercurialChangeset`` objects from start to end
 
        (both are inclusive)
kallithea/tests/functional/test_forks.py
Show inline comments
 
@@ -140,49 +140,70 @@ class _BaseTestCase(TestController):
 

	
 
        # create a fork
 
        repo_name = self.REPO
 
        org_repo = Repository.get_by_repo_name(repo_name)
 
        fork_name = safe_str(self.REPO_FORK + u'-rødgrød')
 
        creation_args = {
 
            'repo_name': fork_name,
 
            'repo_group': u'-1',
 
            'fork_parent_id': org_repo.repo_id,
 
            'repo_type': self.REPO_TYPE,
 
            'description': 'unicode repo 1',
 
            'private': 'False',
 
            'landing_rev': 'rev:tip',
 
            '_authentication_token': self.authentication_token()}
 
        self.app.post(url(controller='forks', action='fork_create',
 
                          repo_name=repo_name), creation_args)
 
        response = self.app.get(url(controller='forks', action='forks',
 
                                    repo_name=repo_name))
 
        response.mustcontain(
 
            """<a href="/%s">%s</a>""" % (urllib.quote(fork_name), fork_name)
 
        )
 
        fork_repo = Repository.get_by_repo_name(safe_unicode(fork_name))
 
        assert fork_repo
 

	
 
        # remove this fork
 
        # fork the fork
 
        fork_name_2 = safe_str(self.REPO_FORK + u'-blåbærgrød')
 
        creation_args = {
 
            'repo_name': fork_name_2,
 
            'repo_group': u'-1',
 
            'fork_parent_id': fork_repo.repo_id,
 
            'repo_type': self.REPO_TYPE,
 
            'description': 'unicode repo 2',
 
            'private': 'False',
 
            'landing_rev': 'rev:tip',
 
            '_authentication_token': self.authentication_token()}
 
        self.app.post(url(controller='forks', action='fork_create',
 
                          repo_name=fork_name), creation_args)
 
        response = self.app.get(url(controller='forks', action='forks',
 
                                    repo_name=fork_name))
 
        response.mustcontain(
 
            """<a href="/%s">%s</a>""" % (urllib.quote(fork_name_2), fork_name_2)
 
        )
 

	
 
        # remove these forks
 
        response = self.app.post(url('delete_repo', repo_name=fork_name_2),
 
            params={'_authentication_token': self.authentication_token()})
 
        response = self.app.post(url('delete_repo', repo_name=fork_name),
 
            params={'_authentication_token': self.authentication_token()})
 

	
 
    def test_z_fork_create(self):
 
        self.log_user()
 
        fork_name = self.REPO_FORK
 
        description = 'fork of vcs test'
 
        repo_name = self.REPO
 
        org_repo = Repository.get_by_repo_name(repo_name)
 
        creation_args = {
 
            'repo_name': fork_name,
 
            'repo_group': u'-1',
 
            'fork_parent_id': org_repo.repo_id,
 
            'repo_type': self.REPO_TYPE,
 
            'description': description,
 
            'private': 'False',
 
            'landing_rev': 'rev:tip',
 
            '_authentication_token': self.authentication_token()}
 
        self.app.post(url(controller='forks', action='fork_create',
 
                          repo_name=repo_name), creation_args)
 
        repo = Repository.get_by_repo_name(self.REPO_FORK)
 
        assert repo.fork.repo_name == self.REPO
 

	
 
        ## run the check page that triggers the flash message
0 comments (0 inline, 0 general)