Changeset - cd8a7e3698bc
[Not reviewed]
beta
0 2 0
Marcin Kuzminski - 14 years ago 2012-01-20 07:11:00
marcin@python-works.com
fixes #340 session cleanup for celery tasks
2 files changed with 57 insertions and 39 deletions:
0 comments (0 inline, 0 general)
rhodecode/lib/celerylib/__init__.py
Show inline comments
 
@@ -26,29 +26,32 @@
 
import os
 
import sys
 
import socket
 
import traceback
 
import logging
 
from os.path import dirname as dn, join as jn
 
from pylons import config
 

	
 
from hashlib import md5
 
from decorator import decorator
 

	
 
from vcs.utils.lazy import LazyProperty
 
from rhodecode import CELERY_ON
 
from rhodecode.lib import str2bool, safe_str
 
from rhodecode.lib.pidlock import DaemonLock, LockHeld
 
from rhodecode.model import init_model
 
from rhodecode.model import meta
 
from rhodecode.model.db import Statistics, Repository, User
 

	
 
from sqlalchemy import engine_from_config
 

	
 
from celery.messaging import establish_connection
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 

	
 

	
 
class ResultWrapper(object):
 
    def __init__(self, task):
 
        self.task = task
 

	
 
    @LazyProperty
 
    def result(self):
 
@@ -100,6 +103,25 @@ def locked_task(func):
 
            return ret
 
        except LockHeld:
 
            log.info('LockHeld')
 
            return 'Task with key %s already running' % lockkey
 

	
 
    return decorator(__wrapper, func)
 

	
 

	
 
def get_session():
 
    if CELERY_ON:
 
        engine = engine_from_config(config, 'sqlalchemy.db1.')
 
        init_model(engine)
 
    sa = meta.Session
 
    return sa
 

	
 

	
 
def dbsession(func):
 
    def __wrapper(func, *fargs, **fkwargs):
 
        try:
 
            ret = func(*fargs, **fkwargs)
 
            return ret
 
        finally:
 
            meta.Session.remove()
 

	
 
    return decorator(__wrapper, func)
rhodecode/lib/celerylib/tasks.py
Show inline comments
 
@@ -38,38 +38,28 @@ from pylons import config, url
 
from pylons.i18n.translation import _
 

	
 
from vcs import get_backend
 

	
 
from rhodecode import CELERY_ON
 
from rhodecode.lib import LANGUAGES_EXTENSIONS_MAP, safe_str
 
from rhodecode.lib.celerylib import run_task, locked_task, str2bool, \
 
    __get_lockkey, LockHeld, DaemonLock
 
from rhodecode.lib.celerylib import run_task, locked_task, dbsession, \
 
    str2bool, __get_lockkey, LockHeld, DaemonLock, get_session
 
from rhodecode.lib.helpers import person
 
from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer
 
from rhodecode.lib.utils import add_cache, action_logger
 
from rhodecode.lib.compat import json, OrderedDict
 

	
 
from rhodecode.model import init_model
 
from rhodecode.model import meta
 
from rhodecode.model.db import Statistics, Repository, User
 

	
 
from sqlalchemy import engine_from_config
 

	
 
add_cache(config)
 

	
 
__all__ = ['whoosh_index', 'get_commits_stats',
 
           'reset_user_password', 'send_email']
 

	
 

	
 
def get_session():
 
    if CELERY_ON:
 
        engine = engine_from_config(config, 'sqlalchemy.db1.')
 
        init_model(engine)
 
    sa = meta.Session
 
    return sa
 

	
 
def get_logger(cls):
 
    if CELERY_ON:
 
        try:
 
            log = cls.get_logger()
 
        except:
 
            log = logging.getLogger(__name__)
 
@@ -78,35 +68,36 @@ def get_logger(cls):
 

	
 
    return log
 

	
 

	
 
@task(ignore_result=True)
 
@locked_task
 
@dbsession
 
