Changeset - 1e8458068791
[Not reviewed]
default
0 1 0
Mads Kiilerich - 6 years ago 2020-02-14 13:56:58
mads@kiilerich.com
Grafted from: 1e9efd60d19d
celery: fix logging from inside tasks

Celery will hijack the global logging configuration and disable the usual
Kallithea logging (unless CELERYD_HIJACK_ROOT_LOGGER is set False).

Fixed partially by using the special celery loggers inside the tasks. Logging
from modules will still not work.
1 file changed with 2 insertions and 2 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/celerylib/tasks.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.celerylib.tasks
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Kallithea task modules, containing all task that suppose to be run
 
by celery daemon
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Oct 6, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import email.utils
 
import logging
 
import os
 
import traceback
 
from collections import OrderedDict
 
from operator import itemgetter
 
from time import mktime
 

	
 
import celery.utils.log
 
from tg import config
 

	
 
import kallithea
 
from kallithea.lib import celerylib, ext_json
 
from kallithea.lib.helpers import person
 
from kallithea.lib.hooks import log_create_repository
 
from kallithea.lib.rcmail.smtp_mailer import SmtpMailer
 
from kallithea.lib.utils import action_logger
 
from kallithea.lib.utils2 import ascii_bytes, str2bool
 
from kallithea.lib.vcs.utils import author_email
 
from kallithea.model.db import RepoGroup, Repository, Statistics, User
 

	
 

	
 
__all__ = ['whoosh_index', 'get_commits_stats', 'send_email']
 

	
 

	
 
log = logging.getLogger(__name__)
 
log = celery.utils.log.get_task_logger(__name__)
 

	
 

	
 
@celerylib.task
 
@celerylib.locked_task
 
@celerylib.dbsession
 
def whoosh_index(repo_location, full_index):
 
    from kallithea.lib.indexers.daemon import WhooshIndexingDaemon
 
    celerylib.get_session() # initialize database connection
 

	
 
    index_location = config['index_dir']
 
    WhooshIndexingDaemon(index_location=index_location,
 
                         repo_location=repo_location) \
 
                         .run(full_index=full_index)
 

	
 

	
 
@celerylib.task
 
@celerylib.dbsession
 
def get_commits_stats(repo_name, ts_min_y, ts_max_y, recurse_limit=100):
 
    DBS = celerylib.get_session()
 
    lockkey = celerylib.__get_lockkey('get_commits_stats', repo_name, ts_min_y,
 
                            ts_max_y)
 
    lockkey_path = config.get('cache_dir') or config['app_conf']['cache_dir']  # Backward compatibility for TurboGears < 2.4
 

	
 
    log.info('running task with lockkey %s', lockkey)
 

	
 
    try:
 
        lock = celerylib.DaemonLock(os.path.join(lockkey_path, lockkey))
 

	
 
        # for js data compatibility cleans the key for person from '
 
        akc = lambda k: person(k).replace('"', "")
 

	
 
        co_day_auth_aggr = {}
 
        commits_by_day_aggregate = {}
 
        repo = Repository.get_by_repo_name(repo_name)
 
        if repo is None:
 
            return True
 

	
 
        repo = repo.scm_instance
 
        repo_size = repo.count()
 
        # return if repo have no revisions
 
        if repo_size < 1:
 
            lock.release()
 
            return True
 

	
 
        skip_date_limit = True
 
        parse_limit = int(config.get('commit_parse_limit'))
 
        last_rev = None
 
        last_cs = None
 
        timegetter = itemgetter('time')
 

	
 
        dbrepo = DBS.query(Repository) \
 
            .filter(Repository.repo_name == repo_name).scalar()
 
        cur_stats = DBS.query(Statistics) \
 
            .filter(Statistics.repository == dbrepo).scalar()
 

	
 
        if cur_stats is not None:
 
            last_rev = cur_stats.stat_on_revision
 

	
 
        if last_rev == repo.get_changeset().revision and repo_size > 1:
 
            # pass silently without any work if we're not on first revision or
 
            # current state of parsing revision(from db marker) is the
 
            # last revision
 
            lock.release()
 
            return True
 

	
 
        if cur_stats:
 
            commits_by_day_aggregate = OrderedDict(ext_json.loads(
 
                                        cur_stats.commit_activity_combined))
 
            co_day_auth_aggr = ext_json.loads(cur_stats.commit_activity)
 

	
 
        log.debug('starting parsing %s', parse_limit)
 

	
 
        last_rev = last_rev + 1 if last_rev and last_rev >= 0 else 0
 
        log.debug('Getting revisions from %s to %s',
 
             last_rev, last_rev + parse_limit
 
        )
 
        for cs in repo[last_rev:last_rev + parse_limit]:
 
            log.debug('parsing %s', cs)
 
            last_cs = cs  # remember last parsed changeset
 
            tt = cs.date.timetuple()
 
            k = mktime(tt[:3] + (0, 0, 0, 0, 0, 0))
 

	
 
            if akc(cs.author) in co_day_auth_aggr:
 
                try:
 
                    l = [timegetter(x) for x in
 
                         co_day_auth_aggr[akc(cs.author)]['data']]
 
                    time_pos = l.index(k)
 
                except ValueError:
 
                    time_pos = None
 

	
 
                if time_pos is not None and time_pos >= 0:
 
                    datadict = \
 
                        co_day_auth_aggr[akc(cs.author)]['data'][time_pos]
 

	
 
                    datadict["commits"] += 1
 
                    datadict["added"] += len(cs.added)
0 comments (0 inline, 0 general)