# HG changeset patch # User Marcin Kuzminski # Date 2011-02-08 02:57:21 # Node ID 3a7f5b1a19ddff273490770a3d6d7b240ce90958 # Parent 94e0541a5283662a12930f5530bc47bc91075017 made rhodecode work with celery 2.2, made some task optimizations (forget results), added celeryconfig.py with just the definitions of hosts; it seems just this is needed to get celery working nicely, all other config options are taken from .ini files. This is a temporary workaround until I get the proper solution to this problem. diff --git a/celeryconfig.py b/celeryconfig.py new file mode 100644 --- /dev/null +++ b/celeryconfig.py @@ -0,0 +1,4 @@ +## Broker settings. +BROKER_VHOST = "rabbitmqhost" +BROKER_USER = "rabbitmq" +BROKER_PASSWORD = "qweqwe" diff --git a/rhodecode/lib/celerylib/tasks.py b/rhodecode/lib/celerylib/tasks.py --- a/rhodecode/lib/celerylib/tasks.py +++ b/rhodecode/lib/celerylib/tasks.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- """ rhodecode.lib.celerylib.tasks - ~~~~~~~~~~~~~~ + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ RhodeCode task modules, containing all task that suppose to be run by celery daemon @@ -29,6 +29,8 @@ from celery.decorators import task import os import traceback +import logging + from time import mktime from operator import itemgetter @@ -72,21 +74,25 @@ def get_repos_path(): q = sa.query(RhodeCodeUi).filter(RhodeCodeUi.ui_key == '/').one() return q.ui_value -@task +@task(ignore_result=True) @locked_task def whoosh_index(repo_location, full_index): - log = whoosh_index.get_logger() + #log = whoosh_index.get_logger() from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon index_location = config['index_dir'] WhooshIndexingDaemon(index_location=index_location, repo_location=repo_location, sa=get_session())\ .run(full_index=full_index) -@task +@task(ignore_result=True) @locked_task def get_commits_stats(repo_name, ts_min_y, ts_max_y): + try: + log = get_commits_stats.get_logger() + except: + log = logging.getLogger(__name__) + from rhodecode.model.db import Statistics, 
Repository - log = get_commits_stats.get_logger() #for js data compatibilty author_key_cleaner = lambda k: person(k).replace('"', "") @@ -218,9 +224,13 @@ def get_commits_stats(repo_name, ts_min_ return True -@task +@task(ignore_result=True) def reset_user_password(user_email): - log = reset_user_password.get_logger() + try: + log = reset_user_password.get_logger() + except: + log = logging.getLogger(__name__) + from rhodecode.lib import auth from rhodecode.model.db import User @@ -254,7 +264,7 @@ def reset_user_password(user_email): return True -@task +@task(ignore_result=True) def send_email(recipients, subject, body): """ Sends an email with defined parameters from the .ini files. @@ -265,7 +275,11 @@ def send_email(recipients, subject, body :param subject: subject of the mail :param body: body of the mail """ - log = send_email.get_logger() + try: + log = send_email.get_logger() + except: + log = logging.getLogger(__name__) + email_config = config if not recipients: @@ -289,11 +303,16 @@ def send_email(recipients, subject, body return False return True -@task +@task(ignore_result=True) def create_repo_fork(form_data, cur_user): + try: + log = create_repo_fork.get_logger() + except: + log = logging.getLogger(__name__) + from rhodecode.model.repo import RepoModel from vcs import get_backend - log = create_repo_fork.get_logger() + repo_model = RepoModel(get_session()) repo_model.create(form_data, cur_user, just_db=True, fork=True) repo_name = form_data['repo_name'] diff --git a/rhodecode/lib/celerypylons/commands.py b/rhodecode/lib/celerypylons/commands.py --- a/rhodecode/lib/celerypylons/commands.py +++ b/rhodecode/lib/celerypylons/commands.py @@ -1,11 +1,35 @@ from rhodecode.lib.utils import BasePasterCommand, Command - +from celery.app import app_or_default +from celery.bin import camqadm, celerybeat, celeryd, celeryev __all__ = ['CeleryDaemonCommand', 'CeleryBeatCommand', 'CAMQPAdminCommand', 'CeleryEventCommand'] -class CeleryDaemonCommand(BasePasterCommand): 
+class CeleryCommand(BasePasterCommand): + """Abstract class implements run methods needed for celery + + Starts the celery worker that uses a paste.deploy configuration + file. + """ + + def update_parser(self): + """ + Abstract method. Allows for the class's parser to be updated + before the superclass's `run` method is called. Necessary to + allow options/arguments to be passed through to the underlying + celery command. + """ + + cmd = self.celery_command(app_or_default()) + for x in cmd.get_options(): + self.parser.add_option(x) + + def command(self): + cmd = self.celery_command(app_or_default()) + return cmd.run(**vars(self.options)) + +class CeleryDaemonCommand(CeleryCommand): """Start the celery worker Starts the celery worker that uses a paste.deploy configuration @@ -16,18 +40,10 @@ class CeleryDaemonCommand(BasePasterComm description = "".join(__doc__.splitlines()[2:]) parser = Command.standard_parser(quiet=True) - - def update_parser(self): - from celery.bin import celeryd - for x in celeryd.WorkerCommand().get_options(): - self.parser.add_option(x) - - def command(self): - from celery.bin import celeryd - return celeryd.WorkerCommand().run(**vars(self.options)) + celery_command = celeryd.WorkerCommand -class CeleryBeatCommand(BasePasterCommand): +class CeleryBeatCommand(CeleryCommand): """Start the celery beat server Starts the celery beat server using a paste.deploy configuration @@ -38,17 +54,10 @@ class CeleryBeatCommand(BasePasterComman description = "".join(__doc__.splitlines()[2:]) parser = Command.standard_parser(quiet=True) - - def update_parser(self): - from celery.bin import celerybeat - for x in celerybeat.BeatCommand().get_options(): - self.parser.add_option(x) + celery_command = celerybeat.BeatCommand - def command(self): - from celery.bin import celerybeat - return celerybeat.BeatCommand(**vars(self.options)) -class CAMQPAdminCommand(BasePasterCommand): +class CAMQPAdminCommand(CeleryCommand): """CAMQP Admin CAMQP celery admin tool. 
@@ -58,19 +67,10 @@ class CAMQPAdminCommand(BasePasterComman description = "".join(__doc__.splitlines()[2:]) parser = Command.standard_parser(quiet=True) - - def update_parser(self): - from celery.bin import camqadm - for x in camqadm.OPTION_LIST: - self.parser.add_option(x) + celery_command = camqadm.AMQPAdminCommand - def command(self): - from celery.bin import camqadm - return camqadm.camqadm(*self.args, **vars(self.options)) - - -class CeleryEventCommand(BasePasterCommand): - """Celery event commandd. +class CeleryEventCommand(CeleryCommand): + """Celery event command. Capture celery events. """ @@ -79,12 +79,4 @@ class CeleryEventCommand(BasePasterComma description = "".join(__doc__.splitlines()[2:]) parser = Command.standard_parser(quiet=True) - - def update_parser(self): - from celery.bin import celeryev - for x in celeryev.OPTION_LIST: - self.parser.add_option(x) - - def command(self): - from celery.bin import celeryev - return celeryev.run_celeryev(**vars(self.options)) + celery_command = celeryev.EvCommand diff --git a/rhodecode/lib/celerypylons/loader.py b/rhodecode/lib/celerypylons/loader.py --- a/rhodecode/lib/celerypylons/loader.py +++ b/rhodecode/lib/celerypylons/loader.py @@ -17,15 +17,29 @@ class PylonsSettingsProxy(object): pylons_key = to_pylons(key) try: value = config[pylons_key] - if key in LIST_PARAMS: return value.split() + if key in LIST_PARAMS:return value.split() return self.type_converter(value) except KeyError: raise AttributeError(pylons_key) + def get(self, key): + try: + return self.__getattr__(key) + except AttributeError: + return None + + def __getitem__(self, key): + try: + return self.__getattr__(key) + except AttributeError: + raise KeyError() + def __setattr__(self, key, value): pylons_key = to_pylons(key) config[pylons_key] = value + def __setitem__(self, key, value): + self.__setattr__(key, value) def type_converter(self, value): #cast to int @@ -35,7 +49,6 @@ class PylonsSettingsProxy(object): #cast to bool if value.lower() 
in ['true', 'false']: return value.lower() == 'true' - return value class PylonsLoader(BaseLoader): diff --git a/setup.py b/setup.py --- a/setup.py +++ b/setup.py @@ -12,7 +12,7 @@ requirements = [ "pygments>=1.4", "mercurial>=1.7.3", "whoosh>=1.3.4", - "celery>=2.1.4", + "celery>=2.2.2", "py-bcrypt", "babel", ]