Changeset - b18f89d6d17f
[Not reviewed]
default
0 7 0
Marcin Kuzminski - 15 years ago 2010-05-30 19:49:40
marcin@python-works.com
Adde draft for permissions systems, made all needed decorators, and checks. For future usage in the system.
7 files changed with 126 insertions and 30 deletions:
0 comments (0 inline, 0 general)
pylons_app/config/environment.py
Show inline comments
 
"""Pylons environment configuration"""
 
import logging
 
import os
 

	
 
from mako.lookup import TemplateLookup
 
from pylons.configuration import PylonsConfig
 
from pylons.error import handle_mako_error
 
from pylons_app.config.routing import make_map
 
from pylons_app.lib.auth import set_available_permissions
 
from pylons_app.model import init_model
 
from sqlalchemy import engine_from_config
 

	
 
import logging
 
import os
 
import pylons_app.lib.app_globals as app_globals
 
import pylons_app.lib.helpers
 
from pylons_app.config.routing import make_map
 
from pylons_app.model import init_model
 

	
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 
def load_environment(global_conf, app_conf):
 
    """Configure the Pylons environment via the ``pylons.config``
 
    object
 
    """
 
    config = PylonsConfig()
 
    
 
    # Pylons paths
 
    root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
 
    paths = dict(root=root,
 
                 controllers=os.path.join(root, 'controllers'),
 
                 static_files=os.path.join(root, 'public'),
 
                 templates=[os.path.join(root, 'templates')])
 

	
 
    # Initialize config with the basic options
 
    config.init_app(global_conf, app_conf, package='pylons_app', paths=paths)
 

	
 
    config['routes.map'] = make_map(config)
 
    config['pylons.app_globals'] = app_globals.Globals(config)
 
    config['pylons.h'] = pylons_app.lib.helpers
 
    
 
    # Setup cache object as early as possible
 
