Changeset - 8b35ec087464
[Not reviewed]
default
0 4 0
Thomas De Schampheleire - 10 years ago 2015-06-03 21:25:38
thomas.de.schampheleire@gmail.com
admin: users: factorize check for default user

Note that one specific unittest has been commented because it relies on
pytest features (monkeypatch). When pytest is the default test runner, the
test should be uncommented.
4 files changed with 109 insertions and 38 deletions:
0 comments (0 inline, 0 general)
kallithea/controllers/admin/users.py
Show inline comments
 
@@ -25,24 +25,25 @@ Original author and date, and relevant c
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import traceback
 
import formencode
 

	
 
from formencode import htmlfill
 
from pylons import request, tmpl_context as c, url, config
 
from pylons.controllers.util import redirect
 
from pylons.i18n.translation import _
 
from sqlalchemy.sql.expression import func
 
from webob.exc import HTTPNotFound
 

	
 
import kallithea
 
from kallithea.lib.exceptions import DefaultUserException, \
 
    UserOwnsReposException, UserCreationError
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import LoginRequired, HasPermissionAllDecorator, \
 
    AuthUser
 
import kallithea.lib.auth_modules.auth_internal
 
from kallithea.lib import auth_modules
 
from kallithea.lib.base import BaseController, render
 
from kallithea.model.api_key import ApiKeyModel
 

	
 
