Changeset - 2256c78afe53
branch: celery
Marcin Kuzminski <marcin@python-works.com> - 2010-09-22 04:30:36
implemented basic autoupdating statistics fetched from database
5 files changed with 173 insertions and 70 deletions
celeryconfig.py
@@ -7,36 +7,68 @@ root = os.getcwd()

PYLONS_CONFIG_NAME = 'development.ini'

sys.path.append(root)
config = ConfigParser.ConfigParser({'here':root})
config.read('%s/%s' % (root, PYLONS_CONFIG_NAME))
PYLONS_CONFIG = config

CELERY_IMPORTS = ("pylons_app.lib.celerylib.tasks",)

## Result store settings.
CELERY_RESULT_BACKEND = "database"
CELERY_RESULT_DBURI = dict(config.items('app:main'))['sqlalchemy.db1.url']
CELERY_RESULT_SERIALIZER = 'json'

BROKER_CONNECTION_MAX_RETRIES = 30

## Broker settings.
BROKER_HOST = "localhost"
BROKER_PORT = 5672
BROKER_VHOST = "rabbitmqhost"
BROKER_USER = "rabbitmq"
BROKER_PASSWORD = "qweqwe"

## Worker settings
## If you're doing mostly I/O you can have more processes,
## but if mostly spending CPU, try to keep it close to the
## number of CPUs on your machine. If not set, the number of CPUs/cores
## available will be used.
CELERYD_CONCURRENCY = 2
# CELERYD_LOG_FILE = "celeryd.log"
CELERYD_LOG_LEVEL = "DEBUG"
CELERYD_MAX_TASKS_PER_CHILD = 1

#CELERY_ALWAYS_EAGER = True
#Tasks will never be sent to the queue, but executed locally instead.
CELERY_ALWAYS_EAGER = False

#===============================================================================
# EMAIL SETTINGS
#===============================================================================
pylons_email_config = dict(config.items('DEFAULT'))

CELERY_SEND_TASK_ERROR_EMAILS = True

#List of (name, email_address) tuples for the admins that should receive error e-mails.
ADMINS = [('Administrator', pylons_email_config.get('email_to'))]

#The e-mail address this worker sends e-mails from. Default is "celery@localhost".
SERVER_EMAIL = pylons_email_config.get('error_email_from')

#The mail server to use. Default is "localhost".
MAIL_HOST = pylons_email_config.get('smtp_server')

#Username (if required) to log on to the mail server with.
MAIL_HOST_USER = pylons_email_config.get('smtp_username')

#Password (if required) to log on to the mail server with.
MAIL_HOST_PASSWORD = pylons_email_config.get('smtp_password')

MAIL_PORT = pylons_email_config.get('smtp_port')

#===============================================================================
# INSTRUCTIONS FOR RABBITMQ
#===============================================================================
#rabbitmqctl add_user rabbitmq qweqwe
#rabbitmqctl add_vhost rabbitmqhost
#rabbitmqctl set_permissions -p rabbitmqhost rabbitmq ".*" ".*" ".*"
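
For reference, the result-store wiring above can be exercised on its own. A minimal sketch, assuming a development.ini next to the script whose [app:main] section defines sqlalchemy.db1.url (both names taken from the diff above):

import os
import ConfigParser

root = os.getcwd()
config = ConfigParser.ConfigParser({'here': root})
config.read('%s/%s' % (root, 'development.ini'))

# the same lookup celeryconfig.py performs; raises KeyError if the ini
# defines no sqlalchemy.db1.url option under [app:main]
dburi = dict(config.items('app:main'))['sqlalchemy.db1.url']
print 'celery results will be stored in: %s' % dburi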
pylons_app/controllers/summary.py
@@ -18,27 +18,31 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA  02110-1301, USA.
"""
Created on April 18, 2010
summary controller for pylons
@author: marcink
"""
from pylons import tmpl_context as c, request, url
from pylons_app.lib.auth import LoginRequired, HasRepoPermissionAnyDecorator
from pylons_app.lib.base import BaseController, render
from pylons_app.lib.utils import OrderedDict
from pylons_app.model.hg_model import HgModel
from pylons_app.model.db import Statistics
from webhelpers.paginate import Page
from pylons_app.lib.celerylib import run_task
from pylons_app.lib.celerylib.tasks import get_commits_stats
from datetime import datetime, timedelta
from time import mktime
import calendar
import logging

log = logging.getLogger(__name__)

class SummaryController(BaseController):

    @LoginRequired()
    @HasRepoPermissionAnyDecorator('repository.read', 'repository.write',
                                   'repository.admin')
    def __before__(self):
        super(SummaryController, self).__before__()

@@ -52,20 +56,41 @@ class SummaryController(BaseController):
                                        'user':str(c.hg_app_user.username),
                                        'host':e.get('HTTP_HOST'),
                                        'repo_name':c.repo_name, }
        c.clone_repo_url = uri
        c.repo_tags = OrderedDict()
        for name, hash in c.repo_info.tags.items()[:10]:
            c.repo_tags[name] = c.repo_info.get_changeset(hash)

        c.repo_branches = OrderedDict()
        for name, hash in c.repo_info.branches.items()[:10]:
            c.repo_branches[name] = c.repo_info.get_changeset(hash)

        task = run_task(get_commits_stats, c.repo_info.name)
        c.ts_min = task.result[0]
        c.ts_max = task.result[1]
        c.commit_data = task.result[2]
        c.overview_data = task.result[3]
        td = datetime.today() + timedelta(days=1)
        y, m, d = td.year, td.month, td.day

        ts_min_y = mktime((y - 1, (td - timedelta(days=calendar.mdays[m])).month,
                            d, 0, 0, 0, 0, 0, 0,))
        ts_min_m = mktime((y, (td - timedelta(days=calendar.mdays[m])).month,
                            d, 0, 0, 0, 0, 0, 0,))

        ts_max_y = mktime((y, m, d, 0, 0, 0, 0, 0, 0,))

        run_task(get_commits_stats, c.repo_info.name, ts_min_y, ts_max_y)
        c.ts_min = ts_min_m
        c.ts_max = ts_max_y

        stats = self.sa.query(Statistics)\
            .filter(Statistics.repository == c.repo_info.dbrepo)\
            .scalar()

        if stats:
            c.commit_data = stats.commit_activity
            c.overview_data = stats.commit_activity_combined
        else:
            import json
            c.commit_data = json.dumps({})
            c.overview_data = json.dumps([[ts_min_y, 0], [ts_max_y, 0]])

        return render('summary/summary.html')
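
The window arithmetic above is the least obvious part of the new controller code: it builds a roughly one-year and one-month range ending tomorrow, using calendar.mdays to step back past the current month. A standalone sketch of what the mktime() tuples evaluate to (it duplicates the diff's expressions purely for clarity):

import calendar
from datetime import datetime, timedelta
from time import mktime

td = datetime.today() + timedelta(days=1)   # tomorrow
y, m, d = td.year, td.month, td.day

# start of the yearly range: same day, previous month, one year back
ts_min_y = mktime((y - 1, (td - timedelta(days=calendar.mdays[m])).month,
                   d, 0, 0, 0, 0, 0, 0))
# start of the monthly range: same day, previous month, current year
# (note: in January the month wraps to 12 while the year stays the same,
# so this tuple lands in the future - a quirk of the original expression)
ts_min_m = mktime((y, (td - timedelta(days=calendar.mdays[m])).month,
                   d, 0, 0, 0, 0, 0, 0))
# shared upper bound for both ranges: tomorrow at midnight
ts_max_y = mktime((y, m, d, 0, 0, 0, 0, 0, 0))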
 

	
pylons_app/lib/celerylib/tasks.py
from celery.decorators import task
from celery.task.sets import subtask
from celeryconfig import PYLONS_CONFIG as config
from datetime import datetime, timedelta
from pylons.i18n.translation import _
from pylons_app.lib.celerylib import run_task
from pylons_app.lib.helpers import person
from pylons_app.lib.smtp_mailer import SmtpMailer
from pylons_app.lib.utils import OrderedDict
from operator import itemgetter
from vcs.backends.hg import MercurialRepository
from time import mktime
import calendar
import traceback
import json

__all__ = ['whoosh_index', 'get_commits_stats',
           'reset_user_password', 'send_email']

def get_session():
    from sqlalchemy import engine_from_config
    from sqlalchemy.orm import sessionmaker, scoped_session
    engine = engine_from_config(dict(config.items('app:main')), 'sqlalchemy.db1.')
    sa = scoped_session(sessionmaker(bind=engine))
    return sa
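
Since Celery workers run outside the Pylons request cycle, get_session() builds its own engine and scoped session from the same ini file. A hypothetical usage sketch from inside a task (User is the model from pylons_app.model.db):

from pylons_app.model.db import User

sa = get_session()
try:
    user_count = sa.query(User).count()
finally:
    sa.remove()  # scoped_session cleanup so state never leaks between task runs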
 
@@ -74,125 +72,164 @@ def whoosh_index(repo_location, full_ind
    from pylons_app.lib.indexers.daemon import WhooshIndexingDaemon, LockHeld
    try:
        l = DaemonLock()
        WhooshIndexingDaemon(repo_location=repo_location)\
            .run(full_index=full_index)
        l.release()
        return 'Done'
    except LockHeld:
        log.info('LockHeld')
        return 'LockHeld'

@task
def get_commits_stats(repo):
def get_commits_stats(repo_name, ts_min_y, ts_max_y):
    author_key_cleaner = lambda k: person(k).replace('"', "") #for js data compatibilty

    from pylons_app.model.db import Statistics, Repository
    log = get_commits_stats.get_logger()
    aggregate = OrderedDict()
    overview_aggregate = OrderedDict()
    commits_by_day_author_aggregate = {}
    commits_by_day_aggregate = {}
    repos_path = get_hg_ui_settings()['paths_root_path'].replace('*', '')
    repo = MercurialRepository(repos_path + repo)
    #graph range
    td = datetime.today() + timedelta(days=1)
    y, m, d = td.year, td.month, td.day
    repo = MercurialRepository(repos_path + repo_name)

    skip_date_limit = True
    parse_limit = 500 #limit for single task changeset parsing
    last_rev = 0
    last_cs = None
    timegetter = itemgetter('time')

    sa = get_session()

    ts_min_y = mktime((y - 1, (td - timedelta(days=calendar.mdays[m])).month,
                        d, 0, 0, 0, 0, 0, 0,))
    ts_min_m = mktime((y, (td - timedelta(days=calendar.mdays[m])).month,
                        d, 0, 0, 0, 0, 0, 0,))
    dbrepo = sa.query(Repository)\
        .filter(Repository.repo_name == repo_name).scalar()
    cur_stats = sa.query(Statistics)\
        .filter(Statistics.repository == dbrepo).scalar()
    if cur_stats:
        last_rev = cur_stats.stat_on_revision

    ts_max_y = mktime((y, m, d, 0, 0, 0, 0, 0, 0,))
    skip_date_limit = True
    if last_rev == repo.revisions[-1]:
        #pass silently without any work
        return True

    def author_key_cleaner(k):
        k = person(k)
        k = k.replace('"', "") #for js data compatibilty
        return k
    if cur_stats:
        commits_by_day_aggregate = OrderedDict(
                                       json.loads(
                                        cur_stats.commit_activity_combined))
        commits_by_day_author_aggregate = json.loads(cur_stats.commit_activity)

    for cs in repo[:200]:#added limit 200 until fix #29 is made
    for cnt, rev in enumerate(repo.revisions[last_rev:]):
        last_cs = cs = repo.get_changeset(rev)
        k = '%s-%s-%s' % (cs.date.timetuple()[0], cs.date.timetuple()[1],
                          cs.date.timetuple()[2])
        timetupple = [int(x) for x in k.split('-')]
        timetupple.extend([0 for _ in xrange(6)])
        k = mktime(timetupple)
        if aggregate.has_key(author_key_cleaner(cs.author)):
            if aggregate[author_key_cleaner(cs.author)].has_key(k):
                aggregate[author_key_cleaner(cs.author)][k]["commits"] += 1
                aggregate[author_key_cleaner(cs.author)][k]["added"] += len(cs.added)
                aggregate[author_key_cleaner(cs.author)][k]["changed"] += len(cs.changed)
                aggregate[author_key_cleaner(cs.author)][k]["removed"] += len(cs.removed)
        if commits_by_day_author_aggregate.has_key(author_key_cleaner(cs.author)):
            try:
                l = [timegetter(x) for x in commits_by_day_author_aggregate\
                        [author_key_cleaner(cs.author)]['data']]
                time_pos = l.index(k)
            except ValueError:
                time_pos = False

            else:
                #aggregate[author_key_cleaner(cs.author)].update(dates_range)
                if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
                    aggregate[author_key_cleaner(cs.author)][k] = {}
                    aggregate[author_key_cleaner(cs.author)][k]["commits"] = 1
                    aggregate[author_key_cleaner(cs.author)][k]["added"] = len(cs.added)
                    aggregate[author_key_cleaner(cs.author)][k]["changed"] = len(cs.changed)
                    aggregate[author_key_cleaner(cs.author)][k]["removed"] = len(cs.removed)
            if time_pos >= 0 and time_pos is not False:

                datadict = commits_by_day_author_aggregate\
                    [author_key_cleaner(cs.author)]['data'][time_pos]

                datadict["commits"] += 1
                datadict["added"] += len(cs.added)
                datadict["changed"] += len(cs.changed)
                datadict["removed"] += len(cs.removed)
                #print datadict

        else:
                #print 'ELSE !!!!'
            if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
                aggregate[author_key_cleaner(cs.author)] = OrderedDict()
                #aggregate[author_key_cleaner(cs.author)].update(dates_range)
                aggregate[author_key_cleaner(cs.author)][k] = {}
                aggregate[author_key_cleaner(cs.author)][k]["commits"] = 1
                aggregate[author_key_cleaner(cs.author)][k]["added"] = len(cs.added)
                aggregate[author_key_cleaner(cs.author)][k]["changed"] = len(cs.changed)
                aggregate[author_key_cleaner(cs.author)][k]["removed"] = len(cs.removed)

                    datadict = {"time":k,
                                "commits":1,
                                "added":len(cs.added),
                                "changed":len(cs.changed),
                                "removed":len(cs.removed),
                               }
                    commits_by_day_author_aggregate\
                        [author_key_cleaner(cs.author)]['data'].append(datadict)

        else:
            #print k, 'nokey ADDING'
            if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
                commits_by_day_author_aggregate[author_key_cleaner(cs.author)] = {
                                    "label":author_key_cleaner(cs.author),
                                    "data":[{"time":k,
                                             "commits":1,
                                             "added":len(cs.added),
                                             "changed":len(cs.changed),
                                             "removed":len(cs.removed),
                                             }],
                                    "schema":["commits"],
                                    }

        if overview_aggregate.has_key(k):
            overview_aggregate[k] += 1
#        #gather all data by day
        if commits_by_day_aggregate.has_key(k):
            commits_by_day_aggregate[k] += 1
        else:
            overview_aggregate[k] = 1
            commits_by_day_aggregate[k] = 1

        if cnt >= parse_limit:
            #don't fetch to much data since we can freeze application
            break

    overview_data = []
    for k, v in overview_aggregate.items():
    for k, v in commits_by_day_aggregate.items():
        overview_data.append([k, v])
    overview_data = sorted(overview_data, key=itemgetter(0))
    data = {}
    for author in aggregate:
        commit_data = sorted([{"time":x,
                               "commits":aggregate[author][x]['commits'],
                               "added":aggregate[author][x]['added'],
                               "changed":aggregate[author][x]['changed'],
                               "removed":aggregate[author][x]['removed'],
                              } for x in aggregate[author]],
                              key=itemgetter('time'))

        data[author] = {"label":author,
                      "data":commit_data,
                      "schema":["commits"]
                      }

    if not data:
        data[author_key_cleaner(repo.contact)] = {
    if not commits_by_day_author_aggregate:
        commits_by_day_author_aggregate[author_key_cleaner(repo.contact)] = {
            "label":author_key_cleaner(repo.contact),
            "data":[0, 1],
            "schema":["commits"],
        }

    return (ts_min_m, ts_max_y, json.dumps(data), json.dumps(overview_data))
    stats = cur_stats if cur_stats else Statistics()
    stats.commit_activity = json.dumps(commits_by_day_author_aggregate)
    stats.commit_activity_combined = json.dumps(overview_data)
    stats.repository = dbrepo
    stats.stat_on_revision = last_cs.revision
    stats.languages = json.dumps({'_TOTAL_':0, '':0})

    try:
        sa.add(stats)
        sa.commit()
    except:
        log.error(traceback.format_exc())
        sa.rollback()
        return False

    return True
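
The loop above is the core of the "autoupdating" behaviour: stat_on_revision acts as a cursor, and each task run parses at most parse_limit new changesets before persisting, so large repositories converge over several runs instead of freezing the application. A stripped-down sketch of that control flow (update_stats and its arguments are hypothetical names, assuming an existing cur_stats row):

def update_stats(repo, cur_stats, parse_limit=500):
    """Sketch of the resume-from-cursor strategy used by get_commits_stats."""
    last_rev = cur_stats.stat_on_revision
    if last_rev == repo.revisions[-1]:
        return True  # already up to date; return without doing any work

    last_cs = None
    for cnt, rev in enumerate(repo.revisions[last_rev:]):
        last_cs = repo.get_changeset(rev)
        # ... merge this changeset into the per-day / per-author aggregates ...
        if cnt >= parse_limit:
            # stop early; the next run resumes from the saved cursor
            break

    cur_stats.stat_on_revision = last_cs.revision  # advance the cursor
    return False  # later runs may still have changesets left to parse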
 

	
 
@task
def reset_user_password(user_email):
    log = reset_user_password.get_logger()
    from pylons_app.lib import auth
    from pylons_app.model.db import User

    try:
        try:
            sa = get_session()
            user = sa.query(User).filter(User.email == user_email).scalar()
            new_passwd = auth.PasswordGenerator().gen_password(8,
                             auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
            if user:
            user.password = auth.get_crypt_password(new_passwd)
            sa.add(user)
            sa.commit()
            log.info('change password for %s', user_email)
            if new_passwd is None:
                raise Exception('unable to generate new password')

        except:
            log.error(traceback.format_exc())
            sa.rollback()

        run_task(send_email, user_email,
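
As rendered, the new if user: guard in reset_user_password does not indent the statements that follow it; presumably the intent is to change the password only when a matching user exists. A sketch of that intended flow, reusing the same calls:

user = sa.query(User).filter(User.email == user_email).scalar()
new_passwd = auth.PasswordGenerator().gen_password(8,
                 auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
if new_passwd is None:
    raise Exception('unable to generate new password')
if user:
    user.password = auth.get_crypt_password(new_passwd)
    sa.add(user)
    sa.commit()
    log.info('change password for %s', user_email)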
pylons_app/model/db.py
@@ -111,15 +111,24 @@ class RepoToPerm(Base):
    repository = relation('Repository')

class UserToPerm(Base):
    __tablename__ = 'user_to_perm'
    __table_args__ = (UniqueConstraint('user_id', 'permission_id'), {'useexisting':True})
    user_to_perm_id = Column("user_to_perm_id", INTEGER(), nullable=False, unique=True, default=None, primary_key=True)
    user_id = Column("user_id", INTEGER(), ForeignKey(u'users.user_id'), nullable=False, unique=None, default=None)
    permission_id = Column("permission_id", INTEGER(), ForeignKey(u'permissions.permission_id'), nullable=False, unique=None, default=None)

    user = relation('User')
    permission = relation('Permission')

class Statistics(Base):
    __tablename__ = 'statistics'
    __table_args__ = (UniqueConstraint('repository_id'), {'useexisting':True})
    stat_id = Column("stat_id", INTEGER(), nullable=False, unique=True, default=None, primary_key=True)
    repository_id = Column("repository_id", INTEGER(), ForeignKey(u'repositories.repo_id'), nullable=False, unique=True, default=None)
    stat_on_revision = Column("stat_on_revision", INTEGER(), nullable=False)
    commit_activity = Column("commit_activity", BLOB(), nullable=False)#JSON data
    commit_activity_combined = Column("commit_activity_combined", BLOB(), nullable=False)#JSON data
    languages = Column("languages", BLOB(), nullable=False)#JSON data

    repository = relation('Repository')
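
Because of UniqueConstraint('repository_id'), each repository owns at most one Statistics row, which get_commits_stats effectively upserts. A minimal sketch of that pattern, assuming a SQLAlchemy session sa and the model above (save_stats itself is a hypothetical helper):

import json

def save_stats(sa, dbrepo, activity, combined, last_revision):
    # reuse the repository's existing row if present, else create one
    stats = sa.query(Statistics)\
        .filter(Statistics.repository == dbrepo).scalar() or Statistics()
    stats.repository = dbrepo
    stats.stat_on_revision = last_revision
    stats.commit_activity = json.dumps(activity)           # JSON stored as BLOB
    stats.commit_activity_combined = json.dumps(combined)  # JSON stored as BLOB
    stats.languages = json.dumps({'_TOTAL_': 0, '': 0})    # placeholder, as in the task
    sa.add(stats)
    sa.commit()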
 

	
pylons_app/templates/summary/summary.html
@@ -114,25 +114,25 @@ E.onDOMReady(function(e){
			  <div class="input-short">
	            ${h.link_to(_('RSS'),h.url('rss_feed_home',repo_name=c.repo_info.name),class_='rss_icon')}
	            ${h.link_to(_('Atom'),h.url('atom_feed_home',repo_name=c.repo_info.name),class_='atom_icon')}
			  </div>
			 </div>
	  </div>
	</div>
</div>

<div class="box box-right" style="min-height:455px">
    <!-- box / title -->
    <div class="title">
        <h5>${_('Commit activity')}</h5>
        <h5>${_('Commit activity by day / author')}</h5>
    </div>

    <div class="table">
        <div id="commit_history" style="width:560px;height:300px;float:left"></div>
        <div style="clear: both;height: 10px"></div>
        <div id="overview" style="width:560px;height:100px;float:left"></div>

    	<div id="legend_data" style="clear:both;margin-top:10px;">
	    	<div id="legend_container"></div>
	    	<div id="legend_choices">
				<table id="legend_choices_tables" style="font-size:smaller;color:#545454"></table>
	    	</div>