Changeset - afe8cfa32a0f
[Not reviewed]
default
0 3 0
Marcin Kuzminski - 14 years ago 2012-01-21 06:09:18
marcin@python-works.com
backported #340 session cleanup for celery tasks
3 files changed with 37 insertions and 22 deletions:
0 comments (0 inline, 0 general)
rhodecode/__init__.py
Show inline comments
 
@@ -22,13 +22,13 @@
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
import platform
 

	
 
VERSION = (1, 2, 4)
 
VERSION = (1, 2, 5)
 
__version__ = '.'.join((str(each) for each in VERSION[:4]))
 
__dbversion__ = 3  # defines current db version for migrations
 
__platform__ = platform.system()
 
__license__ = 'GPLv3'
 

	
 
PLATFORM_WIN = ('Windows')
rhodecode/lib/celerylib/__init__.py
Show inline comments
 
@@ -4,13 +4,13 @@
 
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
    celery libs for RhodeCode
 

	
 
    :created_on: Nov 27, 2010
 
    :author: marcink
 
    :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
 
    :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
 
    :license: GPLv3, see COPYING for more details.
 
"""
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
@@ -26,25 +26,29 @@
 
import os
 
import sys
 
import socket
 
import traceback
 
import logging
 
from os.path import dirname as dn, join as jn
 
from pylons import config
 

	
 
from hashlib import md5
 
from decorator import decorator
 
from pylons import  config
 

	
 
from vcs.utils.lazy import LazyProperty
 

	
 
from rhodecode.lib import str2bool, safe_str
 
from rhodecode.lib.pidlock import DaemonLock, LockHeld
 
from rhodecode.model import init_model
 
from rhodecode.model import meta
 
from rhodecode.model.db import Statistics, Repository, User
 

	
 
from sqlalchemy import engine_from_config
 

	
 
from celery.messaging import establish_connection
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 
try:
 
    CELERY_ON = str2bool(config['app_conf'].get('use_celery'))
 
except KeyError:
 
    CELERY_ON = False
 
@@ -104,6 +108,26 @@ def locked_task(func):
 
            return ret
 
        except LockHeld:
 
            log.info('LockHeld')
 
            return 'Task with key %s already running' % lockkey
 

	
 
    return decorator(__wrapper, func)
 

	
 

	
 
def get_session():
 
    if CELERY_ON:
 
        engine = engine_from_config(config, 'sqlalchemy.db1.')
 
        init_model(engine)
 
    sa = meta.Session
 
    return sa
 

	
 

	
 
def dbsession(func):
 
    def __wrapper(func, *fargs, **fkwargs):
 
        try:
 
            ret = func(*fargs, **fkwargs)
 
            return ret
 
        finally:
 
            if CELERY_ON:
 
                meta.Session.remove()
 

	
 
    return decorator(__wrapper, func)
rhodecode/lib/celerylib/tasks.py
Show inline comments
 
@@ -5,13 +5,13 @@
 

	
 
    RhodeCode task modules, containing all task that suppose to be run
 
    by celery daemon
 

	
 
    :created_on: Oct 6, 2010
 
    :author: marcink
 
    :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
 
    :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
 
    :license: GPLv3, see COPYING for more details.
 
"""
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
@@ -36,61 +36,51 @@ from string import lower
 

	
 
from pylons import config, url
 
from pylons.i18n.translation import _
 

	
 
from rhodecode.lib import LANGUAGES_EXTENSIONS_MAP, safe_str
 
from rhodecode.lib.celerylib import run_task, locked_task, str2bool, \
 
    __get_lockkey, LockHeld, DaemonLock
 
    __get_lockkey, LockHeld, DaemonLock, get_session, dbsession
 
from rhodecode.lib.helpers import person
 
from rhodecode.lib.smtp_mailer import SmtpMailer
 
from rhodecode.lib.utils import add_cache
 
from rhodecode.lib.compat import json, OrderedDict
 

	
 
from rhodecode.model import init_model
 
from rhodecode.model import meta
 
