Changeset - 7e7db11d4e4d
[Not reviewed]
default
0 6 0
Mads Kiilerich - 9 years ago 2016-09-06 00:51:18
madski@unity3d.com
celerypylons: wrap celery import so we always get the right environment variables set and check configuration

Get rid of magic ... or at least document and encapsulate it.

Before, celerypylons would set the environment variables that made it possible
to import Celery after Pylons had been configured.

Now, the module will import Celery ... and verify that Pylons has been
configured.

(A next stop could be to move things around so this got tied closely to
initializing Pylons. Or something.)
6 files changed with 35 insertions and 27 deletions:
0 comments (0 inline, 0 general)
kallithea/config/environment.py
Show inline comments
 
@@ -16,27 +16,24 @@
 
"""
 

	
 
import os
 
import logging
 
import kallithea
 
import platform
 

	
 
import pylons
 
import mako.lookup
 
import beaker
 
import formencode
 

	
 
# don't remove this import it does magic for celery
 
from kallithea.lib import celerypylons
 

	
 
import kallithea.lib.app_globals as app_globals
 

	
 
from kallithea.config.routing import make_map
 

	
 
from kallithea.lib import helpers
 
from kallithea.lib.auth import set_available_permissions
 
from kallithea.lib.utils import repo2db_mapper, make_ui, set_app_settings, \
 
    load_rcextensions, check_git_version, set_vcs_config, set_indexer_config
 
from kallithea.lib.utils2 import engine_from_config, str2bool
 
from kallithea.lib.db_manage import DbManage
 
from kallithea.model import init_model
 
from kallithea.model.scm import ScmModel
kallithea/controllers/admin/repos.py
Show inline comments
 
@@ -180,27 +180,27 @@ class ReposController(BaseRepoController
 
            raise HTTPNotFound()
 
        return render('admin/repos/repo_creating.html')
 

	
 
    @LoginRequired()
 
    @NotAnonymous()
 
    @jsonify
 
    def repo_check(self, repo_name):
 
        c.repo = repo_name
 
        task_id = request.GET.get('task_id')
 

	
 
        if task_id and task_id not in ['None']:
 
            from kallithea import CELERY_ON
 
            from celery.result import AsyncResult
 
            from kallithea.lib import celerypylons
 
            if CELERY_ON:
 
                task = AsyncResult(task_id)
 
                task = celerypylons.result.AsyncResult(task_id)
 
                if task.failed():
 
                    raise HTTPInternalServerError(task.traceback)
 

	
 
        repo = Repository.get_by_repo_name(repo_name)
 
        if repo and repo.repo_state == Repository.STATE_CREATED:
 
            if repo.clone_uri:
 
                h.flash(_('Created repository %s from %s')
 
                        % (repo.repo_name, repo.clone_uri_hidden), category='success')
 
            else:
 
                repo_url = h.link_to(repo.repo_name,
 
                                     h.url('summary_home',
 
                                           repo_name=repo.repo_name))
kallithea/lib/celerylib/__init__.py
Show inline comments
 
@@ -63,26 +63,26 @@ class FakeTask(object):
 
def task(f_org):
 
    """Wrapper of celery.task.task, running async if CELERY_ON
 
    """
 

	
 
    if CELERY_ON:
 
        def f_async(*args, **kwargs):
 
            log.info('executing %s task', f_org.__name__)
 
            try:
 
                f_org(*args, **kwargs)
 
            finally:
 
                log.info('executed %s task', f_org.__name__)
 
        f_async.__name__ = f_org.__name__
 
        import celery.task
 
        runner = celery.task.task(ignore_result=True)(f_async)
 
        from kallithea.lib import celerypylons
 
        runner = celerypylons.task(ignore_result=True)(f_async)
 
        def f_wrapped(*args, **kwargs):
 
            t = runner.apply_async(args=args, kwargs=kwargs)
 
            log.info('executing task %s in async mode - id %s', f_org, t.task_id)
 
            return t
 
    else:
 
        def f_wrapped(*args, **kwargs):
 
            log.info('executing task %s in sync', f_org.__name__)
 
            try:
 
                result = f_org(*args, **kwargs)
 
            except Exception as e:
 
                log.error('exception executing sync task %s in sync', f_org.__name__, e)
 
                raise # TODO: return this in FakeTask as with async tasks?
kallithea/lib/celerypylons/__init__.py
Show inline comments
 
# -*- coding: utf-8 -*-
 

	
 
"""
 
