Changeset - 74b9bed279ae
[Not reviewed]
celery
0 2 0
Marcin Kuzminski - 15 years ago 2010-09-20 22:55:55
marcin@python-works.com
fixed validation of user email in user creation, and editing on admin panel
2 files changed with 5 insertions and 3 deletions:
0 comments (0 inline, 0 general)
pylons_app/controllers/admin/users.py
Show inline comments
 
@@ -5,160 +5,162 @@
 
# 
 
# This program is free software; you can redistribute it and/or
 
# modify it under the terms of the GNU General Public License
 
# as published by the Free Software Foundation; version 2
 
# of the License or (at your opinion) any later version of the license.
 
# 
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
# 
 
# You should have received a copy of the GNU General Public License
 
# along with this program; if not, write to the Free Software
 
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 
# MA  02110-1301, USA.
 
"""
 
Created on April 4, 2010
 
users controller for pylons
 
@author: marcink
 
"""
 

	
 
from formencode import htmlfill
 
from pylons import request, session, tmpl_context as c, url
 
from pylons.controllers.util import abort, redirect
 
from pylons.i18n.translation import _
 
from pylons_app.lib import helpers as h
 
from pylons_app.lib.auth import LoginRequired, HasPermissionAllDecorator
 
from pylons_app.lib.base import BaseController, render
 
from pylons_app.model.db import User, UserLog
 
from pylons_app.model.forms import UserForm
 
from pylons_app.model.user_model import UserModel, DefaultUserException
 
import formencode
 
import logging
 
import traceback
 

	
 
log = logging.getLogger(__name__)
 

	
 
