Files @ ca41d544dbdf
Branch filter:

Location: kallithea/pylons_app/lib/celerylib/tasks.py - annotation

Marcin Kuzminski
Merge with 6aa7db1c083a1384ebff5c2bb3c943a035bb310d - celery branch
3fc3ce53659b
a3d9d24acbec
a9e50dce3081
a3d9d24acbec
fb0c3af6031b
3fc3ce53659b
a3d9d24acbec
3fc3ce53659b
b12ea84fb906
b12ea84fb906
3fc3ce53659b
a3d9d24acbec
5c376ac2d4c9
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
3fc3ce53659b
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
3fc3ce53659b
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
fb0c3af6031b
a3d9d24acbec
3fc3ce53659b
3fc3ce53659b
3fc3ce53659b
3fc3ce53659b
3fc3ce53659b
3fc3ce53659b
3fc3ce53659b
3fc3ce53659b
3fc3ce53659b
3fc3ce53659b
fb0c3af6031b
a3d9d24acbec
fb0c3af6031b
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
a3d9d24acbec
2256c78afe53
2256c78afe53
a9e50dce3081
2256c78afe53
2256c78afe53
2256c78afe53
fb0c3af6031b
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
5c376ac2d4c9
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
3fc3ce53659b
2256c78afe53
2256c78afe53
2256c78afe53
b12ea84fb906
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
3fc3ce53659b
3fc3ce53659b
3fc3ce53659b
3fc3ce53659b
3fc3ce53659b
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
3fc3ce53659b
3fc3ce53659b
2256c78afe53
b12ea84fb906
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
3fc3ce53659b
3fc3ce53659b
2256c78afe53
b12ea84fb906
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
3fc3ce53659b
2256c78afe53
2256c78afe53
2256c78afe53
5c376ac2d4c9
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
5c376ac2d4c9
2256c78afe53
5c376ac2d4c9
b12ea84fb906
b12ea84fb906
2256c78afe53
2256c78afe53
5c376ac2d4c9
5c376ac2d4c9
5c376ac2d4c9
5c376ac2d4c9
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
fb0c3af6031b
fb0c3af6031b
fb0c3af6031b
2256c78afe53
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
2256c78afe53
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a9e50dce3081
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
a3d9d24acbec
from celery.decorators import task
from celery.task.sets import subtask
from celeryconfig import PYLONS_CONFIG as config
from pylons.i18n.translation import _
from pylons_app.lib.celerylib import run_task, LockTask
from pylons_app.lib.helpers import person
from pylons_app.lib.smtp_mailer import SmtpMailer
from pylons_app.lib.utils import OrderedDict
from operator import itemgetter
from vcs.backends.hg import MercurialRepository
from time import mktime
import traceback
import json

__all__ = ['whoosh_index', 'get_commits_stats',
           'reset_user_password', 'send_email']

def get_session():
    from sqlalchemy import engine_from_config
    from sqlalchemy.orm import sessionmaker, scoped_session
    engine = engine_from_config(dict(config.items('app:main')), 'sqlalchemy.db1.')
    sa = scoped_session(sessionmaker(bind=engine))
    return sa

def get_hg_settings():
    from pylons_app.model.db import HgAppSettings
    try:
        sa = get_session()
        ret = sa.query(HgAppSettings).all()
    finally:
        sa.remove()
        
    if not ret:
        raise Exception('Could not get application settings !')
    settings = {}
    for each in ret:
        settings['hg_app_' + each.app_settings_name] = each.app_settings_value    
    
    return settings

def get_hg_ui_settings():
    from pylons_app.model.db import HgAppUi
    try:
        sa = get_session()
        ret = sa.query(HgAppUi).all()
    finally:
        sa.remove()
        
    if not ret:
        raise Exception('Could not get application ui settings !')
    settings = {}
    for each in ret:
        k = each.ui_key
        v = each.ui_value
        if k == '/':
            k = 'root_path'
        
        if k.find('.') != -1:
            k = k.replace('.', '_')
        
        if each.ui_section == 'hooks':
            v = each.ui_active
        
        settings[each.ui_section + '_' + k] = v  
    
    return settings   

@task
def whoosh_index(repo_location, full_index):
    log = whoosh_index.get_logger()
    from pylons_app.lib.pidlock import DaemonLock
    from pylons_app.lib.indexers.daemon import WhooshIndexingDaemon, LockHeld
    try:
        l = DaemonLock()
        WhooshIndexingDaemon(repo_location=repo_location)\
            .run(full_index=full_index)
        l.release()
        return 'Done'
    except LockHeld:
        log.info('LockHeld')
        return 'LockHeld'    


