Changeset - b171f6203e40
[Not reviewed]
0 4 0
Branko Majic (branko) - 2 months ago 2025-02-05 01:09:41
branko@majic.rs
MAR-241: Implemented connection rate limit tests for the common role:

- Tweak some test machine parameters to have better test
sample representation.
- Tests heavily rely not being laggy to get the expected output. Tests
may need to get reworked a bit if they end up failing repeatedly
during test runs.
4 files changed with 125 insertions and 8 deletions:
0 comments (0 inline, 0 general) First comment
roles/common/molecule/default/group_vars/parameters-optional.yml
Show inline comments
 
@@ -39,7 +39,7 @@ extra_backup_patterns:
 
  - /home/user1
 
  - /home/user2
 
incoming_connection_limit: 5/second
 
incoming_connection_limit_burst: 5
 
incoming_connection_limit_burst: 10
 
pipreqcheck_uid: 2500
 
pipreqcheck_gid: 2500
 
prompt_colour: cyan
roles/common/molecule/default/prepare.yml
Show inline comments
 
@@ -53,9 +53,11 @@
 
  become: true
 
  tasks:
 

	
 
    - name: Install tool for testing TCP connectivity
 
    - name: Install testing tools
 
      ansible.builtin.apt:
 
        name: nmap
 
        name:
 
          - nmap
 
          - netcat-openbsd
 
        state: present
 

	
 
    - name: Set-up /etc/hosts with entries for all servers
roles/common/molecule/default/tests/test_maintenance_from_allowed_client.py
Show inline comments
 
import os
 
import time
 

	
 
import pytest
 

	
 
@@ -43,3 +44,117 @@ def test_http_connectivity(host, target_host, ip_protocol):
 

	
 
        assert scan.rc == 0
 
        assert "Ports: 80/open/tcp//http" in scan.stdout
 

	
 

	
 
@pytest.mark.parametrize("target_host", parameters_mandatory_hosts)
 
@pytest.mark.parametrize("ip_protocol", [4, 6])
 
def test_tcp_rate_limit_parameters_mandatory(host, target_host, ip_protocol):
 
    """
 
    Test if TCP rate limits are applied correctly.
 
    """
 

	
 
    # Sequence explanation (for total of 14 packets being sent):
 
    #
 
    # - First second, 9 packets accepted because of burst policy.
 
    # - First second, 1 packet dropped because both burst policy and rate limit have been exceeded.
 
    # - Second second, waiting for dropped packet timeout. Rate limit catches up with burst.
 
    # - Third second, 3 packets accpeted thanks to being within the rate limit.
 
    # - Third second, 1 packet dropped because rate limit has been exceeded.
 
    expected_output = ["success"] * 9 + ["failure"] * 1 + ["success"] * 3 + ["failure"] * 1
 

	
 
    with host.sudo():
 

	
 
        # Clear the hash bucket.
 
        time.sleep(2)
 

	
 
        # Send 14 packets, timeout one second (-w), cut connection as
 
        # soon as it is established (-z).
 
        scan = host.run("for i in $(seq 14); do nc.openbsd -%s -w 1 -z %s 22 2>/dev/null && echo success || echo failure; done", str(ip_protocol), target_host)
 

	
 
        assert scan.rc == 0
 
        assert scan.stdout.strip().split("\n") == expected_output
 

	
 

	
 
@pytest.mark.parametrize("target_host", parameters_optional_hosts)
 
@pytest.mark.parametrize("ip_protocol", [4, 6])
 
def test_tcp_rate_limit_parameters_optional(host, target_host, ip_protocol):
 
    """
 
    Test if TCP rate limits are applied correctly.
 
    """
 

	
 
    # Sequence explanation (for total of 17 packets being sent):
 
    #
 
    # - First second, 10 packets accepted because of burst policy.
 
    # - First second, 1 packet dropped because both burst policy and rate limit have been exceeded.
 
    # - Second second, waiting for dropped packet timeout. Rate limit catches up with burst.
 
    # - Third second, 5 packets accpeted thanks to being within the rate limit.
 
    # - Third second, 1 packet dropped because rate limit has been exceeded.
 
    expected_output = ["success"] * 10 + ["failure"] * 1 + ["success"] * 5 + ["failure"] * 1
 

	
 
    with host.sudo():
 

	
 
        # Clear the hash bucket.
 
        time.sleep(2)
 

	
 
        # Send 17 packets, timeout one second (-w), cut connection as
 
        # soon as it is established (-z).
 
        scan = host.run("for i in $(seq 17); do nc.openbsd -%s -w 1 -z %s 22 2>/dev/null && echo success || echo failure; done", str(ip_protocol), target_host)
 

	
 
        assert scan.rc == 0
 
        assert scan.stdout.strip().split("\n") == expected_output
 

	
 

	
 
