Changeset - 046fbed12f70
[Not reviewed]
default
0 3 0
Mads Kiilerich - 6 years ago 2020-02-13 16:41:51
mads@kiilerich.com
Grafted from: e68bb42cb9f1
celery: use celery directly instead of leaky abstraction in celerypylons

Things start making more sense when we remove unnecessary complexity ...
3 files changed with 4 insertions and 13 deletions:
0 comments (0 inline, 0 general)
kallithea/bin/kallithea_cli_celery.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import celery.bin.worker
 
import click
 

	
 
import kallithea
 
import kallithea.bin.kallithea_cli_base as cli_base
 
from kallithea.lib import celerypylons
 

	
 

	
 
@cli_base.register_command(config_file_initialize_app=True)
 
@click.argument('celery_args', nargs=-1)
 
def celery_run(celery_args):
 
    """Start Celery worker(s) for asynchronous tasks.
 

	
 
    This commands starts the Celery daemon which will spawn workers to handle
 
    certain asynchronous tasks for Kallithea.
 

	
 
    Any extra arguments you pass to this command will be passed through to
 
    Celery. Use '--' before such extra arguments to avoid options to be parsed
 
    by this CLI command.
 
    """
 

	
 
    if not kallithea.CELERY_APP:
 
        raise Exception('Please set use_celery = true in .ini config '
 
                        'file before running this command')
 

	
 
    cmd = celerypylons.worker.worker(kallithea.CELERY_APP)
 
    cmd = celery.bin.worker.worker(kallithea.CELERY_APP)
 
    return cmd.run_from_argv(None, command='celery-run -c CONFIG_FILE --', argv=list(celery_args))
kallithea/controllers/admin/repos.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.admin.repos
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Repositories controller for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 7, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import traceback
 

	
 
import celery.result
 
import formencode
 
from formencode import htmlfill
 
from tg import request
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPForbidden, HTTPFound, HTTPInternalServerError, HTTPNotFound
 

	
 
import kallithea
 
from kallithea.config.routing import url
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import HasPermissionAny, HasRepoPermissionLevelDecorator, LoginRequired, NotAnonymous
 
from kallithea.lib.base import BaseRepoController, jsonify, render
 
from kallithea.lib.exceptions import AttachedForksError
 
from kallithea.lib.utils import action_logger
 
from kallithea.lib.utils2 import safe_int
 
from kallithea.lib.vcs import RepositoryError
 
from kallithea.model.db import RepoGroup, Repository, RepositoryField, Setting, User, UserFollowing
 
from kallithea.model.forms import RepoFieldForm, RepoForm, RepoPermsForm
 
from kallithea.model.meta import Session
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.scm import AvailableRepoGroupChoices, RepoList, ScmModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class ReposController(BaseRepoController):
 
    """
 
    REST Controller styled on the Atom Publishing Protocol"""
 
    # To properly map this controller, ensure your config/routing.py
 
    # file has a resource setup:
 
    #     map.resource('repo', 'repos')
 

	
 
    @LoginRequired(allow_default_user=True)
 
    def _before(self, *args, **kwargs):
 
        super(ReposController, self)._before(*args, **kwargs)
 

	
 
    def _load_repo(self):
 
        repo_obj = c.db_repo
 

	
 
        if repo_obj is None:
 
            h.not_mapped_error(c.repo_name)
 
            raise HTTPFound(location=url('repos'))
 

	
 
        return repo_obj
 

	
 
    def __load_defaults(self, repo=None):
 
        top_perms = ['hg.create.repository']
 
