Changeset - 72778dda34cf
[Not reviewed]
default
0 6 0
Marcin Kuzminski - 15 years ago 2010-10-01 03:04:52
marcin@python-works.com
some fixups in cache, added fallback and cache invalidation when key not found in cached repos list,
added extra test, some other small fixes
6 files changed with 37 insertions and 8 deletions:
0 comments (0 inline, 0 general)
celeryconfig.py
Show inline comments
 
# List of modules to import when celery starts.
 
import sys
 
import os
 
import ConfigParser
 
root = os.getcwd()
 

	
 
PYLONS_CONFIG_NAME = 'test.ini'
 
PYLONS_CONFIG_NAME = 'production.ini'
 

	
 
sys.path.append(root)
 
config = ConfigParser.ConfigParser({'here':root})
 
config.read('%s/%s' % (root, PYLONS_CONFIG_NAME))
 
PYLONS_CONFIG = config
 

	
 
CELERY_IMPORTS = ("pylons_app.lib.celerylib.tasks",)
 

	
 
## Result store settings.
 
CELERY_RESULT_BACKEND = "database"
 
CELERY_RESULT_DBURI = dict(config.items('app:main'))['sqlalchemy.db1.url']
 
CELERY_RESULT_SERIALIZER = 'json'
 

	
 

	
 
BROKER_CONNECTION_MAX_RETRIES = 30
 

	
 
## Broker settings.
 
BROKER_HOST = "localhost"
 
BROKER_PORT = 5672
 
BROKER_VHOST = "rabbitmqhost"
 
BROKER_USER = "rabbitmq"
 
BROKER_PASSWORD = "qweqwe"
 

	
 
## Worker settings
pylons_app/lib/base.py
Show inline comments
 
"""The base Controller API
 

	
 
Provides the BaseController class for subclassing.
 
"""
 
from pylons import config, tmpl_context as c, request, session
 
from pylons.controllers import WSGIController
 
from pylons.templating import render_mako as render
 
from pylons_app import __version__
 
from pylons_app.lib import auth
 
from pylons_app.lib.utils import get_repo_slug
 
from pylons_app.model import meta
 
from pylons_app.model.hg_model import _get_repos_cached, \
 
    _get_repos_switcher_cached
 

	
 
class BaseController(WSGIController):
 
    
 
    def __before__(self):
 
        c.hg_app_version = __version__
 
        c.hg_app_name = config['hg_app_title']
 
        c.repo_name = get_repo_slug(request)
 
        c.cached_repo_list = _get_repos_cached()
 
        c.repo_switcher_list = _get_repos_switcher_cached(c.cached_repo_list)
 
        
 
        if c.repo_name:
 
            c.repository_tags = c.cached_repo_list[c.repo_name].tags
 
            c.repository_branches = c.cached_repo_list[c.repo_name].branches
 
            cached_repo = c.cached_repo_list.get(c.repo_name)
 
            
 
            if cached_repo:
 
                c.repository_tags = cached_repo.tags
 
                c.repository_branches = cached_repo.branches
 
            else:
 
                c.repository_tags = {}
 
                c.repository_branches = {}
 
                    
 
        self.sa = meta.Session
 
    
 
    def __call__(self, environ, start_response):
 
        """Invoke the Controller"""
 
        # WSGIController.__call__ dispatches to the Controller method
 
        # the request is routed to. This routing information is
 
        # available in environ['pylons.routes_dict']
 
        try:
 
            #putting this here makes sure that we update permissions every time
 
            c.hg_app_user = auth.get_user(session)
 
            return WSGIController.__call__(self, environ, start_response)
 
        finally:
 
            meta.Session.remove()
pylons_app/lib/celerylib/tasks.py
Show inline comments
 
@@ -253,61 +253,59 @@ def reset_user_password(user_email):
 
def send_email(recipients, subject, body):
 
    log = send_email.get_logger()
 
    email_config = dict(config.items('DEFAULT')) 
 
    mail_from = email_config.get('app_email_from')
 
    user = email_config.get('smtp_username')
 
    passwd = email_config.get('smtp_password')
 
    mail_server = email_config.get('smtp_server')
 
    mail_port = email_config.get('smtp_port')
 
    tls = email_config.get('smtp_use_tls')
 
    ssl = False
 
    
 
    try:
 
        m = SmtpMailer(mail_from, user, passwd, mail_server,
 
                       mail_port, ssl, tls)
 
        m.send(recipients, subject, body)  
 
    except:
 
        log.error('Mail sending failed')
 
        log.error(traceback.format_exc())
 
        return False
 
    return True
 

	
 
@task
 
def create_repo_fork(form_data, cur_user):
 
    import os
 
    from pylons_app.lib.utils import invalidate_cache
 
    from pylons_app.model.repo_model import RepoModel
 
    sa = get_session()
 
    rm = RepoModel(sa)
 
    
 
    rm.create(form_data, cur_user, just_db=True, fork=True)
 
    
 
    repos_path = get_hg_ui_settings()['paths_root_path'].replace('*', '')
 
    repo_path = os.path.join(repos_path, form_data['repo_name'])
 
    repo_fork_path = os.path.join(repos_path, form_data['fork_name'])
 
    
 
    MercurialRepository(str(repo_fork_path), True, clone_url=str(repo_path))
 
    #invalidate_cache('cached_repo_list')
 

	
 
    
 
