import os import time import pytest import testinfra.utils.ansible_runner testinfra_hosts = testinfra.utils.ansible_runner.AnsibleRunner( os.environ['MOLECULE_INVENTORY_FILE']).get_hosts('client-allowed') parameters_mandatory_hosts = testinfra.utils.ansible_runner.AnsibleRunner( os.environ['MOLECULE_INVENTORY_FILE']).get_hosts('parameters-mandatory') parameters_optional_hosts = testinfra.utils.ansible_runner.AnsibleRunner( os.environ['MOLECULE_INVENTORY_FILE']).get_hosts('parameters-optional') @pytest.mark.parametrize("target_host", parameters_mandatory_hosts + parameters_optional_hosts) @pytest.mark.parametrize("ip_protocol", [4, 6]) def test_ssh_connectivity(host, target_host, ip_protocol): """ Test if SSH server is reachable. """ with host.sudo(): scan = host.run('nmap -%s -p 22 -oG - %s', str(ip_protocol), target_host) assert scan.rc == 0 assert "Ports: 22/open/tcp//ssh" in scan.stdout @pytest.mark.parametrize("target_host", parameters_mandatory_hosts + parameters_optional_hosts) @pytest.mark.parametrize("ip_protocol", [4, 6]) def test_http_connectivity(host, target_host, ip_protocol): """ Test if HTTP server is reachable. """ with host.sudo(): scan = host.run('nmap -%s -p 80 -oG - %s', str(ip_protocol), target_host) assert scan.rc == 0 assert "Ports: 80/open/tcp//http" in scan.stdout @pytest.mark.parametrize("target_host", parameters_mandatory_hosts) @pytest.mark.parametrize("ip_protocol", [4, 6]) def test_tcp_rate_limit_parameters_mandatory(host, target_host, ip_protocol): """ Test if TCP rate limits are applied correctly. """ # Sequence explanation (for total of 14 packets being sent): # # - First second, 9 packets accepted because of burst policy. # - First second, 1 packet dropped because both burst policy and rate limit have been exceeded. # - Second second, waiting for dropped packet timeout. Rate limit catches up with burst. # - Third second, 3 packets accpeted thanks to being within the rate limit. # - Third second, 1 packet dropped because rate limit has been exceeded. expected_output = ["success"] * 9 + ["failure"] * 1 + ["success"] * 3 + ["failure"] * 1 with host.sudo(): # Clear the hash bucket. time.sleep(2) # Send 14 packets, timeout one second (-w), cut connection as # soon as it is established (-z). scan = host.run("for i in $(seq 14); do nc.openbsd -%s -w 1 -z %s 22 2>/dev/null && echo success || echo failure; done", str(ip_protocol), target_host) assert scan.rc == 0 assert scan.stdout.strip().split("\n") == expected_output @pytest.mark.parametrize("target_host", parameters_optional_hosts) @pytest.mark.parametrize("ip_protocol", [4, 6]) def test_tcp_rate_limit_parameters_optional(host, target_host, ip_protocol): """ Test if TCP rate limits are applied correctly. """ # Sequence explanation (for total of 17 packets being sent): # # - First second, 10 packets accepted because of burst policy. # - First second, 1 packet dropped because both burst policy and rate limit have been exceeded. # - Second second, waiting for dropped packet timeout. Rate limit catches up with burst. # - Third second, 5 packets accpeted thanks to being within the rate limit. # - Third second, 1 packet dropped because rate limit has been exceeded. expected_output = ["success"] * 10 + ["failure"] * 1 + ["success"] * 5 + ["failure"] * 1 with host.sudo(): # Clear the hash bucket. time.sleep(2) # Send 17 packets, timeout one second (-w), cut connection as # soon as it is established (-z). scan = host.run("for i in $(seq 17); do nc.openbsd -%s -w 1 -z %s 22 2>/dev/null && echo success || echo failure; done", str(ip_protocol), target_host) assert scan.rc == 0 assert scan.stdout.strip().split("\n") == expected_output @pytest.mark.parametrize("target_host", parameters_mandatory_hosts) @pytest.mark.parametrize("ip_protocol", [4, 6]) def test_icmp_rate_limit_parameters_mandatory(host, target_host, ip_protocol): """ Test if ICMP rate limits are applied correctly. """ # Sequence explanation (for total of 14 packets being sent): # # - First second, 9 packets accepted because of burst policy. # - First second, 1 packet dropped because both burst policy and rate limit have been exceeded. # - Second second, waiting for dropped packet timeout. Rate limit catches up with burst. # - Third second, 3 packets accpeted thanks to being within the rate limit. # - Third second, 1 packet dropped because rate limit has been exceeded. expected_output = ["1 received"] * 9 + ["0 received"] * 1 + ["1 received"] * 3 + ["0 received"] * 1 with host.sudo(): # Clear the hash bucket. time.sleep(2) # Send 14 x 1 packets (-c 1), with timeout of one second (-W 1). scan = host.run("for i in $(seq 14); do sudo ping -%s -c 1 -W 1 %s | grep -o '[[:digit:]] received'; done", str(ip_protocol), target_host) assert scan.rc == 0 assert scan.stdout.strip().split("\n") == expected_output @pytest.mark.parametrize("target_host", parameters_optional_hosts) @pytest.mark.parametrize("ip_protocol", [4, 6]) def test_icmp_rate_limit_parameters_optional(host, target_host, ip_protocol): """ Test if ICMP rate limits are applied correctly. """ # Sequence explanation (for total of 17 packets being sent): # # - First second, 10 packets accepted because of burst policy. # - First second, 1 packet dropped because both burst policy and rate limit have been exceeded. # - Second second, waiting for dropped packet timeout. Rate limit catches up with burst. # - Third second, 5 packets accpeted thanks to being within the rate limit. # - Third second, 1 packet dropped because rate limit has been exceeded. expected_output = ["1 received"] * 10 + ["0 received"] * 1 + ["1 received"] * 5 + ["0 received"] * 1 with host.sudo(): # Clear the hash bucket. time.sleep(2) # Send 17 x 1 packets (-c 1), with timeout of one second (-W 1). scan = host.run("for i in $(seq 17); do sudo ping -%s -c 1 -W 1 %s | grep -o '[[:digit:]] received'; done", str(ip_protocol), target_host) assert scan.rc == 0 assert scan.stdout.strip().split("\n") == expected_output