Files @ 325b9d16a72b
Branch filter:

Location: majic-ansible-roles/roles/common/molecule/default/tests/test_parameters_optional.py - annotation

branko
MAR-151: Added support for Debian 10 Buster to common role:

- Updated tests.
- Updated role reference documentation.
- Updated role metadata information.
- Refactored IP plan for the test machines for better separation
between different types of machines and versions.
- Parametrised tests for limited connectivity using the maintenance
mode.
- Don't use MariaDB compat package in tests - name differs between
Debian 9 and Debian 10, and relevant parameter is already getting
tested properly using the remaining packages.
ea69b2719d8e
d6a8b9523eb6
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
baaf0db1e0ae
baaf0db1e0ae
ea69b2719d8e
ea69b2719d8e
d62b3adec462
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
33f4baab1260
33f4baab1260
d752715bb533
ea69b2719d8e
ea69b2719d8e
325b9d16a72b
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
1640ad5b4cac
325b9d16a72b
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
1640ad5b4cac
1640ad5b4cac
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
33f4baab1260
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
d752715bb533
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
d752715bb533
d752715bb533
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
d752715bb533
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
5f4a2f7d9aca
5f4a2f7d9aca
5f4a2f7d9aca
5f4a2f7d9aca
5f4a2f7d9aca
ea69b2719d8e
5f4a2f7d9aca
ea69b2719d8e
1640ad5b4cac
1640ad5b4cac
1640ad5b4cac
ea69b2719d8e
1640ad5b4cac
1640ad5b4cac
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
baaf0db1e0ae
baaf0db1e0ae
baaf0db1e0ae
baaf0db1e0ae
baaf0db1e0ae
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
baaf0db1e0ae
baaf0db1e0ae
baaf0db1e0ae
baaf0db1e0ae
ea69b2719d8e
baaf0db1e0ae
baaf0db1e0ae
baaf0db1e0ae
ea69b2719d8e
baaf0db1e0ae
baaf0db1e0ae
baaf0db1e0ae
baaf0db1e0ae
ca784c26d35c
baaf0db1e0ae
baaf0db1e0ae
ea69b2719d8e
baaf0db1e0ae
baaf0db1e0ae
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
1640ad5b4cac
ea69b2719d8e
d752715bb533
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
3d3f7f804487
3d3f7f804487
1640ad5b4cac
3d3f7f804487
3d3f7f804487
3d3f7f804487
3d3f7f804487
1640ad5b4cac
3d3f7f804487
1640ad5b4cac
3d3f7f804487
d752715bb533
d752715bb533
d752715bb533
d752715bb533
3d3f7f804487
1640ad5b4cac
3d3f7f804487
d752715bb533
d752715bb533
d6a8b9523eb6
d6a8b9523eb6
1640ad5b4cac
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
1640ad5b4cac
1640ad5b4cac
d6a8b9523eb6
d6a8b9523eb6
1640ad5b4cac
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
1640ad5b4cac
d6a8b9523eb6
d6a8b9523eb6
d752715bb533
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
6c1d08d39449
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
5bc6b7fb4cb5
5bc6b7fb4cb5
5bc6b7fb4cb5
5bc6b7fb4cb5
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
1640ad5b4cac
5bc6b7fb4cb5
5bc6b7fb4cb5
5bc6b7fb4cb5
5bc6b7fb4cb5
1640ad5b4cac
5bc6b7fb4cb5
5bc6b7fb4cb5
693d6960b1a0
5bc6b7fb4cb5
5bc6b7fb4cb5
1640ad5b4cac
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
1640ad5b4cac
import os
import re
import socket

import paramiko

import testinfra.utils.ansible_runner

import pytest


# Hosts from the 'parameters-optional' group, looked up in the inventory
# file whose path is taken from the MOLECULE_INVENTORY_FILE environment
# variable. testinfra reads this module-level variable to decide which
# hosts the tests in this module are run against.
testinfra_hosts = testinfra.utils.ansible_runner.AnsibleRunner(
    os.environ['MOLECULE_INVENTORY_FILE']).get_hosts('parameters-optional')


def test_apt_proxy(host):
    """
    Verifies that the apt proxy configuration file is present, and that
    it has expected ownership and permissions.
    """

    apt_proxy_file = host.file('/etc/apt/apt.conf.d/00proxy')

    assert apt_proxy_file.exists

    # Ownership and permissions, checked one attribute at a time.
    expected_attributes = (
        ('user', 'root'),
        ('group', 'root'),
        ('mode', 0o644),
    )

    for attribute, expected_value in expected_attributes:
        assert getattr(apt_proxy_file, attribute) == expected_value


