Changeset - 8acbfa837180
[Not reviewed]
rhodecode/config/environment.py
Show inline comments
 
"""Pylons environment configuration"""
 
from mako.lookup import TemplateLookup
 
from pylons.configuration import PylonsConfig
 
from pylons.error import handle_mako_error
 
from rhodecode.config.routing import make_map
 
from rhodecode.lib.auth import set_available_permissions, set_base_path
 
from rhodecode.lib.utils import repo2db_mapper, make_ui, set_rhodecode_config
 
from rhodecode.model import init_model
 
from rhodecode.model.hg import HgModel
 
from sqlalchemy import engine_from_config
 
import logging
 
import os
 
import rhodecode.lib.app_globals as app_globals
 
import rhodecode.lib.helpers
 

	
 
log = logging.getLogger(__name__)
 

	
 
def load_environment(global_conf, app_conf, initial=False):
 
    """Configure the Pylons environment via the ``pylons.config``
 
    object
 
    """
 
    config = PylonsConfig()
 

	
 
    # Pylons paths
 
    root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
 
    paths = dict(root=root,
 
                 controllers=os.path.join(root, 'controllers'),
 
                 static_files=os.path.join(root, 'public'),
 
                 templates=[os.path.join(root, 'templates')])
 

	
 
    # Initialize config with the basic options
 
    config.init_app(global_conf, app_conf, package='rhodecode', paths=paths)
 

	
 
    config['routes.map'] = make_map(config)
 
    config['pylons.app_globals'] = app_globals.Globals(config)
 
    config['pylons.h'] = rhodecode.lib.helpers
 

	
 
    # Setup cache object as early as possible
 
    import pylons
 
    pylons.cache._push_object(config['pylons.app_globals'].cache)
 

	
 
    # Create the Mako TemplateLookup, with the default auto-escaping
 
    config['pylons.app_globals'].mako_lookup = TemplateLookup(
 
        directories=paths['templates'],
 
        error_handler=handle_mako_error,
 
        module_directory=os.path.join(app_conf['cache_dir'], 'templates'),
 
        input_encoding='utf-8', default_filters=['escape'],
 
        imports=['from webhelpers.html import escape'])
 

	
 
    #sets the c attribute access when don't existing attribute are accessed
 
    config['pylons.strict_tmpl_context'] = True
 
    test = os.path.split(config['__file__'])[-1] == 'test.ini'
 
    if test:
 
        from rhodecode.lib.utils import create_test_env, create_test_index
 
        create_test_env('/tmp', config)
 
        create_test_index('/tmp', True)
 
        from rhodecode.tests import  TESTS_TMP_PATH
 
        create_test_env(TESTS_TMP_PATH, config)
 
        create_test_index(TESTS_TMP_PATH, True)
 

	
 
    #MULTIPLE DB configs
 
    # Setup the SQLAlchemy database engine
 
    if config['debug'] and not test:
 
        #use query time debugging.
 
        from rhodecode.lib.timerproxy import TimerProxy
 
        sa_engine_db1 = engine_from_config(config, 'sqlalchemy.db1.',
 
                                                            proxy=TimerProxy())
 
    else:
 
        sa_engine_db1 = engine_from_config(config, 'sqlalchemy.db1.')
 

	
 
    init_model(sa_engine_db1)
 
    #init baseui
 
    config['pylons.app_globals'].baseui = make_ui('db')
 

	
 
    g = config['pylons.app_globals']
 
    repo2db_mapper(HgModel().repo_scan(g.paths[0][1], g.baseui, initial))
 
    set_available_permissions(config)
 
    set_base_path(config)
 
    set_rhodecode_config(config)
 
    # CONFIGURATION OPTIONS HERE (note: all config options will override
 
    # any Pylons config options)
 

	
 
    return config
rhodecode/controllers/search.py
Show inline comments
 
#!/usr/bin/env python
 
# encoding: utf-8
 
# search controller for pylons
 
# Copyright (C) 2009-2010 Marcin Kuzminski <marcin@python-works.com>
 
# 
 
# This program is free software; you can redistribute it and/or
 
# modify it under the terms of the GNU General Public License
 
# as published by the Free Software Foundation; version 2
 
# of the License or (at your opinion) any later version of the license.
 
# 
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
# 
 
# You should have received a copy of the GNU General Public License
 
# along with this program; if not, write to the Free Software
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 
# MA  02110-1301, USA.
 
"""
 
Created on Aug 7, 2010
 
search controller for pylons
 
@author: marcink
 
"""
 
from pylons import request, response, session, tmpl_context as c, url
 
from pylons import request, response, config, session, tmpl_context as c, url
 
from pylons.controllers.util import abort, redirect
 
from rhodecode.lib.auth import LoginRequired
 
from rhodecode.lib.base import BaseController, render
 
from rhodecode.lib.indexers import IDX_LOCATION, SCHEMA, IDX_NAME, ResultWrapper
 
from rhodecode.lib.indexers import SCHEMA, IDX_NAME, ResultWrapper
 
from webhelpers.paginate import Page
 
from webhelpers.util import update_params
 
from pylons.i18n.translation import _
 
from whoosh.index import open_dir, EmptyIndexError
 
from whoosh.qparser import QueryParser, QueryParserError
 
from whoosh.query import Phrase
 
import logging
 
import traceback
 

	
 
log = logging.getLogger(__name__)
 

	
 
class SearchController(BaseController):
 

	
 
    @LoginRequired()
 
    def __before__(self):
 
        super(SearchController, self).__before__()    
 
        super(SearchController, self).__before__()
 

	
 
    def index(self, search_repo=None):
 
        c.repo_name = search_repo
 
        c.formated_results = []
 
        c.runtime = ''
 
        c.cur_query = request.GET.get('q', None)
 
        c.cur_type = request.GET.get('type', 'source')
 
        c.cur_search = search_type = {'content':'content',
 
                                      'commit':'content',
 
                                      'path':'path',
 
                                      'repository':'repository'}\
 
                                      .get(c.cur_type, 'content')
 

	
 
        
 

	
 
        if c.cur_query:
 
            cur_query = c.cur_query.lower()
 
        
 

	
 
        if c.cur_query:
 
            p = int(request.params.get('page', 1))
 
            highlight_items = set()
 
            try:
 
                idx = open_dir(IDX_LOCATION, indexname=IDX_NAME)
 
                idx = open_dir(config['app_conf']['index_dir']
 
                               , indexname=IDX_NAME)
 
                searcher = idx.searcher()
 

	
 
                qp = QueryParser(search_type, schema=SCHEMA)
 
                if c.repo_name:
 
                    cur_query = u'repository:%s %s' % (c.repo_name, cur_query)
 
                try:
 
                    query = qp.parse(unicode(cur_query))
 
                    
 

	
 
                    if isinstance(query, Phrase):
 
                        highlight_items.update(query.words)
 
                    else:
 
                        for i in query.all_terms():
 
                            if i[0] == 'content':
 
                                highlight_items.add(i[1])
 

	
 
                    matcher = query.matcher(searcher)
 
                    
 

	
 
                    log.debug(query)
 
                    log.debug(highlight_items)
 
                    results = searcher.search(query)
 
                    res_ln = len(results)
 
                    c.runtime = '%s results (%.3f seconds)' \
 
                        % (res_ln, results.runtime)
 
                    
 

	
 
                    def url_generator(**kw):
 
                        return update_params("?q=%s&type=%s" \
 
                                           % (c.cur_query, c.cur_search), **kw)
 

	
 
                    c.formated_results = Page(
 
                                ResultWrapper(search_type, searcher, matcher,
 
                                              highlight_items),
 
                                page=p, item_count=res_ln,
 
                                items_per_page=10, url=url_generator)
 
                     
 
                    
 

	
 

	
 
                except QueryParserError:
 
                    c.runtime = _('Invalid search query. Try quoting it.')
 
                searcher.close()
 
            except (EmptyIndexError, IOError):
 
                log.error(traceback.format_exc())
 
                log.error('Empty Index data')
 
                c.runtime = _('There is no index to search in. Please run whoosh indexer')
 
                        
 

	
 
        # Return a rendered template
 
        return render('/search/search.html')
