Files @ 72af31a420be
Branch filter:

Location: majic-ansible-roles/roles/common/molecule/default/tests/test_parameters_optional.py

branko
MAR-192: Switch to using NTPsec NTP server for increased security:

- NTPsec has for some time been a much better option, and it should
also provide compatibility with Debian 12 Bookworm.
import os
import re
import socket

import paramiko

import testinfra.utils.ansible_runner

import pytest


# Hosts targeted by the tests in this module: the 'parameters-optional'
# entry looked up in the inventory file whose path is provided through
# the MOLECULE_INVENTORY_FILE environment variable. A missing variable
# raises KeyError at import time.
testinfra_hosts = testinfra.utils.ansible_runner.AnsibleRunner(
    os.environ['MOLECULE_INVENTORY_FILE']).get_hosts('parameters-optional')


def test_apt_proxy(host):
    """
    Verifies that the apt proxy configuration file is present on the
    host and carries the expected ownership and permissions.
    """

    expected_owner = 'root'
    expected_group = 'root'
    expected_mode = 0o644

    apt_proxy_file = host.file('/etc/apt/apt.conf.d/00proxy')

    assert apt_proxy_file.exists
    assert apt_proxy_file.user == expected_owner
    assert apt_proxy_file.group == expected_group
    assert apt_proxy_file.mode == expected_mode


def test_bash_prompt_content(host):
    """
    Verifies that the deployed bash prompt script exports both the
    coloured and the plain variant of the customised PS1 prompt.
    """

    # Prompt variant that wraps the user@host part in colour escapes.
    coloured_prompt = \
        "export PS1='\\[\\e]0;\\u@\\h: \\w\\a\\]${debian_chroot:+($debian_chroot)}\\[\\033[0;36m\\]\\u@\\h[test]:\\w\\$ \\[\\033[0m\\]'"

    # Prompt variant without any colour escapes.
    plain_prompt = \
        "export PS1='\\[\\e]0;\\u@\\h: \\w\\a\\]${debian_chroot:+($debian_chroot)}\\u@\\h[test]:\\w\\$ '"

    prompt_script = host.file('/etc/profile.d/bash_prompt.sh')

    assert coloured_prompt in prompt_script.content_string
    assert plain_prompt in prompt_script.content_string


def test_common_packages_are_installed(host):
    """
    Verifies that every user-provided common package is installed.
    """

    expected_packages = ('units', 'gnutls-bin', 'emacs-nox')

    for package_name in expected_packages:
        assert host.package(package_name).is_installed


def test_ssh_login_mechanisms(host):
    """
    Tests available SSH login mechanisms (should be just public key).

    An unauthenticated ("none") login is attempted against port 22 of
    the host. The server must reject the attempt and advertise public
    key as the only allowed authentication type.
    """

    # Extract first non-IPv6 IP. Crude test, but it should work.
    remote_ip = next(a for a in host.interface("eth1").addresses if ":" not in a)

    # Context manager / finally make sure the socket and the transport
    # are released even when an assertion fails.
    with socket.socket() as sock:
        sock.connect((remote_ip, 22))

        transport = paramiko.transport.Transport(sock)
        try:
            transport.connect()

            # The attempt must be rejected. Should the server accept
            # it, no exception is raised and the test has to fail
            # instead of passing silently.
            with pytest.raises(paramiko.transport.BadAuthenticationType) as excinfo:
                transport.auth_none('')

            assert excinfo.value.allowed_types == ['publickey']
        finally:
            transport.close()


def test_emacs_electric_indent_mode(host):
    """
    Verifies that the custom Emacs site-start file which switches off
    electric indent mode is deployed with correct ownership, mode and
    content.
    """

    site_start_path = '/etc/emacs/site-start.d/01disable-electric-indent-mode.el'
    disable_directive = "(electric-indent-mode -1)"

    site_start_file = host.file(site_start_path)

    assert site_start_file.is_file
    assert site_start_file.user == 'root'
    assert site_start_file.group == 'root'
    assert site_start_file.mode == 0o644
    assert disable_directive in site_start_file.content_string


