Changeset - f6c613fba757
[Not reviewed]
beta
1 6 3
Marcin Kuzminski - 15 years ago 2010-11-27 01:27:24
marcin@python-works.com
Celery is configured by the .ini files and run from paster now
removed celeryconfig, added homebrew celery-pylons,
added paster celeryd command, fixed tasks to use pylons configs, sqlalchemy sessions
10 files changed with 427 insertions and 135 deletions:
0 comments (0 inline, 0 general)
celeryconfig.py
Show inline comments
 
deleted file
development.ini
Show inline comments
 
################################################################################
 
################################################################################
 
# rhodecode - Pylons environment configuration                                 #
 
# RhodeCode - Pylons environment configuration                                 #
 
#                                                                              # 
 
# The %(here)s variable will be replaced with the parent directory of this file#
 
################################################################################
 
@@ -10,7 +10,7 @@ debug = true
 
################################################################################
 
## Uncomment and replace with the address which should receive                ## 
 
## any error reports after application crash								  ##
 
## Additionally those settings will be used by rhodecode mailing system       ##
 
## Additionally those settings will be used by RhodeCode mailing system       ##
 
################################################################################
 
#email_to = admin@localhost
 
#error_email_from = paste_error@localhost
 
@@ -21,13 +21,14 @@ debug = true
 
#smtp_username = 
 
#smtp_password =
 
#smtp_port = 
 
#smtp_use_tls = 
 
#smtp_use_tls = false
 
#smtp_use_ssl = true
 

	
 
[server:main]
 
##nr of threads to spawn
 
threadpool_workers = 5
 

	
 
##max request before
 
##max request before thread respawn
 
threadpool_max_requests = 6
 

	
 
##option to use threads of process
 
@@ -46,6 +47,33 @@ cache_dir = %(here)s/data
 
index_dir = %(here)s/data/index
 

	
 
####################################
 
###        CELERY CONFIG        ####
 
####################################
 
use_celery = false
 
broker.host = localhost
 
broker.vhost = rabbitmqhost
 
broker.port = 5672
 
broker.user = rabbitmq
 
broker.password = qweqwe
 

	
 
celery.imports = rhodecode.lib.celerylib.tasks
 

	
 
celery.result.backend = amqp
 
celery.result.dburi = amqp://
 
celery.result.serialier = json
 

	
 
#celery.send.task.error.emails = true
 
#celery.amqp.task.result.expires = 18000
 

	
 
celeryd.concurrency = 2
 
#celeryd.log.file = celeryd.log
 
celeryd.log.level = debug
 
celeryd.max.tasks.per.child = 3
 

	
 
#tasks will never be sent to the queue, but executed locally instead.
 
celery.always.eager = false
 

	
 
####################################
 
###         BEAKER CACHE        ####
 
####################################
 
beaker.cache.data_dir=/%(here)s/data/cache/data
 
@@ -61,9 +89,8 @@ beaker.cache.short_term.expire=60
 
beaker.cache.long_term.type=memory
 
beaker.cache.long_term.expire=36000
 

	
 

	
 
beaker.cache.sql_cache_short.type=memory
 
beaker.cache.sql_cache_short.expire=5
 
beaker.cache.sql_cache_short.expire=10
 

	
 
beaker.cache.sql_cache_med.type=memory
 
beaker.cache.sql_cache_med.expire=360
 
@@ -75,7 +102,7 @@ beaker.cache.sql_cache_long.expire=3600
 
###       BEAKER SESSION        ####
 
####################################
 
## Type of storage used for the session, current types are 
 
## "dbm", "file", "memcached", "database", and "memory". 
 
## dbm, file, memcached, database, and memory. 
 
## The storage uses the Container API 
 
##that is also used by the cache system.
 
beaker.session.type = file
production.ini
Show inline comments
 
@@ -47,6 +47,33 @@ cache_dir = %(here)s/data
 
index_dir = %(here)s/data/index
 

	
 
####################################
 
###        CELERY CONFIG        ####
 
####################################
 
use_celery = false
 
broker.host = localhost
 
broker.vhost = rabbitmqhost
 
broker.port = 5672
 
broker.user = rabbitmq
 
broker.password = qweqwe
 

	
 
celery.imports = rhodecode.lib.celerylib.tasks
 

	
 