@@ -137,99 +138,98 @@ class ReposController(BaseRepoController
 
            msg = (_('Error creating repository %s')
 
                   % form_result.get('repo_name'))
 
            h.flash(msg, category='error')
 
            raise HTTPFound(location=url('home'))
 

	
 
        raise HTTPFound(location=h.url('repo_creating_home',
 
                              repo_name=form_result['repo_name_full'],
 
                              task_id=task_id))
 

	
 
    @NotAnonymous()
 
    def create_repository(self):
 
        self.__load_defaults()
 
        if not c.repo_groups:
 
            raise HTTPForbidden
 
        parent_group = request.GET.get('parent_group')
 

	
 
        ## apply the defaults from defaults page
 
        defaults = Setting.get_default_repo_settings(strip_prefix=True)
 
        if parent_group:
 
            prg = RepoGroup.get(parent_group)
 
            if prg is None or not any(rgc[0] == prg.group_id
 
                                      for rgc in c.repo_groups):
 
                raise HTTPForbidden
 
            defaults.update({'repo_group': parent_group})
 

	
 
        return htmlfill.render(
 
            render('admin/repos/repo_add.html'),
 
            defaults=defaults,
 
            errors={},
 
            prefix_error=False,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    @LoginRequired()
 
    def repo_creating(self, repo_name):
 
        c.repo = repo_name
 
        c.task_id = request.GET.get('task_id')
 
        if not c.repo:
 
            raise HTTPNotFound()
 
        return render('admin/repos/repo_creating.html')
 

	
 
    @LoginRequired()
 
    @jsonify
 
    def repo_check(self, repo_name):
 
        c.repo = repo_name
 
        task_id = request.GET.get('task_id')
 

	
 
        if task_id and task_id not in ['None']:
 
            from kallithea.lib import celerypylons
 
            if kallithea.CELERY_APP:
 
                task_result = celerypylons.result.AsyncResult(task_id, app=kallithea.CELERY_APP)
 
                task_result = celery.result.AsyncResult(task_id, app=kallithea.CELERY_APP)
 
                if task_result.failed():
 
                    raise HTTPInternalServerError(task_result.traceback)
 

	
 
        repo = Repository.get_by_repo_name(repo_name)
 
        if repo and repo.repo_state == Repository.STATE_CREATED:
 
            if repo.clone_uri:
 
                h.flash(_('Created repository %s from %s')
 
                        % (repo.repo_name, repo.clone_uri_hidden), category='success')
 
            else:
 
                repo_url = h.link_to(repo.repo_name,
 
                                     h.url('summary_home',
 
                                           repo_name=repo.repo_name))
 
                fork = repo.fork
 
                if fork is not None:
 
                    fork_name = fork.repo_name
 
                    h.flash(h.HTML(_('Forked repository %s as %s'))
 
                            % (fork_name, repo_url), category='success')
 
                else:
 
                    h.flash(h.HTML(_('Created repository %s')) % repo_url,
 
                            category='success')
 
            return {'result': True}
 
        return {'result': False}
 

	
 
    @HasRepoPermissionLevelDecorator('admin')
 
    def update(self, repo_name):
 
        c.repo_info = self._load_repo()
 
        self.__load_defaults(c.repo_info)
 
        c.active = 'settings'
 
        c.repo_fields = RepositoryField.query() \
 
            .filter(RepositoryField.repository == c.repo_info).all()
 

	
 
        repo_model = RepoModel()
 
        changed_name = repo_name
 
        repo = Repository.get_by_repo_name(repo_name)
 
        old_data = {
 
            'repo_name': repo_name,
 
            'repo_group': repo.group.get_dict() if repo.group else {},
 
            'repo_type': repo.repo_type,
 
        }
 
        _form = RepoForm(edit=True, old_data=old_data,
 
                         repo_groups=c.repo_groups,
 
                         landing_revs=c.landing_revs_choices)()
 

	
 
        try:
 
            form_result = _form.to_python(dict(request.POST))
 
            repo = repo_model.update(repo_name, **form_result)
 
            ScmModel().mark_for_invalidation(repo_name)
 
            h.flash(_('Repository %s updated successfully') % repo_name,
kallithea/lib/celerypylons/__init__.py
Show inline comments
 
# -*- coding: utf-8 -*-
 

	
 
"""
 
Kallithea wrapper of Celery
 

	
 
The Celery configuration is in the Kallithea ini file but must be converted to an
 
entirely different format before Celery can use it.
 

	
 
We read the configuration from tg.config at module import time. This module can
 
thus not be imported in global scope but must be imported on demand in function
 
scope after tg.config has been initialized.
 

	
 
To make sure that the config really has been initialized, we check one of the
 
mandatory settings.
 
"""
 

	
 
import celery
 
import celery.result as result
 
import tg
 
from celery.bin import worker
 
from celery.task import task
 

	
 

	
 
# mute pyflakes "imported but unused"
 
assert result
 
assert worker
 
assert task
 

	
 

	
 
def celery_config(config):
 
    """Return Celery config object populated from relevant settings in a config dict, such as tg.config"""
 

	
 
    # Verify .ini file configuration has been loaded
 
    assert config['celery.imports'] == 'kallithea.lib.celerylib.tasks', 'Kallithea Celery configuration has not been loaded'
 

	
 
    class CeleryConfig(object):
 
        pass
 

	
 
    celery_config = CeleryConfig()
 

	
 
    PREFIXES = """ADMINS BROKER CASSANDRA CELERYBEAT CELERYD CELERYMON CELERY EMAIL SERVER""".split()
 
    LIST_PARAMS = """CELERY_IMPORTS ADMINS ROUTES CELERY_ACCEPT_CONTENT""".split()
 

	
 
    for config_key, config_value in sorted(config.items()):
 
        celery_key = config_key.replace('.', '_').upper()
 
        if celery_key.split('_', 1)[0] not in PREFIXES:
 
            continue
 
        if not isinstance(config_value, str):
 
            continue
 
        if celery_key in LIST_PARAMS:
 
            celery_value = config_value.split()
 
        elif config_value.isdigit():
 
            celery_value = int(config_value)
 
        elif config_value.lower() in ['true', 'false']:
 
            celery_value = config_value.lower() == 'true'
 
        else:
 
            celery_value = config_value
 
        setattr(celery_config, celery_key, celery_value)
 
    return celery_config
 

	
 

	
 
def make_app():
 
    """Create celery app from the TurboGears configuration file"""
 
    app = celery.Celery()
 
    app.config_from_object(celery_config(tg.config))
 
    return app
0 comments (0 inline, 0 general)