def test_os_groups(host):
    """
    Verifies that the user-supplied system groups, as well as the
    per-user groups, exist with the expected group IDs.
    """

    # (group name, expected gid) pairs, checked in this order.
    expected_gids = (
        ('group1', 1001),
        ('group2', 3001),
        ('group3', 3002),
        ('user1', 3003),
        ('user2', 2001),
        ('user3', 2002),
    )

    for group_name, expected_gid in expected_gids:
        assert host.group(group_name).gid == expected_gid


def test_os_users(host):
    """
    Tests if user-supplied system users have been created correctly.

    For each user the uid, primary group, group membership, shell and
    password entry are verified, followed by the state of the user's
    SSH authorized keys file.
    """

    def read_public_key(path):
        # Read the local public key through a context manager so the
        # file handle is closed right away instead of being leaked.
        with open(path, 'r') as key_file:
            return key_file.read().strip()

    with host.sudo():
        user1 = host.user('user1')
        assert user1.uid == 1001
        assert user1.group == 'user1'
        assert user1.groups == ['user1']
        assert user1.shell == '/bin/bash'
        assert user1.password == '!'

        # No keys are expected for this user, hence no file either.
        user1_authorized_keys = host.file(os.path.join(user1.home, '.ssh', 'authorized_keys'))
        assert not user1_authorized_keys.exists

        user2 = host.user('user2')
        assert user2.uid == 2001
        assert user2.group == 'user2'
        assert sorted(user2.groups) == sorted(['group1', 'group2', 'user2'])
        assert user2.shell == '/bin/bash'
        assert user2.password == '$6$wdXOQiMe09ugh0$VRIph2XA2QQyEYlAlH7zT4TPACDUalf/4FKpqG9JRHfKxANTcTug2ANCt450htcs0LikJfHLWofLP54jraFU61'

        user2_authorized_keys = host.file(os.path.join(user2.home, '.ssh', 'authorized_keys'))
        assert read_public_key('tests/data/ssh/clientkey1.pub') in user2_authorized_keys.content_string
        assert read_public_key('tests/data/ssh/clientkey2.pub') in user2_authorized_keys.content_string

        user3 = host.user('user3')
        assert user3.uid == 2002
        assert user3.group == 'user3'
        assert sorted(user3.groups) == sorted(['group3', 'user3'])
        assert user3.shell == '/bin/bash'
        assert user3.password == '$6$nmx.21uLqT$9LrUqNUgUwIM.l0KFKgr2.kDEwe2lo7IbBIhnG70AGW7GTFdWBUFnGAxH15YxikTXhDJD/uxd.NNgojEOjRvx1'

        user3_authorized_keys = host.file(os.path.join(user3.home, '.ssh', 'authorized_keys'))
        assert read_public_key('tests/data/ssh/clientkey3.pub') in user3_authorized_keys.content_string


def test_authorized_keys_login(host):
    """
    Tests if authorized SSH keys for user-provided system users have been set-up
    correctly.

    A login is attempted for every (user, private key) pair. The test
    fails if any of the connection attempts raises an exception.
    """

    class IgnorePolicy(paramiko.client.MissingHostKeyPolicy):
        """
        Host key policy that silently accepts unknown host keys.
        """

        def missing_host_key(self, client, hostname, key):
            pass

    # Extract first non-IPv6 IP. Crude test, but it should work.
    remote_ip = next(a for a in host.interface("eth1").addresses if ":" not in a)

    # (username, private key file) pairs that must be able to log in.
    logins = (
        ("user2", 'tests/data/ssh/clientkey1'),
        ("user2", 'tests/data/ssh/clientkey2'),
        ("user3", 'tests/data/ssh/clientkey3'),
    )

    for username, key_filename in logins:
        # Use a dedicated client per login and always close it, so no
        # connection is left open once the attempt has been made.
        client = paramiko.client.SSHClient()
        client.set_missing_host_key_policy(IgnorePolicy())

        try:
            # No exception will be raised if connection is successful.
            client.connect(remote_ip, username=username, allow_agent=False, look_for_keys=False, key_filename=key_filename)
        finally:
            client.close()


