Changeset - 3fc3ce53659b
[Not reviewed]
celery
0 3 3
Marcin Kuzminski - 15 years ago 2010-09-11 01:55:46
marcin@python-works.com
starting celery branch
6 files changed with 189 insertions and 78 deletions:
0 comments (0 inline, 0 general)
celeryconfig.py
Show inline comments
 
new file 100644
 
# List of modules to import when celery starts.
 
import sys
 
import os
 
sys.path.append(os.getcwd())
 
CELERY_IMPORTS = ("pylons_app.lib.celerylib.tasks", )
 

	
 
## Result store settings.
 
CELERY_RESULT_BACKEND = "database"
 
CELERY_RESULT_DBURI = "sqlite:///hg_app.db"
 

	
 

	
 
## Broker settings.
 
BROKER_HOST = "localhost"
 
BROKER_PORT = 5672
 
BROKER_VHOST = "rabbitmqhost"
 
BROKER_USER = "rabbitmq"
 
BROKER_PASSWORD = "qweqwe"
 

	
 
## Worker settings
 
## If you're doing mostly I/O you can have more processes,
 
## but if mostly spending CPU, try to keep it close to the
 
## number of CPUs on your machine. If not set, the number of CPUs/cores
 
## available will be used.
 
CELERYD_CONCURRENCY = 2
 
# CELERYD_LOG_FILE = "celeryd.log"
 
CELERYD_LOG_LEVEL = "DEBUG"
 
CELERYD_MAX_TASKS_PER_CHILD = 1
 

	
 
#CELERY_ALWAYS_EAGER = True
 
#rabbitmqctl add_user rabbitmq qweqwe
 
#rabbitmqctl add_vhost rabbitmqhost
 
#rabbitmqctl set_permissions -p rabbitmqhost rabbitmq ".*" ".*" ".*"
 
\ No newline at end of file
pylons_app/controllers/admin/settings.py
Show inline comments
 
@@ -38,6 +38,7 @@ from pylons_app.model.forms import UserF
 
    ApplicationUiSettingsForm
 
from pylons_app.model.hg_model import HgModel
 
from pylons_app.model.user_model import UserModel
 
from pylons_app.lib.celerylib import tasks,run_task
 
import formencode
 
import logging
 
import traceback
 
@@ -102,6 +103,12 @@ class SettingsController(BaseController)
 
            invalidate_cache('cached_repo_list')
 
            h.flash(_('Repositories sucessfully rescanned'), category='success')            
 
        
 
        if setting_id == 'whoosh':
 
            repo_location = get_hg_ui_settings()['paths_root_path']
 
            full_index = request.POST.get('full_index',False)
 
            task = run_task(tasks.whoosh_index,True,repo_location,full_index)
 
            
 
            h.flash(_('Whoosh reindex task scheduled'), category='success')
 
        if setting_id == 'global':
 
            
 
            application_form = ApplicationSettingsForm()()
pylons_app/controllers/summary.py
Show inline comments
 
@@ -22,16 +22,14 @@ Created on April 18, 2010
 
summary controller for pylons
 
@author: marcink
 
