Changeset - 30aac95e7fd5
[Not reviewed]
default
0 3 0
Mads Kiilerich - 5 years ago 2020-10-10 19:59:54
mads@kiilerich.com
Grafted from: 258a2da41849
db_manage: clarify that the purpose of admin_prompt is to create the admin user

Use cli_args to pass the test admin user information so it can be created the
usual way. This removes an unfortunate lib dependency on tests.
3 files changed with 39 insertions and 47 deletions:
0 comments (0 inline, 0 general)
kallithea/bin/kallithea_cli_db.py
Show inline comments
 
@@ -19,63 +19,63 @@ from kallithea.lib.db_manage import DbMa
 
from kallithea.model.meta import Session
 

	
 

	
 
@cli_base.register_command(needs_config_file=True, config_file_initialize_app=True)
 
@click.option('--reuse/--no-reuse', default=False,
 
        help='Reuse and clean existing database instead of dropping and creating (default: no reuse)')
 
@click.option('--user', help='Username of administrator account.')
 
@click.option('--password', help='Password for administrator account.')
 
@click.option('--email', help='Email address of administrator account.')
 
@click.option('--repos', help='Absolute path to repositories location.')
 
@click.option('--force-yes', is_flag=True, help='Answer yes to every question.')
 
@click.option('--force-no', is_flag=True, help='Answer no to every question.')
 
@click.option('--public-access/--no-public-access', default=True,
 
        help='Enable/disable public access on this installation (default: enable)')
 
def db_create(user, password, email, repos, force_yes, force_no, public_access, reuse, config=None):
 
    """Initialize the database.
 

	
 
    Create all required tables in the database specified in the configuration
 
    file. Create the administrator account. Set certain settings based on
 
    values you provide.
 

	
 
    You can pass the answers to all questions as options to this command.
 
    """
 
    if config is not None:  # first called with config, before app initialization
 
        dbconf = config['sqlalchemy.url']
 

	
 
        # force_ask should be True (yes), False (no), or None (ask)
 
        if force_yes:
 
            force_ask = True
 
        elif force_no:
 
            force_ask = False
 
        else:
 
            force_ask = None
 

	
 
        cli_args = dict(
 
                username=user,
 
                password=password,
 
                email=email,
 
                repos_location=repos,
 
                force_ask=force_ask,
 
                public_access=public_access,
 
        )
 
        dbmanage = DbManage(dbconf=dbconf, root=config['here'],
 
                            tests=False, cli_args=cli_args)
 
        dbmanage.create_tables(reuse_database=reuse)
 
        repo_root_path = dbmanage.prompt_repo_root_path(None)
 
        dbmanage.create_settings(repo_root_path)
 
        dbmanage.create_default_user()
 
        dbmanage.admin_prompt()
 
        dbmanage.create_admin_user()
 
        dbmanage.create_permissions()
 
        dbmanage.populate_default_permissions()
 
        Session().commit()
 

	
 
    else:  # then called again after app initialization
 
        added, _ = kallithea.lib.utils.repo2db_mapper(kallithea.model.scm.ScmModel().repo_scan())
 
        if added:
 
            click.echo('Initial repository scan: added following repositories:')
 
            click.echo('\t%s' % '\n\t'.join(added))
 
        else:
 
            click.echo('Initial repository scan: no repositories found.')
 

	
 
        click.echo('Database set up successfully.')
 
        click.echo("Don't forget to build the front-end using 'kallithea-cli front-end-build'.")
kallithea/lib/db_manage.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.db_manage
 
~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Database creation, and setup module for Kallithea. Used for creation
 
of database as well as for migration operations
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 10, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import getpass
 
import logging
 
import os
 
import sys
 
import uuid
 

	
 
import alembic.command
 
import alembic.config
 
import sqlalchemy
 
from sqlalchemy.engine import create_engine
 

	
 
from kallithea.lib.utils2 import ask_ok
 
from kallithea.model.base import init_model
 
from kallithea.model.db import Repository, Setting, Ui, User
 