rhodecode/lib/utils.py
Show inline comments
 
#!/usr/bin/env python
 
# encoding: utf-8
 
# Utilities for RhodeCode
 
# Copyright (C) 2009-2010 Marcin Kuzminski <marcin@python-works.com>
 
# This program is free software; you can redistribute it and/or
 
# modify it under the terms of the GNU General Public License
 
# as published by the Free Software Foundation; version 2
 
# of the License or (at your opinion) any later version of the license.
 
# 
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
# 
 
# You should have received a copy of the GNU General Public License
 
# along with this program; if not, write to the Free Software
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 
# MA  02110-1301, USA.
 
"""
 
Created on April 18, 2010
 
Utilities for RhodeCode
 
@author: marcink
 
"""
 

	
 
from UserDict import DictMixin
 
from mercurial import ui, config, hg
 
from mercurial.error import RepoError
 
from rhodecode.model import meta
 
from rhodecode.model.caching_query import FromCache
 
from rhodecode.model.db import Repository, User, RhodeCodeUi, RhodeCodeSettings, \
 
    UserLog
 
from rhodecode.model.repo import RepoModel
 
from rhodecode.model.user import UserModel
 
from vcs.backends.base import BaseChangeset
 
from paste.script import command
 
import ConfigParser
 
from vcs.utils.lazy import LazyProperty
 
import traceback
 
import datetime
 
import logging
 
import os
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def get_repo_slug(request):
 
    return request.environ['pylons.routes_dict'].get('repo_name')
 

	
 
def is_mercurial(environ):
 
    """
 
    Returns True if request's target is mercurial server - header
 
    ``HTTP_ACCEPT`` of such request would start with ``application/mercurial``.
 
    """
 
    http_accept = environ.get('HTTP_ACCEPT')
 
    if http_accept and http_accept.startswith('application/mercurial'):
 
        return True
 
    return False
 

	
 
def is_git(environ):
 
    """
 
    Returns True if request's target is git server. ``HTTP_USER_AGENT`` would
 
    then have git client version given.
 
    
 
    :param environ:
 
    """
 
    http_user_agent = environ.get('HTTP_USER_AGENT')
 
    if http_user_agent.startswith('git'):
 
    if http_user_agent and http_user_agent.startswith('git'):
 
        return True
 
    return False
 

	
 
def action_logger(user, action, repo, ipaddr, sa=None):
 
    """
 
    Action logger for various action made by users
 
    """
 

	
 
    if not sa:
 
        sa = meta.Session()
 

	
 
    try:
 
        if hasattr(user, 'user_id'):
 
            user_obj = user
 
        elif isinstance(user, basestring):
 
            user_obj = UserModel(sa).get_by_username(user, cache=False)
 
        else:
 
            raise Exception('You have to provide user object or username')
 

	
 
        repo_name = repo.lstrip('/')
 
        user_log = UserLog()
 
        user_log.user_id = user_obj.user_id
 
        user_log.action = action
 
        user_log.repository_name = repo_name
 
        user_log.repository = RepoModel(sa).get(repo_name, cache=False)
 
        user_log.action_date = datetime.datetime.now()
 
        user_log.user_ip = ipaddr
 
        sa.add(user_log)
 
        sa.commit()
 

	
 
        log.info('Adding user %s, action %s on %s',
 
                                        user_obj.username, action, repo)
 
    except:
 
        log.error(traceback.format_exc())
 
        sa.rollback()
 

	
 
def get_repos(path, recursive=False, initial=False):
 
    """
 
    Scans given path for repos and return (name,(type,path)) tuple 
 
    :param prefix:
 
    :param path:
 
    :param recursive:
 
    :param initial:
 
    """
 
    from vcs.utils.helpers import get_scm
 
    from vcs.exceptions import VCSError
 

	
 
    try:
 
        scm = get_scm(path)
 
    except:
 
        pass
 
    else:
 
        raise Exception('The given path %s should not be a repository got %s',
 
                        path, scm)
 

	
 
    for dirpath in os.listdir(path):
 
        try:
 
            yield dirpath, get_scm(os.path.join(path, dirpath))
 
        except VCSError:
 
            pass
 

	
 
if __name__ == '__main__':
 
    get_repos('', '/home/marcink/workspace-python')
 

	
 

	
 
def check_repo_fast(repo_name, base_path):
 
    if os.path.isdir(os.path.join(base_path, repo_name)):return False
 
    return True
 

	
 
def check_repo(repo_name, base_path, verify=True):
 

	
 
    repo_path = os.path.join(base_path, repo_name)
 

	
 
    try:
 
        if not check_repo_fast(repo_name, base_path):
 
            return False
 
        r = hg.repository(ui.ui(), repo_path)
 
        if verify:
 
            hg.verify(r)
 
        #here we hnow that repo exists it was verified
 
        log.info('%s repo is already created', repo_name)
 
        return False
 
    except RepoError:
 
        #it means that there is no valid repo there...
 
        log.info('%s repo is free for creation', repo_name)
 
        return True
 

	
 
def ask_ok(prompt, retries=4, complaint='Yes or no, please!'):
 
    while True:
 
        ok = raw_input(prompt)
 
        if ok in ('y', 'ye', 'yes'): return True
 
        if ok in ('n', 'no', 'nop', 'nope'): return False
 
        retries = retries - 1
 
        if retries < 0: raise IOError
 
        print complaint
 

	
 
def get_hg_ui_cached():
 
    try:
 
        sa = meta.Session
 
        ret = sa.query(RhodeCodeUi)\
 
        .options(FromCache("sql_cache_short", "get_hg_ui_settings"))\
 
        .all()
 
    except:
 
        pass
 
    finally:
 
        meta.Session.remove()
 
    return ret
 

	
 

	
 
def get_hg_settings():
 
    try:
 
        sa = meta.Session()
 
        ret = sa.query(RhodeCodeSettings)\
 
        .options(FromCache("sql_cache_short", "get_hg_settings"))\
 
        .all()
 
    except:
 
        pass
 
    finally:
 
        meta.Session.remove()
 

	
 
    if not ret:
 
        raise Exception('Could not get application settings !')
 
    settings = {}
 
    for each in ret:
 
        settings['rhodecode_' + each.app_settings_name] = each.app_settings_value
 

	
 
    return settings
 

	
 
def get_hg_ui_settings():
 
    try:
 
        sa = meta.Session()
 
        ret = sa.query(RhodeCodeUi).all()
 
    except:
 
        pass
 
    finally:
 
        meta.Session.remove()
 

	
 
    if not ret:
 
        raise Exception('Could not get application ui settings !')
 
    settings = {}
 
    for each in ret:
 
        k = each.ui_key
 
        v = each.ui_value
 
        if k == '/':
 
            k = 'root_path'
 

	
 
        if k.find('.') != -1:
 
            k = k.replace('.', '_')
 

	
 
        if each.ui_section == 'hooks':
 
            v = each.ui_active
 

	
 
        settings[each.ui_section + '_' + k] = v
 

	
 
    return settings
 

	
 
#propagated from mercurial documentation
 
ui_sections = ['alias', 'auth',
 
                'decode/encode', 'defaults',
 
                'diff', 'email',
 
                'extensions', 'format',
 
                'merge-patterns', 'merge-tools',
 
                'hooks', 'http_proxy',
 
                'smtp', 'patch',
 
                'paths', 'profiling',
 
                'server', 'trusted',
 
                'ui', 'web', ]
 

	
 
