Changeset - cd8a7e3698bc
[Not reviewed]
beta
0 2 0
Marcin Kuzminski - 14 years ago 2012-01-20 07:11:00
marcin@python-works.com
fixes #340 session cleanup for celery tasks
2 files changed with 54 insertions and 36 deletions:
0 comments (0 inline, 0 general)
rhodecode/lib/celerylib/__init__.py
Show inline comments
 
@@ -29,6 +29,7 @@ import socket
 
import traceback
 
import logging
 
from os.path import dirname as dn, join as jn
 
from pylons import config
 

	
 
from hashlib import md5
 
from decorator import decorator
 
@@ -37,15 +38,17 @@ from vcs.utils.lazy import LazyProperty
 
from rhodecode import CELERY_ON
 
from rhodecode.lib import str2bool, safe_str
 
from rhodecode.lib.pidlock import DaemonLock, LockHeld
 
from rhodecode.model import init_model
 
from rhodecode.model import meta
 
from rhodecode.model.db import Statistics, Repository, User
 

	
 
from sqlalchemy import engine_from_config
 

	
 
from celery.messaging import establish_connection
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 

	
 

	
 
class ResultWrapper(object):
 
    def __init__(self, task):
 
        self.task = task
 
@@ -103,3 +106,22 @@ def locked_task(func):
 
            return 'Task with key %s already running' % lockkey
 

	
 
    return decorator(__wrapper, func)
 

	
 

	
 
def get_session():
 
    if CELERY_ON:
 
        engine = engine_from_config(config, 'sqlalchemy.db1.')
 
        init_model(engine)
 
    sa = meta.Session
 
    return sa
 

	
 

	
 
def dbsession(func):
 
    def __wrapper(func, *fargs, **fkwargs):
 
        try:
 
            ret = func(*fargs, **fkwargs)
 
            return ret
 
        finally:
 
            meta.Session.remove()
 

	
 
    return decorator(__wrapper, func)
rhodecode/lib/celerylib/tasks.py
Show inline comments
 
@@ -41,18 +41,15 @@ from vcs import get_backend
 

	
 
from rhodecode import CELERY_ON
 
from rhodecode.lib import LANGUAGES_EXTENSIONS_MAP, safe_str
 
from rhodecode.lib.celerylib import run_task, locked_task, str2bool, \
 
    __get_lockkey, LockHeld, DaemonLock
 
from rhodecode.lib.celerylib import run_task, locked_task, dbsession, \
 
    str2bool, __get_lockkey, LockHeld, DaemonLock, get_session
 
from rhodecode.lib.helpers import person
 
from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer
 
from rhodecode.lib.utils import add_cache, action_logger
 
from rhodecode.lib.compat import json, OrderedDict
 

	
 
from rhodecode.model import init_model
 
from rhodecode.model import meta
 
from rhodecode.model.db import Statistics, Repository, User
 

	
 
from sqlalchemy import engine_from_config
 

	
 
add_cache(config)
 

	
 
@@ -60,13 +57,6 @@ __all__ = ['whoosh_index', 'get_commits_
 
           'reset_user_password', 'send_email']
 

	
 

	
 
def get_session():
 
    if CELERY_ON:
 
        engine = engine_from_config(config, 'sqlalchemy.db1.')
 
        init_model(engine)
 
    sa = meta.Session
 
    return sa
 

	
 
def get_logger(cls):
 
    if CELERY_ON:
 
        try:
 
@@ -81,21 +71,23 @@ def get_logger(cls):
 

	
 
@task(ignore_result=True)
 
@locked_task
 
@dbsession
 
def whoosh_index(repo_location, full_index):
 
    from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
 

	
 
    # log = whoosh_index.get_logger(whoosh_index)
 
    log = whoosh_index.get_logger(whoosh_index)
 
    DBS = get_session()
 

	
 
    index_location = config['index_dir']
 
    WhooshIndexingDaemon(index_location=index_location,
 
                         repo_location=repo_location, sa=get_session())\
 
                         repo_location=repo_location, sa=DBS)\
 
                         .run(full_index=full_index)
 

	
 

	
 
@task(ignore_result=True)
 
@dbsession
 
def get_commits_stats(repo_name, ts_min_y, ts_max_y):
 
    log = get_logger(get_commits_stats)
 

	
 
    DBS = get_session()
 
    lockkey = __get_lockkey('get_commits_stats', repo_name, ts_min_y,
 
                            ts_max_y)
 
    lockkey_path = config['here']
 