class UsersController(BaseController):
 
    """REST Controller styled on the Atom Publishing Protocol"""
 
    # To properly map this controller, ensure your config/routing.py
 
    # file has a resource setup:
 
    #     map.resource('user', 'users')
 
    
 
    @LoginRequired()
 
    @HasPermissionAllDecorator('hg.admin')
 
    def __before__(self):
 
        c.admin_user = session.get('admin_user')
 
        c.admin_username = session.get('admin_username')
 
        super(UsersController, self).__before__()
 
    
 

	
 
    def index(self, format='html'):
 
        """GET /users: All items in the collection"""
 
        # url('users')
 
        
 
        c.users_list = self.sa.query(User).all()     
 
        return render('admin/users/users.html')
 
    
 
    def create(self):
 
        """POST /users: Create a new item"""
 
        # url('users')
 
        
 
        user_model = UserModel()
 
        login_form = UserForm()()
 
        try:
 
            form_result = login_form.to_python(dict(request.POST))
 
            user_model.create(form_result)
 
            h.flash(_('created user %s') % form_result['username'],
 
                    category='success')
 
        except formencode.Invalid as errors:
 
            return htmlfill.render(
 
                render('admin/users/user_add.html'),
 
                defaults=errors.value,
 
                errors=errors.error_dict or {},
 
                prefix_error=False,
 
                encoding="UTF-8") 
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('error occured during creation of user %s') \
 
                    % request.POST.get('username'), category='error')            
 
        return redirect(url('users'))
 
    
 
    def new(self, format='html'):
 
        """GET /users/new: Form to create a new item"""
 
        # url('new_user')
 
        return render('admin/users/user_add.html')
 

	
 
    def update(self, id):
 
        """PUT /users/id: Update an existing item"""
 
        # Forms posted to this method should contain a hidden field:
 
        #    <input type="hidden" name="_method" value="PUT" />
 
        # Or using helpers:
 
        #    h.form(url('user', id=ID),
 
        #           method='put')
 
        # url('user', id=ID)
 
        user_model = UserModel()
 
        _form = UserForm(edit=True, old_data={'user_id':id})()
 
        c.user = user_model.get_user(id)
 
        
 
        _form = UserForm(edit=True, old_data={'user_id':id,
 
                                              'email':c.user.email})()
 
        form_result = {}
 
        try:
 
            form_result = _form.to_python(dict(request.POST))
 
            user_model.update(id, form_result)
 
            h.flash(_('User updated succesfully'), category='success')
 
                           
 
        except formencode.Invalid as errors:
 
            c.user = user_model.get_user(id)
 
            return htmlfill.render(
 
                render('admin/users/user_edit.html'),
 
                defaults=errors.value,
 
                errors=errors.error_dict or {},
 
                prefix_error=False,
 
                encoding="UTF-8") 
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('error occured during update of user %s') \
 
                    % form_result.get('username'), category='error')
 
            
 
        return redirect(url('users'))
 
    
 
    def delete(self, id):
 
        """DELETE /users/id: Delete an existing item"""
 
        # Forms posted to this method should contain a hidden field:
 
        #    <input type="hidden" name="_method" value="DELETE" />
 
        # Or using helpers:
 
        #    h.form(url('user', id=ID),
 
        #           method='delete')
 
        # url('user', id=ID)
 
        user_model = UserModel()
 
        try:
 
            user_model.delete(id)
 
            h.flash(_('sucessfully deleted user'), category='success')
 
        except DefaultUserException as e:
 
            h.flash(str(e), category='warning')
 
        except Exception:
 
            h.flash(_('An error occured during deletion of user'),
 
                    category='error')            
 
        return redirect(url('users'))
 
        
 
    def show(self, id, format='html'):
 
        """GET /users/id: Show a specific item"""
 
        # url('user', id=ID)
 
    
 
    
 
    def edit(self, id, format='html'):
 
        """GET /users/id/edit: Form to edit an existing item"""
 
        # url('edit_user', id=ID)
 
        c.user = self.sa.query(User).get(id)
 
        if not c.user:
 
            return redirect(url('users'))
 
        if c.user.username == 'default':
 
            h.flash(_("You can't edit this user since it's" 
 
              " crucial for entire application"), category='warning')
 
            return redirect(url('users'))
 
        
 
        defaults = c.user.__dict__
 
        return htmlfill.render(
 
            render('admin/users/user_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False
 
        )    
pylons_app/model/forms.py
Show inline comments
 
@@ -119,193 +119,193 @@ class ValidAuth(formencode.validators.Fa
 
                   
 
class ValidRepoUser(formencode.validators.FancyValidator):
 
            
 
    def to_python(self, value, state):
 
        try:
 
            self.user_db = meta.Session.query(User)\
 
                .filter(User.active == True)\
 
                .filter(User.username == value).one()
 
        except Exception:
 
            raise formencode.Invalid(_('This username is not valid'),
 
                                     value, state)
 
        finally:
 
            meta.Session.remove()
 
                        
 
        return self.user_db.user_id
 

	
 
def ValidRepoName(edit, old_data):    
 
    class _ValidRepoName(formencode.validators.FancyValidator):
 
            
 
        def to_python(self, value, state):
 
            slug = h.repo_name_slug(value)
 
            if slug in ['_admin']:
 
                raise formencode.Invalid(_('This repository name is disallowed'),
 
                                         value, state)
 
            if old_data.get('repo_name') != value or not edit:    
 
                sa = meta.Session
 
                if sa.query(Repository).filter(Repository.repo_name == slug).scalar():
 
                    raise formencode.Invalid(_('This repository already exists') ,
 
                                             value, state)
 
                meta.Session.remove()
 
            return slug 
 
        
 
        
 
    return _ValidRepoName
 

	
 
class ValidPerms(formencode.validators.FancyValidator):
 
    messages = {'perm_new_user_name':_('This username is not valid')}
 
    
 
    def to_python(self, value, state):
 
        perms_update = []
 
        perms_new = []
 
        #build a list of permission to update and new permission to create
 
        for k, v in value.items():
 
            if k.startswith('perm_'):
 
                if  k.startswith('perm_new_user'):
 
                    new_perm = value.get('perm_new_user', False)
 
                    new_user = value.get('perm_new_user_name', False)
 
                    if new_user and new_perm:
 
                        if (new_user, new_perm) not in perms_new:
 
                            perms_new.append((new_user, new_perm))
 
                else:
 
                    usr = k[5:]                    
 
                    if usr == 'default':
 
                        if value['private']:
 
                            #set none for default when updating to private repo
 
                            v = 'repository.none'
 
                    perms_update.append((usr, v))
 
        value['perms_updates'] = perms_update
 
        value['perms_new'] = perms_new
 
        sa = meta.Session
 
        for k, v in perms_new:
 
            try:
 
                self.user_db = sa.query(User)\
 
                    .filter(User.active == True)\
 
                    .filter(User.username == k).one()
 
            except Exception:
 
                msg = self.message('perm_new_user_name',
 
                                     state=State_obj)
 
                raise formencode.Invalid(msg, value, state, error_dict={'perm_new_user_name':msg})            
 
        return value
 
    
 
class ValidSettings(formencode.validators.FancyValidator):
 
    
 
    def to_python(self, value, state):
 
        #settings  form can't edit user
 
        if value.has_key('user'):
 
            del['value']['user']
 
        
 
        return value
 
    
 
class ValidPath(formencode.validators.FancyValidator):
 
    def to_python(self, value, state):
 
        isdir = os.path.isdir(value.replace('*', ''))
 
        if (value.endswith('/*') or value.endswith('/**')) and isdir:
 
            return value
 
        elif not isdir:
 
            msg = _('This is not a valid path') 
 
        else:
 
            msg = _('You need to specify * or ** at the end of path (ie. /tmp/*)')
 
        
 
        raise formencode.Invalid(msg, value, state,
 
                                     error_dict={'paths_root_path':msg})            
 

	
 
def UniqSystemEmail(old_data):
 
    class _UniqSystemEmail(formencode.validators.FancyValidator):
 
        def to_python(self, value, state):
 
            if old_data['email'] != value:
 
            if old_data.get('email') != value:
 
                sa = meta.Session
 
                try:
 
                    user = sa.query(User).filter(User.email == value).scalar()
 
                    if user:
 
                        raise formencode.Invalid(_("That e-mail address is already taken") ,
 
                                                 value, state)
 
                finally:
 
                    meta.Session.remove()
 
                
 
            return value
 
        
 
    return _UniqSystemEmail
 
    
 
class ValidSystemEmail(formencode.validators.FancyValidator):
 
    def to_python(self, value, state):
 
        sa = meta.Session
 
        try:
 
            user = sa.query(User).filter(User.email == value).scalar()
 
            if  user is None:
 
                raise formencode.Invalid(_("That e-mail address doesn't exist.") ,
 
                                         value, state)
 
        finally:
 
            meta.Session.remove()
 
            
 
        return value     
 

	
 
#===============================================================================
 
# FORMS        
 
#===============================================================================
 
class LoginForm(formencode.Schema):
 
    allow_extra_fields = True
 
    filter_extra_fields = True
 
    username = UnicodeString(
 
                             strip=True,
 
                             min=3,
 
                             not_empty=True,
 
                             messages={
 
                                       'empty':_('Please enter a login'),
 
                                       'tooShort':_('Enter a value %(min)i characters long or more')}
 
                            )
 

	
 
    password = UnicodeString(
 
                            strip=True,
 
                            min=3,
 
                            not_empty=True,
 
                            messages={
 
                                      'empty':_('Please enter a password'),
 
                                      'tooShort':_('Enter a value %(min)i characters long or more')}
 
                                )
 

	
 

	
 
    #chained validators have access to all data
 
    chained_validators = [ValidAuth]
 
    
 
def UserForm(edit=False, old_data={}):
 
    class _UserForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        username = All(UnicodeString(strip=True, min=3, not_empty=True), ValidUsername(edit, old_data))
 
        if edit:
 
            new_password = All(UnicodeString(strip=True, min=3, not_empty=False), ValidPassword)
 
            admin = StringBoolean(if_missing=False)
 
        else:
 
            password = All(UnicodeString(strip=True, min=8, not_empty=True), ValidPassword)
 
        active = StringBoolean(if_missing=False)
 
        name = UnicodeString(strip=True, min=3, not_empty=True)
 
        lastname = UnicodeString(strip=True, min=3, not_empty=True)
 
        email = All(Email(not_empty=True), UniqSystemEmail(old_data))
 
        
 
    return _UserForm
 

	
 
RegisterForm = UserForm
 

	
 
def PasswordResetForm():
 
    class _PasswordResetForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = True
 
        email = All(ValidSystemEmail(), Email(not_empty=True))             
 
    return _PasswordResetForm
 

	
 
def RepoForm(edit=False, old_data={}):
 
    class _RepoForm(formencode.Schema):
 
        allow_extra_fields = True
 
        filter_extra_fields = False
 
        repo_name = All(UnicodeString(strip=True, min=1, not_empty=True), ValidRepoName(edit, old_data))
 
        description = UnicodeString(strip=True, min=3, not_empty=True)
 
        private = StringBoolean(if_missing=False)
 
        
 
        if edit:
 
            user = All(Int(not_empty=True), ValidRepoUser)
 
        
 
        chained_validators = [ValidPerms]
 
    return _RepoForm
 

	
 
def RepoSettingsForm(edit=False, old_data={}):
 
    class _RepoForm(formencode.Schema):
0 comments (0 inline, 0 general)