celery.result.backend = amqp
 
celery.result.dburi = amqp://
 
celery.result.serialier = json
 

	
 
#celery.send.task.error.emails = true
 
#celery.amqp.task.result.expires = 18000
 

	
 
celeryd.concurrency = 2
 
#celeryd.log.file = celeryd.log
 
celeryd.log.level = debug
 
celeryd.max.tasks.per.child = 3
 

	
 
#tasks will never be sent to the queue, but executed locally instead.
 
celery.always.eager = false
 

	
 
####################################
 
###         BEAKER CACHE        ####
 
####################################
 
beaker.cache.data_dir=/%(here)s/data/cache/data
rhodecode/config/deployment.ini_tmpl
Show inline comments
 
################################################################################
 
################################################################################
 
# rhodecode - Pylons environment configuration                                 #
 
# RhodeCode - Pylons environment configuration                                 #
 
#                                                                              # 
 
# The %(here)s variable will be replaced with the parent directory of this file#
 
################################################################################
 
@@ -10,7 +10,7 @@ debug = true
 
################################################################################
 
## Uncomment and replace with the address which should receive                ## 
 
## any error reports after application crash                                  ##
 
## Additionally those settings will be used by rhodecode mailing system       ##
 
## Additionally those settings will be used by RhodeCode mailing system       ##
 
################################################################################
 
#email_to = admin@localhost
 
#error_email_from = paste_error@localhost
 
@@ -48,6 +48,33 @@ index_dir = %(here)s/data/index
 
app_instance_uuid = ${app_instance_uuid}
 

	
 
####################################
 
###        CELERY CONFIG        ####
 
####################################
 
use_celery = false
 
broker.host = localhost
 
broker.vhost = rabbitmqhost
 
broker.port = 5672
 
broker.user = rabbitmq
 
broker.password = qweqwe
 

	
 
celery.imports = rhodecode.lib.celerylib.tasks
 

	
 
celery.result.backend = amqp
 
celery.result.dburi = amqp://
 
celery.result.serialier = json
 

	
 
#celery.send.task.error.emails = true
 
#celery.amqp.task.result.expires = 18000
 

	
 
celeryd.concurrency = 2
 
#celeryd.log.file = celeryd.log
 
celeryd.log.level = debug
 
celeryd.max.tasks.per.child = 3
 

	
 
#tasks will never be sent to the queue, but executed locally instead.
 
celery.always.eager = false
 

	
 
####################################
 
###         BEAKER CACHE        ####
 
####################################
 
beaker.cache.data_dir=/%(here)s/data/cache/data
 
@@ -64,7 +91,7 @@ beaker.cache.long_term.type=memory
 
beaker.cache.long_term.expire=36000
 

	
 
beaker.cache.sql_cache_short.type=memory
 
beaker.cache.sql_cache_short.expire=5
 
beaker.cache.sql_cache_short.expire=10
 

	
 
beaker.cache.sql_cache_med.type=memory
 
beaker.cache.sql_cache_med.expire=360
rhodecode/lib/celerylib/__init__.py
Show inline comments
 
import os
 
import sys
 
import socket
 
import traceback
 
import logging
 

	
 
from rhodecode.lib.pidlock import DaemonLock, LockHeld
 
from vcs.utils.lazy import LazyProperty
 
from decorator import decorator
 
import logging
 
import os
 
import sys
 
import traceback
 
from hashlib import md5
 
import socket
 
from pylons import  config
 

	
 
log = logging.getLogger(__name__)
 

	
 
def str2bool(v):
 
    return v.lower() in ["yes", "true", "t", "1"] if v else None
 

	
 
CELERY_ON = str2bool(config['app_conf'].get('use_celery'))
 

	
 
class ResultWrapper(object):
 
    def __init__(self, task):
 
        self.task = task
 
@@ -18,9 +26,10 @@ class ResultWrapper(object):
 
        return self.task
 

	
 
def run_task(task, *args, **kwargs):
 
    if CELERY_ON:
 
    try:
 
        t = task.delay(*args, **kwargs)
 
        log.info('running task %s', t.task_id)
 
            log.info('running task %s:%s', t.task_id, task)
 
        return t
 
    except socket.error, e:
 
        if  e.errno == 111:
 
