Changeset - 80dc0a23edf7
[Not reviewed]
default
0 7 0
Marcin Kuzminski - 15 years ago 2010-10-10 01:42:08
marcin@python-works.com
fixed whoosh failure on new repository
added few tests
7 files changed with 72 insertions and 14 deletions:
0 comments (0 inline, 0 general)
rhodecode/lib/indexers/daemon.py
Show inline comments
 
#!/usr/bin/env python
 
# encoding: utf-8
 
# whoosh indexer daemon for rhodecode
 
# Copyright (C) 2009-2010 Marcin Kuzminski <marcin@python-works.com>
 
#
 
# This program is free software; you can redistribute it and/or
 
# modify it under the terms of the GNU General Public License
 
# as published by the Free Software Foundation; version 2
 
# of the License or (at your opinion) any later version of the license.
 
# 
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
# 
 
# You should have received a copy of the GNU General Public License
 
# along with this program; if not, write to the Free Software
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 
# MA  02110-1301, USA.
 
"""
 
Created on Jan 26, 2010
 

	
 
@author: marcink
 
A deamon will read from task table and run tasks
 
"""
 
import sys
 
import os
 
from os.path import dirname as dn
 
from os.path import join as jn
 

	
 
#to get the rhodecode import
 
project_path = dn(dn(dn(dn(os.path.realpath(__file__)))))
 
sys.path.append(project_path)
 

	
 
from rhodecode.lib.pidlock import LockHeld, DaemonLock
 
from rhodecode.model.hg_model import HgModel
 
from rhodecode.lib.helpers import safe_unicode
 
from whoosh.index import create_in, open_dir
 
from shutil import rmtree
 
from rhodecode.lib.indexers import INDEX_EXTENSIONS, IDX_LOCATION, SCHEMA, IDX_NAME
 

	
 
from time import mktime
 
from vcs.exceptions import ChangesetError
 
from vcs.exceptions import ChangesetError, RepositoryError
 

	
 
import logging
 

	
 
log = logging.getLogger('whooshIndexer')
 
# create logger
 
log.setLevel(logging.DEBUG)
 
log.propagate = False
 
# create console handler and set level to debug
 
ch = logging.StreamHandler()
 
ch.setLevel(logging.DEBUG)
 

	
 
# create formatter
 
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
 

	
 
# add formatter to ch
 
ch.setFormatter(formatter)
 

	
 
# add ch to logger
 
log.addHandler(ch)
 

	
 
def scan_paths(root_location):
 
    return HgModel.repo_scan('/', root_location, None, True)
 

	
 
