Changeset - 7afbc45aab28
[Not reviewed]
celery
0 2 1
Marcin Kuzminski - 15 years ago 2010-09-17 21:35:46
marcin@python-works.com
added caching queries to hg-app
3 files changed with 319 insertions and 14 deletions:
0 comments (0 inline, 0 general)
pylons_app/model/caching_query.py
Show inline comments
 
new file 100644
 
"""caching_query.py
 

	
 
Represent persistence structures which allow the usage of
 
Beaker caching with SQLAlchemy.
 

	
 
The three new concepts introduced here are:
 

	
 
 * CachingQuery - a Query subclass that caches and
 
   retrieves results in/from Beaker.
 
 * FromCache - a query option that establishes caching
 
   parameters on a Query
 
 * RelationshipCache - a variant of FromCache which is specific
 
   to a query invoked during a lazy load.
 
 * _params_from_query - extracts value parameters from 
 
   a Query.
 

	
 
The rest of what's here are standard SQLAlchemy and
 
Beaker constructs.
 
   
 
"""
 
from sqlalchemy.orm.interfaces import MapperOption
 
from sqlalchemy.orm.query import Query
 
from sqlalchemy.sql import visitors
 

	
 
class CachingQuery(Query):
 
    """A Query subclass which optionally loads full results from a Beaker 
 
    cache region.
 
    
 
    The CachingQuery stores additional state that allows it to consult
 
    a Beaker cache before accessing the database:
 
    
 
    * A "region", which is a cache region argument passed to a 
 
      Beaker CacheManager, specifies a particular cache configuration
 
      (including backend implementation, expiration times, etc.)
 
    * A "namespace", which is a qualifying name that identifies a
 
      group of keys within the cache.  A query that filters on a name 
 
      might use the name "by_name", a query that filters on a date range 
 
      to a joined table might use the name "related_date_range".
 
      
 
    When the above state is present, a Beaker cache is retrieved.
 
    
 
    The "namespace" name is first concatenated with 
 
    a string composed of the individual entities and columns the Query 
 
    requests, i.e. such as ``Query(User.id, User.name)``.
 
    
 
    The Beaker cache is then loaded from the cache manager based
 
    on the region and composed namespace.  The key within the cache
 
    itself is then constructed against the bind parameters specified
 
    by this query, which are usually literals defined in the 
 
    WHERE clause.
 

	
 
    The FromCache and RelationshipCache mapper options below represent
 
    the "public" method of configuring this state upon the CachingQuery.
 
    
 
    """
 
    
 
    def __init__(self, manager, *args, **kw):
 
        self.cache_manager = manager
 
        Query.__init__(self, *args, **kw)
 
        
 
    def __iter__(self):
 
        """override __iter__ to pull results from Beaker
 
           if particular attributes have been configured.
 
           
 
           Note that this approach does *not* detach the loaded objects from
 
           the current session. If the cache backend is an in-process cache
 
           (like "memory") and lives beyond the scope of the current session's
 
           transaction, those objects may be expired. The method here can be
 
           modified to first expunge() each loaded item from the current
 
           session before returning the list of items, so that the items
 
           in the cache are not the same ones in the current Session.
 
           
 
        """
 
        if hasattr(self, '_cache_parameters'):
 
            return self.get_value(createfunc=lambda: list(Query.__iter__(self)))
 
        else:
 
            return Query.__iter__(self)
 

	
 
    def invalidate(self):
 
        """Invalidate the value represented by this Query."""
 

	
 
        cache, cache_key = _get_cache_parameters(self)
 
        cache.remove(cache_key)
 

	
 
    def get_value(self, merge=True, createfunc=None):
 
        """Return the value from the cache for this query.
 

	
 
        Raise KeyError if no value present and no
 
        createfunc specified.
 

	
 
        """
 
        cache, cache_key = _get_cache_parameters(self)
 
        ret = cache.get_value(cache_key, createfunc=createfunc)
 
        if merge:
 
            ret = self.merge_result(ret, load=False)
 
        return ret
 

	
 
    def set_value(self, value):
 
        """Set the value in the cache for this query."""
 

	
 
        cache, cache_key = _get_cache_parameters(self)
 
        cache.put(cache_key, value)        
 

	
 
def query_callable(manager):
 
    def query(*arg, **kw):
 
        return CachingQuery(manager, *arg, **kw)
 
    return query
 
    
 
