Files
@ 2045d30919e6
Branch filter:
Location: kallithea/kallithea/tests/functional/test_admin_permissions.py
2045d30919e6
5.2 KiB
text/x-python
db: clarify that DEFAULT_USER just is DEFAULT_USER_NAME
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 | from kallithea.model.db import User, UserIpMap
from kallithea.tests import base
class TestAdminPermissionsController(base.TestController):
    """Functional tests for the admin permission pages and the
    repository permission update/revoke endpoints."""

    def test_index(self):
        """Request the ``admin_permissions`` page as a logged-in user."""
        self.log_user()
        # Only the request itself is exercised; the body is not inspected.
        self.app.get(base.url('admin_permissions'))

    def test_index_ips(self):
        """The IP overview page reports that all addresses are allowed."""
        self.log_user()
        ips_page = self.app.get(base.url('admin_permissions_ips'))
        ips_page.mustcontain('All IP addresses are allowed')

    def test_add_delete_ips(self, auto_clear_ip_permissions):
        """Add and delete IP ranges for the default user and check access.

        ``auto_clear_ip_permissions`` is not referenced in the body;
        presumably it is a pytest fixture that cleans up IP rules -
        confirm against the test configuration.
        """
        self.log_user()
        default_user_id = User.get_default_user().user_id

        def add_ip(ip_range):
            # Submit a new IP range for the default user, then drop caches.
            self.app.post(
                base.url('edit_user_ips_update', id=default_user_id),
                params={
                    'new_ip': ip_range,
                    '_session_csrf_secret_token': self.session_csrf_secret_token(),
                })
            base.invalidate_all_caches()

        def delete_ip(ip_range):
            # Look up the stored entry first, since deletion is done by its id.
            ip_entry = UserIpMap.query().filter_by(ip_addr=ip_range).first()
            self.app.post(
                base.url('edit_user_ips_delete', id=default_user_id),
                params={
                    'del_ip_id': ip_entry.ip_id,
                    '_session_csrf_secret_token': self.session_csrf_secret_token(),
                })
            base.invalidate_all_caches()

        def get_ips_page(remote_addr, **expectations):
            # Fetch the IP overview page as if coming from ``remote_addr``.
            return self.app.get(
                base.url('admin_permissions_ips'),
                extra_environ={'REMOTE_ADDR': remote_addr},
                **expectations)

        # Add IP and verify it is shown in UI and both gives access and rejects
        add_ip('0.0.0.0/24')
        allowed_page = get_ips_page('0.0.0.1')
        allowed_page.mustcontain('0.0.0.0/24')
        allowed_page.mustcontain('0.0.0.0 - 0.0.0.255')
        get_ips_page('0.0.1.1', status=403)

        # Add another IP and verify previously rejected now works
        add_ip('0.0.1.0/24')
        get_ips_page('0.0.1.1')

        # Delete latest IP and verify same IP is rejected again
        delete_ip('0.0.1.0/24')
        get_ips_page('0.0.1.1', status=403)

        # Delete first IP and verify unlimited access again
        delete_ip('0.0.0.0/24')
        get_ips_page('0.0.1.1')

    def test_index_overview(self):
        """Request the ``admin_permissions_perms`` page as a logged-in user."""
        self.log_user()
        # Only the request itself is exercised; the body is not inspected.
        self.app.get(base.url('admin_permissions_perms'))

    def test_edit_permissions_permissions(self):
        """Repository permission update/revoke, unauthenticated then authenticated.

        Before logging in, both endpoints must answer 302 with a location
        pointing at the login page. After logging in, the update answers 302
        back to the update URL and the revoke answers 200 with an empty body.
        """
        regular_user = User.get_by_username(base.TEST_USER_REGULAR_LOGIN)
        update_url = base.url('edit_repo_perms_update', repo_name=base.HG_REPO)
        revoke_url = base.url('edit_repo_perms_revoke', repo_name=base.HG_REPO)

        def grant_params():
            # Form fields granting 'repository.read' to the regular test user.
            return {
                'perm_new_member_1': 'repository.read',
                'perm_new_member_name_1': regular_user.username,
                'perm_new_member_type_1': 'user',
                '_session_csrf_secret_token': self.session_csrf_secret_token(),
            }

        def revoke_params():
            # Form fields revoking the regular test user's permission.
            return {
                'obj_type': 'user',
                'user_id': regular_user.user_id,
                '_session_csrf_secret_token': self.session_csrf_secret_token(),
            }

        # Test unauthenticated access - it will redirect to login page
        response = self.app.post(update_url, params=grant_params(), status=302)
        assert not response.location.endswith(update_url)
        assert response.location.endswith(base.url('login_home', came_from=update_url))

        response = self.app.post(revoke_url, params=revoke_params(), status=302)
        assert response.location.endswith(base.url('login_home', came_from=revoke_url))

        # Test authenticated access
        self.log_user()

        response = self.app.post(update_url, params=grant_params(), status=302)
        assert response.location.endswith(update_url)

        response = self.app.post(revoke_url, params=revoke_params(), status=200)
        assert not response.body
|