def make_ui(read_from='file', path=None, checkpaths=True):
 
    """
 
    A function that will read python rc files or database
 
    and make an mercurial ui object from read options
 
    
 
    :param path: path to mercurial config file
 
    :param checkpaths: check the path
 
    :param read_from: read from 'file' or 'db'
 
    """
 

	
 
    baseui = ui.ui()
 

	
 
    if read_from == 'file':
 
        if not os.path.isfile(path):
 
            log.warning('Unable to read config file %s' % path)
 
            return False
 
        log.debug('reading hgrc from %s', path)
 
        cfg = config.config()
 
        cfg.read(path)
 
        for section in ui_sections:
 
            for k, v in cfg.items(section):
 
                baseui.setconfig(section, k, v)
 
                log.debug('settings ui from file[%s]%s:%s', section, k, v)
 

	
 
    elif read_from == 'db':
 
        hg_ui = get_hg_ui_cached()
 
        for ui_ in hg_ui:
 
            if ui_.ui_active:
 
                log.debug('settings ui from db[%s]%s:%s', ui_.ui_section, ui_.ui_key, ui_.ui_value)
 
                baseui.setconfig(ui_.ui_section, ui_.ui_key, ui_.ui_value)
 

	
 

	
 
    return baseui
 

	
 

	
 
def set_rhodecode_config(config):
 
    hgsettings = get_hg_settings()
 

	
 
    for k, v in hgsettings.items():
 
        config[k] = v
 

	
 
def invalidate_cache(name, *args):
 
    """
 
    Puts cache invalidation task into db for 
 
    further global cache invalidation
 
    """
 
    pass
 

	
 
class EmptyChangeset(BaseChangeset):
 
    """
 
    An dummy empty changeset. It's possible to pass hash when creating
 
    an EmptyChangeset
 
    """
 

	
 
    def __init__(self, cs='0' * 40):
 
        self._empty_cs = cs
 
        self.revision = -1
 
        self.message = ''
 
        self.author = ''
 
        self.date = ''
 

	
 
    @LazyProperty
 
    def raw_id(self):
 
        """
 
        Returns raw string identifying this changeset, useful for web
 
        representation.
 
        """
 
        return self._empty_cs
 

	
 
    @LazyProperty
 
    def short_id(self):
 
        return self.raw_id[:12]
 

	
 
    def get_file_changeset(self, path):
 
        return self
 

	
 
    def get_file_content(self, path):
 
        return u''
 

	
 
    def get_file_size(self, path):
 
        return 0
 

	
 
def repo2db_mapper(initial_repo_list, remove_obsolete=False):
 
    """
 
    maps all found repositories into db
 
    """
 

	
 
    sa = meta.Session()
 
    rm = RepoModel(sa)
 
    user = sa.query(User).filter(User.admin == True).first()
 

	
 
    for name, repo in initial_repo_list.items():
 
        if not rm.get(name, cache=False):
 
            log.info('repository %s not found creating default', name)
 

	
 
            form_data = {
 
                         'repo_name':name,
 
                         'repo_type':repo.alias,
 
                         'description':repo.description \
 
                            if repo.description != 'unknown' else \
 
                                        '%s repository' % name,
 
                         'private':False
 
                         }
 
            rm.create(form_data, user, just_db=True)
 

	
 
    if remove_obsolete:
 
        #remove from database those repositories that are not in the filesystem
 
        for repo in sa.query(Repository).all():
 
            if repo.repo_name not in initial_repo_list.keys():
 
                sa.delete(repo)
 
                sa.commit()
 

	
 
class OrderedDict(dict, DictMixin):
 

	
 
    def __init__(self, *args, **kwds):
 
        if len(args) > 1:
 
            raise TypeError('expected at most 1 arguments, got %d' % len(args))
 
        try:
 
            self.__end
 
        except AttributeError:
 
            self.clear()
 
        self.update(*args, **kwds)
 

	
 
    def clear(self):
 
        self.__end = end = []
 
        end += [None, end, end]         # sentinel node for doubly linked list
 
        self.__map = {}                 # key --> [key, prev, next]
 
        dict.clear(self)
 

	
 
    def __setitem__(self, key, value):
 
        if key not in self:
 
            end = self.__end
 
            curr = end[1]
 
            curr[2] = end[1] = self.__map[key] = [key, curr, end]
 
        dict.__setitem__(self, key, value)
 

	
 
    def __delitem__(self, key):
 
        dict.__delitem__(self, key)
 
        key, prev, next = self.__map.pop(key)
 
        prev[2] = next
 
        next[1] = prev
 

	
 
    def __iter__(self):
 
        end = self.__end
 
        curr = end[2]
 
        while curr is not end:
 
            yield curr[0]
 
            curr = curr[2]
 

	
 
    def __reversed__(self):
 
        end = self.__end
 
        curr = end[1]
 
        while curr is not end:
 
            yield curr[0]
 
            curr = curr[1]
 

	
 
    def popitem(self, last=True):
 
        if not self:
 
            raise KeyError('dictionary is empty')
 
        if last:
 
            key = reversed(self).next()
 
        else:
 
            key = iter(self).next()
 
        value = self.pop(key)
 
        return key, value
 

	
 
    def __reduce__(self):
 
        items = [[k, self[k]] for k in self]
 
        tmp = self.__map, self.__end
 
        del self.__map, self.__end
 
        inst_dict = vars(self).copy()
 
        self.__map, self.__end = tmp
 
        if inst_dict:
 
            return (self.__class__, (items,), inst_dict)
 
        return self.__class__, (items,)
 

	
 
    def keys(self):
 
        return list(self)
 

	
 
    setdefault = DictMixin.setdefault
 
    update = DictMixin.update
 
    pop = DictMixin.pop
 
    values = DictMixin.values
 
    items = DictMixin.items
 
    iterkeys = DictMixin.iterkeys
 
    itervalues = DictMixin.itervalues
 
    iteritems = DictMixin.iteritems
 

	
 
    def __repr__(self):
 
        if not self:
 
            return '%s()' % (self.__class__.__name__,)
 
        return '%s(%r)' % (self.__class__.__name__, self.items())
 

	
 
    def copy(self):
 
        return self.__class__(self)
 

	
 
    @classmethod
 
    def fromkeys(cls, iterable, value=None):
 
        d = cls()
 
        for key in iterable:
 
            d[key] = value
 
        return d
 

	
 
    def __eq__(self, other):
 
        if isinstance(other, OrderedDict):
 
            return len(self) == len(other) and self.items() == other.items()
 
        return dict.__eq__(self, other)
 

	
 
    def __ne__(self, other):
 
        return not self == other
 

	
 

	
 
#===============================================================================
 
# TEST FUNCTIONS AND CREATORS
 
#===============================================================================
 
def create_test_index(repo_location, full_index):
 
    """Makes default test index
 
    :param repo_location:
 
    :param full_index:
 
    """
 
    from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
 
    from rhodecode.lib.pidlock import DaemonLock, LockHeld
 
    from rhodecode.lib.indexers import IDX_LOCATION
 
    import shutil
 

	
 
    if os.path.exists(IDX_LOCATION):
 
        shutil.rmtree(IDX_LOCATION)
 
    index_location = os.path.join(repo_location, 'index')
 
    if os.path.exists(index_location):
 
        shutil.rmtree(index_location)
 

	
 
    try:
 
        l = DaemonLock()
 
        WhooshIndexingDaemon(repo_location=repo_location)\
 
        WhooshIndexingDaemon(index_location=index_location,
 
                             repo_location=repo_location)\
 
            .run(full_index=full_index)
 
        l.release()
 
    except LockHeld:
 
        pass
 

	
 
