Changeset - b0c75d6f8f32
[Not reviewed]
default
0 5 0
Mads Kiilerich - 6 years ago 2019-11-23 22:17:54
mads@kiilerich.com
Grafted from: bd5412f3641d
py3: use explicit octal literals

From 2to3 numliterals.
5 files changed with 8 insertions and 8 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/vcs/backends/hg/changeset.py
Show inline comments
 
@@ -201,99 +201,99 @@ class MercurialChangeset(BaseChangeset):
 
            if not branch or branch == cs.branch:
 
                return cs
 

	
 
    def diff(self):
 
        # Only used for feed diffstat
 
        return ''.join(self._ctx.diff())
 

	
 
    def _fix_path(self, path):
 
        """
 
        Paths are stored without trailing slash so we need to get rid off it if
 
        needed. Also mercurial keeps filenodes as str so we need to decode
 
        from unicode to str
 
        """
 
        if path.endswith('/'):
 
            path = path.rstrip('/')
 

	
 
        return safe_str(path)
 

	
 
    def _get_kind(self, path):
 
        path = self._fix_path(path)
 
        if path in self._file_paths:
 
            return NodeKind.FILE
 
        elif path in self._dir_paths:
 
            return NodeKind.DIR
 
        else:
 
            raise ChangesetError("Node does not exist at the given path '%s'"
 
                % (path))
 

	
 
    def _get_filectx(self, path):
 
        path = self._fix_path(path)
 
        if self._get_kind(path) != NodeKind.FILE:
 
            raise ChangesetError("File does not exist for revision %s at "
 
                " '%s'" % (self.raw_id, path))
 
        return self._ctx.filectx(path)
 

	
 
    def _extract_submodules(self):
 
        """
 
        returns a dictionary with submodule information from substate file
 
        of hg repository
 
        """
 
        return self._ctx.substate
 

	
 
    def get_file_mode(self, path):
 
        """
 
        Returns stat mode of the file at the given ``path``.
 
        """
 
        fctx = self._get_filectx(path)
 
        if 'x' in fctx.flags():
 
            return 0100755
 
            return 0o100755
 
        else:
 
            return 0100644
 
            return 0o100644
 

	
 
    def get_file_content(self, path):
 
        """
 
        Returns content of the file at given ``path``.
 
        """
 
        fctx = self._get_filectx(path)
 
        return fctx.data()
 

	
 
    def get_file_size(self, path):
 
        """
 
        Returns size of the file at given ``path``.
 
        """
 
        fctx = self._get_filectx(path)
 
        return fctx.size()
 

	
 
    def get_file_changeset(self, path):
 
        """
 
        Returns last commit of the file at the given ``path``.
 
        """
 
        return self.get_file_history(path, limit=1)[0]
 

	
 
    def get_file_history(self, path, limit=None):
 
        """
 
        Returns history of file as reversed list of ``Changeset`` objects for
 
        which file at given ``path`` has been modified.
 
        """
 
        fctx = self._get_filectx(path)
 
        hist = []
 
        cnt = 0
 
        for cs in reversed([x for x in fctx.filelog()]):
 
            cnt += 1
 
            hist.append(hex(fctx.filectx(cs).node()))
 
            if limit is not None and cnt == limit:
 
                break
 

	
 
        return [self.repository.get_changeset(node) for node in hist]
 

	
 
    def get_file_annotate(self, path):
 
        """
 
        Returns a generator of four element tuples with
 
            lineno, sha, changeset lazy loader and line
 
        """
 
        annotations = self._get_filectx(path).annotate()
 
        try:
 
            annotation_lines = [(annotateline.fctx, annotateline.text) for annotateline in annotations]
 
        except AttributeError: # annotateline was introduced in Mercurial 4.6 (b33b91ca2ec2)
 
            annotation_lines = [(aline.fctx, l) for aline, l in annotations]
 
        for i, (fctx, l) in enumerate(annotation_lines):
kallithea/lib/vcs/nodes.py
Show inline comments
 
