Changeset - ac32a026c306
[Not reviewed]
default
0 2 0
Marcin Kuzminski - 15 years ago 2010-09-23 21:25:30
marcin@python-works.com
simplified task locking, and fixed some bugs for keyworded arguments
2 files changed with 9 insertions and 16 deletions:
0 comments (0 inline, 0 general)
pylons_app/lib/celerylib/__init__.py
Show inline comments
 
from pylons_app.lib.pidlock import DaemonLock, LockHeld
 
from vcs.utils.lazy import LazyProperty
 
from decorator import decorator
 
import logging
 
import os
 
import sys
 
import traceback
 
from hashlib import md5
 
log = logging.getLogger(__name__)
 

	
 
class ResultWrapper(object):
 
    def __init__(self, task):
 
        self.task = task
 
        
 
    @LazyProperty
 
    def result(self):
 
        return self.task
 

	
 
def run_task(task, *args, **kwargs):
 
    try:
 
        t = task.delay(*args, **kwargs)
 
        log.info('running task %s', t.task_id)
 
        return t
 
    except Exception, e:
 
        print e
 
        if e.errno == 111:
 
            log.debug('Unnable to connect. Sync execution')
 
        else:
 
            log.error(traceback.format_exc())
 
        #pure sync version
 
        return ResultWrapper(task(*args, **kwargs))
 

	
 

	
 
class LockTask(object):
 
    """LockTask decorator"""
 
    
 
    def __init__(self, func):
 
        self.func = func
 
def locked_task(func):
 
    def __wrapper(func, *fargs, **fkwargs):
 
        params = list(fargs)
 
        params.extend(['%s-%s' % ar for ar in fkwargs.items()])
 
        
 
    def __call__(self, func):
 
        return decorator(self.__wrapper, func)
 
    
 
    def __wrapper(self, func, *fargs, **fkwargs):
 
        params = []
 
        params.extend(fargs)
 
        params.extend(fkwargs.values())
 
        lockkey = 'task_%s' % \
 
           md5(str(self.func) + '-' + '-'.join(map(str, params))).hexdigest()
 
            md5(str(func.__name__) + '-' + \
 
                '-'.join(map(str, params))).hexdigest()
 
        log.info('running task with lockkey %s', lockkey)
 
        try:
 
            l = DaemonLock(lockkey)
 
            return func(*fargs, **fkwargs)
 
            l.release()
 
        except LockHeld:
 
            log.info('LockHeld')
 
            return 'Task with key %s already running' % lockkey   
 

	
 
    return decorator(__wrapper, func)      
 
            
 
            
 

	
 
        
 
        
 
    
 
    
 
    
 
  
pylons_app/lib/celerylib/tasks.py
Show inline comments
 
from celery.decorators import task
 
from celery.task.sets import subtask
 
from celeryconfig import PYLONS_CONFIG as config
 
from pylons.i18n.translation import _
 
from pylons_app.lib.celerylib import run_task, LockTask
 
from pylons_app.lib.celerylib import run_task, locked_task
 
from pylons_app.lib.helpers import person
 
from pylons_app.lib.smtp_mailer import SmtpMailer
 
from pylons_app.lib.utils import OrderedDict
 
from operator import itemgetter
 
from vcs.backends.hg import MercurialRepository
 
from time import mktime
 
import traceback
 
import json
 

	
 
__all__ = ['whoosh_index', 'get_commits_stats',
 
           'reset_user_password', 'send_email']
 

	
 
def get_session():
 
    from sqlalchemy import engine_from_config
 
    from sqlalchemy.orm import sessionmaker, scoped_session
 
    engine = engine_from_config(dict(config.items('app:main')), 'sqlalchemy.db1.')
 
    sa = scoped_session(sessionmaker(bind=engine))
 
    return sa
 

	
 
def get_hg_settings():
 
    from pylons_app.model.db import HgAppSettings
 
    try:
 
        sa = get_session()
 
        ret = sa.query(HgAppSettings).all()
 
    finally:
 
        sa.remove()
 
        
 
    if not ret:
 
        raise Exception('Could not get application settings !')
 
    settings = {}
 
    for each in ret:
 
        settings['hg_app_' + each.app_settings_name] = each.app_settings_value    
 
    
 
    return settings
 

	
 
