Changeset - 0f9e7dbfa5d2
[Not reviewed]
default
0 5 0
Mads Kiilerich - 7 years ago 2018-11-18 19:57:40
mads@kiilerich.com
utils2: Move ask_ok out of paster_commands

The ask_ok function is still a mess and should be cleaned up ...
5 files changed with 16 insertions and 17 deletions:
0 comments (0 inline, 0 general)
kallithea/bin/kallithea_cli_extensions.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
This file was forked by the Kallithea project in July 2014 and later moved.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Mar 6, 2012
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 
import click
 
import kallithea.bin.kallithea_cli_base as cli_base
 

	
 
import os
 
import pkg_resources
 

	
 
import kallithea
 
from kallithea.lib.paster_commands.common import ask_ok
 
from kallithea.lib.utils2 import ask_ok
 

	
 
@cli_base.register_command(config_file=True)
 
def extensions_create():
 
    """Write template file for extending Kallithea in Python.
 

	
 
    An rcextensions directory with a __init__.py file will be created next to
 
    the ini file. Local customizations in that file will survive upgrades.
 
    The file contains instructions on how it can be customized.
 
    """
 
    here = kallithea.CONFIG['here']
 
    content = pkg_resources.resource_string(
 
        'kallithea', os.path.join('config', 'rcextensions', '__init__.py')
 
    )
 
    ext_file = os.path.join(here, 'rcextensions', '__init__.py')
 
    if os.path.exists(ext_file):
 
        msg = ('Extension file %s already exists, do you want '
 
               'to overwrite it ? [y/n] ') % ext_file
 
        if not ask_ok(msg):
 
            click.echo('Nothing done, exiting...')
 
            return
 

	
 
    dirname = os.path.dirname(ext_file)
 
    if not os.path.isdir(dirname):
 
        os.makedirs(dirname)
 
    with open(ext_file, 'wb') as f:
 
        f.write(content)
 
        click.echo('Wrote new extensions file to %s' % ext_file)
kallithea/bin/kallithea_cli_repo.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
This file was forked by the Kallithea project in July 2014 and later moved.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Feb 9, 2013
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 
import click
 
import kallithea.bin.kallithea_cli_base as cli_base
 

	
 
import datetime
 
import os
 
import re
 
import shutil
 

	
 
from kallithea.lib.paster_commands.common import ask_ok
 
from kallithea.lib.utils import repo2db_mapper, REMOVED_REPO_PAT
 
from kallithea.lib.utils2 import safe_unicode, safe_str
 
from kallithea.lib.utils2 import safe_unicode, safe_str, ask_ok
 
from kallithea.model.db import Repository, Ui
 
from kallithea.model.meta import Session
 
from kallithea.model.scm import ScmModel
 

	
 
@cli_base.register_command(config_file_initialize_app=True)
 
@click.option('--remove-missing', is_flag=True,
 
        help='Remove missing repositories from the Kallithea database.')
 
def repo_scan(remove_missing):
 
    """Scan filesystem for repositories.
 

	
 
    Search the configured repository root for new repositories and add them
 
    into Kallithea.
 
    Additionally, report repositories that were previously known to Kallithea
 
    but are no longer present on the filesystem. If option --remove-missing is
 
    given, remove the missing repositories from the Kallithea database.
 
    """
 
    click.echo('Now scanning root location for new repos ...')
 
    added, removed = repo2db_mapper(ScmModel().repo_scan(),
 
                                    remove_obsolete=remove_missing)
 
    click.echo('Scan completed.')
 
    if added:
 
        click.echo('Added: %s' % ', '.join(added))
 
    if removed:
 
        click.echo('%s: %s' % ('Removed' if remove_missing else 'Missing',
 
                          ', '.join(removed)))
 

	
 
@cli_base.register_command(config_file_initialize_app=True)
 
@click.argument('repositories', nargs=-1)
 
def repo_update_metadata(repositories):
 
    """
 
    Update repository metadata in database from repository content.
 

	
 
    In normal operation, Kallithea will keep caches up-to-date
 
    automatically. However, if repositories are externally modified, e.g. by
 
    a direct push via the filesystem rather than via a Kallithea URL,
 
    Kallithea is not aware of it. In this case, you should manually run this
 
    command to update the repository cache.
 

	
 
    If no repositories are specified, the caches of all repositories are
 
    updated.
 
    """
 
    if not repositories:
 
        repo_list = Repository.query().all()
 
    else:
 
        repo_names = [safe_unicode(n.strip()) for n in repositories]
 
        repo_list = list(Repository.query()
 
                        .filter(Repository.repo_name.in_(repo_names)))
 

	
