Changeset - aec1b9c9ffe6
[Not reviewed]
default
1 6 0
Mads Kiilerich - 6 years ago 2019-10-20 04:57:04
mads@kiilerich.com
Grafted from: f7042eaa7afd
db: drop Repository CacheInvalidation

The benefit of this functionality is questionable. Especially in bigger setups
with multiple front-end instances all serving the same multitude of
repositories, making the hit rate very low. And the overhead of storing cache
invalidation data *in* the database is non-trivial.

We preserve a small cache in Repository SA records, but should probably just in
general know what we are doing and not ask for the same information multiple
times in each request.
7 files changed with 13 insertions and 261 deletions:
0 comments (0 inline, 0 general)
kallithea/config/routing.py
Show inline comments
 
@@ -554,31 +554,24 @@ def make_map(config):
 
    rmap.connect("edit_repo_advanced", "/{repo_name:.*?}/settings/advanced",
 
                 controller='admin/repos', action="edit_advanced",
 
                 conditions=dict(method=["GET"], function=check_repo))
 

	
 
    rmap.connect("edit_repo_advanced_journal", "/{repo_name:.*?}/settings/advanced/journal",
 
                 controller='admin/repos', action="edit_advanced_journal",
 
                 conditions=dict(method=["POST"], function=check_repo))
 

	
 
    rmap.connect("edit_repo_advanced_fork", "/{repo_name:.*?}/settings/advanced/fork",
 
                 controller='admin/repos', action="edit_advanced_fork",
 
                 conditions=dict(method=["POST"], function=check_repo))
 

	
 
    rmap.connect("edit_repo_caches", "/{repo_name:.*?}/settings/caches",
 
                 controller='admin/repos', action="edit_caches",
 
                 conditions=dict(method=["GET"], function=check_repo))
 
    rmap.connect("update_repo_caches", "/{repo_name:.*?}/settings/caches",
 
                 controller='admin/repos', action="edit_caches",
 
                 conditions=dict(method=["POST"], function=check_repo))
 

	
 
    rmap.connect("edit_repo_remote", "/{repo_name:.*?}/settings/remote",
 
                 controller='admin/repos', action="edit_remote",
 
                 conditions=dict(method=["GET"], function=check_repo))
 
    rmap.connect("edit_repo_remote_update", "/{repo_name:.*?}/settings/remote",
 
                 controller='admin/repos', action="edit_remote",
 
                 conditions=dict(method=["POST"], function=check_repo))
 

	
 
    rmap.connect("edit_repo_statistics", "/{repo_name:.*?}/settings/statistics",
 
                 controller='admin/repos', action="edit_statistics",
 
                 conditions=dict(method=["GET"], function=check_repo))
 
    rmap.connect("edit_repo_statistics_update", "/{repo_name:.*?}/settings/statistics",
 
                 controller='admin/repos', action="edit_statistics",
kallithea/controllers/admin/repos.py
Show inline comments
 
@@ -470,42 +470,24 @@ class ReposController(BaseRepoController
 
                    category='success')
 
        except RepositoryError as e:
 
            log.error(traceback.format_exc())
 
            h.flash(e, category='error')
 
        except Exception as e:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during this operation'),
 
                    category='error')
 

	
 
        raise HTTPFound(location=url('edit_repo_advanced', repo_name=repo_name))
 

	
 
    @HasRepoPermissionLevelDecorator('admin')
 
    def edit_caches(self, repo_name):
 
        c.repo_info = self._load_repo()
 
        c.active = 'caches'
 
        if request.POST:
 
            try:
 
                ScmModel().mark_for_invalidation(repo_name)
 
                Session().commit()
 
                h.flash(_('Cache invalidation successful'),
 
                        category='success')
 
            except Exception as e:
 
                log.error(traceback.format_exc())
 
                h.flash(_('An error occurred during cache invalidation'),
 
                        category='error')
 

	
 
            raise HTTPFound(location=url('edit_repo_caches', repo_name=c.repo_name))
 
        return render('admin/repos/repo_edit.html')
 

	
 
    @HasRepoPermissionLevelDecorator('admin')
 
    def edit_remote(self, repo_name):
 
        c.repo_info = self._load_repo()
 
        c.active = 'remote'
 
        if request.POST:
 
            try:
 
                ScmModel().pull_changes(repo_name, request.authuser.username, request.ip_addr)
 
                h.flash(_('Pulled from remote location'), category='success')
 
            except Exception as e:
 
                log.error(traceback.format_exc())
 
                h.flash(_('An error occurred during pull from remote location'),
 
                        category='error')
 
            raise HTTPFound(location=url('edit_repo_remote', repo_name=c.repo_name))
