Changeset b35f74c2d661 (branch: default, not reviewed)
Søren Løvborg <sorenl@unity3d.com>, 2017-01-17 18:20:17

tests: move test environment setup code out of utils.py

Move create_test_index and create_test_env, the two functions that
create test fixtures, into kallithea.tests.fixture.

3 files changed with 93 insertions and 92 deletions
0 comments (0 inline, 0 general)
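Note: after this changeset, test code imports the fixture setup helpers from kallithea.tests.fixture instead of kallithea.lib.utils. A minimal sketch of the caller side (names taken from the diff below):

    # after this changeset: fixture helpers live in kallithea.tests.fixture
    from kallithea.tests.base import TESTS_TMP_PATH
    from kallithea.tests.fixture import create_test_env, create_test_index

    # before this changeset (import removed below):
    # from kallithea.lib.utils import create_test_env, create_test_index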
kallithea/config/environment.py
 
@@ -2,138 +2,138 @@
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
    Pylons environment configuration
 
"""
 

	
 
import os
 
import logging
 
import kallithea
 
import platform
 

	
 
import pylons
 
import mako.lookup
 
import beaker
 
import formencode
 

	
 
import kallithea.lib.app_globals as app_globals
 

	
 
from kallithea.config.routing import make_map
 

	
 
from kallithea.lib import helpers
 
from kallithea.lib.auth import set_available_permissions
 
from kallithea.lib.utils import repo2db_mapper, make_ui, set_app_settings, \
 
    load_rcextensions, check_git_version, set_vcs_config, set_indexer_config
 
from kallithea.lib.utils2 import engine_from_config, str2bool
 
from kallithea.lib.db_manage import DbManage
 
from kallithea.model.base import init_model
 
from kallithea.model.scm import ScmModel
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def load_environment(global_conf, app_conf,
 
                     test_env=None, test_index=None):
 
    """
 
    Configure the Pylons environment via the ``pylons.config``
 
    object
 
    """
 
    config = pylons.configuration.PylonsConfig()
 

	
 
    # Pylons paths
 
    root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
 
    paths = dict(
 
        root=root,
 
        controllers=os.path.join(root, 'controllers'),
 
        static_files=os.path.join(root, 'public'),
 
        templates=[os.path.join(root, 'templates')]
 
    )
 

	
 
    # Initialize config with the basic options
 
    config.init_app(global_conf, app_conf, package='kallithea', paths=paths)
 

	
 
    # store some globals into kallithea
 
    kallithea.CELERY_ON = str2bool(config['app_conf'].get('use_celery'))
 
    kallithea.CELERY_EAGER = str2bool(config['app_conf'].get('celery.always.eager'))
 

	
 
    config['routes.map'] = make_map(config)
 
    config['pylons.app_globals'] = app_globals.Globals(config)
 
    config['pylons.h'] = helpers
 
    kallithea.CONFIG = config
 

	
 
    load_rcextensions(root_path=config['here'])
 

	
 
    # Setup cache object as early as possible
 
    pylons.cache._push_object(config['pylons.app_globals'].cache)
 

	
 
    # Create the Mako TemplateLookup, with the default auto-escaping
 
    config['pylons.app_globals'].mako_lookup = mako.lookup.TemplateLookup(
 
        directories=paths['templates'],
 
        strict_undefined=True,
 
        module_directory=os.path.join(app_conf['cache_dir'], 'templates'),
 
        input_encoding='utf-8', default_filters=['escape'],
 
        imports=['from webhelpers.html import escape'])
 

	
 
    # make the template context 'c' strict: accessing a non-existing attribute raises an error
 
    config['pylons.strict_tmpl_context'] = True
 
    test = os.path.split(config['__file__'])[-1] == 'test.ini'
 
    if test:
 
        if test_env is None:
 
            test_env = not int(os.environ.get('KALLITHEA_NO_TMP_PATH', 0))
 
        if test_index is None:
 
            test_index = not int(os.environ.get('KALLITHEA_WHOOSH_TEST_DISABLE', 0))
 
        if os.environ.get('TEST_DB'):
 
            # override the configured database if the TEST_DB environment variable is set
 
            config['sqlalchemy.url'] = os.environ.get('TEST_DB')
 

	
 
        from kallithea.lib.utils import create_test_env, create_test_index
 
        from kallithea.tests.base import TESTS_TMP_PATH
 
        from kallithea.tests.fixture import create_test_env, create_test_index
 
        #set KALLITHEA_NO_TMP_PATH=1 to disable re-creating the database and
 
        #test repos
 
        if test_env:
 
            create_test_env(TESTS_TMP_PATH, config)
 
        #set KALLITHEA_WHOOSH_TEST_DISABLE=1 to disable whoosh index during tests
 
        if test_index:
 
            create_test_index(TESTS_TMP_PATH, config, True)
 

	
 
    # MULTIPLE DB configs
 
    # Setup the SQLAlchemy database engine
 
    sa_engine = engine_from_config(config, 'sqlalchemy.')
 
    init_model(sa_engine)
 

	
 
    set_available_permissions(config)
 
    repos_path = make_ui('db').configitems('paths')[0][1]
 
    config['base_path'] = repos_path
 
    set_app_settings(config)
 

	
 
    instance_id = kallithea.CONFIG.get('instance_id', '*')
 
    if instance_id == '*':
 
        instance_id = '%s-%s' % (platform.uname()[1], os.getpid())
 
        kallithea.CONFIG['instance_id'] = instance_id
 

	
 
    # CONFIGURATION OPTIONS HERE (note: all config options will override
 
    # any Pylons config options)
 

	
 
    # store config reference into our module to skip import magic of
 
    # pylons
 
    kallithea.CONFIG.update(config)
 
    set_vcs_config(kallithea.CONFIG)
 
    set_indexer_config(kallithea.CONFIG)
 

	
 
    #check git version
 
    check_git_version()
 

	
 
    if str2bool(config.get('initial_repo_scan', True)):
 
        repo2db_mapper(ScmModel().repo_scan(repos_path),
 
                       remove_obsolete=False, install_git_hooks=False)
 
    formencode.api.set_stdtranslation(languages=[config.get('lang')])
 
    return config
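Note: the test-only branch of load_environment() above is controlled by a few environment variables. A small sketch of how a test run might set them before the environment is loaded (the database URL is an illustrative assumption; the variable names come from the code above):

    import os

    # use an alternative test database instead of the one configured in test.ini
    os.environ['TEST_DB'] = 'sqlite:///kallithea_test.sqlite'  # hypothetical URL
    # skip re-creating the test database and test repositories
    os.environ['KALLITHEA_NO_TMP_PATH'] = '1'
    # skip building the Whoosh index during tests
    os.environ['KALLITHEA_WHOOSH_TEST_DISABLE'] = '1'

    # load_environment() only honours these when the config file is named test.ini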
kallithea/lib/utils.py
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.utils
 
~~~~~~~~~~~~~~~~~~~
 

	
 
Utilities library for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 18, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import os
 
import re
 
import logging
 
import datetime
 
import traceback
 
import paste
 
import beaker
 
import tarfile
 
import shutil
 
from os.path import abspath
 
from os.path import dirname
 

	
 
from webhelpers.text import collapse, remove_formatting, strip_tags
 
from beaker.cache import _cache_decorate
 

	
 
from kallithea.lib.vcs.utils.hgcompat import ui, config
 
from kallithea.lib.vcs.utils.helpers import get_scm
 
from kallithea.lib.vcs.exceptions import VCSError
 

	
 
from kallithea.model import meta
 
from kallithea.model.db import Repository, User, Ui, \
 
    UserLog, RepoGroup, Setting, UserGroup
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.lib.utils2 import safe_str, safe_unicode, get_current_authuser
 
from kallithea.lib.vcs.utils.fakemod import create_module
 

	
 
log = logging.getLogger(__name__)
 

	
 
REMOVED_REPO_PAT = re.compile(r'rm__\d{8}_\d{6}_\d{6}_.*')
 

	
 

	
 
def recursive_replace(str_, replace=' '):
 
    """
 
    Recursively collapse repeated occurrences of the given character into a single one
 

	
 
    :param str_: given string
 
    :param replace: character whose repeated occurrences are collapsed
 

	
 
    Examples::
 
    >>> recursive_replace("Mighty---Mighty-Bo--sstones",'-')
 
    'Mighty-Mighty-Bo-sstones'
 
    """
 

	
 
    if str_.find(replace * 2) == -1:
 
        return str_
 
    else:
 
        str_ = str_.replace(replace * 2, replace)
 
        return recursive_replace(str_, replace)
 

	
 

	
 
def repo_name_slug(value):
 
    """
 
    Return a URL-safe slug derived from the repository name.
 
    This function is called on each repository creation/modification
 
    to prevent invalid repository names.
 
    """
 

	
 
    slug = remove_formatting(value)
 
    slug = strip_tags(slug)
 

	
 
    for c in """`?=[]\;'"<>,/~!@#$%^&*()+{}|: """:
 
        slug = slug.replace(c, '-')
 
    slug = recursive_replace(slug, '-')
 
    slug = collapse(slug, '-')
 
    return slug
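Note: a hedged illustration of the slug rules above (the exact output depends on webhelpers' remove_formatting/strip_tags/collapse, so the result shown is an expectation, not a verified value):

    from kallithea.lib.utils import repo_name_slug

    print(repo_name_slug('My Repo!!  (v2)'))  # expected: 'My-Repo-v2'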
 

	
 

	
 
#==============================================================================
 
# PERM DECORATOR HELPERS FOR EXTRACTING NAMES FOR PERM CHECKS
 
#==============================================================================
 
def get_repo_slug(request):
 
    _repo = request.environ['pylons.routes_dict'].get('repo_name')
 
    if _repo:
 
        _repo = _repo.rstrip('/')
 
    return _repo
 

	
 

	
 
def get_repo_group_slug(request):
 
    _group = request.environ['pylons.routes_dict'].get('group_name')
 
    if _group:
 
        _group = _group.rstrip('/')
 
    return _group
 

	
 

	
 
def get_user_group_slug(request):
 
    _group = request.environ['pylons.routes_dict'].get('id')
 
    _group = UserGroup.get(_group)
 
    if _group:
 
        return _group.users_group_name
 
    return None
 

	
 

	
 
def _extract_id_from_repo_name(repo_name):
 
    if repo_name.startswith('/'):
 
        repo_name = repo_name.lstrip('/')
 
    by_id_match = re.match(r'^_(\d{1,})', repo_name)
 
    if by_id_match:
 
        return by_id_match.groups()[0]
 

	
 

	
 
def get_repo_by_id(repo_name):
 
    """
 
    Extract a repository name from the numeric id in special URLs such as _11/repo_name
 

	
 
    :param repo_name:
 
    :return: repo_name if matched else None
 
    """
 
@@ -489,273 +484,191 @@ def repo2db_mapper(initial_repo_list, re
 
    ##creation defaults
 
    defs = Setting.get_default_repo_settings(strip_prefix=True)
 
    enable_statistics = defs.get('repo_enable_statistics')
 
    enable_locking = defs.get('repo_enable_locking')
 
    enable_downloads = defs.get('repo_enable_downloads')
 
    private = defs.get('repo_private')
 

	
 
    for name, repo in initial_repo_list.items():
 
        group = map_groups(name)
 
        unicode_name = safe_unicode(name)
 
        db_repo = repo_model.get_by_repo_name(unicode_name)
 
        # found a repository on the filesystem that is not in the Kallithea database
 
        if not db_repo:
 
            log.info('repository %s not found, creating now', name)
 
            added.append(name)
 
            desc = (repo.description
 
                    if repo.description != 'unknown'
 
                    else '%s repository' % name)
 

	
 
            new_repo = repo_model._create_repo(
 
                repo_name=name,
 
                repo_type=repo.alias,
 
                description=desc,
 
                repo_group=getattr(group, 'group_id', None),
 
                owner=user,
 
                enable_locking=enable_locking,
 
                enable_downloads=enable_downloads,
 
                enable_statistics=enable_statistics,
 
                private=private,
 
                state=Repository.STATE_CREATED
 
            )
 
            sa.commit()
 
            # we just added this repo; make sure it has the git hook
 
            # installed and update the server info
 
            if new_repo.repo_type == 'git':
 
                git_repo = new_repo.scm_instance
 
                ScmModel().install_git_hooks(git_repo)
 
                # update repository server-info
 
                log.debug('Running update server info')
 
                git_repo._update_server_info()
 
            new_repo.update_changeset_cache()
 
        elif install_git_hooks:
 
            if db_repo.repo_type == 'git':
 
                ScmModel().install_git_hooks(db_repo.scm_instance, force_create=overwrite_git_hooks)
 

	
 
    removed = []
 
    # remove from database those repositories that are not in the filesystem
 
    unicode_initial_repo_list = set(safe_unicode(name) for name in initial_repo_list)
 
    for repo in sa.query(Repository).all():
 
        if repo.repo_name not in unicode_initial_repo_list:
 
            if remove_obsolete:
 
                log.debug("Removing non-existing repository found in db `%s`",
 
                          repo.repo_name)
 
                try:
 
                    RepoModel(sa).delete(repo, forks='detach', fs_remove=False)
 
                    sa.commit()
 
                except Exception:
 
                    # don't block further removals on error
 
                    log.error(traceback.format_exc())
 
                    sa.rollback()
 
            removed.append(repo.repo_name)
 
    return added, removed
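Note: repo2db_mapper() is fed the result of a filesystem scan; its only call site in this changeset is in kallithea/config/environment.py above, roughly:

    from kallithea.lib.utils import repo2db_mapper
    from kallithea.model.scm import ScmModel

    # repos_path is the configured repository root (the 'paths' section of the db ui)
    added, removed = repo2db_mapper(ScmModel().repo_scan(repos_path),
                                    remove_obsolete=False, install_git_hooks=False)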
 

	
 

	
 
def load_rcextensions(root_path):
 
    import kallithea
 
    from kallithea.config import conf
 

	
 
    path = os.path.join(root_path, 'rcextensions', '__init__.py')
 
    if os.path.isfile(path):
 
        rcext = create_module('rc', path)
 
        EXT = kallithea.EXTENSIONS = rcext
 
        log.debug('Found rcextensions now loading %s...', rcext)
 

	
 
        # Additional mappings that are not present in the pygments lexers
 
        conf.LANGUAGES_EXTENSIONS_MAP.update(getattr(EXT, 'EXTRA_MAPPINGS', {}))
 

	
 
        #OVERRIDE OUR EXTENSIONS FROM RC-EXTENSIONS (if present)
 

	
 
        if getattr(EXT, 'INDEX_EXTENSIONS', []):
 
            log.debug('settings custom INDEX_EXTENSIONS')
 
            conf.INDEX_EXTENSIONS = getattr(EXT, 'INDEX_EXTENSIONS', [])
 

	
 
        #ADDITIONAL MAPPINGS
 
        log.debug('adding extra into INDEX_EXTENSIONS')
 
        conf.INDEX_EXTENSIONS.extend(getattr(EXT, 'EXTRA_INDEX_EXTENSIONS', []))
 

	
 
        # automatically check whether the module is missing any attributes and fall back to defaults if so;
 
        # this helps new rcextensions features get picked up automatically
 
        #from kallithea.config import rcextensions
 
        #for k in dir(rcextensions):
 
        #    if not k.startswith('_') and not hasattr(EXT, k):
 
        #        setattr(EXT, k, getattr(rcextensions, k))
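Note: load_rcextensions() above reads a handful of module-level names from rcextensions/__init__.py next to the ini file. A minimal sketch of such a file, assuming the attribute names used above; the value formats are illustrative and must match kallithea.config.conf:

    # <here>/rcextensions/__init__.py  (illustrative content)

    # merged into conf.LANGUAGES_EXTENSIONS_MAP (extra extension mappings)
    EXTRA_MAPPINGS = {}

    # if non-empty, replaces conf.INDEX_EXTENSIONS entirely
    INDEX_EXTENSIONS = []

    # always appended to conf.INDEX_EXTENSIONS
    EXTRA_INDEX_EXTENSIONS = ['cfg', 'ini']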
 

	
 

	
 
#==============================================================================
 
# TEST FUNCTIONS AND CREATORS
 
# MISC
 
#==============================================================================
 
def create_test_index(repo_location, config, full_index):
 
    """
 
    Create the default test (Whoosh) index
 

	
 
    :param config: test config
 
    :param full_index: whether to do a full (re)build of the index
 
    """
 

	
 
    from kallithea.lib.indexers.daemon import WhooshIndexingDaemon
 
    from kallithea.lib.pidlock import DaemonLock, LockHeld
 

	
 
    index_location = os.path.join(config['app_conf']['index_dir'])
 
    if not os.path.exists(index_location):
 
        os.makedirs(index_location)
 

	
 
    try:
 
        l = DaemonLock(file_=os.path.join(dirname(index_location), 'make_index.lock'))
 
        WhooshIndexingDaemon(index_location=index_location,
 
                             repo_location=repo_location) \
 
            .run(full_index=full_index)
 
        l.release()
 
    except LockHeld:
 
        pass
 

	
 

	
 
def create_test_env(repos_test_path, config):
 
    """
 
    Create a fresh database and
 
    install the test repositories into a temporary directory
 
    """
 
    from kallithea.lib.db_manage import DbManage
 
    from kallithea.tests.base import HG_REPO, GIT_REPO, TESTS_TMP_PATH
 

	
 
    # PART ONE create db
 
    dbconf = config['sqlalchemy.url']
 
    log.debug('making test db %s', dbconf)
 

	
 
    # create test dir if it doesn't exist
 
    if not os.path.isdir(repos_test_path):
 
        log.debug('Creating testdir %s', repos_test_path)
 
        os.makedirs(repos_test_path)
 

	
 
    dbmanage = DbManage(log_sql=True, dbconf=dbconf, root=config['here'],
 
                        tests=True)
 
    dbmanage.create_tables(override=True)
 
    # for tests dynamically set new root paths based on generated content
 
    dbmanage.create_settings(dbmanage.config_prompt(repos_test_path))
 
    dbmanage.create_default_user()
 
    dbmanage.admin_prompt()
 
    dbmanage.create_permissions()
 
    dbmanage.populate_default_permissions()
 
    meta.Session().commit()
 
    # PART TWO make test repo
 
    log.debug('making test vcs repositories')
 

	
 
    idx_path = config['app_conf']['index_dir']
 
    data_path = config['app_conf']['cache_dir']
 

	
 
    #clean index and data
 
    if idx_path and os.path.exists(idx_path):
 
        log.debug('remove %s', idx_path)
 
        shutil.rmtree(idx_path)
 

	
 
    if data_path and os.path.exists(data_path):
 
        log.debug('remove %s', data_path)
 
        shutil.rmtree(data_path)
 

	
 
    #CREATE DEFAULT TEST REPOS
 
    cur_dir = dirname(dirname(abspath(__file__)))
 
    tar = tarfile.open(os.path.join(cur_dir, 'tests', 'fixtures', "vcs_test_hg.tar.gz"))
 
    tar.extractall(os.path.join(TESTS_TMP_PATH, HG_REPO))
 
    tar.close()
 

	
 
    cur_dir = dirname(dirname(abspath(__file__)))
 
    tar = tarfile.open(os.path.join(cur_dir, 'tests', 'fixtures', "vcs_test_git.tar.gz"))
 
    tar.extractall(os.path.join(TESTS_TMP_PATH, GIT_REPO))
 
    tar.close()
 

	
 
    #LOAD VCS test stuff
 
    from kallithea.tests.vcs import setup_package
 
    setup_package()
 

	
 

	
 
def check_git_version():
 
    """
 
    Checks what version of git is installed in system, and issues a warning
 
    if it's too old for Kallithea to work properly.
 
    """
 
    from kallithea import BACKENDS
 
    from kallithea.lib.vcs.backends.git.repository import GitRepository
 
    from kallithea.lib.vcs.conf import settings
 
    from distutils.version import StrictVersion
 

	
 
    if 'git' not in BACKENDS:
 
        return None
 

	
 
    stdout, stderr = GitRepository._run_git_command(['--version'], _bare=True,
 
                                                    _safe=True)
 

	
 
    m = re.search(r"\d+\.\d+\.\d+", stdout)
 
    if m:
 
        ver = StrictVersion(m.group(0))
 
    else:
 
        ver = StrictVersion('0.0.0')
 

	
 
    req_ver = StrictVersion('1.7.4')
 

	
 
    log.debug('Git executable: "%s" version %s detected: %s',
 
              settings.GIT_EXECUTABLE_PATH, ver, stdout)
 
    if stderr:
 
        log.warning('Error detecting git version: %r', stderr)
 
    elif ver < req_ver:
 
        log.warning('Kallithea detected git version %s, which is too old '
 
                    'for the system to function properly. '
 
                    'Please upgrade to version %s or later.' % (ver, req_ver))
 
    return ver
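Note: the comparison above uses distutils' StrictVersion ordering against the 1.7.4 minimum; a tiny illustration:

    from distutils.version import StrictVersion

    req_ver = StrictVersion('1.7.4')
    print(StrictVersion('1.7.3') < req_ver)   # True  -> the warning above is logged
    print(StrictVersion('2.11.0') < req_ver)  # False -> considered recent enough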
 

	
 

	
 
#===============================================================================
 
# CACHE RELATED METHODS
 
#===============================================================================
 

	
 
# set cache regions for beaker so celery can utilise it
 
def setup_cache_regions(settings):
 
    cache_settings = {'regions': None}
 
    for key in settings.keys():
 
        for prefix in ['beaker.cache.', 'cache.']:
 
            if key.startswith(prefix):
 
                name = key.split(prefix)[1].strip()
 
                cache_settings[name] = settings[key].strip()
 
    if cache_settings['regions']:
 
        for region in cache_settings['regions'].split(','):
 
            region = region.strip()
 
            region_settings = {}
 
            for key, value in cache_settings.items():
 
                if key.startswith(region):
 
                    region_settings[key.split('.')[1]] = value
 
            region_settings['expire'] = int(region_settings.get('expire',
 
                                                                60))
 
            region_settings.setdefault('lock_dir',
 
                                       cache_settings.get('lock_dir'))
 
            region_settings.setdefault('data_dir',
 
                                       cache_settings.get('data_dir'))
 

	
 
            if 'type' not in region_settings:
 
                region_settings['type'] = cache_settings.get('type',
 
                                                             'memory')
 
            beaker.cache.cache_regions[region] = region_settings
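Note: setup_cache_regions() parses the flat Beaker settings that Pylons loads from the ini file. A hedged example of the keys it understands (region names and directories are illustrative):

    import beaker.cache
    from kallithea.lib.utils import setup_cache_regions

    settings = {
        'beaker.cache.regions': 'short_term, long_term',
        'beaker.cache.short_term.type': 'memory',
        'beaker.cache.short_term.expire': '60',
        'beaker.cache.long_term.type': 'file',
        'beaker.cache.long_term.expire': '3600',
        'beaker.cache.lock_dir': '/tmp/cache/lock',   # hypothetical path
        'beaker.cache.data_dir': '/tmp/cache/data',   # hypothetical path
    }
    setup_cache_regions(settings)
    # beaker.cache.cache_regions['short_term'] now holds its type/expire/lock_dir/data_dir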
 

	
 

	
 
def conditional_cache(region, prefix, condition, func):
 
    """
 

	
 
    Conditionally cache a function. Use like::
 
        def _c(arg):
 
            #heavy computation function
 
            return data
 

	
 
        # depending on the condition, the computation is wrapped in a cache or not
 
        compute = conditional_cache('short_term', 'cache_desc', condition=True, func=func)
 
        return compute(arg)
 

	
 
    :param region: name of cache region
    :param prefix: cache key prefix
    :param condition: whether caching should be applied; if true, cached data is returned
    :param func: the (heavy) function to wrap
 

	
 
    """
 
    wrapped = func
 
    if condition:
 
        log.debug('conditional_cache: True, wrapping call of '
 
                  'func: %s into %s region cache' % (func, region))
 
        wrapped = _cache_decorate((prefix,), None, None, region)(func)
 

	
 
    return wrapped
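Note: a slightly more complete version of the docstring example above, assuming a 'short_term' Beaker region has already been configured (e.g. via setup_cache_regions()):

    from kallithea.lib.utils import conditional_cache

    def _heavy(arg):
        # stand-in for an expensive computation
        return arg * 2

    # wrapped in the cache only when condition is true; otherwise returned as-is
    compute = conditional_cache('short_term', 'demo_prefix', condition=True, func=_heavy)
    print(compute(21))  # 42, cached in the 'short_term' region after the first call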
kallithea/tests/fixture.py
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
"""
 
Helpers for fixture generation
 
"""
 

	
 
import logging
 
import os
 
import time
 
from kallithea.tests.base import *
 
import shutil
 
import tarfile
 
from os.path import dirname
 

	
 
from kallithea.model.db import Repository, User, RepoGroup, UserGroup, Gist
 
from kallithea.model.meta import Session
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.user import UserModel
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.user_group import UserGroupModel
 
from kallithea.model.gist import GistModel
 
from kallithea.model.scm import ScmModel
 
from kallithea.lib.db_manage import DbManage
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from os.path import dirname
 
from kallithea.tests.base import invalidate_all_caches, GIT_REPO, HG_REPO, TESTS_TMP_PATH, TEST_USER_ADMIN_LOGIN
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 
FIXTURES = os.path.join(dirname(dirname(os.path.abspath(__file__))), 'tests', 'fixtures')
 

	
 

	
 
def error_function(*args, **kwargs):
 
    raise Exception('Total Crash !')
 

	
 

	
 
class Fixture(object):
 

	
 
    def __init__(self):
 
        pass
 

	
 
    def anon_access(self, status):
 
        """
 
        Context manager for controlling anonymous access.
 
        Anon access will be set and committed, but restored again when exiting the block.
 

	
 
        Usage:
 

	
 
        fixture = Fixture()
 
        with fixture.anon_access(False):
 
            stuff
 
        """
 

	
 
        class context(object):
 
            def __enter__(self):
 
                anon = User.get_default_user()
 
                self._before = anon.active
 
                anon.active = status
 
                Session().commit()
 
                invalidate_all_caches()
 

	
 
            def __exit__(self, exc_type, exc_val, exc_tb):
 
                anon = User.get_default_user()
 
                anon.active = self._before
 
                Session().commit()
 

	
 
        return context()
 

	
 
    def _get_repo_create_params(self, **custom):
 
        defs = dict(
 
            repo_name=None,
 
            repo_type='hg',
 
            clone_uri='',
 
            repo_group=u'-1',
 
            repo_description=u'DESC',
 
            repo_private=False,
 
            repo_landing_rev='rev:tip',
 
            repo_copy_permissions=False,
 
            repo_state=Repository.STATE_CREATED,
 
        )
 
        defs.update(custom)
 
        if 'repo_name_full' not in custom:
 
            defs.update({'repo_name_full': defs['repo_name']})
 

	
 
        # fix the repo name if passed as repo_name_full
 
        if defs['repo_name']:
 
            defs['repo_name'] = defs['repo_name'].split('/')[-1]
 

	
 
        return defs
 

	
 
    def _get_group_create_params(self, **custom):
 
        defs = dict(
 
            group_name=None,
 
            group_description=u'DESC',
 
            parent_group_id=None,
 
            perms_updates=[],
 
            perms_new=[],
 
            enable_locking=False,
 
            recursive=False
 
        )
 
        defs.update(custom)
 

	
 
        return defs
 

	
 
    def _get_user_create_params(self, name, **custom):
 
        defs = dict(
 
            username=name,
 
            password='qweqwe',
 
            email='%s+test@example.com' % name,
 
            firstname=u'TestUser',
 
            lastname=u'Test',
 
            active=True,
 
            admin=False,
 
            extern_type='internal',
 
            extern_name=None
 
        )
 
        defs.update(custom)
 

	
 
        return defs
 

	
 
    def _get_user_group_create_params(self, name, **custom):
 
        defs = dict(
 
            users_group_name=name,
 
            user_group_description=u'DESC',
 
@@ -202,96 +210,176 @@ class Fixture(object):
 
                return user
 
        form_data = self._get_user_create_params(name, **kwargs)
 
        user = UserModel().create(form_data)
 
        Session().commit()
 
        user = User.get_by_username(user.username)
 
        return user
 

	
 
    def destroy_user(self, userid):
 
        UserModel().delete(userid)
 
        Session().commit()
 

	
 
    def create_user_group(self, name, **kwargs):
 
        if 'skip_if_exists' in kwargs:
 
            del kwargs['skip_if_exists']
 
            gr = UserGroup.get_by_group_name(group_name=name)
 
            if gr:
 
                return gr
 
        form_data = self._get_user_group_create_params(name, **kwargs)
 
        owner = kwargs.get('cur_user', TEST_USER_ADMIN_LOGIN)
 
        user_group = UserGroupModel().create(
 
            name=form_data['users_group_name'],
 
            description=form_data['user_group_description'],
 
            owner=owner, active=form_data['users_group_active'],
 
            group_data=form_data['user_group_data'])
 
        Session().commit()
 
        user_group = UserGroup.get_by_group_name(user_group.users_group_name)
 
        return user_group
 

	
 
    def destroy_user_group(self, usergroupid):
 
        UserGroupModel().delete(user_group=usergroupid, force=True)
 
        Session().commit()
 

	
 
    def create_gist(self, **kwargs):
 
        form_data = {
 
            'description': u'new-gist',
 
            'owner': TEST_USER_ADMIN_LOGIN,
 
            'gist_type': Gist.GIST_PUBLIC,
 
            'lifetime': -1,
 
            'gist_mapping': {'filename1.txt':{'content':'hello world'},}
 
        }
 
        form_data.update(kwargs)
 
        gist = GistModel().create(
 
            description=form_data['description'],owner=form_data['owner'],
 
            gist_mapping=form_data['gist_mapping'], gist_type=form_data['gist_type'],
 
            lifetime=form_data['lifetime']
 
        )
 
        Session().commit()
 

	
 
        return gist
 

	
 
    def destroy_gists(self, gistid=None):
 
        for g in Gist.query():
 
            if gistid:
 
                if gistid == g.gist_access_id:
 
                    GistModel().delete(g)
 
            else:
 
                GistModel().delete(g)
 
        Session().commit()
 

	
 
    def load_resource(self, resource_name, strip=True):
 
        with open(os.path.join(FIXTURES, resource_name), 'rb') as f:
 
            source = f.read()
 
            if strip:
 
                source = source.strip()
 

	
 
        return source
 

	
 
    def commit_change(self, repo, filename, content, message, vcs_type, parent=None, newfile=False):
 
        repo = Repository.get_by_repo_name(repo)
 
        _cs = parent
 
        if not parent:
 
            _cs = EmptyChangeset(alias=vcs_type)
 

	
 
        if newfile:
 
            nodes = {
 
                filename: {
 
                    'content': content
 
                }
 
            }
 
            cs = ScmModel().create_nodes(
 
                user=TEST_USER_ADMIN_LOGIN, repo=repo,
 
                message=message,
 
                nodes=nodes,
 
                parent_cs=_cs,
 
                author=TEST_USER_ADMIN_LOGIN,
 
            )
 
        else:
 
            cs = ScmModel().commit_change(
 
                repo=repo.scm_instance, repo_name=repo.repo_name,
 
                cs=parent, user=TEST_USER_ADMIN_LOGIN,
 
                author=TEST_USER_ADMIN_LOGIN,
 
                message=message,
 
                content=content,
 
                f_path=filename
 
            )
 
        return cs
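Note: a hedged usage sketch of commit_change() as a test might call it (file name, content and message are illustrative; HG_REPO comes from kallithea.tests.base):

    from kallithea.tests.base import HG_REPO
    from kallithea.tests.fixture import Fixture

    fixture = Fixture()
    # add a brand new file to the Mercurial test repository
    cs = fixture.commit_change(HG_REPO, filename='vcs/new_file.txt',
                               content='line one\n', message='add new_file.txt',
                               vcs_type='hg', parent=None, newfile=True)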
 

	
 

	
 
#==============================================================================
 
# Global test environment setup
 
#==============================================================================
 

	
 
def create_test_env(repos_test_path, config):
 
    """
 
    Create a fresh database and
 
    install the test repositories into a temporary directory
 
    """
 

	
 
    # PART ONE create db
 
    dbconf = config['sqlalchemy.url']
 
    log.debug('making test db %s', dbconf)
 

	
 
    # create test dir if it doesn't exist
 
    if not os.path.isdir(repos_test_path):
 
        log.debug('Creating testdir %s', repos_test_path)
 
        os.makedirs(repos_test_path)
 

	
 
    dbmanage = DbManage(log_sql=True, dbconf=dbconf, root=config['here'],
 
                        tests=True)
 
    dbmanage.create_tables(override=True)
 
    # for tests dynamically set new root paths based on generated content
 
    dbmanage.create_settings(dbmanage.config_prompt(repos_test_path))
 
    dbmanage.create_default_user()
 
    dbmanage.admin_prompt()
 
    dbmanage.create_permissions()
 
    dbmanage.populate_default_permissions()
 
    Session().commit()
 
    # PART TWO make test repo
 
    log.debug('making test vcs repositories')
 

	
 
    idx_path = config['app_conf']['index_dir']
 
    data_path = config['app_conf']['cache_dir']
 

	
 
    #clean index and data
 
    if idx_path and os.path.exists(idx_path):
 
        log.debug('remove %s', idx_path)
 
        shutil.rmtree(idx_path)
 

	
 
    if data_path and os.path.exists(data_path):
 
        log.debug('remove %s', data_path)
 
        shutil.rmtree(data_path)
 

	
 
    #CREATE DEFAULT TEST REPOS
 
    tar = tarfile.open(os.path.join(FIXTURES, 'vcs_test_hg.tar.gz'))
 
    tar.extractall(os.path.join(TESTS_TMP_PATH, HG_REPO))
 
    tar.close()
 

	
 
    tar = tarfile.open(os.path.join(FIXTURES, 'vcs_test_git.tar.gz'))
 
    tar.extractall(os.path.join(TESTS_TMP_PATH, GIT_REPO))
 
    tar.close()
 

	
 
    #LOAD VCS test stuff
 
    from kallithea.tests.vcs import setup_package
 
    setup_package()
 

	
 

	
 
def create_test_index(repo_location, config, full_index):
 
    """
 
    Create the default test (Whoosh) index
 
    """
 

	
 
    from kallithea.lib.indexers.daemon import WhooshIndexingDaemon
 
    from kallithea.lib.pidlock import DaemonLock, LockHeld
 

	
 
    index_location = os.path.join(config['app_conf']['index_dir'])
 
    if not os.path.exists(index_location):
 
        os.makedirs(index_location)
 

	
 
    try:
 
        l = DaemonLock(file_=os.path.join(dirname(index_location), 'make_index.lock'))
 
        WhooshIndexingDaemon(index_location=index_location,
 
                             repo_location=repo_location) \
 
            .run(full_index=full_index)
 
        l.release()
 
    except LockHeld:
 
        pass
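Note: the caller of these two helpers shown in this changeset is the test bootstrap in kallithea/config/environment.py (the first file above); the call order there is roughly:

    from kallithea.tests.base import TESTS_TMP_PATH
    from kallithea.tests.fixture import create_test_env, create_test_index

    # `config` is the PylonsConfig built inside load_environment(); shown here
    # only to illustrate the order of the two calls
    create_test_env(TESTS_TMP_PATH, config)
    create_test_index(TESTS_TMP_PATH, config, True)  # full_index=True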