Files
@ 6bdfd938ef64
Branch filter:
Location: majic-ansible-roles/roles/common/molecule/default/tests/test_maintenance_from_allowed_client.py - annotation
6bdfd938ef64
6.3 KiB
text/x-python
MAR-242: Document that expiration period is set for uploaded files.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 | 7b004fce5c8b b171f6203e40 7b004fce5c8b 325b9d16a72b 325b9d16a72b 7b004fce5c8b 7b004fce5c8b 7b004fce5c8b 7b004fce5c8b 7b004fce5c8b 7b004fce5c8b f6bd1ff55982 f6bd1ff55982 7b004fce5c8b f6bd1ff55982 f6bd1ff55982 f6bd1ff55982 f6bd1ff55982 f6bd1ff55982 736e06e7ffd6 736e06e7ffd6 7b004fce5c8b 7b004fce5c8b 7b004fce5c8b 7b004fce5c8b 7b004fce5c8b 7b004fce5c8b 736e06e7ffd6 76debadf4dae 76debadf4dae 76debadf4dae 7b004fce5c8b 7b004fce5c8b f6bd1ff55982 736e06e7ffd6 736e06e7ffd6 7b004fce5c8b 7b004fce5c8b 7b004fce5c8b 7b004fce5c8b 7b004fce5c8b 7b004fce5c8b 736e06e7ffd6 76debadf4dae 76debadf4dae 76debadf4dae b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 
b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 b171f6203e40 | import os
import time
import pytest
import testinfra.utils.ansible_runner
# Host lists resolved from the Molecule inventory:
#
# - testinfra_hosts: hosts of the 'client-allowed' group.
# - parameters_mandatory_hosts / parameters_optional_hosts: hosts of
#   the 'parameters-mandatory' and 'parameters-optional' groups, used
#   as parametrised targets by the tests in this module.
#
# A fresh runner is instantiated for every group lookup.
testinfra_hosts, parameters_mandatory_hosts, parameters_optional_hosts = (
    testinfra.utils.ansible_runner.AnsibleRunner(
        os.environ['MOLECULE_INVENTORY_FILE']
    ).get_hosts(group)
    for group in ('client-allowed', 'parameters-mandatory', 'parameters-optional')
)
@pytest.mark.parametrize("target_host", parameters_mandatory_hosts + parameters_optional_hosts)
@pytest.mark.parametrize("ip_protocol", [4, 6])
def test_ssh_connectivity(host, target_host, ip_protocol):
    """
    Verify that port 22 (SSH) on the target host is reported as open
    by an nmap scan, for both IP protocol versions.
    """

    protocol_flag = str(ip_protocol)

    # Scan a single port and have nmap emit its greppable report on
    # standard output so it can be matched below.
    with host.sudo():
        nmap_result = host.run('nmap -%s -p 22 -oG - %s', protocol_flag, target_host)

    assert nmap_result.rc == 0
    assert "Ports: 22/open/tcp//ssh" in nmap_result.stdout
@pytest.mark.parametrize("target_host", parameters_mandatory_hosts + parameters_optional_hosts)
@pytest.mark.parametrize("ip_protocol", [4, 6])
def test_http_connectivity(host, target_host, ip_protocol):
    """
    Verify that port 80 (HTTP) on the target host is reported as open
    by an nmap scan, for both IP protocol versions.
    """

    protocol_flag = str(ip_protocol)

    # Scan a single port and have nmap emit its greppable report on
    # standard output so it can be matched below.
    with host.sudo():
        nmap_result = host.run('nmap -%s -p 80 -oG - %s', protocol_flag, target_host)

    assert nmap_result.rc == 0
    assert "Ports: 80/open/tcp//http" in nmap_result.stdout
@pytest.mark.parametrize("target_host", parameters_mandatory_hosts)
@pytest.mark.parametrize("ip_protocol", [4, 6])
def test_tcp_rate_limit_parameters_mandatory(host, target_host, ip_protocol):
    """
    Verify the accept/drop pattern of consecutive TCP connection
    attempts against port 22 of hosts from the parameters-mandatory
    group.
    """

    # Expected timeline for the 14 connection attempts:
    #
    # - Second 1: 9 attempts are accepted thanks to the burst policy.
    # - Second 1: 1 attempt is dropped, since both the burst policy
    #   and the rate limit have been exceeded.
    # - Second 2: spent waiting for the dropped attempt to time out,
    #   while the rate limit catches up with the burst.
    # - Second 3: 3 attempts are accepted, being within the rate limit.
    # - Second 3: 1 attempt is dropped, because the rate limit has
    #   been exceeded again.
    expected_output = 9 * ["success"] + ["failure"] + 3 * ["success"] + ["failure"]

    # Each attempt has a one second timeout (-w) and closes the
    # connection as soon as it gets established (-z).
    probe_command = "for i in $(seq 14); do nc.openbsd -%s -w 1 -z %s 22 2>/dev/null && echo success || echo failure; done"

    with host.sudo():

        # Give the hash bucket time to clear before probing.
        time.sleep(2)

        probe = host.run(probe_command, str(ip_protocol), target_host)

    reported = probe.stdout.strip().split("\n")

    assert probe.rc == 0
    assert reported == expected_output