def create_test_env(repos_test_path, config):
 
    """Makes a fresh database and 
 
    install test repository into tmp dir
 
    """
 
    from rhodecode.lib.db_manage import DbManage
 
    from rhodecode.tests import HG_REPO, GIT_REPO, NEW_HG_REPO, NEW_GIT_REPO, \
 
        HG_FORK, GIT_FORK, TESTS_TMP_PATH
 
    import tarfile
 
    import shutil
 
    from os.path import dirname as dn, join as jn, abspath
 

	
 
    log = logging.getLogger('TestEnvCreator')
 
    # create logger
 
    log.setLevel(logging.DEBUG)
 
    log.propagate = True
 
    # create console handler and set level to debug
 
    ch = logging.StreamHandler()
 
    ch.setLevel(logging.DEBUG)
 

	
 
    # create formatter
 
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
 

	
 
    # add formatter to ch
 
    ch.setFormatter(formatter)
 

	
 
    # add ch to logger
 
    log.addHandler(ch)
 

	
 
    #PART ONE create db
 
    dbname = config['sqlalchemy.db1.url'].split('/')[-1]
 
    log.debug('making test db %s', dbname)
 

	
 
    dbmanage = DbManage(log_sql=True, dbname=dbname, root=config['here'],
 
                        tests=True)
 
    dbmanage.create_tables(override=True)
 
    dbmanage.config_prompt(repos_test_path)
 
    dbmanage.create_default_user()
 
    dbmanage.admin_prompt()
 
    dbmanage.create_permissions()
 
    dbmanage.populate_default_permissions()
 

	
 
    #PART TWO make test repo
 
    log.debug('making test vcs repo')
 
    if os.path.isdir('/tmp/vcs_test'):
 
        shutil.rmtree('/tmp/vcs_test')
 
    log.debug('making test vcs repositories')
 

	
 
    #remove old one from previos tests
 
    for r in [HG_REPO, GIT_REPO, NEW_HG_REPO, NEW_GIT_REPO, HG_FORK, GIT_FORK]:
 

	
 
        if os.path.isdir(jn(TESTS_TMP_PATH, r)):
 
            log.debug('removing %s', r)
 
            shutil.rmtree(jn(TESTS_TMP_PATH, r))
 

	
 
    #CREATE DEFAULT HG REPOSITORY
 
    cur_dir = dn(dn(abspath(__file__)))
 
    tar = tarfile.open(jn(cur_dir, 'tests', "vcs_test.tar.gz"))
 
    tar.extractall('/tmp')
 
    tar = tarfile.open(jn(cur_dir, 'tests', "vcs_test_hg.tar.gz"))
 
    tar.extractall(jn(TESTS_TMP_PATH, HG_REPO))
 
    tar.close()
 

	
 
class UpgradeDb(command.Command):
 
    """Command used for paster to upgrade our database to newer version
 
    """
 

	
 
    max_args = 1
 
    min_args = 1
 

	
 
    usage = "CONFIG_FILE"
 
    summary = "Upgrades current db to newer version given configuration file"
 
    group_name = "RhodeCode"
 

	
 
    parser = command.Command.standard_parser(verbose=True)
 

	
 
    parser.add_option('--sql',
 
                      action='store_true',
 
                      dest='just_sql',
 
                      help="Prints upgrade sql for further investigation",
 
                      default=False)
 
    def command(self):
 
        config_name = self.args[0]
 
        p = config_name.split('/')
 
        root = '.' if len(p) == 1 else '/'.join(p[:-1])
 
        config = ConfigParser.ConfigParser({'here':root})
 
        config.read(config_name)
rhodecode/tests/__init__.py
Show inline comments
 
"""Pylons application test package
 

	
 
This package assumes the Pylons environment is already loaded, such as
 
when this script is imported from the `nosetests --with-pylons=test.ini`
 
command.
 

	
 
This module initializes the application via ``websetup`` (`paster
 
setup-app`) and provides the base testing objects.
 
"""
 
from unittest import TestCase
 

	
 
from paste.deploy import loadapp
 
from paste.script.appinstall import SetupCommand
 
from pylons import config, url
 
from routes.util import URLGenerator
 
from webtest import TestApp
 
import os
 
from rhodecode.model import meta
 
import logging
 

	
 

	
 
log = logging.getLogger(__name__) 
 
log = logging.getLogger(__name__)
 

	
 
import pylons.test
 

	
 
__all__ = ['environ', 'url', 'TestController']
 
__all__ = ['environ', 'url', 'TestController', 'TESTS_TMP_PATH', 'HG_REPO',
 
           'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO', 'HG_FORK', 'GIT_FORK', ]
 

	
 
# Invoke websetup with the current config file
 
#SetupCommand('setup-app').run([config_file])
 

	
 
##RUNNING DESIRED TESTS
 
#nosetests rhodecode.tests.functional.test_admin_settings:TestSettingsController.test_my_account
 

	
 
environ = {}
 

	
 
#SOME GLOBALS FOR TESTS
 
TESTS_TMP_PATH = '/tmp'
 

	
 
HG_REPO = 'vcs_test_hg'
 
GIT_REPO = 'vcs_test_git'
 

	
 
NEW_HG_REPO = 'vcs_test_hg_new'
 
NEW_GIT_REPO = 'vcs_test_git_new'
 

	
 
HG_FORK = 'vcs_test_hg_fork'
 
GIT_FORK = 'vcs_test_git_fork'
 

	
 
class TestController(TestCase):
 

	
 
    def __init__(self, *args, **kwargs):
 
        wsgiapp = pylons.test.pylonsapp
 
        config = wsgiapp.config
 

	
 
        self.app = TestApp(wsgiapp)
 
        url._push_object(URLGenerator(config['routes.map'], environ))
 
        self.sa = meta.Session
 

	
 
        self.index_location = config['app_conf']['index_dir']
 
        TestCase.__init__(self, *args, **kwargs)
 
    
 

	
 
    def log_user(self, username='test_admin', password='test12'):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username':username,
 
                                  'password':password})
 
        print response
 
        
 

	
 
        if 'invalid user name' in response.body:
 
            assert False, 'could not login using %s %s' % (username, password)
 
        
 

	
 
        assert response.status == '302 Found', 'Wrong response code from login got %s' % response.status
 
        assert response.session['rhodecode_user'].username == username, 'wrong logged in user got %s expected %s' % (response.session['rhodecode_user'].username, username)
 
        return response.follow()
rhodecode/tests/functional/test_branches.py
Show inline comments
 
from rhodecode.tests import *
 

	
 
class TestBranchesController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='branches', action='index',repo_name='vcs_test'))
 
        response = self.app.get(url(controller='branches', action='index', repo_name=HG_REPO))
 

	
 
        assert """<a href="/%s/changeset/27cd5cce30c96924232dffcd24178a07ffeb5dfc">default</a>""" % HG_REPO in response.body, 'wrong info about default branch'
 
        assert """<a href="/%s/changeset/97e8b885c04894463c51898e14387d80c30ed1ee">git</a>""" % HG_REPO in response.body, 'wrong info about default git'
 
        assert """<a href="/%s/changeset/2e6a2bf9356ca56df08807f4ad86d480da72a8f4">web</a>""" % HG_REPO in response.body, 'wrong info about default web'
 

	
 

	
 

	
 

	
 

	
 

	
 
        # Test response...
rhodecode/tests/functional/test_changelog.py
Show inline comments
 
from rhodecode.tests import *
 

	
 