class WhooshIndexingDaemon(object):
 
    """
 
    Deamon for atomic jobs
 
    """
 

	
 
    def __init__(self, indexname='HG_INDEX', repo_location=None):
 
        self.indexname = indexname
 
        self.repo_location = repo_location
 
        self.repo_paths = scan_paths(self.repo_location)
 
        self.initial = False
 
        if not os.path.isdir(IDX_LOCATION):
 
            os.mkdir(IDX_LOCATION)
 
            log.info('Cannot run incremental index since it does not'
 
                     ' yet exist running full build')
 
            self.initial = True
 
        
 
    def get_paths(self, repo):
 
        """
 
        recursive walk in root dir and return a set of all path in that dir
 
        based on repository walk function
 
        """
 
        index_paths_ = set()
 
        for topnode, dirs, files in repo.walk('/', 'tip'):
 
            for f in files:
 
                index_paths_.add(jn(repo.path, f.path))
 
            for dir in dirs:
 
        try:
 
            for topnode, dirs, files in repo.walk('/', 'tip'):
 
                for f in files:
 
                    index_paths_.add(jn(repo.path, f.path))
 
            
 
                for dir in dirs:
 
                    for f in files:
 
                        index_paths_.add(jn(repo.path, f.path))
 
                
 
        except RepositoryError:
 
            pass
 
        return index_paths_        
 
    
 
    def get_node(self, repo, path):
 
        n_path = path[len(repo.path) + 1:]
 
        node = repo.get_changeset().get_node(n_path)
 
        return node
 
    
 
    def get_node_mtime(self, node):
 
        return mktime(node.last_changeset.date.timetuple())
 
    
 
    def add_doc(self, writer, path, repo):
 
        """Adding doc to writer"""
 
        node = self.get_node(repo, path)
 

	
 
        #we just index the content of chosen files
 
        if node.extension in INDEX_EXTENSIONS:
 
            log.debug('    >> %s [WITH CONTENT]' % path)
 
            u_content = node.content
 
        else:
 
            log.debug('    >> %s' % path)
 
            #just index file name without it's content
 
            u_content = u''
 
        
 
        writer.add_document(owner=unicode(repo.contact),
 
                        repository=safe_unicode(repo.name),
 
                        path=safe_unicode(path),
 
                        content=u_content,
 
                        modtime=self.get_node_mtime(node),
 
                        extension=node.extension)             
 

	
 
    
 
    def build_index(self):
 
        if os.path.exists(IDX_LOCATION):
 
            log.debug('removing previous index')
 
            rmtree(IDX_LOCATION)
 
            
 
        if not os.path.exists(IDX_LOCATION):
 
            os.mkdir(IDX_LOCATION)
 
        
 
        idx = create_in(IDX_LOCATION, SCHEMA, indexname=IDX_NAME)
 
        writer = idx.writer()
 
        
 
        for cnt, repo in enumerate(self.repo_paths.values()):
 
            log.debug('building index @ %s' % repo.path)
 
        
 
            for idx_path in self.get_paths(repo):
 
                self.add_doc(writer, idx_path, repo)
 
        
rhodecode/lib/utils.py
Show inline comments
 
@@ -422,69 +422,69 @@ class OrderedDict(dict, DictMixin):
 
def create_test_index(repo_location, full_index):
 
    """Makes default test index
 
    @param repo_location:
 
    @param full_index:
 
    """
 
    from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
 
    from rhodecode.lib.pidlock import DaemonLock, LockHeld
 
    from rhodecode.lib.indexers import IDX_LOCATION
 
    import shutil
 
    
 
    if os.path.exists(IDX_LOCATION):
 
        shutil.rmtree(IDX_LOCATION)
 
         
 
    try:
 
        l = DaemonLock()
 
        WhooshIndexingDaemon(repo_location=repo_location)\
 
            .run(full_index=full_index)
 
        l.release()
 
    except LockHeld:
 
        pass    
 
    
 
def create_test_env(repos_test_path, config):
 
    """Makes a fresh database and 
 
    install test repository into tmp dir
 
    """
 
    from rhodecode.lib.db_manage import DbManage
 
    import tarfile
 
    import shutil
 
    from os.path import dirname as dn, join as jn, abspath
 
    
 
    log = logging.getLogger('TestEnvCreator')
 
    # create logger
 
    log.setLevel(logging.DEBUG)
 
    log.propagate = True
 
    # create console handler and set level to debug
 
    ch = logging.StreamHandler()
 
    ch.setLevel(logging.DEBUG)
 
    
 
    # create formatter
 
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
 
    
 
    # add formatter to ch
 
    ch.setFormatter(formatter)
 
    
 
    # add ch to logger
 
    log.addHandler(ch)
 
    
 
    #PART ONE create db
 
    log.debug('making test db in %s', repos_test_path)
 
    dbname = config['sqlalchemy.db1.url'].split('/')[-1]
 
    log.debug('making test db %s', dbname)
 
    
 
    dbmanage = DbManage(log_sql=True, dbname=dbname, root=config['here'],
 
                        tests=True)
 
    dbmanage.create_tables(override=True)
 
    dbmanage.config_prompt(repos_test_path)
 
    dbmanage.create_default_user()
 
    dbmanage.admin_prompt()
 
    dbmanage.create_permissions()
 
    dbmanage.populate_default_permissions()
 
    
 
    #PART TWO make test repo
 
    log.debug('making test vcs repo')
 
    if os.path.isdir('/tmp/vcs_test'):
 
        shutil.rmtree('/tmp/vcs_test')
 
        
 
    cur_dir = dn(dn(abspath(__file__)))
 
    tar = tarfile.open(jn(cur_dir, 'tests', "vcs_test.tar.gz"))
 
    tar.extractall('/tmp')
 
    tar.close()
