Changeset - 301268606429
[Not reviewed]
beta
0 2 0
Marcin Kuzminski - 13 years ago 2012-12-01 15:17:36
marcin@python-works.com
fixes #666 move lockkey path location to cache_dir to ensure this path is always writable for rhodecode server
2 files changed with 2 insertions and 2 deletions:
0 comments (0 inline, 0 general)
rhodecode/lib/celerylib/__init__.py
Show inline comments
 
@@ -48,81 +48,81 @@ from celery.messaging import establish_c
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class ResultWrapper(object):
 
    def __init__(self, task):
 
        self.task = task
 

	
 
    @LazyProperty
 
    def result(self):
 
        return self.task
 

	
 

	
 
def run_task(task, *args, **kwargs):
 
    if CELERY_ON:
 
        try:
 
            t = task.apply_async(args=args, kwargs=kwargs)
 
            log.info('running task %s:%s' % (t.task_id, task))
 
            return t
 

	
 
        except socket.error, e:
 
            if isinstance(e, IOError) and e.errno == 111:
 
                log.debug('Unable to connect to celeryd. Sync execution')
 
            else:
 
                log.error(traceback.format_exc())
 
        except KeyError, e:
 
                log.debug('Unable to connect to celeryd. Sync execution')
 
        except Exception, e:
 
            log.error(traceback.format_exc())
 

	
 
    log.debug('executing task %s in sync mode' % task)
 
    return ResultWrapper(task(*args, **kwargs))
 

	
 

	
 
def __get_lockkey(func, *fargs, **fkwargs):
 
    params = list(fargs)
 
    params.extend(['%s-%s' % ar for ar in fkwargs.items()])
 

	
 
    func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
 

	
 
    lockkey = 'task_%s.lock' % \
 
        md5(func_name + '-' + '-'.join(map(safe_str, params))).hexdigest()
 
    return lockkey
 

	
 

	
 
def locked_task(func):
 
    def __wrapper(func, *fargs, **fkwargs):
 
        lockkey = __get_lockkey(func, *fargs, **fkwargs)
 
        lockkey_path = config['here']
 
        lockkey_path = config['app_conf']['cache_dir']
 

	
 
        log.info('running task with lockkey %s' % lockkey)
 
        try:
 
            l = DaemonLock(file_=jn(lockkey_path, lockkey))
 
            ret = func(*fargs, **fkwargs)
 
            l.release()
 
            return ret
 
        except LockHeld:
 
            log.info('LockHeld')
 
            return 'Task with key %s already running' % lockkey
 

	
 
    return decorator(__wrapper, func)
 

	
 

	
 
def get_session():
 
    if CELERY_ON:
 
        engine = engine_from_config(config, 'sqlalchemy.db1.')
 
        init_model(engine)
 
    sa = meta.Session()
 
    return sa
 

	
 

	
 
def dbsession(func):
 
    def __wrapper(func, *fargs, **fkwargs):
 
        try:
 
            ret = func(*fargs, **fkwargs)
 
            return ret
 
        finally:
 
            if CELERY_ON and CELERY_EAGER is False:
 
                meta.Session.remove()
 

	
 
    return decorator(__wrapper, func)
rhodecode/lib/celerylib/tasks.py
Show inline comments
 
@@ -47,97 +47,97 @@ from rhodecode.lib.helpers import person
 
from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer
 
from rhodecode.lib.utils import add_cache, action_logger
 
from rhodecode.lib.compat import json, OrderedDict
 
from rhodecode.lib.hooks import log_create_repository
 

	
 
from rhodecode.model.db import Statistics, Repository, User
 
from rhodecode.model.scm import ScmModel
 

	
 

	
 
add_cache(config)
 

	
 
__all__ = ['whoosh_index', 'get_commits_stats',
 
           'reset_user_password', 'send_email']
 

	
 

	
 
def get_logger(cls):
 
    if CELERY_ON:
 
        try:
 
            log = cls.get_logger()
 
        except:
 
            log = logging.getLogger(__name__)
 
    else:
 
        log = logging.getLogger(__name__)
 

	
 
    return log
 

	
 

	
 
@task(ignore_result=True)
 
@locked_task
 
@dbsession
 
def whoosh_index(repo_location, full_index):
 
    from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
 
    log = get_logger(whoosh_index)
 
    DBS = get_session()
 

	
 
    index_location = config['index_dir']
 
    WhooshIndexingDaemon(index_location=index_location,
 
                         repo_location=repo_location, sa=DBS)\
 
                         .run(full_index=full_index)
 

	
 

	
 
@task(ignore_result=True)
 
@dbsession
 
def get_commits_stats(repo_name, ts_min_y, ts_max_y):
 
    log = get_logger(get_commits_stats)
 
    DBS = get_session()
 
    lockkey = __get_lockkey('get_commits_stats', repo_name, ts_min_y,
 
                            ts_max_y)
 
    lockkey_path = config['here']
 
    lockkey_path = config['app_conf']['cache_dir']
 

	
 
    log.info('running task with lockkey %s' % lockkey)
 

	
 
    try:
 
        lock = l = DaemonLock(file_=jn(lockkey_path, lockkey))
 

	
 
        # for js data compatibility cleans the key for person from '
 
        akc = lambda k: person(k).replace('"', "")
 

	
 
        co_day_auth_aggr = {}
 
        commits_by_day_aggregate = {}
 
        repo = Repository.get_by_repo_name(repo_name)
 
        if repo is None:
 
            return True
 

	
 
        repo = repo.scm_instance
 
        repo_size = repo.count()
 
        # return if repo have no revisions
 
        if repo_size < 1:
 
            lock.release()
 
            return True
 

	
 
        skip_date_limit = True
 
        parse_limit = int(config['app_conf'].get('commit_parse_limit'))
 
        last_rev = None
 
        last_cs = None
 
        timegetter = itemgetter('time')
 

	
 
        dbrepo = DBS.query(Repository)\
 
            .filter(Repository.repo_name == repo_name).scalar()
 
        cur_stats = DBS.query(Statistics)\
 
            .filter(Statistics.repository == dbrepo).scalar()
 

	
 
        if cur_stats is not None:
 
            last_rev = cur_stats.stat_on_revision
 

	
 
        if last_rev == repo.get_changeset().revision and repo_size > 1:
 
            # pass silently without any work if we're not on first revision or
 
            # current state of parsing revision(from db marker) is the
 
            # last revision
 
            lock.release()
 
            return True
 

	
 
        if cur_stats:
 
            commits_by_day_aggregate = OrderedDict(json.loads(
 
                                        cur_stats.commit_activity_combined))
 
            co_day_auth_aggr = json.loads(cur_stats.commit_activity)
 

	
0 comments (0 inline, 0 general)