@@ -224,135 +225,120 @@ class UsersController(BaseController):
 
            h.flash(e, category='warning')
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during deletion of user'),
 
                    category='error')
 
        return redirect(url('users'))
 

	
 
    def show(self, id, format='html'):
 
        """GET /users/id: Show a specific item"""
 
        # url('user', id=ID)
 
        User.get_or_404(-1)
 

	
 
    def _get_user_or_raise_if_default(self, id):
 
        try:
 
            return User.get_or_404(id, allow_default=False)
 
        except DefaultUserException:
 
            h.flash(_("The default user cannot be edited"), category='warning')
 
            raise HTTPNotFound
 

	
 
    def edit(self, id, format='html'):
 
        """GET /users/id/edit: Form to edit an existing item"""
 
        # url('edit_user', id=ID)
 
        c.user = User.get_or_404(id)
 
        if c.user.username == User.DEFAULT_USER:
 
            h.flash(_("You can't edit this user"), category='warning')
 
            return redirect(url('users'))
 

	
 
        c.user = self._get_user_or_raise_if_default(id)
 
        c.active = 'profile'
 
        c.extern_type = c.user.extern_type
 
        c.extern_name = c.user.extern_name
 
        c.perm_user = AuthUser(user_id=id, ip_addr=self.ip_addr)
 

	
 
        defaults = c.user.get_dict()
 
        return htmlfill.render(
 
            render('admin/users/user_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    def edit_advanced(self, id):
 
        c.user = User.get_or_404(id)
 
        if c.user.username == User.DEFAULT_USER:
 
            h.flash(_("You can't edit this user"), category='warning')
 
            return redirect(url('users'))
 

	
 
        c.user = self._get_user_or_raise_if_default(id)
 
        c.active = 'advanced'
 
        c.perm_user = AuthUser(user_id=id, ip_addr=self.ip_addr)
 

	
 
        umodel = UserModel()
 
        defaults = c.user.get_dict()
 
        defaults.update({
 
            'create_repo_perm': umodel.has_perm(c.user, 'hg.create.repository'),
 
            'create_user_group_perm': umodel.has_perm(c.user,
 
                                                      'hg.usergroup.create.true'),
 
            'fork_repo_perm': umodel.has_perm(c.user, 'hg.fork.repository'),
 
        })
 
        return htmlfill.render(
 
            render('admin/users/user_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    def edit_api_keys(self, id):
 
        c.user = User.get_or_404(id)
 
        if c.user.username == User.DEFAULT_USER:
 
            h.flash(_("You can't edit this user"), category='warning')
 
            return redirect(url('users'))
 

	
 
        c.user = self._get_user_or_raise_if_default(id)
 
        c.active = 'api_keys'
 
        show_expired = True
 
        c.lifetime_values = [
 
            (str(-1), _('Forever')),
 
            (str(5), _('5 minutes')),
 
            (str(60), _('1 hour')),
 
            (str(60 * 24), _('1 day')),
 
            (str(60 * 24 * 30), _('1 month')),
 
        ]
 
        c.lifetime_options = [(c.lifetime_values, _("Lifetime"))]
 
        c.user_api_keys = ApiKeyModel().get_api_keys(c.user.user_id,
 
                                                     show_expired=show_expired)
 
        defaults = c.user.get_dict()
 
        return htmlfill.render(
 
            render('admin/users/user_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    def add_api_key(self, id):
 
        c.user = User.get_or_404(id)
 
        if c.user.username == User.DEFAULT_USER:
 
            h.flash(_("You can't edit this user"), category='warning')
 
            return redirect(url('users'))
 
        c.user = self._get_user_or_raise_if_default(id)
 

	
 
        lifetime = safe_int(request.POST.get('lifetime'), -1)
 
        description = request.POST.get('description')
 
        ApiKeyModel().create(c.user.user_id, description, lifetime)
 
        Session().commit()
 
        h.flash(_("API key successfully created"), category='success')
 
        return redirect(url('edit_user_api_keys', id=c.user.user_id))
 

	
 
    def delete_api_key(self, id):
 
        c.user = User.get_or_404(id)
 
        if c.user.username == User.DEFAULT_USER:
 
            h.flash(_("You can't edit this user"), category='warning')
 
            return redirect(url('users'))
 
        c.user = self._get_user_or_raise_if_default(id)
 

	
 
        api_key = request.POST.get('del_api_key')
 
        if request.POST.get('del_api_key_builtin'):
 
            user = User.get(c.user.user_id)
 
            if user:
 
                user.api_key = generate_api_key(user.username)
 
                Session().add(user)
 
                Session().commit()
 
                h.flash(_("API key successfully reset"), category='success')
 
        elif api_key:
 
            ApiKeyModel().delete(api_key, c.user.user_id)
 
            Session().commit()
 
            h.flash(_("API key successfully deleted"), category='success')
 

	
 
        return redirect(url('edit_user_api_keys', id=c.user.user_id))
 

	
 
    def update_account(self, id):
 
        pass
 

	
 
    def edit_perms(self, id):
 
        c.user = User.get_or_404(id)
 
        if c.user.username == User.DEFAULT_USER:
 
            h.flash(_("You can't edit this user"), category='warning')
 
            return redirect(url('users'))
 

	
 
        c.user = self._get_user_or_raise_if_default(id)
 
        c.active = 'perms'
 
        c.perm_user = AuthUser(user_id=id, ip_addr=self.ip_addr)
 

	
 
        umodel = UserModel()
 
        defaults = c.user.get_dict()
 
        defaults.update({
 
            'create_repo_perm': umodel.has_perm(c.user, 'hg.create.repository'),
 
            'create_user_group_perm': umodel.has_perm(c.user,
 
                                                      'hg.usergroup.create.true'),
 
            'fork_repo_perm': umodel.has_perm(c.user, 'hg.fork.repository'),
 
        })
 
        return htmlfill.render(
 
@@ -393,29 +379,25 @@ class UsersController(BaseController):
 
                user_model.grant_perm(id, 'hg.fork.repository')
 
            else:
 
                user_model.grant_perm(id, 'hg.fork.none')
 
            h.flash(_("Updated permissions"), category='success')
 
            Session().commit()
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            h.flash(_('An error occurred during permissions saving'),
 
                    category='error')
 
        return redirect(url('edit_user_perms', id=id))
 

	
 
    def edit_emails(self, id):
 
        c.user = User.get_or_404(id)
 
        if c.user.username == User.DEFAULT_USER:
 
            h.flash(_("You can't edit this user"), category='warning')
 
            return redirect(url('users'))
 

	
 
        c.user = self._get_user_or_raise_if_default(id)
 
        c.active = 'emails'
 
        c.user_email_map = UserEmailMap.query()\
 
            .filter(UserEmailMap.user == c.user).all()
 

	
 
        defaults = c.user.get_dict()
 
        return htmlfill.render(
 
            render('admin/users/user_edit.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    def add_email(self, id):
 
@@ -440,29 +422,25 @@ class UsersController(BaseController):
 

	
 
    def delete_email(self, id):
 
        """DELETE /user_emails_delete/id: Delete an existing item"""
 
        # url('user_emails_delete', id=ID, method='delete')
 
        email_id = request.POST.get('del_email_id')
 
        user_model = UserModel()
 
        user_model.delete_extra_email(id, email_id)
 
        Session().commit()
 
        h.flash(_("Removed email from user"), category='success')
 
        return redirect(url('edit_user_emails', id=id))
 

	
 
    def edit_ips(self, id):
 
        c.user = User.get_or_404(id)
 
        if c.user.username == User.DEFAULT_USER:
 
            h.flash(_("You can't edit this user"), category='warning')
 
            return redirect(url('users'))
 

	
 
        c.user = self._get_user_or_raise_if_default(id)
 
        c.active = 'ips'
 
        c.user_ip_map = UserIpMap.query()\
 
            .filter(UserIpMap.user == c.user).all()
 

	
 
        c.inherit_default_ips = c.user.inherit_default_permissions
 
        c.default_user_ip_map = UserIpMap.query()\
 
            .filter(UserIpMap.user == User.get_default_user()).all()
 

	
 
        defaults = c.user.get_dict()
 
        return htmlfill.render(
 
            render('admin/users/user_edit.html'),
 
            defaults=defaults,
kallithea/lib/exceptions.py
Show inline comments
 
@@ -36,24 +36,25 @@ class LdapPasswordError(Exception):
 
    pass
 

	
 

	
 
class LdapConnectionError(Exception):
 
    pass
 

	
 

	
 
class LdapImportError(Exception):
 
    pass
 

	
 

	
 
class DefaultUserException(Exception):
 
    """An invalid action was attempted on the default user"""
 
    pass
 

	
 

	
 
class UserOwnsReposException(Exception):
 
    pass
 

	
 

	
 
class UserGroupsAssignedException(Exception):
 
    pass
 

	
 

	
 
class StatusChangeOnClosedPullRequestError(Exception):
kallithea/model/db.py
Show inline comments
 
@@ -34,24 +34,25 @@ import hashlib
 
import collections
 
import functools
 

	
 
from sqlalchemy import *
 
from sqlalchemy.ext.hybrid import hybrid_property
 
from sqlalchemy.orm import relationship, joinedload, class_mapper, validates
 
from beaker.cache import cache_region, region_invalidate
 
from webob.exc import HTTPNotFound
 

	
 
from pylons.i18n.translation import lazy_ugettext as _
 

	
 
from kallithea import DB_PREFIX
 
from kallithea.lib.exceptions import DefaultUserException
 
from kallithea.lib.vcs import get_backend
 
from kallithea.lib.vcs.utils.helpers import get_scm
 
from kallithea.lib.vcs.exceptions import VCSError
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 

	
 
from kallithea.lib.utils2 import str2bool, safe_str, get_changeset_safe, \
 
    safe_unicode, remove_prefix, time_to_datetime, aslist, Optional, safe_int, \
 
    get_clone_url, urlreadable
 
from kallithea.lib.compat import json
 
from kallithea.lib.caching_query import FromCache
 

	
 
@@ -517,24 +518,36 @@ class User(Base, BaseModel):
 
    @user_data.setter
 
    def user_data(self, val):
 
        try:
 
            self._user_data = json.dumps(val)
 
        except Exception:
 
            log.error(traceback.format_exc())
 

	
 
    def __unicode__(self):
 
        return u"<%s('id:%s:%s')>" % (self.__class__.__name__,
 
                                      self.user_id, self.username)
 

	
 
    @classmethod
 
    def get_or_404(cls, id_, allow_default=True):
 
        '''
 
        Overridden version of BaseModel.get_or_404, with an extra check on
 
        the default user.
 
        '''
 
        user = super(User, cls).get_or_404(id_)
 
        if allow_default == False:
 
            if user.username == User.DEFAULT_USER:
 
                raise DefaultUserException
 
        return user
 

	
 
    @classmethod
 
    def get_by_username(cls, username, case_insensitive=False, cache=False):
 
        if case_insensitive:
 
            q = cls.query().filter(cls.username.ilike(username))
 
        else:
 
            q = cls.query().filter(cls.username == username)
 

	
 
        if cache:
 
            q = q.options(FromCache(
 
                            "sql_cache_short",
 
                            "get_user_%s" % _hash_key(username)
 
                          )
 
            )
kallithea/tests/functional/test_admin_users.py
Show inline comments
 
@@ -4,24 +4,25 @@
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
from sqlalchemy.orm.exc import NoResultFound
 
from webob.exc import HTTPNotFound
 

	
 
from kallithea.tests import *
 
from kallithea.tests.fixture import Fixture
 
from kallithea.model.db import User, Permission, UserIpMap, UserApiKeys
 
from kallithea.lib.auth import check_password
 
from kallithea.model.user import UserModel
 
from kallithea.model import validators
 
from kallithea.lib import helpers as h
 
from kallithea.model.meta import Session
 

	
 
fixture = Fixture()
 

	
 
@@ -488,12 +489,90 @@ class TestAdminUsersController(TestContr
 
        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        user_id = user.user_id
 
        api_key = user.api_key
 
        response = self.app.get(url('edit_user_api_keys', id=user_id))
 
        response.mustcontain(api_key)
 
        response.mustcontain('Expires: Never')
 

	
 
        response = self.app.post(url('edit_user_api_keys', id=user_id),
 
                 {'_method': 'delete', 'del_api_key_builtin': api_key, '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'API key successfully reset')
 
        response = response.follow()
 
        response.mustcontain(no=[api_key])
 

	
 
# TODO To be uncommented when pytest is the test runner
 
#import pytest
 
#from kallithea.controllers.admin.users import UsersController
 
#class TestAdminUsersController_unittest(object):
 
#    """
 
#    Unit tests for the users controller
 
#    These are in a separate class, not deriving from TestController (and thus
 
#    unittest.TestCase), to be able to benefit from pytest features like
 
#    monkeypatch.
 
#    """
 
#    def test_get_user_or_raise_if_default(self, monkeypatch):
 
#        # flash complains about an unexisting session
 
#        def flash_mock(*args, **kwargs):
 
#            pass
 
#        monkeypatch.setattr(h, 'flash', flash_mock)
 
#
 
#        u = UsersController()
 
#        # a regular user should work correctly
 
#        user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
#        assert u._get_user_or_raise_if_default(user.user_id) == user
 
#        # the default user should raise
 
#        with pytest.raises(HTTPNotFound):
 
#            u._get_user_or_raise_if_default(User.get_default_user().user_id)
 

	
 

	
 
class TestAdminUsersControllerForDefaultUser(TestController):
 
    """
 
    Edit actions on the default user are not allowed.
 
    Validate that they throw a 404 exception.
 
    """
 
    def test_edit_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user', id=user.user_id), status=404)
 

	
 
    def test_edit_advanced_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user_advanced', id=user.user_id), status=404)
 

	
 
    # API keys
 
    def test_edit_api_keys_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user_api_keys', id=user.user_id), status=404)
 

	
 
    def test_add_api_keys_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.post(url('edit_user_api_keys', id=user.user_id),
 
                 {'_method': 'put', '_authentication_token': self.authentication_token()}, status=404)
 

	
 
    def test_delete_api_keys_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.post(url('edit_user_api_keys', id=user.user_id),
 
                 {'_method': 'delete', '_authentication_token': self.authentication_token()}, status=404)
 

	
 
    # Permissions
 
    def test_edit_perms_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user_perms', id=user.user_id), status=404)
 

	
 
    # E-mails
 
    def test_edit_emails_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user_emails', id=user.user_id), status=404)
 

	
 
    # IP addresses
 
    # Add/delete of IP addresses for the default user is used to maintain
 
    # the global IP whitelist and thus allowed. Only 'edit' is forbidden.
 
    def test_edit_ip_default_user(self):
 
        self.log_user()
 
        user = User.get_default_user()
 
        response = self.app.get(url('edit_user_ips', id=user.user_id), status=404)
0 comments (0 inline, 0 general)