from kallithea.model.meta import Base, Session
 
from kallithea.model.permission import PermissionModel
 
from kallithea.model.user import UserModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class DbManage(object):
 
    def __init__(self, dbconf, root, tests=False, SESSION=None, cli_args=None):
 
        self.dbname = dbconf.split('/')[-1]
 
        self.tests = tests
 
        self.root = root
 
        self.dburi = dbconf
 
        self.cli_args = cli_args or {}
 
        self.init_db(SESSION=SESSION)
 

	
 
    def _ask_ok(self, msg):
 
        """Invoke ask_ok unless the force_ask option provides the answer"""
 
        force_ask = self.cli_args.get('force_ask')
 
        if force_ask is not None:
 
            return force_ask
 
        return ask_ok(msg)
 

	
 
    def init_db(self, SESSION=None):
 
        if SESSION:
 
            self.sa = SESSION
 
        else:
 
            # init new sessions
 
            engine = create_engine(self.dburi)
 
            init_model(engine)
 
            self.sa = Session()
 

	
 
    def create_tables(self, reuse_database=False):
 
        """
 
@@ -83,143 +84,126 @@ class DbManage(object):
 
        database = url.database
 
        if reuse_database:
 
            log.info("The content of the database %r will be destroyed and new tables created." % database)
 
        else:
 
            log.info("The existing database %r will be destroyed and a new one created." % database)
 

	
 
        if not self.tests:
 
            if not self._ask_ok('Are you sure to destroy old database? [y/n]'):
 
                print('Nothing done.')
 
                sys.exit(0)
 

	
 
        if reuse_database:
 
            Base.metadata.drop_all()
 
        else:
 
            if url.drivername == 'mysql':
 
                url.database = None  # don't connect to the database (it might not exist)
 
                engine = sqlalchemy.create_engine(url)
 
                with engine.connect() as conn:
 
                    conn.execute('DROP DATABASE IF EXISTS `%s`' % database)
 
                    conn.execute('CREATE DATABASE `%s` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci' % database)
 
            elif url.drivername == 'postgresql':
 
                from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
 
                url.database = 'postgres'  # connect to the system database (as the real one might not exist)
 
                engine = sqlalchemy.create_engine(url)
 
                with engine.connect() as conn:
 
                    conn.connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
 
                    conn.execute('DROP DATABASE IF EXISTS "%s"' % database)
 
                    conn.execute('CREATE DATABASE "%s"' % database)
 
            else:
 
                # Some databases enforce foreign key constraints and Base.metadata.drop_all() doesn't work, but this is
 
                # known to work on SQLite - possibly not on other databases with strong referential integrity
 
                Base.metadata.drop_all()
 

	
 
        Base.metadata.create_all(checkfirst=False)
 

	
 
        # Create an Alembic configuration and generate the version table,
 
        # "stamping" it with the most recent Alembic migration revision, to
 
        # tell Alembic that all the schema upgrades are already in effect.
 
        alembic_cfg = alembic.config.Config()
 
        alembic_cfg.set_main_option('script_location', 'kallithea:alembic')
 
        alembic_cfg.set_main_option('sqlalchemy.url', self.dburi)
 
        # This command will give an error in an Alembic multi-head scenario,
 
        # but in practice, such a scenario should not come up during database
 
        # creation, even during development.
 
        alembic.command.stamp(alembic_cfg, 'head')
 

	
 
        log.info('Created tables for %s', self.dbname)
 

	
 
    def admin_prompt(self, second=False):
 
        if not self.tests:
 
            import getpass
 
    def create_admin_user(self):
 
        username = self.cli_args.get('username')
 
        password = self.cli_args.get('password')
 
        email = self.cli_args.get('email')
 

	
 
            username = self.cli_args.get('username')
 
            password = self.cli_args.get('password')
 
            email = self.cli_args.get('email')
 
        def get_password():
 
            password = getpass.getpass('Specify admin password '
 
                                       '(min 6 chars):')
 
            confirm = getpass.getpass('Confirm password:')
 

	
 
            def get_password():
 
                password = getpass.getpass('Specify admin password '
 
                                           '(min 6 chars):')
 
                confirm = getpass.getpass('Confirm password:')
 
            if password != confirm:
 
                log.error('passwords mismatch')
 
                return False
 
            if len(password) < 6:
 
                log.error('password is to short use at least 6 characters')
 
                return False
 

	
 
                if password != confirm:
 
                    log.error('passwords mismatch')
 
                    return False
 
                if len(password) < 6:
 
                    log.error('password is to short use at least 6 characters')
 
                    return False
 

	
 
                return password
 
            if username is None:
 
                username = input('Specify admin username:')
 
            if password is None:
 
            return password
 
        if username is None:
 
            username = input('Specify admin username:')
 
        if password is None:
 
            password = get_password()
 
            if not password:
 
                # second try
 
                password = get_password()
 
                if not password:
 
                    # second try
 
                    password = get_password()
 
                    if not password:
 
                        sys.exit()
 
            if email is None:
 
                email = input('Specify admin email:')
 
            self.create_user(username, password, email, True)
 
        else:
 
            log.info('creating admin and regular test users')
 
            from kallithea.tests.base import (TEST_USER_ADMIN_EMAIL, TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS, TEST_USER_REGULAR2_EMAIL,
 
                                              TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS, TEST_USER_REGULAR_EMAIL, TEST_USER_REGULAR_LOGIN,
 
                                              TEST_USER_REGULAR_PASS)
 

	
 
            self.create_user(TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS,
 
                             TEST_USER_ADMIN_EMAIL, True)
 

	
 
            self.create_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS,
 
                             TEST_USER_REGULAR_EMAIL, False)
 

	
 
            self.create_user(TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS,
 
                             TEST_USER_REGULAR2_EMAIL, False)
 
                    sys.exit()
 
        if email is None:
 
            email = input('Specify admin email:')
 
        self.create_user(username, password, email, True)
 

	
 
    def create_auth_plugin_options(self, skip_existing=False):
 
        """
 
        Create default auth plugin settings, and make it active
 

	
 
        :param skip_existing:
 
        """
 

	
 
        for k, v, t in [('auth_plugins', 'kallithea.lib.auth_modules.auth_internal', 'list'),
 
                        ('auth_internal_enabled', 'True', 'bool')]:
 
            if skip_existing and Setting.get_by_name(k) is not None:
 
                log.debug('Skipping option %s', k)
 
                continue
 
            setting = Setting(k, v, t)
 
            self.sa.add(setting)
 

	
 
    def create_default_options(self, skip_existing=False):
 
        """Creates default settings"""
 

	
 
        for k, v, t in [
 
            ('default_repo_enable_downloads', False, 'bool'),
 
            ('default_repo_enable_statistics', False, 'bool'),
 
            ('default_repo_private', False, 'bool'),
 
            ('default_repo_type', 'hg', 'unicode')
 
        ]:
 
            if skip_existing and Setting.get_by_name(k) is not None:
 
                log.debug('Skipping option %s', k)
 
                continue
 
            setting = Setting(k, v, t)
 
            self.sa.add(setting)
 

	
 
    def prompt_repo_root_path(self, test_repo_path='', retries=3):
 
        _path = self.cli_args.get('repos_location')
 
        if retries == 3:
 
            log.info('Setting up repositories config')
 

	
 
        if _path is not None:
 
            path = _path
 
        elif not self.tests and not test_repo_path:
 
            path = input(
 
                 'Enter a valid absolute path to store repositories. '
 
                 'All repositories in that path will be added automatically:'
 
            )
 
        else:
 
            path = test_repo_path
 
        path_ok = True
 

	
 
        # check proper dir
kallithea/tests/fixture.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
"""
 
