diff --git a/kallithea/controllers/admin/users.py b/kallithea/controllers/admin/users.py --- a/kallithea/controllers/admin/users.py +++ b/kallithea/controllers/admin/users.py @@ -34,6 +34,7 @@ from pylons import request, tmpl_context from pylons.controllers.util import redirect from pylons.i18n.translation import _ from sqlalchemy.sql.expression import func +from webob.exc import HTTPNotFound import kallithea from kallithea.lib.exceptions import DefaultUserException, \ @@ -233,14 +234,17 @@ class UsersController(BaseController): # url('user', id=ID) User.get_or_404(-1) + def _get_user_or_raise_if_default(self, id): + try: + return User.get_or_404(id, allow_default=False) + except DefaultUserException: + h.flash(_("The default user cannot be edited"), category='warning') + raise HTTPNotFound + def edit(self, id, format='html'): """GET /users/id/edit: Form to edit an existing item""" # url('edit_user', id=ID) - c.user = User.get_or_404(id) - if c.user.username == User.DEFAULT_USER: - h.flash(_("You can't edit this user"), category='warning') - return redirect(url('users')) - + c.user = self._get_user_or_raise_if_default(id) c.active = 'profile' c.extern_type = c.user.extern_type c.extern_name = c.user.extern_name @@ -254,11 +258,7 @@ class UsersController(BaseController): force_defaults=False) def edit_advanced(self, id): - c.user = User.get_or_404(id) - if c.user.username == User.DEFAULT_USER: - h.flash(_("You can't edit this user"), category='warning') - return redirect(url('users')) - + c.user = self._get_user_or_raise_if_default(id) c.active = 'advanced' c.perm_user = AuthUser(user_id=id, ip_addr=self.ip_addr) @@ -277,11 +277,7 @@ class UsersController(BaseController): force_defaults=False) def edit_api_keys(self, id): - c.user = User.get_or_404(id) - if c.user.username == User.DEFAULT_USER: - h.flash(_("You can't edit this user"), category='warning') - return redirect(url('users')) - + c.user = self._get_user_or_raise_if_default(id) c.active = 'api_keys' show_expired = True c.lifetime_values = [ @@ -302,10 +298,7 @@ class UsersController(BaseController): force_defaults=False) def add_api_key(self, id): - c.user = User.get_or_404(id) - if c.user.username == User.DEFAULT_USER: - h.flash(_("You can't edit this user"), category='warning') - return redirect(url('users')) + c.user = self._get_user_or_raise_if_default(id) lifetime = safe_int(request.POST.get('lifetime'), -1) description = request.POST.get('description') @@ -315,10 +308,7 @@ class UsersController(BaseController): return redirect(url('edit_user_api_keys', id=c.user.user_id)) def delete_api_key(self, id): - c.user = User.get_or_404(id) - if c.user.username == User.DEFAULT_USER: - h.flash(_("You can't edit this user"), category='warning') - return redirect(url('users')) + c.user = self._get_user_or_raise_if_default(id) api_key = request.POST.get('del_api_key') if request.POST.get('del_api_key_builtin'): @@ -339,11 +329,7 @@ class UsersController(BaseController): pass def edit_perms(self, id): - c.user = User.get_or_404(id) - if c.user.username == User.DEFAULT_USER: - h.flash(_("You can't edit this user"), category='warning') - return redirect(url('users')) - + c.user = self._get_user_or_raise_if_default(id) c.active = 'perms' c.perm_user = AuthUser(user_id=id, ip_addr=self.ip_addr) @@ -402,11 +388,7 @@ class UsersController(BaseController): return redirect(url('edit_user_perms', id=id)) def edit_emails(self, id): - c.user = User.get_or_404(id) - if c.user.username == User.DEFAULT_USER: - h.flash(_("You can't edit this user"), category='warning') - return redirect(url('users')) - + c.user = self._get_user_or_raise_if_default(id) c.active = 'emails' c.user_email_map = UserEmailMap.query()\ .filter(UserEmailMap.user == c.user).all() @@ -449,11 +431,7 @@ class UsersController(BaseController): return redirect(url('edit_user_emails', id=id)) def edit_ips(self, id): - c.user = User.get_or_404(id) - if c.user.username == User.DEFAULT_USER: - h.flash(_("You can't edit this user"), category='warning') - return redirect(url('users')) - + c.user = self._get_user_or_raise_if_default(id) c.active = 'ips' c.user_ip_map = UserIpMap.query()\ .filter(UserIpMap.user == c.user).all() diff --git a/kallithea/lib/exceptions.py b/kallithea/lib/exceptions.py --- a/kallithea/lib/exceptions.py +++ b/kallithea/lib/exceptions.py @@ -45,6 +45,7 @@ class LdapImportError(Exception): class DefaultUserException(Exception): + """An invalid action was attempted on the default user""" pass diff --git a/kallithea/model/db.py b/kallithea/model/db.py --- a/kallithea/model/db.py +++ b/kallithea/model/db.py @@ -43,6 +43,7 @@ from webob.exc import HTTPNotFound from pylons.i18n.translation import lazy_ugettext as _ from kallithea import DB_PREFIX +from kallithea.lib.exceptions import DefaultUserException from kallithea.lib.vcs import get_backend from kallithea.lib.vcs.utils.helpers import get_scm from kallithea.lib.vcs.exceptions import VCSError @@ -526,6 +527,18 @@ class User(Base, BaseModel): self.user_id, self.username) @classmethod + def get_or_404(cls, id_, allow_default=True): + ''' + Overridden version of BaseModel.get_or_404, with an extra check on + the default user. + ''' + user = super(User, cls).get_or_404(id_) + if allow_default == False: + if user.username == User.DEFAULT_USER: + raise DefaultUserException + return user + + @classmethod def get_by_username(cls, username, case_insensitive=False, cache=False): if case_insensitive: q = cls.query().filter(cls.username.ilike(username)) diff --git a/kallithea/tests/functional/test_admin_users.py b/kallithea/tests/functional/test_admin_users.py --- a/kallithea/tests/functional/test_admin_users.py +++ b/kallithea/tests/functional/test_admin_users.py @@ -13,6 +13,7 @@ # along with this program. If not, see . from sqlalchemy.orm.exc import NoResultFound +from webob.exc import HTTPNotFound from kallithea.tests import * from kallithea.tests.fixture import Fixture @@ -497,3 +498,81 @@ class TestAdminUsersController(TestContr self.checkSessionFlash(response, 'API key successfully reset') response = response.follow() response.mustcontain(no=[api_key]) + +# TODO To be uncommented when pytest is the test runner +#import pytest +#from kallithea.controllers.admin.users import UsersController +#class TestAdminUsersController_unittest(object): +# """ +# Unit tests for the users controller +# These are in a separate class, not deriving from TestController (and thus +# unittest.TestCase), to be able to benefit from pytest features like +# monkeypatch. +# """ +# def test_get_user_or_raise_if_default(self, monkeypatch): +# # flash complains about an unexisting session +# def flash_mock(*args, **kwargs): +# pass +# monkeypatch.setattr(h, 'flash', flash_mock) +# +# u = UsersController() +# # a regular user should work correctly +# user = User.get_by_username(TEST_USER_REGULAR_LOGIN) +# assert u._get_user_or_raise_if_default(user.user_id) == user +# # the default user should raise +# with pytest.raises(HTTPNotFound): +# u._get_user_or_raise_if_default(User.get_default_user().user_id) + + +class TestAdminUsersControllerForDefaultUser(TestController): + """ + Edit actions on the default user are not allowed. + Validate that they throw a 404 exception. + """ + def test_edit_default_user(self): + self.log_user() + user = User.get_default_user() + response = self.app.get(url('edit_user', id=user.user_id), status=404) + + def test_edit_advanced_default_user(self): + self.log_user() + user = User.get_default_user() + response = self.app.get(url('edit_user_advanced', id=user.user_id), status=404) + + # API keys + def test_edit_api_keys_default_user(self): + self.log_user() + user = User.get_default_user() + response = self.app.get(url('edit_user_api_keys', id=user.user_id), status=404) + + def test_add_api_keys_default_user(self): + self.log_user() + user = User.get_default_user() + response = self.app.post(url('edit_user_api_keys', id=user.user_id), + {'_method': 'put', '_authentication_token': self.authentication_token()}, status=404) + + def test_delete_api_keys_default_user(self): + self.log_user() + user = User.get_default_user() + response = self.app.post(url('edit_user_api_keys', id=user.user_id), + {'_method': 'delete', '_authentication_token': self.authentication_token()}, status=404) + + # Permissions + def test_edit_perms_default_user(self): + self.log_user() + user = User.get_default_user() + response = self.app.get(url('edit_user_perms', id=user.user_id), status=404) + + # E-mails + def test_edit_emails_default_user(self): + self.log_user() + user = User.get_default_user() + response = self.app.get(url('edit_user_emails', id=user.user_id), status=404) + + # IP addresses + # Add/delete of IP addresses for the default user is used to maintain + # the global IP whitelist and thus allowed. Only 'edit' is forbidden. + def test_edit_ip_default_user(self): + self.log_user() + user = User.get_default_user() + response = self.app.get(url('edit_user_ips', id=user.user_id), status=404)