@@ -32,6 +41,7 @@ def run_task(task, *args, **kwargs):
 
    except Exception, e:
 
        log.error(traceback.format_exc())
 
    
 
    log.debug('executing task %s in sync mode', task)
 
    return ResultWrapper(task(*args, **kwargs))
 

	
 

	
rhodecode/lib/celerylib/tasks.py
Show inline comments
 
@@ -2,16 +2,24 @@ from celery.decorators import task
 

	
 
import os
 
import traceback
 
import beaker
 
from time import mktime
 

	
 
from operator import itemgetter
 

	
 
from pylons import config
 
from pylons.i18n.translation import _
 
from rhodecode.lib.celerylib import run_task, locked_task
 

	
 
from rhodecode.lib.celerylib import run_task, locked_task, str2bool
 
from rhodecode.lib.helpers import person
 
from rhodecode.lib.smtp_mailer import SmtpMailer
 
from rhodecode.lib.utils import OrderedDict
 
from rhodecode.model import init_model
 
from rhodecode.model import meta
 
from rhodecode.model.db import RhodeCodeUi
 

	
 
from vcs.backends import get_repo
 
from rhodecode.model.db import RhodeCodeUi
 

	
 
from sqlalchemy import engine_from_config
 

	
 
try:
 
    import json
 
@@ -19,31 +27,16 @@ except ImportError:
 
    #python 2.5 compatibility
 
    import simplejson as json
 

	
 
try:
 
    from celeryconfig import PYLONS_CONFIG as config
 
    celery_on = True
 
except ImportError:
 
    #if celeryconfig is not present let's just load our pylons
 
    #config instead
 
    from pylons import config
 
    celery_on = False
 

	
 

	
 
__all__ = ['whoosh_index', 'get_commits_stats',
 
           'reset_user_password', 'send_email']
 

	
 
CELERY_ON = str2bool(config['app_conf'].get('use_celery'))
 

	
 
def get_session():
 
    if celery_on:
 
        from sqlalchemy import engine_from_config
 
        from sqlalchemy.orm import sessionmaker, scoped_session
 
        engine = engine_from_config(dict(config.items('app:main')),
 
                                    'sqlalchemy.db1.')
 
        sa = scoped_session(sessionmaker(bind=engine))
 
    else:
 
        #If we don't use celery reuse our current application Session
 
        from rhodecode.model.meta import Session
 
        sa = Session()
 

	
 
    if CELERY_ON:
 
        engine = engine_from_config(config, 'sqlalchemy.db1.')
 
        init_model(engine)
 
    sa = meta.Session()
 
    return sa
 

	
 
def get_repos_path():
 
@@ -56,7 +49,7 @@ def get_repos_path():
 
def whoosh_index(repo_location, full_index):
 
    log = whoosh_index.get_logger()
 
    from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
 
    index_location = dict(config.items('app:main'))['index_dir']
 
    index_location = config['index_dir']
 
    WhooshIndexingDaemon(index_location=index_location,
 
                         repo_location=repo_location).run(full_index=full_index)
 

	
 
@@ -235,6 +228,7 @@ def reset_user_password(user_email):
 
    except:
 
        log.error('Failed to update user password')
 
        log.error(traceback.format_exc())
 

	
 
    return True
 

	
 
@task
 