def __get_codes_stats(repo_name):
 
    LANGUAGES_EXTENSIONS = ['action', 'adp', 'ashx', 'asmx', 'aspx', 'asx', 'axd', 'c',
 
                    'cfg', 'cfm', 'cpp', 'cs', 'diff', 'do', 'el', 'erl',
 
                    'h', 'java', 'js', 'jsp', 'jspx', 'lisp',
 
                    'lua', 'm', 'mako', 'ml', 'pas', 'patch', 'php', 'php3',
 
                    'php4', 'phtml', 'pm', 'py', 'rb', 'rst', 's', 'sh',
 
                    'tpl', 'txt', 'vim', 'wss', 'xhtml', 'xml', 'xsl', 'xslt',
 
                    'yaws']
 
    repos_path = get_hg_ui_settings()['paths_root_path'].replace('*', '')
 
    repo = MercurialRepository(repos_path + repo_name)
 

	
 
    code_stats = {}
 
    for topnode, dirs, files in repo.walk('/', 'tip'):
 
        for f in files:
 
            k = f.mimetype
 
            if f.extension in LANGUAGES_EXTENSIONS:
 
                if code_stats.has_key(k):
 
                    code_stats[k] += 1
 
                else:
 
                    code_stats[k] = 1
 
                    
 
    return code_stats or {}
pylons_app/model/hg_model.py
Show inline comments
 
@@ -5,54 +5,55 @@
 
# 
 
# This program is free software; you can redistribute it and/or
 
# modify it under the terms of the GNU General Public License
 
# as published by the Free Software Foundation; version 2
 
# of the License or (at your opinion) any later version of the license.
 
# 
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
# 
 
# You should have received a copy of the GNU General Public License
 
# along with this program; if not, write to the Free Software
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 
# MA  02110-1301, USA.
 
"""
 
Created on April 9, 2010
 
Model for hg app
 
@author: marcink
 
"""
 
from beaker.cache import cache_region
 
from mercurial import ui
 
from mercurial.hgweb.hgwebdir_mod import findrepos
 
from pylons.i18n.translation import _
 
from pylons_app.lib import helpers as h
 
from pylons_app.lib.utils import invalidate_cache
 
from pylons_app.lib.auth import HasRepoPermissionAny
 
from pylons_app.model import meta
 
from pylons_app.model.db import Repository, User
 
from pylons_app.lib import helpers as h
 
from sqlalchemy.orm import joinedload
 
from vcs.exceptions import RepositoryError, VCSError
 
from sqlalchemy.orm import joinedload
 
import logging
 
import os
 
import sys
 
log = logging.getLogger(__name__)
 

	
 
try:
 
    from vcs.backends.hg import MercurialRepository
 
except ImportError:
 
    sys.stderr.write('You have to import vcs module')
 
    raise Exception('Unable to import vcs')
 

	
 
def _get_repos_cached_initial(app_globals, initial):
 
    """return cached dict with repos
 
    """
 
    g = app_globals
 
    return HgModel.repo_scan(g.paths[0][0], g.paths[0][1], g.baseui, initial)
 

	
 
@cache_region('long_term', 'cached_repo_list')
 
def _get_repos_cached():
 
    """return cached dict with repos
 
    """
 
    log.info('getting all repositories list')
 
    from pylons import app_globals as g
 
    return HgModel.repo_scan(g.paths[0][0], g.paths[0][1], g.baseui)
 
