Changeset - df5a67678b96
[Not reviewed]
default
0 3 0
Mads Kiilerich - 8 years ago 2017-08-12 17:55:58
mads@kiilerich.com
celeryd: let the gearbox command use db session as most other gearbox commands do

304aae43194c changed the common gearbox wrapper so make_app_without_logging
only was run for commands tagged as requires_db_session. That broke celeryd -
even a plain 'gearbox celeryd -c my.ini' would fail on the safety check in
celerypylons asserting on tg.config having 'celery.imports' configuration.

The gearbox celeryd command did not really require a db session - it just
required app configuration so it could create db sessions on the fly.

To to get the missing make_app_without_logging invocation back, set
requires_db_session (the default for our gearbox commands).

requires_db_session not only calls make_app_without_logging (which undo the
effect from 304aae43194c), it also calls setup_cache_regions,
engine_from_config, and init_model. These were also invoked explicitly in
celeryd code - these double invocations are dropped too.

Also, make_app_without_logging will call into tg and thus invoke the
setup_configuration hook which will set kallithea.CELERY_ON and call
load_rcextensions. The celeryd code for doing that is thus dropped.
3 files changed with 3 insertions and 18 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/celerylib/__init__.py
Show inline comments
 
@@ -100,41 +100,38 @@ def __get_lockkey(func, *fargs, **fkwarg
 
    lockkey = 'task_%s.lock' % \
 
        md5(func_name + '-' + '-'.join(map(safe_str, params))).hexdigest()
 
    return lockkey
 

	
 

	
 
def locked_task(func):
 
    def __wrapper(func, *fargs, **fkwargs):
 
        lockkey = __get_lockkey(func, *fargs, **fkwargs)
 
        lockkey_path = config['app_conf']['cache_dir']
 

	
 
        log.info('running task with lockkey %s', lockkey)
 
        try:
 
            l = DaemonLock(os.path.join(lockkey_path, lockkey))
 
            ret = func(*fargs, **fkwargs)
 
            l.release()
 
            return ret
 
        except LockHeld:
 
            log.info('LockHeld')
 
            return 'Task with key %s already running' % lockkey
 

	
 
    return decorator(__wrapper, func)
 

	
 

	
 
def get_session():
 
    if CELERY_ON:
 
        engine = engine_from_config(config, 'sqlalchemy.')
 
        init_model(engine)
 
    sa = meta.Session()
 
    return sa
 

	
 

	
 
def dbsession(func):
 
    def __wrapper(func, *fargs, **fkwargs):
 
        try:
 
            ret = func(*fargs, **fkwargs)
 
            return ret
 
        finally:
 
            if CELERY_ON and not CELERY_EAGER:
 
                meta.Session.remove()
 

	
 
    return decorator(__wrapper, func)
kallithea/lib/celerylib/tasks.py
Show inline comments
 
@@ -20,59 +20,57 @@ by celery daemon
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Oct 6, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import os
 
import traceback
 
import logging
 
import rfc822
 

	
 
from time import mktime
 
from operator import itemgetter
 
from string import lower
 

	
 
from tg import config
 

	
 
from kallithea import CELERY_ON
 
from kallithea.lib import celerylib
 
from kallithea.lib.helpers import person
 
from kallithea.lib.rcmail.smtp_mailer import SmtpMailer
 
from kallithea.lib.utils import setup_cache_regions, action_logger
 
from kallithea.lib.utils import action_logger
 
from kallithea.lib.utils2 import str2bool
 
from kallithea.lib.vcs.utils import author_email
 
from kallithea.lib.compat import json, OrderedDict
 
from kallithea.lib.hooks import log_create_repository
 

	
 
from kallithea.model.db import Statistics, RepoGroup, Repository, User
 

	
 

	
 
setup_cache_regions(config)  # pragma: no cover
 

	
 
__all__ = ['whoosh_index', 'get_commits_stats', 'send_email']
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
@celerylib.task
 
@celerylib.locked_task
 
@celerylib.dbsession
 
def whoosh_index(repo_location, full_index):
 
    from kallithea.lib.indexers.daemon import WhooshIndexingDaemon
 
    celerylib.get_session() # initialize database connection
 

	
 
    index_location = config['index_dir']
 
    WhooshIndexingDaemon(index_location=index_location,
 
                         repo_location=repo_location) \
 
                         .run(full_index=full_index)
 

	
 

	
 
@celerylib.task
 
@celerylib.dbsession
 
def get_commits_stats(repo_name, ts_min_y, ts_max_y, recurse_limit=100):
 
    DBS = celerylib.get_session()
 
    lockkey = celerylib.__get_lockkey('get_commits_stats', repo_name, ts_min_y,
kallithea/lib/paster_commands/celeryd.py
Show inline comments
 
# -*- coding: utf-8 -*-
 

	
 
import argparse
 

	
 
import kallithea
 
from kallithea.lib.paster_commands.common import BasePasterCommand
 
from kallithea.lib.utils import load_rcextensions
 
from kallithea.lib.utils2 import str2bool
 

	
 
__all__ = ['Command']
 

	
 

	
 
class Command(BasePasterCommand):
 
    """Kallithea: Celery worker for asynchronous tasks"""
 

	
 
    # Starts the celery worker using configuration from a paste.deploy
 
    # configuration file.
 

	
 
    requires_db_session = False # will start session on demand
 

	
 
    def take_action(self, args):
 
        from kallithea.lib import celerypylons
 
        try:
 
            CELERY_ON = str2bool(self.config['app_conf'].get('use_celery'))
 
        except KeyError:
 
            CELERY_ON = False
 

	
 
        if not CELERY_ON:
 
        if not kallithea.CELERY_ON:
 
            raise Exception('Please set use_celery = true in .ini config '
 
                            'file before running celeryd')
 
        kallithea.CELERY_ON = CELERY_ON
 

	
 
        load_rcextensions(self.config['here'])
 
        from kallithea.lib import celerypylons
 
        cmd = celerypylons.worker.worker(celerypylons.app.app_or_default())
 

	
 
        celery_args = args.celery_args
 
        if '--' in celery_args:
 
            celery_args.remove('--')
 

	
 
        return cmd.run_from_argv('kallithea celery worker', celery_args)
 

	
 
    def get_parser(self, prog_name):
 
        parser = super(Command, self).get_parser(prog_name)
 

	
 
        parser.add_argument('celery_args', nargs=argparse.REMAINDER,
 
            help="Pass extra options to Celery after a '--' separator",
 
            )
 

	
 
        return parser
0 comments (0 inline, 0 general)