def get_hg_ui_settings():
 
    from pylons_app.model.db import HgAppUi
 
    try:
 
        sa = get_session()
 
        ret = sa.query(HgAppUi).all()
 
    finally:
 
        sa.remove()
 
        
 
    if not ret:
 
        raise Exception('Could not get application ui settings !')
 
    settings = {}
 
    for each in ret:
 
        k = each.ui_key
 
        v = each.ui_value
 
        if k == '/':
 
            k = 'root_path'
 
        
 
        if k.find('.') != -1:
 
            k = k.replace('.', '_')
 
        
 
        if each.ui_section == 'hooks':
 
            v = each.ui_active
 
        
 
        settings[each.ui_section + '_' + k] = v  
 
    
 
    return settings   
 

	
 
@task
 
def whoosh_index(repo_location, full_index):
 
    log = whoosh_index.get_logger()
 
    from pylons_app.lib.pidlock import DaemonLock
 
    from pylons_app.lib.indexers.daemon import WhooshIndexingDaemon, LockHeld
 
    try:
 
        l = DaemonLock()
 
        WhooshIndexingDaemon(repo_location=repo_location)\
 
            .run(full_index=full_index)
 
        l.release()
 
        return 'Done'
 
    except LockHeld:
 
        log.info('LockHeld')
 
        return 'LockHeld'    
 

	
 

	
 
@task
 
@LockTask('get_commits_stats')
 
@locked_task
 
def get_commits_stats(repo_name, ts_min_y, ts_max_y):
 
    author_key_cleaner = lambda k: person(k).replace('"', "") #for js data compatibilty
 
        
 
    from pylons_app.model.db import Statistics, Repository
 
    log = get_commits_stats.get_logger()
 
    commits_by_day_author_aggregate = {}
 
    commits_by_day_aggregate = {}
 
    repos_path = get_hg_ui_settings()['paths_root_path'].replace('*', '')
 
    repo = MercurialRepository(repos_path + repo_name)
 

	
 
    skip_date_limit = True
 
    parse_limit = 350 #limit for single task changeset parsing
 
    last_rev = 0
 
    last_cs = None
 
    timegetter = itemgetter('time')
 
    
 
    sa = get_session()
 
    
 
    dbrepo = sa.query(Repository)\
 
        .filter(Repository.repo_name == repo_name).scalar()
 
    cur_stats = sa.query(Statistics)\
 
        .filter(Statistics.repository == dbrepo).scalar()
 
    if cur_stats:
 
        last_rev = cur_stats.stat_on_revision
 
    
 
    if last_rev == repo.revisions[-1]:
 
        #pass silently without any work
 
        return True
 
    
 
    if cur_stats:
 
        commits_by_day_aggregate = OrderedDict(
 
                                       json.loads(
 
                                        cur_stats.commit_activity_combined))
 
        commits_by_day_author_aggregate = json.loads(cur_stats.commit_activity)
 
    
 
    for cnt, rev in enumerate(repo.revisions[last_rev:]):
 
        last_cs = cs = repo.get_changeset(rev)
 
        k = '%s-%s-%s' % (cs.date.timetuple()[0], cs.date.timetuple()[1],
 
                          cs.date.timetuple()[2])
 
        timetupple = [int(x) for x in k.split('-')]
 
        timetupple.extend([0 for _ in xrange(6)])
 
        k = mktime(timetupple)
 
        if commits_by_day_author_aggregate.has_key(author_key_cleaner(cs.author)):
 
            try:
 
                l = [timegetter(x) for x in commits_by_day_author_aggregate\
 
                        [author_key_cleaner(cs.author)]['data']]
 
                time_pos = l.index(k)
 
            except ValueError:
 
                time_pos = False
 
                
 
            if time_pos >= 0 and time_pos is not False:
 
                
 
                datadict = commits_by_day_author_aggregate\
 
                    [author_key_cleaner(cs.author)]['data'][time_pos]
 
                
 
                datadict["commits"] += 1
 
                datadict["added"] += len(cs.added)
 
                datadict["changed"] += len(cs.changed)
 
                datadict["removed"] += len(cs.removed)
 
                #print datadict
 
                
 
            else:
 
                #print 'ELSE !!!!'
 
                if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
 
                    
 
                    datadict = {"time":k,
 
                                "commits":1,
 
                                "added":len(cs.added),
 
                                "changed":len(cs.changed),
 
                                "removed":len(cs.removed),
 
                               }
 
                    commits_by_day_author_aggregate\
 
                        [author_key_cleaner(cs.author)]['data'].append(datadict)
 
                                        
 
        else:
 
            #print k, 'nokey ADDING'
 
            if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
 
                commits_by_day_author_aggregate[author_key_cleaner(cs.author)] = {
 
                                    "label":author_key_cleaner(cs.author),
 
                                    "data":[{"time":k,
 
                                             "commits":1,
 
                                             "added":len(cs.added),
 
                                             "changed":len(cs.changed),
 
                                             "removed":len(cs.removed),
 
                                             }],
 
                                    "schema":["commits"],
 
                                    }               
 
    
 
