Changeset - 0c43c6671815
[Not reviewed]
beta
0 2 0
Marcin Kuzminski - 15 years ago 2011-04-17 14:45:16
marcin@python-works.com
moved locking of commit stats into the task itself to remove race conditions when lock was not removed before starting another task.
2 files changed with 147 insertions and 122 deletions:
0 comments (0 inline, 0 general)
rhodecode/lib/celerylib/__init__.py
Show inline comments
 
@@ -48,6 +48,7 @@ try:
 
except KeyError:
 
    CELERY_ON = False
 

	
 

	
 
class ResultWrapper(object):
 
    def __init__(self, task):
 
        self.task = task
 
@@ -56,12 +57,14 @@ class ResultWrapper(object):
 
    def result(self):
 
        return self.task
 

	
 

	
 
def run_task(task, *args, **kwargs):
 
    if CELERY_ON:
 
        try:
 
            t = task.apply_async(args=args, kwargs=kwargs)
 
            log.info('running task %s:%s', t.task_id, task)
 
            return t
 

	
 
        except socket.error, e:
 
            if  e.errno == 111:
 
                log.debug('Unable to connect to celeryd. Sync execution')
 
@@ -76,14 +79,20 @@ def run_task(task, *args, **kwargs):
 
    return ResultWrapper(task(*args, **kwargs))
 

	
 

	
 
def __get_lockkey(func, *fargs, **fkwargs):
 
    params = list(fargs)
 
    params.extend(['%s-%s' % ar for ar in fkwargs.items()])
 

	
 
    func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
 

	
 
    lockkey = 'task_%s' % \
 
        md5(func_name + '-' + '-'.join(map(str, params))).hexdigest()
 
    return lockkey
 

	
 

	
 
def locked_task(func):
 
    def __wrapper(func, *fargs, **fkwargs):
 
        params = list(fargs)
 
        params.extend(['%s-%s' % ar for ar in fkwargs.items()])
 

	
 
        lockkey = 'task_%s' % \
 
            md5(str(func.__name__) + '-' + \
 
                '-'.join(map(str, params))).hexdigest()
 
        lockkey = __get_lockkey(func, *fargs, **fkwargs)
 
        log.info('running task with lockkey %s', lockkey)
 
        try:
 
            l = DaemonLock(lockkey)
rhodecode/lib/celerylib/tasks.py
Show inline comments
 
@@ -37,13 +37,14 @@ from string import lower
 
from pylons import config
 
from pylons.i18n.translation import _
 

	
 
from rhodecode.lib.celerylib import run_task, locked_task, str2bool
 
from rhodecode.lib.celerylib import run_task, locked_task, str2bool, \
 
    __get_lockkey, LockHeld, DaemonLock
 
from rhodecode.lib.helpers import person
 
from rhodecode.lib.smtp_mailer import SmtpMailer
 
from rhodecode.lib.utils import OrderedDict, add_cache
 
from rhodecode.model import init_model
 
from rhodecode.model import meta
 
from rhodecode.model.db import RhodeCodeUi
 
from rhodecode.model.db import RhodeCodeUi, Statistics, Repository
 

	
 
from vcs.backends import get_repo
 

	
 
