Files @ 6bdfd938ef64
Branch filter:

Location: majic-ansible-roles/roles/common/molecule/default/tests/test_maintenance_from_allowed_client.py - annotation

branko
MAR-242: Document that expiration period is set for uploaded files.
7b004fce5c8b
b171f6203e40
7b004fce5c8b
325b9d16a72b
325b9d16a72b
7b004fce5c8b
7b004fce5c8b
7b004fce5c8b
7b004fce5c8b
7b004fce5c8b
7b004fce5c8b
f6bd1ff55982
f6bd1ff55982
7b004fce5c8b
f6bd1ff55982
f6bd1ff55982
f6bd1ff55982
f6bd1ff55982
f6bd1ff55982
736e06e7ffd6
736e06e7ffd6
7b004fce5c8b
7b004fce5c8b
7b004fce5c8b
7b004fce5c8b
7b004fce5c8b
7b004fce5c8b
736e06e7ffd6
76debadf4dae
76debadf4dae
76debadf4dae
7b004fce5c8b
7b004fce5c8b
f6bd1ff55982
736e06e7ffd6
736e06e7ffd6
7b004fce5c8b
7b004fce5c8b
7b004fce5c8b
7b004fce5c8b
7b004fce5c8b
7b004fce5c8b
736e06e7ffd6
76debadf4dae
76debadf4dae
76debadf4dae
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
b171f6203e40
import os
import time

import pytest

import testinfra.utils.ansible_runner


testinfra_hosts = testinfra.utils.ansible_runner.AnsibleRunner(
    os.environ['MOLECULE_INVENTORY_FILE']).get_hosts('client-allowed')

parameters_mandatory_hosts = testinfra.utils.ansible_runner.AnsibleRunner(
    os.environ['MOLECULE_INVENTORY_FILE']).get_hosts('parameters-mandatory')

parameters_optional_hosts = testinfra.utils.ansible_runner.AnsibleRunner(
    os.environ['MOLECULE_INVENTORY_FILE']).get_hosts('parameters-optional')


@pytest.mark.parametrize("target_host", parameters_mandatory_hosts + parameters_optional_hosts)
@pytest.mark.parametrize("ip_protocol", [4, 6])
def test_ssh_connectivity(host, target_host, ip_protocol):
    """
    Test if SSH server is reachable.
    """

    with host.sudo():

        scan = host.run('nmap -%s -p 22 -oG - %s', str(ip_protocol), target_host)

        assert scan.rc == 0
        assert "Ports: 22/open/tcp//ssh" in scan.stdout


@pytest.mark.parametrize("target_host", parameters_mandatory_hosts + parameters_optional_hosts)
@pytest.mark.parametrize("ip_protocol", [4, 6])
def test_http_connectivity(host, target_host, ip_protocol):
    """
    Test if HTTP server is reachable.
    """

    with host.sudo():

        scan = host.run('nmap -%s -p 80 -oG - %s', str(ip_protocol), target_host)

        assert scan.rc == 0
        assert "Ports: 80/open/tcp//http" in scan.stdout


@pytest.mark.parametrize("target_host", parameters_mandatory_hosts)
@pytest.mark.parametrize("ip_protocol", [4, 6])
def test_tcp_rate_limit_parameters_mandatory(host, target_host, ip_protocol):
    """
    Test if TCP rate limits are applied correctly.
    """

    # Sequence explanation (for total of 14 packets being sent):
    #
    # - First second, 9 packets accepted because of burst policy.
    # - First second, 1 packet dropped because both burst policy and rate limit have been exceeded.
    # - Second second, waiting for dropped packet timeout. Rate limit catches up with burst.
    # - Third second, 3 packets accpeted thanks to being within the rate limit.
    # - Third second, 1 packet dropped because rate limit has been exceeded.
    expected_output = ["success"] * 9 + ["failure"] * 1 + ["success"] * 3 + ["failure"] * 1

    with host.sudo():

        # Clear the hash bucket.
        time.sleep(2)

        # Send 14 packets, timeout one second (-w), cut connection as
        # soon as it is established (-z).
        scan = host.run("for i in $(seq 14); do nc.openbsd -%s -w 1 -z %s 22 2>/dev/null && echo success || echo failure; done", str(ip_protocol), target_host)

        assert scan.rc == 0
        assert scan.stdout.strip().split("\n") == expected_output