def _get_cache_parameters(query):
 
    """For a query with cache_region and cache_namespace configured,
 
    return the correspoinding Cache instance and cache key, based
 
    on this query's current criterion and parameter values.
 

	
 
    """
 
    if not hasattr(query, '_cache_parameters'):
 
        raise ValueError("This Query does not have caching parameters configured.")
 

	
 
    region, namespace, cache_key = query._cache_parameters
 
    
 
    namespace = _namespace_from_query(namespace, query)
 

	
 
    if cache_key is None:
 
        # cache key - the value arguments from this query's parameters.
 
        args = _params_from_query(query)
 
        cache_key = " ".join([str(x) for x in args])
 

	
 
    # get cache
 
    cache = query.cache_manager.get_cache_region(namespace, region)
 

	
 
    # optional - hash the cache_key too for consistent length
 
    # import uuid
 
    # cache_key= str(uuid.uuid5(uuid.NAMESPACE_DNS, cache_key))
 

	
 
    return cache, cache_key
 

	
 
def _namespace_from_query(namespace, query):
 
    # cache namespace - the token handed in by the 
 
    # option + class we're querying against
 
    namespace = " ".join([namespace] + [str(x) for x in query._entities])
 

	
 
    # memcached wants this
 
    namespace = namespace.replace(' ', '_')
 

	
 
    return namespace
 

	
 
def _set_cache_parameters(query, region, namespace, cache_key):
 
    
 
    if hasattr(query, '_cache_parameters'):
 
        region, namespace, cache_key = query._cache_parameters
 
        raise ValueError("This query is already configured "
 
                        "for region %r namespace %r" % 
 
                        (region, namespace)
 
                    )
 
    query._cache_parameters = region, namespace, cache_key
 
    
 
class FromCache(MapperOption):
 
    """Specifies that a Query should load results from a cache."""
 

	
 
    propagate_to_loaders = False
 

	
 
    def __init__(self, region, namespace, cache_key=None):
 
        """Construct a new FromCache.
 
        
 
        :param region: the cache region.  Should be a
 
        region configured in the Beaker CacheManager.
 
        
 
        :param namespace: the cache namespace.  Should
 
        be a name uniquely describing the target Query's
 
        lexical structure.
 
        
 
        :param cache_key: optional.  A string cache key 
 
        that will serve as the key to the query.   Use this
 
        if your query has a huge amount of parameters (such
 
        as when using in_()) which correspond more simply to 
 
        some other identifier.
 

	
 
        """
 
        self.region = region
 
        self.namespace = namespace
 
        self.cache_key = cache_key
 
    
 
    def process_query(self, query):
 
        """Process a Query during normal loading operation."""
 
        
 
        _set_cache_parameters(query, self.region, self.namespace, self.cache_key)
 

	
 
class RelationshipCache(MapperOption):
 
    """Specifies that a Query as called within a "lazy load" 
 
       should load results from a cache."""
 

	
 
    propagate_to_loaders = True
 

	
 
    def __init__(self, region, namespace, attribute):
 
        """Construct a new RelationshipCache.
 
        
 
        :param region: the cache region.  Should be a
 
        region configured in the Beaker CacheManager.
 
        
 
        :param namespace: the cache namespace.  Should
 
        be a name uniquely describing the target Query's
 
        lexical structure.
 
        
 
        :param attribute: A Class.attribute which
 
        indicates a particular class relationship() whose
 
        lazy loader should be pulled from the cache.
 
        
 
        """
 
        self.region = region
 
        self.namespace = namespace
 
        self._relationship_options = {
 
            (attribute.property.parent.class_, attribute.property.key) : self
 
        }
 

	
 
    def process_query_conditionally(self, query):
 
        """Process a Query that is used within a lazy loader.
 

	
 
        (the process_query_conditionally() method is a SQLAlchemy
 
        hook invoked only within lazyload.)
 

	
 
        """
 
        if query._current_path:
 
            mapper, key = query._current_path[-2:]
 

	
 
            for cls in mapper.class_.__mro__:
 
                if (cls, key) in self._relationship_options:
 
                    relationship_option = self._relationship_options[(cls, key)]
 
                    _set_cache_parameters(
 
                            query,
 
                            relationship_option.region,
 
                            relationship_option.namespace,
 
                            None)
 

	
 
    def and_(self, option):
 
        """Chain another RelationshipCache option to this one.
 
        
 
        While many RelationshipCache objects can be specified on a single
 
        Query separately, chaining them together allows for a more efficient
 
        lookup during load.
 
        
 
        """
 
        self._relationship_options.update(option._relationship_options)
 
        return self
 

	
 

	
 