@@ -102,71 +103,84 @@ class HgModel(object):
 
        if not repos_path.endswith('*'):
 
            raise VCSError('You need to specify * or ** at the end of path '
 
                            'for recursive scanning')
 
            
 
        check_repo_dir(repos_path)
 
        log.info('scanning for repositories in %s', repos_path)
 
        repos = findrepos([(repos_prefix, repos_path)])
 
        if not isinstance(baseui, ui.ui):
 
            baseui = ui.ui()
 
    
 
        repos_list = {}
 
        for name, path in repos:
 
            try:
 
                #name = name.split('/')[-1]
 
                if repos_list.has_key(name):
 
                    raise RepositoryError('Duplicate repository name %s found in'
 
                                    ' %s' % (name, path))
 
                else:
 
                    
 
                    repos_list[name] = MercurialRepository(path, baseui=baseui)
 
                    repos_list[name].name = name
 
                    
 
                    dbrepo = None
 
                    if not initial:
 
                        #for initial scann on application first run we don't
 
                        #have db repos yet.
 
                        dbrepo = sa.query(Repository)\
 
                            .options(joinedload(Repository.fork))\
 
                            .filter(Repository.repo_name == name)\
 
                            .scalar()
 
                            
 
                    if dbrepo:
 
                        log.info('Adding db instance to cached list')
 
                        repos_list[name].dbrepo = dbrepo
 
                        repos_list[name].description = dbrepo.description
 
                        if dbrepo.user:
 
                            repos_list[name].contact = dbrepo.user.full_contact
 
                        else:
 
                            repos_list[name].contact = sa.query(User)\
 
                            .filter(User.admin == True).first().full_contact
 
            except OSError:
 
                continue
 
        meta.Session.remove()
 
        return repos_list
 
        
 
    def get_repos(self):
 
        for name, repo in _get_repos_cached().items():
 
            if repo._get_hidden():
 
                #skip hidden web repository
 
                continue
 
            
 
            last_change = repo.last_change
 
            tip = h.get_changeset_safe(repo, 'tip')
 
                
 
            tmp_d = {}
 
            tmp_d['name'] = repo.name
 
            tmp_d['name_sort'] = tmp_d['name'].lower()
 
            tmp_d['description'] = repo.description
 
            tmp_d['description_sort'] = tmp_d['description']
 
            tmp_d['last_change'] = last_change
 
            tmp_d['last_change_sort'] = last_change[1] - last_change[0]
 
            tmp_d['tip'] = tip.short_id
 
            tmp_d['tip_sort'] = tip.revision 
 
            tmp_d['rev'] = tip.revision
 
            tmp_d['contact'] = repo.contact
 
            tmp_d['contact_sort'] = tmp_d['contact']
 
            tmp_d['repo_archives'] = list(repo._get_archives())
 
            tmp_d['last_msg'] = tip.message
 
            tmp_d['repo'] = repo
 
            yield tmp_d
 

	
 
    def get_repo(self, repo_name):
 
        return _get_repos_cached()[repo_name]
 
        try:
 
            repo = _get_repos_cached()[repo_name]
 
            return repo
 
        except KeyError:
 
            #i we're here and we got key errors let's try to invalidate the
 
            #cahce and try again
 
            invalidate_cache('cached_repo_list')
 
            repo = _get_repos_cached()[repo_name]
 
            return repo
 
            
 
        
 
        
pylons_app/tests/__init__.py
Show inline comments
 
@@ -7,48 +7,50 @@ command.
 
This module initializes the application via ``websetup`` (`paster
 
setup-app`) and provides the base testing objects.
 
"""
 
from unittest import TestCase
 

	
 
from paste.deploy import loadapp
 
from paste.script.appinstall import SetupCommand
 
from pylons import config, url
 
from routes.util import URLGenerator
 
from webtest import TestApp
 
import os
 
from pylons_app.model import meta
 
import logging
 

	
 

	
 
log = logging.getLogger(__name__) 
 

	
 
import pylons.test
 

	
 
__all__ = ['environ', 'url', 'TestController']
 

	
 
# Invoke websetup with the current config file
 
#SetupCommand('setup-app').run([config_file])
 

	
 
##RUNNING DESIRED TESTS
 
#nosetests pylons_app.tests.functional.test_admin_settings:TestSettingsController.test_my_account
 

	
 
environ = {}
 

	
 
class TestController(TestCase):
 

	
 
    def __init__(self, *args, **kwargs):
 
        wsgiapp = pylons.test.pylonsapp
 
        config = wsgiapp.config
 
        self.app = TestApp(wsgiapp)
 
        url._push_object(URLGenerator(config['routes.map'], environ))
 
        self.sa = meta.Session
 

	
 
        TestCase.__init__(self, *args, **kwargs)
 
    
 
    def log_user(self, username='test_admin', password='test12'):
 
        response = self.app.post(url(controller='login', action='index'),
 
                                 {'username':username,
 
                                  'password':password})
 
        print response
 
        
 
        if 'invalid user name' in response.body:
 
            assert False, 'could not login using %s %s' % (username, password)
 
        
 
        assert response.status == '302 Found', 'Wrong response code from login got %s' % response.status
pylons_app/tests/functional/test_settings.py
Show inline comments
 
@@ -22,25 +22,34 @@ class TestSettingsController(TestControl
 
        repo_name = 'vcs_test'
 
        response = self.app.post(url(controller='settings', action='fork_create',
 
                                    repo_name=repo_name),
 
                                    {'fork_name':fork_name,
 
                                     'description':description,
 
                                     'private':'False'})
 
        
 
        
 
        print response
 
        
 
        #test if we have a message that fork is ok
 
        assert 'fork %s repository as %s task added' \
 
                      % (repo_name, fork_name) in response.session['flash'][0], 'No flash message about fork'
 
                      
 
        #test if the fork was created in the database
 
        fork_repo = self.sa.query(Repository).filter(Repository.repo_name == fork_name).one()
 
        
 
        assert fork_repo.repo_name == fork_name, 'wrong name of repo name in new db fork repo'
 
        assert fork_repo.fork.repo_name == repo_name, 'wron fork parrent'
 
        
 
        
 
        #test if fork is visible in the list ?
 
        response = response.follow()
 
        
 

	
 
        #check if fork is marked as fork
 
        response = self.app.get(url(controller='summary', action='index',
 
                                    repo_name=fork_name))
 
        
 
        
 
        print response
 
        
 
        assert 'Fork of %s' % repo_name in response.body, 'no message about that this repo is a fork'
 
        
0 comments (0 inline, 0 general)