@@ -249,14 +243,11 @@ def send_email(recipients, subject, body
 
    :param body: body of the mail
 
    """
 
    log = send_email.get_logger()
 
    email_config = dict(config.items('DEFAULT'))
 
    email_config = config
 

	
 
    if not recipients:
 
        recipients = [email_config.get('email_to')]
 

	
 
    def str2bool(v):
 
        return v.lower() in ["yes", "true", "t", "1"] if v else None
 

	
 
    mail_from = email_config.get('app_email_from')
 
    user = email_config.get('smtp_username')
 
    passwd = email_config.get('smtp_password')
 
@@ -293,12 +284,58 @@ def create_repo_fork(form_data, cur_user
 
    backend(str(repo_fork_path), create=True, src_url=str(repo_path))
 

	
 
def __get_codes_stats(repo_name):
 
    LANGUAGES_EXTENSIONS = ['action', 'adp', 'ashx', 'asmx',
 
    'aspx', 'asx', 'axd', 'c', 'cfg', 'cfm', 'cpp', 'cs', 'diff', 'do', 'el',
 
    'erl', 'h', 'java', 'js', 'jsp', 'jspx', 'lisp', 'lua', 'm', 'mako', 'ml',
 
    'pas', 'patch', 'php', 'php3', 'php4', 'phtml', 'pm', 'py', 'rb', 'rst',
 
    's', 'sh', 'tpl', 'txt', 'vim', 'wss', 'xhtml', 'xml', 'xsl', 'xslt', 'yaws']
 

	
 
    LANGUAGES_EXTENSIONS_MAP = {'scm': 'Scheme', 'asmx': 'VbNetAspx', 'Rout':
 
    'RConsole', 'rest': 'Rst', 'abap': 'ABAP', 'go': 'Go', 'phtml': 'HtmlPhp',
 
    'ns2': 'Newspeak', 'xml': 'EvoqueXml', 'sh-session': 'BashSession', 'ads':
 
    'Ada', 'clj': 'Clojure', 'll': 'Llvm', 'ebuild': 'Bash', 'adb': 'Ada',
 
    'ada': 'Ada', 'c++-objdump': 'CppObjdump', 'aspx':
 
    'VbNetAspx', 'ksh': 'Bash', 'coffee': 'CoffeeScript', 'vert': 'GLShader',
 
    'Makefile.*': 'Makefile', 'di': 'D', 'dpatch': 'DarcsPatch', 'rake':
 
    'Ruby', 'moo': 'MOOCode', 'erl-sh': 'ErlangShell', 'geo': 'GLShader',
 
    'pov': 'Povray', 'bas': 'VbNet', 'bat': 'Batch', 'd': 'D', 'lisp':
 
    'CommonLisp', 'h': 'C', 'rbx': 'Ruby', 'tcl': 'Tcl', 'c++': 'Cpp', 'md':
 
    'MiniD', '.vimrc': 'Vim', 'xsd': 'Xml', 'ml': 'Ocaml', 'el': 'CommonLisp',
 
    'befunge': 'Befunge', 'xsl': 'Xslt', 'pyx': 'Cython', 'cfm':
 
    'ColdfusionHtml', 'evoque': 'Evoque', 'cfg': 'Ini', 'htm': 'Html',
 
    'Makefile': 'Makefile', 'cfc': 'ColdfusionHtml', 'tex': 'Tex', 'cs':
 
    'CSharp', 'mxml': 'Mxml', 'patch': 'Diff', 'apache.conf': 'ApacheConf',
 
    'scala': 'Scala', 'applescript': 'AppleScript', 'GNUmakefile': 'Makefile',
 
    'c-objdump': 'CObjdump', 'lua': 'Lua', 'apache2.conf': 'ApacheConf', 'rb':
 
    'Ruby', 'gemspec': 'Ruby', 'rl': 'RagelObjectiveC', 'vala': 'Vala', 'tmpl':
 
    'Cheetah', 'bf': 'Brainfuck', 'plt': 'Gnuplot', 'G': 'AntlrRuby', 'xslt':
 
    'Xslt', 'flxh': 'Felix', 'asax': 'VbNetAspx', 'Rakefile': 'Ruby', 'S': 'S',
 
    'wsdl': 'Xml', 'js': 'Javascript', 'autodelegate': 'Myghty', 'properties':
 
    'Ini', 'bash': 'Bash', 'c': 'C', 'g': 'AntlrRuby', 'r3': 'Rebol', 's':
 
    'Gas', 'ashx': 'VbNetAspx', 'cxx': 'Cpp', 'boo': 'Boo', 'prolog': 'Prolog',
 
    'sqlite3-console': 'SqliteConsole', 'cl': 'CommonLisp', 'cc': 'Cpp', 'pot':
 
    'Gettext', 'vim': 'Vim', 'pxi': 'Cython', 'yaml': 'Yaml', 'SConstruct':
 
    'Python', 'diff': 'Diff', 'txt': 'Text', 'cw': 'Redcode', 'pxd': 'Cython',
 
    'plot': 'Gnuplot', 'java': 'Java', 'hrl': 'Erlang', 'py': 'Python',
 
    'makefile': 'Makefile', 'squid.conf': 'SquidConf', 'asm': 'Nasm', 'toc':
 
    'Tex', 'kid': 'Genshi', 'rhtml': 'Rhtml', 'po': 'Gettext', 'pl': 'Prolog',
 
    'pm': 'Perl', 'hx': 'Haxe', 'ascx': 'VbNetAspx', 'ooc': 'Ooc', 'asy':
 
    'Asymptote', 'hs': 'Haskell', 'SConscript': 'Python', 'pytb':
 
    'PythonTraceback', 'myt': 'Myghty', 'hh': 'Cpp', 'R': 'S', 'aux': 'Tex',
 
    'rst': 'Rst', 'cpp-objdump': 'CppObjdump', 'lgt': 'Logtalk', 'rss': 'Xml',
 
    'flx': 'Felix', 'b': 'Brainfuck', 'f': 'Fortran', 'rbw': 'Ruby',
 
    '.htaccess': 'ApacheConf', 'cxx-objdump': 'CppObjdump', 'j': 'ObjectiveJ',
 
    'mll': 'Ocaml', 'yml': 'Yaml', 'mu': 'MuPAD', 'r': 'Rebol', 'ASM': 'Nasm',
 
    'erl': 'Erlang', 'mly': 'Ocaml', 'mo': 'Modelica', 'def': 'Modula2', 'ini':
 
    'Ini', 'control': 'DebianControl', 'vb': 'VbNet', 'vapi': 'Vala', 'pro':
 
    'Prolog', 'spt': 'Cheetah', 'mli': 'Ocaml', 'as': 'ActionScript3', 'cmd':
 
    'Batch', 'cpp': 'Cpp', 'io': 'Io', 'tac': 'Python', 'haml': 'Haml', 'rkt':
 
    'Racket', 'st':'Smalltalk', 'inc': 'Povray', 'pas': 'Delphi', 'cmake':
 
    'CMake', 'csh':'Tcsh', 'hpp': 'Cpp', 'feature': 'Gherkin', 'html': 'Html',
 
    'php':'Php', 'php3':'Php', 'php4':'Php', 'php5':'Php', 'xhtml': 'Html',
 
    'hxx': 'Cpp', 'eclass': 'Bash', 'css': 'Css',
 
    'frag': 'GLShader', 'd-objdump': 'DObjdump', 'weechatlog': 'IrcLogs',
 
    'tcsh': 'Tcsh', 'objdump': 'Objdump', 'pyw': 'Python', 'h++': 'Cpp',
 
    'py3tb': 'Python3Traceback', 'jsp': 'Jsp', 'sql': 'Sql', 'mak': 'Makefile',
 
    'php': 'Php', 'mao': 'Mako', 'man': 'Groff', 'dylan': 'Dylan', 'sass':
 
    'Sass', 'cfml': 'ColdfusionHtml', 'darcspatch': 'DarcsPatch', 'tpl':
 
    'Smarty', 'm': 'ObjectiveC', 'f90': 'Fortran', 'mod': 'Modula2', 'sh':
 
    'Bash', 'lhs': 'LiterateHaskell', 'sources.list': 'SourcesList', 'axd':
 
    'VbNetAspx', 'sc': 'Python'}
 

	
 
    repos_path = get_repos_path()
 
    p = os.path.join(repos_path, repo_name)
 
@@ -308,12 +345,14 @@ def __get_codes_stats(repo_name):
 

	
 
    def aggregate(cs):
 
        for f in cs[2]:
 
            k = f.mimetype
 
            if f.extension in LANGUAGES_EXTENSIONS:
 
                if code_stats.has_key(k):
 
                    code_stats[k] += 1
 
            ext = f.extension
 
            key = LANGUAGES_EXTENSIONS_MAP.get(ext, ext)
 
            key = key or ext
 
            if ext in LANGUAGES_EXTENSIONS_MAP.keys():
 
                if code_stats.has_key(key):
 
                    code_stats[key] += 1
 
                else:
 
                    code_stats[k] = 1
 
                    code_stats[key] = 1
 

	
 
    map(aggregate, tip.walk('/'))
 

	
rhodecode/lib/celerypylons/__init__.py
Show inline comments
 
new file 100644
 
"""
 
Automatically sets the environment variable `CELERY_LOADER` to
 
`celerypylons.loader:PylonsLoader`.  This ensures the loader is
 
specified when accessing the rest of this package, and allows celery
 
to be installed in a webapp just by importing celerypylons::
 

	
 
    import celerypylons
 

	
 
"""
 
import os
 
import warnings
 

	
 
CELERYPYLONS_LOADER = 'rhodecode.lib.celerypylons.loader.PylonsLoader'
 
if os.environ.get('CELERY_LOADER', CELERYPYLONS_LOADER) != CELERYPYLONS_LOADER:
 
    warnings.warn("'CELERY_LOADER' environment variable will be overridden by celery-pylons.")
 
os.environ['CELERY_LOADER'] = CELERYPYLONS_LOADER
rhodecode/lib/celerypylons/commands.py
Show inline comments
 
new file 100644
 
import os
 
from paste.script.command import Command, BadCommand
 
import paste.deploy
 
from pylons import config
 

	
 

	
 
__all__ = ['CeleryDaemonCommand', 'CeleryBeatCommand',
 
           'CAMQPAdminCommand', 'CeleryEventCommand']
 

	
 

	
 
class CeleryCommand(Command):
 
    """
 
    Abstract Base Class for celery commands.
 

	
 
    The celery commands are somewhat aggressive about loading
 
    celery.conf, and since our module sets the `CELERY_LOADER`
 
    environment variable to our loader, we have to bootstrap a bit and
 
    make sure we've had a chance to load the pylons config off of the
 
    command line, otherwise everything fails.
 
    """
 
    min_args = 1
 
    min_args_error = "Please provide a paster config file as an argument."
 
    takes_config_file = 1
 
    requires_config_file = True
 

	
 
    def run(self, args):
 
        """
 
        Overrides Command.run
 
        
 
        Checks for a config file argument and loads it.
 
        """
 
        if len(args) < self.min_args:
 
            raise BadCommand(
 
                self.min_args_error % {'min_args': self.min_args,
 
                                       'actual_args': len(args)})
 
        # Decrement because we're going to lob off the first argument.
 
        # @@ This is hacky
 
        self.min_args -= 1
 
        self.bootstrap_config(args[0])
 
        self.update_parser()
 
        return super(CeleryCommand, self).run(args[1:])
 

	
 
    def update_parser(self):
 
        """
 
        Abstract method.  Allows for the class's parser to be updated
 
        before the superclass's `run` method is called.  Necessary to
 
        allow options/arguments to be passed through to the underlying
 
        celery command.
 
        """
 
        raise NotImplementedError("Abstract Method.")
 

	
 
    def bootstrap_config(self, conf):
 
        """
 
        Loads the pylons configuration.
 
        """
 
        path_to_ini_file = os.path.realpath(conf)
 
        conf = paste.deploy.appconfig('config:' + path_to_ini_file)
 
        config.init_app(conf.global_conf, conf.local_conf)
 

	
 

	
 
class CeleryDaemonCommand(CeleryCommand):
 
    """Start the celery worker
 

	
 
    Starts the celery worker that uses a paste.deploy configuration
 
    file.
 
    """
 
    usage = 'CONFIG_FILE [celeryd options...]'
 
    summary = __doc__.splitlines()[0]
 
    description = "".join(__doc__.splitlines()[2:])
 

	
 
    parser = Command.standard_parser(quiet=True)
 

	
 
    def update_parser(self):
 
        from celery.bin import celeryd
 
        for x in celeryd.WorkerCommand().get_options():
 
            self.parser.add_option(x)
 

	
 
    def command(self):
 
        from celery.bin import celeryd
 
        return celeryd.WorkerCommand().run(**vars(self.options))
 

	
 

	
 
class CeleryBeatCommand(CeleryCommand):
 
    """Start the celery beat server
 

	
 
    Starts the celery beat server using a paste.deploy configuration
 
    file.
 
    """
 
    usage = 'CONFIG_FILE [celerybeat options...]'
 
    summary = __doc__.splitlines()[0]
 
    description = "".join(__doc__.splitlines()[2:])
 

	
 
    parser = Command.standard_parser(quiet=True)
 

	
 
    def update_parser(self):
 
        from celery.bin import celerybeat
 
        for x in celerybeat.BeatCommand().get_options():
 
            self.parser.add_option(x)
 

	
 
    def command(self):
 
        from celery.bin import celerybeat
 
        return celerybeat.BeatCommand(**vars(self.options))
 

	
 
class CAMQPAdminCommand(CeleryCommand):
 
    """CAMQP Admin
 

	
 
    CAMQP celery admin tool.
 
    """
 
    usage = 'CONFIG_FILE [camqadm options...]'
 
    summary = __doc__.splitlines()[0]
 
    description = "".join(__doc__.splitlines()[2:])
 

	
 
    parser = Command.standard_parser(quiet=True)
 

	
 
    def update_parser(self):
 
        from celery.bin import camqadm
 
        for x in camqadm.OPTION_LIST:
 
            self.parser.add_option(x)
 

	
 
    def command(self):
 
        from celery.bin import camqadm
 
        return camqadm.camqadm(*self.args, **vars(self.options))
 

	
 

	
 
class CeleryEventCommand(CeleryCommand):
 
    """Celery event commandd.
 

	
 
    Capture celery events.
 
    """
 
    usage = 'CONFIG_FILE [celeryev options...]'
 
    summary = __doc__.splitlines()[0]
 
    description = "".join(__doc__.splitlines()[2:])
 

	
 
    parser = Command.standard_parser(quiet=True)
 

	
 
    def update_parser(self):
 
        from celery.bin import celeryev
 
        for x in celeryev.OPTION_LIST:
 
            self.parser.add_option(x)
 

	
 
    def command(self):
 
        from celery.bin import celeryev
 
        return celeryev.run_celeryev(**vars(self.options))
rhodecode/lib/celerypylons/loader.py
Show inline comments
 
new file 100644
 
from celery.loaders.base import BaseLoader
 
from pylons import config
 

	
 
to_pylons = lambda x: x.replace('_', '.').lower()
 
to_celery = lambda x: x.replace('.', '_').upper()
 

	
 
LIST_PARAMS = """CELERY_IMPORTS ADMINS ROUTES""".split()
 

	
 

	
 
class PylonsSettingsProxy(object):
 
    """Pylons Settings Proxy
 

	
 
    Proxies settings from pylons.config
 

	
 
    """
 
    def __getattr__(self, key):
 
        pylons_key = to_pylons(key)
 
        try:
 
            value = config[pylons_key]
 
            if key in LIST_PARAMS: return value.split()
 
            return self.type_converter(value)
 
        except KeyError:
 
            raise AttributeError(pylons_key)
 

	
 
    def __setattr__(self, key, value):
 
        pylons_key = to_pylons(key)
 
        config[pylons_key] = value
 

	
 

	
 
    def type_converter(self, value):
 
        #cast to int
 
        if value.isdigit():
 
            return int(value)
 

	
 
        #cast to bool
 
        if value.lower() in ['true', 'false']:
 
            return value.lower() == 'true'
 

	
 
        return value
 

	
 
class PylonsLoader(BaseLoader):
 
    """Pylons celery loader
 

	
 
    Maps the celery config onto pylons.config
 

	
 
    """
 
    def read_configuration(self):
 
        self.configured = True
 
        return PylonsSettingsProxy()
 

	
 
    def on_worker_init(self):
 
        """
 
        Import task modules.
 
        """
 
        self.import_default_modules()
test.ini
Show inline comments
 
@@ -46,6 +46,33 @@ cache_dir = %(here)s/data
 
index_dir = /tmp/index
 

	
 
####################################
 
###        CELERY CONFIG        ####
 
####################################
 
use_celery = false
 
broker.host = localhost
 
broker.vhost = rabbitmqhost
 
broker.port = 5672
 
broker.user = rabbitmq
 
broker.password = qweqwe
 

	
 
celery.imports = rhodecode.lib.celerylib.tasks
 

	
 
celery.result.backend = amqp
 
celery.result.dburi = amqp://
 
celery.result.serialier = json
 

	
 
#celery.send.task.error.emails = true
 
#celery.amqp.task.result.expires = 18000
 

	
 
celeryd.concurrency = 2
 
#celeryd.log.file = celeryd.log
 
celeryd.log.level = debug
 
celeryd.max.tasks.per.child = 3
 

	
 
#tasks will never be sent to the queue, but executed locally instead.
 
celery.always.eager = false
 

	
 
####################################
 
###         BEAKER CACHE        ####
 
####################################
 
beaker.cache.data_dir=/%(here)s/data/cache/data
0 comments (0 inline, 0 general)