Changeset - 7e7db11d4e4d
[Not reviewed]
default
0 6 0
Mads Kiilerich - 9 years ago 2016-09-06 00:51:18
madski@unity3d.com
celerypylons: wrap celery import so we always get the right environment variables set and check configuration

Get rid of magic ... or at least document and encapsulate it.

Before, celerypylons would set the environment variables that made it possible
to import Celery after Pylons had been configured.

Now, the module will import Celery ... and verify that Pylons has been
configured.

(A next stop could be to move things around so this got tied closely to
initializing Pylons. Or something.)
6 files changed with 35 insertions and 27 deletions:
0 comments (0 inline, 0 general)
kallithea/config/environment.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
    Pylons environment configuration
 
"""
 

	
 
import os
 
import logging
 
import kallithea
 
import platform
 

	
 
import pylons
 
import mako.lookup
 
import beaker
 
import formencode
 

	
 
# don't remove this import it does magic for celery
 
from kallithea.lib import celerypylons
 

	
 
import kallithea.lib.app_globals as app_globals
 

	
 
from kallithea.config.routing import make_map
 

	
 
from kallithea.lib import helpers
 
from kallithea.lib.auth import set_available_permissions
 
from kallithea.lib.utils import repo2db_mapper, make_ui, set_app_settings, \
 
    load_rcextensions, check_git_version, set_vcs_config, set_indexer_config
 
from kallithea.lib.utils2 import engine_from_config, str2bool
 
from kallithea.lib.db_manage import DbManage
 
from kallithea.model import init_model
 
from kallithea.model.scm import ScmModel
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def load_environment(global_conf, app_conf,
 
                     test_env=None, test_index=None):
 
    """
 
    Configure the Pylons environment via the ``pylons.config``
 
    object
 
    """
 
    config = pylons.configuration.PylonsConfig()
 

	
 
    # Pylons paths
 
    root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
 
    paths = dict(
 
        root=root,
 
        controllers=os.path.join(root, 'controllers'),
 
        static_files=os.path.join(root, 'public'),
 
        templates=[os.path.join(root, 'templates')]
 
    )
 

	
 
    # Initialize config with the basic options
 
    config.init_app(global_conf, app_conf, package='kallithea', paths=paths)
 

	
 
    # store some globals into kallithea
 
    kallithea.CELERY_ON = str2bool(config['app_conf'].get('use_celery'))
 
    kallithea.CELERY_EAGER = str2bool(config['app_conf'].get('celery.always.eager'))
 

	
 
    config['routes.map'] = make_map(config)
 
    config['pylons.app_globals'] = app_globals.Globals(config)
 
    config['pylons.h'] = helpers
 
    kallithea.CONFIG = config
 

	
 
    load_rcextensions(root_path=config['here'])
 

	
 
    # Setup cache object as early as possible
 
    pylons.cache._push_object(config['pylons.app_globals'].cache)
 

	
 
    # Create the Mako TemplateLookup, with the default auto-escaping
 
    config['pylons.app_globals'].mako_lookup = mako.lookup.TemplateLookup(
 
        directories=paths['templates'],
 
        strict_undefined=True,
 
        module_directory=os.path.join(app_conf['cache_dir'], 'templates'),
 
        input_encoding='utf-8', default_filters=['escape'],
 
        imports=['from webhelpers.html import escape'])
 

	
 
    # sets the c attribute access when don't existing attribute are accessed
 
    config['pylons.strict_tmpl_context'] = True
 
    test = os.path.split(config['__file__'])[-1] == 'test.ini'
 
    if test:
 
        if test_env is None:
 
            test_env = not int(os.environ.get('KALLITHEA_NO_TMP_PATH', 0))
 
        if test_index is None:
 
            test_index = not int(os.environ.get('KALLITHEA_WHOOSH_TEST_DISABLE', 0))
 
        if os.environ.get('TEST_DB'):
 
            # swap config if we pass enviroment variable
 
            config['sqlalchemy.db1.url'] = os.environ.get('TEST_DB')
 

	
 
        from kallithea.lib.utils import create_test_env, create_test_index
 
        from kallithea.tests import TESTS_TMP_PATH
 
        #set KALLITHEA_NO_TMP_PATH=1 to disable re-creating the database and
 
        #test repos
 
        if test_env:
 
            create_test_env(TESTS_TMP_PATH, config)
 
        #set KALLITHEA_WHOOSH_TEST_DISABLE=1 to disable whoosh index during tests
 
        if test_index:
 
            create_test_index(TESTS_TMP_PATH, config, True)
 

	
 
    DbManage.check_waitress()
 
    # MULTIPLE DB configs
 
    # Setup the SQLAlchemy database engine
 
    sa_engine_db1 = engine_from_config(config, 'sqlalchemy.db1.')
 
    init_model(sa_engine_db1)
 

	
 
    set_available_permissions(config)
 
    repos_path = make_ui('db').configitems('paths')[0][1]
 
    config['base_path'] = repos_path
 
    set_app_settings(config)
 

	
 
    instance_id = kallithea.CONFIG.get('instance_id', '*')
 
    if instance_id == '*':
 
        instance_id = '%s-%s' % (platform.uname()[1], os.getpid())
 
        kallithea.CONFIG['instance_id'] = instance_id
 

	
 
    # CONFIGURATION OPTIONS HERE (note: all config options will override
 
    # any Pylons config options)
 

	
 
    # store config reference into our module to skip import magic of
 
    # pylons
 
    kallithea.CONFIG.update(config)
 
    set_vcs_config(kallithea.CONFIG)
 
    set_indexer_config(kallithea.CONFIG)
 

	
 
    #check git version
 
    check_git_version()
 

	
 
    if str2bool(config.get('initial_repo_scan', True)):
 
        repo2db_mapper(ScmModel().repo_scan(repos_path),
 
                       remove_obsolete=False, install_git_hooks=False)
 
    formencode.api.set_stdtranslation(languages=[config.get('lang')])
 
    return config
kallithea/controllers/admin/repos.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.admin.repos
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Repositories controller for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 7, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import traceback
 
import formencode
 
from formencode import htmlfill
 
from pylons import request, tmpl_context as c, url
 
from pylons.i18n.translation import _
 
from sqlalchemy.sql.expression import func
 
from webob.exc import HTTPFound, HTTPInternalServerError, HTTPForbidden, HTTPNotFound
 

	
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import LoginRequired, \
 
    HasRepoPermissionAnyDecorator, NotAnonymous, HasPermissionAny
 
from kallithea.lib.base import BaseRepoController, render
 
from kallithea.lib.utils import action_logger, jsonify
 
from kallithea.lib.vcs import RepositoryError
 
from kallithea.model.meta import Session
 
from kallithea.model.db import User, Repository, UserFollowing, RepoGroup, \
 
    Setting, RepositoryField
 
from kallithea.model.forms import RepoForm, RepoFieldForm, RepoPermsForm
 
from kallithea.model.scm import ScmModel, AvailableRepoGroupChoices, RepoList
 
from kallithea.model.repo import RepoModel
 
from kallithea.lib.compat import json
 
from kallithea.lib.exceptions import AttachedForksError
 
from kallithea.lib.utils2 import safe_int
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class ReposController(BaseRepoController):
 
    """
 
    REST Controller styled on the Atom Publishing Protocol"""
 
    # To properly map this controller, ensure your config/routing.py
 
    # file has a resource setup:
 
    #     map.resource('repo', 'repos')
 

	
 
    @LoginRequired()
 
    def __before__(self):
 
        super(ReposController, self).__before__()
 

	
 
    def _load_repo(self, repo_name):
 
        repo_obj = Repository.get_by_repo_name(repo_name)
 

	
 
        if repo_obj is None:
 
            h.not_mapped_error(repo_name)
 
            raise HTTPFound(location=url('repos'))
 

	
 
        return repo_obj
 

	
 
    def __load_defaults(self, repo=None):
 
        top_perms = ['hg.create.repository']
 
        repo_group_perms = ['group.admin']
 
        if HasPermissionAny('hg.create.write_on_repogroup.true')():
 
            repo_group_perms.append('group.write')
 
        extras = [] if repo is None else [repo.group]
 

	
 
        c.repo_groups = AvailableRepoGroupChoices(top_perms, repo_group_perms, extras)
 

	
 
        c.landing_revs_choices, c.landing_revs = ScmModel().get_repo_landing_revs(repo)
 

	
 
    def __load_data(self, repo_name=None):
 
        """
 
        Load defaults settings for edit, and update
 

	
 
        :param repo_name:
 
        """
 
        c.repo_info = self._load_repo(repo_name)
 
        self.__load_defaults(c.repo_info)
 

	
 
        defaults = RepoModel()._get_defaults(repo_name)
 
        defaults['clone_uri'] = c.repo_info.clone_uri_hidden # don't show password
 

	
 
        return defaults
 

	
 
    def index(self, format='html'):
 
        _list = Repository.query() \
 
                        .order_by(func.lower(Repository.repo_name)) \
 
                        .all()
 

	
 
        c.repos_list = RepoList(_list, perm_set=['repository.admin'])
 
        repos_data = RepoModel().get_repos_as_dict(repos_list=c.repos_list,
 
                                                   admin=True,
 
                                                   super_user_actions=True)
 
        #json used to render the grid
 
        c.data = json.dumps(repos_data)
 

	
 
        return render('admin/repos/repos.html')
 

	
 
    @NotAnonymous()
 
    def create(self):
 
        self.__load_defaults()
 
        form_result = {}
 
        try:
 
            # CanWriteGroup validators checks permissions of this POST
 
            form_result = RepoForm(repo_groups=c.repo_groups,
 
                                   landing_revs=c.landing_revs_choices)() \
 
                            .to_python(dict(request.POST))
 

	
 
            # create is done sometimes async on celery, db transaction
 
            # management is handled there.
 
            task = RepoModel().create(form_result, self.authuser.user_id)
 
            task_id = task.task_id
 
        except formencode.Invalid as errors:
 
            log.info(errors)
 
            return htmlfill.render(
 
                render('admin/repos/repo_add.html'),
 
                defaults=errors.value,
 
                errors=errors.error_dict or {},
 
                prefix_error=False,
 
                force_defaults=False,
 
                encoding="UTF-8")
 

	
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            msg = (_('Error creating repository %s')
 
                   % form_result.get('repo_name'))
 
            h.flash(msg, category='error')
 
            raise HTTPFound(location=url('home'))
 

	
 
        raise HTTPFound(location=h.url('repo_creating_home',
 
                              repo_name=form_result['repo_name_full'],
 
                              task_id=task_id))
 

	
 
    @NotAnonymous()
 
    def create_repository(self):
 
        self.__load_defaults()
 
        if not c.repo_groups:
 
            raise HTTPForbidden
 
        parent_group = request.GET.get('parent_group')
 

	
 
        ## apply the defaults from defaults page
 
        defaults = Setting.get_default_repo_settings(strip_prefix=True)
 
        if parent_group:
 
            prg = RepoGroup.get(parent_group)
 
            if prg is None or not any(rgc[0] == prg.group_id
 
                                      for rgc in c.repo_groups):
 
                raise HTTPForbidden
 
            defaults.update({'repo_group': parent_group})
 

	
 
        return htmlfill.render(
 
            render('admin/repos/repo_add.html'),
 
            defaults=defaults,
 
            errors={},
 
            prefix_error=False,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    @LoginRequired()
 
    @NotAnonymous()
 
    def repo_creating(self, repo_name):
 
        c.repo = repo_name
 
        c.task_id = request.GET.get('task_id')
 
        if not c.repo:
 
            raise HTTPNotFound()
 
        return render('admin/repos/repo_creating.html')
 

	
 
    @LoginRequired()
 
    @NotAnonymous()
 
    @jsonify
 
    def repo_check(self, repo_name):
 
        c.repo = repo_name
 
        task_id = request.GET.get('task_id')
 

	
 
        if task_id and task_id not in ['None']:
 
            from kallithea import CELERY_ON
 
            from celery.result import AsyncResult
 
            from kallithea.lib import celerypylons
 
            if CELERY_ON:
 
                task = AsyncResult(task_id)
 
                task = celerypylons.result.AsyncResult(task_id)
 
                if task.failed():
 
                    raise HTTPInternalServerError(task.traceback)
 

	
 
        repo = Repository.get_by_repo_name(repo_name)
 
        if repo and repo.repo_state == Repository.STATE_CREATED:
 
            if repo.clone_uri:
 
                h.flash(_('Created repository %s from %s')
 
                        % (repo.repo_name, repo.clone_uri_hidden), category='success')
 
            else:
 
                repo_url = h.link_to(repo.repo_name,
 
                                     h.url('summary_home',
 
                                           repo_name=repo.repo_name))
 
                fork = repo.fork
 
                if fork is not None:
 
                    fork_name = fork.repo_name
 
                    h.flash(h.literal(_('Forked repository %s as %s')
 
                            % (fork_name, repo_url)), category='success')
 
                else:
 
                    h.flash(h.literal(_('Created repository %s') % repo_url),
 
                            category='success')
 
            return {'result': True}
 
        return {'result': False}
 

	
 
    @HasRepoPermissionAnyDecorator('repository.admin')
 
    def update(self, repo_name):
 
        c.repo_info = self._load_repo(repo_name)
 
        self.__load_defaults(c.repo_info)
 
        c.active = 'settings'
 
        c.repo_fields = RepositoryField.query() \
 
            .filter(RepositoryField.repository == c.repo_info).all()
 

	
 
        repo_model = RepoModel()
 
        changed_name = repo_name
 
        repo = Repository.get_by_repo_name(repo_name)
 
        old_data = {
 
            'repo_name': repo_name,
 
            'repo_group': repo.group.get_dict() if repo.group else {},
 
            'repo_type': repo.repo_type,
 
        }
 
        _form = RepoForm(edit=True, old_data=old_data,
 
                         repo_groups=c.repo_groups,
 
                         landing_revs=c.landing_revs_choices)()
 

	
 
        try:
 
            form_result = _form.to_python(dict(request.POST))
 
            repo = repo_model.update(repo_name, **form_result)
 
            ScmModel().mark_for_invalidation(repo_name)
 
            h.flash(_('Repository %s updated successfully') % repo_name,
 
                    category='success')
 
            changed_name = repo.repo_name
 
            action_logger(self.authuser, 'admin_updated_repo',
 
                              changed_name, self.ip_addr, self.sa)
 
            Session().commit()
 
        except formencode.Invalid as errors:
 
            log.info(errors)
 
            defaults = self.__load_data(repo_name)
 
            defaults.update(errors.value)
 
            c.users_array = repo_model.get_users_js()
 
            return htmlfill.render(
 
                render('admin/repos/repo_edit.html'),
 
                defaults=defaults,
 
                errors=errors.error_dict or {},
 
                prefix_error=False,
 
                encoding="UTF-8",
 
                force_defaults=False)
 

	
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('Error occurred during update of repository %s') \
 
                    % repo_name, category='error')
 
        raise HTTPFound(location=url('edit_repo', repo_name=changed_name))
 

	
 
    @HasRepoPermissionAnyDecorator('repository.admin')
 
    def delete(self, repo_name):
 
        repo_model = RepoModel()
 
        repo = repo_model.get_by_repo_name(repo_name)
 
        if not repo:
 
            h.not_mapped_error(repo_name)
 
            raise HTTPFound(location=url('repos'))
 
        try:
 
            _forks = repo.forks.count()
 
            handle_forks = None
 
            if _forks and request.POST.get('forks'):
 
                do = request.POST['forks']
 
                if do == 'detach_forks':
 
                    handle_forks = 'detach'
 
                    h.flash(_('Detached %s forks') % _forks, category='success')
 
                elif do == 'delete_forks':
 
                    handle_forks = 'delete'
 
                    h.flash(_('Deleted %s forks') % _forks, category='success')
 
            repo_model.delete(repo, forks=handle_forks)
 
            action_logger(self.authuser, 'admin_deleted_repo',
 
                  repo_name, self.ip_addr, self.sa)
 
            ScmModel().mark_for_invalidation(repo_name)
 
            h.flash(_('Deleted repository %s') % repo_name, category='success')
 
            Session().commit()
 
        except AttachedForksError:
 
            h.flash(_('Cannot delete repository %s which still has forks')
 
                        % repo_name, category='warning')
 

	
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during deletion of %s') % repo_name,
 
                    category='error')
 

	
 
        if repo.group:
 
            raise HTTPFound(location=url('repos_group_home', group_name=repo.group.group_name))
 
        raise HTTPFound(location=url('repos'))
 

	
 
    @HasRepoPermissionAnyDecorator('repository.admin')
 
    def edit(self, repo_name):
 
        defaults = self.__load_data(repo_name)
 
        c.repo_fields = RepositoryField.query() \
 
            .filter(RepositoryField.repository == c.repo_info).all()
 
        repo_model = RepoModel()
 
        c.users_array = repo_model.get_users_js()
 
        c.active = 'settings'
 
        return htmlfill.render(
 
            render('admin/repos/repo_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    @HasRepoPermissionAnyDecorator('repository.admin')
 
    def edit_permissions(self, repo_name):
 
        c.repo_info = self._load_repo(repo_name)
 
        repo_model = RepoModel()
 
        c.users_array = repo_model.get_users_js()
 
        c.user_groups_array = repo_model.get_user_groups_js()
 
        c.active = 'permissions'
 
        defaults = RepoModel()._get_defaults(repo_name)
 

	
 
        return htmlfill.render(
 
            render('admin/repos/repo_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    def edit_permissions_update(self, repo_name):
 
        form = RepoPermsForm()().to_python(request.POST)
 
        RepoModel()._update_permissions(repo_name, form['perms_new'],
 
                                        form['perms_updates'])
 
        #TODO: implement this
 
        #action_logger(self.authuser, 'admin_changed_repo_permissions',
 
        #              repo_name, self.ip_addr, self.sa)
 
        Session().commit()
 
        h.flash(_('Repository permissions updated'), category='success')
 
        raise HTTPFound(location=url('edit_repo_perms', repo_name=repo_name))
 

	
 
    def edit_permissions_revoke(self, repo_name):
 
        try:
 
            obj_type = request.POST.get('obj_type')
 
            obj_id = None
 
            if obj_type == 'user':
 
                obj_id = safe_int(request.POST.get('user_id'))
 
            elif obj_type == 'user_group':
 
                obj_id = safe_int(request.POST.get('user_group_id'))
 

	
 
            if obj_type == 'user':
 
                RepoModel().revoke_user_permission(repo=repo_name, user=obj_id)
 
            elif obj_type == 'user_group':
 
                RepoModel().revoke_user_group_permission(
 
                    repo=repo_name, group_name=obj_id
 
                )
 
            #TODO: implement this
 
            #action_logger(self.authuser, 'admin_revoked_repo_permissions',
 
            #              repo_name, self.ip_addr, self.sa)
 
            Session().commit()
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during revoking of permission'),
 
                    category='error')
 
            raise HTTPInternalServerError()
 

	
 
    @HasRepoPermissionAnyDecorator('repository.admin')
 
    def edit_fields(self, repo_name):
 
        c.repo_info = self._load_repo(repo_name)
 
        c.repo_fields = RepositoryField.query() \
 
            .filter(RepositoryField.repository == c.repo_info).all()
 
        c.active = 'fields'
 
        if request.POST:
 

	
 
            raise HTTPFound(location=url('repo_edit_fields'))
 
        return render('admin/repos/repo_edit.html')
 

	
 
    @HasRepoPermissionAnyDecorator('repository.admin')
 
    def create_repo_field(self, repo_name):
 
        try:
 
            form_result = RepoFieldForm()().to_python(dict(request.POST))
 
            new_field = RepositoryField()
 
            new_field.repository = Repository.get_by_repo_name(repo_name)
 
            new_field.field_key = form_result['new_field_key']
kallithea/lib/celerylib/__init__.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.celerylib
 
~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
celery libs for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Nov 27, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import os
 
import logging
 

	
 
from pylons import config
 

	
 
from hashlib import md5
 
from decorator import decorator
 

	
 
from kallithea import CELERY_ON, CELERY_EAGER
 
from kallithea.lib.utils2 import safe_str
 
from kallithea.lib.pidlock import DaemonLock, LockHeld
 
from kallithea.model import init_model
 
from kallithea.model import meta
 

	
 
from sqlalchemy import engine_from_config
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class FakeTask(object):
 
    """Fake a sync result to make it look like a finished task"""
 

	
 
    def __init__(self, result):
 
        self.result = result
 

	
 
    def failed(self):
 
        return False
 

	
 
    traceback = None # if failed
 

	
 
    task_id = None
 

	
 

	
 
def task(f_org):
 
    """Wrapper of celery.task.task, running async if CELERY_ON
 
    """
 

	
 
    if CELERY_ON:
 
        def f_async(*args, **kwargs):
 
            log.info('executing %s task', f_org.__name__)
 
            try:
 
                f_org(*args, **kwargs)
 
            finally:
 
                log.info('executed %s task', f_org.__name__)
 
        f_async.__name__ = f_org.__name__
 
        import celery.task
 
        runner = celery.task.task(ignore_result=True)(f_async)
 
        from kallithea.lib import celerypylons
 
        runner = celerypylons.task(ignore_result=True)(f_async)
 
        def f_wrapped(*args, **kwargs):
 
            t = runner.apply_async(args=args, kwargs=kwargs)
 
            log.info('executing task %s in async mode - id %s', f_org, t.task_id)
 
            return t
 
    else:
 
        def f_wrapped(*args, **kwargs):
 
            log.info('executing task %s in sync', f_org.__name__)
 
            try:
 
                result = f_org(*args, **kwargs)
 
            except Exception as e:
 
                log.error('exception executing sync task %s in sync', f_org.__name__, e)
 
                raise # TODO: return this in FakeTask as with async tasks?
 
            return FakeTask(result)
 

	
 
    return f_wrapped
 

	
 

	
 
def __get_lockkey(func, *fargs, **fkwargs):
 
    params = list(fargs)
 
    params.extend(['%s-%s' % ar for ar in fkwargs.items()])
 

	
 
    func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
 

	
 
    lockkey = 'task_%s.lock' % \
 
        md5(func_name + '-' + '-'.join(map(safe_str, params))).hexdigest()
 
    return lockkey
 

	
 

	
 
def locked_task(func):
 
    def __wrapper(func, *fargs, **fkwargs):
 
        lockkey = __get_lockkey(func, *fargs, **fkwargs)
 
        lockkey_path = config['app_conf']['cache_dir']
 

	
 
        log.info('running task with lockkey %s', lockkey)
 
        try:
 
            l = DaemonLock(file_=os.path.join(lockkey_path, lockkey))
 
            ret = func(*fargs, **fkwargs)
 
            l.release()
 
            return ret
 
        except LockHeld:
 
            log.info('LockHeld')
 
            return 'Task with key %s already running' % lockkey
 

	
 
    return decorator(__wrapper, func)
 

	
 

	
 
def get_session():
 
    if CELERY_ON:
 
        engine = engine_from_config(config, 'sqlalchemy.db1.')
 
        init_model(engine)
 
    sa = meta.Session()
 
    return sa
 

	
 

	
 
def dbsession(func):
 
    def __wrapper(func, *fargs, **fkwargs):
 
        try:
 
            ret = func(*fargs, **fkwargs)
 
            return ret
 
        finally:
 
            if CELERY_ON and not CELERY_EAGER:
 
                meta.Session.remove()
 

	
 
    return decorator(__wrapper, func)
kallithea/lib/celerypylons/__init__.py
Show inline comments
 
# -*- coding: utf-8 -*-
 

	
 
"""
 