@pytest.mark.parametrize('ca_certificate_basename', ['cacert1', 'cacert2'])
def test_ca_certificates(host, ca_certificate_basename):
    """
    Verifies deployment of a CA certificate: the certificate file
    itself, plus the name-based and hash-based symlinks pointing to it
    from the system certificate directory.
    """

    certificate_path = '/usr/local/share/ca-certificates/%s.crt' % ca_certificate_basename
    name_symlink_path = '/etc/ssl/certs/%s.pem' % ca_certificate_basename

    # The hash-based symlink name is derived from the certificate by
    # running openssl on the host.
    hash_result = host.run('openssl x509 -hash -noout -in %s', certificate_path)
    certificate_hash = hash_result.stdout.strip()
    hash_symlink_path = '/etc/ssl/certs/%s.0' % certificate_hash

    certificate = host.file(certificate_path)
    assert certificate.is_file
    assert certificate.user == 'root'
    assert certificate.group == 'root'
    assert certificate.mode == 0o644

    name_symlink = host.file(name_symlink_path)
    assert name_symlink.is_symlink
    assert name_symlink.linked_to == certificate_path

    hash_symlink = host.file(hash_symlink_path)
    assert hash_symlink.is_symlink
    assert hash_symlink.linked_to == certificate_path


def test_ferm_base_rules(host):
    """
    Tests if base ferm configuration has been deployed correctly with proper
    user-provided rate-limiting.

    Both the ferm configuration file and the rules reported by
    iptables-save and ip6tables-save are checked.
    """

    with host.sudo():
        ferm_base = host.file('/etc/ferm/conf.d/00-base.conf')

        assert "mod hashlimit hashlimit 5/second hashlimit-burst 5" in ferm_base.content_string

        # IPv4 rules.
        iptables = host.command('iptables-save')

        assert iptables.rc == 0
        assert "-A flood -p icmp -m icmp --icmp-type 8 -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in iptables.stdout
        assert "-A flood -p tcp -m tcp --tcp-flags FIN,SYN,RST,ACK SYN -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in iptables.stdout

        # IPv6 rules. Every assertion in this section must look at the
        # ip6tables output - previously the first one repeated the IPv4
        # ICMP check against the iptables output.
        ip6tables = host.command('ip6tables-save')

        assert ip6tables.rc == 0
        # NOTE(review): assumes the TCP SYN rule is rendered identically
        # for IPv6 - confirm against the deployed ferm configuration.
        assert "-A flood -p tcp -m tcp --tcp-flags FIN,SYN,RST,ACK SYN -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in ip6tables.stdout
        assert "-A flood -p ipv6-icmp -m icmp6 --icmpv6-type 128 -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in ip6tables.stdout


def test_pipreqcheck_virtualenv_user(host):
    """
    Verifies that the dedicated group and user used for running pip
    requirements upgrade checks exist and use the user-provided
    uid/gid.
    """

    expected_gid = 2500
    expected_uid = 2500

    pipreqcheck_group = host.group('pipreqcheck')
    assert pipreqcheck_group.exists
    assert pipreqcheck_group.gid == expected_gid

    pipreqcheck_user = host.user('pipreqcheck')
    assert pipreqcheck_user.exists
    assert pipreqcheck_user.home == '/var/lib/pipreqcheck'
    assert pipreqcheck_user.uid == expected_uid
    assert pipreqcheck_user.group == 'pipreqcheck'
    assert pipreqcheck_user.groups == ['pipreqcheck']