"""
 
from datetime import datetime, timedelta
 
from pylons import tmpl_context as c, request
 
from pylons import tmpl_context as c, request,url
 
from pylons_app.lib.auth import LoginRequired, HasRepoPermissionAnyDecorator
 
from pylons_app.lib.base import BaseController, render
 
from pylons_app.lib.helpers import person
 
from pylons_app.lib.utils import OrderedDict
 
from pylons_app.model.hg_model import HgModel
 
from time import mktime
 
from webhelpers.paginate import Page
 
import calendar
 
from pylons_app.lib.celerylib import run_task
 
from pylons_app.lib.celerylib.tasks import get_commits_stats
 
import logging
 

	
 
log = logging.getLogger(__name__)
 
@@ -62,78 +60,11 @@ class SummaryController(BaseController):
 
        c.repo_branches = OrderedDict()
 
        for name, hash in c.repo_info.branches.items()[:10]:
 
            c.repo_branches[name] = c.repo_info.get_changeset(hash)
 

	
 
        c.commit_data = self.__get_commit_stats(c.repo_info)
 
        
 
        task = run_task(get_commits_stats,False,c.repo_info.name)
 
        c.ts_min = task.result[0]
 
        c.ts_max = task.result[1]
 
        c.commit_data = task.result[2]
 
        
 
        return render('summary/summary.html')
 

	
 

	
 

	
 
    def __get_commit_stats(self, repo):
 
        aggregate = OrderedDict()
 
        
 
        #graph range
 
        td = datetime.today() + timedelta(days=1) 
 
        y, m, d = td.year, td.month, td.day
 
        c.ts_min = mktime((y, (td - timedelta(days=calendar.mdays[m])).month,
 
                            d, 0, 0, 0, 0, 0, 0,))
 
        c.ts_max = mktime((y, m, d, 0, 0, 0, 0, 0, 0,))
 
        
 
        def author_key_cleaner(k):
 
            k = person(k)
 
            k = k.replace('"', "'") #for js data compatibilty
 
            return k
 
                
 
        for cs in repo[:200]:#added limit 200 until fix #29 is made
 
            k = '%s-%s-%s' % (cs.date.timetuple()[0], cs.date.timetuple()[1],
 
                              cs.date.timetuple()[2])
 
            timetupple = [int(x) for x in k.split('-')]
 
            timetupple.extend([0 for _ in xrange(6)])
 
            k = mktime(timetupple)
 
            if aggregate.has_key(author_key_cleaner(cs.author)):
 
                if aggregate[author_key_cleaner(cs.author)].has_key(k):
 
                    aggregate[author_key_cleaner(cs.author)][k]["commits"] += 1
 
                    aggregate[author_key_cleaner(cs.author)][k]["added"] += len(cs.added)
 
                    aggregate[author_key_cleaner(cs.author)][k]["changed"] += len(cs.changed)
 
                    aggregate[author_key_cleaner(cs.author)][k]["removed"] += len(cs.removed)
 
                    
 
                else:
 
                    #aggregate[author_key_cleaner(cs.author)].update(dates_range)
 
                    if k >= c.ts_min and k <= c.ts_max:
 
                        aggregate[author_key_cleaner(cs.author)][k] = {}
 
                        aggregate[author_key_cleaner(cs.author)][k]["commits"] = 1
 
                        aggregate[author_key_cleaner(cs.author)][k]["added"] = len(cs.added)
 
                        aggregate[author_key_cleaner(cs.author)][k]["changed"] = len(cs.changed)
 
                        aggregate[author_key_cleaner(cs.author)][k]["removed"] = len(cs.removed) 
 
                                            
 
            else:
 
                if k >= c.ts_min and k <= c.ts_max:
 
                    aggregate[author_key_cleaner(cs.author)] = OrderedDict()
 
                    #aggregate[author_key_cleaner(cs.author)].update(dates_range)
 
                    aggregate[author_key_cleaner(cs.author)][k] = {}
 
                    aggregate[author_key_cleaner(cs.author)][k]["commits"] = 1
 
                    aggregate[author_key_cleaner(cs.author)][k]["added"] = len(cs.added)
 
                    aggregate[author_key_cleaner(cs.author)][k]["changed"] = len(cs.changed)
 
                    aggregate[author_key_cleaner(cs.author)][k]["removed"] = len(cs.removed)                 
 
        
 
        d = ''
 
        tmpl0 = u""""%s":%s"""
 
        tmpl1 = u"""{label:"%s",data:%s,schema:["commits"]},"""
 
        for author in aggregate:
 
            
 
            d += tmpl0 % (author,
 
                          tmpl1 \
 
                          % (author,
 
                        [{"time":x,
 
                          "commits":aggregate[author][x]['commits'],
 
                          "added":aggregate[author][x]['added'],
 
                          "changed":aggregate[author][x]['changed'],
 
                          "removed":aggregate[author][x]['removed'],
 
                          } for x in aggregate[author]]))
 
        if d == '':
 
            d = '"%s":{label:"%s",data:[[0,1],]}' \
 
                % (author_key_cleaner(repo.contact),
 
                   author_key_cleaner(repo.contact))
 
        return d
 

	
 

	
pylons_app/lib/celerylib/__init__.py
Show inline comments
 
new file 100644
 
from vcs.utils.lazy import LazyProperty
 
import logging
 

	
 
log = logging.getLogger(__name__)
 

	
 
class ResultWrapper(object):
 
    def __init__(self, task):
 
        self.task = task
 
        
 
    @LazyProperty
 
    def result(self):
 
        return self.task
 

	
 
def run_task(task,async,*args,**kwargs):
 
    try:
 
        t = task.delay(*args,**kwargs)
 
        log.info('running task %s',t.task_id)
 
        if not async:
 
            t.wait()
 
        return t
 
    except:
 
        #pure sync version
 
        return ResultWrapper(task(*args,**kwargs))
 
    
 
\ No newline at end of file
pylons_app/lib/celerylib/tasks.py
Show inline comments
 
new file 100644
 
from celery.decorators import task
 
from datetime import datetime, timedelta
 
from pylons_app.lib.helpers import person
 
from pylons_app.lib.utils import OrderedDict
 
from time import mktime
 
import calendar
 
import logging
 
from vcs.backends.hg import MercurialRepository
 

	
 
log = logging.getLogger(__name__)
 

	
 
@task()
 
def whoosh_index(repo_location,full_index):
 
    from pylons_app.lib.indexers import DaemonLock
 
    from pylons_app.lib.indexers.daemon import WhooshIndexingDaemon,LockHeld
 
    try:
 
        l = DaemonLock()
 
        WhooshIndexingDaemon(repo_location=repo_location)\
 
            .run(full_index=full_index)
 
        l.release()
 
        return 'Done'
 
    except LockHeld:
 
        log.info('LockHeld')
 
        return 'LockHeld'    
 

	
 
@task()
 
def get_commits_stats(repo):
 
    aggregate = OrderedDict()
 
    repo = MercurialRepository('/home/marcink/hg_repos/'+repo)
 
    #graph range
 
    td = datetime.today() + timedelta(days=1) 
 
    y, m, d = td.year, td.month, td.day
 
    ts_min = mktime((y, (td - timedelta(days=calendar.mdays[m])).month,
 
                        d, 0, 0, 0, 0, 0, 0,))
 
    ts_max = mktime((y, m, d, 0, 0, 0, 0, 0, 0,))
 
    
 
    def author_key_cleaner(k):
 
        k = person(k)
 
        k = k.replace('"', "'") #for js data compatibilty
 
        return k
 
            
 
    for cs in repo[:200]:#added limit 200 until fix #29 is made
 
        k = '%s-%s-%s' % (cs.date.timetuple()[0], cs.date.timetuple()[1],
 
                          cs.date.timetuple()[2])
 
        timetupple = [int(x) for x in k.split('-')]
 
        timetupple.extend([0 for _ in xrange(6)])
 
        k = mktime(timetupple)
 
        if aggregate.has_key(author_key_cleaner(cs.author)):
 
            if aggregate[author_key_cleaner(cs.author)].has_key(k):
 
                aggregate[author_key_cleaner(cs.author)][k]["commits"] += 1
 
                aggregate[author_key_cleaner(cs.author)][k]["added"] += len(cs.added)
 
                aggregate[author_key_cleaner(cs.author)][k]["changed"] += len(cs.changed)
 
                aggregate[author_key_cleaner(cs.author)][k]["removed"] += len(cs.removed)
 
                
 
            else:
 
                #aggregate[author_key_cleaner(cs.author)].update(dates_range)
 
                if k >= ts_min and k <= ts_max:
 
                    aggregate[author_key_cleaner(cs.author)][k] = {}
 
                    aggregate[author_key_cleaner(cs.author)][k]["commits"] = 1
 
                    aggregate[author_key_cleaner(cs.author)][k]["added"] = len(cs.added)
 
                    aggregate[author_key_cleaner(cs.author)][k]["changed"] = len(cs.changed)
 
                    aggregate[author_key_cleaner(cs.author)][k]["removed"] = len(cs.removed) 
 
                                        
 
        else:
 
            if k >= ts_min and k <= ts_max:
 
                aggregate[author_key_cleaner(cs.author)] = OrderedDict()
 
                #aggregate[author_key_cleaner(cs.author)].update(dates_range)
 
                aggregate[author_key_cleaner(cs.author)][k] = {}
 
                aggregate[author_key_cleaner(cs.author)][k]["commits"] = 1
 
                aggregate[author_key_cleaner(cs.author)][k]["added"] = len(cs.added)
 
                aggregate[author_key_cleaner(cs.author)][k]["changed"] = len(cs.changed)
 
                aggregate[author_key_cleaner(cs.author)][k]["removed"] = len(cs.removed)                 
 
    
 
    d = ''
 
    tmpl0 = u""""%s":%s"""
 
    tmpl1 = u"""{label:"%s",data:%s,schema:["commits"]},"""
 
    for author in aggregate:
 
        
 
        d += tmpl0 % (author,
 
                      tmpl1 \
 
                      % (author,
 
                    [{"time":x,
 
                      "commits":aggregate[author][x]['commits'],
 
                      "added":aggregate[author][x]['added'],
 
                      "changed":aggregate[author][x]['changed'],
 
                      "removed":aggregate[author][x]['removed'],
 
                      } for x in aggregate[author]]))
 
    if d == '':
 
        d = '"%s":{label:"%s",data:[[0,1],]}' \
 
            % (author_key_cleaner(repo.contact),
 
               author_key_cleaner(repo.contact))
 
    return (ts_min, ts_max, d)    
pylons_app/templates/admin/settings/settings.html
Show inline comments
 
@@ -47,7 +47,32 @@
 
        </div>
 
    </div>  
 
    ${h.end_form()}
 
     
 
    
 
    <h3>${_('Whoosh indexing')}</h3>
 
    ${h.form(url('admin_setting', setting_id='whoosh'),method='put')}
 
    <div class="form">
 
        <!-- fields -->
 
        
 
        <div class="fields">
 
            <div class="field">
 
                <div class="label label-checkbox">
 
                    <label for="destroy">${_('index build option')}:</label>
 
                </div>
 
                <div class="checkboxes">
 
                    <div class="checkbox">
 
                        ${h.checkbox('full_index',True)}
 
                        <label for="checkbox-1">${_('build from scratch')}</label>
 
                    </div>
 
                </div>
 
            </div>
 
                            
 
            <div class="buttons">
 
            ${h.submit('reindex','reindex',class_="ui-button ui-widget ui-state-default ui-corner-all")}
 
            </div>                                                          
 
        </div>
 
    </div>  
 
    ${h.end_form()}
 
         
 
    <h3>${_('Global application settings')}</h3> 
 
    ${h.form(url('admin_setting', setting_id='global'),method='put')}
 
    <div class="form">
0 comments (0 inline, 0 general)