class TestChangelogController(TestController):
 

	
 
    def test_index(self):
 
    def test_index_hg(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='changelog', action='index', repo_name='vcs_test'))
 
        
 
        print response
 
        assert """<div id="chg_20" class="container">""" in response.body, 'wrong info about number ofchanges'
 
        response = self.app.get(url(controller='changelog', action='index', repo_name=HG_REPO))
 

	
 
        assert """<div id="chg_20" class="container">""" in response.body, 'wrong info about number of changes'
 
        assert """Small update at simplevcs app""" in response.body, 'missing info about commit message'
 
        assert """<span class="removed" title="removed">0</span>""" in response.body, 'wrong info about removed nodes'
 
        assert """<span class="changed" title="changed">2</span>""" in response.body, 'wrong info about changed nodes'
 
        assert """<span class="added" title="added">1</span>""" in response.body, 'wrong info about added nodes'
 
        
 
        assert """<span class="removed" title="removed: ">0</span>""" in response.body, 'wrong info about removed nodes'
 
        assert """<span class="changed" title="changed: hg.py | models.py">2</span>""" in response.body, 'wrong info about changed nodes'
 
        assert """<span class="added" title="added: managers.py">1</span>""" in response.body, 'wrong info about added nodes'
 

	
 
        #pagination
 
        
 
        response = self.app.get(url(controller='changelog', action='index', repo_name='vcs_test'), {'page':1})
 
        response = self.app.get(url(controller='changelog', action='index', repo_name='vcs_test'), {'page':2})
 
        response = self.app.get(url(controller='changelog', action='index', repo_name='vcs_test'), {'page':3})
 
        response = self.app.get(url(controller='changelog', action='index', repo_name='vcs_test'), {'page':4})
 
        response = self.app.get(url(controller='changelog', action='index', repo_name='vcs_test'), {'page':5})
 
        response = self.app.get(url(controller='changelog', action='index', repo_name='vcs_test'), {'page':6})
 

	
 
        response = self.app.get(url(controller='changelog', action='index', repo_name=HG_REPO), {'page':1})
 
        response = self.app.get(url(controller='changelog', action='index', repo_name=HG_REPO), {'page':2})
 
        response = self.app.get(url(controller='changelog', action='index', repo_name=HG_REPO), {'page':3})
 
        response = self.app.get(url(controller='changelog', action='index', repo_name=HG_REPO), {'page':4})
 
        response = self.app.get(url(controller='changelog', action='index', repo_name=HG_REPO), {'page':5})
 
        response = self.app.get(url(controller='changelog', action='index', repo_name=HG_REPO), {'page':6})
 

	
 
        # Test response after pagination...
 

	
 
        assert """<span class="removed" title="removed">20</span>"""in response.body, 'wrong info about number of removed'
 
        assert """<span class="changed" title="changed">1</span>"""in response.body, 'wrong info about number of changes'
 
        assert """<span class="added" title="added">0</span>"""in response.body, 'wrong info about number of added'
 
        assert """<div class="date">commit 64: 46ad32a4f974@2010-04-20 00:33:21</div>"""in response.body, 'wrong info about commit 64'
 
        assert """<div class="message"><a href="/vcs_test/changeset/46ad32a4f974">Merge with 2e6a2bf9356ca56df08807f4ad86d480da72a8f4</a></div>"""in response.body, 'wrong info about commit 64 is a merge'
 
        
 
        assert """<span class="removed" title="removed: api.rst">1</span>"""in response.body, 'wrong info about number of removed'
 
        assert """<span class="changed" title="changed: .hgignore | README.rst | conf.py | index.rst | setup.py | test_hg.py | test_nodes.py | __init__.py | __init__.py | base.py | hg.py | nodes.py | __init__.py">13</span>"""in response.body, 'wrong info about number of changes'
 
        assert """<span class="added" title="added: hg.rst | index.rst | index.rst | nodes.rst | index.rst | simplevcs.rst | installation.rst | quickstart.rst | setup.cfg | baseui_config.py | web.py | __init__.py | exceptions.py | __init__.py | exceptions.py | middleware.py | models.py | settings.py | utils.py | views.py">20</span>"""in response.body, 'wrong info about number of added'
 
        assert """<div class="message"><a href="/%s/changeset/46ad32a4f974e45472a898c6b0acb600320579b1">Merge with 2e6a2bf9356ca56df08807f4ad86d480da72a8f4</a></div>""" % HG_REPO in response.body, 'wrong info about commit 64 is a merge'
 

	
 

	
 

	
 
    #def test_index_git(self):
 
    #    self.log_user()
 
    #    response = self.app.get(url(controller='changelog', action='index', repo_name=GIT_REPO))
rhodecode/tests/functional/test_changeset.py
Show inline comments
 
from rhodecode.tests import *
 

	
 
class TestChangesetController(TestController):
 

	
 
    def test_index(self):
 
        response = self.app.get(url(controller='changeset', action='index',
 
                                    repo_name='vcs_test',revision='tip'))
 
                                    repo_name=HG_REPO,revision='tip'))
 
        # Test response...
rhodecode/tests/functional/test_feed.py
Show inline comments
 
from rhodecode.tests import *
 

	
 
class TestFeedController(TestController):
 

	
 
    def test_rss(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='feed', action='rss',
 
                                    repo_name='vcs_test'))
 
                                    repo_name=HG_REPO))
 
        # Test response...
 

	
 
    def test_atom(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='feed', action='atom',
 
                                    repo_name='vcs_test'))
 
                                    repo_name=HG_REPO))
 
        # Test response...
 
\ No newline at end of file
rhodecode/tests/functional/test_files.py
Show inline comments
 
from rhodecode.tests import *
 

	
 
class TestFilesController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='index',
 
                                    repo_name='vcs_test',
 
                                    repo_name=HG_REPO,
 
                                    revision='tip',
 
                                    f_path='/'))
 
        # Test response...
rhodecode/tests/functional/test_hg.py
Show inline comments
 
from rhodecode.tests import *
 

	
 
class TestAdminController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='hg', action='index'))
 
        response = self.app.get(url(controller='home', action='index'))
 
        #if global permission is set
 
        assert 'ADD NEW REPOSITORY' in response.body, 'Wrong main page'
 
        assert 'href="/vcs_test/summary"' in response.body, ' mising repository in list'
 
        assert 'href="/%s/summary"' % HG_REPO in response.body, ' mising repository in list'
 
        # Test response...
rhodecode/tests/functional/test_login.py
Show inline comments
 
from rhodecode.tests import *
 
from rhodecode.model.db import User
 
from rhodecode.lib.auth import check_password
 

	
 

	
 