def test_backup_configuration(host):
    """
    Verifies that the duply backup pattern files are deployed and
    list all of the expected paths, one path per line.
    """

    # (pattern file, paths that must each appear as a whole line).
    expected_patterns = (
        ('/etc/duply/main/patterns/common',
         ("/var/log", "/etc/shadow", "/var/mail", "/var/spool/cron")),
        ('/etc/duply/main/patterns/common_extra',
         ("/home/user1", "/home/user2")),
    )

    with host.sudo():

        for pattern_path, expected_entries in expected_patterns:
            pattern_file = host.file(pattern_path)
            assert pattern_file.is_file

            for entry in expected_entries:
                assert entry in pattern_file.content_string.split("\n")


def test_ntp_software_installed(host):
    """
    Verifies that the NTPsec packages are installed on the host.
    """

    for package_name in ('ntpsec', 'ntpsec-ntpdate'):
        assert host.package(package_name).is_installed


def test_ntp_server_configuration(host):
    """
    Verifies the NTP server configuration file: its ownership and
    permissions, the configured servers, and the access restrictions.
    """

    wanted_servers = [
        "server 0.debian.pool.ntp.org iburst",
        "server 1.debian.pool.ntp.org iburst",
        "server 2.debian.pool.ntp.org iburst",
    ]

    wanted_restrictions = [
        "restrict default kod nomodify nopeer noquery limited",
        "restrict 127.0.0.1",
        "restrict ::1",
    ]

    # Matches lines that are empty, whitespace-only, or comments.
    ignorable_line = re.compile(r'^\s*(|#.*)$')

    with host.sudo():

        ntp_conf = host.file("/etc/ntpsec/ntp.conf")

        # The configuration file must be in place.
        assert ntp_conf.exists
        assert ntp_conf.user == 'root'
        assert ntp_conf.group == 'root'
        assert ntp_conf.mode == 0o644

        # Keep only the meaningful directives, stripped of surrounding
        # whitespace.
        directives = [
            line.strip()
            for line in ntp_conf.content_string.split("\n")
            if ignorable_line.match(line) is None
        ]

        # Exactly the expected servers must be configured.
        deployed_servers = [d for d in directives if d.startswith('server')]
        assert sorted(deployed_servers) == sorted(wanted_servers)

        # Querying of the server must be disabled for untrusted
        # clients.
        deployed_restrictions = [d for d in directives if d.startswith('restrict')]
        assert sorted(deployed_restrictions) == sorted(wanted_restrictions)


def test_ntp_query_server_count(host):
    """
    Verifies that querying the NTP server for its peers succeeds and
    produces the expected number of output lines.
    """

    # The output consists of two header lines followed by one line per
    # configured server.
    header_line_count = 2
    configured_server_count = 3

    peers = host.command("ntpq -p -n")
    assert peers.rc == 0

    output_lines = peers.stdout.strip().split("\n")
    assert len(output_lines) == header_line_count + configured_server_count


def test_ntp_listening_interfaces(host):
    """
    Verifies that a UDP socket bound to port 123 on the IPv6 wildcard
    address is listening on the host.
    """

    ntp_socket = host.socket('udp://:::123')

    assert ntp_socket.is_listening


def test_pipreqcheck_input_content(host):
    """
    Verifies the requirements input file of the virtual environment
    used by the script that performs pip requirements upgrade checks.
    The comparison ignores letter case and line order.
    """

    requirements_path = '/etc/pip_check_requirements_upgrades/pipreqcheck/requirements.in'

    with host.sudo():
        deployed_text = host.file(requirements_path).content_string

    wanted_lines = sorted(
        requirement.lower()
        for requirement in (
            "pip >= 0.3.1",
            "pip-tools >= 0.3.2",
            "setuptools >= 0.3.3",
            "wheel >= 0.3.4",
        )
    )
    deployed_lines = sorted(deployed_text.lower().strip().split("\n"))

    assert deployed_lines == wanted_lines