rhodecode/tests/functional/test_changelog.py
Show inline comments
 
from rhodecode.tests import *
 

	
 
class TestChangelogController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='changelog', action='index',repo_name='vcs_test'))
 
        # Test response...
 
        response = self.app.get(url(controller='changelog', action='index', repo_name='vcs_test'))
 
        
 
        print response
 
        assert """<div id="chg_20" class="container">""" in response.body, 'wrong info about number ofchanges'
 
        assert """Small update at simplevcs app""" in response.body, 'missing info about commit message'
 
        assert """<span class="removed" title="removed">0</span>""" in response.body, 'wrong info about removed nodes'
 
        assert """<span class="changed" title="changed">2</span>""" in response.body, 'wrong info about changed nodes'
 
        assert """<span class="added" title="added">1</span>""" in response.body, 'wrong info about added nodes'
 
        
 
        #pagination
 
        
 
        response = self.app.get(url(controller='changelog', action='index', repo_name='vcs_test'), {'page':1})
 
        response = self.app.get(url(controller='changelog', action='index', repo_name='vcs_test'), {'page':2})
 
        response = self.app.get(url(controller='changelog', action='index', repo_name='vcs_test'), {'page':3})
 
        response = self.app.get(url(controller='changelog', action='index', repo_name='vcs_test'), {'page':4})
 
        response = self.app.get(url(controller='changelog', action='index', repo_name='vcs_test'), {'page':5})
 
        response = self.app.get(url(controller='changelog', action='index', repo_name='vcs_test'), {'page':6})
 
        # Test response after pagination...
 

	
 
        assert """<span class="removed" title="removed">20</span>"""in response.body, 'wrong info about number of removed'
 
        assert """<span class="changed" title="changed">1</span>"""in response.body, 'wrong info about number of changes'
 
        assert """<span class="added" title="added">0</span>"""in response.body, 'wrong info about number of added'
 
        assert """<div class="date">commit 64: 46ad32a4f974@2010-04-20 00:33:21</div>"""in response.body, 'wrong info about commit 64'
 
        assert """<div class="message"><a href="/vcs_test/changeset/46ad32a4f974">Merge with 2e6a2bf9356ca56df08807f4ad86d480da72a8f4</a></div>"""in response.body, 'wrong info about commit 64 is a merge'
 
        
rhodecode/tests/functional/test_login.py
Show inline comments
 
@@ -2,98 +2,98 @@ from rhodecode.tests import *
 
from rhodecode.model.db import User
 
from rhodecode.lib.auth import check_password
 

	
 

	
 
