Changeset - 82662f9faaf4
[Not reviewed]
default
0 8 0
Mads Kiilerich - 9 years ago 2016-09-06 00:51:18
madski@unity3d.com
celeryd: annotate tasks so they can be run directly without run_task

This also makes the system less forgiving about celery configuration problems
and thus easier to debug. I like that.
8 files changed with 56 insertions and 60 deletions:
0 comments (0 inline, 0 general)
kallithea/controllers/admin/settings.py
Show inline comments
 
@@ -34,13 +34,13 @@ from pylons import request, tmpl_context
 
from pylons.i18n.translation import _
 
from webob.exc import HTTPFound
 

	
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import LoginRequired, HasPermissionAnyDecorator
 
from kallithea.lib.base import BaseController, render
 
from kallithea.lib.celerylib import tasks, run_task
 
from kallithea.lib.celerylib import tasks
 
from kallithea.lib.exceptions import HgsubversionImportError
 
from kallithea.lib.utils import repo2db_mapper, set_app_settings
 
from kallithea.model.db import Ui, Repository, Setting
 
from kallithea.model.forms import ApplicationSettingsForm, \
 
    ApplicationUiSettingsForm, ApplicationVisualisationForm
 
from kallithea.model.scm import ScmModel
 
@@ -322,14 +322,14 @@ class SettingsController(BaseController)
 
            test_email_html_body = EmailNotificationModel() \
 
                .get_email_tmpl(EmailNotificationModel.TYPE_DEFAULT,
 
                                'html', body=test_body)
 

	
 
            recipients = [test_email] if test_email else None
 

	
 
            run_task(tasks.send_email, recipients, test_email_subj,
 
                     test_email_txt_body, test_email_html_body)
 
            tasks.send_email(recipients, test_email_subj,
 
                             test_email_txt_body, test_email_html_body)
 

	
 
            h.flash(_('Send email task created'), category='success')
 
            raise HTTPFound(location=url('admin_settings_email'))
 

	
 
        defaults = Setting.get_app_settings()
 
        defaults.update(self._get_hg_ui_settings())
 
@@ -395,13 +395,13 @@ class SettingsController(BaseController)
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def settings_search(self):
 
        c.active = 'search'
 
        if request.POST:
 
            repo_location = self._get_hg_ui_settings()['paths_root_path']
 
            full_index = request.POST.get('full_index', False)
 
            run_task(tasks.whoosh_index, repo_location, full_index)
 
            tasks.whoosh_index(repo_location, full_index)
 
            h.flash(_('Whoosh reindex task scheduled'), category='success')
 
            raise HTTPFound(location=url('admin_settings_search'))
 

	
 
        defaults = Setting.get_app_settings()
 
        defaults.update(self._get_hg_ui_settings())
 

	
kallithea/controllers/summary.py
Show inline comments
 
@@ -46,13 +46,12 @@ from kallithea.lib.utils import jsonify
 
from kallithea.lib.utils2 import safe_str
 
from kallithea.lib.auth import LoginRequired, HasRepoPermissionAnyDecorator, \
 
    NotAnonymous
 
from kallithea.lib.base import BaseRepoController, render
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.markup_renderer import MarkupRenderer
 
from kallithea.lib.celerylib import run_task
 
from kallithea.lib.celerylib.tasks import get_commits_stats
 
from kallithea.lib.compat import json
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.controllers.changelog import _load_changelog_summary
 

	
 
log = logging.getLogger(__name__)
 