def _params_from_query(query):
 
    """Pull the bind parameter values from a query.
 
    
 
    This takes into account any scalar attribute bindparam set up.
 
    
 
    E.g. params_from_query(query.filter(Cls.foo==5).filter(Cls.bar==7)))
 
    would return [5, 7].
 
    
 
    """
 
    v = []
 
    def visit_bindparam(bind):
 
        value = query._params.get(bind.key, bind.value)
 
        
 
        # lazyloader may dig a callable in here, intended
 
        # to late-evaluate params after autoflush is called.
 
        # convert to a scalar value.
 
        if callable(value):
 
            value = value()
 
            
 
        v.append(value)
 
    if query._criterion is not None:
 
        visitors.traverse(query._criterion, {}, {'bindparam':visit_bindparam})
 
    return v
pylons_app/model/hg_model.py
Show inline comments
 
@@ -43,16 +43,14 @@ except ImportError:
 
    raise Exception('Unable to import vcs')
 

	
 
def _get_repos_cached_initial(app_globals, initial):
 
    """
 
    return cached dict with repos
 
    """return cached dict with repos
 
    """
 
    g = app_globals
 
    return HgModel.repo_scan(g.paths[0][0], g.paths[0][1], g.baseui, initial)
 

	
 
@cache_region('long_term', 'cached_repo_list')
 
def _get_repos_cached():
 
    """
 
    return cached dict with repos
 
    """return cached dict with repos
 
    """
 
    log.info('getting all repositories list')
 
    from pylons import app_globals as g
 
@@ -62,7 +60,8 @@ def _get_repos_cached():
 
def _get_repos_switcher_cached(cached_repo_list):
 
    repos_lst = []
 
    for repo in sorted(x for x in cached_repo_list.values()):
 
        if HasRepoPermissionAny('repository.write', 'repository.read', 'repository.admin')(repo.name.lower(), 'main page check'):
 
        if HasRepoPermissionAny('repository.write', 'repository.read',
 
                    'repository.admin')(repo.name.lower(), 'main page check'):
 
            repos_lst.append((repo.name.lower(), repo.dbrepo.private,))
 
    
 
    return repos_lst
 
@@ -73,14 +72,11 @@ def _full_changelog_cached(repo_name):
 
    return list(reversed(list(HgModel().get_repo(repo_name))))
 

	
 
class HgModel(object):
 
    """
 
    Mercurial Model
 
    """Mercurial Model
 
    """
 

	
 
    def __init__(self):
 
        """
 
        Constructor
 
        """
 
        pass
 
    
 
    @staticmethod
 
    def repo_scan(repos_prefix, repos_path, baseui, initial=False):
 
@@ -92,8 +88,7 @@ class HgModel(object):
 
        """
 
        sa = meta.Session()
 
        def check_repo_dir(path):
 
            """
 
            Checks the repository
 
            """Checks the repository
 
            :param path:
 
            """
 
            repos_path = path.split('/')
pylons_app/model/meta.py
Show inline comments
 
"""SQLAlchemy Metadata and Session object"""
 
from sqlalchemy.ext.declarative import declarative_base
 
from sqlalchemy.orm import scoped_session, sessionmaker
 
from pylons_app.model import caching_query
 
from beaker import cache
 
import os
 
from os.path import join as jn, dirname as dn, abspath
 
import time
 

	
 
# Beaker CacheManager.  A home base for cache configurations.
 
cache_manager = cache.CacheManager()
 

	
 
__all__ = ['Base', 'Session']
 
#
 
# SQLAlchemy session manager. Updated by model.init_model()
 
#
 
Session = scoped_session(sessionmaker())
 
#
 
Session = scoped_session(
 
                sessionmaker(
 
                    query_cls=caching_query.query_callable(cache_manager)
 
                )
 
          )
 

	
 
# The declarative Base
 
Base = declarative_base()
 
#For another db...
 
#Base2 = declarative_base()
 

	
 
#===============================================================================
 
# CACHE OPTIONS
 
#===============================================================================
 
cache_dir = jn(dn(dn(dn(abspath(__file__)))), 'data', 'cache')
 
if not os.path.isdir(cache_dir):
 
    os.mkdir(cache_dir)
 
# set start_time to current time
 
# to re-cache everything
 
# upon application startup
 
start_time = time.time()
 
# configure the "sqlalchemy" cache region.
 
cache_manager.regions['sql_cache_short'] = {
 
        'type':'memory',
 
        'data_dir':cache_dir,
 
        'expire':10,
 
        'start_time':start_time
 
    }
 
cache_manager.regions['sql_cache_med'] = {
 
        'type':'memory',
 
        'data_dir':cache_dir,
 
        'expire':360,
 
        'start_time':start_time
 
    }
 
cache_manager.regions['sql_cache_long'] = {
 
        'type':'file',
 
        'data_dir':cache_dir,
 
        'expire':3600,
 
        'start_time':start_time
 
    }
 
#to use cache use this in query
 
#.options(FromCache("sqlalchemy_cache_type", "cachekey"))
0 comments (0 inline, 0 general)