Changeset - 7c978511c951
pylons_app/config/environment.py
Show inline comments
 
@@ -7,68 +7,68 @@ from pylons_app.lib.auth import set_avai
 
from pylons_app.lib.utils import repo2db_mapper, make_ui, set_hg_app_config
 
from pylons_app.model import init_model
 
from pylons_app.model.hg_model import _get_repos_cached_initial
 
from sqlalchemy import engine_from_config
 
import logging
 
import os
 
import pylons_app.lib.app_globals as app_globals
 
import pylons_app.lib.helpers
 

	
 
log = logging.getLogger(__name__)
 

	
 
def load_environment(global_conf, app_conf, initial=False):
 
    """Configure the Pylons environment via the ``pylons.config``
 
    object
 
    """
 
    config = PylonsConfig()
 
    
 
    # Pylons paths
 
    root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
 
    paths = dict(root=root,
 
                 controllers=os.path.join(root, 'controllers'),
 
                 static_files=os.path.join(root, 'public'),
 
                 templates=[os.path.join(root, 'templates')])
 

	
 
    # Initialize config with the basic options
 
    config.init_app(global_conf, app_conf, package='pylons_app', paths=paths)
 

	
 
    config['routes.map'] = make_map(config)
 
    config['pylons.app_globals'] = app_globals.Globals(config)
 
    config['pylons.h'] = pylons_app.lib.helpers
 
    
 
    # Setup cache object as early as possible
 
    import pylons
 
    pylons.cache._push_object(config['pylons.app_globals'].cache)
 
    
 
    # Create the Mako TemplateLookup, with the default auto-escaping
 
    config['pylons.app_globals'].mako_lookup = TemplateLookup(
 
        directories=paths['templates'],
 
        error_handler=handle_mako_error,
 
        module_directory=os.path.join(app_conf['cache_dir'], 'templates'),
 
        input_encoding='utf-8', default_filters=['escape'],
 
        imports=['from webhelpers.html import escape'])
 

	
 
    #sets the c attribute access when don't existing attribute are accessed
 
    config['pylons.strict_tmpl_context'] = True
 
    
 
    #MULTIPLE DB configs
 
    # Setup the SQLAlchemy database engine
 
    if config['debug']:
 
    if config['debug'] and os.path.split(config['__file__'])[-1] != 'tests.ini':
 
        #use query time debugging.
 
        from pylons_app.lib.timerproxy import TimerProxy
 
        sa_engine_db1 = engine_from_config(config, 'sqlalchemy.db1.',
 
                                                            proxy=TimerProxy())
 
    else:
 
        sa_engine_db1 = engine_from_config(config, 'sqlalchemy.db1.')
 

	
 
    init_model(sa_engine_db1)
 
    #init baseui
 
    config['pylons.app_globals'].baseui = make_ui('db')
 
    
 
    repo2db_mapper(_get_repos_cached_initial(config['pylons.app_globals'], initial))
 
    set_available_permissions(config)
 
    set_base_path(config)
 
    set_hg_app_config(config)
 
    # CONFIGURATION OPTIONS HERE (note: all config options will override
 
    # any Pylons config options)
 
    
 
    return config
pylons_app/config/routing.py
Show inline comments
 