Helpers for fixture generation
 
"""
 

	
 
import logging
 
import os
 
import shutil
 
import tarfile
 
from os.path import dirname
 

	
 
import mock
 
from tg import request
 
from tg.util.webtest import test_context
 

	
 
from kallithea.lib import helpers
 
from kallithea.lib.auth import AuthUser
 
from kallithea.lib.db_manage import DbManage
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.model.changeset_status import ChangesetStatusModel
 
from kallithea.model.comment import ChangesetCommentsModel
 
from kallithea.model.db import ChangesetStatus, Gist, RepoGroup, Repository, User, UserGroup
 
from kallithea.model.gist import GistModel
 
from kallithea.model.meta import Session
 
from kallithea.model.pull_request import CreatePullRequestAction  # , CreatePullRequestIterationAction, PullRequestModel
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.scm import ScmModel
 
from kallithea.model.user import UserModel
 
from kallithea.model.user_group import UserGroupModel
 
from kallithea.tests.base import (GIT_REPO, HG_REPO, IP_ADDR, TEST_USER_ADMIN_EMAIL, TEST_USER_ADMIN_LOGIN, TEST_USER_REGULAR_LOGIN, TESTS_TMP_PATH,
 
                                  invalidate_all_caches)
 
from kallithea.tests.base import (GIT_REPO, HG_REPO, IP_ADDR, TEST_USER_ADMIN_EMAIL, TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS, TEST_USER_REGULAR2_EMAIL,
 
                                  TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS, TEST_USER_REGULAR_EMAIL, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS,
 
                                  TESTS_TMP_PATH, invalidate_all_caches)
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 
FIXTURES = os.path.join(dirname(dirname(os.path.abspath(__file__))), 'tests', 'fixtures')
 

	
 

	
 
def raise_exception(*args, **kwargs):
 
    raise Exception('raise_exception raised exception')
 

	
 

	
 
class Fixture(object):
 

	
 
    def __init__(self):
 
        pass
 

	
 
    def anon_access(self, status):
 
        """
 
        Context manager for controlling anonymous access.
 
        Anon access will be set and committed, but restored again when exiting the block.
 

	
 
        Usage:
 

	
 
        fixture = Fixture()
 
        with fixture.anon_access(False):
 
            stuff
 
        """
 

	
 
        class context(object):
 
            def __enter__(self):
 
                anon = User.get_default_user()
 
                self._before = anon.active
 
                anon.active = status
 
                Session().commit()
 
                invalidate_all_caches()
 

	
 
            def __exit__(self, exc_type, exc_val, exc_tb):
 
                anon = User.get_default_user()
 
                anon.active = self._before
 
                Session().commit()
 

	
 
        return context()
 

	
 
    def _get_repo_create_params(self, **custom):
 
        """Return form values to be validated through RepoForm"""
 
        defs = dict(
 
            repo_name=None,
 
            repo_type='hg',
 
@@ -320,102 +321,109 @@ class Fixture(object):
 
                message=message,
 
                content=content,
 
                f_path=filename
 
            )
 
        return cs
 

	
 
    def review_changeset(self, repo, revision, status, author=TEST_USER_ADMIN_LOGIN):
 
        comment = ChangesetCommentsModel().create("review comment", repo, author, revision=revision, send_email=False)
 
        csm = ChangesetStatusModel().set_status(repo, ChangesetStatus.STATUS_APPROVED, author, comment, revision=revision)
 
        Session().commit()
 
        return csm
 

	
 
    def create_pullrequest(self, testcontroller, repo_name, pr_src_rev, pr_dst_rev, title='title'):
 
        org_ref = 'branch:stable:%s' % pr_src_rev
 
        other_ref = 'branch:default:%s' % pr_dst_rev
 
        with test_context(testcontroller.app): # needed to be able to mock request user
 
            org_repo = other_repo = Repository.get_by_repo_name(repo_name)
 
            owner_user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 
            reviewers = [User.get_by_username(TEST_USER_REGULAR_LOGIN)]
 
            request.authuser = AuthUser(dbuser=owner_user)
 
            # creating a PR sends a message with an absolute URL - without routing that requires mocking
 
            with mock.patch.object(helpers, 'url', (lambda arg, qualified=False, **kwargs: ('https://localhost' if qualified else '') + '/fake/' + arg)):
 
                cmd = CreatePullRequestAction(org_repo, other_repo, org_ref, other_ref, title, 'No description', owner_user, reviewers)
 
                pull_request = cmd.execute()
 
            Session().commit()
 
        return pull_request.pull_request_id
 

	
 

	
 
#==============================================================================
 
# Global test environment setup
 
#==============================================================================
 

	
 
def create_test_env(repos_test_path, config, reuse_database):
 
    """
 
    Makes a fresh database and
 
    install test repository into tmp dir
 
    """
 

	
 
    # PART ONE create db
 
    dbconf = config['sqlalchemy.url']
 
    log.debug('making test db %s', dbconf)
 

	
 
    # create test dir if it doesn't exist
 
    if not os.path.isdir(repos_test_path):
 
        log.debug('Creating testdir %s', repos_test_path)
 
        os.makedirs(repos_test_path)
 

	
 
    dbmanage = DbManage(dbconf=dbconf, root=config['here'],
 
                        tests=True)
 
                        tests=True,
 
                        cli_args={
 
                            'username': TEST_USER_ADMIN_LOGIN,
 
                            'password': TEST_USER_ADMIN_PASS,
 
                            'email': TEST_USER_ADMIN_EMAIL,
 
                        })
 
    dbmanage.create_tables(reuse_database=reuse_database)
 
    # for tests dynamically set new root paths based on generated content
 
    dbmanage.create_settings(dbmanage.prompt_repo_root_path(repos_test_path))
 
    dbmanage.create_default_user()
 
    dbmanage.admin_prompt()
 
    dbmanage.create_admin_user()
 
    dbmanage.create_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS, TEST_USER_REGULAR_EMAIL, False)
 
    dbmanage.create_user(TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS, TEST_USER_REGULAR2_EMAIL, False)
 
    dbmanage.create_permissions()
 
    dbmanage.populate_default_permissions()
 
    Session().commit()
 
    # PART TWO make test repo
 
    log.debug('making test vcs repositories')
 

	
 
    idx_path = config['index_dir']
 
    data_path = config['cache_dir']
 

	
 
    # clean index and data
 
    if idx_path and os.path.exists(idx_path):
 
        log.debug('remove %s', idx_path)
 
        shutil.rmtree(idx_path)
 

	
 
    if data_path and os.path.exists(data_path):
 
        log.debug('remove %s', data_path)
 
        shutil.rmtree(data_path)
 

	
 
    # CREATE DEFAULT TEST REPOS
 
    tar = tarfile.open(os.path.join(FIXTURES, 'vcs_test_hg.tar.gz'))
 
    tar.extractall(os.path.join(TESTS_TMP_PATH, HG_REPO))
 
    tar.close()
 

	
 
    tar = tarfile.open(os.path.join(FIXTURES, 'vcs_test_git.tar.gz'))
 
    tar.extractall(os.path.join(TESTS_TMP_PATH, GIT_REPO))
 
    tar.close()
 

	
 
    # LOAD VCS test stuff
 
    from kallithea.tests.vcs import setup_package
 
    setup_package()
 

	
 

	
 
def create_test_index(repo_location, config, full_index):
 
    """
 
    Makes default test index
 
    """
 

	
 
    from kallithea.lib.indexers.daemon import WhooshIndexingDaemon
 
    from kallithea.lib.pidlock import DaemonLock
 

	
 
    index_location = os.path.join(config['index_dir'])
 
    if not os.path.exists(index_location):
 
        os.makedirs(index_location)
 

	
 
    l = DaemonLock(os.path.join(index_location, 'make_index.lock'))
 
    WhooshIndexingDaemon(index_location=index_location,
 
                         repo_location=repo_location) \
 
        .run(full_index=full_index)
0 comments (0 inline, 0 general)