kallithea/lib/db_manage.py
Show inline comments
 
@@ -32,97 +32,97 @@ import time
 
import uuid
 
import logging
 
import sqlalchemy
 
from os.path import dirname
 

	
 
import alembic.config
 
import alembic.command
 

	
 
from kallithea.model.user import UserModel
 
from kallithea.model.base import init_model
 
from kallithea.model.db import User, Permission, Ui, \
 
    Setting, UserToPerm, RepoGroup, \
 
    UserRepoGroupToPerm, CacheInvalidation, Repository
 

	
 
from sqlalchemy.engine import create_engine
 
from kallithea.model.repo_group import RepoGroupModel
 
#from kallithea.model import meta
 
from kallithea.model.meta import Session, Base
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.permission import PermissionModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def notify(msg):
 
    """
 
    Notification for migrations messages
 
    """
 
    ml = len(msg) + (4 * 2)
 
    print('\n%s\n*** %s ***\n%s' % ('*' * ml, msg, '*' * ml)).upper()
 

	
 

	
 
class DbManage(object):
 
    def __init__(self, dbconf, root, tests=False, SESSION=None, cli_args=None):
 
        self.dbname = dbconf.split('/')[-1]
 
        self.tests = tests
 
        self.root = root
 
        self.dburi = dbconf
 
        self.db_exists = False
 
        self.cli_args = cli_args or {}
 
        self.init_db(SESSION=SESSION)
 

	
 
    def _ask_ok(self, msg):
 
        """Invoke ask_ok unless the force_ask option provides the answer"""
 
        force_ask = self.cli_args.get('force_ask')
 
        if force_ask is not None:
 
            return force_ask
 
        from kallithea.lib.paster_commands.common import ask_ok
 
        from kallithea.lib.utils2 import ask_ok
 
        return ask_ok(msg)
 

	
 
    def init_db(self, SESSION=None):
 
        if SESSION:
 
            self.sa = SESSION
 
        else:
 
            # init new sessions
 
            engine = create_engine(self.dburi)
 
            init_model(engine)
 
            self.sa = Session()
 

	
 
    def create_tables(self, override=False):
 
        """
 
        Create a auth database
 
        """
 

	
 
        log.info("Any existing database is going to be destroyed")
 
        if self.tests:
 
            destroy = True
 
        else:
 
            destroy = self._ask_ok('Are you sure to destroy old database ? [y/n]')
 
        if not destroy:
 
            print 'Nothing done.'
 
            sys.exit(0)
 
        if destroy:
 
            # drop and re-create old schemas
 

	
 
            url = sqlalchemy.engine.url.make_url(self.dburi)
 
            database = url.database
 

	
 
            # Some databases enforce foreign key constraints and Base.metadata.drop_all() doesn't work
 
            if url.drivername == 'mysql':
 
                url.database = None  # don't connect to the database (it might not exist)
 
                engine = sqlalchemy.create_engine(url)
 
                with engine.connect() as conn:
 
                    conn.execute('DROP DATABASE IF EXISTS ' + database)
 
                    conn.execute('CREATE DATABASE ' + database)
 
            elif url.drivername == 'postgresql':
 
                from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
 
                url.database = 'postgres'  # connect to the system database (as the real one might not exist)
 
                engine = sqlalchemy.create_engine(url)
 
                with engine.connect() as conn:
 
                    conn.connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
 
                    conn.execute('DROP DATABASE IF EXISTS ' + database)
 
                    conn.execute('CREATE DATABASE ' + database)
 
            else:
 
                # known to work on SQLite - possibly not on other databases with strong referential integrity
 
                Base.metadata.drop_all()
kallithea/lib/paster_commands/common.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.paster_commands.common
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Common code for gearbox commands.
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 18, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import os
 
import sys
 
import logging.config
 

	
 
import paste.deploy
 
import gearbox.command
 

	
 
import kallithea.config.middleware
 
import kallithea.model.base
 
import kallithea.lib.utils2
 