@task
@LockTask('get_commits_stats')
def get_commits_stats(repo_name, ts_min_y, ts_max_y):
    author_key_cleaner = lambda k: person(k).replace('"', "") #for js data compatibilty
        
    from pylons_app.model.db import Statistics, Repository
    log = get_commits_stats.get_logger()
    commits_by_day_author_aggregate = {}
    commits_by_day_aggregate = {}
    repos_path = get_hg_ui_settings()['paths_root_path'].replace('*', '')
    repo = MercurialRepository(repos_path + repo_name)

    skip_date_limit = True
    parse_limit = 350 #limit for single task changeset parsing
    last_rev = 0
    last_cs = None
    timegetter = itemgetter('time')
    
    sa = get_session()
    
    dbrepo = sa.query(Repository)\
        .filter(Repository.repo_name == repo_name).scalar()
    cur_stats = sa.query(Statistics)\
        .filter(Statistics.repository == dbrepo).scalar()
    if cur_stats:
        last_rev = cur_stats.stat_on_revision
    
    if last_rev == repo.revisions[-1]:
        #pass silently without any work
        return True
    
    if cur_stats:
        commits_by_day_aggregate = OrderedDict(
                                       json.loads(
                                        cur_stats.commit_activity_combined))
        commits_by_day_author_aggregate = json.loads(cur_stats.commit_activity)
    
    for cnt, rev in enumerate(repo.revisions[last_rev:]):
        last_cs = cs = repo.get_changeset(rev)
        k = '%s-%s-%s' % (cs.date.timetuple()[0], cs.date.timetuple()[1],
                          cs.date.timetuple()[2])
        timetupple = [int(x) for x in k.split('-')]
        timetupple.extend([0 for _ in xrange(6)])
        k = mktime(timetupple)
        if commits_by_day_author_aggregate.has_key(author_key_cleaner(cs.author)):
            try:
                l = [timegetter(x) for x in commits_by_day_author_aggregate\
                        [author_key_cleaner(cs.author)]['data']]
                time_pos = l.index(k)
            except ValueError:
                time_pos = False
                
            if time_pos >= 0 and time_pos is not False:
                
                datadict = commits_by_day_author_aggregate\
                    [author_key_cleaner(cs.author)]['data'][time_pos]
                
                datadict["commits"] += 1
                datadict["added"] += len(cs.added)
                datadict["changed"] += len(cs.changed)
                datadict["removed"] += len(cs.removed)
                #print datadict
                
            else:
                #print 'ELSE !!!!'
                if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
                    
                    datadict = {"time":k,
                                "commits":1,
                                "added":len(cs.added),
                                "changed":len(cs.changed),
                                "removed":len(cs.removed),
                               }
                    commits_by_day_author_aggregate\
                        [author_key_cleaner(cs.author)]['data'].append(datadict)
                                        
        else:
            #print k, 'nokey ADDING'
            if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
                commits_by_day_author_aggregate[author_key_cleaner(cs.author)] = {
                                    "label":author_key_cleaner(cs.author),
                                    "data":[{"time":k,
                                             "commits":1,
                                             "added":len(cs.added),
                                             "changed":len(cs.changed),
                                             "removed":len(cs.removed),
                                             }],
                                    "schema":["commits"],
                                    }               
    
#        #gather all data by day
        if commits_by_day_aggregate.has_key(k):
            commits_by_day_aggregate[k] += 1
        else:
            commits_by_day_aggregate[k] = 1
        
        if cnt >= parse_limit:
            #don't fetch to much data since we can freeze application
            break

    overview_data = []
    for k, v in commits_by_day_aggregate.items():
        overview_data.append([k, v])
    overview_data = sorted(overview_data, key=itemgetter(0))
        
    if not commits_by_day_author_aggregate:
        commits_by_day_author_aggregate[author_key_cleaner(repo.contact)] = {
            "label":author_key_cleaner(repo.contact),
            "data":[0, 1],
            "schema":["commits"],
        }

    stats = cur_stats if cur_stats else Statistics()
    stats.commit_activity = json.dumps(commits_by_day_author_aggregate)
    stats.commit_activity_combined = json.dumps(overview_data)
    stats.repository = dbrepo
    stats.stat_on_revision = last_cs.revision
    stats.languages = json.dumps({'_TOTAL_':0, '':0})
    
    try:
        sa.add(stats)
        sa.commit()    
    except:
        log.error(traceback.format_exc())
        sa.rollback()
        return False
    
    run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)
                            
    return True

@task
def reset_user_password(user_email):
    log = reset_user_password.get_logger()
    from pylons_app.lib import auth
    from pylons_app.model.db import User
    
    try:
        try:
            sa = get_session()
            user = sa.query(User).filter(User.email == user_email).scalar()
            new_passwd = auth.PasswordGenerator().gen_password(8,
                             auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
            if user:
                user.password = auth.get_crypt_password(new_passwd)
                sa.add(user)
                sa.commit()
                log.info('change password for %s', user_email)
            if new_passwd is None:
                raise Exception('unable to generate new password')
            
        except:
            log.error(traceback.format_exc())
            sa.rollback()
        
        run_task(send_email, user_email,
                 "Your new hg-app password",
                 'Your new hg-app password:%s' % (new_passwd))
        log.info('send new password mail to %s', user_email)
        
        
    except:
        log.error('Failed to update user password')
        log.error(traceback.format_exc())
    return True

@task    
def send_email(recipients, subject, body):
    log = send_email.get_logger()
    email_config = dict(config.items('DEFAULT')) 
    mail_from = email_config.get('app_email_from')
    user = email_config.get('smtp_username')
    passwd = email_config.get('smtp_password')
    mail_server = email_config.get('smtp_server')
    mail_port = email_config.get('smtp_port')
    tls = email_config.get('smtp_use_tls')
    ssl = False
    
    try:
        m = SmtpMailer(mail_from, user, passwd, mail_server,
                       mail_port, ssl, tls)
        m.send(recipients, subject, body)  
    except:
        log.error('Mail sending failed')
        log.error(traceback.format_exc())
        return False
    return True