diff --git a/rhodecode/tests/functional/test_admin_users.py b/rhodecode/tests/functional/test_admin_users.py --- a/rhodecode/tests/functional/test_admin_users.py +++ b/rhodecode/tests/functional/test_admin_users.py @@ -1,24 +1,45 @@ +# -*- coding: utf-8 -*- +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + from sqlalchemy.orm.exc import NoResultFound from rhodecode.tests import * -from rhodecode.model.db import User, Permission +from rhodecode.tests.fixture import Fixture +from rhodecode.model.db import User, Permission, UserIpMap, UserApiKeys from rhodecode.lib.auth import check_password from rhodecode.model.user import UserModel from rhodecode.model import validators from rhodecode.lib import helpers as h from rhodecode.model.meta import Session +fixture = Fixture() + class TestAdminUsersController(TestController): + test_user_1 = 'testme' + + @classmethod + def teardown_class(cls): + if User.get_by_username(cls.test_user_1): + UserModel().delete(cls.test_user_1) + Session().commit() def test_index(self): self.log_user() response = self.app.get(url('users')) # Test response... - def test_index_as_xml(self): - response = self.app.get(url('formatted_users', format='xml')) - def test_create(self): self.log_user() username = 'newtestuser' @@ -29,13 +50,15 @@ class TestAdminUsersController(TestContr email = 'mail@mail.com' response = self.app.post(url('users'), - {'username': username, - 'password': password, - 'password_confirmation': password_confirmation, - 'firstname': name, - 'active': True, - 'lastname': lastname, - 'email': email}) + {'username': username, + 'password': password, + 'password_confirmation': password_confirmation, + 'firstname': name, + 'active': True, + 'lastname': lastname, + 'extern_name': 'rhodecode', + 'extern_type': 'rhodecode', + 'email': email}) self.checkSessionFlash(response, '''Created user %s''' % (username)) @@ -82,69 +105,60 @@ class TestAdminUsersController(TestContr self.log_user() response = self.app.get(url('new_user')) - def test_new_as_xml(self): - response = self.app.get(url('formatted_new_user', format='xml')) - - @parameterized.expand([('firstname', 'new_username'), - ('lastname', 'new_username'), - ('admin', True), - ('admin', False), - ('ldap_dn', 'test'), - ('ldap_dn', None), - ('active', False), - ('active', True), - ('email', 'some@email.com'), - ]) - def test_update(self, name, expected): + @parameterized.expand( + [('firstname', {'firstname': 'new_username'}), + ('lastname', {'lastname': 'new_username'}), + ('admin', {'admin': True}), + ('admin', {'admin': False}), + ('extern_type', {'extern_type': 'ldap'}), + ('extern_type', {'extern_type': None}), + ('extern_name', {'extern_name': 'test'}), + ('extern_name', {'extern_name': None}), + ('active', {'active': False}), + ('active', {'active': True}), + ('email', {'email': 'some@email.com'}), + # ('new_password', {'new_password': 'foobar123', + # 'password_confirmation': 'foobar123'}) + ]) + def test_update(self, name, attrs): self.log_user() - uname = 'testme' - usr = UserModel().create_or_update(username=uname, password='qweqwe', - email='testme@rhodecod.org') + usr = fixture.create_user(self.test_user_1, password='qweqwe', + email='testme@rhodecode.org', + extern_type='rhodecode', + extern_name=self.test_user_1, + skip_if_exists=True) Session().commit() params = usr.get_api_data() - params.update({name: expected}) params.update({'password_confirmation': ''}) params.update({'new_password': ''}) + params.update(attrs) if name == 'email': - params['emails'] = [expected] - if name == 'ldap_dn': - #cannot update this via form - params['ldap_dn'] = None - try: - response = self.app.put(url('user', id=usr.user_id), params) - - self.checkSessionFlash(response, '''User updated successfully''') + params['emails'] = [attrs['email']] + if name == 'extern_type': + #cannot update this via form, expected value is original one + params['extern_type'] = "rhodecode" + if name == 'extern_name': + #cannot update this via form, expected value is original one + params['extern_name'] = self.test_user_1 + # special case since this user is not + # logged in yet his data is not filled + # so we use creation data - updated_user = User.get_by_username(uname) - updated_params = updated_user.get_api_data() - updated_params.update({'password_confirmation': ''}) - updated_params.update({'new_password': ''}) + response = self.app.put(url('user', id=usr.user_id), params) + self.checkSessionFlash(response, 'User updated successfully') - self.assertEqual(params, updated_params) + updated_user = User.get_by_username(self.test_user_1) + updated_params = updated_user.get_api_data() + updated_params.update({'password_confirmation': ''}) + updated_params.update({'new_password': ''}) - finally: - UserModel().delete('testme') - - def test_update_browser_fakeout(self): - response = self.app.post(url('user', id=1), params=dict(_method='put')) + self.assertEqual(params, updated_params) def test_delete(self): self.log_user() username = 'newtestuserdeleteme' - password = 'test12' - name = 'name' - lastname = 'lastname' - email = 'todeletemail@mail.com' - response = self.app.post(url('users'), {'username': username, - 'password': password, - 'password_confirmation': password, - 'firstname': name, - 'active': True, - 'lastname': lastname, - 'email': email}) - - response = response.follow() + fixture.create_user(name=username) new_user = Session().query(User)\ .filter(User.username == username).one() @@ -152,16 +166,9 @@ class TestAdminUsersController(TestContr self.checkSessionFlash(response, 'Successfully deleted user') - def test_delete_browser_fakeout(self): - response = self.app.post(url('user', id=1), - params=dict(_method='delete')) - def test_show(self): response = self.app.get(url('user', id=1)) - def test_show_as_xml(self): - response = self.app.get(url('formatted_user', id=1, format='xml')) - def test_edit(self): self.log_user() user = User.get_by_username(TEST_USER_ADMIN_LOGIN) @@ -183,7 +190,7 @@ class TestAdminUsersController(TestContr self.assertEqual(UserModel().has_perm(user, perm_none), False) self.assertEqual(UserModel().has_perm(user, perm_create), False) - response = self.app.post(url('user_perm', id=uid), + response = self.app.post(url('edit_user_perms', id=uid), params=dict(_method='put', create_repo_perm=True)) @@ -213,7 +220,7 @@ class TestAdminUsersController(TestContr self.assertEqual(UserModel().has_perm(user, perm_none), False) self.assertEqual(UserModel().has_perm(user, perm_create), False) - response = self.app.post(url('user_perm', id=uid), + response = self.app.post(url('edit_user_perms', id=uid), params=dict(_method='put')) perm_none = Permission.get_by_key('hg.create.none') @@ -242,7 +249,7 @@ class TestAdminUsersController(TestContr self.assertEqual(UserModel().has_perm(user, perm_none), False) self.assertEqual(UserModel().has_perm(user, perm_fork), False) - response = self.app.post(url('user_perm', id=uid), + response = self.app.post(url('edit_user_perms', id=uid), params=dict(_method='put', create_repo_perm=True)) @@ -272,7 +279,7 @@ class TestAdminUsersController(TestContr self.assertEqual(UserModel().has_perm(user, perm_none), False) self.assertEqual(UserModel().has_perm(user, perm_fork), False) - response = self.app.post(url('user_perm', id=uid), + response = self.app.post(url('edit_user_perms', id=uid), params=dict(_method='put')) perm_none = Permission.get_by_key('hg.create.none') @@ -285,5 +292,128 @@ class TestAdminUsersController(TestContr UserModel().delete(uid) Session().commit() - def test_edit_as_xml(self): - response = self.app.get(url('formatted_edit_user', id=1, format='xml')) + def test_ips(self): + self.log_user() + user = User.get_by_username(TEST_USER_REGULAR_LOGIN) + response = self.app.get(url('edit_user_ips', id=user.user_id)) + response.mustcontain('All IP addresses are allowed') + + @parameterized.expand([ + ('127/24', '127.0.0.1/24', '127.0.0.0 - 127.0.0.255', False), + ('10/32', '10.0.0.10/32', '10.0.0.10 - 10.0.0.10', False), + ('0/16', '0.0.0.0/16', '0.0.0.0 - 0.0.255.255', False), + ('0/8', '0.0.0.0/8', '0.0.0.0 - 0.255.255.255', False), + ('127_bad_mask', '127.0.0.1/99', '127.0.0.1 - 127.0.0.1', True), + ('127_bad_ip', 'foobar', 'foobar', True), + ]) + def test_add_ip(self, test_name, ip, ip_range, failure): + self.log_user() + user = User.get_by_username(TEST_USER_REGULAR_LOGIN) + user_id = user.user_id + + response = self.app.put(url('edit_user_ips', id=user_id), + params=dict(new_ip=ip)) + + if failure: + self.checkSessionFlash(response, 'Please enter a valid IPv4 or IpV6 address') + response = self.app.get(url('edit_user_ips', id=user_id)) + response.mustcontain(no=[ip]) + response.mustcontain(no=[ip_range]) + + else: + response = self.app.get(url('edit_user_ips', id=user_id)) + response.mustcontain(ip) + response.mustcontain(ip_range) + + ## cleanup + for del_ip in UserIpMap.query().filter(UserIpMap.user_id == user_id).all(): + Session().delete(del_ip) + Session().commit() + + def test_delete_ip(self): + self.log_user() + user = User.get_by_username(TEST_USER_REGULAR_LOGIN) + user_id = user.user_id + ip = '127.0.0.1/32' + ip_range = '127.0.0.1 - 127.0.0.1' + new_ip = UserModel().add_extra_ip(user_id, ip) + Session().commit() + new_ip_id = new_ip.ip_id + + response = self.app.get(url('edit_user_ips', id=user_id)) + response.mustcontain(ip) + response.mustcontain(ip_range) + + self.app.post(url('edit_user_ips', id=user_id), + params=dict(_method='delete', del_ip_id=new_ip_id)) + + response = self.app.get(url('edit_user_ips', id=user_id)) + response.mustcontain('All IP addresses are allowed') + response.mustcontain(no=[ip]) + response.mustcontain(no=[ip_range]) + + def test_api_keys(self): + self.log_user() + + user = User.get_by_username(TEST_USER_REGULAR_LOGIN) + response = self.app.get(url('edit_user_api_keys', id=user.user_id)) + response.mustcontain(user.api_key) + response.mustcontain('expires: never') + + @parameterized.expand([ + ('forever', -1), + ('5mins', 60*5), + ('30days', 60*60*24*30), + ]) + def test_add_api_keys(self, desc, lifetime): + self.log_user() + user = User.get_by_username(TEST_USER_REGULAR_LOGIN) + user_id = user.user_id + + response = self.app.post(url('edit_user_api_keys', id=user_id), + {'_method': 'put', 'description': desc, 'lifetime': lifetime}) + self.checkSessionFlash(response, 'Api key successfully created') + try: + response = response.follow() + user = User.get(user_id) + for api_key in user.api_keys: + response.mustcontain(api_key) + finally: + for api_key in UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all(): + Session().delete(api_key) + Session().commit() + + def test_remove_api_key(self): + self.log_user() + user = User.get_by_username(TEST_USER_REGULAR_LOGIN) + user_id = user.user_id + + response = self.app.post(url('edit_user_api_keys', id=user_id), + {'_method': 'put', 'description': 'desc', 'lifetime': -1}) + self.checkSessionFlash(response, 'Api key successfully created') + response = response.follow() + + #now delete our key + keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all() + self.assertEqual(1, len(keys)) + + response = self.app.post(url('edit_user_api_keys', id=user_id), + {'_method': 'delete', 'del_api_key': keys[0].api_key}) + self.checkSessionFlash(response, 'Api key successfully deleted') + keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all() + self.assertEqual(0, len(keys)) + + def test_reset_main_api_key(self): + self.log_user() + user = User.get_by_username(TEST_USER_REGULAR_LOGIN) + user_id = user.user_id + api_key = user.api_key + response = self.app.get(url('edit_user_api_keys', id=user_id)) + response.mustcontain(api_key) + response.mustcontain('expires: never') + + response = self.app.post(url('edit_user_api_keys', id=user_id), + {'_method': 'delete', 'del_api_key_builtin': api_key}) + self.checkSessionFlash(response, 'Api key successfully reset') + response = response.follow() + response.mustcontain(no=[api_key])