Changeset - 7fd45bf17d07
[Not reviewed]
beta
0 1 0
Marcin Kuzminski - 15 years ago 2011-02-08 15:06:41
marcin@python-works.com
fixed celery issues, default loader was not set as PylonsLoader
1 file changed with 2 insertions and 11 deletions:
0 comments (0 inline, 0 general)
rhodecode/lib/celerylib/__init__.py
Show inline comments
 
@@ -19,78 +19,69 @@
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
# 
 
# You should have received a copy of the GNU General Public License
 
# along with this program; if not, write to the Free Software
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 
# MA  02110-1301, USA.
 

	
 
import os
 
import sys
 
import socket
 
import traceback
 
import logging
 

	
 
from hashlib import md5
 
from decorator import decorator
 
from vcs.utils.lazy import LazyProperty
 

	
 
from rhodecode.lib import str2bool
 
from rhodecode.lib.pidlock import DaemonLock, LockHeld
 

	
 
from celery.messaging import establish_connection
 
from pylons import  config
 
from rhodecode.lib import celerypylons
 

	
 
log = logging.getLogger(__name__)
 

	
 
try:
 
    CELERY_ON = str2bool(config['app_conf'].get('use_celery'))
 
except KeyError:
 
    CELERY_ON = False
 

	
 
class ResultWrapper(object):
 
    def __init__(self, task):
 
        self.task = task
 

	
 
    @LazyProperty
 
    def result(self):
 
        return self.task
 

	
 
def run_task(task, *args, **kwargs):
 
    if CELERY_ON:
 
        try:
 
            kw = {
 
                'hostname':config['app_conf'].get('broker.host'),
 
                'userid':config['app_conf'].get('broker.user'),
 
                'password':config['app_conf'].get('broker.password'),
 
                'virtual_host':config['app_conf'].get('broker.vhost'),
 
                'port':config['app_conf'].get('broker.port'),
 
            }
 
            conn = establish_connection(**kw)
 
            publisher = task.get_publisher(connection=conn)
 
            t = task.apply_async(args=args, kwargs=kwargs, publisher=publisher)
 

	
 
            t = task.apply_async(args=args, kwargs=kwargs)
 
            log.info('running task %s:%s', t.task_id, task)
 
            return t
 
        except socket.error, e:
 
            if  e.errno == 111:
 
                log.debug('Unable to connect to celeryd. Sync execution')
 
            else:
 
                log.error(traceback.format_exc())
 
        except KeyError, e:
 
                log.debug('Unable to connect to celeryd. Sync execution')
 
        except Exception, e:
 
            log.error(traceback.format_exc())
 

	
 
    log.debug('executing task %s in sync mode', task)
 
    return ResultWrapper(task(*args, **kwargs))
 

	
 

	
 
def locked_task(func):
 
    def __wrapper(func, *fargs, **fkwargs):
 
        params = list(fargs)
 
        params.extend(['%s-%s' % ar for ar in fkwargs.items()])
 

	
 
        lockkey = 'task_%s' % \
 
            md5(str(func.__name__) + '-' + \
 
                '-'.join(map(str, params))).hexdigest()
0 comments (0 inline, 0 general)