kallithea/model/db.py
Show inline comments
 
@@ -28,25 +28,24 @@ Original author and date, and relevant c
 
import base64
 
import collections
 
import datetime
 
import functools
 
import hashlib
 
import logging
 
import os
 
import time
 
import traceback
 

	
 
import ipaddr
 
import sqlalchemy
 
from beaker.cache import cache_region, region_invalidate
 
from sqlalchemy import Boolean, Column, DateTime, Float, ForeignKey, Index, Integer, LargeBinary, String, Unicode, UnicodeText, UniqueConstraint
 
from sqlalchemy.ext.hybrid import hybrid_property
 
from sqlalchemy.orm import class_mapper, joinedload, relationship, validates
 
from tg.i18n import lazy_ugettext as _
 
from webob.exc import HTTPNotFound
 

	
 
import kallithea
 
from kallithea.lib import ext_json
 
from kallithea.lib.exceptions import DefaultUserException
 
from kallithea.lib.utils2 import (Optional, ascii_bytes, aslist, get_changeset_safe, get_clone_url, remove_prefix, safe_bytes, safe_int, safe_str, str2bool,
 
                                  urlreadable)
 
from kallithea.lib.vcs import get_backend
 
@@ -1095,34 +1094,24 @@ class Repository(Base, BaseDbModel):
 
    def repo_full_path(self):
 
        """
 
        Returns base full path for the repository - where it actually
 
        exists on a filesystem.
 
        """
 
        p = [kallithea.CONFIG['base_path']]
 
        # we need to split the name by / since this is how we store the
 
        # names in the database, but that eventually needs to be converted
 
        # into a valid system path
 
        p += self.repo_name.split(URL_SEP)
 
        return os.path.join(*p)
 

	
 
    @property
 
    def cache_keys(self):
 
        """
 
        Returns associated cache keys for that repo
 
        """
 
        return CacheInvalidation.query() \
 
            .filter(CacheInvalidation.cache_args == self.repo_name) \
 
            .order_by(CacheInvalidation.cache_key) \
 
            .all()
 

	
 
    def get_new_name(self, repo_name):
 
        """
 
        returns new full repository name based on assigned group and new new
 

	
 
        :param group_name:
 
        """
 
        path_prefix = self.group.full_path_splitted if self.group else []
 
        return URL_SEP.join(path_prefix + [repo_name])
 

	
 
    @property
 
    def _ui(self):
 
        """
 
@@ -1327,65 +1316,52 @@ class Repository(Base, BaseDbModel):
 

	
 
    def _repo_size(self):
 
        from kallithea.lib import helpers as h
 
        log.debug('calculating repository size...')
 
        return h.format_byte_size(self.scm_instance.size)
 

	
 
    #==========================================================================
 
    # SCM CACHE INSTANCE
 
    #==========================================================================
 

	
 
    def set_invalidate(self):
 
        """
 
        Mark caches of this repo as invalid.
 
        Flush SA session caches of instances of on disk repo.
 
        """
 
        CacheInvalidation.set_invalidate(self.repo_name)
 
        try:
 
            del self._scm_instance
 
        except AttributeError:
 
            pass
 

	
 
    _scm_instance = None
 
    _scm_instance = None  # caching inside lifetime of SA session
 

	
 
    @property
 
    def scm_instance(self):
 
        if self._scm_instance is None:
 
            self._scm_instance = self.scm_instance_cached()
 
            return self.scm_instance_no_cache()  # will populate self._scm_instance
 
        return self._scm_instance
 

	
 
    def scm_instance_cached(self, valid_cache_keys=None):
 
        @cache_region('long_term', 'scm_instance_cached')
 
        def _c(repo_name): # repo_name is just for the cache key
 
            log.debug('Creating new %s scm_instance and populating cache', repo_name)
 
            return self.scm_instance_no_cache()
 
        rn = self.repo_name
 

	
 
        valid = CacheInvalidation.test_and_set_valid(rn, None, valid_cache_keys=valid_cache_keys)
 
        if not valid:
 
            log.debug('Cache for %s invalidated, getting new object', rn)
 
            region_invalidate(_c, None, 'scm_instance_cached', rn)
 
        else:
 
            log.debug('Trying to get scm_instance of %s from cache', rn)
 
        return _c(rn)
 

	
 
    def scm_instance_no_cache(self):
 
        repo_full_path = self.repo_full_path
 
        alias = get_scm(repo_full_path)[0]
 
        log.debug('Creating instance of %s repository from %s',
 
                  alias, self.repo_full_path)
 
        backend = get_backend(alias)
 

	
 
        if alias == 'hg':
 
            repo = backend(repo_full_path, create=False,
 
                           baseui=self._ui)
 
            self._scm_instance = backend(repo_full_path, create=False, baseui=self._ui)
 
        else:
 
            repo = backend(repo_full_path, create=False)
 
            self._scm_instance = backend(repo_full_path, create=False)
 

	
 
        return repo
 
        return self._scm_instance
 

	
 
    def __json__(self):
 
        return dict(
 
            repo_id=self.repo_id,
 
            repo_name=self.repo_name,
 
            landing_rev=self.landing_rev,
 
        )
 

	
 

	
 
class RepoGroup(Base, BaseDbModel):
 
    __tablename__ = 'groups'
 
    __table_args__ = (
 
@@ -1938,148 +1914,24 @@ class UserFollowing(Base, BaseDbModel):
 
    follows_from = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 

	
 
    user = relationship('User', primaryjoin='User.user_id==UserFollowing.user_id')
 

	
 
    follows_user = relationship('User', primaryjoin='User.user_id==UserFollowing.follows_user_id')
 
    follows_repository = relationship('Repository', order_by=lambda: sqlalchemy.func.lower(Repository.repo_name))
 

	
 
    @classmethod
 
    def get_repo_followers(cls, repo_id):
 
        return cls.query().filter(cls.follows_repository_id == repo_id)
 

	
 

	
 
class CacheInvalidation(Base, BaseDbModel):
 
    __tablename__ = 'cache_invalidation'
 
    __table_args__ = (
 
        Index('key_idx', 'cache_key'),
 
        _table_args_default_dict,
 
    )
 

	
 
    # cache_id, not used
 
    cache_id = Column(Integer(), primary_key=True)
 
    # cache_key as created by _get_cache_key
 
    cache_key = Column(Unicode(255), nullable=False, unique=True)
 
    # cache_args is a repo_name
 
    cache_args = Column(Unicode(255), nullable=False)
 
    # instance sets cache_active True when it is caching, other instances set
 
    # cache_active to False to indicate that this cache is invalid
 
    cache_active = Column(Boolean(), nullable=False, default=False)
 

	
 
    def __init__(self, cache_key, repo_name=''):
 
        self.cache_key = cache_key
 
        self.cache_args = repo_name
 
        self.cache_active = False
 

	
 
    def __repr__(self):
 
        return "<%s %s: %s=%s" % (
 
            self.__class__.__name__,
 
            self.cache_id, self.cache_key, self.cache_active)
 

	
 
    def _cache_key_partition(self):
 
        prefix, repo_name, suffix = self.cache_key.partition(self.cache_args)
 
        return prefix, repo_name, suffix
 

	
 
    def get_prefix(self):
 
        """
 
        get prefix that might have been used in _get_cache_key to
 
        generate self.cache_key. Only used for informational purposes
 
        in repo_edit.html.
 
        """
 
        # prefix, repo_name, suffix
 
        return self._cache_key_partition()[0]
 

	
 
    def get_suffix(self):
 
        """
 
        get suffix that might have been used in _get_cache_key to
 
        generate self.cache_key. Only used for informational purposes
 
        in repo_edit.html.
 
        """
 
        # prefix, repo_name, suffix
 
        return self._cache_key_partition()[2]
 

	
 
    @classmethod
 
    def clear_cache(cls):
 
        """
 
        Delete all cache keys from database.
 
        Should only be run when all instances are down and all entries thus stale.
 
        """
 
        cls.query().delete()
 
        Session().commit()
 

	
 
    @classmethod
 
    def _get_cache_key(cls, key):
 
        """
 
        Wrapper for generating a unique cache key for this instance and "key".
 
        key must / will start with a repo_name which will be stored in .cache_args .
 
        """
 
        prefix = kallithea.CONFIG.get('instance_id', '')
 
        return "%s%s" % (prefix, key)
 

	
 
    @classmethod
 
    def set_invalidate(cls, repo_name):
 
        """
 
        Mark all caches of a repo as invalid in the database.
 
        """
 
        inv_objs = Session().query(cls).filter(cls.cache_args == repo_name).all()
 
        log.debug('for repo %s got %s invalidation objects',
 
                  repo_name, inv_objs)
 

	
 
        for inv_obj in inv_objs:
 
            log.debug('marking %s key for invalidation based on repo_name=%s',
 
                      inv_obj, repo_name)
 
            Session().delete(inv_obj)
 
        Session().commit()
 

	
 
    @classmethod
 
    def test_and_set_valid(cls, repo_name, kind, valid_cache_keys=None):
 
        """
 
        Mark this cache key as active and currently cached.
 
        Return True if the existing cache registration still was valid.
 
        Return False to indicate that it had been invalidated and caches should be refreshed.
 
        """
 

	
 
        key = (repo_name + '_' + kind) if kind else repo_name
 
        cache_key = cls._get_cache_key(key)
 

	
 
        if valid_cache_keys and cache_key in valid_cache_keys:
 
            return True
 

	
 
        inv_obj = cls.query().filter(cls.cache_key == cache_key).scalar()
 
        if inv_obj is None:
 
            inv_obj = cls(cache_key, repo_name)
 
            Session().add(inv_obj)
 
        elif inv_obj.cache_active:
 
            return True
 
        inv_obj.cache_active = True
 
        try:
 
            Session().commit()
 
        except sqlalchemy.exc.IntegrityError:
 
            log.error('commit of CacheInvalidation failed - retrying')
 
            Session().rollback()
 
            inv_obj = cls.query().filter(cls.cache_key == cache_key).scalar()
 
            if inv_obj is None:
 
                log.error('failed to create CacheInvalidation entry')
 
                # TODO: fail badly?
 
            # else: TOCTOU - another thread added the key at the same time; no further action required
 
        return False
 

	
 
    @classmethod
 
    def get_valid_cache_keys(cls):
 
        """
 
        Return opaque object with information of which caches still are valid
 
        and can be used without checking for invalidation.
 
        """
 
        return set(inv_obj.cache_key for inv_obj in cls.query().filter(cls.cache_active).all())
 

	
 

	
 
class ChangesetComment(Base, BaseDbModel):
 
    __tablename__ = 'changeset_comments'
 
    __table_args__ = (
 
        Index('cc_revision_idx', 'revision'),
 
        Index('cc_pull_request_id_idx', 'pull_request_id'),
 
        _table_args_default_dict,
 
    )
 

	
 
    comment_id = Column(Integer(), primary_key=True)
 
    repo_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
 
    revision = Column(String(40), nullable=True)
 
    pull_request_id = Column(Integer(), ForeignKey('pull_requests.pull_request_id'), nullable=True)
kallithea/templates/admin/repos/repo_edit.html
Show inline comments
 
@@ -24,27 +24,24 @@ ${self.repo_context_bar('options')}
 
          <li class="${'active' if c.active=='settings' else ''}">
 
              <a href="${h.url('edit_repo', repo_name=c.repo_name)}">${_('Settings')}</a>
 
          </li>
 
          <li class="${'active' if c.active=='permissions' else ''}">
 
              <a href="${h.url('edit_repo_perms', repo_name=c.repo_name)}">${_('Permissions')}</a>
 
          </li>
 
          <li class="${'active' if c.active=='advanced' else ''}">
 
              <a href="${h.url('edit_repo_advanced', repo_name=c.repo_name)}">${_('Advanced')}</a>
 
          </li>
 
          <li class="${'active' if c.active=='fields' else ''}">
 
              <a href="${h.url('edit_repo_fields', repo_name=c.repo_name)}">${_('Extra Fields')}</a>
 
          </li>
 
          <li class="${'active' if c.active=='caches' else ''}">
 
              <a href="${h.url('edit_repo_caches', repo_name=c.repo_name)}">${_('Caches')}</a>
 
          </li>
 
          <li class="${'active' if c.active=='remote' else ''}">
 
              <a href="${h.url('edit_repo_remote', repo_name=c.repo_name)}">${_('Remote')}</a>
 
          </li>
 
          <li class="${'active' if c.active=='statistics' else ''}">
 
              <a href="${h.url('edit_repo_statistics', repo_name=c.repo_name)}">${_('Statistics')}</a>
 
          </li>
 
        </ul>
 

	
 
        <div>
 
            <%include file="/admin/repos/repo_edit_${c.active}.html"/>
 
        </div>
 
    </div>
kallithea/templates/admin/repos/repo_edit_caches.html
Show inline comments
 
deleted file
kallithea/tests/api/api_base.py
Show inline comments
 
@@ -348,58 +348,24 @@ class _BaseTestApi(object):
 

	
 
        expected = {'added': [], 'removed': []}
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(ScmModel, 'repo_scan', crash)
 
    def test_api_rescann_error(self):
 
        id_, params = _build_data(self.apikey, 'rescan_repos', )
 
        response = api_call(self, params)
 

	
 
        expected = 'Error occurred during rescan repositories action'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_invalidate_cache(self):
 
        repo = RepoModel().get_by_repo_name(self.REPO)
 
        repo.scm_instance_cached()  # seed cache
 

	
 
        id_, params = _build_data(self.apikey, 'invalidate_cache',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 

	
 
        expected = {
 
            'msg': "Cache for repository `%s` was invalidated" % (self.REPO,),
 
            'repository': self.REPO
 
        }
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(ScmModel, 'mark_for_invalidation', crash)
 
    def test_api_invalidate_cache_error(self):
 
        id_, params = _build_data(self.apikey, 'invalidate_cache',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 

	
 
        expected = 'Error occurred during cache invalidation action'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_invalidate_cache_regular_user_no_permission(self):
 
        repo = RepoModel().get_by_repo_name(self.REPO)
 
        repo.scm_instance_cached() # seed cache
 

	
 
        id_, params = _build_data(self.apikey_regular, 'invalidate_cache',
 
                                  repoid=self.REPO)
 
        response = api_call(self, params)
 

	
 
        expected = "repository `%s` does not exist" % (self.REPO,)
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_existing_user(self):
 
        id_, params = _build_data(self.apikey, 'create_user',
 
                                  username=base.TEST_USER_ADMIN_LOGIN,
 
                                  email='test@example.com',
 
                                  password='trololo')
 
        response = api_call(self, params)
 

	
 
        expected = "user `%s` already exist" % base.TEST_USER_ADMIN_LOGIN
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_create_user_with_existing_email(self):
 
        id_, params = _build_data(self.apikey, 'create_user',
kallithea/tests/other/test_vcs_operations.py
Show inline comments
 
@@ -29,25 +29,25 @@ import json
 
import os
 
import re
 
import tempfile
 
import time
 
import urllib.request
 
from subprocess import PIPE, Popen
 
from tempfile import _RandomNameSequence
 

	
 
import pytest
 

	
 
from kallithea import CONFIG
 
from kallithea.lib.utils2 import ascii_bytes, safe_str
 
from kallithea.model.db import CacheInvalidation, Repository, Ui, User, UserIpMap, UserLog
 
from kallithea.model.db import Repository, Ui, User, UserIpMap, UserLog
 
from kallithea.model.meta import Session
 
from kallithea.model.ssh_key import SshKeyModel
 
from kallithea.model.user import UserModel
 
from kallithea.tests import base
 
from kallithea.tests.fixture import Fixture
 

	
 

	
 
DEBUG = True
 
HOST = '127.0.0.1:4999'  # test host
 

	
 
fixture = Fixture()
 

	
 
@@ -433,49 +433,38 @@ class TestVCSOperations(base.TestControl
 
            assert 'Already up to date.' in stdout
 
        else:
 
            assert vt.repo_type == 'hg'
 
            assert "no changes found" in stdout
 
        assert "denied" not in stderr
 
        assert "denied" not in stdout
 
        assert "404" not in stdout
 

	
 
    @parametrize_vcs_test
 
    def test_push_invalidates_cache(self, webserver, testfork, vt):
 
        pre_cached_tip = [repo.get_api_data()['last_changeset']['short_id'] for repo in Repository.query().filter(Repository.repo_name == testfork[vt.repo_type])]
 

	
 
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
 
                                               == testfork[vt.repo_type]).scalar()
 
        if not key:
 
            key = CacheInvalidation(testfork[vt.repo_type], testfork[vt.repo_type])
 
            Session().add(key)
 

	
 
        key.cache_active = True
 
        Session().commit()
 

	
 
        dest_dir = _get_tmp_dir()
 
        clone_url = vt.repo_url_param(webserver, testfork[vt.repo_type])
 
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)
 

	
 
        stdout, stderr = _add_files_and_push(webserver, vt, dest_dir, files_no=1, clone_url=clone_url)
 

	
 
        Session().commit()  # expire test session to make sure SA fetch new Repository instances after last_changeset has been updated server side hook in other process
 

	
 
        if vt.repo_type == 'git':
 
            _check_proper_git_push(stdout, stderr)
 

	
 
        post_cached_tip = [repo.get_api_data()['last_changeset']['short_id'] for repo in Repository.query().filter(Repository.repo_name == testfork[vt.repo_type])]
 
        assert pre_cached_tip != post_cached_tip
 

	
 
        key = CacheInvalidation.query().filter(CacheInvalidation.cache_key
 
                                               == testfork[vt.repo_type]).all()
 
        assert key == []
 

	
 
    @parametrize_vcs_test_http
 
    def test_push_wrong_credentials(self, webserver, vt):
 
        dest_dir = _get_tmp_dir()
 
        clone_url = vt.repo_url_param(webserver, vt.repo_name)
 
        stdout, stderr = Command(base.TESTS_TMP_PATH).execute(vt.repo_type, 'clone', clone_url, dest_dir)
 

	
 
        clone_url = webserver.repo_url(vt.repo_name, username='bad', password='name')
 
        stdout, stderr = _add_files_and_push(webserver, vt, dest_dir,
 
                                             clone_url=clone_url, ignoreReturnCode=True)
 

	
 
        if vt.repo_type == 'git':
 
            assert 'fatal: Authentication failed' in stderr
0 comments (0 inline, 0 general)