Changeset - 9037456bb17f
[Not reviewed]
beta
1 1 0
Marcin Kuzminski - 15 years ago 2011-02-08 03:45:22
marcin@python-works.com
Another better solution for establishing connection with messaging broker in celery.
This one doesn't require celeryconfig.py
2 files changed with 12 insertions and 5 deletions:
0 comments (0 inline, 0 general)
celeryconfig.py
Show inline comments
 
deleted file
rhodecode/lib/celerylib/__init__.py
Show inline comments
 
@@ -17,69 +17,80 @@
 
# 
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
# 
 
# You should have received a copy of the GNU General Public License
 
# along with this program; if not, write to the Free Software
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 
# MA  02110-1301, USA.
 

	
 
import os
 
import sys
 
import socket
 
import traceback
 
import logging
 

	
 
from hashlib import md5
 
from decorator import decorator
 
from vcs.utils.lazy import LazyProperty
 

	
 
from rhodecode.lib import str2bool
 
from rhodecode.lib.pidlock import DaemonLock, LockHeld
 

	
 
from celery.messaging import establish_connection
 
from pylons import  config
 

	
 
log = logging.getLogger(__name__)
 

	
 
try:
 
    CELERY_ON = str2bool(config['app_conf'].get('use_celery'))
 
except KeyError:
 
    CELERY_ON = False
 

	
 
class ResultWrapper(object):
 
    def __init__(self, task):
 
        self.task = task
 

	
 
    @LazyProperty
 
    def result(self):
 
        return self.task
 

	
 
def run_task(task, *args, **kwargs):
 
    if CELERY_ON:
 
        try:
 
            t = task.delay(*args, **kwargs)
 
            kw = {
 
                'hostname':config['app_conf'].get('broker.host'),
 
                'userid':config['app_conf'].get('broker.user'),
 
                'password':config['app_conf'].get('broker.password'),
 
                'virtual_host':config['app_conf'].get('broker.vhost'),
 
                'port':config['app_conf'].get('broker.port'),
 
            }
 
            conn = establish_connection(**kw)
 
            publisher = task.get_publisher(connection=conn)
 
            t = task.apply_async(args=args, kwargs=kwargs, publisher=publisher)
 

	
 
            log.info('running task %s:%s', t.task_id, task)
 
            return t
 
        except socket.error, e:
 
            if  e.errno == 111:
 
                log.debug('Unable to connect to celeryd. Sync execution')
 
            else:
 
                log.error(traceback.format_exc())
 
        except KeyError, e:
 
                log.debug('Unable to connect to celeryd. Sync execution')
 
        except Exception, e:
 
            log.error(traceback.format_exc())
 

	
 
    log.debug('executing task %s in sync mode', task)
 
    return ResultWrapper(task(*args, **kwargs))
 

	
 

	
 
def locked_task(func):
 
    def __wrapper(func, *fargs, **fkwargs):
 
        params = list(fargs)
 
        params.extend(['%s-%s' % ar for ar in fkwargs.items()])
 

	
 
        lockkey = 'task_%s' % \
 
            md5(str(func.__name__) + '-' + \
 
                '-'.join(map(str, params))).hexdigest()
0 comments (0 inline, 0 general)