Changeset - 8acbfa837180
[Not reviewed]
rhodecode/config/environment.py
Show inline comments
 
@@ -43,26 +43,27 @@ def load_environment(global_conf, app_co
 
    config['pylons.app_globals'].mako_lookup = TemplateLookup(
 
        directories=paths['templates'],
 
        error_handler=handle_mako_error,
 
        module_directory=os.path.join(app_conf['cache_dir'], 'templates'),
 
        input_encoding='utf-8', default_filters=['escape'],
 
        imports=['from webhelpers.html import escape'])
 

	
 
    #sets the c attribute access when don't existing attribute are accessed
 
    config['pylons.strict_tmpl_context'] = True
 
    test = os.path.split(config['__file__'])[-1] == 'test.ini'
 
    if test:
 
        from rhodecode.lib.utils import create_test_env, create_test_index
 
        create_test_env('/tmp', config)
 
        create_test_index('/tmp', True)
 
        from rhodecode.tests import  TESTS_TMP_PATH
 
        create_test_env(TESTS_TMP_PATH, config)
 
        create_test_index(TESTS_TMP_PATH, True)
 

	
 
    #MULTIPLE DB configs
 
    # Setup the SQLAlchemy database engine
 
    if config['debug'] and not test:
 
        #use query time debugging.
 
        from rhodecode.lib.timerproxy import TimerProxy
 
        sa_engine_db1 = engine_from_config(config, 'sqlalchemy.db1.',
 
                                                            proxy=TimerProxy())
 
    else:
 
        sa_engine_db1 = engine_from_config(config, 'sqlalchemy.db1.')
 

	
 
    init_model(sa_engine_db1)
rhodecode/controllers/search.py
Show inline comments
 
@@ -13,100 +13,101 @@
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
# 
 
# You should have received a copy of the GNU General Public License
 
# along with this program; if not, write to the Free Software
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 
# MA  02110-1301, USA.
 
"""
 
Created on Aug 7, 2010
 
search controller for pylons
 
@author: marcink
 
"""
 
from pylons import request, response, session, tmpl_context as c, url
 
from pylons import request, response, config, session, tmpl_context as c, url
 
from pylons.controllers.util import abort, redirect
 
from rhodecode.lib.auth import LoginRequired
 
from rhodecode.lib.base import BaseController, render
 
from rhodecode.lib.indexers import IDX_LOCATION, SCHEMA, IDX_NAME, ResultWrapper
 
from rhodecode.lib.indexers import SCHEMA, IDX_NAME, ResultWrapper
 
from webhelpers.paginate import Page
 
from webhelpers.util import update_params
 
from pylons.i18n.translation import _
 
from whoosh.index import open_dir, EmptyIndexError
 
from whoosh.qparser import QueryParser, QueryParserError
 
from whoosh.query import Phrase
 
import logging
 
import traceback
 

	
 
log = logging.getLogger(__name__)
 

	
 
class SearchController(BaseController):
 

	
 
    @LoginRequired()
 
    def __before__(self):
 
        super(SearchController, self).__before__()    
 
        super(SearchController, self).__before__()
 

	
 
    def index(self, search_repo=None):
 
        c.repo_name = search_repo
 
        c.formated_results = []
 
        c.runtime = ''
 
        c.cur_query = request.GET.get('q', None)
 
        c.cur_type = request.GET.get('type', 'source')
 
        c.cur_search = search_type = {'content':'content',
 
                                      'commit':'content',
 
                                      'path':'path',
 
                                      'repository':'repository'}\
 
                                      .get(c.cur_type, 'content')
 

	
 
        
 

	
 
        if c.cur_query:
 
            cur_query = c.cur_query.lower()
 
        
 

	
 
        if c.cur_query:
 
            p = int(request.params.get('page', 1))
 
            highlight_items = set()
 
            try:
 
                idx = open_dir(IDX_LOCATION, indexname=IDX_NAME)
 
                idx = open_dir(config['app_conf']['index_dir']
 
                               , indexname=IDX_NAME)
 
                searcher = idx.searcher()
 

	
 
                qp = QueryParser(search_type, schema=SCHEMA)
 
                if c.repo_name:
 
                    cur_query = u'repository:%s %s' % (c.repo_name, cur_query)
 
                try:
 
                    query = qp.parse(unicode(cur_query))
 
                    
 

	
 
                    if isinstance(query, Phrase):
 
                        highlight_items.update(query.words)
 
                    else:
 
                        for i in query.all_terms():
 
                            if i[0] == 'content':
 
                                highlight_items.add(i[1])
 

	
 
                    matcher = query.matcher(searcher)
 
                    
 

	
 
                    log.debug(query)
 
                    log.debug(highlight_items)
 
                    results = searcher.search(query)
 
                    res_ln = len(results)
 
                    c.runtime = '%s results (%.3f seconds)' \
 
                        % (res_ln, results.runtime)
 
                    
 

	
 
                    def url_generator(**kw):
 
                        return update_params("?q=%s&type=%s" \
 
                                           % (c.cur_query, c.cur_search), **kw)
 

	
 
                    c.formated_results = Page(
 
                                ResultWrapper(search_type, searcher, matcher,
 
                                              highlight_items),
 
                                page=p, item_count=res_ln,
 
                                items_per_page=10, url=url_generator)
 
                     
 
                    
 

	
 

	
 
                except QueryParserError:
 
                    c.runtime = _('Invalid search query. Try quoting it.')
 
                searcher.close()
 
            except (EmptyIndexError, IOError):
 
                log.error(traceback.format_exc())
 
                log.error('Empty Index data')
 
                c.runtime = _('There is no index to search in. Please run whoosh indexer')
 
                        
 

	
 
        # Return a rendered template
 
        return render('/search/search.html')
rhodecode/lib/utils.py
Show inline comments
 
@@ -55,25 +55,25 @@ def is_mercurial(environ):
 
    if http_accept and http_accept.startswith('application/mercurial'):
 
        return True
 
    return False
 

	
 
def is_git(environ):
 
    """
 
    Returns True if request's target is git server. ``HTTP_USER_AGENT`` would
 
    then have git client version given.
 
    
 
    :param environ:
 
    """
 
    http_user_agent = environ.get('HTTP_USER_AGENT')
 
    if http_user_agent.startswith('git'):
 
    if http_user_agent and http_user_agent.startswith('git'):
 
        return True
 
    return False
 

	
 
def action_logger(user, action, repo, ipaddr, sa=None):
 
    """
 
    Action logger for various action made by users
 
    """
 

	
 
    if not sa:
 
        sa = meta.Session()
 

	
 
    try:
 
@@ -446,43 +446,46 @@ class OrderedDict(dict, DictMixin):
 

	
 

	
 
#===============================================================================
 
# TEST FUNCTIONS AND CREATORS
 
#===============================================================================
 
def create_test_index(repo_location, full_index):
 
    """Makes default test index
 
    :param repo_location:
 
    :param full_index:
 
    """
 
    from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
 
    from rhodecode.lib.pidlock import DaemonLock, LockHeld
 
    from rhodecode.lib.indexers import IDX_LOCATION
 
    import shutil
 

	
 
    if os.path.exists(IDX_LOCATION):
 
        shutil.rmtree(IDX_LOCATION)
 
    index_location = os.path.join(repo_location, 'index')
 
    if os.path.exists(index_location):
 
        shutil.rmtree(index_location)
 

	
 
    try:
 
        l = DaemonLock()
 
        WhooshIndexingDaemon(repo_location=repo_location)\
 
        WhooshIndexingDaemon(index_location=index_location,
 
                             repo_location=repo_location)\
 
            .run(full_index=full_index)
 
        l.release()
 
    except LockHeld:
 
        pass
 

	
 
def create_test_env(repos_test_path, config):
 
    """Makes a fresh database and 
 
    install test repository into tmp dir
 
    """
 
    from rhodecode.lib.db_manage import DbManage
 
    from rhodecode.tests import HG_REPO, GIT_REPO, NEW_HG_REPO, NEW_GIT_REPO, \
 
        HG_FORK, GIT_FORK, TESTS_TMP_PATH
 
    import tarfile
 
    import shutil
 
    from os.path import dirname as dn, join as jn, abspath
 

	
 
    log = logging.getLogger('TestEnvCreator')
 
    # create logger
 
    log.setLevel(logging.DEBUG)
 
    log.propagate = True
 
    # create console handler and set level to debug
 
    ch = logging.StreamHandler()
 
    ch.setLevel(logging.DEBUG)
 

	
 
@@ -500,31 +503,37 @@ def create_test_env(repos_test_path, con
 
    log.debug('making test db %s', dbname)
 

	
 
    dbmanage = DbManage(log_sql=True, dbname=dbname, root=config['here'],
 
                        tests=True)
 
    dbmanage.create_tables(override=True)
 
    dbmanage.config_prompt(repos_test_path)
 
    dbmanage.create_default_user()
 
    dbmanage.admin_prompt()
 
    dbmanage.create_permissions()
 
    dbmanage.populate_default_permissions()
 

	
 
    #PART TWO make test repo
 
    log.debug('making test vcs repo')
 
    if os.path.isdir('/tmp/vcs_test'):
 
        shutil.rmtree('/tmp/vcs_test')
 
    log.debug('making test vcs repositories')
 

	
 
    #remove old one from previos tests
 
    for r in [HG_REPO, GIT_REPO, NEW_HG_REPO, NEW_GIT_REPO, HG_FORK, GIT_FORK]:
 

	
 
        if os.path.isdir(jn(TESTS_TMP_PATH, r)):
 
            log.debug('removing %s', r)
 
            shutil.rmtree(jn(TESTS_TMP_PATH, r))
 

	
 
    #CREATE DEFAULT HG REPOSITORY
 
    cur_dir = dn(dn(abspath(__file__)))
 
    tar = tarfile.open(jn(cur_dir, 'tests', "vcs_test.tar.gz"))
 
    tar.extractall('/tmp')
 
    tar = tarfile.open(jn(cur_dir, 'tests', "vcs_test_hg.tar.gz"))
 
    tar.extractall(jn(TESTS_TMP_PATH, HG_REPO))
 
    tar.close()
 

	
 
class UpgradeDb(command.Command):
 
    """Command used for paster to upgrade our database to newer version
 
    """
 

	
 
    max_args = 1
 
    min_args = 1
 

	
 
    usage = "CONFIG_FILE"
 
    summary = "Upgrades current db to newer version given configuration file"
 
    group_name = "RhodeCode"
rhodecode/tests/__init__.py
Show inline comments
 
@@ -10,49 +10,63 @@ setup-app`) and provides the base testin
 
from unittest import TestCase
 

	
 
from paste.deploy import loadapp
 
from paste.script.appinstall import SetupCommand
 
from pylons import config, url
 
from routes.util import URLGenerator
 
from webtest import TestApp
 
import os
 
from rhodecode.model import meta
 
import logging
 

	
 

	
 
log = logging.getLogger(__name__) 
 
log = logging.getLogger(__name__)
 

	
 
import pylons.test
 

	
 
__all__ = ['environ', 'url', 'TestController']
 
__all__ = ['environ', 'url', 'TestController', 'TESTS_TMP_PATH', 'HG_REPO',
 
           'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO', 'HG_FORK', 'GIT_FORK', ]
 

	
 
# Invoke websetup with the current config file
 
#SetupCommand('setup-app').run([config_file])
 

	
 
##RUNNING DESIRED TESTS
 
#nosetests rhodecode.tests.functional.test_admin_settings:TestSettingsController.test_my_account
 

	
 
environ = {}
 

	
 
#SOME GLOBALS FOR TESTS
 
TESTS_TMP_PATH = '/tmp'
 

	
 
HG_REPO = 'vcs_test_hg'
 
GIT_REPO = 'vcs_test_git'
 

	
 
NEW_HG_REPO = 'vcs_test_hg_new'
 
NEW_GIT_REPO = 'vcs_test_git_new'
 

	
 
HG_FORK = 'vcs_test_hg_fork'
 
GIT_FORK = 'vcs_test_git_fork'
 

	
 
class TestController(TestCase):
 

	
 
    def __init__(self, *args, **kwargs):
 
        wsgiapp = pylons.test.pylonsapp
 
        config = wsgiapp.config
 

	
 
        self.app = TestApp(wsgiapp)
 
        url._push_object(URLGenerator(config['routes.map'], environ))
 
        self.sa = meta.Session
 

	
 
        self.index_location = config['app_conf']['index_dir']
 
        TestCase.__init__(self, *args, **kwargs)
 
    
 

	
 
    def log_user(self, username='test_admin', password='test12'):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username':username,
 
                                  'password':password})
 
        print response
 
        
 

	
 
        if 'invalid user name' in response.body:
 
            assert False, 'could not login using %s %s' % (username, password)
 
        
 

	
 
        assert response.status == '302 Found', 'Wrong response code from login got %s' % response.status
 
        assert response.session['rhodecode_user'].username == username, 'wrong logged in user got %s expected %s' % (response.session['rhodecode_user'].username, username)
 
        return response.follow()
rhodecode/tests/functional/test_branches.py
Show inline comments
 
from rhodecode.tests import *
 

	
 
class TestBranchesController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='branches', action='index',repo_name='vcs_test'))
 
        response = self.app.get(url(controller='branches', action='index', repo_name=HG_REPO))
 

	
 
        assert """<a href="/%s/changeset/27cd5cce30c96924232dffcd24178a07ffeb5dfc">default</a>""" % HG_REPO in response.body, 'wrong info about default branch'
 
        assert """<a href="/%s/changeset/97e8b885c04894463c51898e14387d80c30ed1ee">git</a>""" % HG_REPO in response.body, 'wrong info about default git'
 
        assert """<a href="/%s/changeset/2e6a2bf9356ca56df08807f4ad86d480da72a8f4">web</a>""" % HG_REPO in response.body, 'wrong info about default web'
 

	
 

	
 

	
 

	
 

	
 

	
 
        # Test response...
rhodecode/tests/functional/test_changelog.py
Show inline comments
 
from rhodecode.tests import *
 

	
 
class TestChangelogController(TestController):
 

	
 
    def test_index(self):
 
    def test_index_hg(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='changelog', action='index', repo_name='vcs_test'))
 
        
 
        print response
 
        assert """<div id="chg_20" class="container">""" in response.body, 'wrong info about number ofchanges'
 
        response = self.app.get(url(controller='changelog', action='index', repo_name=HG_REPO))
 

	
 
        assert """<div id="chg_20" class="container">""" in response.body, 'wrong info about number of changes'
 
        assert """Small update at simplevcs app""" in response.body, 'missing info about commit message'
 
        assert """<span class="removed" title="removed">0</span>""" in response.body, 'wrong info about removed nodes'
 
        assert """<span class="changed" title="changed">2</span>""" in response.body, 'wrong info about changed nodes'
 
        assert """<span class="added" title="added">1</span>""" in response.body, 'wrong info about added nodes'
 
        
 
        assert """<span class="removed" title="removed: ">0</span>""" in response.body, 'wrong info about removed nodes'
 
        assert """<span class="changed" title="changed: hg.py | models.py">2</span>""" in response.body, 'wrong info about changed nodes'
 
        assert """<span class="added" title="added: managers.py">1</span>""" in response.body, 'wrong info about added nodes'
 

	
 
        #pagination
 
        
 
        response = self.app.get(url(controller='changelog', action='index', repo_name='vcs_test'), {'page':1})
 
        response = self.app.get(url(controller='changelog', action='index', repo_name='vcs_test'), {'page':2})
 
        response = self.app.get(url(controller='changelog', action='index', repo_name='vcs_test'), {'page':3})
 
        response = self.app.get(url(controller='changelog', action='index', repo_name='vcs_test'), {'page':4})
 
        response = self.app.get(url(controller='changelog', action='index', repo_name='vcs_test'), {'page':5})
 
        response = self.app.get(url(controller='changelog', action='index', repo_name='vcs_test'), {'page':6})
 

	
 
        response = self.app.get(url(controller='changelog', action='index', repo_name=HG_REPO), {'page':1})
 
        response = self.app.get(url(controller='changelog', action='index', repo_name=HG_REPO), {'page':2})
 
        response = self.app.get(url(controller='changelog', action='index', repo_name=HG_REPO), {'page':3})
 
        response = self.app.get(url(controller='changelog', action='index', repo_name=HG_REPO), {'page':4})
 
        response = self.app.get(url(controller='changelog', action='index', repo_name=HG_REPO), {'page':5})
 
        response = self.app.get(url(controller='changelog', action='index', repo_name=HG_REPO), {'page':6})
 

	
 
        # Test response after pagination...
 

	
 
        assert """<span class="removed" title="removed">20</span>"""in response.body, 'wrong info about number of removed'
 
        assert """<span class="changed" title="changed">1</span>"""in response.body, 'wrong info about number of changes'
 
        assert """<span class="added" title="added">0</span>"""in response.body, 'wrong info about number of added'
 
        assert """<div class="date">commit 64: 46ad32a4f974@2010-04-20 00:33:21</div>"""in response.body, 'wrong info about commit 64'
 
        assert """<div class="message"><a href="/vcs_test/changeset/46ad32a4f974">Merge with 2e6a2bf9356ca56df08807f4ad86d480da72a8f4</a></div>"""in response.body, 'wrong info about commit 64 is a merge'
 
        
 
        assert """<span class="removed" title="removed: api.rst">1</span>"""in response.body, 'wrong info about number of removed'
 
        assert """<span class="changed" title="changed: .hgignore | README.rst | conf.py | index.rst | setup.py | test_hg.py | test_nodes.py | __init__.py | __init__.py | base.py | hg.py | nodes.py | __init__.py">13</span>"""in response.body, 'wrong info about number of changes'
 
        assert """<span class="added" title="added: hg.rst | index.rst | index.rst | nodes.rst | index.rst | simplevcs.rst | installation.rst | quickstart.rst | setup.cfg | baseui_config.py | web.py | __init__.py | exceptions.py | __init__.py | exceptions.py | middleware.py | models.py | settings.py | utils.py | views.py">20</span>"""in response.body, 'wrong info about number of added'
 
        assert """<div class="message"><a href="/%s/changeset/46ad32a4f974e45472a898c6b0acb600320579b1">Merge with 2e6a2bf9356ca56df08807f4ad86d480da72a8f4</a></div>""" % HG_REPO in response.body, 'wrong info about commit 64 is a merge'
 

	
 

	
 

	
 
    #def test_index_git(self):
 
    #    self.log_user()
 
    #    response = self.app.get(url(controller='changelog', action='index', repo_name=GIT_REPO))
rhodecode/tests/functional/test_changeset.py
Show inline comments
 
from rhodecode.tests import *
 

	
 
class TestChangesetController(TestController):
 

	
 
    def test_index(self):
 
        response = self.app.get(url(controller='changeset', action='index',
 
                                    repo_name='vcs_test',revision='tip'))
 
                                    repo_name=HG_REPO,revision='tip'))
 
        # Test response...
rhodecode/tests/functional/test_feed.py
Show inline comments
 
from rhodecode.tests import *
 

	
 
class TestFeedController(TestController):
 

	
 
    def test_rss(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='feed', action='rss',
 
                                    repo_name='vcs_test'))
 
                                    repo_name=HG_REPO))
 
        # Test response...
 

	
 
    def test_atom(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='feed', action='atom',
 
                                    repo_name='vcs_test'))
 
                                    repo_name=HG_REPO))
 
        # Test response...
 
\ No newline at end of file
rhodecode/tests/functional/test_files.py
Show inline comments
 
from rhodecode.tests import *
 

	
 
class TestFilesController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='files', action='index',
 
                                    repo_name='vcs_test',
 
                                    repo_name=HG_REPO,
 
                                    revision='tip',
 
                                    f_path='/'))
 
        # Test response...
rhodecode/tests/functional/test_hg.py
Show inline comments
 
from rhodecode.tests import *
 

	
 
class TestAdminController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='hg', action='index'))
 
        response = self.app.get(url(controller='home', action='index'))
 
        #if global permission is set
 
        assert 'ADD NEW REPOSITORY' in response.body, 'Wrong main page'
 
        assert 'href="/vcs_test/summary"' in response.body, ' mising repository in list'
 
        assert 'href="/%s/summary"' % HG_REPO in response.body, ' mising repository in list'
 
        # Test response...
rhodecode/tests/functional/test_login.py
Show inline comments
 
@@ -8,35 +8,35 @@ class TestLoginController(TestController
 
    def test_index(self):
 
        response = self.app.get(url(controller='login', action='index'))
 
        assert response.status == '200 OK', 'Wrong response from login page got %s' % response.status
 
        # Test response...
 

	
 
    def test_login_admin_ok(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username':'test_admin',
 
                                  'password':'test12'})
 
        assert response.status == '302 Found', 'Wrong response code from login got %s' % response.status
 
        assert response.session['rhodecode_user'].username == 'test_admin', 'wrong logged in user'
 
        response = response.follow()
 
        assert 'vcs_test repository' in response.body
 
        assert '%s repository' % HG_REPO in response.body
 

	
 
    def test_login_regular_ok(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username':'test_regular',
 
                                  'password':'test12'})
 
        print response
 
        assert response.status == '302 Found', 'Wrong response code from login got %s' % response.status
 
        assert response.session['rhodecode_user'].username == 'test_regular', 'wrong logged in user'
 
        response = response.follow()
 
        assert 'vcs_test repository' in response.body
 
        assert '%s repository' % HG_REPO in response.body
 
        assert '<a title="Admin" href="/_admin">' not in response.body
 

	
 
    def test_login_ok_came_from(self):
 
        test_came_from = '/_admin/users'
 
        response = self.app.post(url(controller='login', action='index', came_from=test_came_from),
 
                                 {'username':'test_admin',
 
                                  'password':'test12'})
 
        assert response.status == '302 Found', 'Wrong response code from came from redirection'
 
        response = response.follow()
 

	
 
        assert response.status == '200 OK', 'Wrong response from login page got %s' % response.status
 
        assert 'Users administration' in response.body, 'No proper title in response'
rhodecode/tests/functional/test_repos.py
Show inline comments
 
@@ -2,112 +2,140 @@ from rhodecode.model.db import Repositor
 
from rhodecode.tests import *
 

	
 
class TestReposController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url('repos'))
 
        # Test response...
 

	
 
    def test_index_as_xml(self):
 
        response = self.app.get(url('formatted_repos', format='xml'))
 

	
 
    def test_create(self):
 
    def test_create_hg(self):
 
        self.log_user()
 
        repo_name = 'vcs_test_new'
 
        repo_name = NEW_HG_REPO
 
        description = 'description for newly created repo'
 
        private = False
 
        response = self.app.post(url('repos'), {'repo_name':repo_name,
 
                                               'description':description,
 
                                               'private':private})
 
                                                'repo_type':'hg',
 
                                                'description':description,
 
                                                'private':private})
 

	
 
        print response
 
        
 

	
 
        #test if we have a message for that repository
 
        print '-' * 100
 
        print response.session
 
        assert '''created repository %s''' % (repo_name) in response.session['flash'][0], 'No flash message about new repo'
 
                      
 

	
 
        #test if the fork was created in the database
 
        new_repo = self.sa.query(Repository).filter(Repository.repo_name == repo_name).one()
 
        
 

	
 
        assert new_repo.repo_name == repo_name, 'wrong name of repo name in db'
 
        assert new_repo.description == description, 'wrong description'
 
        
 

	
 
        #test if repository is visible in the list ?
 
        response = response.follow()
 
        
 

	
 
        assert repo_name in response.body, 'missing new repo from the main repos list'
 
        
 
                
 

	
 
    def test_create_git(self):
 
        self.log_user()
 
        repo_name = NEW_GIT_REPO
 
        description = 'description for newly created repo'
 
        private = False
 
        response = self.app.post(url('repos'), {'repo_name':repo_name,
 
                                                'repo_type':'git',
 
                                                'description':description,
 
                                                'private':private})
 

	
 
        print response
 

	
 
        #test if we have a message for that repository
 
        print '-' * 100
 
        print response.session
 
        assert '''created repository %s''' % (repo_name) in response.session['flash'][0], 'No flash message about new repo'
 

	
 
        #test if the fork was created in the database
 
        new_repo = self.sa.query(Repository).filter(Repository.repo_name == repo_name).one()
 

	
 
        assert new_repo.repo_name == repo_name, 'wrong name of repo name in db'
 
        assert new_repo.description == description, 'wrong description'
 

	
 
        #test if repository is visible in the list ?
 
        response = response.follow()
 

	
 
        assert repo_name in response.body, 'missing new repo from the main repos list'
 

	
 

	
 
    def test_new(self):
 
        self.log_user()
 
        response = self.app.get(url('new_repo'))
 

	
 
    def test_new_as_xml(self):
 
        response = self.app.get(url('formatted_new_repo', format='xml'))
 

	
 
    def test_update(self):
 
        response = self.app.put(url('repo', repo_name='vcs_test'))
 
        response = self.app.put(url('repo', repo_name=HG_REPO))
 

	
 
    def test_update_browser_fakeout(self):
 
        response = self.app.post(url('repo', repo_name='vcs_test'), params=dict(_method='put'))
 
        response = self.app.post(url('repo', repo_name=HG_REPO), params=dict(_method='put'))
 

	
 
    def test_delete(self):
 
        self.log_user()
 
        repo_name = 'vcs_test_new_to_delete'
 
        description = 'description for newly created repo'
 
        private = False
 
        response = self.app.post(url('repos'), {'repo_name':repo_name,
 
                                                'repo_type':'hg',
 
                                               'description':description,
 
                                               'private':private})
 

	
 
        print response
 
        
 

	
 
        #test if we have a message for that repository
 
        print '-' * 100
 
        print response.session
 
        assert '''created repository %s''' % (repo_name) in response.session['flash'][0], 'No flash message about new repo'
 
                      
 

	
 
        #test if the repo was created in the database
 
        new_repo = self.sa.query(Repository).filter(Repository.repo_name == repo_name).one()
 
        
 

	
 
        assert new_repo.repo_name == repo_name, 'wrong name of repo name in db'
 
        assert new_repo.description == description, 'wrong description'
 
        
 

	
 
        #test if repository is visible in the list ?
 
        response = response.follow()
 
        
 

	
 
        assert repo_name in response.body, 'missing new repo from the main repos list'
 
        
 
                
 

	
 

	
 
        response = self.app.delete(url('repo', repo_name=repo_name))
 
        
 

	
 
        print '-' * 100
 
        print response.session
 
        assert '''deleted repository %s''' % (repo_name) in response.session['flash'][0], 'No flash message about delete repo'
 
                
 

	
 
        response.follow()
 
        
 

	
 
        #check if repo was deleted from db
 
        deleted_repo = self.sa.query(Repository).filter(Repository.repo_name == repo_name).scalar()
 
        
 

	
 
        assert deleted_repo is None, 'Deleted repository was found in db'
 
        
 

	
 

	
 
    def test_delete_browser_fakeout(self):
 
        response = self.app.post(url('repo', repo_name='vcs_test'), params=dict(_method='delete'))
 
        response = self.app.post(url('repo', repo_name=HG_REPO), params=dict(_method='delete'))
 

	
 
    def test_show(self):
 
        self.log_user()
 
        response = self.app.get(url('repo', repo_name='vcs_test'))
 
        response = self.app.get(url('repo', repo_name=HG_REPO))
 

	
 
    def test_show_as_xml(self):
 
        response = self.app.get(url('formatted_repo', repo_name='vcs_test', format='xml'))
 
        response = self.app.get(url('formatted_repo', repo_name=HG_REPO, format='xml'))
 

	
 
    def test_edit(self):
 
        response = self.app.get(url('edit_repo', repo_name='vcs_test'))
 
        response = self.app.get(url('edit_repo', repo_name=HG_REPO))
 

	
 
    def test_edit_as_xml(self):
 
        response = self.app.get(url('formatted_edit_repo', repo_name='vcs_test', format='xml'))
 
        response = self.app.get(url('formatted_edit_repo', repo_name=HG_REPO, format='xml'))
rhodecode/tests/functional/test_search.py
Show inline comments
 
from rhodecode.tests import *
 
from rhodecode.lib.indexers import IDX_LOCATION
 
import os
 
from nose.plugins.skip import SkipTest
 

	
 
class TestSearchController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='search', action='index'))
 
        print response.body
 
        assert 'class="small" id="q" name="q" type="text"' in response.body, 'Search box content error'
 
        # Test response...
 

	
 
    def test_empty_search(self):
 
        
 
        if os.path.isdir(IDX_LOCATION):
 
        if os.path.isdir(self.index_location):
 
            raise SkipTest('skipped due to existing index')
 
        else:
 
            self.log_user()
 
            response = self.app.get(url(controller='search', action='index'), {'q':'vcs_test'})
 
            response = self.app.get(url(controller='search', action='index'), {'q':HG_REPO})
 
            assert 'There is no index to search in. Please run whoosh indexer' in response.body, 'No error message about empty index'
 
        
 

	
 
    def test_normal_search(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='search', action='index'), {'q':'def repo'})
 
        print response.body
 
        assert '10 results' in response.body, 'no message about proper search results'
 
        assert 'Permission denied' not in response.body, 'Wrong permissions settings for that repo and user'
 
        
 
    
 

	
 

	
 
    def test_repo_search(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='search', action='index'), {'q':'repository:vcs_test def test'})
 
        response = self.app.get(url(controller='search', action='index'), {'q':'repository:%s def test' % HG_REPO})
 
        print response.body
 
        assert '4 results' in response.body, 'no message about proper search results'
 
        assert 'Permission denied' not in response.body, 'Wrong permissions settings for that repo and user'
 
        
 

	
rhodecode/tests/functional/test_settings.py
Show inline comments
 
from rhodecode.model.db import Repository
 
from rhodecode.tests import *
 

	
 
class TestSettingsController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='settings', action='index',
 
                                    repo_name='vcs_test'))
 
                                    repo_name=HG_REPO))
 
        # Test response...
 
    
 

	
 
    def test_fork(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='settings', action='fork',
 
                                    repo_name='vcs_test'))
 
        
 
                                    repo_name=HG_REPO))
 

	
 

	
 
    def test_fork_create(self):
 
        self.log_user()
 
        fork_name = 'vcs_test_fork'
 
        fork_name = HG_FORK
 
        description = 'fork of vcs test'
 
        repo_name = 'vcs_test'
 
        repo_name = HG_REPO
 
        response = self.app.post(url(controller='settings', action='fork_create',
 
                                    repo_name=repo_name),
 
                                    {'fork_name':fork_name,
 
                                     'repo_type':'hg',
 
                                     'description':description,
 
                                     'private':'False'})
 
        
 
        
 
        print response
 
        
 

	
 
        #test if we have a message that fork is ok
 
        assert 'fork %s repository as %s task added' \
 
                      % (repo_name, fork_name) in response.session['flash'][0], 'No flash message about fork'
 
                      
 

	
 
        #test if the fork was created in the database
 
        fork_repo = self.sa.query(Repository).filter(Repository.repo_name == fork_name).one()
 
        
 

	
 
        assert fork_repo.repo_name == fork_name, 'wrong name of repo name in new db fork repo'
 
        assert fork_repo.fork.repo_name == repo_name, 'wrong fork parrent'
 
        
 
        
 

	
 

	
 
        #test if fork is visible in the list ?
 
        response = response.follow()
 

	
 

	
 
        #check if fork is marked as fork
 
        response = self.app.get(url(controller='summary', action='index',
 
                                    repo_name=fork_name))
 
        
 
        
 
        print response
 
        
 

	
 
        assert 'Fork of %s' % repo_name in response.body, 'no message about that this repo is a fork'
 
        
 

	
rhodecode/tests/functional/test_shortlog.py
Show inline comments
 
from rhodecode.tests import *
 

	
 
class TestShortlogController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='shortlog', action='index',repo_name='vcs_test'))
 
        response = self.app.get(url(controller='shortlog', action='index',repo_name=HG_REPO))
 
        # Test response...
rhodecode/tests/functional/test_summary.py
Show inline comments
 
from rhodecode.tests import *
 

	
 
class TestSummaryController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='summary', action='index', repo_name='vcs_test'))
 
        print response
 
        assert """<img style="margin-bottom:2px" class="icon" title="public repository" alt="public" src="/images/icons/lock_open.png"/>""" in response.body
 
        
 
        # Test response...
 
        response = self.app.get(url(controller='summary', action='index', repo_name=HG_REPO))
 

	
 
        #repo type
 
        assert """<img style="margin-bottom:2px" class="icon" title="Mercurial repository" alt="Mercurial repository" src="/images/icons/hgicon.png"/>""" in response.body
 
        assert """<img style="margin-bottom:2px" class="icon" title="public repository" alt="public repository" src="/images/icons/lock_open.png"/>""" in response.body
 

	
 
        #codes stats
 
        assert """var data = {"text/x-python": 42, "text/plain": 12};""" in response.body, 'wrong info about % of codes stats'
 

	
 
        # clone url...
 
        assert """<input type="text" id="clone_url" readonly="readonly" value="hg clone http://test_admin@localhost:80/%s" size="70"/>""" % HG_REPO in response.body
rhodecode/tests/functional/test_tags.py
Show inline comments
 
from rhodecode.tests import *
 

	
 
class TestTagsController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='tags', action='index',repo_name='vcs_test'))
 
        response = self.app.get(url(controller='tags', action='index', repo_name=HG_REPO))
 
        assert """<a href="/%s/changeset/27cd5cce30c96924232dffcd24178a07ffeb5dfc">tip</a>""" % HG_REPO, 'wrong info about tip tag'
 
        assert """<a href="/%s/changeset/fd4bdb5e9b2a29b4393a4ac6caef48c17ee1a200">0.1.4</a>""" % HG_REPO, 'wrong info about 0.1.4 tag'
 
        assert """<a href="/%s/changeset/17544fbfcd33ffb439e2b728b5d526b1ef30bfcf">0.1.3</a>""" % HG_REPO, 'wrong info about 0.1.3 tag'
 
        assert """<a href="/%s/changeset/a7e60bff65d57ac3a1a1ce3b12a70f8a9e8a7720">0.1.2</a>""" % HG_REPO, 'wrong info about 0.1.2 tag'
 
        assert """<a href="/%s/changeset/eb3a60fc964309c1a318b8dfe26aa2d1586c85ae">0.1.1</a>""" % HG_REPO, 'wrong info about 0.1.1 tag'
 
        # Test response...
rhodecode/tests/functional/test_users.py
Show inline comments
 
from rhodecode.tests import *
 

	
 
class TestUsersController(TestController):
 

	
 
    def test_index(self):
 
        response = self.app.get(url('users'))
 
        # Test response...
 

	
 
    def test_index_as_xml(self):
 
        response = self.app.get(url('formatted_users', format='xml'))
 

	
 
    def test_create(self):
 
        response = self.app.post(url('users'))
 
        self.log_user()
 
#        user_name = 'new_user'
 
#        response = self.app.post(url('users'),{'repo_name':user_name,
 
#                                                'repo_type':'hg',
 
#                                               'description':description,
 
#                                               'private':private})
 

	
 

	
 
    def test_new(self):
 
        response = self.app.get(url('new_user'))
 

	
 
    def test_new_as_xml(self):
 
        response = self.app.get(url('formatted_new_user', format='xml'))
 

	
 
    def test_update(self):
 
        response = self.app.put(url('user', id=1))
 

	
 
    def test_update_browser_fakeout(self):
 
        response = self.app.post(url('user', id=1), params=dict(_method='put'))
rhodecode/tests/vcs_test.tar.gz
Show inline comments
 
deleted file
 
binary diff not shown

Changeset was too big and was cut off... Show full diff anyway

0 comments (0 inline, 0 general)