class TestLoginController(TestController):
 

	
 
    def test_index(self):
 
        response = self.app.get(url(controller='login', action='index'))
 
        assert response.status == '200 OK', 'Wrong response from login page got %s' % response.status
 
        # Test response...
 

	
 
    def test_login_admin_ok(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username':'test_admin',
 
                                  'password':'test12'})
 
        assert response.status == '302 Found', 'Wrong response code from login got %s' % response.status
 
        assert response.session['rhodecode_user'].username == 'test_admin', 'wrong logged in user'
 
        response = response.follow()
 
        assert 'auto description for vcs_test' in response.body
 
    
 
    def test_login_regular_ok(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username':'test_regular',
 
                                  'password':'test12'})
 
        print response
 
        assert response.status == '302 Found', 'Wrong response code from login got %s' % response.status
 
        assert response.session['rhodecode_user'].username == 'test_regular', 'wrong logged in user'
 
        response = response.follow()
 
        assert 'auto description for vcs_test' in response.body
 
        assert '<a title="Admin" href="/_admin">' not in response.body
 
    
 
    def test_login_ok_came_from(self):
 
        test_came_from = '/_admin/users'
 
        response = self.app.post(url(controller='login', action='index', came_from=test_came_from),
 
                                 {'username':'test_admin',
 
                                  'password':'test12'})
 
        assert response.status == '302 Found', 'Wrong response code from came from redirection'
 
        response = response.follow()
 
        
 
        assert response.status == '200 OK', 'Wrong response from login page got %s' % response.status
 
        assert 'Users administration' in response.body, 'No proper title in response'
 
        
 
                
 
    def test_login_short_password(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username':'error',
 
                                  'password':'test'})
 
        assert response.status == '200 OK', 'Wrong response from login page'
 
        
 
        assert 'Enter a value 6 characters long or more' in response.body, 'No error password message in response'
 
        print response.body
 
        assert 'Enter 6 characters or more' in response.body, 'No error password message in response'
 

	
 
    def test_login_wrong_username_password(self):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username':'error',
 
                                  'password':'test12'})
 
        assert response.status == '200 OK', 'Wrong response from login page'
 
        
 
        assert 'invalid user name' in response.body, 'No error username message in response'
 
        assert 'invalid password' in response.body, 'No error password message in response'
 
                
 
        
 
    def test_register(self):
 
        response = self.app.get(url(controller='login', action='register'))
 
        assert 'Sign Up to rhodecode' in response.body, 'wrong page for user registration'
 
        
 
    def test_register_err_same_username(self):
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username':'test_admin',
 
                                             'password':'test',
 
                                             'email':'goodmail@domain.com',
 
                                             'name':'test',
 
                                             'lastname':'test'})
 
        
 
        assert response.status == '200 OK', 'Wrong response from register page got %s' % response.status
 
        assert 'This username already exists' in response.body 
 
        
 
    def test_register_err_wrong_data(self):
 
        response = self.app.post(url(controller='login', action='register'),
 
                                            {'username':'xs',
 
                                             'password':'',
 
                                             'email':'goodmailm',
 
                                             'name':'test',
 
                                             'lastname':'test'})
 
        
 
        assert response.status == '200 OK', 'Wrong response from register page got %s' % response.status
 
        assert 'An email address must contain a single @' in response.body
 
        assert 'Please enter a value' in response.body
 
        
 
        
 
        
 
    def test_register_ok(self):
 
        username = 'test_regular4'
 
        password = 'qweqwe'
 
        email = 'marcin@test.com'
 
        name = 'testname'
 
        lastname = 'testlastname'
 
        
 
        response = self.app.post(url(controller='login', action='register'),
rhodecode/tests/functional/test_repos.py
Show inline comments
 
from rhodecode.model.db import Repository
 
from rhodecode.tests import *
 

	
 
class TestReposController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url('repos'))
 
        # Test response...
 

	
 
    def test_index_as_xml(self):
 
        response = self.app.get(url('formatted_repos', format='xml'))
 

	
 
    def test_create(self):
 
        response = self.app.post(url('repos'))
 
        self.log_user()
 
        repo_name = 'vcs_test_new'
 
        description = 'description for newly created repo'
 
        private = False
 
        response = self.app.post(url('repos'), {'repo_name':repo_name,
 
                                               'description':description,
 
                                               'private':private})
 

	
 
        print response
 
        
 
        #test if we have a message that fork is ok
 
        assert '''created repository %s''' % (repo_name) in response.session['flash'][0], 'No flash message about new repo'
 
                      
 
        #test if the fork was created in the database
 
        new_repo = self.sa.query(Repository).filter(Repository.repo_name == repo_name).one()
 
        
 
        assert new_repo.repo_name == repo_name, 'wrong name of repo name in db'
 
        assert new_repo.description == description, 'wrong description'
 
        
 
        #test if repository is visible in the list ?
 
        response = response.follow()
 
        
 
        assert repo_name in response.body, 'missing new repo from the main repos list'
 
        
 
                
 

	
 

	
 
    def test_new(self):
 
        self.log_user()
 
        response = self.app.get(url('new_repo'))
 

	
 
    def test_new_as_xml(self):
 
        response = self.app.get(url('formatted_new_repo', format='xml'))
 

	
 
    def test_update(self):
 
        response = self.app.put(url('repo', repo_name='vcs_test'))
 

	
 
    def test_update_browser_fakeout(self):
 
        response = self.app.post(url('repo', repo_name='vcs_test'), params=dict(_method='put'))
 

	
 
    def test_delete(self):
 
        response = self.app.delete(url('repo', repo_name='vcs_test'))
 

	
 
    def test_delete_browser_fakeout(self):
 
        response = self.app.post(url('repo', repo_name='vcs_test'), params=dict(_method='delete'))
 

	
 
    def test_show(self):
 
        self.log_user()
 
        response = self.app.get(url('repo', repo_name='vcs_test'))
 

	
 
    def test_show_as_xml(self):
 
        response = self.app.get(url('formatted_repo', repo_name='vcs_test', format='xml'))
 

	
 
    def test_edit(self):
 
        response = self.app.get(url('edit_repo', repo_name='vcs_test'))
 

	
 
    def test_edit_as_xml(self):
 
        response = self.app.get(url('formatted_edit_repo', repo_name='vcs_test', format='xml'))