@@ -210,97 +210,97 @@ class Node(object):
 
        Returns ``True`` if node's kind is ``NodeKind.SUBMODULE``, ``False``
 
        otherwise.
 
        """
 
        return self.kind == NodeKind.SUBMODULE
 

	
 
    @LazyProperty
 
    def added(self):
 
        return self.state is NodeState.ADDED
 

	
 
    @LazyProperty
 
    def changed(self):
 
        return self.state is NodeState.CHANGED
 

	
 
    @LazyProperty
 
    def not_changed(self):
 
        return self.state is NodeState.NOT_CHANGED
 

	
 
    @LazyProperty
 
    def removed(self):
 
        return self.state is NodeState.REMOVED
 

	
 

	
 
class FileNode(Node):
 
    """
 
    Class representing file nodes.
 

	
 
    :attribute: path: path to the node, relative to repository's root
 
    :attribute: content: if given arbitrary sets content of the file
 
    :attribute: changeset: if given, first time content is accessed, callback
 
    :attribute: mode: octal stat mode for a node. Default is 0100644.
 
    """
 

	
 
    def __init__(self, path, content=None, changeset=None, mode=None):
 
        """
 
        Only one of ``content`` and ``changeset`` may be given. Passing both
 
        would raise ``NodeError`` exception.
 

	
 
        :param path: relative path to the node
 
        :param content: content may be passed to constructor
 
        :param changeset: if given, will use it to lazily fetch content
 
        :param mode: octal representation of ST_MODE (i.e. 0100644)
 
        """
 

	
 
        if content and changeset:
 
            raise NodeError("Cannot use both content and changeset")
 
        super(FileNode, self).__init__(path, kind=NodeKind.FILE)
 
        self.changeset = changeset
 
        self._content = content
 
        self._mode = mode or 0100644
 
        self._mode = mode or 0o100644
 

	
 
    @LazyProperty
 
    def mode(self):
 
        """
 
        Returns lazily mode of the FileNode. If ``changeset`` is not set, would
 
        use value given at initialization or 0100644 (default).
 
        """
 
        if self.changeset:
 
            mode = self.changeset.get_file_mode(self.path)
 
        else:
 
            mode = self._mode
 
        return mode
 

	
 
    def _get_content(self):
 
        if self.changeset:
 
            content = self.changeset.get_file_content(self.path)
 
        else:
 
            content = self._content
 
        return content
 

	
 
    @property
 
    def content(self):
 
        """
 
        Returns lazily content of the FileNode. If possible, would try to
 
        decode content from UTF-8.
 
        """
 
        content = self._get_content()
 

	
 
        if bool(content and '\0' in content):
 
            return content
 
        return safe_unicode(content)
 

	
 
    @LazyProperty
 
    def size(self):
 
        if self.changeset:
 
            return self.changeset.get_file_size(self.path)
 
        raise NodeError("Cannot retrieve size of the file without related "
 
            "changeset attribute")
 

	
 
    @LazyProperty
 
    def message(self):
 
        if self.changeset:
 
            return self.last_changeset.message
 
        raise NodeError("Cannot retrieve message of the file without related "
 
            "changeset attribute")
 

	
 
    @LazyProperty
 
    def last_changeset(self):
kallithea/model/scm.py
Show inline comments
 
@@ -705,72 +705,72 @@ class ScmModel(object):
 

	
 
        :param repo: Instance of VCS repo
 
        :param force_create: Create even if same name hook exists
 
        """
 

	
 
        loc = os.path.join(repo.path, 'hooks')
 
        if not repo.bare:
 
            loc = os.path.join(repo.path, '.git', 'hooks')
 
        if not os.path.isdir(loc):
 
            os.makedirs(loc)
 

	
 
        tmpl_post = "#!%s\n" % self._get_git_hook_interpreter()
 
        tmpl_post += pkg_resources.resource_string(
 
            'kallithea', os.path.join('config', 'post_receive_tmpl.py')
 
        )
 
        tmpl_pre = "#!%s\n" % self._get_git_hook_interpreter()
 
        tmpl_pre += pkg_resources.resource_string(
 
            'kallithea', os.path.join('config', 'pre_receive_tmpl.py')
 
        )
 

	
 
        for h_type, tmpl in [('pre', tmpl_pre), ('post', tmpl_post)]:
 
            _hook_file = os.path.join(loc, '%s-receive' % h_type)
 
            has_hook = False
 
            log.debug('Installing git hook in repo %s', repo)
 
            if os.path.exists(_hook_file):
 
                # let's take a look at this hook, maybe it's kallithea ?
 
                log.debug('hook exists, checking if it is from kallithea')
 
                with open(_hook_file, 'rb') as f:
 
                    data = f.read()
 
                    matches = re.compile(r'(?:%s)\s*=\s*(.*)'
 
                                         % 'KALLITHEA_HOOK_VER').search(data)
 
                    if matches:
 
                        try:
 
                            ver = matches.groups()[0]
 
                            log.debug('got %s it is kallithea', ver)
 
                            has_hook = True
 
                        except Exception:
 
                            log.error(traceback.format_exc())
 
            else:
 
                # there is no hook in this dir, so we want to create one
 
                has_hook = True
 

	
 
            if has_hook or force_create:
 
                log.debug('writing %s hook file !', h_type)
 
                try:
 
                    with open(_hook_file, 'wb') as f:
 
                        tmpl = tmpl.replace('_TMPL_', kallithea.__version__)
 
                        f.write(tmpl)
 
                    os.chmod(_hook_file, 0755)
 
                    os.chmod(_hook_file, 0o755)
 
                except IOError as e:
 
                    log.error('error writing %s: %s', _hook_file, e)
 
            else:
 
                log.debug('skipping writing hook file')
 

	
 

	
 