@pytest.mark.parametrize("target_host", parameters_mandatory_hosts)
 
@pytest.mark.parametrize("ip_protocol", [4, 6])
 
def test_icmp_rate_limit_parameters_mandatory(host, target_host, ip_protocol):
 
    """
 
    Test if ICMP rate limits are applied correctly.
 
    """
 

	
 
    # Sequence explanation (for total of 14 packets being sent):
 
    #
 
    # - First second, 9 packets accepted because of burst policy.
 
    # - First second, 1 packet dropped because both burst policy and rate limit have been exceeded.
 
    # - Second second, waiting for dropped packet timeout. Rate limit catches up with burst.
 
    # - Third second, 3 packets accpeted thanks to being within the rate limit.
 
    # - Third second, 1 packet dropped because rate limit has been exceeded.
 
    expected_output = ["1 received"] * 9 + ["0 received"] * 1 + ["1 received"] * 3 + ["0 received"] * 1
 

	
 
    with host.sudo():
 

	
 
        # Clear the hash bucket.
 
        time.sleep(2)
 

	
 
        # Send 14 x 1 packets (-c 1), with timeout of one second (-W 1).
 
        scan = host.run("for i in $(seq 14); do sudo ping -%s -c 1 -W 1 %s | grep -o '[[:digit:]] received'; done", str(ip_protocol), target_host)
 

	
 
        assert scan.rc == 0
 
        assert scan.stdout.strip().split("\n") == expected_output
 

	
 

	
 
@pytest.mark.parametrize("target_host", parameters_optional_hosts)
 
@pytest.mark.parametrize("ip_protocol", [4, 6])
 
def test_icmp_rate_limit_parameters_optional(host, target_host, ip_protocol):
 
    """
 
    Test if ICMP rate limits are applied correctly.
 
    """
 

	
 
    # Sequence explanation (for total of 17 packets being sent):
 
    #
 
    # - First second, 10 packets accepted because of burst policy.
 
    # - First second, 1 packet dropped because both burst policy and rate limit have been exceeded.
 
    # - Second second, waiting for dropped packet timeout. Rate limit catches up with burst.
 
    # - Third second, 5 packets accpeted thanks to being within the rate limit.
 
    # - Third second, 1 packet dropped because rate limit has been exceeded.
 
    expected_output = ["1 received"] * 10 + ["0 received"] * 1 + ["1 received"] * 5 + ["0 received"] * 1
 

	
 
    with host.sudo():
 

	
 
        # Clear the hash bucket.
 
        time.sleep(2)
 

	
 
        # Send 17 x 1 packets (-c 1), with timeout of one second (-W 1).
 
        scan = host.run("for i in $(seq 17); do sudo ping -%s -c 1 -W 1 %s | grep -o '[[:digit:]] received'; done", str(ip_protocol), target_host)
 

	
 
        assert scan.rc == 0
 
        assert scan.stdout.strip().split("\n") == expected_output
roles/common/molecule/default/tests/test_parameters_optional.py
Show inline comments
 
@@ -208,21 +208,21 @@ def test_ferm_base_rules(host):
 
    with host.sudo():
 
        ferm_base = host.file('/etc/ferm/conf.d/00-base.conf')
 

	
 
        assert "mod hashlimit hashlimit 5/second hashlimit-burst 5" in ferm_base.content_string
 
        assert "mod hashlimit hashlimit 5/second hashlimit-burst 10" in ferm_base.content_string
 

	
 
        iptables = host.command('iptables-save')
 

	
 
        assert iptables.rc == 0
 
        assert "-A flood -p icmp -m icmp --icmp-type 8 -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
 
        assert "-A flood -p icmp -m icmp --icmp-type 8 -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 10 " \
 
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in iptables.stdout
 
        assert "-A flood -p tcp -m tcp --tcp-flags FIN,SYN,RST,ACK SYN -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
 
        assert "-A flood -p tcp -m tcp --tcp-flags FIN,SYN,RST,ACK SYN -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 10 " \
 
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in iptables.stdout
 

	
 
        ip6tables = host.command('ip6tables-save')
 
        assert ip6tables.rc == 0
 
        assert "-A flood -p icmp -m icmp --icmp-type 8 -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
 
        assert "-A flood -p icmp -m icmp --icmp-type 8 -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 10 " \
 
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in iptables.stdout
 
        assert "-A flood -p ipv6-icmp -m icmp6 --icmpv6-type 128 -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
 
        assert "-A flood -p ipv6-icmp -m icmp6 --icmpv6-type 128 -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 10 " \
 
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in ip6tables.stdout
 

	
 

	
0 comments (0 inline, 0 general) First comment
You need to be logged in to comment. Login now