diff --git a/rhodecode/lib/celerylib/__init__.py b/rhodecode/lib/celerylib/__init__.py --- a/rhodecode/lib/celerylib/__init__.py +++ b/rhodecode/lib/celerylib/__init__.py @@ -48,6 +48,7 @@ try: except KeyError: CELERY_ON = False + class ResultWrapper(object): def __init__(self, task): self.task = task @@ -56,12 +57,14 @@ class ResultWrapper(object): def result(self): return self.task + def run_task(task, *args, **kwargs): if CELERY_ON: try: t = task.apply_async(args=args, kwargs=kwargs) log.info('running task %s:%s', t.task_id, task) return t + except socket.error, e: if e.errno == 111: log.debug('Unable to connect to celeryd. Sync execution') @@ -76,14 +79,20 @@ def run_task(task, *args, **kwargs): return ResultWrapper(task(*args, **kwargs)) +def __get_lockkey(func, *fargs, **fkwargs): + params = list(fargs) + params.extend(['%s-%s' % ar for ar in fkwargs.items()]) + + func_name = str(func.__name__) if hasattr(func, '__name__') else str(func) + + lockkey = 'task_%s' % \ + md5(func_name + '-' + '-'.join(map(str, params))).hexdigest() + return lockkey + + def locked_task(func): def __wrapper(func, *fargs, **fkwargs): - params = list(fargs) - params.extend(['%s-%s' % ar for ar in fkwargs.items()]) - - lockkey = 'task_%s' % \ - md5(str(func.__name__) + '-' + \ - '-'.join(map(str, params))).hexdigest() + lockkey = __get_lockkey(func, *fargs, **fkwargs) log.info('running task with lockkey %s', lockkey) try: l = DaemonLock(lockkey)