def test_bash_prompt_content(host):
    """
    Verifies that the deployed bash prompt script contains both expected
    PS1 definitions - one that includes terminal colour escape codes, and
    one without them. Both carry the custom "[test]" marker.
    """

    prompt_script = host.file('/etc/profile.d/bash_prompt.sh')

    expected_prompts = (
        "export PS1='\\[\\e]0;\\u@\\h: \\w\\a\\]${debian_chroot:+($debian_chroot)}\\[\\033[0;36m\\]\\u@\\h[test]:\\w\\$ \\[\\033[0m\\]'",
        "export PS1='\\[\\e]0;\\u@\\h: \\w\\a\\]${debian_chroot:+($debian_chroot)}\\u@\\h[test]:\\w\\$ '",
    )

    for expected_prompt in expected_prompts:
        assert expected_prompt in prompt_script.content_string


def test_common_packages_are_installed(host):
    """
    Verifies that each of the user-provided common packages is installed
    on the system.
    """

    expected_packages = ('units', 'gnutls-bin', 'emacs24-nox')

    for package_name in expected_packages:
        assert host.package(package_name).is_installed


def test_ssh_login_mechanisms(host):
    """
    Tests available SSH login mechanisms (should be just public key).

    The SSH "none" authentication method is attempted against the
    server. The server is expected to reject it, reporting public key
    authentication as the only method it is willing to accept.
    """

    # Extract first non-IPv6 IP. Crude test, but it should work.
    remote_ip = next(a for a in host.interface("eth1").addresses if ":" not in a)

    sock = socket.socket()

    try:
        sock.connect((remote_ip, 22))

        transport = paramiko.transport.Transport(sock)

        try:
            transport.connect()

            # The attempt must be refused. Should the server accept the
            # "none" method, no exception gets raised, and pytest.raises
            # fails the test instead of letting it pass without having
            # verified anything.
            with pytest.raises(paramiko.transport.BadAuthenticationType) as error:
                transport.auth_none('')

            assert error.value.allowed_types == ['publickey']
        finally:
            # Stop the transport (and its underlying connection) no
            # matter how the test ends.
            transport.close()
    finally:
        # Covers the case where the transport never got created. Closing
        # an already closed socket is harmless.
        sock.close()


def test_emacs_electric_indent_mode(host):
    """
    Verifies that the Emacs site-start configuration file which turns off
    electric indent mode is in place, with expected ownership,
    permissions, and content.
    """

    site_start_file = host.file('/etc/emacs/site-start.d/01disable-electric-indent-mode.el')

    assert site_start_file.is_file

    # Ownership and permissions, checked one attribute at a time.
    expected_attributes = (
        ('user', 'root'),
        ('group', 'root'),
        ('mode', 0o644),
    )

    for attribute, expected_value in expected_attributes:
        assert getattr(site_start_file, attribute) == expected_value

    assert "(electric-indent-mode -1)" in site_start_file.content_string


def test_os_groups(host):
    """
    Verifies that user-supplied system groups, as well as the groups
    named after the user-supplied users, have expected GIDs.
    """

    expected_gids = (
        ('group1', 1001),
        ('group2', 3001),
        ('group3', 3002),
        ('user1', 3003),
        ('user2', 2001),
        ('user3', 2002),
    )

    for group_name, expected_gid in expected_gids:
        assert host.group(group_name).gid == expected_gid


def test_os_users(host):
    """
    Tests if user-supplied system users have been created correctly.

    For each user the UID, primary group, group membership, shell, and
    password hash are verified. Deployed authorized_keys files are
    compared against public keys read from local test data (paths are
    relative to the current working directory of the test run).
    """

    def read_public_key(path):
        # Context manager makes sure the local file handle gets closed
        # once the key has been read.
        with open(path, 'r') as key_file:
            return key_file.read().strip()

    # Elevated privileges are used for all of the checks below.
    with host.sudo():
        # User without password ('!') and without any authorized keys.
        user1 = host.user('user1')
        assert user1.uid == 1001
        assert user1.group == 'user1'
        assert user1.groups == ['user1']
        assert user1.shell == '/bin/bash'
        assert user1.password == '!'

        user1_authorized_keys = host.file(os.path.join(user1.home, '.ssh', 'authorized_keys'))
        assert not user1_authorized_keys.exists

        # User with additional groups, password, and two authorized keys.
        user2 = host.user('user2')
        assert user2.uid == 2001
        assert user2.group == 'user2'
        assert sorted(user2.groups) == sorted(['group1', 'group2', 'user2'])
        assert user2.shell == '/bin/bash'
        assert user2.password == '$6$wdXOQiMe09ugh0$VRIph2XA2QQyEYlAlH7zT4TPACDUalf/4FKpqG9JRHfKxANTcTug2ANCt450htcs0LikJfHLWofLP54jraFU61'

        user2_authorized_keys = host.file(os.path.join(user2.home, '.ssh', 'authorized_keys'))
        assert read_public_key('tests/data/ssh/clientkey1.pub') in user2_authorized_keys.content_string
        assert read_public_key('tests/data/ssh/clientkey2.pub') in user2_authorized_keys.content_string

        # User with one additional group, password, and one authorized key.
        user3 = host.user('user3')
        assert user3.uid == 2002
        assert user3.group == 'user3'
        assert sorted(user3.groups) == sorted(['group3', 'user3'])
        assert user3.shell == '/bin/bash'
        assert user3.password == '$6$nmx.21uLqT$9LrUqNUgUwIM.l0KFKgr2.kDEwe2lo7IbBIhnG70AGW7GTFdWBUFnGAxH15YxikTXhDJD/uxd.NNgojEOjRvx1'

        user3_authorized_keys = host.file(os.path.join(user3.home, '.ssh', 'authorized_keys'))
        assert read_public_key('tests/data/ssh/clientkey3.pub') in user3_authorized_keys.content_string


