Changeset - 0c43c6671815
[Not reviewed]
beta
0 2 0
Marcin Kuzminski - 15 years ago 2011-04-17 14:45:16
marcin@python-works.com
moved locking of commit stats into the task itself to remove race conditions when lock was not removed before starting another task.
2 files changed with 147 insertions and 122 deletions:
0 comments (0 inline, 0 general)
rhodecode/lib/celerylib/__init__.py
Show inline comments
 
@@ -39,59 +39,68 @@ from rhodecode.lib import str2bool
 
from rhodecode.lib.pidlock import DaemonLock, LockHeld
 

	
 
from celery.messaging import establish_connection
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 
try:
 
    CELERY_ON = str2bool(config['app_conf'].get('use_celery'))
 
except KeyError:
 
    CELERY_ON = False
 

	
 

	
 
class ResultWrapper(object):
 
    def __init__(self, task):
 
        self.task = task
 

	
 
    @LazyProperty
 
    def result(self):
 
        return self.task
 

	
 

	
 
def run_task(task, *args, **kwargs):
 
    if CELERY_ON:
 
        try:
 
            t = task.apply_async(args=args, kwargs=kwargs)
 
            log.info('running task %s:%s', t.task_id, task)
 
            return t
 

	
 
        except socket.error, e:
 
            if  e.errno == 111:
 
                log.debug('Unable to connect to celeryd. Sync execution')
 
            else:
 
                log.error(traceback.format_exc())
 
        except KeyError, e:
 
                log.debug('Unable to connect to celeryd. Sync execution')
 
        except Exception, e:
 
            log.error(traceback.format_exc())
 

	
 
    log.debug('executing task %s in sync mode', task)
 
    return ResultWrapper(task(*args, **kwargs))
 

	
 

	
 
def __get_lockkey(func, *fargs, **fkwargs):
 
    params = list(fargs)
 
    params.extend(['%s-%s' % ar for ar in fkwargs.items()])
 

	
 
    func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
 

	
 
    lockkey = 'task_%s' % \
 
        md5(func_name + '-' + '-'.join(map(str, params))).hexdigest()
 
    return lockkey
 

	
 

	
 
def locked_task(func):
 
    def __wrapper(func, *fargs, **fkwargs):
 
        params = list(fargs)
 
        params.extend(['%s-%s' % ar for ar in fkwargs.items()])
 

	
 
        lockkey = 'task_%s' % \
 
            md5(str(func.__name__) + '-' + \
 
                '-'.join(map(str, params))).hexdigest()
 
        lockkey = __get_lockkey(func, *fargs, **fkwargs)
 
        log.info('running task with lockkey %s', lockkey)
 
        try:
 
            l = DaemonLock(lockkey)
 
            ret = func(*fargs, **fkwargs)
 
            l.release()
 
            return ret
 
        except LockHeld:
 
            log.info('LockHeld')
 
            return 'Task with key %s already running' % lockkey
 

	
 
    return decorator(__wrapper, func)
rhodecode/lib/celerylib/tasks.py
Show inline comments
 
@@ -28,31 +28,32 @@ from celery.decorators import task
 
import os
 
import traceback
 
import logging
 

	
 
from time import mktime
 
from operator import itemgetter
 
from pygments import lexers
 
from string import lower
 

	
 
from pylons import config
 
from pylons.i18n.translation import _
 

	
 
from rhodecode.lib.celerylib import run_task, locked_task, str2bool
 
from rhodecode.lib.celerylib import run_task, locked_task, str2bool, \
 
    __get_lockkey, LockHeld, DaemonLock
 
from rhodecode.lib.helpers import person
 
from rhodecode.lib.smtp_mailer import SmtpMailer
 
from rhodecode.lib.utils import OrderedDict, add_cache
 
from rhodecode.model import init_model
 
from rhodecode.model import meta
 
from rhodecode.model.db import RhodeCodeUi
 
from rhodecode.model.db import RhodeCodeUi, Statistics, Repository
 

	
 
from vcs.backends import get_repo
 

	
 
from sqlalchemy import engine_from_config
 

	
 
add_cache(config)
 

	
 
try:
 
    import json
 
except ImportError:
 
    #python 2.5 compatibility
 
    import simplejson as json
 
@@ -116,164 +117,180 @@ def get_repos_path():
 
@task(ignore_result=True)
 
@locked_task
 