Automatically sets the environment variable `CELERY_LOADER` to
 
`celerypylons.loader:PylonsLoader`.  This ensures the loader is
 
specified when accessing the rest of this package, and allows celery
 
to be installed in a webapp just by importing celerypylons::
 
Kallithea wrapper of Celery
 

	
 
The Celery configuration is in the normal Pylons ini file. We thus have to set
 
the `CELERY_LOADER` environment variable to point at a custom "loader" that can
 
read it. That environment variable must be set *before* importing celery. To
 
ensure that, we wrap celery in this module.
 

	
 
    import celerypylons
 
Also, the loader depends on Pylons being configured to it can read the Celery
 
configuration out of it. To make sure that really is the case and give an early
 
warning, we check one of the mandatory settings.
 

	
 
This module must thus not be imported in global scope but must be imported on
 
demand in function scope.
 
"""
 

	
 
import os
 
import warnings
 

	
 
# Verify Pylons configuration has been loaded
 
from pylons import config
 
assert config['celery.imports'] == 'kallithea.lib.celerylib.tasks', 'Kallithea Celery configuration has not been loaded'
 

	
 
# Prepare environment to point at Kallithea Pylons loader
 
CELERYPYLONS_LOADER = 'kallithea.lib.celerypylons.loader.PylonsLoader'
 
if os.environ.get('CELERY_LOADER', CELERYPYLONS_LOADER) != CELERYPYLONS_LOADER:
 
    warnings.warn("'CELERY_LOADER' environment variable will be overridden by celery-pylons.")
 
os.environ['CELERY_LOADER'] = CELERYPYLONS_LOADER
 

	
 
# Import (and expose) celery, thus immediately triggering use of the custom Pylons loader
 
import celery.app as app
 
import celery.result as result
 
from celery.task import task
 
from celery.bin import camqadm, celerybeat, celeryd, celeryev
kallithea/lib/celerypylons/commands.py
Show inline comments
 
# -*- coding: utf-8 -*-
 

	
 
import kallithea
 
from kallithea.lib.paster_commands.common import BasePasterCommand
 
from kallithea.lib.utils import Command, load_rcextensions
 
from celery.app import app_or_default
 
from celery.bin import camqadm, celerybeat, celeryd, celeryev
 

	
 

	
 
from kallithea.lib.utils2 import str2bool
 

	
 
__all__ = ['CeleryDaemonCommand', 'CeleryBeatCommand',
 
           'CAMQPAdminCommand', 'CeleryEventCommand']
 

	
 

	
 
class CeleryCommand(BasePasterCommand):
 
    """Abstract class implements run methods needed for celery
 

	
 
    Starts the celery worker that uses a paste.deploy configuration
 
    file.
 
    """
 

	
 
    def update_parser(self):
 
        """
 
        Abstract method.  Allows for the class's parser to be updated
 
        before the superclass's `run` method is called.  Necessary to
 
        allow options/arguments to be passed through to the underlying
 
        celery command.
 
        """
 

	
 
        cmd = self.celery_command(app_or_default())
 
        from kallithea.lib import celerypylons
 
        cmd = self.celery_command(celerypylons.app.app_or_default())
 
        for x in cmd.get_options():
 
            self.parser.add_option(x)
 

	
 
    def command(self):
 
        from kallithea.lib import celerypylons
 
        from pylons import config
 
        try:
 
            CELERY_ON = str2bool(config['app_conf'].get('use_celery'))
 
        except KeyError:
 
            CELERY_ON = False
 

	
 
        if not CELERY_ON:
 
            raise Exception('Please set use_celery = true in .ini config '
 
                            'file before running celeryd')
 
        kallithea.CELERY_ON = CELERY_ON
 
        load_rcextensions(config['here'])
 
        cmd = self.celery_command(app_or_default())
 
        cmd = self.celery_command(celerypylons.app.app_or_default())
 
        return cmd.run(**vars(self.options))
 

	
 

	
 
class CeleryDaemonCommand(CeleryCommand):
 
    """Start the celery worker
 

	
 
    Starts the celery worker that uses a paste.deploy configuration
 
    file.
 
    """
 
    usage = 'CONFIG_FILE [celeryd options...]'
 
    summary = __doc__.splitlines()[0]
 
    description = "".join(__doc__.splitlines()[2:])
 

	
 
    parser = Command.standard_parser(quiet=True)
 
    celery_command = celeryd.WorkerCommand
 
    celery_command = celerypylons.celeryd.WorkerCommand
 

	
 

	
 
class CeleryBeatCommand(CeleryCommand):
 
    """Start the celery beat server
 

	
 
    Starts the celery beat server using a paste.deploy configuration
 
    file.
 
    """
 
    usage = 'CONFIG_FILE [celerybeat options...]'
 
    summary = __doc__.splitlines()[0]
 
    description = "".join(__doc__.splitlines()[2:])
 

	
 
    parser = Command.standard_parser(quiet=True)
 
    celery_command = celerybeat.BeatCommand
 
    celery_command = celerypylons.celerybeat.BeatCommand
 

	
 

	
 
class CAMQPAdminCommand(CeleryCommand):
 
    """CAMQP Admin
 

	
 
    CAMQP celery admin tool.
 
    """
 
    usage = 'CONFIG_FILE [camqadm options...]'
 
    summary = __doc__.splitlines()[0]
 
    description = "".join(__doc__.splitlines()[2:])
 

	
 
    parser = Command.standard_parser(quiet=True)
 
    celery_command = camqadm.AMQPAdminCommand
 
    celery_command = celerypylons.camqadm.AMQPAdminCommand
 

	
 

	
 
class CeleryEventCommand(CeleryCommand):
 
    """Celery event command.
 

	
 
    Capture celery events.
 
    """
 
    usage = 'CONFIG_FILE [celeryev options...]'
 
    summary = __doc__.splitlines()[0]
 
    description = "".join(__doc__.splitlines()[2:])
 

	
 
    parser = Command.standard_parser(quiet=True)
 
    celery_command = celeryev.EvCommand
 
    celery_command = celerypylons.celeryev.EvCommand
kallithea/lib/paster_commands/common.py
Show inline comments
 
@@ -41,30 +41,24 @@ def ask_ok(prompt, retries=4, complaint=
 
            return True
 
        if ok in ('n', 'no', 'nop', 'nope'):
 
            return False
 
        retries = retries - 1
 
        if retries < 0:
 
            raise IOError
 
        print complaint
 

	
 

	
 
class BasePasterCommand(Command):
 
    """
 
    Abstract Base Class for paster commands.
 

	
 
    The celery commands are somewhat aggressive about loading
 
    celery.conf, and since our module sets the `CELERY_LOADER`
 
    environment variable to our loader, we have to bootstrap a bit and
 
    make sure we've had a chance to load the pylons config off of the
 
    command line, otherwise everything fails.
 
    """
 
    min_args = 1
 
    min_args_error = "Please provide a paster config file as an argument."
 
    takes_config_file = 1
 
    requires_config_file = True
 

	
 
    def run(self, args):
 
        """
 
        Overrides Command.run
 

	
 
        Checks for a config file argument and loads it.
 
        """
0 comments (0 inline, 0 general)