def test_authorized_keys_login(host):
    """
    Tests if authorized SSH keys for user-provided system users have been set-up
    correctly.

    An SSH login is attempted for every user/private key pair. The test
    fails with an exception raised by the SSH client if any of the logins
    does not succeed.
    """

    class IgnorePolicy(paramiko.client.MissingHostKeyPolicy):
        # Host keys of test machines are not known in advance, so unknown
        # host keys are accepted without recording them.

        def missing_host_key(self, client, hostname, key):
            pass

    # Extract first non-IPv6 IP. Crude test, but it should work.
    remote_ip = next(a for a in host.interface("eth1").addresses if ":" not in a)

    logins = [
        ("user2", 'tests/data/ssh/clientkey1'),
        ("user2", 'tests/data/ssh/clientkey2'),
        ("user3", 'tests/data/ssh/clientkey3'),
    ]

    for username, key_filename in logins:
        # Separate client is used for each login so that every connection
        # can be closed once verified, instead of getting replaced (and
        # leaked) by the next connection attempt.
        client = paramiko.client.SSHClient()
        client.set_missing_host_key_policy(IgnorePolicy())

        try:
            # No exception will be raised if connection is successful.
            client.connect(remote_ip, username=username, allow_agent=False, look_for_keys=False, key_filename=key_filename)
        finally:
            client.close()


@pytest.mark.parametrize('ca_certificate_basename', [
    'cacert1',
    'cacert2',
])
def test_ca_certificates(host, ca_certificate_basename):
    """
    Verifies deployment of a CA certificate - the certificate file itself
    (ownership and permissions), and the two symbolic links under
    /etc/ssl/certs that are expected to point at it (one named after the
    certificate, one named after the certificate hash reported by
    openssl).
    """

    certificate_path = '/usr/local/share/ca-certificates/%s.crt' % ca_certificate_basename

    # Name of the hash-based symlink is derived from the subject hash,
    # calculated on the remote machine.
    hash_command = host.run('openssl x509 -hash -noout -in %s', certificate_path)
    certificate_hash = hash_command.stdout.strip()

    # The certificate itself.
    certificate = host.file(certificate_path)

    assert certificate.is_file
    assert certificate.user == 'root'
    assert certificate.group == 'root'
    assert certificate.mode == 0o644

    # Symlink named after the certificate.
    named_symlink = host.file('/etc/ssl/certs/%s.pem' % ca_certificate_basename)

    assert named_symlink.is_symlink
    assert named_symlink.linked_to == certificate_path

    # Symlink named after the certificate hash.
    hash_symlink = host.file('/etc/ssl/certs/%s.0' % certificate_hash)

    assert hash_symlink.is_symlink
    assert hash_symlink.linked_to == certificate_path