@@ -26,120 +26,119 @@ def make_map(config):
 
    def check_repo(environ, match_dict):
 
        """
 
        check for valid repository for proper 404 handling
 
        @param environ:
 
        @param match_dict:
 
        """
 
        repo_name = match_dict.get('repo_name')
 
        return not cr(repo_name, config['base_path'])
 
 
 
    #REST REPO MAP
 
    with map.submapper(path_prefix='/_admin', controller='admin/repos') as m:
 
        m.connect("repos", "/repos",
 
             action="create", conditions=dict(method=["POST"]))
 
        m.connect("repos", "/repos",
 
             action="index", conditions=dict(method=["GET"]))
 
        m.connect("formatted_repos", "/repos.{format}",
 
             action="index",
 
            conditions=dict(method=["GET"]))
 
        m.connect("new_repo", "/repos/new",
 
             action="new", conditions=dict(method=["GET"]))
 
        m.connect("formatted_new_repo", "/repos/new.{format}",
 
             action="new", conditions=dict(method=["GET"]))
 
        m.connect("/repos/{repo_name:.*}",
 
             action="update", conditions=dict(method=["PUT"],
 
                                              function=check_repo))
 
        m.connect("/repos/{repo_name:.*}",
 
             action="delete", conditions=dict(method=["DELETE"],
 
                                              function=check_repo))
 
        m.connect("edit_repo", "/repos/{repo_name:.*}/edit",
 
             action="edit", conditions=dict(method=["GET"],
 
                                            function=check_repo))
 
        m.connect("formatted_edit_repo", "/repos/{repo_name:.*}.{format}/edit",
 
             action="edit", conditions=dict(method=["GET"],
 
                                            function=check_repo))
 
        m.connect("repo", "/repos/{repo_name:.*}",
 
             action="show", conditions=dict(method=["GET"],
 
                                            function=check_repo))
 
        m.connect("formatted_repo", "/repos/{repo_name:.*}.{format}",
 
             action="show", conditions=dict(method=["GET"],
 
                                            function=check_repo))
 
        #ajax delete repo perm user
 
        m.connect('delete_repo_user', "/repos_delete_user/{repo_name:.*}",
 
             action="delete_perm_user", conditions=dict(method=["DELETE"],
 
                                                        function=check_repo))
 
        
 
    map.resource('user', 'users', controller='admin/users', path_prefix='/_admin')
 
    map.resource('permission', 'permissions', controller='admin/permissions', path_prefix='/_admin')
 
    
 
    #map.resource('setting', 'settings', controller='admin/settings', path_prefix='/_admin', name_prefix='admin_')
 
    #REST SETTINGS MAP
 
    with map.submapper(path_prefix='/_admin', controller='admin/settings') as m:
 
        m.connect("admin_settings", "/settings",
 
             action="create", conditions=dict(method=["POST"]))
 
        m.connect("admin_settings", "/settings",
 
             action="index", conditions=dict(method=["GET"]))
 
        m.connect("admin_formatted_settings", "/settings.{format}",
 
        m.connect("formatted_admin_settings", "/settings.{format}",
 
             action="index", conditions=dict(method=["GET"]))
 
        m.connect("admin_new_setting", "/settings/new",
 
             action="new", conditions=dict(method=["GET"]))
 
        m.connect("admin_formatted_new_setting", "/settings/new.{format}",
 
        m.connect("formatted_admin_new_setting", "/settings/new.{format}",
 
             action="new", conditions=dict(method=["GET"]))
 
        m.connect("/settings/{setting_id}",
 
             action="update", conditions=dict(method=["PUT"]))
 
        m.connect("/settings/{setting_id}",
 
             action="delete", conditions=dict(method=["DELETE"]))
 
        m.connect("admin_edit_setting", "/settings/{setting_id}/edit",
 
             action="edit", conditions=dict(method=["GET"]))
 
        m.connect("admin_formatted_edit_setting", "/settings/{setting_id}.{format}/edit",
 
        m.connect("formatted_admin_edit_setting", "/settings/{setting_id}.{format}/edit",
 
             action="edit", conditions=dict(method=["GET"]))
 
        m.connect("admin_setting", "/settings/{setting_id}",
 
             action="show", conditions=dict(method=["GET"]))
 
        m.connect("admin_formatted_setting", "/settings/{setting_id}.{format}",
 
        m.connect("formatted_admin_setting", "/settings/{setting_id}.{format}",
 
             action="show", conditions=dict(method=["GET"]))
 
        m.connect("admin_settings_my_account", "/my_account",
 
             action="my_account", conditions=dict(method=["GET"]))
 
        m.connect("admin_settings_my_account_update", "/my_account_update",
 
             action="my_account_update", conditions=dict(method=["PUT"]))
 
        m.connect("admin_settings_create_repository", "/create_repository",
 
             action="create_repository", conditions=dict(method=["GET"]))
 
    
 
    #ADMIN
 
    with map.submapper(path_prefix='/_admin', controller='admin/admin') as m:
 
        m.connect('admin_home', '', action='index')#main page
 
        m.connect('admin_add_repo', '/add_repo/{new_repo:[a-z0-9\. _-]*}',
 
                  action='add_repo')
 
    #SEARCH
 
    map.connect('search', '/_admin/search', controller='search')
 
    
 
    #LOGIN/LOGOUT
 
    map.connect('login_home', '/_admin/login', controller='login')
 
    map.connect('logout_home', '/_admin/logout', controller='login', action='logout')
 
    map.connect('register', '/_admin/register', controller='login', action='register')
 
        
 
    #FEEDS
 
    map.connect('rss_feed_home', '/{repo_name:.*}/feed/rss',
 
                controller='feed', action='rss',
 
                conditions=dict(function=check_repo))
 
    map.connect('atom_feed_home', '/{repo_name:.*}/feed/atom',
 
                controller='feed', action='atom',
 
                conditions=dict(function=check_repo))
 
    
 
    
 
    #OTHERS
 
    map.connect('changeset_home', '/{repo_name:.*}/changeset/{revision}',
 
                controller='changeset', revision='tip',
 
                conditions=dict(function=check_repo))
 
    map.connect('summary_home', '/{repo_name:.*}/summary',
 
                controller='summary', conditions=dict(function=check_repo))
 
    map.connect('shortlog_home', '/{repo_name:.*}/shortlog',
 
                controller='shortlog', conditions=dict(function=check_repo))
 
    map.connect('branches_home', '/{repo_name:.*}/branches',
 
                controller='branches', conditions=dict(function=check_repo))
 
    map.connect('tags_home', '/{repo_name:.*}/tags',
 
                controller='tags', conditions=dict(function=check_repo))
 
    map.connect('changelog_home', '/{repo_name:.*}/changelog',
 
                controller='changelog', conditions=dict(function=check_repo))    
 
    map.connect('files_home', '/{repo_name:.*}/files/{revision}/{f_path:.*}',
 
                controller='files', revision='tip', f_path='',
 
                conditions=dict(function=check_repo))
 
    map.connect('files_diff_home', '/{repo_name:.*}/diff/{f_path:.*}',
pylons_app/controllers/login.py
Show inline comments
 
#!/usr/bin/env python
 
# encoding: utf-8
 
# login controller for pylons
 
# Copyright (C) 2009-2010 Marcin Kuzminski <marcin@python-works.com>
 
# 
 
# This program is free software; you can redistribute it and/or
 
# modify it under the terms of the GNU General Public License
 
# as published by the Free Software Foundation; version 2
 
# of the License or (at your opinion) any later version of the license.
 
# 
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
# 
 
# You should have received a copy of the GNU General Public License
 
# along with this program; if not, write to the Free Software
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 
# MA  02110-1301, USA.
 

	
 
"""
 
Created on April 22, 2010
 
login controller for pylons
 
@author: marcink
 
"""
 
from formencode import htmlfill
 
from pylons import request, response, session, tmpl_context as c, url
 
from pylons.controllers.util import abort, redirect
 
from pylons_app.lib.auth import AuthUser, HasPermissionAnyDecorator
 
from pylons_app.lib.base import BaseController, render
 
from pylons_app.model.forms import LoginForm, RegisterForm
 
from pylons_app.model.user_model import UserModel
 
from sqlalchemy.exc import OperationalError
 
import formencode
 
import datetime
 
import logging
 

	
 
log = logging.getLogger(__name__)
 

	
 
class LoginController(BaseController):
 

	
 
    def __before__(self):
 
        super(LoginController, self).__before__()
 

	
 
    def index(self):
 
        #redirect if already logged in
 
        c.came_from = request.GET.get('came_from',None)
 
        
 
        if c.hg_app_user.is_authenticated:
 
            return redirect(url('hg_home'))
 
        
 
        if request.POST:
 
            #import Login Form validator class
 
            login_form = LoginForm()
 
            try:
 
                c.form_result = login_form.to_python(dict(request.POST))
 
                username = c.form_result['username']
 
                user = UserModel().get_user_by_name(username)
 
                auth_user = AuthUser()
 
                auth_user.username = user.username
 
                auth_user.is_authenticated = True
 
                auth_user.is_admin = user.admin
 
                auth_user.user_id = user.user_id
 
                auth_user.name = user.name
 
                auth_user.lastname = user.lastname
 
                session['hg_app_user'] = auth_user
 
                session.save()
 
                log.info('user %s is now authenticated', username)
 
                
 
                user.update_lastlogin()
 
                                        
 
                if c.came_from:
 
                    return redirect(c.came_from)
 
                else:
 
                    return redirect(url('hg_home'))
 
                               
 
            except formencode.Invalid as errors:
 
                return htmlfill.render(
 
                    render('/login.html'),
 
                    defaults=errors.value,
 
                    errors=errors.error_dict or {},
 
                    prefix_error=False,
 
                    encoding="UTF-8")
pylons_app/lib/db_manage.py
Show inline comments
 
#!/usr/bin/env python
 
# encoding: utf-8
 
# database managment for hg app
 
# Copyright (C) 2009-2010 Marcin Kuzminski <marcin@python-works.com>
 
#
 
# This program is free software; you can redistribute it and/or
 
# modify it under the terms of the GNU General Public License
 
# as published by the Free Software Foundation; version 2
 
# of the License or (at your opinion) any later version of the license.
 
# 
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
# 
 
# You should have received a copy of the GNU General Public License
 
# along with this program; if not, write to the Free Software
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 
# MA  02110-1301, USA.
 

	
 
"""
 
Created on April 10, 2010
 
database managment and creation for hg app
 
@author: marcink
 
"""
 

	
 
from os.path import dirname as dn, join as jn
 
import os
 
import sys
 
import uuid
 
ROOT = dn(dn(dn(os.path.realpath(__file__))))
 
sys.path.append(ROOT)
 

	
 
from pylons_app.lib.auth import get_crypt_password
 
from pylons_app.lib.utils import ask_ok
 
from pylons_app.model import init_model
 
from pylons_app.model.db import User, Permission, HgAppUi, HgAppSettings, \
 
    UserToPerm
 
from pylons_app.model import meta
 
from sqlalchemy.engine import create_engine
 
import logging
 

	
 
log = logging.getLogger(__name__)
 

	
 
class DbManage(object):
 
    def __init__(self, log_sql):
 
        self.dbname = 'hg_app.db'
 
    def __init__(self, log_sql, dbname,tests=False):
 
        self.dbname = dbname
 
        self.tests = tests
 
        dburi = 'sqlite:////%s' % jn(ROOT, self.dbname)
 
        engine = create_engine(dburi, echo=log_sql) 
 
        init_model(engine)
 
        self.sa = meta.Session
 
        self.db_exists = False
 
    
 
    def check_for_db(self, override):
 
        log.info('checking for exisiting db')
 
        if os.path.isfile(jn(ROOT, self.dbname)):
 
            self.db_exists = True
 
            log.info('database exisist')
 
            if not override:
 
                raise Exception('database already exists')
 

	
 
    def create_tables(self, override=False):
 
        """
 
        Create a auth database
 
        """
 
        self.check_for_db(override)
 
        if override:
 
            log.info("database exisist and it's going to be destroyed")
 
            if self.tests:
 
                destroy=True
 
            else:
 
            destroy = ask_ok('Are you sure to destroy old database ? [y/n]')
 
            if not destroy:
 
                sys.exit()
 
            if self.db_exists and destroy:
 
                os.remove(jn(ROOT, self.dbname))
 
        checkfirst = not override
 
        meta.Base.metadata.create_all(checkfirst=checkfirst)
 
        log.info('Created tables for %s', self.dbname)
 
    
 
    def admin_prompt(self):
 
        if not self.tests:
 
        import getpass
 
        username = raw_input('Specify admin username:')
 
        password = getpass.getpass('Specify admin password:')
 
        self.create_user(username, password, True)
 
        else:
 
            log.info('creating admin and regular test users')
 
            self.create_user('test_admin', 'test', True)
 
            self.create_user('test_regular', 'test', False)
 
    
 
    def config_prompt(self):
 
        
 
    
 
    def config_prompt(self,test_repo_path=''):
 
        log.info('Setting up repositories config')
 
        
 
        if not self.tests and not test_repo_path:
 
        path = raw_input('Specify valid full path to your repositories'
 
                        ' you can change this later in application settings:')
 
        else:
 
            path = test_repo_path
 
        
 
        if not os.path.isdir(path):
 
            log.error('You entered wrong path')
 
            log.error('You entered wrong path: %s',path)
 
            sys.exit()
 
        
 
        hooks1 = HgAppUi()
 
        hooks1.ui_section = 'hooks'
 
        hooks1.ui_key = 'changegroup.update'
 
        hooks1.ui_value = 'hg update >&2'
 
        
 
        hooks2 = HgAppUi()
 
        hooks2.ui_section = 'hooks'
 
        hooks2.ui_key = 'changegroup.repo_size'
 
        hooks2.ui_value = 'python:pylons_app.lib.hooks.repo_size' 
 
                
 
        web1 = HgAppUi()
 
        web1.ui_section = 'web'
 
        web1.ui_key = 'push_ssl'
 
        web1.ui_value = 'false'
 
                
 
        web2 = HgAppUi()
 
        web2.ui_section = 'web'
 
        web2.ui_key = 'allow_archive'
 
        web2.ui_value = 'gz zip bz2'
 
                
 
        web3 = HgAppUi()
 
        web3.ui_section = 'web'
 
        web3.ui_key = 'allow_push'
 
        web3.ui_value = '*'
 
        
 
        web4 = HgAppUi()
 
        web4.ui_section = 'web'
 
        web4.ui_key = 'baseurl'
 
        web4.ui_value = '/'                        
 
        
 
        paths = HgAppUi()
 
        paths.ui_section = 'paths'
 
        paths.ui_key = '/'
 
        paths.ui_value = os.path.join(path, '*')
 
        
 
        
 
        hgsettings1 = HgAppSettings()
 
        
 
        hgsettings1.app_settings_name = 'realm'
 
        hgsettings1.app_settings_value = 'hg-app authentication'
 
        
 
        hgsettings2 = HgAppSettings()
 
        hgsettings2.app_settings_name = 'title'
 
        hgsettings2.app_settings_value = 'hg-app'      
 
        
 
        try:
 
            self.sa.add(hooks1)
 
            self.sa.add(hooks2)
 
            self.sa.add(web1)
 
            self.sa.add(web2)
 
            self.sa.add(web3)
 
            self.sa.add(web4)
 
            self.sa.add(paths)
 
            self.sa.add(hgsettings1)
 
            self.sa.add(hgsettings2)
 
            self.sa.commit()
 
        except:
 
            self.sa.rollback()
 
            raise        
 
        log.info('created ui config')
 
                    
 
    def create_user(self, username, password, admin=False):
 
        
 
        log.info('creating default user')
 
        #create default user for handling default permissions.
 
        def_user = User()
 
        def_user.username = 'default'
 
        def_user.password = get_crypt_password(str(uuid.uuid1())[:8])
 
        def_user.name = 'default'
 
        def_user.lastname = 'default'
 
        def_user.email = 'default@default.com'
 
        def_user.admin = False
 
        def_user.active = False
 
        
 
        log.info('creating administrator user %s', username)
 
        new_user = User()
 
        new_user.username = username
 
        new_user.password = get_crypt_password(password)
 
        new_user.name = 'Hg'
 
        new_user.lastname = 'Admin'
 
        new_user.email = 'admin@localhost'
 
        new_user.admin = admin
 
        new_user.active = True
 
        
 
        try:
 
            self.sa.add(new_user)
 
            self.sa.commit()
 
        except:
 
            self.sa.rollback()
 
            raise
 

	
 
    def create_default_user(self):
 
        log.info('creating default user')
 
        #create default user for handling default permissions.
 
        def_user = User()
 
        def_user.username = 'default'
 
        def_user.password = get_crypt_password(str(uuid.uuid1())[:8])
 
        def_user.name = 'default'
 
        def_user.lastname = 'default'
 
        def_user.email = 'default@default.com'
 
        def_user.admin = False
 
        def_user.active = False
 
        try:
 
            self.sa.add(def_user)
 
            self.sa.add(new_user)
 
            self.sa.commit()
 
        except:
 
            self.sa.rollback()
 
            raise
 
    
 
    def create_permissions(self):
 
        #module.(access|create|change|delete)_[name]
 
        #module.(read|write|owner)
 
        perms = [('repository.none', 'Repository no access'),
 
                 ('repository.read', 'Repository read access'),
 
                 ('repository.write', 'Repository write access'),
 
                 ('repository.admin', 'Repository admin access'),
 
                 ('hg.admin', 'Hg Administrator'),
 
                 ('hg.create.repository', 'Repository create'),
 
                 ('hg.create.none', 'Repository creation disabled'),
 
                 ('hg.register.none', 'Register disabled'),
 
                 ('hg.register.manual_activate', 'Register new user with hg-app without manual activation'),
 
                 ('hg.register.auto_activate', 'Register new user with hg-app without auto activation'),
 
                ]
 
        
 
        for p in perms:
 
            new_perm = Permission()
 
            new_perm.permission_name = p[0]
 
            new_perm.permission_longname = p[1]
 
            try:
 
                self.sa.add(new_perm)
 
                self.sa.commit()
 
            except:
 
                self.sa.rollback()
 
                raise
 

	
 
    def populate_default_permissions(self):
 
        log.info('creating default user permissions')
 
        
 
        default_user = self.sa.query(User)\
 
        .filter(User.username == 'default').scalar()
 
        
 
        reg_perm = UserToPerm()
 
        reg_perm.user = default_user
 
        reg_perm.permission = self.sa.query(Permission)\
 
        .filter(Permission.permission_name == 'hg.register.manual_activate')\
 
        .scalar() 
 
        
 
        create_repo_perm = UserToPerm()
 
        create_repo_perm.user = default_user
 
        create_repo_perm.permission = self.sa.query(Permission)\
 
        .filter(Permission.permission_name == 'hg.create.repository')\
 
        .scalar() 
pylons_app/model/db.py
Show inline comments
 
from pylons_app.model.meta import Base
 
from sqlalchemy import *
 
from sqlalchemy.orm import relation, backref
 
from sqlalchemy.orm.session import Session
 
from vcs.utils.lazy import LazyProperty
 
import logging
 

	
 
log = logging.getLogger(__name__)
 

	
 
class HgAppSettings(Base):
 
    __tablename__ = 'hg_app_settings'
 
    __table_args__ = (UniqueConstraint('app_settings_name'), {'useexisting':True})
 
    app_settings_id = Column("app_settings_id", INTEGER(), nullable=False, unique=True, default=None, primary_key=True)
 
    app_settings_name = Column("app_settings_name", TEXT(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    app_settings_value = Column("app_settings_value", TEXT(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 

	
 
class HgAppUi(Base):
 
    __tablename__ = 'hg_app_ui'
 
    __table_args__ = {'useexisting':True}
 
    ui_id = Column("ui_id", INTEGER(), nullable=False, unique=True, default=None, primary_key=True)
 
    ui_section = Column("ui_section", TEXT(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    ui_key = Column("ui_key", TEXT(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    ui_value = Column("ui_value", TEXT(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    ui_active = Column("ui_active", BOOLEAN(), nullable=True, unique=None, default=True)
 
    
 
    
 
class User(Base): 
 
    __tablename__ = 'users'
 
    __table_args__ = {'useexisting':True}
 
    __table_args__ = (UniqueConstraint('username'), {'useexisting':True})
 
    user_id = Column("user_id", INTEGER(), nullable=False, unique=True, default=None, primary_key=True)
 
    username = Column("username", TEXT(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    password = Column("password", TEXT(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    active = Column("active", BOOLEAN(), nullable=True, unique=None, default=None)
 
    admin = Column("admin", BOOLEAN(), nullable=True, unique=None, default=False)
 
    name = Column("name", TEXT(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    lastname = Column("lastname", TEXT(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    email = Column("email", TEXT(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    last_login = Column("last_login", DATETIME(timezone=False), nullable=True, unique=None, default=None)
 
    
 
    user_log = relation('UserLog')
 
    user_perms = relation('UserToPerm', primaryjoin="User.user_id==UserToPerm.user_id")
 
    
 
    @LazyProperty
 
    def full_contact(self):
 
        return '%s %s <%s>' % (self.name, self.lastname, self.email)
 
        
 
    def __repr__(self):
 
        return "<User('id:%s:%s')>" % (self.user_id, self.username)
 
    
 
    def update_lastlogin(self):
 
        """Update user lastlogin"""
 
        import datetime
 
        
 
        try:
 
            session = Session.object_session(self)
 
            self.last_login = datetime.datetime.now()
 
            session.add(self)
 
            session.commit()
 
            log.debug('updated user %s lastlogin',self)
 
        except Exception:
 
            session.rollback()        
 
    
 
      
 
class UserLog(Base): 
 
    __tablename__ = 'user_logs'
 
    __table_args__ = {'useexisting':True}
 
    user_log_id = Column("user_log_id", INTEGER(), nullable=False, unique=True, default=None, primary_key=True)
 
    user_id = Column("user_id", INTEGER(), ForeignKey(u'users.user_id'), nullable=False, unique=None, default=None)
 
    user_ip = Column("user_ip", TEXT(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None) 
 
    repository = Column("repository", TEXT(length=None, convert_unicode=False, assert_unicode=None), ForeignKey(u'repositories.repo_name'), nullable=False, unique=None, default=None)
 
    action = Column("action", TEXT(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
 
    action_date = Column("action_date", DATETIME(timezone=False), nullable=True, unique=None, default=None)
 
    
 
    user = relation('User')
 
    
 
class Repository(Base):
 
    __tablename__ = 'repositories'
pylons_app/tests/__init__.py
Show inline comments
 
"""Pylons application test package
 

	
 
This package assumes the Pylons environment is already loaded, such as
 
when this script is imported from the `nosetests --with-pylons=test.ini`
 
command.
 

	
 
This module initializes the application via ``websetup`` (`paster
 
setup-app`) and provides the base testing objects.
 
"""
 
from unittest import TestCase
 

	
 
from paste.deploy import loadapp
 
from paste.script.appinstall import SetupCommand
 
from pylons import config, url
 
from routes.util import URLGenerator
 
from webtest import TestApp
 
import os
 

	
 
import pylons.test
 

	
 
__all__ = ['environ', 'url', 'TestController']
 

	
 
# Invoke websetup with the current config file
 
SetupCommand('setup-app').run([config['__file__']])
 
SetupCommand('setup-app').run([pylons.test.pylonsapp.config['__file__']])
 

	
 
environ = {}
 

	
 
class TestController(TestCase):
 

	
 
    def __init__(self, *args, **kwargs):
 
        if pylons.test.pylonsapp:
 
            wsgiapp = pylons.test.pylonsapp
 
        else:
 
            wsgiapp = loadapp('config:%s' % config['__file__'])
 
        config = wsgiapp.config
 
        self.app = TestApp(wsgiapp)
 
        url._push_object(URLGenerator(config['routes.map'], environ))
 
        TestCase.__init__(self, *args, **kwargs)
 

	
pylons_app/tests/functional/test_admin.py
Show inline comments
 
from pylons_app.tests import *
 

	
 
class TestAdminController(TestController):
 

	
 
    def test_index(self):
 
        response = self.app.get(url(controller='admin', action='index'))
 
        response = self.app.get(url(controller='admin/admin', action='index'))
 
        # Test response...
pylons_app/tests/functional/test_admin_settings.py
Show inline comments
 
from pylons_app.tests import *
 

	
 
class TestSettingsController(TestController):
 

	
 
    def test_index(self):
 
        response = self.app.get(url('admin_settings'))
 
        # Test response...
 

	
 
    def test_index_as_xml(self):
 
        response = self.app.get(url('formatted_admin_settings', format='xml'))
 

	
 
    def test_create(self):
 
        response = self.app.post(url('admin_settings'))
 

	
 
    def test_new(self):
 
        response = self.app.get(url('admin_new_setting'))
 

	
 
    def test_new_as_xml(self):
 
        response = self.app.get(url('formatted_admin_new_setting', format='xml'))
 

	
 
    def test_update(self):
 
        response = self.app.put(url('admin_setting', id=1))
 
        response = self.app.put(url('admin_setting', setting_id=1))
 

	
 
    def test_update_browser_fakeout(self):
 
        response = self.app.post(url('admin_setting', id=1), params=dict(_method='put'))
 
        response = self.app.post(url('admin_setting', setting_id=1), params=dict(_method='put'))
 

	
 
    def test_delete(self):
 
        response = self.app.delete(url('admin_setting', id=1))
 
        response = self.app.delete(url('admin_setting', setting_id=1))
 

	
 
    def test_delete_browser_fakeout(self):
 
        response = self.app.post(url('admin_setting', id=1), params=dict(_method='delete'))
 
        response = self.app.post(url('admin_setting', setting_id=1), params=dict(_method='delete'))
 

	
 
    def test_show(self):
 
        response = self.app.get(url('admin_setting', id=1))
 
        response = self.app.get(url('admin_setting', setting_id=1))
 

	
 
    def test_show_as_xml(self):
 
        response = self.app.get(url('formatted_admin_setting', id=1, format='xml'))
 
        response = self.app.get(url('formatted_admin_setting', setting_id=1, format='xml'))
 

	
 
    def test_edit(self):
 
        response = self.app.get(url('admin_edit_setting', id=1))
 
        response = self.app.get(url('admin_edit_setting', setting_id=1))
 

	
 
    def test_edit_as_xml(self):
 
        response = self.app.get(url('formatted_admin_edit_setting', id=1, format='xml'))
 
        response = self.app.get(url('formatted_admin_edit_setting', setting_id=1, format='xml'))
pylons_app/tests/functional/test_admin_settings_hg.py
Show inline comments
 
deleted file
pylons_app/tests/functional/test_branches.py
Show inline comments
 
from pylons_app.tests import *
 

	
 
class TestBranchesController(TestController):
 

	
 
    def test_index(self):
 
        response = self.app.get(url(controller='branches', action='index'))
 
        response = self.app.get(url(controller='branches', action='index',repo_name='vcs_test'))
 
        # Test response...
pylons_app/tests/functional/test_changelog.py
Show inline comments
 
from pylons_app.tests import *
 

	
 
class TestChangelogController(TestController):
 

	
 
    def test_index(self):
 
        response = self.app.get(url(controller='changelog', action='index'))
 
        response = self.app.get(url(controller='changelog', action='index',repo_name='vcs_test'))
 
        # Test response...
pylons_app/tests/functional/test_changeset.py
Show inline comments
 
from pylons_app.tests import *
 

	
 
class TestChangesetController(TestController):
 

	
 
    def test_index(self):
 
        response = self.app.get(url(controller='changeset', action='index'))
 
        response = self.app.get(url(controller='changeset', action='index',
 
                                    repo_name='vcs_test',revision='tip'))
 
        # Test response...
pylons_app/tests/functional/test_feed.py
Show inline comments
 
from pylons_app.tests import *
 

	
 
class TestFeedController(TestController):
 

	
 
    def test_index(self):
 
        response = self.app.get(url(controller='feed', action='index'))
 
    def test_rss(self):
 
        response = self.app.get(url(controller='feed', action='rss',
 
                                    repo_name='vcs_test'))
 
        # Test response...
 

	
 
    def test_atom(self):
 
        response = self.app.get(url(controller='feed', action='atom',
 
                                    repo_name='vcs_test'))
 
        # Test response...
 
\ No newline at end of file
pylons_app/tests/functional/test_files.py
Show inline comments
 
from pylons_app.tests import *
 

	
 
class TestFilesController(TestController):
 

	
 
    def test_index(self):
 
        response = self.app.get(url(controller='files', action='index'))
 
        response = self.app.get(url(controller='files', action='index',
 
                                    repo_name='vcs_test',
 
                                    revision='tip',
 
                                    f_path='/'))
 
        # Test response...
pylons_app/tests/functional/test_repos.py
Show inline comments
 
from pylons_app.tests import *
 

	
 
class TestReposController(TestController):
 

	
 
    def test_index(self):
 
        response = self.app.get(url('repos'))
 
        # Test response...
 

	
 
    def test_index_as_xml(self):
 
        response = self.app.get(url('formatted_repos', format='xml'))
 

	
 
    def test_create(self):
 
        response = self.app.post(url('repos'))
 

	
 
    def test_new(self):
 
        response = self.app.get(url('new_repo'))
 

	
 
    def test_new_as_xml(self):
 
        response = self.app.get(url('formatted_new_repo', format='xml'))
 

	
 
    def test_update(self):
 
        response = self.app.put(url('repo', id=1))
 
        response = self.app.put(url('repo', repo_name='vcs_test'))
 

	
 
    def test_update_browser_fakeout(self):
 
        response = self.app.post(url('repo', id=1), params=dict(_method='put'))
 
        response = self.app.post(url('repo', repo_name='vcs_test'), params=dict(_method='put'))
 

	
 
    def test_delete(self):
 
        response = self.app.delete(url('repo', id=1))
 
        response = self.app.delete(url('repo', repo_name='vcs_test'))
 

	
 
    def test_delete_browser_fakeout(self):
 
        response = self.app.post(url('repo', id=1), params=dict(_method='delete'))
 
        response = self.app.post(url('repo', repo_name='vcs_test'), params=dict(_method='delete'))
 

	
 
    def test_show(self):
 
        response = self.app.get(url('repo', id=1))
 
        response = self.app.get(url('repo', repo_name='vcs_test'))
 

	
 
    def test_show_as_xml(self):
 
        response = self.app.get(url('formatted_repo', id=1, format='xml'))
 
        response = self.app.get(url('formatted_repo', repo_name='vcs_test', format='xml'))
 

	
 
    def test_edit(self):
 
        response = self.app.get(url('edit_repo', id=1))
 
        response = self.app.get(url('edit_repo', repo_name='vcs_test'))
 

	
 
    def test_edit_as_xml(self):
 
        response = self.app.get(url('formatted_edit_repo', id=1, format='xml'))
 
        response = self.app.get(url('formatted_edit_repo', repo_name='vcs_test', format='xml'))
pylons_app/tests/functional/test_settings.py
Show inline comments
 
from pylons_app.tests import *
 

	
 
class TestSettingsController(TestController):
 

	
 
    def test_index(self):
 
        response = self.app.get(url(controller='settings', action='index'))
 
        response = self.app.get(url(controller='settings', action='index',
 
                                    repo_name='vcs_test'))
 
        # Test response...
pylons_app/tests/functional/test_shortlog.py
Show inline comments
 
from pylons_app.tests import *
 

	
 
class TestShortlogController(TestController):
 

	
 
    def test_index(self):
 
        response = self.app.get(url(controller='shortlog', action='index'))
 
        response = self.app.get(url(controller='shortlog', action='index',repo_name='vcs_test'))
 
        # Test response...
pylons_app/tests/functional/test_summary.py
Show inline comments
 
from pylons_app.tests import *
 

	
 
class TestSummaryController(TestController):
 

	
 
    def test_index(self):
 
        response = self.app.get(url(controller='summary', action='index'))
 
        response = self.app.get(url(controller='summary', action='index',repo_name='vcs_test'))
 
        # Test response...
pylons_app/tests/functional/test_tags.py
Show inline comments
 
from pylons_app.tests import *
 

	
 
class TestTagsController(TestController):
 

	
 
    def test_index(self):
 
        response = self.app.get(url(controller='tags', action='index'))
 
        response = self.app.get(url(controller='tags', action='index',repo_name='vcs_test'))
 
        # Test response...
pylons_app/websetup.py
Show inline comments
 
"""Setup the pylons_app application"""
 

	
 
from os.path import dirname as dn, join as jn
 
from pylons_app.config.environment import load_environment
 
from pylons_app.lib.db_manage import DbManage
 
import datetime
 
from time import mktime
 
import logging
 
import os
 
import sys
 

	
 
import shutil
 
log = logging.getLogger(__name__)
 

	
 
ROOT = dn(dn(os.path.realpath(__file__)))
 
sys.path.append(ROOT)
 

	
 
def setup_app(command, conf, vars):
 
    """Place any commands to setup pylons_app here"""
 
    dbmanage = DbManage(log_sql=True)
 
    log_sql = True
 
    tests = False
 
    
 
    dbname = os.path.split(conf['sqlalchemy.db1.url'])[-1]
 
    filename = os.path.split(conf.filename)[-1]
 
    
 
    if filename == 'tests.ini':
 
        uniq_suffix = str(int(mktime(datetime.datetime.now().timetuple())))
 
        REPO_TEST_PATH = '/tmp/hg_app_test_%s' % uniq_suffix
 
        
 
        if not os.path.isdir(REPO_TEST_PATH):
 
            os.mkdir(REPO_TEST_PATH)
 
            from_ = '/home/marcink/workspace-python/vcs'
 
            shutil.copytree(from_, os.path.join(REPO_TEST_PATH,'vcs_test'))
 
            
 
        tests = True    
 
    
 
    dbmanage = DbManage(log_sql, dbname, tests)
 
    dbmanage.create_tables(override=True)
 
    dbmanage.config_prompt()
 
    dbmanage.config_prompt(REPO_TEST_PATH)
 
    dbmanage.create_default_user()
 
    dbmanage.admin_prompt()
 
    dbmanage.create_permissions()
 
    dbmanage.populate_default_permissions()
 
    load_environment(conf.global_conf, conf.local_conf, initial=True)
 

	
setup.cfg
Show inline comments
 
[egg_info]
 
tag_build = dev
 
tag_svn_revision = true
 

	
 
[easy_install]
 
find_links = http://www.pylonshq.com/download/
 

	
 
[nosetests]
 
with-pylons = development.ini
 
verbose=True
 
verbosity=2
 
with-pylons=tests.ini
 
detailed-errors=1
 

	
 
# Babel configuration
 
[compile_catalog]
 
domain = pylons_app
 
directory = pylons_app/i18n
 
statistics = true
 

	
 
[extract_messages]
 
add_comments = TRANSLATORS:
 
output_file = pylons_app/i18n/pylons_app.pot
 
width = 80
 

	
 
[init_catalog]
 
domain = pylons_app
 
input_file = pylons_app/i18n/pylons_app.pot
 
output_dir = pylons_app/i18n
 

	
 
[update_catalog]
 
domain = pylons_app
 
input_file = pylons_app/i18n/pylons_app.pot
 
output_dir = pylons_app/i18n
 
previous = true
tests.ini
Show inline comments
 
new file 100644
 
################################################################################
 
################################################################################
 
# pylons_app - Pylons environment configuration                                #
 
#                                                                              # 
 
# The %(here)s variable will be replaced with the parent directory of this file#
 
################################################################################
 

	
 
[DEFAULT]
 
debug = true
 
############################################
 
## Uncomment and replace with the address ##
 
## which should receive any error reports ##
 
############################################
 
#email_to = admin@localhost
 
#smtp_server = mail.server.com
 
#error_email_from = paste_error@localhost
 
#smtp_username = 
 
#smtp_password = 
 
#error_message = 'mercurial crash !'
 

	
 
[server:main]
 
##nr of threads to spawn
 
threadpool_workers = 5
 

	
 
##max request before
 
threadpool_max_requests = 2
 

	
 
##option to use threads of process
 
use_threadpool = true
 

	
 
use = egg:Paste#http
 
host = 127.0.0.1
 
port = 5000
 

	
 
[app:main]
 
use = egg:pylons_app
 
full_stack = true
 
static_files = true
 
lang=en
 
cache_dir = %(here)s/data
 

	
 
####################################
 
###         BEAKER CACHE        ####
 
####################################
 
beaker.cache.data_dir=/%(here)s/data/cache/data
 
beaker.cache.lock_dir=/%(here)s/data/cache/lock
 
beaker.cache.regions=super_short_term,short_term,long_term
 
beaker.cache.long_term.type=memory
 
beaker.cache.long_term.expire=36000
 
beaker.cache.short_term.type=memory
 
beaker.cache.short_term.expire=60
 
beaker.cache.super_short_term.type=memory
 
beaker.cache.super_short_term.expire=10
 

	
 
####################################
 
###       BEAKER SESSION        ####
 
####################################
 
## Type of storage used for the session, current types are 
 
## “dbm”, “file”, “memcached”, “database”, and “memory”. 
 
## The storage uses the Container API 
 
##that is also used by the cache system.
 
beaker.session.type = file
 

	
 
beaker.session.key = hg-app
 
beaker.session.secret = g654dcno0-9873jhgfreyu
 
beaker.session.timeout = 36000
 

	
 
##auto save the session to not to use .save()
 
beaker.session.auto = False
 

	
 
##true exire at browser close
 
#beaker.session.cookie_expires = 3600
 

	
 
    
 
################################################################################
 
## WARNING: *THE LINE BELOW MUST BE UNCOMMENTED ON A PRODUCTION ENVIRONMENT*  ##
 
## Debug mode will enable the interactive debugging tool, allowing ANYONE to  ##
 
## execute malicious code after an exception is raised.                       ##
 
################################################################################
 
#set debug = false
 

	
 
##################################
 
###       LOGVIEW CONFIG       ###
 
##################################
 
logview.sqlalchemy = #faa
 
logview.pylons.templating = #bfb
 
logview.pylons.util = #eee
 

	
 
#########################################################
 
### DB CONFIGS - EACH DB WILL HAVE IT'S OWN CONFIG    ###
 
#########################################################
 
sqlalchemy.db1.url = sqlite:///%(here)s/test.db
 
#sqlalchemy.db1.echo = False
 
#sqlalchemy.db1.pool_recycle = 3600
 
sqlalchemy.convert_unicode = true
 

	
 
################################
 
### LOGGING CONFIGURATION   ####
 
################################
 
[loggers]
 
keys = root, routes, pylons_app, sqlalchemy
 

	
 
[handlers]
 
keys = console
 

	
 
[formatters]
 
keys = generic,color_formatter
 

	
 
#############
 
## LOGGERS ##
 
#############
 
[logger_root]
 
level = ERROR
 
handlers = console
 

	
 
[logger_routes]
 
level = ERROR
 
handlers = console
 
qualname = routes.middleware
 
# "level = DEBUG" logs the route matched and routing variables.
 

	
 
[logger_pylons_app]
 
level = ERROR
 
handlers = console
 
qualname = pylons_app
 
propagate = 0
 

	
 
[logger_sqlalchemy]
 
level = ERROR
 
handlers = console
 
qualname = sqlalchemy.engine
 
propagate = 0
 

	
 
##############
 
## HANDLERS ##
 
##############
 

	
 
[handler_console]
 
class = StreamHandler
 
args = (sys.stderr,)
 
level = NOTSET
 
formatter = color_formatter
 

	
 
################
 
## FORMATTERS ##
 
################
 

	
 
[formatter_generic]
 
format = %(asctime)s.%(msecs)03d %(levelname)-5.5s [%(name)s] %(message)s
 
datefmt = %Y-%m-%d %H:%M:%S
 

	
 
[formatter_color_formatter]
 
class=pylons_app.lib.colored_formatter.ColorFormatter
 
format= %(asctime)s.%(msecs)03d %(levelname)-5.5s [%(name)s] %(message)s
 
datefmt = %Y-%m-%d %H:%M:%S
 
\ No newline at end of file
0 comments (0 inline, 0 general)