diff --git a/celeryconfig.py b/celeryconfig.py new file mode 100644 --- /dev/null +++ b/celeryconfig.py @@ -0,0 +1,4 @@ +## Broker settings. +BROKER_VHOST = "rabbitmqhost" +BROKER_USER = "rabbitmq" +BROKER_PASSWORD = "qweqwe" diff --git a/rhodecode/lib/celerylib/tasks.py b/rhodecode/lib/celerylib/tasks.py --- a/rhodecode/lib/celerylib/tasks.py +++ b/rhodecode/lib/celerylib/tasks.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- """ rhodecode.lib.celerylib.tasks - ~~~~~~~~~~~~~~ + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ RhodeCode task modules, containing all task that suppose to be run by celery daemon @@ -29,6 +29,8 @@ from celery.decorators import task import os import traceback +import logging + from time import mktime from operator import itemgetter @@ -72,21 +74,25 @@ def get_repos_path(): q = sa.query(RhodeCodeUi).filter(RhodeCodeUi.ui_key == '/').one() return q.ui_value -@task +@task(ignore_result=True) @locked_task def whoosh_index(repo_location, full_index): - log = whoosh_index.get_logger() + #log = whoosh_index.get_logger() from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon index_location = config['index_dir'] WhooshIndexingDaemon(index_location=index_location, repo_location=repo_location, sa=get_session())\ .run(full_index=full_index) -@task +@task(ignore_result=True) @locked_task def get_commits_stats(repo_name, ts_min_y, ts_max_y): + try: + log = get_commits_stats.get_logger() + except: + log = logging.getLogger(__name__) + from rhodecode.model.db import Statistics, Repository - log = get_commits_stats.get_logger() #for js data compatibilty author_key_cleaner = lambda k: person(k).replace('"', "") @@ -218,9 +224,13 @@ def get_commits_stats(repo_name, ts_min_ return True -@task +@task(ignore_result=True) def reset_user_password(user_email): - log = reset_user_password.get_logger() + try: + log = reset_user_password.get_logger() + except: + log = logging.getLogger(__name__) + from rhodecode.lib import auth from rhodecode.model.db import User @@ -254,7 +264,7 @@ def reset_user_password(user_email): return True -@task +@task(ignore_result=True) def send_email(recipients, subject, body): """ Sends an email with defined parameters from the .ini files. @@ -265,7 +275,11 @@ def send_email(recipients, subject, body :param subject: subject of the mail :param body: body of the mail """ - log = send_email.get_logger() + try: + log = send_email.get_logger() + except: + log = logging.getLogger(__name__) + email_config = config if not recipients: @@ -289,11 +303,16 @@ def send_email(recipients, subject, body return False return True -@task +@task(ignore_result=True) def create_repo_fork(form_data, cur_user): + try: + log = create_repo_fork.get_logger() + except: + log = logging.getLogger(__name__) + from rhodecode.model.repo import RepoModel from vcs import get_backend - log = create_repo_fork.get_logger() + repo_model = RepoModel(get_session()) repo_model.create(form_data, cur_user, just_db=True, fork=True) repo_name = form_data['repo_name'] diff --git a/rhodecode/lib/celerypylons/commands.py b/rhodecode/lib/celerypylons/commands.py --- a/rhodecode/lib/celerypylons/commands.py +++ b/rhodecode/lib/celerypylons/commands.py @@ -1,11 +1,35 @@ from rhodecode.lib.utils import BasePasterCommand, Command - +from celery.app import app_or_default +from celery.bin import camqadm, celerybeat, celeryd, celeryev __all__ = ['CeleryDaemonCommand', 'CeleryBeatCommand', 'CAMQPAdminCommand', 'CeleryEventCommand'] -class CeleryDaemonCommand(BasePasterCommand): +class CeleryCommand(BasePasterCommand): + """Abstract class implements run methods needed for celery + + Starts the celery worker that uses a paste.deploy configuration + file. + """ + + def update_parser(self): + """ + Abstract method. Allows for the class's parser to be updated + before the superclass's `run` method is called. Necessary to + allow options/arguments to be passed through to the underlying + celery command. + """ + + cmd = self.celery_command(app_or_default()) + for x in cmd.get_options(): + self.parser.add_option(x) + + def command(self): + cmd = self.celery_command(app_or_default()) + return cmd.run(**vars(self.options)) + +class CeleryDaemonCommand(CeleryCommand): """Start the celery worker Starts the celery worker that uses a paste.deploy configuration @@ -16,18 +40,10 @@ class CeleryDaemonCommand(BasePasterComm description = "".join(__doc__.splitlines()[2:]) parser = Command.standard_parser(quiet=True) - - def update_parser(self): - from celery.bin import celeryd - for x in celeryd.WorkerCommand().get_options(): - self.parser.add_option(x) - - def command(self): - from celery.bin import celeryd - return celeryd.WorkerCommand().run(**vars(self.options)) + celery_command = celeryd.WorkerCommand -class CeleryBeatCommand(BasePasterCommand): +class CeleryBeatCommand(CeleryCommand): """Start the celery beat server Starts the celery beat server using a paste.deploy configuration @@ -38,17 +54,10 @@ class CeleryBeatCommand(BasePasterComman description = "".join(__doc__.splitlines()[2:]) parser = Command.standard_parser(quiet=True) - - def update_parser(self): - from celery.bin import celerybeat - for x in celerybeat.BeatCommand().get_options(): - self.parser.add_option(x) + celery_command = celerybeat.BeatCommand - def command(self): - from celery.bin import celerybeat - return celerybeat.BeatCommand(**vars(self.options)) -class CAMQPAdminCommand(BasePasterCommand): +class CAMQPAdminCommand(CeleryCommand): """CAMQP Admin CAMQP celery admin tool. @@ -58,19 +67,10 @@ class CAMQPAdminCommand(BasePasterComman description = "".join(__doc__.splitlines()[2:]) parser = Command.standard_parser(quiet=True) - - def update_parser(self): - from celery.bin import camqadm - for x in camqadm.OPTION_LIST: - self.parser.add_option(x) + celery_command = camqadm.AMQPAdminCommand - def command(self): - from celery.bin import camqadm - return camqadm.camqadm(*self.args, **vars(self.options)) - - -class CeleryEventCommand(BasePasterCommand): - """Celery event commandd. +class CeleryEventCommand(CeleryCommand): + """Celery event command. Capture celery events. """ @@ -79,12 +79,4 @@ class CeleryEventCommand(BasePasterComma description = "".join(__doc__.splitlines()[2:]) parser = Command.standard_parser(quiet=True) - - def update_parser(self): - from celery.bin import celeryev - for x in celeryev.OPTION_LIST: - self.parser.add_option(x) - - def command(self): - from celery.bin import celeryev - return celeryev.run_celeryev(**vars(self.options)) + celery_command = celeryev.EvCommand diff --git a/rhodecode/lib/celerypylons/loader.py b/rhodecode/lib/celerypylons/loader.py --- a/rhodecode/lib/celerypylons/loader.py +++ b/rhodecode/lib/celerypylons/loader.py @@ -17,15 +17,29 @@ class PylonsSettingsProxy(object): pylons_key = to_pylons(key) try: value = config[pylons_key] - if key in LIST_PARAMS: return value.split() + if key in LIST_PARAMS:return value.split() return self.type_converter(value) except KeyError: raise AttributeError(pylons_key) + def get(self, key): + try: + return self.__getattr__(key) + except AttributeError: + return None + + def __getitem__(self, key): + try: + return self.__getattr__(key) + except AttributeError: + raise KeyError() + def __setattr__(self, key, value): pylons_key = to_pylons(key) config[pylons_key] = value + def __setitem__(self, key, value): + self.__setattr__(key, value) def type_converter(self, value): #cast to int @@ -35,7 +49,6 @@ class PylonsSettingsProxy(object): #cast to bool if value.lower() in ['true', 'false']: return value.lower() == 'true' - return value class PylonsLoader(BaseLoader): diff --git a/setup.py b/setup.py --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ requirements = [ "pygments>=1.4", "mercurial>=1.7.3", "whoosh>=1.3.4", - "celery>=2.1.4", + "celery>=2.2.2", "py-bcrypt", "babel", ]