import kallithea.lib.utils
 

	
 

	
 
def ask_ok(prompt, retries=4, complaint='Yes or no please!'):
 
    while True:
 
        ok = raw_input(prompt)
 
        if ok in ('y', 'ye', 'yes'):
 
            return True
 
        if ok in ('n', 'no', 'nop', 'nope'):
 
            return False
 
        retries = retries - 1
 
        if retries < 0:
 
            raise IOError
 
        print complaint
 

	
 

	
 
class BasePasterCommand(gearbox.command.Command):
 
    """
 
    Abstract Base Class for gearbox commands.
 
    """
 

	
 
    # override to control how much get_parser and run should do:
 
    takes_config_file = True
 
    requires_db_session = True
 
    config = None # set to the actual config object in run if takes_config_file is true
 

	
 
    def run(self, args):
 
        """
 
        Overrides Command.run
 

	
 
        If needed by the command, read config file and initialize database before running.
 
        """
 
        if self.takes_config_file:
 
            path_to_ini_file = os.path.realpath(args.config_file)
 
            self.config = paste.deploy.appconfig('config:' + path_to_ini_file)
 
            # TODO: also initialize kallithea.CONFIG?
 
            logging.config.fileConfig(path_to_ini_file)
 

	
 
            if self.requires_db_session:
 
                kallithea.config.middleware.make_app_without_logging(self.config.global_conf, **self.config.local_conf)
 
                # *now*, tg.config has been set and could be used ... but we just keep using self.config
 
                kallithea.lib.utils.setup_cache_regions(self.config)
 

	
 
        return super(BasePasterCommand, self).run(args)
 

	
 
    def get_parser(self, prog_name):
 
        parser = super(BasePasterCommand, self).get_parser(prog_name)
 

	
 
        if self.takes_config_file:
 
            parser.add_argument("-c", "--config",
 
                help='Kallithea .ini file with configuration of database etc',
 
                dest='config_file', required=True)
 

	
 
        return parser
 

	
 
    def error(self, msg, exitcode=1):
 
        """Write error message and exit"""
 
        sys.stderr.write('%s\n' % msg)
 
        raise SystemExit(exitcode)
kallithea/lib/utils2.py
Show inline comments
 
@@ -636,48 +636,61 @@ class Optional(object):
 

	
 
        :param val:
 
        :return: original value if it's not Optional instance else
 
            value of instance
 
        """
 
        if isinstance(val, cls):
 
            return val.getval()
 
        return val
 

	
 

	
 
def urlreadable(s, _cleanstringsub=re.compile('[^-a-zA-Z0-9./]+').sub):
 
    return _cleanstringsub('_', safe_str(s)).rstrip('_')
 

	
 

	
 
def recursive_replace(str_, replace=' '):
 
    """
 
    Recursive replace of given sign to just one instance
 

	
 
    :param str_: given string
 
    :param replace: char to find and replace multiple instances
 

	
 
    Examples::
 
    >>> recursive_replace("Mighty---Mighty-Bo--sstones",'-')
 
    'Mighty-Mighty-Bo-sstones'
 
    """
 

	
 
    if str_.find(replace * 2) == -1:
 
        return str_
 
    else:
 
        str_ = str_.replace(replace * 2, replace)
 
        return recursive_replace(str_, replace)
 

	
 

	
 
def repo_name_slug(value):
 
    """
 
    Return slug of name of repository
 
    This function is called on each creation/modification
 
    of repository to prevent bad names in repo
 
    """
 

	
 
    slug = remove_formatting(value)
 
    slug = strip_tags(slug)
 

	
 
    for c in """`?=[]\;'"<>,/~!@#$%^&*()+{}|: """:
 
        slug = slug.replace(c, '-')
 
    slug = recursive_replace(slug, '-')
 
    slug = collapse(slug, '-')
 
    return slug
 

	
 

	
 
def ask_ok(prompt, retries=4, complaint='Yes or no please!'):
 
    while True:
 
        ok = raw_input(prompt)
 
        if ok in ('y', 'ye', 'yes'):
 
            return True
 
        if ok in ('n', 'no', 'nop', 'nope'):
 
            return False
 
        retries = retries - 1
 
        if retries < 0:
 
            raise IOError
 
        print complaint
0 comments (0 inline, 0 general)