class TestLoginController(TestController):
 

	
 
    def test_index(self):
 
        response = self.app.get(url(controller='login', action='index'))
 
        assert response.status == '200 OK', 'Wrong response from login page got %s' % response.status
 
        # Test response...
 

	
 
    def test_login_admin_ok(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username':'test_admin',
 
                                  'password':'test12'})
 
        assert response.status == '302 Found', 'Wrong response code from login got %s' % response.status
 
        assert response.session['rhodecode_user'].username == 'test_admin', 'wrong logged in user'
 
        response = response.follow()
 
        assert 'vcs_test repository' in response.body
 
        assert '%s repository' % HG_REPO in response.body
 

	
 
    def test_login_regular_ok(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username':'test_regular',
 
                                  'password':'test12'})
 
        print response
 
        assert response.status == '302 Found', 'Wrong response code from login got %s' % response.status
 
        assert response.session['rhodecode_user'].username == 'test_regular', 'wrong logged in user'
 
        response = response.follow()
 
        assert 'vcs_test repository' in response.body
 
        assert '%s repository' % HG_REPO in response.body
 
        assert '<a title="Admin" href="/_admin">' not in response.body
 

	
 
    def test_login_ok_came_from(self):
 
        test_came_from = '/_admin/users'
 
        response = self.app.post(url(controller='login', action='index', came_from=test_came_from),
 
                                 {'username':'test_admin',
 
                                  'password':'test12'})
 
        assert response.status == '302 Found', 'Wrong response code from came from redirection'
 
        response = response.follow()
 

	
 
        assert response.status == '200 OK', 'Wrong response from login page got %s' % response.status
 
        assert 'Users administration' in response.body, 'No proper title in response'
 

	
 

	
 
    def test_login_short_password(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username':'error',
 
                                  'password':'test'})
 
        assert response.status == '200 OK', 'Wrong response from login page'
 
        print response.body
 
        assert 'Enter 6 characters or more' in response.body, 'No error password message in response'
 

	
 
    def test_login_wrong_username_password(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username':'error',
 
                                  'password':'test12'})
 
        assert response.status == '200 OK', 'Wrong response from login page'
 

	
 
        assert 'invalid user name' in response.body, 'No error username message in response'
 
        assert 'invalid password' in response.body, 'No error password message in response'
 

	
 

	
 
    def test_register(self):
 
        response = self.app.get(url(controller='login', action='register'))
 
        assert 'Sign Up to rhodecode' in response.body, 'wrong page for user registration'
 

	
 
    def test_register_err_same_username(self):
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username':'test_admin',
 
                                             'password':'test',
 
                                             'email':'goodmail@domain.com',
 
                                             'name':'test',
 
                                             'lastname':'test'})
 

	
 
        assert response.status == '200 OK', 'Wrong response from register page got %s' % response.status
 
        assert 'This username already exists' in response.body
 

	
 
    def test_register_err_wrong_data(self):
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username':'xs',
 
                                             'password':'',
 
                                             'email':'goodmailm',
 
                                             'name':'test',
 
                                             'lastname':'test'})
 

	
 
        assert response.status == '200 OK', 'Wrong response from register page got %s' % response.status
 
        assert 'An email address must contain a single @' in response.body
 
        assert 'Please enter a value' in response.body
 

	
 

	
 

	
 
    def test_register_ok(self):
 
        username = 'test_regular4'
 
        password = 'qweqwe'
 
        email = 'marcin@test.com'
 
        name = 'testname'
 
        lastname = 'testlastname'
 

	
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username':username,
 
                                             'password':password,
 
                                             'email':email,
 
                                             'name':name,
 
                                             'lastname':lastname})
 
        print response.body
 
        assert response.status == '302 Found', 'Wrong response from register page got %s' % response.status
 
        assert 'You have successfully registered into rhodecode' in response.session['flash'][0], 'No flash message about user registration'
 

	
 
        ret = self.sa.query(User).filter(User.username == 'test_regular4').one()
 
        assert ret.username == username , 'field mismatch %s %s' % (ret.username, username)
 
        assert check_password(password, ret.password) == True , 'password mismatch'
 
        assert ret.email == email , 'field mismatch %s %s' % (ret.email, email)
 
        assert ret.name == name , 'field mismatch %s %s' % (ret.name, name)
 
        assert ret.lastname == lastname , 'field mismatch %s %s' % (ret.lastname, lastname)
 

	
 

	
 
    def test_forgot_password_wrong_mail(self):
 
        response = self.app.post(url(controller='login', action='password_reset'),
 
                                            {'email':'marcin@wrongmail.org', })
 

	
 
        assert "That e-mail address doesn't exist" in response.body, 'Missing error message about wrong email'
 

	
 
    def test_forgot_password(self):
 
        response = self.app.get(url(controller='login', action='password_reset'))
 
        assert response.status == '200 OK', 'Wrong response from login page got %s' % response.status
 

	
 
        username = 'test_password_reset_1'
 
        password = 'qweqwe'
 
        email = 'marcin@python-works.com'
 
        name = 'passwd'
 
        lastname = 'reset'
 

	
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username':username,
 
                                             'password':password,
 
                                             'email':email,
 
                                             'name':name,
 
                                             'lastname':lastname})
 
        #register new user for email test
 
        response = self.app.post(url(controller='login', action='password_reset'),
 
                                            {'email':email, })
 
        print response.session['flash']
 
        assert 'You have successfully registered into rhodecode' in response.session['flash'][0], 'No flash message about user registration'
 
        assert 'Your new password was sent' in response.session['flash'][1], 'No flash message about password reset'
 

	
 

	
 

	
rhodecode/tests/functional/test_repos.py
Show inline comments
 
from rhodecode.model.db import Repository
 
from rhodecode.tests import *
 

	
 
class TestReposController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url('repos'))
 
        # Test response...
 

	
 
    def test_index_as_xml(self):
 
        response = self.app.get(url('formatted_repos', format='xml'))
 

	
 
    def test_create(self):
 
    def test_create_hg(self):
 
        self.log_user()
 
        repo_name = 'vcs_test_new'
 
        repo_name = NEW_HG_REPO
 
        description = 'description for newly created repo'
 
        private = False
 
        response = self.app.post(url('repos'), {'repo_name':repo_name,
 
                                               'description':description,
 
                                               'private':private})
 
                                                'repo_type':'hg',
 
                                                'description':description,
 
                                                'private':private})
 

	
 
        print response
 
        
 

	
 
        #test if we have a message for that repository
 
        print '-' * 100
 
        print response.session
 
        assert '''created repository %s''' % (repo_name) in response.session['flash'][0], 'No flash message about new repo'
 
                      
 

	
 
        #test if the fork was created in the database
 
        new_repo = self.sa.query(Repository).filter(Repository.repo_name == repo_name).one()
 
        
 

	
 
        assert new_repo.repo_name == repo_name, 'wrong name of repo name in db'
 
        assert new_repo.description == description, 'wrong description'
 
        
 

	
 
        #test if repository is visible in the list ?
 
        response = response.follow()
 
        
 

	
 
        assert repo_name in response.body, 'missing new repo from the main repos list'
 
        
 
                
 

	
 
    def test_create_git(self):
 
        self.log_user()
 
        repo_name = NEW_GIT_REPO
 
        description = 'description for newly created repo'
 
        private = False
 
        response = self.app.post(url('repos'), {'repo_name':repo_name,
 
                                                'repo_type':'git',
 
                                                'description':description,
 
                                                'private':private})
 

	
 
        print response
 

	
 
        #test if we have a message for that repository
 
        print '-' * 100
 
        print response.session
 
        assert '''created repository %s''' % (repo_name) in response.session['flash'][0], 'No flash message about new repo'
 

	
 
        #test if the fork was created in the database
 
        new_repo = self.sa.query(Repository).filter(Repository.repo_name == repo_name).one()
 

	
 
        assert new_repo.repo_name == repo_name, 'wrong name of repo name in db'
 
        assert new_repo.description == description, 'wrong description'
 

	
 
        #test if repository is visible in the list ?
 
        response = response.follow()
 

	
 
        assert repo_name in response.body, 'missing new repo from the main repos list'
 

	
 

	
 
    def test_new(self):
 
        self.log_user()
 
        response = self.app.get(url('new_repo'))
 

	
 
    def test_new_as_xml(self):
 
        response = self.app.get(url('formatted_new_repo', format='xml'))
 

	
 
    def test_update(self):
 
        response = self.app.put(url('repo', repo_name='vcs_test'))
 
        response = self.app.put(url('repo', repo_name=HG_REPO))
 

	
 
    def test_update_browser_fakeout(self):
 
        response = self.app.post(url('repo', repo_name='vcs_test'), params=dict(_method='put'))
 
        response = self.app.post(url('repo', repo_name=HG_REPO), params=dict(_method='put'))
 

	
 
    def test_delete(self):
 
        self.log_user()
 
        repo_name = 'vcs_test_new_to_delete'
 
        description = 'description for newly created repo'
 
        private = False
 
        response = self.app.post(url('repos'), {'repo_name':repo_name,
 
                                                'repo_type':'hg',
 
                                               'description':description,
 
                                               'private':private})
 

	
 
        print response
 
        
 

	
 
        #test if we have a message for that repository
 
        print '-' * 100
 
        print response.session
 
        assert '''created repository %s''' % (repo_name) in response.session['flash'][0], 'No flash message about new repo'
 
                      
 

	
 
        #test if the repo was created in the database
 
        new_repo = self.sa.query(Repository).filter(Repository.repo_name == repo_name).one()
 
        
 

	
 
        assert new_repo.repo_name == repo_name, 'wrong name of repo name in db'
 
        assert new_repo.description == description, 'wrong description'
 
        
 

	
 
        #test if repository is visible in the list ?
 
        response = response.follow()
 
        
 

	
 
        assert repo_name in response.body, 'missing new repo from the main repos list'
 
        
 
                
 

	
 

	
 
        response = self.app.delete(url('repo', repo_name=repo_name))
 
        
 

	
 
        print '-' * 100
 
        print response.session
 
        assert '''deleted repository %s''' % (repo_name) in response.session['flash'][0], 'No flash message about delete repo'
 
                
 

	
 
        response.follow()
 
        
 

	
 
        #check if repo was deleted from db
 
        deleted_repo = self.sa.query(Repository).filter(Repository.repo_name == repo_name).scalar()
 
        
 

	
 
        assert deleted_repo is None, 'Deleted repository was found in db'
 
        
 

	
 

	
 
    def test_delete_browser_fakeout(self):
 
        response = self.app.post(url('repo', repo_name='vcs_test'), params=dict(_method='delete'))
 
        response = self.app.post(url('repo', repo_name=HG_REPO), params=dict(_method='delete'))
 

	
 
    def test_show(self):
 
        self.log_user()
 
        response = self.app.get(url('repo', repo_name='vcs_test'))
 
        response = self.app.get(url('repo', repo_name=HG_REPO))
 

	
 
    def test_show_as_xml(self):
 
        response = self.app.get(url('formatted_repo', repo_name='vcs_test', format='xml'))
 
        response = self.app.get(url('formatted_repo', repo_name=HG_REPO, format='xml'))
 

	
 
    def test_edit(self):
 
        response = self.app.get(url('edit_repo', repo_name='vcs_test'))
 
        response = self.app.get(url('edit_repo', repo_name=HG_REPO))
 

	
 
    def test_edit_as_xml(self):
 
        response = self.app.get(url('formatted_edit_repo', repo_name='vcs_test', format='xml'))
 
        response = self.app.get(url('formatted_edit_repo', repo_name=HG_REPO, format='xml'))
