Changeset - 0cfd77281853
[Not reviewed]
default
0 2 0
Mads Kiilerich - 6 years ago 2019-11-25 02:46:02
mads@kiilerich.com
Grafted from: 12dca3a1321d
cleanup: convert some StringIO use to use the py3 compatible io module
2 files changed with 6 insertions and 6 deletions:
0 comments (0 inline, 0 general)
kallithea/bin/kallithea_cli_base.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import cStringIO
 
import functools
 
import io
 
import logging.config
 
import os
 
import re
 
import sys
 

	
 
import click
 
import paste.deploy
 

	
 
import kallithea
 

	
 

	
 
# kallithea_cli is usually invoked through the 'kallithea-cli' wrapper script
 
# that is installed by setuptools, as specified in setup.py console_scripts
 
# entry_points. The script will be using the right virtualenv (if any), and for
 
# Unix, it will contain #! pointing at the right python executable. The script
 
# also makes sure sys.argv[0] points back at the script path, and that is what
 
# can be used to invoke 'kallithea-cli' later.
 
kallithea_cli_path = sys.argv[0]
 

	
 

	
 
def read_config(ini_file_name, strip_section_prefix):
 
    """Read ini_file_name content, and for all sections like '[X:Y]' where X is
 
    strip_section_prefix, replace the section name with '[Y]'."""
 

	
 
    def repl(m):
 
        if m.group(1) == strip_section_prefix:
 
            return '[%s]' % m.group(2)
 
        return m.group(0)
 

	
 
    with open(ini_file_name) as f:
 
        return re.sub(r'^\[([^:]+):(.*)]', repl, f.read(), flags=re.MULTILINE)
 
        return re.sub(r'^\[([^:]+):(.*)]', repl, f.read().decode(), flags=re.MULTILINE)
 

	
 

	
 
# This placeholder is the main entry point for the kallithea-cli command
 
@click.group(context_settings=dict(help_option_names=['-h', '--help']))
 
def cli():
 
    """Various commands to manage a Kallithea instance."""
 

	
 
def register_command(config_file=False, config_file_initialize_app=False, hidden=False):
 
    """Register a kallithea-cli subcommand.
 

	
 
    If one of the config_file flags are true, a config file must be specified
 
    with -c and it is read and logging is configured. The configuration is
 
    available in the kallithea.CONFIG dict.
 

	
 
    If config_file_initialize_app is true, Kallithea, TurboGears global state
 
    (including tg.config), and database access will also be fully initialized.
 
    """
 
    cli_command = cli.command(hidden=hidden)
 
    if config_file or config_file_initialize_app:
 
        def annotator(annotated):
 
            @click.option('--config_file', '-c', help="Path to .ini file with app configuration.",
 
                type=click.Path(dir_okay=False, exists=True, readable=True), required=True)
 
            @functools.wraps(annotated) # reuse meta data from the wrapped function so click can see other options
 
            def runtime_wrapper(config_file, *args, **kwargs):
 
                path_to_ini_file = os.path.realpath(config_file)
 
                kallithea.CONFIG = paste.deploy.appconfig('config:' + path_to_ini_file)
 
                config_bytes = read_config(path_to_ini_file, strip_section_prefix=annotated.__name__)
 
                logging.config.fileConfig(cStringIO.StringIO(config_bytes))
 
                config_string = read_config(path_to_ini_file, strip_section_prefix=annotated.__name__)
 
                logging.config.fileConfig(io.StringIO(config_string))
 
                if config_file_initialize_app:
 
                    kallithea.config.middleware.make_app_without_logging(kallithea.CONFIG.global_conf, **kallithea.CONFIG.local_conf)
 
                    kallithea.lib.utils.setup_cache_regions(kallithea.CONFIG)
 
                return annotated(*args, **kwargs)
 
            return cli_command(runtime_wrapper)
 
        return annotator
 
    return cli_command
kallithea/tests/vcs/test_archives.py
Show inline comments
 
import datetime
 
import io
 
import os
 
import StringIO
 
import tarfile
 
import tempfile
 
import zipfile
 

	
 
import pytest
 

	
 
from kallithea.lib.vcs.exceptions import VCSError
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.tests.vcs.base import _BackendTestMixin
 
from kallithea.tests.vcs.conf import TESTS_TMP_PATH
 

	
 

	
 
class ArchivesTestCaseMixin(_BackendTestMixin):
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        start_date = datetime.datetime(2010, 1, 1, 20)
 
        for x in xrange(5):
 
            yield {
 
                'message': 'Commit %d' % x,
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': start_date + datetime.timedelta(hours=12 * x),
 
                'added': [
 
                    FileNode('%d/file_%d.txt' % (x, x),
 
@@ -49,49 +49,49 @@ class ArchivesTestCaseMixin(_BackendTest
 
        outfile = tarfile.open(path, 'r|gz')
 
        outfile.extractall(outdir)
 

	
 
        for x in xrange(5):
 
            node_path = '%d/file_%d.txt' % (x, x)
 
            assert open(os.path.join(outdir, 'repo/' + node_path)).read() == self.tip.get_node(node_path).content
 

	
 
    def test_archive_tbz2(self):
 
        path = tempfile.mkstemp(dir=TESTS_TMP_PATH, prefix='test_archive_tbz2-')[1]
 
        with open(path, 'w+b') as f:
 
            self.tip.fill_archive(stream=f, kind='tbz2', prefix='repo')
 
        outdir = tempfile.mkdtemp(dir=TESTS_TMP_PATH, prefix='test_archive_tbz2-', suffix='-outdir')
 

	
 
        outfile = tarfile.open(path, 'r|bz2')
 
        outfile.extractall(outdir)
 

	
 
        for x in xrange(5):
 
            node_path = '%d/file_%d.txt' % (x, x)
 
            assert open(os.path.join(outdir, 'repo/' + node_path)).read() == self.tip.get_node(node_path).content
 

	
 
    def test_archive_default_stream(self):
 
        tmppath = tempfile.mkstemp(dir=TESTS_TMP_PATH, prefix='test_archive_default_stream-')[1]
 
        with open(tmppath, 'wb') as stream:
 
            self.tip.fill_archive(stream=stream)
 
        mystream = StringIO.StringIO()
 
        mystream = io.BytesIO()
 
        self.tip.fill_archive(stream=mystream)
 
        mystream.seek(0)
 
        with open(tmppath, 'rb') as f:
 
            file_content = f.read()
 
            stringio_content = mystream.read()
 
            # the gzip header contains a MTIME header
 
            # because is takes a little bit of time from one fill_archive call to the next
 
            # this part may differ so don't include that part in the comparison
 
            assert file_content[:4] == stringio_content[:4]
 
            assert file_content[8:] == stringio_content[8:]
 

	
 
    def test_archive_wrong_kind(self):
 
        with pytest.raises(VCSError):
 
            self.tip.fill_archive(kind='wrong kind')
 

	
 
    def test_archive_empty_prefix(self):
 
        with pytest.raises(VCSError):
 
            self.tip.fill_archive(prefix='')
 

	
 
    def test_archive_prefix_with_leading_slash(self):
 
        with pytest.raises(VCSError):
 
            self.tip.fill_archive(prefix='/any')
 

	
 

	
0 comments (0 inline, 0 general)