@pytest.mark.parametrize("target_host", parameters_optional_hosts)
@pytest.mark.parametrize("ip_protocol", [4, 6])
def test_tcp_rate_limit_parameters_optional(host, target_host, ip_protocol):
    """
    Verify the accept/drop pattern of consecutive TCP connection
    attempts against port 22 of hosts from the parameters-optional
    group.
    """

    # Expected timeline for the 17 connection attempts:
    #
    # - Second 1: 10 attempts are accepted thanks to the burst policy.
    # - Second 1: 1 attempt is dropped, since both the burst policy
    #   and the rate limit have been exceeded.
    # - Second 2: spent waiting for the dropped attempt to time out,
    #   while the rate limit catches up with the burst.
    # - Second 3: 5 attempts are accepted, being within the rate limit.
    # - Second 3: 1 attempt is dropped, because the rate limit has
    #   been exceeded again.
    expected_output = 10 * ["success"] + ["failure"] + 5 * ["success"] + ["failure"]

    # Each attempt has a one second timeout (-w) and closes the
    # connection as soon as it gets established (-z).
    probe_command = "for i in $(seq 17); do nc.openbsd -%s -w 1 -z %s 22 2>/dev/null && echo success || echo failure; done"

    with host.sudo():

        # Give the hash bucket time to clear before probing.
        time.sleep(2)

        probe = host.run(probe_command, str(ip_protocol), target_host)

    reported = probe.stdout.strip().split("\n")

    assert probe.rc == 0
    assert reported == expected_output
@pytest.mark.parametrize("target_host", parameters_mandatory_hosts)
@pytest.mark.parametrize("ip_protocol", [4, 6])
def test_icmp_rate_limit_parameters_mandatory(host, target_host, ip_protocol):
    """
    Verify the reply/drop pattern of consecutive single-packet pings
    sent to hosts from the parameters-mandatory group.
    """

    # Expected timeline for the 14 pings:
    #
    # - Second 1: 9 packets are accepted thanks to the burst policy.
    # - Second 1: 1 packet is dropped, since both the burst policy
    #   and the rate limit have been exceeded.
    # - Second 2: spent waiting for the dropped packet to time out,
    #   while the rate limit catches up with the burst.
    # - Second 3: 3 packets are accepted, being within the rate limit.
    # - Second 3: 1 packet is dropped, because the rate limit has
    #   been exceeded again.
    answered, dropped = ["1 received"], ["0 received"]
    expected_output = 9 * answered + dropped + 3 * answered + dropped

    # Every ping invocation sends exactly one packet (-c 1) and waits
    # at most one second for the reply (-W 1); only the "N received"
    # fragment of its summary is kept.
    ping_command = "for i in $(seq 14); do sudo ping -%s -c 1 -W 1 %s | grep -o '[[:digit:]] received'; done"

    with host.sudo():

        # Give the hash bucket time to clear before pinging.
        time.sleep(2)

        ping_run = host.run(ping_command, str(ip_protocol), target_host)

    reported = ping_run.stdout.strip().split("\n")

    assert ping_run.rc == 0
    assert reported == expected_output
@pytest.mark.parametrize("target_host", parameters_optional_hosts)
@pytest.mark.parametrize("ip_protocol", [4, 6])
def test_icmp_rate_limit_parameters_optional(host, target_host, ip_protocol):
    """
    Verify the reply/drop pattern of consecutive single-packet pings
    sent to hosts from the parameters-optional group.
    """

    # Expected timeline for the 17 pings:
    #
    # - Second 1: 10 packets are accepted thanks to the burst policy.
    # - Second 1: 1 packet is dropped, since both the burst policy
    #   and the rate limit have been exceeded.
    # - Second 2: spent waiting for the dropped packet to time out,
    #   while the rate limit catches up with the burst.
    # - Second 3: 5 packets are accepted, being within the rate limit.
    # - Second 3: 1 packet is dropped, because the rate limit has
    #   been exceeded again.
    answered, dropped = ["1 received"], ["0 received"]
    expected_output = 10 * answered + dropped + 5 * answered + dropped

    # Every ping invocation sends exactly one packet (-c 1) and waits
    # at most one second for the reply (-W 1); only the "N received"
    # fragment of its summary is kept.
    ping_command = "for i in $(seq 17); do sudo ping -%s -c 1 -W 1 %s | grep -o '[[:digit:]] received'; done"

    with host.sudo():

        # Give the hash bucket time to clear before pinging.
        time.sleep(2)

        ping_run = host.run(ping_command, str(ip_protocol), target_host)

    reported = ping_run.stdout.strip().split("\n")

    assert ping_run.rc == 0
    assert reported == expected_output
|