Changeset - 3a7f5b1a19dd
[Not reviewed]
beta
0 4 1
Marcin Kuzminski - 15 years ago 2011-02-08 02:57:21
marcin@python-works.com
made rhodecode work with celery 2.2, made some tasks optimizations(forget results)

added celeryconfig.py with just the definitions of hosts, it seams just this is needed to get celery working nice, all other config options are taken from .ini files. This is a temp workaround until i get the proper soltuion to this problem.
5 files changed with 84 insertions and 56 deletions:
0 comments (0 inline, 0 general)
celeryconfig.py
Show inline comments
 
new file 100644
 
## Broker settings.
 
BROKER_VHOST = "rabbitmqhost"
 
BROKER_USER = "rabbitmq"
 
BROKER_PASSWORD = "qweqwe"
rhodecode/lib/celerylib/tasks.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
"""
 
    rhodecode.lib.celerylib.tasks
 
    ~~~~~~~~~~~~~~
 
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
    RhodeCode task modules, containing all task that suppose to be run
 
    by celery daemon
 
    
 
    :created_on: Oct 6, 2010
 
    :author: marcink
 
    :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>    
 
    :license: GPLv3, see COPYING for more details.
 
"""
 
# This program is free software; you can redistribute it and/or
 
# modify it under the terms of the GNU General Public License
 
# as published by the Free Software Foundation; version 2
 
@@ -20,24 +20,26 @@
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
# 
 
# You should have received a copy of the GNU General Public License
 
# along with this program; if not, write to the Free Software
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 
# MA  02110-1301, USA.
 
from celery.decorators import task
 

	
 
import os
 
import traceback
 
import logging
 

	
 
from time import mktime
 
from operator import itemgetter
 

	
 
from pylons import config
 
from pylons.i18n.translation import _
 

	
 
from rhodecode.lib.celerylib import run_task, locked_task, str2bool
 
from rhodecode.lib.helpers import person
 
from rhodecode.lib.smtp_mailer import SmtpMailer
 
from rhodecode.lib.utils import OrderedDict, add_cache
 
from rhodecode.model import init_model
 
from rhodecode.model import meta
 