def whoosh_index(repo_location, full_index):
 
    from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
 

	
 
    # log = whoosh_index.get_logger(whoosh_index)
 
    log = whoosh_index.get_logger(whoosh_index)
 
    DBS = get_session()
 

	
 
    index_location = config['index_dir']
 
    WhooshIndexingDaemon(index_location=index_location,
 
                         repo_location=repo_location, sa=get_session())\
 
                         repo_location=repo_location, sa=DBS)\
 
                         .run(full_index=full_index)
 

	
 

	
 
@task(ignore_result=True)
 
@dbsession
 
def get_commits_stats(repo_name, ts_min_y, ts_max_y):
 
    log = get_logger(get_commits_stats)
 

	
 
    DBS = get_session()
 
    lockkey = __get_lockkey('get_commits_stats', repo_name, ts_min_y,
 
                            ts_max_y)
 
    lockkey_path = config['here']
 

	
 
    log.info('running task with lockkey %s', lockkey)
 

	
 
    try:
 
        sa = get_session()
 
        lock = l = DaemonLock(file_=jn(lockkey_path, lockkey))
 

	
 
        # for js data compatibilty cleans the key for person from '
 
        akc = lambda k: person(k).replace('"', "")
 

	
 
        co_day_auth_aggr = {}
 
@@ -125,15 +116,15 @@ def get_commits_stats(repo_name, ts_min_
 
        skip_date_limit = True
 
        parse_limit = int(config['app_conf'].get('commit_parse_limit'))
 
        last_rev = None
 
        last_cs = None
 
        timegetter = itemgetter('time')
 

	
 
        dbrepo = sa.query(Repository)\
 
        dbrepo = DBS.query(Repository)\
 
            .filter(Repository.repo_name == repo_name).scalar()
 
        cur_stats = sa.query(Statistics)\
 
        cur_stats = DBS.query(Statistics)\
 
            .filter(Statistics.repository == dbrepo).scalar()
 

	
 
        if cur_stats is not None:
 
            last_rev = cur_stats.stat_on_revision
 

	
 
        if last_rev == repo.get_changeset().revision and repo_size > 1:
 
@@ -231,17 +222,17 @@ def get_commits_stats(repo_name, ts_min_
 
            log.debug('getting code trending stats')
 
            stats.languages = json.dumps(__get_codes_stats(repo_name))
 

	
 
        try:
 
            stats.repository = dbrepo
 
            stats.stat_on_revision = last_cs.revision if last_cs else 0
 
            sa.add(stats)
 
            sa.commit()
 
            DBS.add(stats)
 
            DBS.commit()
 
        except:
 
            log.error(traceback.format_exc())
 
            sa.rollback()
 
            DBS.rollback()
 
            lock.release()
 
            return False
 

	
 
        #final release
 
        lock.release()
 

	
 
@@ -251,19 +242,20 @@ def get_commits_stats(repo_name, ts_min_
 
        return True
 
    except LockHeld:
 
        log.info('LockHeld')
 
        return 'Task with key %s already running' % lockkey
 

	
 
@task(ignore_result=True)
 
@dbsession
 
def send_password_link(user_email):
 
    from rhodecode.model.notification import EmailNotificationModel
 

	
 
    log = get_logger(send_password_link)
 

	
 
    DBS = get_session()
 
    
 
    try:
 
        sa = get_session()
 
        user = User.get_by_email(user_email)
 
        if user:
 
            log.debug('password reset user found %s' % user)
 
            link = url('reset_password_confirmation', key=user.api_key,
 
                       qualified=True)
 
            reg_type = EmailNotificationModel.TYPE_PASSWORD_RESET
 
@@ -280,34 +272,35 @@ def send_password_link(user_email):
 
        log.error(traceback.format_exc())
 
        return False
 

	
 
    return True
 

	
 
@task(ignore_result=True)
 
@dbsession
 
def reset_user_password(user_email):
 
    from rhodecode.lib import auth
 

	
 
    log = get_logger(reset_user_password)
 

	
 
    DBS = get_session()
 
    
 
    try:
 
        try:
 
            sa = get_session()
 
            user = User.get_by_email(user_email)
 
            new_passwd = auth.PasswordGenerator().gen_password(8,
 
                             auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
 
            if user:
 
                user.password = auth.get_crypt_password(new_passwd)
 
                user.api_key = auth.generate_api_key(user.username)
 
                sa.add(user)
 
                sa.commit()
 
                DBS.add(user)
 
                DBS.commit()
 
                log.info('change password for %s', user_email)
 
            if new_passwd is None:
 
                raise Exception('unable to generate new password')
 
        except:
 
            log.error(traceback.format_exc())
 
            sa.rollback()
 
            DBS.rollback()
 

	
 
        run_task(send_email, user_email,
 
                 'Your new password',
 
                 'Your new RhodeCode password:%s' % (new_passwd))
 
        log.info('send new password mail to %s', user_email)
 

	
 
@@ -316,24 +309,26 @@ def reset_user_password(user_email):
 
        log.error(traceback.format_exc())
 

	
 
    return True
 

	
 

	
 
@task(ignore_result=True)
 
@dbsession
 
def send_email(recipients, subject, body, html_body=''):
 
    """
 
    Sends an email with defined parameters from the .ini files.
 

	
 
    :param recipients: list of recipients, it this is empty the defined email
 
        address from field 'email_to' is used instead
 
    :param subject: subject of the mail
 
    :param body: body of the mail
 
    :param html_body: html version of body
 
    """
 
    log = get_logger(send_email)
 
    sa = get_session()
 
    DBS = get_session()
 
    
 
    email_config = config
 
    subject = "%s %s" % (email_config.get('email_prefix'), subject)
 
    if not recipients:
 
        # if recipients are not defined we send to email_config + all admins
 
        admins = [u.email for u in User.query()
 
                  .filter(User.admin == True).all()]
 
@@ -358,27 +353,28 @@ def send_email(recipients, subject, body
 
        log.error(traceback.format_exc())
 
        return False
 
    return True
 

	
 

	
 
@task(ignore_result=True)
 
@dbsession
 
def create_repo_fork(form_data, cur_user):
 
    """
 
    Creates a fork of repository using interval VCS methods
 

	
 
    :param form_data:
 
    :param cur_user:
 
    """
 
    from rhodecode.model.repo import RepoModel
 

	
 
    log = get_logger(create_repo_fork)
 

	
 
    Session = get_session()
 
    DBS = create_repo_fork.DBS
 
    
 
    base_path = Repository.base_path()
 

	
 
    RepoModel(Session).create(form_data, cur_user, just_db=True, fork=True)
 
    RepoModel(DBS).create(form_data, cur_user, just_db=True, fork=True)
 

	
 
    alias = form_data['repo_type']
 
    org_repo_name = form_data['org_path']
 
    fork_name = form_data['repo_name_full']
 
    update_after_clone = form_data['update_after_clone']
 
    source_repo_path = os.path.join(base_path, org_repo_name)
 
@@ -388,18 +384,18 @@ def create_repo_fork(form_data, cur_user
 
             destination_fork_path)
 
    backend = get_backend(alias)
 
    backend(safe_str(destination_fork_path), create=True,
 
            src_url=safe_str(source_repo_path),
 
            update_after_clone=update_after_clone)
 
    action_logger(cur_user, 'user_forked_repo:%s' % fork_name,
 
                   org_repo_name, '', Session)
 
                   org_repo_name, '', DBS)
 

	
 
    action_logger(cur_user, 'user_created_fork:%s' % fork_name,
 
                   fork_name, '', Session)
 
                   fork_name, '', DBS)
 
    # finally commit at latest possible stage
 
    Session.commit()
 
    DBS.commit()
 

	
 
def __get_codes_stats(repo_name):
 
    repo = Repository.get_by_repo_name(repo_name).scm_instance
 

	
 
    tip = repo.get_changeset()
 
    code_stats = {}
0 comments (0 inline, 0 general)