@pytest.mark.parametrize("target_host", parameters_optional_hosts)
@pytest.mark.parametrize("ip_protocol", [4, 6])
def test_tcp_rate_limit_parameters_optional(host, target_host, ip_protocol):
    """
    Test if TCP rate limits are applied correctly.
    """

    # Sequence explanation (for total of 17 packets being sent):
    #
    # - First second, 10 packets accepted because of burst policy.
    # - First second, 1 packet dropped because both burst policy and rate limit have been exceeded.
    # - Second second, waiting for dropped packet timeout. Rate limit catches up with burst.
    # - Third second, 5 packets accpeted thanks to being within the rate limit.
    # - Third second, 1 packet dropped because rate limit has been exceeded.
    expected_output = ["success"] * 10 + ["failure"] * 1 + ["success"] * 5 + ["failure"] * 1

    with host.sudo():

        # Clear the hash bucket.
        time.sleep(2)

        # Send 17 packets, timeout one second (-w), cut connection as
        # soon as it is established (-z).
        scan = host.run("for i in $(seq 17); do nc.openbsd -%s -w 1 -z %s 22 2>/dev/null && echo success || echo failure; done", str(ip_protocol), target_host)

        assert scan.rc == 0
        assert scan.stdout.strip().split("\n") == expected_output


@pytest.mark.parametrize("target_host", parameters_mandatory_hosts)
@pytest.mark.parametrize("ip_protocol", [4, 6])
def test_icmp_rate_limit_parameters_mandatory(host, target_host, ip_protocol):
    """
    Test if ICMP rate limits are applied correctly.
    """

    # Sequence explanation (for total of 14 packets being sent):
    #
    # - First second, 9 packets accepted because of burst policy.
    # - First second, 1 packet dropped because both burst policy and rate limit have been exceeded.
    # - Second second, waiting for dropped packet timeout. Rate limit catches up with burst.
    # - Third second, 3 packets accpeted thanks to being within the rate limit.
    # - Third second, 1 packet dropped because rate limit has been exceeded.
    expected_output = ["1 received"] * 9 + ["0 received"] * 1 + ["1 received"] * 3 + ["0 received"] * 1

    with host.sudo():

        # Clear the hash bucket.
        time.sleep(2)

        # Send 14 x 1 packets (-c 1), with timeout of one second (-W 1).
        scan = host.run("for i in $(seq 14); do sudo ping -%s -c 1 -W 1 %s | grep -o '[[:digit:]] received'; done", str(ip_protocol), target_host)

        assert scan.rc == 0
        assert scan.stdout.strip().split("\n") == expected_output


@pytest.mark.parametrize("target_host", parameters_optional_hosts)
@pytest.mark.parametrize("ip_protocol", [4, 6])
def test_icmp_rate_limit_parameters_optional(host, target_host, ip_protocol):
    """
    Test if ICMP rate limits are applied correctly.
    """

    # Sequence explanation (for total of 17 packets being sent):
    #
    # - First second, 10 packets accepted because of burst policy.
    # - First second, 1 packet dropped because both burst policy and rate limit have been exceeded.
    # - Second second, waiting for dropped packet timeout. Rate limit catches up with burst.
    # - Third second, 5 packets accpeted thanks to being within the rate limit.
    # - Third second, 1 packet dropped because rate limit has been exceeded.
    expected_output = ["1 received"] * 10 + ["0 received"] * 1 + ["1 received"] * 5 + ["0 received"] * 1

    with host.sudo():

        # Clear the hash bucket.
        time.sleep(2)

        # Send 17 x 1 packets (-c 1), with timeout of one second (-W 1).
        scan = host.run("for i in $(seq 17); do sudo ping -%s -c 1 -W 1 %s | grep -o '[[:digit:]] received'; done", str(ip_protocol), target_host)

        assert scan.rc == 0
        assert scan.stdout.strip().split("\n") == expected_output