@@ -103,7 +95,6 @@ def get_commits_stats(repo_name, ts_min_
 
    log.info('running task with lockkey %s', lockkey)
 

	
 
    try:
 
        sa = get_session()
 
        lock = l = DaemonLock(file_=jn(lockkey_path, lockkey))
 

	
 
        # for js data compatibilty cleans the key for person from '
 
@@ -128,9 +119,9 @@ def get_commits_stats(repo_name, ts_min_
 
        last_cs = None
 
        timegetter = itemgetter('time')
 

	
 
        dbrepo = sa.query(Repository)\
 
        dbrepo = DBS.query(Repository)\
 
            .filter(Repository.repo_name == repo_name).scalar()
 
        cur_stats = sa.query(Statistics)\
 
        cur_stats = DBS.query(Statistics)\
 
            .filter(Statistics.repository == dbrepo).scalar()
 

	
 
        if cur_stats is not None:
 
@@ -234,11 +225,11 @@ def get_commits_stats(repo_name, ts_min_
 
        try:
 
            stats.repository = dbrepo
 
            stats.stat_on_revision = last_cs.revision if last_cs else 0
 
            sa.add(stats)
 
            sa.commit()
 
            DBS.add(stats)
 
            DBS.commit()
 
        except:
 
            log.error(traceback.format_exc())
 
            sa.rollback()
 
            DBS.rollback()
 
            lock.release()
 
            return False
 

	
 
@@ -254,13 +245,14 @@ def get_commits_stats(repo_name, ts_min_
 
        return 'Task with key %s already running' % lockkey
 

	
 
@task(ignore_result=True)
 
@dbsession
 
def send_password_link(user_email):
 
    from rhodecode.model.notification import EmailNotificationModel
 

	
 
    log = get_logger(send_password_link)
 
    DBS = get_session()
 

	
 
    try:
 
        sa = get_session()
 
        user = User.get_by_email(user_email)
 
        if user:
 
            log.debug('password reset user found %s' % user)
 
@@ -283,28 +275,29 @@ def send_password_link(user_email):
 
    return True
 

	
 
@task(ignore_result=True)
 
@dbsession
 
def reset_user_password(user_email):
 
    from rhodecode.lib import auth
 

	
 
    log = get_logger(reset_user_password)
 
    DBS = get_session()
 

	
 
    try:
 
        try:
 
            sa = get_session()
 
            user = User.get_by_email(user_email)
 
            new_passwd = auth.PasswordGenerator().gen_password(8,
 
                             auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
 
            if user:
 
                user.password = auth.get_crypt_password(new_passwd)
 
                user.api_key = auth.generate_api_key(user.username)
 
                sa.add(user)
 
                sa.commit()
 
                DBS.add(user)
 
                DBS.commit()
 
                log.info('change password for %s', user_email)
 
            if new_passwd is None:
 
                raise Exception('unable to generate new password')
 
        except:
 
            log.error(traceback.format_exc())
 
            sa.rollback()
 
            DBS.rollback()
 

	
 
        run_task(send_email, user_email,
 
                 'Your new password',
 
@@ -319,6 +312,7 @@ def reset_user_password(user_email):
 

	
 

	
 
@task(ignore_result=True)
 
@dbsession
 
def send_email(recipients, subject, body, html_body=''):
 
    """
 
    Sends an email with defined parameters from the .ini files.
 
@@ -330,7 +324,8 @@ def send_email(recipients, subject, body
 
    :param html_body: html version of body
 
    """
 
    log = get_logger(send_email)
 
    sa = get_session()
 
    DBS = get_session()
 
    
 
    email_config = config
 
    subject = "%s %s" % (email_config.get('email_prefix'), subject)
 
    if not recipients:
 
@@ -361,6 +356,7 @@ def send_email(recipients, subject, body
 

	
 

	
 
@task(ignore_result=True)
 
@dbsession
 
def create_repo_fork(form_data, cur_user):
 
    """
 
    Creates a fork of repository using interval VCS methods
 
@@ -371,11 +367,11 @@ def create_repo_fork(form_data, cur_user
 
    from rhodecode.model.repo import RepoModel
 

	
 
    log = get_logger(create_repo_fork)
 
    DBS = create_repo_fork.DBS
 

	
 
    Session = get_session()
 
    base_path = Repository.base_path()
 

	
 
    RepoModel(Session).create(form_data, cur_user, just_db=True, fork=True)
 
    RepoModel(DBS).create(form_data, cur_user, just_db=True, fork=True)
 

	
 
    alias = form_data['repo_type']
 
    org_repo_name = form_data['org_path']
 
@@ -391,12 +387,12 @@ def create_repo_fork(form_data, cur_user
 
            src_url=safe_str(source_repo_path),
 
            update_after_clone=update_after_clone)
 
    action_logger(cur_user, 'user_forked_repo:%s' % fork_name,
 
                   org_repo_name, '', Session)
 
                   org_repo_name, '', DBS)
 

	
 
    action_logger(cur_user, 'user_created_fork:%s' % fork_name,
 
                   fork_name, '', Session)
 
                   fork_name, '', DBS)
 
    # finally commit at latest possible stage
 
    Session.commit()
 
    DBS.commit()
 

	
 
def __get_codes_stats(repo_name):
 
    repo = Repository.get_by_repo_name(repo_name).scm_instance
0 comments (0 inline, 0 general)