Changeset - ac32a026c306
[Not reviewed]
default
0 2 0
Marcin Kuzminski - 15 years ago 2010-09-23 21:25:30
marcin@python-works.com
simplified task locking, and fixed some bugs for keyworded arguments
2 files changed with 10 insertions and 17 deletions:
0 comments (0 inline, 0 general)
pylons_app/lib/celerylib/__init__.py
Show inline comments
 
@@ -10,57 +10,50 @@ log = logging.getLogger(__name__)
 

	
 
class ResultWrapper(object):
 
    def __init__(self, task):
 
        self.task = task
 
        
 
    @LazyProperty
 
    def result(self):
 
        return self.task
 

	
 
def run_task(task, *args, **kwargs):
 
    try:
 
        t = task.delay(*args, **kwargs)
 
        log.info('running task %s', t.task_id)
 
        return t
 
    except Exception, e:
 
        print e
 
        if e.errno == 111:
 
            log.debug('Unnable to connect. Sync execution')
 
        else:
 
            log.error(traceback.format_exc())
 
        #pure sync version
 
        return ResultWrapper(task(*args, **kwargs))
 

	
 

	
 
class LockTask(object):
 
    """LockTask decorator"""
 
    
 
    def __init__(self, func):
 
        self.func = func
 
        
 
    def __call__(self, func):
 
        return decorator(self.__wrapper, func)
 
    
 
    def __wrapper(self, func, *fargs, **fkwargs):
 
        params = []
 
        params.extend(fargs)
 
        params.extend(fkwargs.values())
 
def locked_task(func):
 
    def __wrapper(func, *fargs, **fkwargs):
 
        params = list(fargs)
 
        params.extend(['%s-%s' % ar for ar in fkwargs.items()])
 
            
 
        lockkey = 'task_%s' % \
 
           md5(str(self.func) + '-' + '-'.join(map(str, params))).hexdigest()
 
            md5(str(func.__name__) + '-' + \
 
                '-'.join(map(str, params))).hexdigest()
 
        log.info('running task with lockkey %s', lockkey)
 
        try:
 
            l = DaemonLock(lockkey)
 
            return func(*fargs, **fkwargs)
 
            l.release()
 
        except LockHeld:
 
            log.info('LockHeld')
 
            return 'Task with key %s already running' % lockkey   
 

	
 
            
 
    return decorator(__wrapper, func)      
 
            
 

	
 
        
 
        
 
    
 
    
 
    
 
  
pylons_app/lib/celerylib/tasks.py
Show inline comments
 
from celery.decorators import task
 
from celery.task.sets import subtask
 
from celeryconfig import PYLONS_CONFIG as config
 
from pylons.i18n.translation import _
 
from pylons_app.lib.celerylib import run_task, LockTask
 
from pylons_app.lib.celerylib import run_task, locked_task
 
from pylons_app.lib.helpers import person
 
from pylons_app.lib.smtp_mailer import SmtpMailer
 
from pylons_app.lib.utils import OrderedDict
 
from operator import itemgetter
 
from vcs.backends.hg import MercurialRepository
 
from time import mktime
 
import traceback
 
import json
 

	
 
__all__ = ['whoosh_index', 'get_commits_stats',
 
           'reset_user_password', 'send_email']
 

	
 
def get_session():
 
    from sqlalchemy import engine_from_config
 
    from sqlalchemy.orm import sessionmaker, scoped_session
 
    engine = engine_from_config(dict(config.items('app:main')), 'sqlalchemy.db1.')
 
    sa = scoped_session(sessionmaker(bind=engine))
 
    return sa
 

	
 
def get_hg_settings():
 
    from pylons_app.model.db import HgAppSettings
 
    try:
 
        sa = get_session()
 
        ret = sa.query(HgAppSettings).all()
 
@@ -61,49 +61,49 @@ def get_hg_ui_settings():
 
        if each.ui_section == 'hooks':
 
            v = each.ui_active
 
        
 
        settings[each.ui_section + '_' + k] = v  
 
    
 
    return settings   
 

	
 
@task
 
def whoosh_index(repo_location, full_index):
 
    log = whoosh_index.get_logger()
 
    from pylons_app.lib.pidlock import DaemonLock
 
    from pylons_app.lib.indexers.daemon import WhooshIndexingDaemon, LockHeld
 
    try:
 
        l = DaemonLock()
 
        WhooshIndexingDaemon(repo_location=repo_location)\
 
            .run(full_index=full_index)
 
        l.release()
 
        return 'Done'
 
    except LockHeld:
 
        log.info('LockHeld')
 
        return 'LockHeld'    
 

	
 

	
 
@task
 
@LockTask('get_commits_stats')
 
@locked_task
 
def get_commits_stats(repo_name, ts_min_y, ts_max_y):
 
    author_key_cleaner = lambda k: person(k).replace('"', "") #for js data compatibilty
 
        
 
    from pylons_app.model.db import Statistics, Repository
 
    log = get_commits_stats.get_logger()
 
    commits_by_day_author_aggregate = {}
 
    commits_by_day_aggregate = {}
 
    repos_path = get_hg_ui_settings()['paths_root_path'].replace('*', '')
 
    repo = MercurialRepository(repos_path + repo_name)
 

	
 
    skip_date_limit = True
 
    parse_limit = 350 #limit for single task changeset parsing
 
    last_rev = 0
 
    last_cs = None
 
    timegetter = itemgetter('time')
 
    
 
    sa = get_session()
 
    
 
    dbrepo = sa.query(Repository)\
 
        .filter(Repository.repo_name == repo_name).scalar()
 
    cur_stats = sa.query(Statistics)\
 
        .filter(Statistics.repository == dbrepo).scalar()
 
    if cur_stats:
 
        last_rev = cur_stats.stat_on_revision
0 comments (0 inline, 0 general)