Automatically sets the environment variable `CELERY_LOADER` to
 
`celerypylons.loader:PylonsLoader`.  This ensures the loader is
 
specified when accessing the rest of this package, and allows celery
 
to be installed in a webapp just by importing celerypylons::
 
Kallithea wrapper of Celery
 

	
 
The Celery configuration is in the normal Pylons ini file. We thus have to set
 
the `CELERY_LOADER` environment variable to point at a custom "loader" that can
 
read it. That environment variable must be set *before* importing celery. To
 
ensure that, we wrap celery in this module.
 

	
 
    import celerypylons
 
Also, the loader depends on Pylons being configured to it can read the Celery
 
configuration out of it. To make sure that really is the case and give an early
 
warning, we check one of the mandatory settings.
 

	
 
This module must thus not be imported in global scope but must be imported on
 
demand in function scope.
 
"""
 

	
 
import os
 
import warnings
 

	
 
# Verify Pylons configuration has been loaded
 
from pylons import config
 
assert config['celery.imports'] == 'kallithea.lib.celerylib.tasks', 'Kallithea Celery configuration has not been loaded'
 

	
 
# Prepare environment to point at Kallithea Pylons loader
 
CELERYPYLONS_LOADER = 'kallithea.lib.celerypylons.loader.PylonsLoader'
 
if os.environ.get('CELERY_LOADER', CELERYPYLONS_LOADER) != CELERYPYLONS_LOADER:
 
    warnings.warn("'CELERY_LOADER' environment variable will be overridden by celery-pylons.")
 
os.environ['CELERY_LOADER'] = CELERYPYLONS_LOADER
 

	
 
# Import (and expose) celery, thus immediately triggering use of the custom Pylons loader
 
import celery.app as app
 
import celery.result as result
 
from celery.task import task
 
from celery.bin import camqadm, celerybeat, celeryd, celeryev
kallithea/lib/celerypylons/commands.py
Show inline comments
 
# -*- coding: utf-8 -*-
 

	
 
import kallithea
 
from kallithea.lib.paster_commands.common import BasePasterCommand
 
from kallithea.lib.utils import Command, load_rcextensions
 
from celery.app import app_or_default
 
from celery.bin import camqadm, celerybeat, celeryd, celeryev
 

	
 

	
 
from kallithea.lib.utils2 import str2bool
 

	
 
__all__ = ['CeleryDaemonCommand', 'CeleryBeatCommand',
 
           'CAMQPAdminCommand', 'CeleryEventCommand']
 

	
 

	
 
class CeleryCommand(BasePasterCommand):
 
    """Abstract class implements run methods needed for celery
 

	
 
    Starts the celery worker that uses a paste.deploy configuration
 
    file.
 
    """
 

	
 
    def update_parser(self):
 
        """
 
        Abstract method.  Allows for the class's parser to be updated
 
        before the superclass's `run` method is called.  Necessary to
 
        allow options/arguments to be passed through to the underlying
 
        celery command.
 
        """
 

	
 
        cmd = self.celery_command(app_or_default())
 
        from kallithea.lib import celerypylons
 
        cmd = self.celery_command(celerypylons.app.app_or_default())
 
        for x in cmd.get_options():
 
            self.parser.add_option(x)
 

	
 
    def command(self):
 
        from kallithea.lib import celerypylons
 
        from pylons import config
 
        try:
 
            CELERY_ON = str2bool(config['app_conf'].get('use_celery'))
 
        except KeyError:
 
            CELERY_ON = False
 

	
 
        if not CELERY_ON:
 
            raise Exception('Please set use_celery = true in .ini config '
 
                            'file before running celeryd')
 
        kallithea.CELERY_ON = CELERY_ON
 
        load_rcextensions(config['here'])
 
        cmd = self.celery_command(app_or_default())
 
        cmd = self.celery_command(celerypylons.app.app_or_default())
 
        return cmd.run(**vars(self.options))
 

	
 

	
 
class CeleryDaemonCommand(CeleryCommand):
 
    """Start the celery worker
 

	
 
    Starts the celery worker that uses a paste.deploy configuration
 
    file.
 
    """
 
    usage = 'CONFIG_FILE [celeryd options...]'
 
    summary = __doc__.splitlines()[0]
 
    description = "".join(__doc__.splitlines()[2:])
 

	
 
    parser = Command.standard_parser(quiet=True)
 
    celery_command = celeryd.WorkerCommand
 
    celery_command = celerypylons.celeryd.WorkerCommand
 

	
 

	
 
class CeleryBeatCommand(CeleryCommand):
 
    """Start the celery beat server
 

	
 
    Starts the celery beat server using a paste.deploy configuration
 
    file.
 
    """
 
    usage = 'CONFIG_FILE [celerybeat options...]'
 
    summary = __doc__.splitlines()[0]
 
    description = "".join(__doc__.splitlines()[2:])
 

	
 
    parser = Command.standard_parser(quiet=True)
 
    celery_command = celerybeat.BeatCommand
 
    celery_command = celerypylons.celerybeat.BeatCommand
 

	
 

	
 
class CAMQPAdminCommand(CeleryCommand):
 
    """CAMQP Admin
 

	
 
    CAMQP celery admin tool.
 
    """
 
    usage = 'CONFIG_FILE [camqadm options...]'
 
    summary = __doc__.splitlines()[0]
 
    description = "".join(__doc__.splitlines()[2:])
 

	
 
    parser = Command.standard_parser(quiet=True)
 
    celery_command = camqadm.AMQPAdminCommand
 
    celery_command = celerypylons.camqadm.AMQPAdminCommand
 

	
 

	
 
class CeleryEventCommand(CeleryCommand):
 
    """Celery event command.
 

	
 
    Capture celery events.
 
    """
 
    usage = 'CONFIG_FILE [celeryev options...]'
 
    summary = __doc__.splitlines()[0]
 
    description = "".join(__doc__.splitlines()[2:])
 

	
 
    parser = Command.standard_parser(quiet=True)
 
    celery_command = celeryev.EvCommand
 
    celery_command = celerypylons.celeryev.EvCommand
kallithea/lib/paster_commands/common.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.paster_commands.common
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Common code for Paster commands.
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 18, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import os
 
import logging
 

	
 
import paste
 
from paste.script.command import Command, BadCommand
 

	
 
from kallithea.lib.utils import setup_cache_regions
 

	
 

	
 
def ask_ok(prompt, retries=4, complaint='Yes or no please!'):
 
    while True:
 
        ok = raw_input(prompt)
 
        if ok in ('y', 'ye', 'yes'):
 
            return True
 
        if ok in ('n', 'no', 'nop', 'nope'):
 
            return False
 
        retries = retries - 1
 
        if retries < 0:
 
            raise IOError
 
        print complaint
 

	
 

	
 
class BasePasterCommand(Command):
 
    """
 
    Abstract Base Class for paster commands.
 

	
 
    The celery commands are somewhat aggressive about loading
 
    celery.conf, and since our module sets the `CELERY_LOADER`
 
    environment variable to our loader, we have to bootstrap a bit and
 
    make sure we've had a chance to load the pylons config off of the
 
    command line, otherwise everything fails.
 
    """
 
    min_args = 1
 
    min_args_error = "Please provide a paster config file as an argument."
 
    takes_config_file = 1
 
    requires_config_file = True
 

	
 
    def run(self, args):
 
        """
 
        Overrides Command.run
 

	
 
        Checks for a config file argument and loads it.
 
        """
 
        if len(args) < self.min_args:
 
            raise BadCommand(
 
                self.min_args_error % {'min_args': self.min_args,
 
                                       'actual_args': len(args)})
 

	
 
        # Decrement because we're going to lob off the first argument.
 
        # @@ This is hacky
 
        self.min_args -= 1
 
        self.bootstrap_config(args[0])
 
        self.update_parser()
 
        return super(BasePasterCommand, self).run(args[1:])
 

	
 
    def update_parser(self):
 
        """
 
        Abstract method.  Allows for the class's parser to be updated
 
        before the superclass's `run` method is called.  Necessary to
 
        allow options/arguments to be passed through to the underlying
 
        celery command.
 
        """
 
        raise NotImplementedError("Abstract Method.")
 

	
 
    def bootstrap_config(self, conf):
 
        """
 
        Loads the pylons configuration.
 
        """
 
        from pylons import config as pylonsconfig
 

	
 
        self.path_to_ini_file = os.path.realpath(conf)
 
        conf = paste.deploy.appconfig('config:' + self.path_to_ini_file)
 
        pylonsconfig.init_app(conf.global_conf, conf.local_conf)
 

	
 
    def _init_session(self):
 
        """
 
        Inits SqlAlchemy Session
 
        """
 
        logging.config.fileConfig(self.path_to_ini_file)
 

	
 
        from pylons import config
 
        from kallithea.model import init_model
 
        from kallithea.lib.utils2 import engine_from_config
 
        setup_cache_regions(config)
 
        engine = engine_from_config(config, 'sqlalchemy.db1.')
 
        init_model(engine)
0 comments (0 inline, 0 general)