Changeset - 3a7f5b1a19dd
[Not reviewed]
beta
0 4 1
Marcin Kuzminski - 15 years ago 2011-02-08 02:57:21
marcin@python-works.com
made rhodecode work with celery 2.2, made some tasks optimizations(forget results)

added celeryconfig.py with just the definitions of hosts, it seams just this is needed to get celery working nice, all other config options are taken from .ini files. This is a temp workaround until i get the proper soltuion to this problem.
5 files changed with 84 insertions and 56 deletions:
0 comments (0 inline, 0 general)
celeryconfig.py
Show inline comments
 
new file 100644
 
## Broker settings.
 
BROKER_VHOST = "rabbitmqhost"
 
BROKER_USER = "rabbitmq"
 
BROKER_PASSWORD = "qweqwe"
rhodecode/lib/celerylib/tasks.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
"""
 
    rhodecode.lib.celerylib.tasks
 
    ~~~~~~~~~~~~~~
 
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
    RhodeCode task modules, containing all task that suppose to be run
 
    by celery daemon
 
@@ -29,6 +29,8 @@ from celery.decorators import task
 

	
 
import os
 
import traceback
 
import logging
 

	
 
from time import mktime
 
from operator import itemgetter
 

	
 
@@ -72,21 +74,25 @@ def get_repos_path():
 
    q = sa.query(RhodeCodeUi).filter(RhodeCodeUi.ui_key == '/').one()
 
    return q.ui_value
 

	
 
@task
 
@task(ignore_result=True)
 
@locked_task
 
def whoosh_index(repo_location, full_index):
 
    log = whoosh_index.get_logger()
 
    #log = whoosh_index.get_logger()
 
    from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
 
    index_location = config['index_dir']
 
    WhooshIndexingDaemon(index_location=index_location,
 
                         repo_location=repo_location, sa=get_session())\
 
                         .run(full_index=full_index)
 

	
 
@task
 
@task(ignore_result=True)
 
@locked_task
 
def get_commits_stats(repo_name, ts_min_y, ts_max_y):
 
    try:
 
        log = get_commits_stats.get_logger()
 
    except:
 
        log = logging.getLogger(__name__)
 

	
 
    from rhodecode.model.db import Statistics, Repository
 
    log = get_commits_stats.get_logger()
 

	
 
    #for js data compatibilty
 
    author_key_cleaner = lambda k: person(k).replace('"', "")
 
@@ -218,9 +224,13 @@ def get_commits_stats(repo_name, ts_min_
 

	
 
    return True
 

	
 
@task
 
@task(ignore_result=True)
 
def reset_user_password(user_email):
 
    log = reset_user_password.get_logger()
 
    try:
 
        log = reset_user_password.get_logger()
 
    except:
 
        log = logging.getLogger(__name__)
 

	
 
    from rhodecode.lib import auth
 
    from rhodecode.model.db import User
 

	
 
@@ -254,7 +264,7 @@ def reset_user_password(user_email):
 

	
 
    return True
 

	
 
@task
 
@task(ignore_result=True)
 
def send_email(recipients, subject, body):
 
    """
 
    Sends an email with defined parameters from the .ini files.
 
