Changeset - afe8cfa32a0f
[Not reviewed]
default
0 3 0
Marcin Kuzminski - 14 years ago 2012-01-21 06:09:18
marcin@python-works.com
backported #340 session cleanup for celery tasks
3 files changed with 37 insertions and 22 deletions:
0 comments (0 inline, 0 general)
rhodecode/__init__.py
Show inline comments
 
@@ -25,7 +25,7 @@
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
import platform
 

	
 
VERSION = (1, 2, 4)
 
VERSION = (1, 2, 5)
 
__version__ = '.'.join((str(each) for each in VERSION[:4]))
 
__dbversion__ = 3  # defines current db version for migrations
 
__platform__ = platform.system()
rhodecode/lib/celerylib/__init__.py
Show inline comments
 
@@ -7,7 +7,7 @@
 

	
 
    :created_on: Nov 27, 2010
 
    :author: marcink
 
    :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
 
    :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
 
    :license: GPLv3, see COPYING for more details.
 
"""
 
# This program is free software: you can redistribute it and/or modify
 
@@ -29,19 +29,23 @@ import socket
 
import traceback
 
import logging
 
from os.path import dirname as dn, join as jn
 
from pylons import config
 

	
 
from hashlib import md5
 
from decorator import decorator
 
from pylons import  config
 

	
 
from vcs.utils.lazy import LazyProperty
 

	
 
from rhodecode.lib import str2bool, safe_str
 
from rhodecode.lib.pidlock import DaemonLock, LockHeld
 
from rhodecode.model import init_model
 
from rhodecode.model import meta
 
from rhodecode.model.db import Statistics, Repository, User
 

	
 
from sqlalchemy import engine_from_config
 

	
 
from celery.messaging import establish_connection
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 
try:
 
@@ -107,3 +111,23 @@ def locked_task(func):
 
            return 'Task with key %s already running' % lockkey
 

	
 
    return decorator(__wrapper, func)
 

	
 

	
 
def get_session():
 
    if CELERY_ON:
 
        engine = engine_from_config(config, 'sqlalchemy.db1.')
 
        init_model(engine)
 
    sa = meta.Session
 
    return sa
 

	
 

	
 
def dbsession(func):
 
    def __wrapper(func, *fargs, **fkwargs):
 
        try:
 
            ret = func(*fargs, **fkwargs)
 
            return ret
 
        finally:
 
            if CELERY_ON:
 
                meta.Session.remove()
 

	
 
    return decorator(__wrapper, func)
rhodecode/lib/celerylib/tasks.py
Show inline comments
 
@@ -8,7 +8,7 @@
 

	
 
    :created_on: Oct 6, 2010
 
    :author: marcink
 
    :copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
 
    :copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
 
    :license: GPLv3, see COPYING for more details.
 
"""
 
# This program is free software: you can redistribute it and/or modify
 
@@ -39,19 +39,16 @@ from pylons.i18n.translation import _
 

	
 
from rhodecode.lib import LANGUAGES_EXTENSIONS_MAP, safe_str
 
from rhodecode.lib.celerylib import run_task, locked_task, str2bool, \
 
    __get_lockkey, LockHeld, DaemonLock
 
    __get_lockkey, LockHeld, DaemonLock, get_session, dbsession
 
from rhodecode.lib.helpers import person
 
from rhodecode.lib.smtp_mailer import SmtpMailer
 
from rhodecode.lib.utils import add_cache
 
from rhodecode.lib.compat import json, OrderedDict
 

	
 
from rhodecode.model import init_model
 
from rhodecode.model import meta
 
from rhodecode.model.db import RhodeCodeUi, Statistics, Repository, User
 

	
 
from vcs.backends import get_repo
 

	
 
from sqlalchemy import engine_from_config
 
from vcs import get_backend
 

	
 
add_cache(config)
 

	
 
@@ -61,15 +58,6 @@ __all__ = ['whoosh_index', 'get_commits_
 
CELERY_ON = str2bool(config['app_conf'].get('use_celery'))
 

	
 

	
 
def get_session():
 
    if CELERY_ON:
 
        engine = engine_from_config(config, 'sqlalchemy.db1.')
 
        init_model(engine)
 

	
 
    sa = meta.Session()
 
    return sa
 

	
 

	
 
def get_repos_path():
 
    sa = get_session()
 
    q = sa.query(RhodeCodeUi).filter(RhodeCodeUi.ui_key == '/').one()
 
@@ -78,6 +66,7 @@ def get_repos_path():
 

	
 
@task(ignore_result=True)
 
@locked_task
 
@dbsession
 
def whoosh_index(repo_location, full_index):
 
    #log = whoosh_index.get_logger()
 
    from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
 
@@ -88,6 +77,7 @@ def whoosh_index(repo_location, full_ind
 

	
 

	
 
@task(ignore_result=True)
 
@dbsession
 
def get_commits_stats(repo_name, ts_min_y, ts_max_y):
 
    try:
 
        log = get_commits_stats.get_logger()
 
@@ -248,6 +238,7 @@ def get_commits_stats(repo_name, ts_min_
 
        return 'Task with key %s already running' % lockkey
 

	
 
@task(ignore_result=True)
 
@dbsession
 
def send_password_link(user_email):
 
    try:
 
        log = reset_user_password.get_logger()
 
@@ -255,7 +246,6 @@ def send_password_link(user_email):
 
        log = logging.getLogger(__name__)
 

	
 
    from rhodecode.lib import auth
 
    from rhodecode.model.db import User
 

	
 
    try:
 
        sa = get_session()
 
@@ -288,6 +278,7 @@ If you didn't request new password pleas
 
    return True
 

	
 
@task(ignore_result=True)
 
@dbsession
 
def reset_user_password(user_email):
 
    try:
 
        log = reset_user_password.get_logger()
 
@@ -295,7 +286,6 @@ def reset_user_password(user_email):
 
        log = logging.getLogger(__name__)
 

	
 
    from rhodecode.lib import auth
 
    from rhodecode.model.db import User
 

	
 
    try:
 
        try:
 
@@ -329,6 +319,7 @@ def reset_user_password(user_email):
 

	
 

	
 
@task(ignore_result=True)
 
@dbsession
 
def send_email(recipients, subject, body):
 
    """
 
    Sends an email with defined parameters from the .ini files.
 
@@ -375,9 +366,9 @@ def send_email(recipients, subject, body
 

	
 

	
 
@task(ignore_result=True)
 
@dbsession
 
def create_repo_fork(form_data, cur_user):
 
    from rhodecode.model.repo import RepoModel
 
    from vcs import get_backend
 

	
 
    try:
 
        log = create_repo_fork.get_logger()
0 comments (0 inline, 0 general)