rhodecode/tests/functional/test_settings.py
Show inline comments
 
from rhodecode.model.db import Repository
 
from rhodecode.tests import *
 

	
 
class TestSettingsController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='settings', action='index',
 
                                    repo_name='vcs_test'))
 
        # Test response...
 
    
 
    def test_fork(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='settings', action='fork',
 
                                    repo_name='vcs_test'))
 
        
 

	
 
    def test_fork_create(self):
 
        self.log_user()
 
        fork_name = 'vcs_test_fork'
 
        description = 'fork of vcs test'
 
        repo_name = 'vcs_test'
 
        response = self.app.post(url(controller='settings', action='fork_create',
 
                                    repo_name=repo_name),
 
                                    {'fork_name':fork_name,
 
                                     'description':description,
 
                                     'private':'False'})
 
        
 
        
 
        print response
 
        
 
        #test if we have a message that fork is ok
 
        assert 'fork %s repository as %s task added' \
 
                      % (repo_name, fork_name) in response.session['flash'][0], 'No flash message about fork'
 
                      
 
        #test if the fork was created in the database
 
        fork_repo = self.sa.query(Repository).filter(Repository.repo_name == fork_name).one()
 
        
 
        assert fork_repo.repo_name == fork_name, 'wrong name of repo name in new db fork repo'
 
        assert fork_repo.fork.repo_name == repo_name, 'wron fork parrent'
 
        assert fork_repo.fork.repo_name == repo_name, 'wrong fork parrent'
 
        
 
        
 
        #test if fork is visible in the list ?
 
        response = response.follow()
 

	
 

	
 
        #check if fork is marked as fork
 
        response = self.app.get(url(controller='summary', action='index',
 
                                    repo_name=fork_name))
 
        
 
        
 
        print response
 
        
 
        assert 'Fork of %s' % repo_name in response.body, 'no message about that this repo is a fork'
 
        
rhodecode/tests/functional/test_summary.py
Show inline comments
 
from rhodecode.tests import *
 

	
 
class TestSummaryController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='summary', action='index',repo_name='vcs_test'))
 
        response = self.app.get(url(controller='summary', action='index', repo_name='vcs_test'))
 
        print response
 
        assert """<img style="margin-bottom:2px" class="icon" title="public repository" alt="public" src="/images/icons/lock_open.png"/>""" in response.body
 
        
 
        # Test response...
0 comments (0 inline, 0 general)