@@ -222,9 +221,8 @@ class SummaryController(BaseRepoControll
 
            c.commit_data = json.dumps({})
 
            c.overview_data = json.dumps([[ts_min_y, 0], [ts_max_y, 10]])
 
            c.trending_languages = json.dumps({})
 
            c.no_data = True
 

	
 
        recurse_limit = 500  # don't recurse more than 500 times when parsing
 
        run_task(get_commits_stats, c.db_repo.repo_name, ts_min_y,
 
                 ts_max_y, recurse_limit)
 
        get_commits_stats(c.db_repo.repo_name, ts_min_y, ts_max_y, recurse_limit)
 
        return render('summary/statistics.html')
kallithea/lib/celerylib/__init__.py
Show inline comments
 
@@ -24,14 +24,12 @@ Original author and date, and relevant c
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import os
 
import socket
 
import traceback
 
import logging
 

	
 
from pylons import config
 

	
 
from hashlib import md5
 
from decorator import decorator
 
@@ -59,38 +57,41 @@ class FakeTask(object):
 

	
 
    traceback = None # if failed
 

	
 
    task_id = None
 

	
 

	
 
def run_task(task, *args, **kwargs):
 
    global CELERY_ON
 
    if CELERY_ON:
 
        try:
 
            t = task.apply_async(args=args, kwargs=kwargs)
 
            log.info('running task %s:%s', t.task_id, task)
 
            return t
 
def task(f_org):
 
    """Wrapper of celery.task.task, running async if CELERY_ON
 
    """
 

	
 
        except socket.error as e:
 
            if isinstance(e, IOError) and e.errno == 111:
 
                log.debug('Unable to connect to celeryd. Sync execution')
 
                CELERY_ON = False
 
            else:
 
                log.error(traceback.format_exc())
 
        except KeyError as e:
 
                log.debug('Unable to connect to celeryd. Sync execution')
 
        except Exception as e:
 
            log.error(traceback.format_exc())
 
    if CELERY_ON:
 
        def f_async(*args, **kwargs):
 
            log.info('executing %s task', f_org.__name__)
 
            try:
 
                f_org(*args, **kwargs)
 
            finally:
 
                log.info('executed %s task', f_org.__name__)
 
        f_async.__name__ = f_org.__name__
 
        import celery.task
 
        runner = celery.task.task(ignore_result=True)(f_async)
 
        def f_wrapped(*args, **kwargs):
 
            t = runner.apply_async(args=args, kwargs=kwargs)
 
            log.info('executing task %s in async mode - id %s', f_org, t.task_id)
 
            return t
 
    else:
 
        def f_wrapped(*args, **kwargs):
 
            log.info('executing task %s in sync', f_org.__name__)
 
            try:
 
                result = f_org(*args, **kwargs)
 
            except Exception as e:
 
                log.error('exception executing sync task %s in sync', f_org.__name__, e)
 
                raise # TODO: return this in FakeTask as with async tasks?
 
            return FakeTask(result)
 

	
 
    log.debug('executing task %s in sync mode', task)
 
    try:
 
        result = task(*args, **kwargs)
 
    except Exception as e:
 
        log.error('exception running sync task %s: %s', task, e)
 
        raise # TODO: return this in FakeTask as with async tasks?
 
    return FakeTask(result)
 
    return f_wrapped
 

	
 

	
 
def __get_lockkey(func, *fargs, **fkwargs):
 
    params = list(fargs)
 
    params.extend(['%s-%s' % ar for ar in fkwargs.items()])
 

	
kallithea/lib/celerylib/tasks.py
Show inline comments
 
@@ -23,27 +23,26 @@ Original author and date, and relevant c
 
:created_on: Oct 6, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
from celery.task import task
 

	
 
import os
 
import traceback
 
import logging
 
import rfc822
 

	
 
from time import mktime
 
from operator import itemgetter
 
from string import lower
 

	
 
from pylons import config
 

	
 
from kallithea import CELERY_ON
 
from kallithea.lib.celerylib import run_task, locked_task, dbsession, \
 
from kallithea.lib import celerylib
 
from kallithea.lib.celerylib import locked_task, dbsession, \
 
    str2bool, __get_lockkey, LockHeld, DaemonLock, get_session
 
from kallithea.lib.helpers import person
 
from kallithea.lib.rcmail.smtp_mailer import SmtpMailer
 
from kallithea.lib.utils import setup_cache_regions, action_logger
 
from kallithea.lib.vcs.utils import author_email
 
from kallithea.lib.compat import json, OrderedDict
 
@@ -57,26 +56,26 @@ setup_cache_regions(config)  # pragma: n
 
__all__ = ['whoosh_index', 'get_commits_stats', 'send_email']
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
@task(ignore_result=True)
 
@celerylib.task
 
@locked_task
 
@dbsession
 
def whoosh_index(repo_location, full_index):
 
    from kallithea.lib.indexers.daemon import WhooshIndexingDaemon
 
    DBS = get_session()
 

	
 
    index_location = config['index_dir']
 
    WhooshIndexingDaemon(index_location=index_location,
 
                         repo_location=repo_location, sa=DBS) \
 
                         .run(full_index=full_index)
 

	
 

	
 
@task(ignore_result=True)
 
@celerylib.task
 
@dbsession
 
def get_commits_stats(repo_name, ts_min_y, ts_max_y, recurse_limit=100):
 
    DBS = get_session()
 
    lockkey = __get_lockkey('get_commits_stats', repo_name, ts_min_y,
 
                            ts_max_y)
 
    lockkey_path = config['app_conf']['cache_dir']
 
@@ -225,24 +224,23 @@ def get_commits_stats(repo_name, ts_min_
 

	
 
        # final release
 
        lock.release()
 

	
 
        # execute another task if celery is enabled
 
        if len(repo.revisions) > 1 and CELERY_ON and recurse_limit > 0:
 
            recurse_limit -= 1
 
            run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y,
 
                     recurse_limit)
 
        if recurse_limit <= 0:
 
            log.debug('Breaking recursive mode due to reach of recurse limit')
 
        return True
 
            get_commits_stats(repo_name, ts_min_y, ts_max_y, recurse_limit - 1)
 
        elif recurse_limit <= 0:
 
            log.debug('Not recursing - limit has been reached')
 
        else:
 
            log.debug('Not recursing')
 
    except LockHeld:
 
        log.info('Task with key %s already running', lockkey)
 
        return 'Task with key %s already running' % lockkey
 

	
 

	
 
@task(ignore_result=True)
 
@celerylib.task
 
@dbsession
 
def send_email(recipients, subject, body='', html_body='', headers=None, author=None):
 
    """
 
    Sends an email with defined parameters from the .ini files.
 

	
 
    :param recipients: list of recipients, if this is None, the defined email
 
@@ -323,13 +321,13 @@ def send_email(recipients, subject, body
 
    except:
 
        log.error('Mail sending failed')
 
        log.error(traceback.format_exc())
 
        return False
 
    return True
 

	
 
@task(ignore_result=False)
 
@celerylib.task
 
@dbsession
 
def create_repo(form_data, cur_user):
 
    from kallithea.model.repo import RepoModel
 
    from kallithea.model.user import UserModel
 
    from kallithea.model.db import Setting
 

	
 
@@ -407,13 +405,13 @@ def create_repo(form_data, cur_user):
 
            RepoModel(DBS)._delete_filesystem_repo(repo)
 
        raise
 

	
 
    return True
 

	
 

	
 
@task(ignore_result=False)
 
@celerylib.task
 
@dbsession
 
def create_repo_fork(form_data, cur_user):
 
    """
 
    Creates a fork of repository using interval VCS methods
 

	
 
    :param form_data:
kallithea/model/notification.py
Show inline comments
 
@@ -59,13 +59,13 @@ class NotificationModel(BaseModel):
 
        :param recipients: list of int, str or User objects, when None
 
            is given send to all admins
 
        :param type_: type of notification
 
        :param with_email: send email with this notification
 
        :param email_kwargs: additional dict to pass as args to email template
 
        """
 
        from kallithea.lib.celerylib import tasks, run_task
 
        from kallithea.lib.celerylib import tasks
 
        email_kwargs = email_kwargs or {}
 
        if recipients and not getattr(recipients, '__iter__', False):
 
            raise Exception('recipients must be a list or iterable')
 

	
 
        created_by_obj = self._get_user(created_by)
 

	
 
@@ -129,13 +129,13 @@ class NotificationModel(BaseModel):
 
                                .get_email_description(type_, **txt_kwargs)
 
            email_txt_body = EmailNotificationModel() \
 
                                .get_email_tmpl(type_, 'txt', **txt_kwargs)
 
            email_html_body = EmailNotificationModel() \
 
                                .get_email_tmpl(type_, 'html', **html_kwargs)
 

	
 
            run_task(tasks.send_email, [rec.email], email_subject, email_txt_body,
 
            tasks.send_email([rec.email], email_subject, email_txt_body,
 
                     email_html_body, headers, author=created_by_obj)
 

	
 
        return notif
 

	
 
    def delete(self, user, notification):
 
        # we don't want to remove actual notification just the assignment
kallithea/model/repo.py
Show inline comments
 
@@ -470,14 +470,14 @@ class RepoModel(BaseModel):
 
        """
 
        Create repository using celery tasks
 

	
 
        :param form_data:
 
        :param cur_user:
 
        """
 
        from kallithea.lib.celerylib import tasks, run_task
 
        return run_task(tasks.create_repo, form_data, cur_user)
 
        from kallithea.lib.celerylib import tasks
 
        return tasks.create_repo(form_data, cur_user)
 

	
 
    def _update_permissions(self, repo, perms_new=None, perms_updates=None,
 
                            check_perms=True):
 
        if not perms_new:
 
            perms_new = []
 
        if not perms_updates:
 
@@ -519,14 +519,14 @@ class RepoModel(BaseModel):
 
        """
 
        Simple wrapper into executing celery task for fork creation
 

	
 
        :param form_data:
 
        :param cur_user:
 
        """
 
        from kallithea.lib.celerylib import tasks, run_task
 
        return run_task(tasks.create_repo_fork, form_data, cur_user)
 
        from kallithea.lib.celerylib import tasks
 
        return tasks.create_repo_fork(form_data, cur_user)
 

	
 
    def delete(self, repo, forks=None, fs_remove=True, cur_user=None):
 
        """
 
        Delete given repository, forks parameter defines what do do with
 
        attached forks. Throws AttachedForksError if deleted repo has attached
 
        forks
kallithea/model/user.py
Show inline comments
 
@@ -326,13 +326,13 @@ class UserModel(BaseModel):
 
        Sends email with a password reset token and link to the password
 
        reset confirmation page with all information (including the token)
 
        pre-filled. Also returns URL of that page, only without the token,
 
        allowing users to copy-paste or manually enter the token from the
 
        email.
 
        """
 
        from kallithea.lib.celerylib import tasks, run_task
 
        from kallithea.lib.celerylib import tasks
 
        from kallithea.model.notification import EmailNotificationModel
 
        import kallithea.lib.helpers as h
 

	
 
        user_email = data['email']
 
        user = User.get_by_email(user_email)
 
        timestamp = int(time.time())
 
@@ -361,24 +361,23 @@ class UserModel(BaseModel):
 
            html_body = EmailNotificationModel().get_email_tmpl(
 
                reg_type, 'html',
 
                user=user.short_contact,
 
                reset_token=token,
 
                reset_url=link)
 
            log.debug('sending email')
 
            run_task(tasks.send_email, [user_email],
 
                     _("Password reset link"), body, html_body)
 
            tasks.send_email([user_email], _("Password reset link"), body, html_body)
 
            log.info('send new password mail to %s', user_email)
 
        else:
 
            log.debug("password reset email %s not found", user_email)
 

	
 
        return h.url('reset_password_confirmation',
 
                     email=user_email,
 
                     timestamp=timestamp)
 

	
 
    def verify_reset_password_token(self, email, timestamp, token):
 
        from kallithea.lib.celerylib import tasks, run_task
 
        from kallithea.lib.celerylib import tasks
 
        from kallithea.lib import auth
 
        import kallithea.lib.helpers as h
 
        user = User.get_by_email(email)
 
        if user is None:
 
            log.debug("user with email %s not found", email)
 
            return False
 
@@ -398,26 +397,26 @@ class UserModel(BaseModel):
 
                                                       h.authentication_token())
 
        log.debug('computed password reset token: %s', expected_token)
 
        log.debug('received password reset token: %s', token)
 
        return expected_token == token
 

	
 
    def reset_password(self, user_email, new_passwd):
 
        from kallithea.lib.celerylib import tasks, run_task
 
        from kallithea.lib.celerylib import tasks
 
        from kallithea.lib import auth
 
        user = User.get_by_email(user_email)
 
        if user is not None:
 
            if not self.can_change_password(user):
 
                raise Exception('trying to change password for external user')
 
            user.password = auth.get_crypt_password(new_passwd)
 
            Session().add(user)
 
            Session().commit()
 
            log.info('change password for %s', user_email)
 
        if new_passwd is None:
 
            raise Exception('unable to set new password')
 

	
 
        run_task(tasks.send_email, [user_email],
 
        tasks.send_email([user_email],
 
                 _('Password reset notification'),
 
                 _('The password to your account %s has been changed using password reset form.') % (user.username,))
 
        log.info('send password reset mail to %s', user_email)
 

	
 
        return True
 

	
kallithea/tests/models/test_notifications.py
Show inline comments
 
@@ -262,13 +262,13 @@ class TestNotifications(TestController):
 
                                                       subject=u'unused', body=body, email_kwargs=kwargs,
 
                                                       recipients=[self.u2], type_=type_)
 

	
 
            # Email type TYPE_PASSWORD_RESET has no corresponding notification type - test it directly:
 
            desc = 'TYPE_PASSWORD_RESET'
 
            kwargs = dict(user='John Doe', reset_token='decbf64715098db5b0bd23eab44bd792670ab746', reset_url='http://reset.com/decbf64715098db5b0bd23eab44bd792670ab746')
 
            kallithea.lib.celerylib.run_task(kallithea.lib.celerylib.tasks.send_email, ['john@doe.com'],
 
            kallithea.lib.celerylib.tasks.send_email(['john@doe.com'],
 
                "Password reset link",
 
                EmailNotificationModel().get_email_tmpl(EmailNotificationModel.TYPE_PASSWORD_RESET, 'txt', **kwargs),
 
                EmailNotificationModel().get_email_tmpl(EmailNotificationModel.TYPE_PASSWORD_RESET, 'html', **kwargs),
 
                author=User.get(self.u1))
 

	
 
        out = '<!doctype html>\n<html lang="en">\n<head><title>Notifications</title><meta http-equiv="Content-Type" content="text/html; charset=UTF-8"></head>\n<body>\n%s\n</body>\n</html>\n' % \
0 comments (0 inline, 0 general)