rhodecode/tests/functional/test_search.py
Show inline comments
 
from rhodecode.tests import *
 
from rhodecode.lib.indexers import IDX_LOCATION
 
import os
 
from nose.plugins.skip import SkipTest
 

	
 
class TestSearchController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='search', action='index'))
 
        print response.body
 
        assert 'class="small" id="q" name="q" type="text"' in response.body, 'Search box content error'
 
        # Test response...
 

	
 
    def test_empty_search(self):
 
        
 
        if os.path.isdir(IDX_LOCATION):
 
        if os.path.isdir(self.index_location):
 
            raise SkipTest('skipped due to existing index')
 
        else:
 
            self.log_user()
 
            response = self.app.get(url(controller='search', action='index'), {'q':'vcs_test'})
 
            response = self.app.get(url(controller='search', action='index'), {'q':HG_REPO})
 
            assert 'There is no index to search in. Please run whoosh indexer' in response.body, 'No error message about empty index'
 
        
 

	
 
    def test_normal_search(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='search', action='index'), {'q':'def repo'})
 
        print response.body
 
        assert '10 results' in response.body, 'no message about proper search results'
 
        assert 'Permission denied' not in response.body, 'Wrong permissions settings for that repo and user'
 
        
 
    
 

	
 

	
 
    def test_repo_search(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='search', action='index'), {'q':'repository:vcs_test def test'})
 
        response = self.app.get(url(controller='search', action='index'), {'q':'repository:%s def test' % HG_REPO})
 
        print response.body
 
        assert '4 results' in response.body, 'no message about proper search results'
 
        assert 'Permission denied' not in response.body, 'Wrong permissions settings for that repo and user'
 
        
 

	
rhodecode/tests/functional/test_settings.py
Show inline comments
 
from rhodecode.model.db import Repository
 
from rhodecode.tests import *
 

	
 
class TestSettingsController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='settings', action='index',
 
                                    repo_name='vcs_test'))
 
                                    repo_name=HG_REPO))
 
        # Test response...
 
    
 

	
 
    def test_fork(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='settings', action='fork',
 
                                    repo_name='vcs_test'))
 
        
 
                                    repo_name=HG_REPO))
 

	
 

	
 
    def test_fork_create(self):
 
        self.log_user()
 
        fork_name = 'vcs_test_fork'
 
        fork_name = HG_FORK
 
        description = 'fork of vcs test'
 
        repo_name = 'vcs_test'
 
        repo_name = HG_REPO
 
        response = self.app.post(url(controller='settings', action='fork_create',
 
                                    repo_name=repo_name),
 
                                    {'fork_name':fork_name,
 
                                     'repo_type':'hg',
 
                                     'description':description,
 
                                     'private':'False'})
 
        
 
        
 
        print response
 
        
 

	
 
        #test if we have a message that fork is ok
 
        assert 'fork %s repository as %s task added' \
 
                      % (repo_name, fork_name) in response.session['flash'][0], 'No flash message about fork'
 
                      
 

	
 
        #test if the fork was created in the database
 
        fork_repo = self.sa.query(Repository).filter(Repository.repo_name == fork_name).one()
 
        
 

	
 
        assert fork_repo.repo_name == fork_name, 'wrong name of repo name in new db fork repo'
 
        assert fork_repo.fork.repo_name == repo_name, 'wrong fork parrent'
 
        
 
        
 

	
 

	
 
        #test if fork is visible in the list ?
 
        response = response.follow()
 

	
 

	
 
        #check if fork is marked as fork
 
        response = self.app.get(url(controller='summary', action='index',
 
                                    repo_name=fork_name))
 
        
 
        
 
        print response
 
        
 

	
 
        assert 'Fork of %s' % repo_name in response.body, 'no message about that this repo is a fork'
 
        
 

	
rhodecode/tests/functional/test_shortlog.py
Show inline comments
 
from rhodecode.tests import *
 

	
 
class TestShortlogController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='shortlog', action='index',repo_name='vcs_test'))
 
        response = self.app.get(url(controller='shortlog', action='index',repo_name=HG_REPO))
 
        # Test response...
rhodecode/tests/functional/test_summary.py
Show inline comments
 
from rhodecode.tests import *
 

	
 
class TestSummaryController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='summary', action='index', repo_name='vcs_test'))
 
        print response
 
        assert """<img style="margin-bottom:2px" class="icon" title="public repository" alt="public" src="/images/icons/lock_open.png"/>""" in response.body
 
        
 
        # Test response...
 
        response = self.app.get(url(controller='summary', action='index', repo_name=HG_REPO))
 

	
 
        #repo type
 
        assert """<img style="margin-bottom:2px" class="icon" title="Mercurial repository" alt="Mercurial repository" src="/images/icons/hgicon.png"/>""" in response.body
 
        assert """<img style="margin-bottom:2px" class="icon" title="public repository" alt="public repository" src="/images/icons/lock_open.png"/>""" in response.body
 

	
 
        #codes stats
 
        assert """var data = {"text/x-python": 42, "text/plain": 12};""" in response.body, 'wrong info about % of codes stats'
 

	
 
        # clone url...
 
        assert """<input type="text" id="clone_url" readonly="readonly" value="hg clone http://test_admin@localhost:80/%s" size="70"/>""" % HG_REPO in response.body
rhodecode/tests/functional/test_tags.py
Show inline comments
 
from rhodecode.tests import *
 

	
 
class TestTagsController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='tags', action='index',repo_name='vcs_test'))
 
        response = self.app.get(url(controller='tags', action='index', repo_name=HG_REPO))
 
        assert """<a href="/%s/changeset/27cd5cce30c96924232dffcd24178a07ffeb5dfc">tip</a>""" % HG_REPO, 'wrong info about tip tag'
 
        assert """<a href="/%s/changeset/fd4bdb5e9b2a29b4393a4ac6caef48c17ee1a200">0.1.4</a>""" % HG_REPO, 'wrong info about 0.1.4 tag'
 
        assert """<a href="/%s/changeset/17544fbfcd33ffb439e2b728b5d526b1ef30bfcf">0.1.3</a>""" % HG_REPO, 'wrong info about 0.1.3 tag'
 
        assert """<a href="/%s/changeset/a7e60bff65d57ac3a1a1ce3b12a70f8a9e8a7720">0.1.2</a>""" % HG_REPO, 'wrong info about 0.1.2 tag'
 
        assert """<a href="/%s/changeset/eb3a60fc964309c1a318b8dfe26aa2d1586c85ae">0.1.1</a>""" % HG_REPO, 'wrong info about 0.1.1 tag'
 
        # Test response...