@@ -265,7 +275,11 @@ def send_email(recipients, subject, body
 
    :param subject: subject of the mail
 
    :param body: body of the mail
 
    """
 
    log = send_email.get_logger()
 
    try:
 
        log = send_email.get_logger()
 
    except:
 
        log = logging.getLogger(__name__)
 

	
 
    email_config = config
 

	
 
    if not recipients:
 
@@ -289,11 +303,16 @@ def send_email(recipients, subject, body
 
        return False
 
    return True
 

	
 
@task
 
@task(ignore_result=True)
 
def create_repo_fork(form_data, cur_user):
 
    try:
 
        log = create_repo_fork.get_logger()
 
    except:
 
        log = logging.getLogger(__name__)
 

	
 
    from rhodecode.model.repo import RepoModel
 
    from vcs import get_backend
 
    log = create_repo_fork.get_logger()
 

	
 
    repo_model = RepoModel(get_session())
 
    repo_model.create(form_data, cur_user, just_db=True, fork=True)
 
    repo_name = form_data['repo_name']
rhodecode/lib/celerypylons/commands.py
Show inline comments
 
from rhodecode.lib.utils import BasePasterCommand, Command
 

	
 
from celery.app import app_or_default
 
from celery.bin import camqadm, celerybeat, celeryd, celeryev
 

	
 
__all__ = ['CeleryDaemonCommand', 'CeleryBeatCommand',
 
           'CAMQPAdminCommand', 'CeleryEventCommand']
 

	
 

	
 
class CeleryDaemonCommand(BasePasterCommand):
 
class CeleryCommand(BasePasterCommand):
 
    """Abstract class implements run methods needed for celery
 

	
 
    Starts the celery worker that uses a paste.deploy configuration
 
    file.
 
    """
 

	
 
    def update_parser(self):
 
        """
 
        Abstract method.  Allows for the class's parser to be updated
 
        before the superclass's `run` method is called.  Necessary to
 
        allow options/arguments to be passed through to the underlying
 
        celery command.
 
        """
 

	
 
        cmd = self.celery_command(app_or_default())
 
        for x in cmd.get_options():
 
            self.parser.add_option(x)
 

	
 
    def command(self):
 
        cmd = self.celery_command(app_or_default())
 
        return cmd.run(**vars(self.options))
 

	
 
class CeleryDaemonCommand(CeleryCommand):
 
    """Start the celery worker
 

	
 
    Starts the celery worker that uses a paste.deploy configuration
 
@@ -16,18 +40,10 @@ class CeleryDaemonCommand(BasePasterComm
 
    description = "".join(__doc__.splitlines()[2:])
 

	
 
    parser = Command.standard_parser(quiet=True)
 

	
 
    def update_parser(self):
 
        from celery.bin import celeryd
 
        for x in celeryd.WorkerCommand().get_options():
 
            self.parser.add_option(x)
 

	
 
    def command(self):
 
        from celery.bin import celeryd
 
        return celeryd.WorkerCommand().run(**vars(self.options))
 
    celery_command = celeryd.WorkerCommand
 

	
 

	
 
class CeleryBeatCommand(BasePasterCommand):
 
class CeleryBeatCommand(CeleryCommand):
 
    """Start the celery beat server
 

	
 
    Starts the celery beat server using a paste.deploy configuration
 
@@ -38,17 +54,10 @@ class CeleryBeatCommand(BasePasterComman
 
    description = "".join(__doc__.splitlines()[2:])
 

	
 
    parser = Command.standard_parser(quiet=True)
 

	
 
    def update_parser(self):
 
        from celery.bin import celerybeat
 
        for x in celerybeat.BeatCommand().get_options():
 
            self.parser.add_option(x)
 
    celery_command = celerybeat.BeatCommand
 

	
 
    def command(self):
 
        from celery.bin import celerybeat
 
        return celerybeat.BeatCommand(**vars(self.options))
 

	
 
class CAMQPAdminCommand(BasePasterCommand):
 
class CAMQPAdminCommand(CeleryCommand):
 
    """CAMQP Admin
 

	
 
    CAMQP celery admin tool.
 
@@ -58,19 +67,10 @@ class CAMQPAdminCommand(BasePasterComman
 
    description = "".join(__doc__.splitlines()[2:])
 

	
 
    parser = Command.standard_parser(quiet=True)
 

	
 
    def update_parser(self):
 
        from celery.bin import camqadm
 
        for x in camqadm.OPTION_LIST:
 
            self.parser.add_option(x)
 
    celery_command = camqadm.AMQPAdminCommand
 

	
 
    def command(self):
 
        from celery.bin import camqadm
 
        return camqadm.camqadm(*self.args, **vars(self.options))
 

	
 

	
 
class CeleryEventCommand(BasePasterCommand):
 
    """Celery event commandd.
 
class CeleryEventCommand(CeleryCommand):
 
    """Celery event command.
 

	
 
    Capture celery events.
 
    """
 
@@ -79,12 +79,4 @@ class CeleryEventCommand(BasePasterComma
 
    description = "".join(__doc__.splitlines()[2:])
 

	
 
    parser = Command.standard_parser(quiet=True)
 

	
 
    def update_parser(self):
 
        from celery.bin import celeryev
 
        for x in celeryev.OPTION_LIST:
 
            self.parser.add_option(x)
 

	
 
    def command(self):
 
        from celery.bin import celeryev
 
        return celeryev.run_celeryev(**vars(self.options))
 
    celery_command = celeryev.EvCommand
rhodecode/lib/celerypylons/loader.py
Show inline comments
 
@@ -17,15 +17,29 @@ class PylonsSettingsProxy(object):
 
        pylons_key = to_pylons(key)
 
        try:
 
            value = config[pylons_key]
 
            if key in LIST_PARAMS: return value.split()
 
            if key in LIST_PARAMS:return value.split()
 
            return self.type_converter(value)
 
        except KeyError:
 
            raise AttributeError(pylons_key)
 

	
 
    def get(self, key):
 
        try:
 
            return self.__getattr__(key)
 
        except AttributeError:
 
            return None
 

	
 
    def __getitem__(self, key):
 
        try:
 
            return self.__getattr__(key)
 
        except AttributeError:
 
            raise KeyError()
 

	
 
    def __setattr__(self, key, value):
 
        pylons_key = to_pylons(key)
 
        config[pylons_key] = value
 

	
 
    def __setitem__(self, key, value):
 
        self.__setattr__(key, value)
 

	
 
    def type_converter(self, value):
 
        #cast to int
 
@@ -35,7 +49,6 @@ class PylonsSettingsProxy(object):
 
        #cast to bool
 
        if value.lower() in ['true', 'false']:
 
            return value.lower() == 'true'
 

	
 
        return value
 

	
 
class PylonsLoader(BaseLoader):
setup.py
Show inline comments
 
@@ -12,7 +12,7 @@ requirements = [
 
        "pygments>=1.4",
 
        "mercurial>=1.7.3",
 
        "whoosh>=1.3.4",
 
        "celery>=2.1.4",
 
        "celery>=2.2.2",
 
        "py-bcrypt",
 
        "babel",
 
    ]
0 comments (0 inline, 0 general)