def AvailableRepoGroupChoices(top_perms, repo_group_perm_level, extras=()):
 
    """Return group_id,string tuples with choices for all the repo groups where
 
    the user has the necessary permissions.
 

	
 
    Top level is -1.
 
    """
 
    groups = RepoGroup.query().all()
 
    if HasPermissionAny('hg.admin')('available repo groups'):
 
        groups.append(None)
 
    else:
 
        groups = list(RepoGroupList(groups, perm_level=repo_group_perm_level))
 
        if top_perms and HasPermissionAny(*top_perms)('available repo groups'):
 
            groups.append(None)
 
        for extra in extras:
 
            if not any(rg == extra for rg in groups):
 
                groups.append(extra)
 
    return RepoGroup.groups_choices(groups=groups)
kallithea/tests/vcs/test_git.py
Show inline comments
 
@@ -612,97 +612,97 @@ class TestGitChangeset(object):
 
    def test_author_email(self):
 
        assert 'marcin@python-blog.com' == self.repo.get_changeset('c1214f7e79e02fc37156ff215cd71275450cffc3').author_email
 
        assert 'lukasz.balcerzak@python-center.pl' == self.repo.get_changeset('ff7ca51e58c505fec0dd2491de52c622bb7a806b').author_email
 
        assert '' == self.repo.get_changeset('8430a588b43b5d6da365400117c89400326e7992').author_email
 

	
 
    def test_author_username(self):
 
        assert 'Marcin Kuzminski' == self.repo.get_changeset('c1214f7e79e02fc37156ff215cd71275450cffc3').author_name
 
        assert 'Lukasz Balcerzak' == self.repo.get_changeset('ff7ca51e58c505fec0dd2491de52c622bb7a806b').author_name
 
        assert 'marcink none@none' == self.repo.get_changeset('8430a588b43b5d6da365400117c89400326e7992').author_name
 

	
 

	
 
class TestGitSpecific():
 

	
 
    def test_error_is_raised_for_added_if_diff_name_status_is_wrong(self):
 
        repo = mock.MagicMock()
 
        changeset = GitChangeset(repo, 'foobar')
 
        changeset._diff_name_status = 'foobar'
 
        with pytest.raises(VCSError):
 
            changeset.added
 

	
 
    def test_error_is_raised_for_changed_if_diff_name_status_is_wrong(self):
 
        repo = mock.MagicMock()
 
        changeset = GitChangeset(repo, 'foobar')
 
        changeset._diff_name_status = 'foobar'
 
        with pytest.raises(VCSError):
 
            changeset.added
 

	
 
    def test_error_is_raised_for_removed_if_diff_name_status_is_wrong(self):
 
        repo = mock.MagicMock()
 
        changeset = GitChangeset(repo, 'foobar')
 
        changeset._diff_name_status = 'foobar'
 
        with pytest.raises(VCSError):
 
            changeset.added
 

	
 

	
 