rhodecode/tests/functional/test_users.py
Show inline comments
 
from rhodecode.tests import *
 

	
 
class TestUsersController(TestController):
 

	
 
    def test_index(self):
 
        response = self.app.get(url('users'))
 
        # Test response...
 

	
 
    def test_index_as_xml(self):
 
        response = self.app.get(url('formatted_users', format='xml'))
 

	
 
    def test_create(self):
 
        response = self.app.post(url('users'))
 
        self.log_user()
 
#        user_name = 'new_user'
 
#        response = self.app.post(url('users'),{'repo_name':user_name,
 
#                                                'repo_type':'hg',
 
#                                               'description':description,
 
#                                               'private':private})
 

	
 

	
 
    def test_new(self):
 
        response = self.app.get(url('new_user'))
 

	
 
    def test_new_as_xml(self):
 
        response = self.app.get(url('formatted_new_user', format='xml'))
 

	
 
    def test_update(self):
 
        response = self.app.put(url('user', id=1))
 

	
 
    def test_update_browser_fakeout(self):
 
        response = self.app.post(url('user', id=1), params=dict(_method='put'))
 

	
 
    def test_delete(self):
 
        response = self.app.delete(url('user', id=1))
 

	
 
    def test_delete_browser_fakeout(self):
 
        response = self.app.post(url('user', id=1), params=dict(_method='delete'))
 

	
 
    def test_show(self):
 
        response = self.app.get(url('user', id=1))
 

	
 
    def test_show_as_xml(self):
 
        response = self.app.get(url('formatted_user', id=1, format='xml'))
 

	
 
    def test_edit(self):
 
        response = self.app.get(url('edit_user', id=1))
 

	
 
    def test_edit_as_xml(self):
 
        response = self.app.get(url('formatted_edit_user', id=1, format='xml'))
rhodecode/tests/vcs_test.tar.gz
Show inline comments
 
deleted file
 
binary diff not shown
rhodecode/tests/vcs_test_hg.tar.gz
Show inline comments
 
new file 100644
 
binary diff not shown
test.ini
Show inline comments
 
################################################################################
 
################################################################################
 
# rhodecode - Pylons environment configuration                                 #
 
#                                                                              # 
 
# The %(here)s variable will be replaced with the parent directory of this file#
 
################################################################################
 

	
 
[DEFAULT]
 
debug = true
 
################################################################################
 
## Uncomment and replace with the address which should receive                ## 
 
## any error reports after application crash                                  ##
 
## Additionally those settings will be used by rhodecode mailing system       ##
 
################################################################################
 
#email_to = admin@localhost
 
#error_email_from = paste_error@localhost
 
#app_email_from = rhodecode-noreply@localhost
 
#error_message =
 

	
 
#smtp_server = mail.server.com
 
#smtp_username = 
 
#smtp_password = 
 
#smtp_port = 
 
#smtp_use_tls = false
 

	
 
[server:main]
 
##nr of threads to spawn
 
threadpool_workers = 5
 

	
 
##max request before thread respawn
 
threadpool_max_requests = 2
 

	
 
##option to use threads of process
 
use_threadpool = true
 

	
 
use = egg:Paste#http
 
host = 127.0.0.1
 
port = 5000
 

	
 
[app:main]
 
use = egg:rhodecode
 
full_stack = true
 
static_files = true
 
lang=en
 
cache_dir = %(here)s/data
 
index_dir = /tmp/index
 

	
 
####################################
 
###         BEAKER CACHE        ####
 
####################################
 
beaker.cache.data_dir=/%(here)s/data/cache/data
 
beaker.cache.lock_dir=/%(here)s/data/cache/lock
 
beaker.cache.regions=super_short_term,short_term,long_term,sql_cache_short,sql_cache_med,sql_cache_long
 

	
 
beaker.cache.super_short_term.type=memory
 
beaker.cache.super_short_term.expire=10
 

	
 
beaker.cache.short_term.type=memory
 
beaker.cache.short_term.expire=60
 

	
 
beaker.cache.long_term.type=memory
 
beaker.cache.long_term.expire=36000
 

	
 

	
 
beaker.cache.sql_cache_short.type=memory
 
beaker.cache.sql_cache_short.expire=5
 

	
 
beaker.cache.sql_cache_med.type=memory
 
beaker.cache.sql_cache_med.expire=360
 

	
 
beaker.cache.sql_cache_long.type=file
 
beaker.cache.sql_cache_long.expire=3600
 

	
 
####################################
 
###       BEAKER SESSION        ####
 
####################################
 
## Type of storage used for the session, current types are 
 
## dbm, file, memcached, database, and memory. 
 
## The storage uses the Container API 
 
##that is also used by the cache system.
 
beaker.session.type = file
 

	
 
beaker.session.key = rhodecode
 
beaker.session.secret = g654dcno0-9873jhgfreyu
 
beaker.session.timeout = 36000
 

	
 
##auto save the session to not to use .save()
 
beaker.session.auto = False
 

	
 
##true exire at browser close
 
#beaker.session.cookie_expires = 3600
 

	
 
    
 
################################################################################
 
## WARNING: *THE LINE BELOW MUST BE UNCOMMENTED ON A PRODUCTION ENVIRONMENT*  ##
 
## Debug mode will enable the interactive debugging tool, allowing ANYONE to  ##
 
## execute malicious code after an exception is raised.                       ##
 
################################################################################
 
#set debug = false
 

	
 
##################################
 
###       LOGVIEW CONFIG       ###
 
##################################
 
logview.sqlalchemy = #faa
 
logview.pylons.templating = #bfb
 
logview.pylons.util = #eee
 

	
 
#########################################################
 
### DB CONFIGS - EACH DB WILL HAVE IT'S OWN CONFIG    ###
 
#########################################################
 
sqlalchemy.db1.url = sqlite:///%(here)s/test.db
 
#sqlalchemy.db1.echo = False
 
#sqlalchemy.db1.pool_recycle = 3600
 
sqlalchemy.convert_unicode = true
 

	
 
################################
 
### LOGGING CONFIGURATION   ####
 
################################
 
[loggers]
 
keys = root, routes, rhodecode, sqlalchemy
 

	
 
[handlers]
 
keys = console
 

	
 
[formatters]
 
keys = generic,color_formatter
 

	
 
#############
 
## LOGGERS ##
 
#############
 
[logger_root]
 
level = ERROR
 
handlers = console
 

	
 
[logger_routes]
 
level = ERROR
 
handlers = console
 
qualname = routes.middleware
 
# "level = DEBUG" logs the route matched and routing variables.
 

	
 
[logger_rhodecode]
 
level = ERROR
 
handlers = console
 
qualname = rhodecode
 
propagate = 0
 

	
 
[logger_sqlalchemy]
 
level = ERROR
 
handlers = console
 
qualname = sqlalchemy.engine
 
propagate = 0
 

	
 
##############
 
## HANDLERS ##
 
##############
 

	
 
[handler_console]
 
class = StreamHandler
 
args = (sys.stderr,)
 
level = NOTSET
 
formatter = color_formatter
 

	
 
################
 
## FORMATTERS ##
 
################
 

	
 
[formatter_generic]
 
format = %(asctime)s.%(msecs)03d %(levelname)-5.5s [%(name)s] %(message)s
 
datefmt = %Y-%m-%d %H:%M:%S
 

	
 
[formatter_color_formatter]
 
class=rhodecode.lib.colored_formatter.ColorFormatter
 
format= %(asctime)s.%(msecs)03d %(levelname)-5.5s [%(name)s] %(message)s
 
datefmt = %Y-%m-%d %H:%M:%S
 
\ No newline at end of file
0 comments (0 inline, 0 general)