diff --git a/rhodecode/lib/celerypylons/commands.py b/rhodecode/lib/celerypylons/commands.py --- a/rhodecode/lib/celerypylons/commands.py +++ b/rhodecode/lib/celerypylons/commands.py @@ -1,11 +1,35 @@ from rhodecode.lib.utils import BasePasterCommand, Command - +from celery.app import app_or_default +from celery.bin import camqadm, celerybeat, celeryd, celeryev __all__ = ['CeleryDaemonCommand', 'CeleryBeatCommand', 'CAMQPAdminCommand', 'CeleryEventCommand'] -class CeleryDaemonCommand(BasePasterCommand): +class CeleryCommand(BasePasterCommand): + """Abstract class implements run methods needed for celery + + Starts the celery worker that uses a paste.deploy configuration + file. + """ + + def update_parser(self): + """ + Abstract method. Allows for the class's parser to be updated + before the superclass's `run` method is called. Necessary to + allow options/arguments to be passed through to the underlying + celery command. + """ + + cmd = self.celery_command(app_or_default()) + for x in cmd.get_options(): + self.parser.add_option(x) + + def command(self): + cmd = self.celery_command(app_or_default()) + return cmd.run(**vars(self.options)) + +class CeleryDaemonCommand(CeleryCommand): """Start the celery worker Starts the celery worker that uses a paste.deploy configuration @@ -16,18 +40,10 @@ class CeleryDaemonCommand(BasePasterComm description = "".join(__doc__.splitlines()[2:]) parser = Command.standard_parser(quiet=True) - - def update_parser(self): - from celery.bin import celeryd - for x in celeryd.WorkerCommand().get_options(): - self.parser.add_option(x) - - def command(self): - from celery.bin import celeryd - return celeryd.WorkerCommand().run(**vars(self.options)) + celery_command = celeryd.WorkerCommand -class CeleryBeatCommand(BasePasterCommand): +class CeleryBeatCommand(CeleryCommand): """Start the celery beat server Starts the celery beat server using a paste.deploy configuration @@ -38,17 +54,10 @@ class CeleryBeatCommand(BasePasterComman description = "".join(__doc__.splitlines()[2:]) parser = Command.standard_parser(quiet=True) - - def update_parser(self): - from celery.bin import celerybeat - for x in celerybeat.BeatCommand().get_options(): - self.parser.add_option(x) + celery_command = celerybeat.BeatCommand - def command(self): - from celery.bin import celerybeat - return celerybeat.BeatCommand(**vars(self.options)) -class CAMQPAdminCommand(BasePasterCommand): +class CAMQPAdminCommand(CeleryCommand): """CAMQP Admin CAMQP celery admin tool. @@ -58,19 +67,10 @@ class CAMQPAdminCommand(BasePasterComman description = "".join(__doc__.splitlines()[2:]) parser = Command.standard_parser(quiet=True) - - def update_parser(self): - from celery.bin import camqadm - for x in camqadm.OPTION_LIST: - self.parser.add_option(x) + celery_command = camqadm.AMQPAdminCommand - def command(self): - from celery.bin import camqadm - return camqadm.camqadm(*self.args, **vars(self.options)) - - -class CeleryEventCommand(BasePasterCommand): - """Celery event commandd. +class CeleryEventCommand(CeleryCommand): + """Celery event command. Capture celery events. """ @@ -79,12 +79,4 @@ class CeleryEventCommand(BasePasterComma description = "".join(__doc__.splitlines()[2:]) parser = Command.standard_parser(quiet=True) - - def update_parser(self): - from celery.bin import celeryev - for x in celeryev.OPTION_LIST: - self.parser.add_option(x) - - def command(self): - from celery.bin import celeryev - return celeryev.run_celeryev(**vars(self.options)) + celery_command = celeryev.EvCommand