@@ -125,146 +126,162 @@ def whoosh_index(repo_location, full_ind
 

	
 

	
 
@task(ignore_result=True)
 
@locked_task
 
def get_commits_stats(repo_name, ts_min_y, ts_max_y):
 
    try:
 
        log = get_commits_stats.get_logger()
 
    except:
 
        log = logging.getLogger(__name__)
 

	
 
    from rhodecode.model.db import Statistics, Repository
 
    lockkey = __get_lockkey('get_commits_stats', repo_name, ts_min_y,
 
                            ts_max_y)
 
    log.info('running task with lockkey %s', lockkey)
 
    try:
 
        lock = DaemonLock(lockkey)
 

	
 
    #for js data compatibilty
 
    akc = lambda k: person(k).replace('"', "")
 
        #for js data compatibilty cleans the key for person from '
 
        akc = lambda k: person(k).replace('"', "")
 

	
 
    co_day_auth_aggr = {}
 
    commits_by_day_aggregate = {}
 
    repos_path = get_repos_path()
 
    p = os.path.join(repos_path, repo_name)
 
    repo = get_repo(p)
 
        co_day_auth_aggr = {}
 
        commits_by_day_aggregate = {}
 
        repos_path = get_repos_path()
 
        p = os.path.join(repos_path, repo_name)
 
        repo = get_repo(p)
 
        repo_size = len(repo.revisions)
 
        #return if repo have no revisions
 
        if repo_size < 1:
 
            lock.release()
 
            return True
 

	
 
    skip_date_limit = True
 
    parse_limit = int(config['app_conf'].get('commit_parse_limit'))
 
    last_rev = 0
 
    last_cs = None
 
    timegetter = itemgetter('time')
 
        skip_date_limit = True
 
        parse_limit = int(config['app_conf'].get('commit_parse_limit'))
 
        last_rev = 0
 
        last_cs = None
 
        timegetter = itemgetter('time')
 

	
 
    sa = get_session()
 
        sa = get_session()
 

	
 
    dbrepo = sa.query(Repository)\
 
        .filter(Repository.repo_name == repo_name).scalar()
 
    cur_stats = sa.query(Statistics)\
 
        .filter(Statistics.repository == dbrepo).scalar()
 
        dbrepo = sa.query(Repository)\
 
            .filter(Repository.repo_name == repo_name).scalar()
 
        cur_stats = sa.query(Statistics)\
 
            .filter(Statistics.repository == dbrepo).scalar()
 

	
 
    if cur_stats is not None:
 
        last_rev = cur_stats.stat_on_revision
 
        if cur_stats is not None:
 
            last_rev = cur_stats.stat_on_revision
 

	
 
    #return if repo is empty
 
    if not repo.revisions:
 
        return True
 
        if last_rev == repo.get_changeset().revision and repo_size > 1:
 
            #pass silently without any work if we're not on first revision or
 
            #current state of parsing revision(from db marker) is the
 
            #last revision
 
            lock.release()
 
            return True
 

	
 
    if last_rev == repo.get_changeset().revision and len(repo.revisions) > 1:
 
        #pass silently without any work if we're not on first revision or
 
        #current state of parsing revision(from db marker) is the last revision
 
        return True
 
        if cur_stats:
 
            commits_by_day_aggregate = OrderedDict(json.loads(
 
                                        cur_stats.commit_activity_combined))
 
            co_day_auth_aggr = json.loads(cur_stats.commit_activity)
 

	
 
    if cur_stats:
 
        commits_by_day_aggregate = OrderedDict(
 
                                       json.loads(
 
                                        cur_stats.commit_activity_combined))
 
        co_day_auth_aggr = json.loads(cur_stats.commit_activity)
 
        log.debug('starting parsing %s', parse_limit)
 
        lmktime = mktime
 

	
 
        last_rev = last_rev + 1 if last_rev > 0 else last_rev
 

	
 
    log.debug('starting parsing %s', parse_limit)
 
    lmktime = mktime
 

	
 
    last_rev = last_rev + 1 if last_rev > 0 else last_rev
 
        for cs in repo[last_rev:last_rev + parse_limit]:
 
            last_cs = cs  # remember last parsed changeset
 
            k = lmktime([cs.date.timetuple()[0], cs.date.timetuple()[1],
 
                          cs.date.timetuple()[2], 0, 0, 0, 0, 0, 0])
 

	
 
    for cs in repo[last_rev:last_rev + parse_limit]:
 
        last_cs = cs  # remember last parsed changeset
 
        k = lmktime([cs.date.timetuple()[0], cs.date.timetuple()[1],
 
                      cs.date.timetuple()[2], 0, 0, 0, 0, 0, 0])
 
            if akc(cs.author) in co_day_auth_aggr:
 
                try:
 
                    l = [timegetter(x) for x in
 
                         co_day_auth_aggr[akc(cs.author)]['data']]
 
                    time_pos = l.index(k)
 
                except ValueError:
 
                    time_pos = False
 

	
 
                if time_pos >= 0 and time_pos is not False:
 

	
 
                    datadict = \
 
                        co_day_auth_aggr[akc(cs.author)]['data'][time_pos]
 

	
 
        if akc(cs.author) in co_day_auth_aggr:
 
            try:
 
                l = [timegetter(x) for x in
 
                     co_day_auth_aggr[akc(cs.author)]['data']]
 
                time_pos = l.index(k)
 
            except ValueError:
 
                time_pos = False
 
                    datadict["commits"] += 1
 
                    datadict["added"] += len(cs.added)
 
                    datadict["changed"] += len(cs.changed)
 
                    datadict["removed"] += len(cs.removed)
 

	
 
                else:
 
                    if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
 

	
 
            if time_pos >= 0 and time_pos is not False:
 

	
 
                datadict = co_day_auth_aggr[akc(cs.author)]['data'][time_pos]
 

	
 
                datadict["commits"] += 1
 
                datadict["added"] += len(cs.added)
 
                datadict["changed"] += len(cs.changed)
 
                datadict["removed"] += len(cs.removed)
 
                        datadict = {"time": k,
 
                                    "commits": 1,
 
                                    "added": len(cs.added),
 
                                    "changed": len(cs.changed),
 
                                    "removed": len(cs.removed),
 
                                   }
 
                        co_day_auth_aggr[akc(cs.author)]['data']\
 
                            .append(datadict)
 

	
 
            else:
 
                if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
 

	
 
                    datadict = {"time": k,
 
                                "commits": 1,
 
                                "added": len(cs.added),
 
                                "changed": len(cs.changed),
 
                                "removed": len(cs.removed),
 
                               }
 
                    co_day_auth_aggr[akc(cs.author)]['data']\
 
                        .append(datadict)
 
                    co_day_auth_aggr[akc(cs.author)] = {
 
                                        "label": akc(cs.author),
 
                                        "data": [{"time":k,
 
                                                 "commits":1,
 
                                                 "added":len(cs.added),
 
                                                 "changed":len(cs.changed),
 
                                                 "removed":len(cs.removed),
 
                                                 }],
 
                                        "schema": ["commits"],
 
                                        }
 

	
 
        else:
 
            if k >= ts_min_y and k <= ts_max_y or skip_date_limit:
 
                co_day_auth_aggr[akc(cs.author)] = {
 
                                    "label": akc(cs.author),
 
                                    "data": [{"time":k,
 
                                             "commits":1,
 
                                             "added":len(cs.added),
 
                                             "changed":len(cs.changed),
 
                                             "removed":len(cs.removed),
 
                                             }],
 
                                    "schema": ["commits"],
 
                                    }
 
            #gather all data by day
 
            if k in commits_by_day_aggregate:
 
                commits_by_day_aggregate[k] += 1
 
            else:
 
                commits_by_day_aggregate[k] = 1
 

	
 
        #gather all data by day
 
        if k in commits_by_day_aggregate:
 
            commits_by_day_aggregate[k] += 1
 
        else:
 
            commits_by_day_aggregate[k] = 1
 
        overview_data = sorted(commits_by_day_aggregate.items(),
 
                               key=itemgetter(0))
 

	
 
        if not co_day_auth_aggr:
 
            co_day_auth_aggr[akc(repo.contact)] = {
 
                "label": akc(repo.contact),
 
                "data": [0, 1],
 
                "schema": ["commits"],
 
            }
 

	
 
    overview_data = sorted(commits_by_day_aggregate.items(), key=itemgetter(0))
 
    if not co_day_auth_aggr:
 
        co_day_auth_aggr[akc(repo.contact)] = {
 
            "label": akc(repo.contact),
 
            "data": [0, 1],
 
            "schema": ["commits"],
 
        }
 
        stats = cur_stats if cur_stats else Statistics()
 
        stats.commit_activity = json.dumps(co_day_auth_aggr)
 
        stats.commit_activity_combined = json.dumps(overview_data)
 

	
 
    stats = cur_stats if cur_stats else Statistics()
 
    stats.commit_activity = json.dumps(co_day_auth_aggr)
 
    stats.commit_activity_combined = json.dumps(overview_data)
 
        log.debug('last revison %s', last_rev)
 
        leftovers = len(repo.revisions[last_rev:])
 
        log.debug('revisions to parse %s', leftovers)
 

	
 
    log.debug('last revison %s', last_rev)
 
    leftovers = len(repo.revisions[last_rev:])
 
    log.debug('revisions to parse %s', leftovers)
 
        if last_rev == 0 or leftovers < parse_limit:
 
            log.debug('getting code trending stats')
 
            stats.languages = json.dumps(__get_codes_stats(repo_name))
 

	
 
    if last_rev == 0 or leftovers < parse_limit:
 
        log.debug('getting code trending stats')
 
        stats.languages = json.dumps(__get_codes_stats(repo_name))
 
        try:
 
            stats.repository = dbrepo
 
            stats.stat_on_revision = last_cs.revision if last_cs else 0
 
            sa.add(stats)
 
            sa.commit()
 
        except:
 
            log.error(traceback.format_exc())
 
            sa.rollback()
 
            lock.release()
 
            return False
 

	
 
    try:
 
        stats.repository = dbrepo
 
        stats.stat_on_revision = last_cs.revision if last_cs else 0
 
        sa.add(stats)
 
        sa.commit()
 
    except:
 
        log.error(traceback.format_exc())
 
        sa.rollback()
 
        return False
 
    if len(repo.revisions) > 1:
 
        run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)
 
        #final release
 
        lock.release()
 

	
 
    return True
 
        #execute another task if celery is enabled
 
        if len(repo.revisions) > 1 and CELERY_ON:
 
            run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y)
 
        return True
 
    except LockHeld:
 
        log.info('LockHeld')
 
        return 'Task with key %s already running' % lockkey
 

	
 

	
 
@task(ignore_result=True)
 
@@ -313,7 +330,6 @@ def send_email(recipients, subject, body
 
    """
 
    Sends an email with defined parameters from the .ini files.
 

	
 

	
 
    :param recipients: list of recipients, it this is empty the defined email
 
        address from field 'email_to' is used instead
 
    :param subject: subject of the mail
 
@@ -351,14 +367,14 @@ def send_email(recipients, subject, body
 

	
 
@task(ignore_result=True)
 
def create_repo_fork(form_data, cur_user):
 
    from rhodecode.model.repo import RepoModel
 
    from vcs import get_backend
 

	
 
    try:
 
        log = create_repo_fork.get_logger()
 
    except:
 
        log = logging.getLogger(__name__)
 

	
 
    from rhodecode.model.repo import RepoModel
 
    from vcs import get_backend
 

	
 
    repo_model = RepoModel(get_session())
 
    repo_model.create(form_data, cur_user, just_db=True, fork=True)
 
    repo_name = form_data['repo_name']
0 comments (0 inline, 0 general)