from rhodecode.model.db import RhodeCodeUi, Statistics, Repository, User
 

	
 
from vcs.backends import get_repo
 

	
 
from sqlalchemy import engine_from_config
 
from vcs import get_backend
 

	
 
add_cache(config)
 

	
 
__all__ = ['whoosh_index', 'get_commits_stats',
 
           'reset_user_password', 'send_email']
 

	
 
CELERY_ON = str2bool(config['app_conf'].get('use_celery'))
 

	
 

	
 
def get_session():
 
    if CELERY_ON:
 
        engine = engine_from_config(config, 'sqlalchemy.db1.')
 
        init_model(engine)
 

	
 
    sa = meta.Session()
 
    return sa
 

	
 

	
 
def get_repos_path():
 
    sa = get_session()
 
    q = sa.query(RhodeCodeUi).filter(RhodeCodeUi.ui_key == '/').one()
 
    return q.ui_value
 

	
 

	
 
@task(ignore_result=True)
 
@locked_task
 
@dbsession
 
def whoosh_index(repo_location, full_index):
 
    #log = whoosh_index.get_logger()
 
    from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
 
    index_location = config['index_dir']
 
    WhooshIndexingDaemon(index_location=index_location,
 
                         repo_location=repo_location, sa=get_session())\
 
                         .run(full_index=full_index)
 

	
 

	
 
@task(ignore_result=True)
 
@dbsession
 
def get_commits_stats(repo_name, ts_min_y, ts_max_y):
 
    try:
 
        log = get_commits_stats.get_logger()
 
    except:
 
        log = logging.getLogger(__name__)
 

	
 
@@ -245,20 +235,20 @@ def get_commits_stats(repo_name, ts_min_
 
        return True
 
    except LockHeld:
 
        log.info('LockHeld')
 
        return 'Task with key %s already running' % lockkey
 

	
 
@task(ignore_result=True)
 
@dbsession
 
def send_password_link(user_email):
 
    try:
 
        log = reset_user_password.get_logger()
 
    except:
 
        log = logging.getLogger(__name__)
 

	
 
    from rhodecode.lib import auth
 
    from rhodecode.model.db import User
 

	
 
    try:
 
        sa = get_session()
 
        user = sa.query(User).filter(User.email == user_email).scalar()
 

	
 
        if user:
 
@@ -285,20 +275,20 @@ If you didn't request new password pleas
 
        log.error(traceback.format_exc())
 
        return False
 

	
 
    return True
 

	
 
@task(ignore_result=True)
 
@dbsession
 
def reset_user_password(user_email):
 
    try:
 
        log = reset_user_password.get_logger()
 
    except:
 
        log = logging.getLogger(__name__)
 

	
 
    from rhodecode.lib import auth
 
    from rhodecode.model.db import User
 

	
 
    try:
 
        try:
 
            sa = get_session()
 
            user = sa.query(User).filter(User.email == user_email).scalar()
 
            new_passwd = auth.PasswordGenerator().gen_password(8,
 
@@ -326,12 +316,13 @@ def reset_user_password(user_email):
 
        log.error(traceback.format_exc())
 

	
 
    return True
 

	
 

	
 
@task(ignore_result=True)
 
@dbsession
 
def send_email(recipients, subject, body):
 
    """
 
    Sends an email with defined parameters from the .ini files.
 

	
 
    :param recipients: list of recipients, it this is empty the defined email
 
        address from field 'email_to' is used instead
 
@@ -372,15 +363,15 @@ def send_email(recipients, subject, body
 
        log.error(traceback.format_exc())
 
        return False
 
    return True
 

	
 

	
 
@task(ignore_result=True)
 
@dbsession
 
def create_repo_fork(form_data, cur_user):
 
    from rhodecode.model.repo import RepoModel
 
    from vcs import get_backend
 

	
 
    try:
 
        log = create_repo_fork.get_logger()
 
    except:
 
        log = logging.getLogger(__name__)
 

	
0 comments (0 inline, 0 general)