def whoosh_index(repo_location, full_index):
 
    #log = whoosh_index.get_logger()
 
    from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
 
    index_location = config['index_dir']
 
    WhooshIndexingDaemon(index_location=index_location,
 
                         repo_location=repo_location, sa=get_session())\
 
                         .run(full_index=full_index)
 

	
 

	
 
@task(ignore_result=True)
 
@locked_task
 
def get_commits_stats(repo_name, ts_min_y, ts_max_y):
 
    try:
 
        log = get_commits_stats.get_logger()
 
    except:
 
        log = logging.getLogger(__name__)
 

	
 
    from rhodecode.model.db import Statistics, Repository
 
    lockkey = __get_lockkey('get_commits_stats', repo_name, ts_min_y,
 
                            ts_max_y)
 
    log.info('running task with lockkey %s', lockkey)
 
    try:
 
        lock = DaemonLock(lockkey)
 

	
 
    #for js data compatibilty
 
    akc = lambda k: person(k).replace('"', "")
 
        #for js data compatibilty cleans the key for person from '
 
        akc = lambda k: person(k).replace('"', "")
 

	
 
    co_day_auth_aggr = {}
 
    commits_by_day_aggregate = {}
 
    repos_path = get_repos_path()
 
    p = os.path.join(repos_path, repo_name)
 
    repo = get_repo(p)
 
        co_day_auth_aggr = {}
 
        commits_by_day_aggregate = {}
 
        repos_path = get_repos_path()
 
        p = os.path.join(repos_path, repo_name)
 
        repo = get_repo(p)
 
        repo_size = len(repo.revisions)
 
        #return if repo have no revisions
 
        if repo_size < 1:
 
            lock.release()
 
            return True
 

	
 
    skip_date_limit = True
 
    parse_limit = int(config['app_conf'].get('commit_parse_limit'))
 
    last_rev = 0
 
    last_cs = None
 
    timegetter = itemgetter('time')
 
        skip_date_limit = True
 
        parse_limit = int(config['app_conf'].get('commit_parse_limit'))
 
        last_rev = 0
 
        last_cs = None
 
        timegetter = itemgetter('time')
 

	
 
    sa = get_session()
 
        sa = get_session()
 

	
 
    dbrepo = sa.query(Repository)\
 
        .filter(Repository.repo_name == repo_name).scalar()
 
    cur_stats = sa.query(Statistics)\
 
        .filter(Statistics.repository == dbrepo).scalar()
 
        dbrepo = sa.query(Repository)\
 
            .filter(Repository.repo_name == repo_name).scalar()
 
        cur_stats = sa.query(Statistics)\
 
            .filter(Statistics.repository == dbrepo).scalar()
 

	
 
    if cur_stats is not None:
 
        last_rev = cur_stats.stat_on_revision
 
        if cur_stats is not None:
 
            last_rev = cur_stats.stat_on_revision
 

	
 
    #return if repo is empty
 
    if not repo.revisions:
 
        return True
 
        if last_rev == repo.get_changeset().revision and repo_size > 1:
 
            #pass silently without any work if we're not on first revision or
 
            #current state of parsing revision(from db marker) is the
 
            #last revision
 
            lock.release()
 
            return True
 

	
 
    if last_rev == repo.get_changeset().revision and len(repo.revisions) > 1:
 
        #pass silently without any work if we're not on first revision or
 
        #current state of parsing revision(from db marker) is the last revision
 
        return True
 
        if cur_stats:
 
            commits_by_day_aggregate = OrderedDict(json.loads(
 
                                        cur_stats.commit_activity_combined))
 
            co_day_auth_aggr = json.loads(cur_stats.commit_activity)
 

	
 
    if cur_stats:
 
        commits_by_day_aggregate = OrderedDict(
 
                                       json.loads(
 
                                        cur_stats.commit_activity_combined))
 
        co_day_auth_aggr = json.loads(cur_stats.commit_activity)
 
        log.debug('starting parsing %s', parse_limit)
 
        lmktime = mktime
 

	
 
        last_rev = last_rev + 1 if last_rev > 0 else last_rev
 

	
 
    log.debug('starting parsing %s', parse_limit)
 
    lmktime = mktime
 

	
 
    last_rev = last_rev + 1 if last_rev > 0 else last_rev
 
        for cs in repo[last_rev:last_rev + parse_limit]:
 
            last_cs = cs  # remember last parsed changeset
 
            k = lmktime([cs.date.timetuple()[0], cs.date.timetuple()[1],
 
                          cs.date.timetuple()[2], 0, 0, 0, 0, 0, 0])
 

	
 
    for cs in repo[last_rev:last_rev + parse_limit]:
 
        last_cs = cs  # remember last parsed changeset
 
        k = lmktime([cs.date.timetuple()[0], cs.date.timetuple()[1],
 
                      cs.date.timetuple()[2], 0, 0, 0, 0, 0, 0])
 
            if akc(cs.author) in co_day_auth_aggr:
 
                try:
 
                    l = [timegetter(x) for x in
 
                         co_day_auth_aggr[akc(cs.author)]['data']]
 
                    time_pos = l.index(k)
 
                except ValueError:
 
                    time_pos = False
 

	
 
                if time_pos >= 0 and time_pos is not False:
 

	
 
                    datadict = \
 
                        co_day_auth_aggr[akc(cs.author)]['data'][time_pos]
 

	
 
        if akc(cs.author) in co_day_auth_aggr:
 
            try:
 
                l = [timegetter(x) for x in
 
                     co_day_auth_aggr[akc(cs.author)]['data']]
 
                time_pos = l.index(k)
 
            except ValueError:
 
                time_pos = False
 
                    datadict["commits"] += 1
 
                    datadict["added"] += len(cs.added)
 
                    datadict["changed"] += len(cs.changed)
 
                    datadict["removed"] += len(cs.removed)
 

	
 
                else:
 
                    if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
 

	
 
            if time_pos >= 0 and time_pos is not False:
 

	
 
                datadict = co_day_auth_aggr[akc(cs.author)]['data'][time_pos]
 

	
 
                datadict["commits"] += 1
 
                datadict["added"] += len(cs.added)
 
                datadict["changed"] += len(cs.changed)
 
                datadict["removed"] += len(cs.removed)
 
                        datadict = {"time": k,
 
                                    "commits": 1,
 
                                    "added": len(cs.added),
 
                                    "changed": len(cs.changed),
 
                                    "removed": len(cs.removed),
 
                                   }
 
                        co_day_auth_aggr[akc(cs.author)]['data']\
 
                            .append(datadict)
 

	
 
            else:
 
                if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
 

	
 
                    datadict = {"time": k,
 
                                "commits": 1,
 
                                "added": len(cs.added),
 
                                "changed": len(cs.changed),
 
                                "removed": len(cs.removed),
 
                               }
 
                    co_day_auth_aggr[akc(cs.author)]['data']\
 
                        .append(datadict)
 
                    co_day_auth_aggr[akc(cs.author)] = {
 
                                        "label": akc(cs.author),
 
                                        "data": [{"time":k,
 
                                                 "commits":1,
 
                                                 "added":len(cs.added),
 
                                                 "changed":len(cs.changed),
 
                                                 "removed":len(cs.removed),
 
                                                 }],
 
                                        "schema": ["commits"],
 
                                        }
 

	
 
        else:
 
            if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
 
                co_day_auth_aggr[akc(cs.author)] = {
 
                                    "label": akc(cs.author),
 
                                    "data": [{"time":k,
 
                                             "commits":1,
 
                                             "added":len(cs.added),
 
                                             "changed":len(cs.changed),
 
                                             "removed":len(cs.removed),
 
                                             }],
 
                                    "schema": ["commits"],
 
                                    }
 
            #gather all data by day
 
            if k in commits_by_day_aggregate:
 
                commits_by_day_aggregate[k] += 1
 
            else:
 
                commits_by_day_aggregate[k] = 1
 

	
 
        #gather all data by day
 
        if k in commits_by_day_aggregate:
 
            commits_by_day_aggregate[k] += 1
 
        else:
 
            commits_by_day_aggregate[k] = 1
 
        overview_data = sorted(commits_by_day_aggregate.items(),
 
                               key=itemgetter(0))
 

	
 
        if not co_day_auth_aggr:
 
            co_day_auth_aggr[akc(repo.contact)] = {
 
                "label": akc(repo.contact),
 
                "data": [0, 1],
 
                "schema": ["commits"],
 
            }
 

	
 
    overview_data = sorted(commits_by_day_aggregate.items(), key=itemgetter(0))
 
    if not co_day_auth_aggr:
 
        co_day_auth_aggr[akc(repo.contact)] = {
 
            "label": akc(repo.contact),
 
            "data": [0, 1],
 
            "schema": ["commits"],
 
        }
 
        stats = cur_stats if cur_stats else Statistics()
 
        stats.commit_activity = json.dumps(co_day_auth_aggr)
 
        stats.commit_activity_combined = json.dumps(overview_data)
 

	
 
    stats = cur_stats if cur_stats else Statistics()
 
    stats.commit_activity = json.dumps(co_day_auth_aggr)
 
    stats.commit_activity_combined = json.dumps(overview_data)
 
        log.debug('last revison %s', last_rev)
 
        leftovers = len(repo.revisions[last_rev:])
 
        log.debug('revisions to parse %s', leftovers)
 

	
 
    log.debug('last revison %s', last_rev)
 
    leftovers = len(repo.revisions[last_rev:])
 
    log.debug('revisions to parse %s', leftovers)
 
        if last_rev == 0 or leftovers < parse_limit:
 
            log.debug('getting code trending stats')
 
            stats.languages = json.dumps(__get_codes_stats(repo_name))
 

	
 
    if last_rev == 0 or leftovers < parse_limit:
 
        log.debug('getting code trending stats')
 
        stats.languages = json.dumps(__get_codes_stats(repo_name))
 
        try:
 
            stats.repository = dbrepo
 
            stats.stat_on_revision = last_cs.revision if last_cs else 0
 
            sa.add(stats)
 
            sa.commit()
 
        except:
 
            log.error(traceback.format_exc())
 
            sa.rollback()
 
            lock.release()
 
            return False
 

	
 
    try:
 
        stats.repository = dbrepo
 
        stats.stat_on_revision = last_cs.revision if last_cs else 0
 
        sa.add(stats)
 
        sa.commit()
 
    except:
 
        log.error(traceback.format_exc())
 
        sa.rollback()
 
        return False
 
    if len(repo.revisions) > 1:
 
        run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)
 
        #final release
 
        lock.release()
 

	
 
    return True
 
        #execute another task if celery is enabled
 
        if len(repo.revisions) > 1 and CELERY_ON:
 
            run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)
 
        return True
 
    except LockHeld:
 
        log.info('LockHeld')
 
        return 'Task with key %s already running' % lockkey
 

	
 

	
 