def test_ferm_base_rules(host):
    """
    Tests if base ferm configuration has been deployed correctly with proper
    user-provided rate-limiting.

    Both the deployed ferm configuration file, and the rules loaded into
    the kernel (as reported by iptables-save and ip6tables-save) are
    checked for the rate limit of 5 packets per second with burst of 5.
    """

    with host.sudo():
        ferm_base = host.file('/etc/ferm/conf.d/00-base.conf')

        assert "mod hashlimit hashlimit 5/second hashlimit-burst 5" in ferm_base.content_string

        # IPv4 rules - ICMP echo requests and TCP SYN packets.
        iptables = host.command('iptables-save')

        assert iptables.rc == 0
        assert "-A flood -p icmp -m icmp --icmp-type 8 -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in iptables.stdout
        assert "-A flood -p tcp -m tcp --tcp-flags FIN,SYN,RST,ACK SYN -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in iptables.stdout

        # IPv6 rules - TCP SYN packets and ICMPv6 echo requests. Every
        # assertion in this section must be made against the ip6tables
        # output. The IPv4 ICMP rule has already been verified above, and
        # checking it again against the IPv4 output would not verify
        # anything about IPv6.
        # NOTE(review): assumes the TCP SYN rate limit is deployed for
        # IPv6 as well - confirm against the ferm configuration template.
        ip6tables = host.command('ip6tables-save')

        assert ip6tables.rc == 0
        assert "-A flood -p tcp -m tcp --tcp-flags FIN,SYN,RST,ACK SYN -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in ip6tables.stdout
        assert "-A flood -p ipv6-icmp -m icmp6 --icmpv6-type 128 -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in ip6tables.stdout


def test_pipreqcheck_virtualenv_user(host):
    """
    Verifies that the pipreqcheck group and user exist, and that they
    have been set-up with user-provided GID/UID, expected home directory,
    and expected group membership.
    """

    pipreqcheck_group = host.group('pipreqcheck')

    assert pipreqcheck_group.exists
    assert pipreqcheck_group.gid == 2500

    pipreqcheck_user = host.user('pipreqcheck')

    assert pipreqcheck_user.exists

    expected_attributes = (
        ('home', '/var/lib/pipreqcheck'),
        ('uid', 2500),
        ('group', 'pipreqcheck'),
        ('groups', ['pipreqcheck']),
    )

    for attribute, expected_value in expected_attributes:
        assert getattr(pipreqcheck_user, attribute) == expected_value


def test_backup_configuration(host):
    """
    Verifies that duply pattern files for the main backup have been
    deployed, and that each expected path is listed in them on a line of
    its own.
    """

    with host.sudo():

        common_patterns = host.file('/etc/duply/main/patterns/common')
        assert common_patterns.is_file

        for expected_path in ("/var/log", "/etc/shadow", "/var/mail", "/var/spool/cron"):
            assert expected_path in common_patterns.content_string.split("\n")

        extra_patterns = host.file('/etc/duply/main/patterns/common_extra')
        assert extra_patterns.is_file

        for expected_path in ("/home/user1", "/home/user2"):
            assert expected_path in extra_patterns.content_string.split("\n")


def test_ntp_software_installed(host):
    """
    Verifies that both the ntp and the ntpdate package are installed.
    """

    for package_name in ('ntp', 'ntpdate'):
        assert host.package(package_name).is_installed


def test_ntp_server_configuration(host):
    """
    Verifies that the NTP configuration file lists expected time servers
    and access restrictions.
    """

    with host.sudo():

        # Read the configuration file.
        raw_lines = host.file("/etc/ntp.conf").content_string.split("\n")

        # Keep only the lines that carry configuration directives
        # (exclude empty lines and comments), with surrounding whitespace
        # removed.
        empty_or_comment = re.compile(r'^\s*(|#.*)$')
        directives = []

        for raw_line in raw_lines:
            if empty_or_comment.match(raw_line) is None:
                directives.append(raw_line.strip())

        # Ensure correct servers have been configured in the pool.
        configured_servers = sorted(d for d in directives if d.startswith('server'))

        assert configured_servers == sorted([
            "server 0.debian.pool.ntp.org iburst",
            "server 1.debian.pool.ntp.org iburst",
            "server 2.debian.pool.ntp.org iburst",
        ])

        # Ensure querying of server is disabled for untrusted clients.
        configured_restrictions = sorted(d for d in directives if d.startswith('restrict'))

        assert configured_restrictions == sorted([
            "restrict -4 default kod notrap nomodify nopeer noquery",
            "restrict -6 default kod notrap nomodify nopeer noquery",
            "restrict 127.0.0.1",
            "restrict ::1",
        ])


def test_ntp_query_server_count(host):
    """
    Verifies that querying the NTP daemon for its peers succeeds, and
    that the output has expected number of lines.
    """

    # Output consists of two lines for headers, and one line per
    # configured server.
    header_line_count = 2
    server_line_count = 3

    peers = host.command("ntpq -p -n")

    assert peers.rc == 0

    output_lines = peers.stdout.strip().split("\n")

    assert len(output_lines) == header_line_count + server_line_count


def test_ntp_listening_interfaces(host):
    """
    Verifies that a UDP socket bound to port 123 on the IPv6 wildcard
    address (::) is listening.
    """

    ntp_socket = host.socket('udp://:::123')

    assert ntp_socket.is_listening