class TestGitSpecificWithRepo(_BackendTestMixin):
 
    backend_alias = 'git'
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        return [
 
            {
 
                'message': 'Initial',
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 20),
 
                'added': [
 
                    FileNode('foobar/static/js/admin/base.js', content='base'),
 
                    FileNode('foobar/static/admin', content='admin',
 
                        mode=0120000), # this is a link
 
                        mode=0o120000), # this is a link
 
                    FileNode('foo', content='foo'),
 
                ],
 
            },
 
            {
 
                'message': 'Second',
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 22),
 
                'added': [
 
                    FileNode('foo2', content='foo2'),
 
                ],
 
            },
 
        ]
 

	
 
    def test_paths_slow_traversing(self):
 
        cs = self.repo.get_changeset()
 
        assert cs.get_node('foobar').get_node('static').get_node('js').get_node('admin').get_node('base.js').content == 'base'
 

	
 
    def test_paths_fast_traversing(self):
 
        cs = self.repo.get_changeset()
 
        assert cs.get_node('foobar/static/js/admin/base.js').content == 'base'
 

	
 
    def test_workdir_get_branch(self):
 
        self.repo.run_git_command(['checkout', '-b', 'production'])
 
        # Regression test: one of following would fail if we don't check
 
        # .git/HEAD file
 
        self.repo.run_git_command(['checkout', 'production'])
 
        assert self.repo.workdir.get_branch() == 'production'
 
        self.repo.run_git_command(['checkout', 'master'])
 
        assert self.repo.workdir.get_branch() == 'master'
 

	
 
    def test_get_diff_runs_git_command_with_hashes(self):
 
        self.repo.run_git_command = mock.Mock(return_value=['', ''])
 
        self.repo.get_diff(0, 1)
 
        self.repo.run_git_command.assert_called_once_with(
 
            ['diff', '-U3', '--full-index', '--binary', '-p', '-M', '--abbrev=40',
 
             self.repo._get_revision(0), self.repo._get_revision(1)])
 

	
 
    def test_get_diff_runs_git_command_with_str_hashes(self):
 
        self.repo.run_git_command = mock.Mock(return_value=['', ''])
 
        self.repo.get_diff(self.repo.EMPTY_CHANGESET, 1)
 
        self.repo.run_git_command.assert_called_once_with(
 
            ['show', '-U3', '--full-index', '--binary', '-p', '-M', '--abbrev=40',
 
             self.repo._get_revision(1)])
 

	
 
    def test_get_diff_runs_git_command_with_path_if_its_given(self):
 
        self.repo.run_git_command = mock.Mock(return_value=['', ''])
 
        self.repo.get_diff(0, 1, 'foo')
 
        self.repo.run_git_command.assert_called_once_with(
kallithea/tests/vcs/test_nodes.py
Show inline comments
 
@@ -99,86 +99,86 @@ class TestNodeBasic(object):
 
    def test_is_dir(self):
 
        node = Node('any_dir', NodeKind.DIR)
 
        assert node.is_dir()
 

	
 
        node = DirNode('any_dir')
 

	
 
        assert node.is_dir()
 
        with pytest.raises(NodeError):
 
            getattr(node, 'content')
 

	
 
    def test_dir_node_iter(self):
 
        nodes = [
 
            DirNode('docs'),
 
            DirNode('tests'),
 
            FileNode('bar'),
 
            FileNode('foo'),
 
            FileNode('readme.txt'),
 
            FileNode('setup.py'),
 
        ]
 
        dirnode = DirNode('', nodes=nodes)
 
        for node in dirnode:
 
            node == dirnode.get_node(node.path)
 

	
 
    def test_node_state(self):
 
        """
 
        Without link to changeset nodes should raise NodeError.
 
        """
 
        node = FileNode('anything')
 
        with pytest.raises(NodeError):
 
            getattr(node, 'state')
 
        node = DirNode('anything')
 
        with pytest.raises(NodeError):
 
            getattr(node, 'state')
 

	
 
    def test_file_node_stat(self):
 
        node = FileNode('foobar', 'empty... almost')
 
        mode = node.mode  # default should be 0100644
 
        assert mode & stat.S_IRUSR
 
        assert mode & stat.S_IWUSR
 
        assert mode & stat.S_IRGRP
 
        assert mode & stat.S_IROTH
 
        assert not mode & stat.S_IWGRP
 
        assert not mode & stat.S_IWOTH
 
        assert not mode & stat.S_IXUSR
 
        assert not mode & stat.S_IXGRP
 
        assert not mode & stat.S_IXOTH
 

	
 
    def test_file_node_is_executable(self):
 
        node = FileNode('foobar', 'empty... almost', mode=0100755)
 
        node = FileNode('foobar', 'empty... almost', mode=0o100755)
 
        assert node.is_executable
 

	
 
        node = FileNode('foobar', 'empty... almost', mode=0100500)
 
        node = FileNode('foobar', 'empty... almost', mode=0o100500)
 
        assert node.is_executable
 

	
 
        node = FileNode('foobar', 'empty... almost', mode=0100644)
 
        node = FileNode('foobar', 'empty... almost', mode=0o100644)
 
        assert not node.is_executable
 

	
 
    def test_mimetype(self):
 
        py_node = FileNode('test.py')
 
        tar_node = FileNode('test.tar.gz')
 

	
 
        my_node2 = FileNode('myfile2')
 
        my_node2._content = 'foobar'
 

	
 
        my_node3 = FileNode('myfile3')
 
        my_node3._content = '\0foobar'
 

	
 
        assert py_node.mimetype == mimetypes.guess_type(py_node.name)[0]
 
        assert py_node.get_mimetype() == mimetypes.guess_type(py_node.name)
 

	
 
        assert tar_node.mimetype == mimetypes.guess_type(tar_node.name)[0]
 
        assert tar_node.get_mimetype() == mimetypes.guess_type(tar_node.name)
 

	
 
        assert my_node2.mimetype == 'text/plain'
 
        assert my_node2.get_mimetype() == ('text/plain', None)
 

	
 
        assert my_node3.mimetype == 'application/octet-stream'
 
        assert my_node3.get_mimetype() == ('application/octet-stream', None)
 

	
 

	
 
class TestNodeContent(object):
 

	
 
    def test_if_binary(self):
 
        data = """\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x10\x00\x00\x00\x10\x08\x06\x00\x00\x00\x1f??a\x00\x00\x00\x04gAMA\x00\x00\xaf?7\x05\x8a?\x00\x00\x00\x19tEXtSoftware\x00Adobe ImageReadyq?e<\x00\x00\x025IDAT8?\xa5\x93?K\x94Q\x14\x87\x9f\xf7?Q\x1bs4?\x03\x9a\xa8?B\x02\x8b$\x10[U;i\x13?6h?&h[?"\x14j?\xa2M\x7fB\x14F\x9aQ?&\x842?\x0b\x89"\x82??!?\x9c!\x9c2l??{N\x8bW\x9dY\xb4\t/\x1c?=\x9b?}????\xa9*;9!?\x83\x91?[?\\v*?D\x04\'`EpNp\xa2X\'U?pVq"Sw.\x1e?\x08\x01D?jw????\xbc??7{|\x9b?\x89$\x01??W@\x15\x9c\x05q`Lt/\x97?\x94\xa1d?\x18~?\x18?\x18W[%\xb0?\x83??\x14\x88\x8dB?\xa6H\tL\tl\x19>/\x01`\xac\xabx?\x9cl\nx\xb0\x98\x07\x95\x88D$"q[\x19?d\x00(o\n\xa0??\x7f\xb9\xa4?\x1bF\x1f\x8e\xac\xa8?j??eUU}?.?\x9f\x8cE??x\x94??\r\xbdtoJU5"0N\x10U?\x00??V\t\x02\x9f\x81?U?\x00\x9eM\xae2?r\x9b7\x83\x82\x8aP3????.?&"?\xb7ZP \x0c<?O\xa5\t}\xb8?\x99\xa6?\x87?\x1di|/\xa0??0\xbe\x1fp?d&\x1a\xad\x95\x8a\x07?\t*\x10??b:?d?.\x13C\x8a?\x12\xbe\xbf\x8e?{???\x08?\x80\xa7\x13+d\x13>J?\x80\x15T\x95\x9a\x00??S\x8c\r?\xa1\x03\x07?\x96\x9b\xa7\xab=E??\xa4\xb3?\x19q??B\x91=\x8d??k?J\x0bV"??\xf7x?\xa1\x00?\\.\x87\x87???\x02F@D\x99],??\x10#?X\xb7=\xb9\x10?Z\x1by???cI??\x1ag?\x92\xbc?T?t[\x92\x81?<_\x17~\x92\x88?H%?\x10Q\x02\x9f\n\x81qQ\x0bm?\x1bX?\xb1AK\xa6\x9e\xb9?u\xb2?1\xbe|/\x92M@\xa2!F?\xa9>"\r<DT?>\x92\x8e?>\x9a9Qv\x127?a\xac?Y?8?:??]X???9\x80\xb7?u?\x0b#BZ\x8d=\x1d?p\x00\x00\x00\x00IEND\xaeB`\x82"""
 
        filenode = FileNode('calendar.png', content=data)
 
        assert filenode.is_binary
0 comments (0 inline, 0 general)