@@ -41,28 +42,29 @@ def load_environment(global_conf, app_co
 

	
 
    # Create the Mako TemplateLookup, with the default auto-escaping
 
    config['pylons.app_globals'].mako_lookup = TemplateLookup(
 
        directories=paths['templates'],
 
        error_handler=handle_mako_error,
 
        module_directory=os.path.join(app_conf['cache_dir'], 'templates'),
 
        input_encoding='utf-8', default_filters=['escape'],
 
        imports=['from webhelpers.html import escape'])
 

	
 
    #sets the c attribute access when don't existing attribute ar accessed
 
    config['pylons.strict_tmpl_context'] = True
 
    
 
    #MULTIPLE DB configs
 
    # Setup the SQLAlchemy database engine
 
    if config['debug']:
 
        #use query time debugging.
 
        from pylons_app.lib.timerproxy import TimerProxy
 
        sa_engine_db1 = engine_from_config(config, 'sqlalchemy.db1.',
 
                                                            proxy=TimerProxy())
 
    else:
 
        sa_engine_db1 = engine_from_config(config, 'sqlalchemy.db1.')
 

	
 
    init_model(sa_engine_db1)
 

	
 
    set_available_permissions(config)
 
    # CONFIGURATION OPTIONS HERE (note: all config options will override
 
    # any Pylons config options)
 
    
 
    return config
pylons_app/controllers/users.py
Show inline comments
 
from formencode import htmlfill
 
from pylons import request, response, session, tmpl_context as c, url, \
 
    app_globals as g
 
from pylons.i18n.translation import _
 
from pylons_app.lib import helpers as h    
 
from pylons.controllers.util import abort, redirect
 
from pylons_app.lib.auth import LoginRequired
 
from pylons_app.lib.auth import LoginRequired, CheckPermissionAll
 
from pylons_app.lib.base import BaseController, render
 
from pylons_app.model.db import User, UserLog
 
from pylons_app.model.forms import UserForm
 
from pylons_app.model.user_model import UserModel
 
import formencode
 
import logging
 

	
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 
class UsersController(BaseController):
 
    """REST Controller styled on the Atom Publishing Protocol"""
 
    # To properly map this controller, ensure your config/routing.py
 
    # file has a resource setup:
 
    #     map.resource('user', 'users')
 
    @LoginRequired()
 
    def __before__(self):
 
        c.admin_user = session.get('admin_user')
 
        c.admin_username = session.get('admin_username')
 
        super(UsersController, self).__before__()
 
        
 
    
 

	
 
    def index(self, format='html'):
 
        """GET /users: All items in the collection"""
 
        # url('users')
 
        
 
        c.users_list = self.sa.query(User).all()     
 
        return render('admin/users/users.html')
 
    
 
    def create(self):
 
        """POST /users: Create a new item"""
 
        # url('users')
 
        
 
        user_model = UserModel()
 
        login_form = UserForm()()
 
        try:
 
            form_result = login_form.to_python(dict(request.POST))
 
            user_model.create(form_result)
 
            h.flash(_('created user %s') % form_result['username'], category='success')
 
            return redirect(url('users'))
 
                           
 
        except formencode.Invalid as errors:
 
            c.form_errors = errors.error_dict
 
            return htmlfill.render(
 
                 render('admin/users/user_add.html'),
 
                defaults=errors.value,
pylons_app/lib/app_globals.py
Show inline comments
 
"""The application's Globals object"""
 

	
 
from beaker.cache import CacheManager
 
from beaker.util import parse_cache_config_options
 
from pylons_app.lib.utils import make_ui
 

	
 
class Globals(object):
 

	
 
    """Globals acts as a container for objects available throughout the
 
    life of the application
 

	
 
    """
 

	
 
    def __init__(self, config):
 
        """One instance of Globals is created during application
 
        initialization and is available during requests via the
 
        'app_globals' variable
 

	
 
        """
 
        self.cache = CacheManager(**parse_cache_config_options(config))
 
        self.baseui = make_ui('hgwebdir.config')
 
        self.paths = self.baseui.configitems('paths')
 
        self.base_path = self.paths[0][1].replace('*', '')
 
        self.changeset_annotation_colors = {}
 
        self.available_permissions = None # propagated after init_model
pylons_app/lib/auth.py
Show inline comments
 
from functools import wraps
 
from pylons import session, url
 
from pylons import session, url, app_globals as g
 
from pylons.controllers.util import abort, redirect
 
from pylons_app.model import meta
 
from pylons_app.model.db import User
 
from sqlalchemy.exc import OperationalError
 
from sqlalchemy.orm.exc import NoResultFound, MultipleResultsFound
 
import crypt
 
import logging
 
log = logging.getLogger(__name__)
 

	
 
def get_crypt_password(password):
 
    """
 
    Cryptographic function used for password hashing
 
    @param password: password to hash
 
    """
 
    return crypt.crypt(password, '6a')
 

	
 
def authfunc(environ, username, password):
 
    sa = meta.Session
 
    password_crypt = get_crypt_password(password)
 
    try:
 
        user = sa.query(User).filter(User.username == username).one()
 
    except (NoResultFound, MultipleResultsFound, OperationalError) as e:
 
        log.error(e)
 
        user = None
 
        
 
    if user:
 
        if user.active:
 
            if user.username == username and user.password == password_crypt:
 
                log.info('user %s authenticated correctly', username)
 
                return True
 
        else:
 
            log.error('user %s is disabled', username)
 
            
 
    return False
 

	
 
class  AuthUser(object):
 
    """
 
    A simple object that handles a mercurial username for authentication
 
    """
 
    username = 'None'
 
    is_authenticated = False
 
    is_admin = False
 
    permissions = set()
 
    group = set()
 
    
 
    def __init__(self):
 
        pass
 
    
 

	
 

	
 

	
 
def set_available_permissions(config):
 
    """
 
    This function will propagate pylons globals with all available defined
 
    permission given in db. We don't wannt to check each time from db for new 
 
    permissions since adding a new permission also requires application restart
 
    ie. to decorate new views with the newly created permission
 
    @param config:
 
    """
 
    from pylons_app.model.meta import Session
 
    from pylons_app.model.db import Permission
 
    logging.info('getting information about all available permissions')
 
    sa = Session()
 
    all_perms = sa.query(Permission).all()
 
    config['pylons.app_globals'].available_permissions = [x.permission_name for x in all_perms]
 

	
 

	
 
        
 
#===============================================================================
 
# DECORATORS
 
#===============================================================================
 
class LoginRequired(object):
 
    """
 
    Must be logged in to execute this function else redirect to login page
 
    """
 
    def __init__(self):
 
        pass
 
    
 
    def __call__(self, func):
 
        
 
        @wraps(func)
 
        def _wrapper(*fargs, **fkwargs):
 
            user = session.get('hg_app_user', AuthUser())
 
            log.info('Checking login required for user:%s', user.username)            
 
            if user.is_authenticated:
 
                    log.info('user %s is authenticated', user.username)
 
                    func(*fargs)
 
            else:
 
                logging.info('user %s not authenticated', user.username)
 
                logging.info('redirecting to login page')
 
                return redirect(url('login_home'))
 

	
 
        return _wrapper
 

	
 
class PermsDecorator(object):
 
    
 
    def __init__(self, *perms):
 
        available_perms = g.available_permissions
 
        for perm in perms:
 
            if perm not in available_perms:
 
                raise Exception("'%s' permission in not defined" % perm)
 
        self.required_perms = set(perms)
 
        self.user_perms = set([])#propagate this list from somewhere.
 
        
 
    def __call__(self, func):        
 
        @wraps(func)
 
        def _wrapper(*args, **kwargs):
 
            logging.info('checking %s permissions %s for %s',
 
               self.__class__.__name__[-3:], self.required_perms, func.__name__)            
 
            
 
            if self.check_permissions():
 
                logging.info('Permission granted for %s', func.__name__)
 
                return func(*args, **kwargs)
 
            
 
            else:
 
                logging.warning('Permission denied for %s', func.__name__)
 
                #redirect with forbidden ret code
 
                return redirect(url('access_denied'), 403) 
 
        return _wrapper
 
        
 
        
 
    def check_permissions(self):
 
        """
 
        Dummy function for overiding
 
        """
 
        raise Exception('You have to write this function in child class')
 

	
 
class CheckPermissionAll(PermsDecorator):
 
    """
 
    Checks for access permission for all given predicates. All of them have to
 
    be meet in order to fulfill the request
 
    """
 
        
 
    def check_permissions(self):
 
        if self.required_perms.issubset(self.user_perms):
 
            return True
 
        return False
 
            
 

	
 
class CheckPermissionAny(PermsDecorator):
 
    """
 
    Checks for access permission for any of given predicates. In order to 
 
    fulfill the request any of predicates must be meet
 
    """
 
    
 
    def check_permissions(self):
 
        if self.required_perms.intersection(self.user_perms):
 
            return True
 
        return False
 

	
 

	
 

	
pylons_app/lib/db_manage.py
Show inline comments
 
from os.path import dirname as dn, join as jn
 
from pylons_app.lib.auth import get_crypt_password
 
from pylons_app.model import init_model
 
from pylons_app.model.db import User, Permission
 
from pylons_app.model.meta import Session, Base
 
from sqlalchemy.engine import create_engine
 
import logging
 
from os.path import dirname as dn
 
from os.path import join as jn
 
from sqlalchemy.engine import create_engine
 
import os
 
import sys
 
ROOT = dn(dn(dn(os.path.realpath(__file__))))
 
sys.path.append(ROOT)
 

	
 
from pylons_app.model.db import User
 
from pylons_app.model.meta import Session, Base
 

	
 
from pylons_app.lib.auth import get_crypt_password
 
from pylons_app.model import init_model
 

	
 
log = logging.getLogger('db manage')
 
log.setLevel(logging.DEBUG)
 
console_handler = logging.StreamHandler()
 
console_handler.setFormatter(logging.Formatter("%(asctime)s.%(msecs)03d" 
 
                                    " %(levelname)-5.5s [%(name)s] %(message)s"))
 
log.addHandler(console_handler)
 

	
 
class DbManage(object):
 
    def __init__(self, log_sql):
 
        self.dbname = 'hg_app.db'
 
        dburi = 'sqlite:////%s' % jn(ROOT, self.dbname)
 
        engine = create_engine(dburi, echo=log_sql) 
 
        init_model(engine)
 
        self.sa = Session()
 
    
 
    def check_for_db(self, override):
 
        log.info('checking for exisiting db')
 
        if os.path.isfile(jn(ROOT, self.dbname)):
 
            log.info('database exisist')
 
            if not override:
 
                raise Exception('database already exists')
 

	
 
    def create_tables(self, override=False):
 
@@ -47,30 +46,49 @@ class DbManage(object):
 
        log.info('Created tables for %s', self.dbname)
 
    
 
    def admin_prompt(self):
 
        import getpass
 
        username = raw_input('Specify admin username:')
 
        password = getpass.getpass('Specify admin password:')
 
        self.create_user(username, password, True)
 
        
 
    def create_user(self, username, password, admin=False):
 
        log.info('creating administrator user %s', username)
 
        
 
        new_user = User()
 
        new_user.username = username
 
        new_user.password = get_crypt_password(password)
 
        new_user.admin = admin
 
        new_user.active = True
 
        
 
        try:
 
            self.sa.add(new_user)
 
            self.sa.commit()
 
        except:
 
            self.sa.rollback()
 
            raise
 
    
 
    def create_permissions(self):
 
        perms = [('can_view_admin_users', 'Access to admin user view'),
 
                 
 
                 ]
 
        
 
        for p in perms:
 
            new_perm = Permission()
 
            new_perm.permission_name = p[0]
 
            new_perm.permission_longname = p[1]
 
            try:
 
                self.sa.add(new_perm)
 
                self.sa.commit()
 
            except:
 
                self.sa.rollback()
 
                raise
 
        
 
        
 
        
 
if __name__ == '__main__':
 
    dbmanage = DbManage(log_sql=True)
 
    dbmanage.create_tables(override=True)
 
    dbmanage.admin_prompt()  
 
    dbmanage.admin_prompt()
 
    dbmanage.create_permissions()  
 

	
 

	
pylons_app/model/db.py
Show inline comments
 
@@ -19,27 +19,28 @@ class User(Base):
 
    
 
    def __repr__(self):
 
        return "<User('%s:%s')>" % (self.user_id, self.username)
 
      
 
class UserLog(Base): 
 
    __tablename__ = 'user_logs'
 
    __table_args__ = {'useexisting':True}
 
    user_log_id = Column("user_log_id", INTEGER(), nullable=False, unique=True, default=None, primary_key=1)
 
    user_id = Column("user_id", INTEGER(), ForeignKey(u'users.user_id'), nullable=True, unique=None, default=None)
 
    repository = Column("repository", TEXT(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    action = Column("action", TEXT(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    action_date = Column("action_date", DATETIME(timezone=False), nullable=True, unique=None, default=None)
 
    
 
    user = relation('User')
 

	
 
class Repository(Base):
 
    __tablename__ = 'repositories'
 
    repo_id = Column("repo_id", INTEGER(), nullable=False, unique=True, default=None, primary_key=1)
 

	
 
class Permission(Base):
 
    __tablename__ = 'permissions'
 
    __table_args__ = {'useexisting':True}
 
    permission_id = Column("id", INTEGER(), nullable=False, unique=True, default=None, primary_key=1)
 
    permission_name = Column("permission_name", TEXT(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 

	
 
    permission_longname = Column("permission_longname", TEXT(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    
 
    def __repr__(self):
 
        return "<Permission('%s:%s')>" % (self.permission_id, self.permission_name)
pylons_app/templates/admin/admin.html
Show inline comments
 
## -*- coding: utf-8 -*-
 
<%inherit file="/base/base.html"/>
 

	
 
<%def name="title()">
 
    ${_('Administration')}
 
</%def>
 
<%def name="breadcrumbs()">
 
	${h.link_to(u'Admin',h.url('admin_home'))}
 
</%def>
 
<%def name="page_nav()">
 
	${self.menu('admin')}
 
	${self.submenu('')}
 
</%def>
 
<%def name="main()">
 
    %if c.admin_user:
 
	    <div>
 
	        <h2>Welcome ${c.admin_username}</h2>
 
			    <div id="user_log">
 
					${c.log_data}
 
				</div>
 
	    </div>
 
    %else:
 
		<h2>${_('Sorry only admin users can acces this area')}</h2>
 
    %endif
 
    
 
    <div>
 
        <h2>Welcome ${c.admin_username}</h2>
 
		    <div id="user_log">
 
				${c.log_data}
 
			</div>
 
    </div>
 
</%def>
 
\ No newline at end of file
0 comments (0 inline, 0 general)