@task(ignore_result=True)
 
def reset_user_password(user_email):
 
    try:
 
        log = reset_user_password.get_logger()
 
    except:
 
        log = logging.getLogger(__name__)
 

	
 
    from rhodecode.lib import auth
 
    from rhodecode.model.db import User
 

	
 
@@ -304,25 +321,24 @@ def reset_user_password(user_email):
 
    except:
 
        log.error('Failed to update user password')
 
        log.error(traceback.format_exc())
 

	
 
    return True
 

	
 

	
 
@task(ignore_result=True)
 
def send_email(recipients, subject, body):
 
    """
 
    Sends an email with defined parameters from the .ini files.
 

	
 

	
 
    :param recipients: list of recipients, it this is empty the defined email
 
        address from field 'email_to' is used instead
 
    :param subject: subject of the mail
 
    :param body: body of the mail
 
    """
 
    try:
 
        log = send_email.get_logger()
 
    except:
 
        log = logging.getLogger(__name__)
 

	
 
    email_config = config
 

	
 
@@ -342,32 +358,32 @@ def send_email(recipients, subject, body
 
        m = SmtpMailer(mail_from, user, passwd, mail_server,
 
                       mail_port, ssl, tls, debug=debug)
 
        m.send(recipients, subject, body)
 
    except:
 
        log.error('Mail sending failed')
 
        log.error(traceback.format_exc())
 
        return False
 
    return True
 

	
 

	
 
@task(ignore_result=True)
 
def create_repo_fork(form_data, cur_user):
 
    from rhodecode.model.repo import RepoModel
 
    from vcs import get_backend
 

	
 
    try:
 
        log = create_repo_fork.get_logger()
 
    except:
 
        log = logging.getLogger(__name__)
 

	
 
    from rhodecode.model.repo import RepoModel
 
    from vcs import get_backend
 

	
 
    repo_model = RepoModel(get_session())
 
    repo_model.create(form_data, cur_user, just_db=True, fork=True)
 
    repo_name = form_data['repo_name']
 
    repos_path = get_repos_path()
 
    repo_path = os.path.join(repos_path, repo_name)
 
    repo_fork_path = os.path.join(repos_path, form_data['fork_name'])
 
    alias = form_data['repo_type']
 

	
 
    log.info('creating repo fork %s as %s', repo_name, repo_path)
 
    backend = get_backend(alias)
 
    backend(str(repo_fork_path), create=True, src_url=str(repo_path))
 

	
0 comments (0 inline, 0 general)