#        #gather all data by day
 
        if commits_by_day_aggregate.has_key(k):
 
            commits_by_day_aggregate[k] += 1
 
        else:
 
            commits_by_day_aggregate[k] = 1
 
        
 
        if cnt >= parse_limit:
 
            #don't fetch to much data since we can freeze application
 
            break
 

	
 
    overview_data = []
 
    for k, v in commits_by_day_aggregate.items():
 
        overview_data.append([k, v])
 
    overview_data = sorted(overview_data, key=itemgetter(0))
 
        
 
    if not commits_by_day_author_aggregate:
 
        commits_by_day_author_aggregate[author_key_cleaner(repo.contact)] = {
 
            "label":author_key_cleaner(repo.contact),
 
            "data":[0, 1],
 
            "schema":["commits"],
 
        }
 

	
 
    stats = cur_stats if cur_stats else Statistics()
 
    stats.commit_activity = json.dumps(commits_by_day_author_aggregate)
 
    stats.commit_activity_combined = json.dumps(overview_data)
 
    stats.repository = dbrepo
 
    stats.stat_on_revision = last_cs.revision
 
    stats.languages = json.dumps({'_TOTAL_':0, '':0})
 
    
 
    try:
 
        sa.add(stats)
 
        sa.commit()    
 
    except:
 
        log.error(traceback.format_exc())
 
        sa.rollback()
 
        return False
 
    
 
    run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)
 
                            
 
    return True
 

	
 
@task
 
def reset_user_password(user_email):
 
    log = reset_user_password.get_logger()
 
    from pylons_app.lib import auth
 
    from pylons_app.model.db import User
 
    
 
    try:
 
        try:
 
            sa = get_session()
 
            user = sa.query(User).filter(User.email == user_email).scalar()
 
            new_passwd = auth.PasswordGenerator().gen_password(8,
 
                             auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
 
            if user:
 
                user.password = auth.get_crypt_password(new_passwd)
 
                sa.add(user)
 
                sa.commit()
 
                log.info('change password for %s', user_email)
 
            if new_passwd is None:
 
                raise Exception('unable to generate new password')
 
            
 
        except:
 
            log.error(traceback.format_exc())
 
            sa.rollback()
 
        
 
        run_task(send_email, user_email,
 
                 "Your new hg-app password",
 
                 'Your new hg-app password:%s' % (new_passwd))
 
        log.info('send new password mail to %s', user_email)
 
        
 
        
 
    except:
 
        log.error('Failed to update user password')
 
        log.error(traceback.format_exc())
 
    return True
 

	
 
@task    
 
def send_email(recipients, subject, body):
 
    log = send_email.get_logger()
 
    email_config = dict(config.items('DEFAULT')) 
 
    mail_from = email_config.get('app_email_from')
 
    user = email_config.get('smtp_username')
 
    passwd = email_config.get('smtp_password')
 
    mail_server = email_config.get('smtp_server')
 
    mail_port = email_config.get('smtp_port')
 
    tls = email_config.get('smtp_use_tls')
 
    ssl = False
 
    
 
    try:
 
        m = SmtpMailer(mail_from, user, passwd, mail_server,
 
                       mail_port, ssl, tls)
 
        m.send(recipients, subject, body)  
 
    except:
 
        log.error('Mail sending failed')
 
        log.error(traceback.format_exc())
 
        return False
 
    return True
0 comments (0 inline, 0 general)