Changeset - a5746b83123f
[Not reviewed]
beta
5 1 6
Marcin Kuzminski - 13 years ago 2013-05-08 18:06:22
marcin@python-works.com
moved top-level tests to rhodecode/tests/other.

- There are no toplevel tests
- moved proper files to fixtures where they should be in the first place
7 files changed with 2 insertions and 3 deletions:
0 comments (0 inline, 0 general)
rhodecode/lib/utils.py
Show inline comments
 
@@ -276,541 +276,541 @@ def is_valid_repos_group(repos_group_nam
 
        get_scm(os.path.dirname(full_path))
 
        return False
 
    except VCSError:
 
        pass
 

	
 
    # check if it's a valid path
 
    if skip_path_check or os.path.isdir(full_path):
 
        return True
 

	
 
    return False
 

	
 

	
 
def ask_ok(prompt, retries=4, complaint='Yes or no please!'):
 
    while True:
 
        ok = raw_input(prompt)
 
        if ok in ('y', 'ye', 'yes'):
 
            return True
 
        if ok in ('n', 'no', 'nop', 'nope'):
 
            return False
 
        retries = retries - 1
 
        if retries < 0:
 
            raise IOError
 
        print complaint
 

	
 
#propagated from mercurial documentation
 
ui_sections = ['alias', 'auth',
 
                'decode/encode', 'defaults',
 
                'diff', 'email',
 
                'extensions', 'format',
 
                'merge-patterns', 'merge-tools',
 
                'hooks', 'http_proxy',
 
                'smtp', 'patch',
 
                'paths', 'profiling',
 
                'server', 'trusted',
 
                'ui', 'web', ]
 

	
 

	
 
def make_ui(read_from='file', path=None, checkpaths=True, clear_session=True):
 
    """
 
    A function that will read python rc files or database
 
    and make an mercurial ui object from read options
 

	
 
    :param path: path to mercurial config file
 
    :param checkpaths: check the path
 
    :param read_from: read from 'file' or 'db'
 
    """
 

	
 
    baseui = ui.ui()
 

	
 
    # clean the baseui object
 
    baseui._ocfg = config.config()
 
    baseui._ucfg = config.config()
 
    baseui._tcfg = config.config()
 

	
 
    if read_from == 'file':
 
        if not os.path.isfile(path):
 
            log.debug('hgrc file is not present at %s, skipping...' % path)
 
            return False
 
        log.debug('reading hgrc from %s' % path)
 
        cfg = config.config()
 
        cfg.read(path)
 
        for section in ui_sections:
 
            for k, v in cfg.items(section):
 
                log.debug('settings ui from file: [%s] %s=%s' % (section, k, v))
 
                baseui.setconfig(safe_str(section), safe_str(k), safe_str(v))
 

	
 
    elif read_from == 'db':
 
        sa = meta.Session()
 
        ret = sa.query(RhodeCodeUi)\
 
            .options(FromCache("sql_cache_short", "get_hg_ui_settings"))\
 
            .all()
 

	
 
        hg_ui = ret
 
        for ui_ in hg_ui:
 
            if ui_.ui_active:
 
                log.debug('settings ui from db: [%s] %s=%s', ui_.ui_section,
 
                          ui_.ui_key, ui_.ui_value)
 
                baseui.setconfig(safe_str(ui_.ui_section), safe_str(ui_.ui_key),
 
                                 safe_str(ui_.ui_value))
 
            if ui_.ui_key == 'push_ssl':
 
                # force set push_ssl requirement to False, rhodecode
 
                # handles that
 
                baseui.setconfig(safe_str(ui_.ui_section), safe_str(ui_.ui_key),
 
                                 False)
 
        if clear_session:
 
            meta.Session.remove()
 
    return baseui
 

	
 

	
 
def set_rhodecode_config(config):
 
    """
 
    Updates pylons config with new settings from database
 

	
 
    :param config:
 
    """
 
    hgsettings = RhodeCodeSetting.get_app_settings()
 

	
 
    for k, v in hgsettings.items():
 
        config[k] = v
 

	
 

	
 
def set_vcs_config(config):
 
    """
 
    Patch VCS config with some RhodeCode specific stuff
 

	
 
    :param config: rhodecode.CONFIG
 
    """
 
    import rhodecode
 
    from rhodecode.lib.vcs import conf
 
    from rhodecode.lib.utils2 import aslist
 
    conf.settings.BACKENDS = {
 
        'hg': 'rhodecode.lib.vcs.backends.hg.MercurialRepository',
 
        'git': 'rhodecode.lib.vcs.backends.git.GitRepository',
 
    }
 

	
 
    conf.settings.GIT_EXECUTABLE_PATH = config.get('git_path', 'git')
 
    conf.settings.GIT_REV_FILTER = config.get('git_rev_filter', '--all').strip()
 
    conf.settings.DEFAULT_ENCODINGS = aslist(config.get('default_encoding',
 
                                                        'utf8'), sep=',')
 

	
 

	
 
def map_groups(path):
 
    """
 
    Given a full path to a repository, create all nested groups that this
 
    repo is inside. This function creates parent-child relationships between
 
    groups and creates default perms for all new groups.
 

	
 
    :param paths: full path to repository
 
    """
 
    sa = meta.Session()
 
    groups = path.split(Repository.url_sep())
 
    parent = None
 
    group = None
 

	
 
    # last element is repo in nested groups structure
 
    groups = groups[:-1]
 
    rgm = ReposGroupModel(sa)
 
    owner = User.get_first_admin()
 
    for lvl, group_name in enumerate(groups):
 
        group_name = '/'.join(groups[:lvl] + [group_name])
 
        group = RepoGroup.get_by_group_name(group_name)
 
        desc = '%s group' % group_name
 

	
 
        # skip folders that are now removed repos
 
        if REMOVED_REPO_PAT.match(group_name):
 
            break
 

	
 
        if group is None:
 
            log.debug('creating group level: %s group_name: %s'
 
                      % (lvl, group_name))
 
            group = RepoGroup(group_name, parent)
 
            group.group_description = desc
 
            group.user = owner
 
            sa.add(group)
 
            perm_obj = rgm._create_default_perms(group)
 
            sa.add(perm_obj)
 
            sa.flush()
 

	
 
        parent = group
 
    return group
 

	
 

	
 
def repo2db_mapper(initial_repo_list, remove_obsolete=False,
 
                   install_git_hook=False):
 
    """
 
    maps all repos given in initial_repo_list, non existing repositories
 
    are created, if remove_obsolete is True it also check for db entries
 
    that are not in initial_repo_list and removes them.
 

	
 
    :param initial_repo_list: list of repositories found by scanning methods
 
    :param remove_obsolete: check for obsolete entries in database
 
    :param install_git_hook: if this is True, also check and install githook
 
        for a repo if missing
 
    """
 
    from rhodecode.model.repo import RepoModel
 
    from rhodecode.model.scm import ScmModel
 
    sa = meta.Session()
 
    rm = RepoModel()
 
    user = User.get_first_admin()
 
    added = []
 

	
 
    ##creation defaults
 
    defs = RhodeCodeSetting.get_default_repo_settings(strip_prefix=True)
 
    enable_statistics = defs.get('repo_enable_statistics')
 
    enable_locking = defs.get('repo_enable_locking')
 
    enable_downloads = defs.get('repo_enable_downloads')
 
    private = defs.get('repo_private')
 

	
 
    for name, repo in initial_repo_list.items():
 
        group = map_groups(name)
 
        db_repo = rm.get_by_repo_name(name)
 
        # found repo that is on filesystem not in RhodeCode database
 
        if not db_repo:
 
            log.info('repository %s not found, creating now' % name)
 
            added.append(name)
 
            desc = (repo.description
 
                    if repo.description != 'unknown'
 
                    else '%s repository' % name)
 

	
 
            new_repo = rm.create_repo(
 
                repo_name=name,
 
                repo_type=repo.alias,
 
                description=desc,
 
                repos_group=getattr(group, 'group_id', None),
 
                owner=user,
 
                just_db=True,
 
                enable_locking=enable_locking,
 
                enable_downloads=enable_downloads,
 
                enable_statistics=enable_statistics,
 
                private=private
 
            )
 
            # we added that repo just now, and make sure it has githook
 
            # installed
 
            if new_repo.repo_type == 'git':
 
                ScmModel().install_git_hook(new_repo.scm_instance)
 
            new_repo.update_changeset_cache()
 
        elif install_git_hook:
 
            if db_repo.repo_type == 'git':
 
                ScmModel().install_git_hook(db_repo.scm_instance)
 

	
 
    sa.commit()
 
    removed = []
 
    if remove_obsolete:
 
        # remove from database those repositories that are not in the filesystem
 
        for repo in sa.query(Repository).all():
 
            if repo.repo_name not in initial_repo_list.keys():
 
                log.debug("Removing non-existing repository found in db `%s`" %
 
                          repo.repo_name)
 
                try:
 
                    removed.append(repo.repo_name)
 
                    RepoModel(sa).delete(repo, forks='detach', fs_remove=False)
 
                    sa.commit()
 
                except Exception:
 
                    #don't hold further removals on error
 
                    log.error(traceback.format_exc())
 
                    sa.rollback()
 
    return added, removed
 

	
 

	
 
# set cache regions for beaker so celery can utilise it
 
def add_cache(settings):
 
    cache_settings = {'regions': None}
 
    for key in settings.keys():
 
        for prefix in ['beaker.cache.', 'cache.']:
 
            if key.startswith(prefix):
 
                name = key.split(prefix)[1].strip()
 
                cache_settings[name] = settings[key].strip()
 
    if cache_settings['regions']:
 
        for region in cache_settings['regions'].split(','):
 
            region = region.strip()
 
            region_settings = {}
 
            for key, value in cache_settings.items():
 
                if key.startswith(region):
 
                    region_settings[key.split('.')[1]] = value
 
            region_settings['expire'] = int(region_settings.get('expire',
 
                                                                60))
 
            region_settings.setdefault('lock_dir',
 
                                       cache_settings.get('lock_dir'))
 
            region_settings.setdefault('data_dir',
 
                                       cache_settings.get('data_dir'))
 

	
 
            if 'type' not in region_settings:
 
                region_settings['type'] = cache_settings.get('type',
 
                                                             'memory')
 
            beaker.cache.cache_regions[region] = region_settings
 

	
 

	
 
def load_rcextensions(root_path):
 
    import rhodecode
 
    from rhodecode.config import conf
 

	
 
    path = os.path.join(root_path, 'rcextensions', '__init__.py')
 
    if os.path.isfile(path):
 
        rcext = create_module('rc', path)
 
        EXT = rhodecode.EXTENSIONS = rcext
 
        log.debug('Found rcextensions now loading %s...' % rcext)
 

	
 
        # Additional mappings that are not present in the pygments lexers
 
        conf.LANGUAGES_EXTENSIONS_MAP.update(getattr(EXT, 'EXTRA_MAPPINGS', {}))
 

	
 
        #OVERRIDE OUR EXTENSIONS FROM RC-EXTENSIONS (if present)
 

	
 
        if getattr(EXT, 'INDEX_EXTENSIONS', []) != []:
 
            log.debug('settings custom INDEX_EXTENSIONS')
 
            conf.INDEX_EXTENSIONS = getattr(EXT, 'INDEX_EXTENSIONS', [])
 

	
 
        #ADDITIONAL MAPPINGS
 
        log.debug('adding extra into INDEX_EXTENSIONS')
 
        conf.INDEX_EXTENSIONS.extend(getattr(EXT, 'EXTRA_INDEX_EXTENSIONS', []))
 

	
 
        # auto check if the module is not missing any data, set to default if is
 
        # this will help autoupdate new feature of rcext module
 
        from rhodecode.config import rcextensions
 
        for k in dir(rcextensions):
 
            if not k.startswith('_') and not hasattr(EXT, k):
 
                setattr(EXT, k, getattr(rcextensions, k))
 

	
 

	
 
def get_custom_lexer(extension):
 
    """
 
    returns a custom lexer if it's defined in rcextensions module, or None
 
    if there's no custom lexer defined
 
    """
 
    import rhodecode
 
    from pygments import lexers
 
    #check if we didn't define this extension as other lexer
 
    if rhodecode.EXTENSIONS and extension in rhodecode.EXTENSIONS.EXTRA_LEXERS:
 
        _lexer_name = rhodecode.EXTENSIONS.EXTRA_LEXERS[extension]
 
        return lexers.get_lexer_by_name(_lexer_name)
 

	
 

	
 
#==============================================================================
 
# TEST FUNCTIONS AND CREATORS
 
#==============================================================================
 
def create_test_index(repo_location, config, full_index):
 
    """
 
    Makes default test index
 

	
 
    :param config: test config
 
    :param full_index:
 
    """
 

	
 
    from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
 
    from rhodecode.lib.pidlock import DaemonLock, LockHeld
 

	
 
    repo_location = repo_location
 

	
 
    index_location = os.path.join(config['app_conf']['index_dir'])
 
    if not os.path.exists(index_location):
 
        os.makedirs(index_location)
 

	
 
    try:
 
        l = DaemonLock(file_=jn(dn(index_location), 'make_index.lock'))
 
        WhooshIndexingDaemon(index_location=index_location,
 
                             repo_location=repo_location)\
 
            .run(full_index=full_index)
 
        l.release()
 
    except LockHeld:
 
        pass
 

	
 

	
 
def create_test_env(repos_test_path, config):
 
    """
 
    Makes a fresh database and
 
    install test repository into tmp dir
 
    """
 
    from rhodecode.lib.db_manage import DbManage
 
    from rhodecode.tests import HG_REPO, GIT_REPO, TESTS_TMP_PATH
 

	
 
    # PART ONE create db
 
    dbconf = config['sqlalchemy.db1.url']
 
    log.debug('making test db %s' % dbconf)
 

	
 
    # create test dir if it doesn't exist
 
    if not os.path.isdir(repos_test_path):
 
        log.debug('Creating testdir %s' % repos_test_path)
 
        os.makedirs(repos_test_path)
 

	
 
    dbmanage = DbManage(log_sql=True, dbconf=dbconf, root=config['here'],
 
                        tests=True)
 
    dbmanage.create_tables(override=True)
 
    dbmanage.create_settings(dbmanage.config_prompt(repos_test_path))
 
    dbmanage.create_default_user()
 
    dbmanage.admin_prompt()
 
    dbmanage.create_permissions()
 
    dbmanage.populate_default_permissions()
 
    Session().commit()
 
    # PART TWO make test repo
 
    log.debug('making test vcs repositories')
 

	
 
    idx_path = config['app_conf']['index_dir']
 
    data_path = config['app_conf']['cache_dir']
 

	
 
    #clean index and data
 
    if idx_path and os.path.exists(idx_path):
 
        log.debug('remove %s' % idx_path)
 
        shutil.rmtree(idx_path)
 

	
 
    if data_path and os.path.exists(data_path):
 
        log.debug('remove %s' % data_path)
 
        shutil.rmtree(data_path)
 

	
 
    #CREATE DEFAULT TEST REPOS
 
    cur_dir = dn(dn(abspath(__file__)))
 
    tar = tarfile.open(jn(cur_dir, 'tests', "vcs_test_hg.tar.gz"))
 
    tar = tarfile.open(jn(cur_dir, 'tests', 'fixtures', "vcs_test_hg.tar.gz"))
 
    tar.extractall(jn(TESTS_TMP_PATH, HG_REPO))
 
    tar.close()
 

	
 
    cur_dir = dn(dn(abspath(__file__)))
 
    tar = tarfile.open(jn(cur_dir, 'tests', "vcs_test_git.tar.gz"))
 
    tar = tarfile.open(jn(cur_dir, 'tests', 'fixtures', "vcs_test_git.tar.gz"))
 
    tar.extractall(jn(TESTS_TMP_PATH, GIT_REPO))
 
    tar.close()
 

	
 
    #LOAD VCS test stuff
 
    from rhodecode.tests.vcs import setup_package
 
    setup_package()
 

	
 

	
 
#==============================================================================
 
# PASTER COMMANDS
 
#==============================================================================
 
class BasePasterCommand(Command):
 
    """
 
    Abstract Base Class for paster commands.
 

	
 
    The celery commands are somewhat aggressive about loading
 
    celery.conf, and since our module sets the `CELERY_LOADER`
 
    environment variable to our loader, we have to bootstrap a bit and
 
    make sure we've had a chance to load the pylons config off of the
 
    command line, otherwise everything fails.
 
    """
 
    min_args = 1
 
    min_args_error = "Please provide a paster config file as an argument."
 
    takes_config_file = 1
 
    requires_config_file = True
 

	
 
    def notify_msg(self, msg, log=False):
 
        """Make a notification to user, additionally if logger is passed
 
        it logs this action using given logger
 

	
 
        :param msg: message that will be printed to user
 
        :param log: logging instance, to use to additionally log this message
 

	
 
        """
 
        if log and isinstance(log, logging):
 
            log(msg)
 

	
 
    def run(self, args):
 
        """
 
        Overrides Command.run
 

	
 
        Checks for a config file argument and loads it.
 
        """
 
        if len(args) < self.min_args:
 
            raise BadCommand(
 
                self.min_args_error % {'min_args': self.min_args,
 
                                       'actual_args': len(args)})
 

	
 
        # Decrement because we're going to lob off the first argument.
 
        # @@ This is hacky
 
        self.min_args -= 1
 
        self.bootstrap_config(args[0])
 
        self.update_parser()
 
        return super(BasePasterCommand, self).run(args[1:])
 

	
 
    def update_parser(self):
 
        """
 
        Abstract method.  Allows for the class's parser to be updated
 
        before the superclass's `run` method is called.  Necessary to
 
        allow options/arguments to be passed through to the underlying
 
        celery command.
 
        """
 
        raise NotImplementedError("Abstract Method.")
 

	
 
    def bootstrap_config(self, conf):
 
        """
 
        Loads the pylons configuration.
 
        """
 
        from pylons import config as pylonsconfig
 

	
 
        self.path_to_ini_file = os.path.realpath(conf)
 
        conf = paste.deploy.appconfig('config:' + self.path_to_ini_file)
 
        pylonsconfig.init_app(conf.global_conf, conf.local_conf)
 

	
 
    def _init_session(self):
 
        """
 
        Inits SqlAlchemy Session
 
        """
 
        logging.config.fileConfig(self.path_to_ini_file)
 
        from pylons import config
 
        from rhodecode.model import init_model
 
        from rhodecode.lib.utils2 import engine_from_config
 

	
 
        #get to remove repos !!
 
        add_cache(config)
 
        engine = engine_from_config(config, 'sqlalchemy.db1.')
 
        init_model(engine)
 

	
 

	
 
def check_git_version():
 
    """
 
    Checks what version of git is installed in system, and issues a warning
 
    if it's too old for RhodeCode to properly work.
 
    """
 
    from rhodecode import BACKENDS
 
    from rhodecode.lib.vcs.backends.git.repository import GitRepository
 
    from rhodecode.lib.vcs.conf import settings
 
    from distutils.version import StrictVersion
 

	
 
    stdout, stderr = GitRepository._run_git_command('--version', _bare=True,
 
                                                    _safe=True)
 

	
 
    ver = (stdout.split(' ')[-1] or '').strip() or '0.0.0'
 
    if len(ver.split('.')) > 3:
 
        #StrictVersion needs to be only 3 element type
 
        ver = '.'.join(ver.split('.')[:3])
 
    try:
 
        _ver = StrictVersion(ver)
 
    except Exception:
 
        _ver = StrictVersion('0.0.0')
 
        stderr = traceback.format_exc()
 

	
 
    req_ver = '1.7.4'
 
    to_old_git = False
 
    if  _ver < StrictVersion(req_ver):
 
        to_old_git = True
 

	
 
    if 'git' in BACKENDS:
 
        log.debug('GIT executable: "%s" version detected: %s'
 
                  % (settings.GIT_EXECUTABLE_PATH, stdout))
 
        if stderr:
 
            log.warning('Unable to detect git version, org error was: %r' % stderr)
 
        elif to_old_git:
 
            log.warning('RhodeCode detected git version %s, which is too old '
 
                        'for the system to function properly. Make sure '
 
                        'its version is at least %s' % (ver, req_ver))
 
    return _ver
 

	
 

	
 
@decorator.decorator
 
def jsonify(func, *args, **kwargs):
 
    """Action decorator that formats output for JSON
 

	
 
    Given a function that will return content, this decorator will turn
 
    the result into JSON, with a content-type of 'application/json' and
 
    output it.
 

	
 
    """
 
    from pylons.decorators.util import get_pylons
 
    from rhodecode.lib.compat import json
 
    pylons = get_pylons(args)
 
    pylons.response.headers['Content-Type'] = 'application/json; charset=utf-8'
 
    data = func(*args, **kwargs)
 
    if isinstance(data, (list, tuple)):
 
        msg = "JSON responses with Array envelopes are susceptible to " \
 
              "cross-site data leak attacks, see " \
 
              "http://wiki.pylonshq.com/display/pylonsfaq/Warnings"
 
        warnings.warn(msg, Warning, 2)
 
        log.warning(msg)
 
    log.debug("Returning JSON wrapped action output")
 
    return json.dumps(data, encoding='utf-8')
rhodecode/tests/fixtures/vcs_test_git.tar.gz
Show inline comments
 
file renamed from rhodecode/tests/vcs_test_git.tar.gz to rhodecode/tests/fixtures/vcs_test_git.tar.gz
rhodecode/tests/fixtures/vcs_test_hg.tar.gz
Show inline comments
 
file renamed from rhodecode/tests/vcs_test_hg.tar.gz to rhodecode/tests/fixtures/vcs_test_hg.tar.gz
rhodecode/tests/other/__init__.py
Show inline comments
 
new file 100644
rhodecode/tests/other/test_libs.py
Show inline comments
 
file renamed from rhodecode/tests/test_libs.py to rhodecode/tests/other/test_libs.py
rhodecode/tests/other/test_validators.py
Show inline comments
 
file renamed from rhodecode/tests/test_validators.py to rhodecode/tests/other/test_validators.py
 
# -*- coding: utf-8 -*-
 
import unittest
 
import formencode
 

	
 
from rhodecode.tests import *
 

	
 
from rhodecode.model import validators as v
 
from rhodecode.model.users_group import UserGroupModel
 

	
 
from rhodecode.model.meta import Session
 
from rhodecode.model.repos_group import ReposGroupModel
 
from rhodecode.config.routing import ADMIN_PREFIX
 
from rhodecode.model.db import ChangesetStatus, Repository
 
from rhodecode.model.changeset_status import ChangesetStatusModel
 
from rhodecode.tests.fixture import Fixture
 

	
 
fixture = Fixture()
 

	
 

	
 
class TestReposGroups(unittest.TestCase):
 

	
 
    def setUp(self):
 
        pass
 

	
 
    def tearDown(self):
 
        Session.remove()
 

	
 
    def test_Message_extractor(self):
 
        validator = v.ValidUsername()
 
        self.assertRaises(formencode.Invalid, validator.to_python, 'default')
 

	
 
        class StateObj(object):
 
            pass
 

	
 
        self.assertRaises(formencode.Invalid,
 
                          validator.to_python, 'default', StateObj)
 

	
 
    def test_ValidUsername(self):
 
        validator = v.ValidUsername()
 

	
 
        self.assertRaises(formencode.Invalid, validator.to_python, 'default')
 
        self.assertRaises(formencode.Invalid, validator.to_python, 'new_user')
 
        self.assertRaises(formencode.Invalid, validator.to_python, '.,')
 
        self.assertRaises(formencode.Invalid, validator.to_python,
 
                          TEST_USER_ADMIN_LOGIN)
 
        self.assertEqual('test', validator.to_python('test'))
 

	
 
        validator = v.ValidUsername(edit=True, old_data={'user_id': 1})
 

	
 
    def test_ValidRepoUser(self):
 
        validator = v.ValidRepoUser()
 
        self.assertRaises(formencode.Invalid, validator.to_python, 'nouser')
 
        self.assertEqual(TEST_USER_ADMIN_LOGIN,
 
                         validator.to_python(TEST_USER_ADMIN_LOGIN))
 

	
 
    def test_ValidUserGroup(self):
 
        validator = v.ValidUserGroup()
 
        self.assertRaises(formencode.Invalid, validator.to_python, 'default')
 
        self.assertRaises(formencode.Invalid, validator.to_python, '.,')
 

	
 
        gr = fixture.create_user_group('test')
 
        gr2 = fixture.create_user_group('tes2')
 
        Session().commit()
 
        self.assertRaises(formencode.Invalid, validator.to_python, 'test')
 
        assert gr.users_group_id != None
 
        validator = v.ValidUserGroup(edit=True,
 
                                    old_data={'users_group_id':
 
                                              gr2.users_group_id})
 

	
 
        self.assertRaises(formencode.Invalid, validator.to_python, 'test')
 
        self.assertRaises(formencode.Invalid, validator.to_python, 'TesT')
 
        self.assertRaises(formencode.Invalid, validator.to_python, 'TEST')
 
        UserGroupModel().delete(gr)
 
        UserGroupModel().delete(gr2)
 
        Session().commit()
 

	
 
    def test_ValidReposGroup(self):
 
        validator = v.ValidReposGroup()
 
        model = ReposGroupModel()
 
        self.assertRaises(formencode.Invalid, validator.to_python,
 
                          {'group_name': HG_REPO, })
 
        gr = model.create(group_name='test_gr', group_description='desc',
 
                          parent=None,
 
                          just_db=True,
 
                          owner=TEST_USER_ADMIN_LOGIN)
 
        self.assertRaises(formencode.Invalid,
 
                          validator.to_python, {'group_name': gr.group_name, })
 

	
 
        validator = v.ValidReposGroup(edit=True,
 
                                      old_data={'group_id':  gr.group_id})
 
        self.assertRaises(formencode.Invalid,
 
                          validator.to_python, {
 
                                        'group_name': gr.group_name + 'n',
 
                                        'group_parent_id': gr.group_id
 
                                        })
 
        model.delete(gr)
 

	
 
    def test_ValidPassword(self):
 
        validator = v.ValidPassword()
 
        self.assertEqual('lol', validator.to_python('lol'))
 
        self.assertEqual(None, validator.to_python(None))
 
        self.assertRaises(formencode.Invalid, validator.to_python, 'ąćżź')
 

	
 
    def test_ValidPasswordsMatch(self):
 
        validator = v.ValidPasswordsMatch()
 
        self.assertRaises(formencode.Invalid,
 
                    validator.to_python, {'password': 'pass',
 
                                          'password_confirmation': 'pass2'})
 

	
 
        self.assertRaises(formencode.Invalid,
 
                    validator.to_python, {'new_password': 'pass',
 
                                          'password_confirmation': 'pass2'})
 

	
 
        self.assertEqual({'new_password': 'pass',
 
                          'password_confirmation': 'pass'},
 
                    validator.to_python({'new_password': 'pass',
 
                                         'password_confirmation': 'pass'}))
 

	
 
        self.assertEqual({'password': 'pass',
 
                          'password_confirmation': 'pass'},
 
                    validator.to_python({'password': 'pass',
 
                                         'password_confirmation': 'pass'}))
 

	
 
    def test_ValidAuth(self):
 
        validator = v.ValidAuth()
 
        valid_creds = {
 
            'username': TEST_USER_REGULAR2_LOGIN,
 
            'password': TEST_USER_REGULAR2_PASS,
 
        }
 
        invalid_creds = {
 
            'username': 'err',
 
            'password': 'err',
 
        }
 
        self.assertEqual(valid_creds, validator.to_python(valid_creds))
 
        self.assertRaises(formencode.Invalid,
 
                          validator.to_python, invalid_creds)
 

	
 
    def test_ValidAuthToken(self):
 
        validator = v.ValidAuthToken()
 
        # this is untestable without a threadlocal
 
#        self.assertRaises(formencode.Invalid,
 
#                          validator.to_python, 'BadToken')
 
        validator
 

	
 
    def test_ValidRepoName(self):
 
        validator = v.ValidRepoName()
 

	
 
        self.assertRaises(formencode.Invalid,
 
                          validator.to_python, {'repo_name': ''})
 

	
 
        self.assertRaises(formencode.Invalid,
 
                          validator.to_python, {'repo_name': HG_REPO})
 

	
 
        gr = ReposGroupModel().create(group_name='group_test',
 
                                      group_description='desc',
 
                                      parent=None,
 
                                      owner=TEST_USER_ADMIN_LOGIN)
 
        self.assertRaises(formencode.Invalid,
 
                          validator.to_python, {'repo_name': gr.group_name})
 

	
 
        #TODO: write an error case for that ie. create a repo withinh a group
 
#        self.assertRaises(formencode.Invalid,
 
#                          validator.to_python, {'repo_name': 'some',
 
#                                                'repo_group': gr.group_id})
 

	
 
    def test_ValidForkName(self):
 
        # this uses ValidRepoName validator
 
        assert True
 

	
 
    @parameterized.expand([
 
        ('test', 'test'), ('lolz!', 'lolz'), ('  aavv', 'aavv'),
 
        ('ala ma kota', 'ala-ma-kota'), ('@nooo', 'nooo'),
 
        ('$!haha lolz !', 'haha-lolz'), ('$$$$$', ''), ('{}OK!', 'OK'),
 
        ('/]re po', 're-po')])
 
    def test_SlugifyName(self, name, expected):
 
        validator = v.SlugifyName()
 
        self.assertEqual(expected, validator.to_python(name))
 

	
 
    def test_ValidCloneUri(self):
 
            #TODO: write this one
 
            pass
 

	
 
    def test_ValidForkType(self):
 
            validator = v.ValidForkType(old_data={'repo_type': 'hg'})
 
            self.assertEqual('hg', validator.to_python('hg'))
 
            self.assertRaises(formencode.Invalid, validator.to_python, 'git')
 

	
 
    def test_ValidPerms(self):
 
            #TODO: write this one
 
            pass
 

	
 
    def test_ValidSettings(self):
 
        validator = v.ValidSettings()
 
        self.assertEqual({'pass': 'pass'},
 
                         validator.to_python(value={'user': 'test',
 
                                                    'pass': 'pass'}))
 

	
 
        self.assertEqual({'user2': 'test', 'pass': 'pass'},
 
                         validator.to_python(value={'user2': 'test',
 
                                                    'pass': 'pass'}))
 

	
 
    def test_ValidPath(self):
 
            validator = v.ValidPath()
 
            self.assertEqual(TESTS_TMP_PATH,
 
                             validator.to_python(TESTS_TMP_PATH))
 
            self.assertRaises(formencode.Invalid, validator.to_python,
 
                              '/no_such_dir')
 

	
 
    def test_UniqSystemEmail(self):
 
        validator = v.UniqSystemEmail(old_data={})
 

	
 
        self.assertEqual('mail@python.org',
 
                         validator.to_python('MaiL@Python.org'))
 

	
 
        email = TEST_USER_REGULAR2_EMAIL
 
        self.assertRaises(formencode.Invalid, validator.to_python, email)
 

	
 
    def test_ValidSystemEmail(self):
 
        validator = v.ValidSystemEmail()
 
        email = TEST_USER_REGULAR2_EMAIL
 

	
 
        self.assertEqual(email, validator.to_python(email))
 
        self.assertRaises(formencode.Invalid, validator.to_python, 'err')
 

	
 
    def test_LdapLibValidator(self):
 
        if ldap_lib_installed:
 
            validator = v.LdapLibValidator()
 
            self.assertEqual("DN", validator.to_python('DN'))
 
        else:
 
            validator = v.LdapLibValidator()
 
            self.assertRaises(v.LdapImportError, validator.to_python, 'err')
 

	
 
    def test_AttrLoginValidator(self):
 
        validator = v.AttrLoginValidator()
 
        self.assertEqual('DN_attr', validator.to_python('DN_attr'))
 

	
 
    def test_NotReviewedRevisions(self):
 
        repo_id = Repository.get_by_repo_name(HG_REPO).repo_id
 
        validator = v.NotReviewedRevisions(repo_id)
 
        rev = '0' * 40
 
        # add status for a rev, that should throw an error because it is already
 
        # reviewed
 
        new_status = ChangesetStatus()
 
        new_status.author = ChangesetStatusModel()._get_user(TEST_USER_ADMIN_LOGIN)
 
        new_status.repo = ChangesetStatusModel()._get_repo(HG_REPO)
 
        new_status.status = ChangesetStatus.STATUS_APPROVED
 
        new_status.comment = None
 
        new_status.revision = rev
 
        Session().add(new_status)
 
        Session().commit()
 
        try:
 
            self.assertRaises(formencode.Invalid, validator.to_python, [rev])
 
        finally:
 
            Session().delete(new_status)
 
            Session().commit()
rhodecode/tests/other/test_vcs_operations.py
Show inline comments
 
file renamed from rhodecode/tests/scripts/test_vcs_operations.py to rhodecode/tests/other/test_vcs_operations.py
0 comments (0 inline, 0 general)