@@ -63,39 +65,43 @@ CELERY_ON = str2bool(config['app_conf'].
 
def get_session():
 
    if CELERY_ON:
 
        engine = engine_from_config(config, 'sqlalchemy.db1.')
 
        init_model(engine)
 
    sa = meta.Session()
 
    return sa
 

	
 
def get_repos_path():
 
    sa = get_session()
 
    q = sa.query(RhodeCodeUi).filter(RhodeCodeUi.ui_key == '/').one()
 
    return q.ui_value
 

	
 
@task
 
@task(ignore_result=True)
 
@locked_task
 
def whoosh_index(repo_location, full_index):
 
    log = whoosh_index.get_logger()
 
    #log = whoosh_index.get_logger()
 
    from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
 
    index_location = config['index_dir']
 
    WhooshIndexingDaemon(index_location=index_location,
 
                         repo_location=repo_location, sa=get_session())\
 
                         .run(full_index=full_index)
 

	
 
@task
 
@task(ignore_result=True)
 
@locked_task
 
def get_commits_stats(repo_name, ts_min_y, ts_max_y):
 
    try:
 
        log = get_commits_stats.get_logger()
 
    except:
 
        log = logging.getLogger(__name__)
 

	
 
    from rhodecode.model.db import Statistics, Repository
 
    log = get_commits_stats.get_logger()
 

	
 
    #for js data compatibilty
 
    author_key_cleaner = lambda k: person(k).replace('"', "")
 

	
 
    commits_by_day_author_aggregate = {}
 
    commits_by_day_aggregate = {}
 
    repos_path = get_repos_path()
 
    p = os.path.join(repos_path, repo_name)
 
    repo = get_repo(p)
 

	
 
    skip_date_limit = True
 
    parse_limit = 250 #limit for single task changeset parsing optimal for
 
@@ -209,27 +215,31 @@ def get_commits_stats(repo_name, ts_min_
 
    try:
 
        sa.add(stats)
 
        sa.commit()
 
    except:
 
        log.error(traceback.format_exc())
 
        sa.rollback()
 
        return False
 
    if len(repo.revisions) > 1:
 
        run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)
 

	
 
    return True
 

	
 
@task
 
@task(ignore_result=True)
 
def reset_user_password(user_email):
 
    log = reset_user_password.get_logger()
 
    try:
 
        log = reset_user_password.get_logger()
 
    except:
 
        log = logging.getLogger(__name__)
 

	
 
    from rhodecode.lib import auth
 
    from rhodecode.model.db import User
 

	
 
    try:
 
        try:
 
            sa = get_session()
 
            user = sa.query(User).filter(User.email == user_email).scalar()
 
            new_passwd = auth.PasswordGenerator().gen_password(8,
 
                             auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
 
            if user:
 
                user.password = auth.get_crypt_password(new_passwd)
 
                sa.add(user)
 
@@ -245,64 +255,73 @@ def reset_user_password(user_email):
 
        run_task(send_email, user_email,
 
                 "Your new rhodecode password",
 
                 'Your new rhodecode password:%s' % (new_passwd))
 
        log.info('send new password mail to %s', user_email)
 

	
 

	
 
    except:
 
        log.error('Failed to update user password')
 
        log.error(traceback.format_exc())
 

	
 
    return True
 

	
 
@task
 
@task(ignore_result=True)
 
def send_email(recipients, subject, body):
 
    """
 
    Sends an email with defined parameters from the .ini files.
 
    
 
    
 
    :param recipients: list of recipients, it this is empty the defined email
 
        address from field 'email_to' is used instead
 
    :param subject: subject of the mail
 
    :param body: body of the mail
 
    """
 
    log = send_email.get_logger()
 
    try:
 
        log = send_email.get_logger()
 
    except:
 
        log = logging.getLogger(__name__)
 

	
 
    email_config = config
 

	
 
    if not recipients:
 
        recipients = [email_config.get('email_to')]
 

	
 
    mail_from = email_config.get('app_email_from')
 
    user = email_config.get('smtp_username')
 
    passwd = email_config.get('smtp_password')
 
    mail_server = email_config.get('smtp_server')
 
    mail_port = email_config.get('smtp_port')
 
    tls = str2bool(email_config.get('smtp_use_tls'))
 
    ssl = str2bool(email_config.get('smtp_use_ssl'))
 

	
 
    try:
 
        m = SmtpMailer(mail_from, user, passwd, mail_server,
 
                       mail_port, ssl, tls)
 
        m.send(recipients, subject, body)
 
    except:
 
        log.error('Mail sending failed')
 
        log.error(traceback.format_exc())
 
        return False
 
    return True
 

	
 
@task
 
@task(ignore_result=True)
 
def create_repo_fork(form_data, cur_user):
 
    try:
 
        log = create_repo_fork.get_logger()
 
    except:
 
        log = logging.getLogger(__name__)
 

	
 
    from rhodecode.model.repo import RepoModel
 
    from vcs import get_backend
 
    log = create_repo_fork.get_logger()
 

	
 
    repo_model = RepoModel(get_session())
 
    repo_model.create(form_data, cur_user, just_db=True, fork=True)
 
    repo_name = form_data['repo_name']
 
    repos_path = get_repos_path()
 
    repo_path = os.path.join(repos_path, repo_name)
 
    repo_fork_path = os.path.join(repos_path, form_data['fork_name'])
 
    alias = form_data['repo_type']
 

	
 
    log.info('creating repo fork %s as %s', repo_name, repo_path)
 
    backend = get_backend(alias)
 
    backend(str(repo_fork_path), create=True, src_url=str(repo_path))
 

	
rhodecode/lib/celerypylons/commands.py
Show inline comments
 
from rhodecode.lib.utils import BasePasterCommand, Command
 

	
 
from celery.app import app_or_default
 
from celery.bin import camqadm, celerybeat, celeryd, celeryev
 

	
 
__all__ = ['CeleryDaemonCommand', 'CeleryBeatCommand',
 
           'CAMQPAdminCommand', 'CeleryEventCommand']
 

	
 

	
 
class CeleryDaemonCommand(BasePasterCommand):
 
class CeleryCommand(BasePasterCommand):
 
    """Abstract class implements run methods needed for celery
 

	
 
    Starts the celery worker that uses a paste.deploy configuration
 
    file.
 
    """
 

	
 
    def update_parser(self):
 
        """
 
        Abstract method.  Allows for the class's parser to be updated
 
        before the superclass's `run` method is called.  Necessary to
 
        allow options/arguments to be passed through to the underlying
 
        celery command.
 
        """
 

	
 
        cmd = self.celery_command(app_or_default())
 
        for x in cmd.get_options():
 
            self.parser.add_option(x)
 

	
 
    def command(self):
 
        cmd = self.celery_command(app_or_default())
 
        return cmd.run(**vars(self.options))
 

	
 
class CeleryDaemonCommand(CeleryCommand):
 
    """Start the celery worker
 

	
 
    Starts the celery worker that uses a paste.deploy configuration
 
    file.
 
    """
 
    usage = 'CONFIG_FILE [celeryd options...]'
 
    summary = __doc__.splitlines()[0]
 
    description = "".join(__doc__.splitlines()[2:])
 

	
 
    parser = Command.standard_parser(quiet=True)
 

	
 
    def update_parser(self):
 
        from celery.bin import celeryd
 
        for x in celeryd.WorkerCommand().get_options():
 
            self.parser.add_option(x)
 

	
 
    def command(self):
 
        from celery.bin import celeryd
 
        return celeryd.WorkerCommand().run(**vars(self.options))
 
    celery_command = celeryd.WorkerCommand
 

	
 

	
 
class CeleryBeatCommand(BasePasterCommand):
 
class CeleryBeatCommand(CeleryCommand):
 
    """Start the celery beat server
 

	
 
    Starts the celery beat server using a paste.deploy configuration
 
    file.
 
    """
 
    usage = 'CONFIG_FILE [celerybeat options...]'
 
    summary = __doc__.splitlines()[0]
 
    description = "".join(__doc__.splitlines()[2:])
 

	
 
    parser = Command.standard_parser(quiet=True)
 

	
 
    def update_parser(self):
 
        from celery.bin import celerybeat
 
        for x in celerybeat.BeatCommand().get_options():
 
            self.parser.add_option(x)
 
    celery_command = celerybeat.BeatCommand
 

	
 
    def command(self):
 
        from celery.bin import celerybeat
 
        return celerybeat.BeatCommand(**vars(self.options))
 

	
 
class CAMQPAdminCommand(BasePasterCommand):
 
class CAMQPAdminCommand(CeleryCommand):
 
    """CAMQP Admin
 

	
 
    CAMQP celery admin tool.
 
    """
 
    usage = 'CONFIG_FILE [camqadm options...]'
 
    summary = __doc__.splitlines()[0]
 
    description = "".join(__doc__.splitlines()[2:])
 

	
 
    parser = Command.standard_parser(quiet=True)
 

	
 
    def update_parser(self):
 
        from celery.bin import camqadm
 
        for x in camqadm.OPTION_LIST:
 
            self.parser.add_option(x)
 
    celery_command = camqadm.AMQPAdminCommand
 

	
 
    def command(self):
 
        from celery.bin import camqadm
 
        return camqadm.camqadm(*self.args, **vars(self.options))
 

	
 

	
 
class CeleryEventCommand(BasePasterCommand):
 
    """Celery event commandd.
 
class CeleryEventCommand(CeleryCommand):
 
    """Celery event command.
 

	
 
    Capture celery events.
 
    """
 
    usage = 'CONFIG_FILE [celeryev options...]'
 
    summary = __doc__.splitlines()[0]
 
    description = "".join(__doc__.splitlines()[2:])
 

	
 
    parser = Command.standard_parser(quiet=True)
 

	
 
    def update_parser(self):
 
        from celery.bin import celeryev
 
        for x in celeryev.OPTION_LIST:
 
            self.parser.add_option(x)
 

	
 
    def command(self):
 
        from celery.bin import celeryev
 
        return celeryev.run_celeryev(**vars(self.options))
 
    celery_command = celeryev.EvCommand
rhodecode/lib/celerypylons/loader.py
Show inline comments
 
@@ -8,43 +8,56 @@ LIST_PARAMS = """CELERY_IMPORTS ADMINS R
 

	
 

	
 
class PylonsSettingsProxy(object):
 
    """Pylons Settings Proxy
 

	
 
    Proxies settings from pylons.config
 

	
 
    """
 
    def __getattr__(self, key):
 
        pylons_key = to_pylons(key)
 
        try:
 
            value = config[pylons_key]
 
            if key in LIST_PARAMS: return value.split()
 
            if key in LIST_PARAMS:return value.split()
 
            return self.type_converter(value)
 
        except KeyError:
 
            raise AttributeError(pylons_key)
 

	
 
    def get(self, key):
 
        try:
 
            return self.__getattr__(key)
 
        except AttributeError:
 
            return None
 

	
 
    def __getitem__(self, key):
 
        try:
 
            return self.__getattr__(key)
 
        except AttributeError:
 
            raise KeyError()
 

	
 
    def __setattr__(self, key, value):
 
        pylons_key = to_pylons(key)
 
        config[pylons_key] = value
 

	
 
    def __setitem__(self, key, value):
 
        self.__setattr__(key, value)
 

	
 
    def type_converter(self, value):
 
        #cast to int
 
        if value.isdigit():
 
            return int(value)
 

	
 
        #cast to bool
 
        if value.lower() in ['true', 'false']:
 
            return value.lower() == 'true'
 

	
 
        return value
 

	
 
class PylonsLoader(BaseLoader):
 
    """Pylons celery loader
 

	
 
    Maps the celery config onto pylons.config
 

	
 
    """
 
    def read_configuration(self):
 
        self.configured = True
 
        return PylonsSettingsProxy()
 

	
setup.py
Show inline comments
 
@@ -3,25 +3,25 @@ py_version = sys.version_info
 

	
 
from rhodecode import get_version
 

	
 
requirements = [
 
        "Pylons==1.0.0",
 
        "WebHelpers>=1.2",
 
        "SQLAlchemy>=0.6.6",
 
        "Mako==0.3.6",
 
        "vcs>=0.1.10",
 
        "pygments>=1.4",
 
        "mercurial>=1.7.3",
 
        "whoosh>=1.3.4",
 
        "celery>=2.1.4",
 
        "celery>=2.2.2",
 
        "py-bcrypt",
 
        "babel",
 
    ]
 

	
 
classifiers = ['Development Status :: 4 - Beta',
 
               'Environment :: Web Environment',
 
               'Framework :: Pylons',
 
               'Intended Audience :: Developers',
 
               'License :: OSI Approved :: BSD License',
 
               'Operating System :: OS Independent',
 
               'Programming Language :: Python', ]
 

	
0 comments (0 inline, 0 general)