Files @ 49af212543b0
Branch filter:

Location: majic-ansible-roles/roles/common/molecule/default/tests/test_parameters_optional.py

branko
MAR-192: Switch to using NTP pools instead of servers:

- This is the recommended configuration by NTPsec, and also the default on
Debian. Previously suggested values for servers have been pool
addresses in any case.
import os
import re
import socket

import paramiko

import testinfra.utils.ansible_runner

import pytest


# Hosts targeted by the tests in this module: the members of the
# 'parameters-optional' group, looked up in the inventory file whose
# path is taken from the MOLECULE_INVENTORY_FILE environment variable.
# NOTE(review): testinfra is expected to pick this module-level name up
# by convention - keep the name unchanged.
testinfra_hosts = testinfra.utils.ansible_runner.AnsibleRunner(
    os.environ['MOLECULE_INVENTORY_FILE']).get_hosts('parameters-optional')


def test_apt_proxy(host):
    """
    Verifies that the apt proxy configuration file is present, and
    that it has the expected ownership and permissions.
    """

    apt_proxy_file = host.file('/etc/apt/apt.conf.d/00proxy')

    # File must exist, be owned by root:root, and be world-readable.
    assert apt_proxy_file.exists
    assert apt_proxy_file.user == 'root'
    assert apt_proxy_file.group == 'root'
    assert apt_proxy_file.mode == 0o644


def test_bash_prompt_content(host):
    """
    Verifies that the deployed bash prompt script contains both the
    coloured and the plain variant of the custom prompt.
    """

    # Variant that wraps the prompt in colour escape sequences.
    coloured_prompt = "export PS1='\\[\\e]0;\\u@\\h: \\w\\a\\]${debian_chroot:+($debian_chroot)}\\[\\033[0;36m\\]\\u@\\h[test]:\\w\\$ \\[\\033[0m\\]'"

    # Variant without any colour escape sequences.
    plain_prompt = "export PS1='\\[\\e]0;\\u@\\h: \\w\\a\\]${debian_chroot:+($debian_chroot)}\\u@\\h[test]:\\w\\$ '"

    prompt_script = host.file('/etc/profile.d/bash_prompt.sh')

    for expected_prompt in (coloured_prompt, plain_prompt):
        assert expected_prompt in prompt_script.content_string


def test_common_packages_are_installed(host):
    """
    Verifies that every user-provided common package is installed.
    """

    expected_packages = ('units', 'gnutls-bin', 'emacs-nox')

    for package_name in expected_packages:
        assert host.package(package_name).is_installed


def test_ssh_login_mechanisms(host):
    """
    Tests available SSH login mechanisms (should be just public key).

    Attempts the SSH "none" authentication method against port 22 of
    the first IPv4 address found on eth1. The server is expected to
    reject the attempt and to report 'publickey' as the only allowed
    authentication type.
    """

    # Extract first non-IPv6 IP. Crude test, but it should work.
    remote_ip = next(a for a in host.interface("eth1").addresses if ":" not in a)

    # Context manager and try/finally make sure the socket and the
    # transport are released even if an assertion or the connection
    # fails.
    with socket.socket() as sock:
        sock.connect((remote_ip, 22))

        transport = paramiko.transport.Transport(sock)
        try:
            transport.connect()

            try:
                transport.auth_none('')
            except paramiko.transport.BadAuthenticationType as err:
                allowed_types = err.allowed_types
            else:
                # No exception means the server accepted the "none"
                # method - that must fail the test instead of letting
                # it pass silently.
                raise AssertionError("SSH server accepted 'none' authentication, expected it to be rejected")
        finally:
            transport.close()

    assert allowed_types == ['publickey']


def test_emacs_electric_indent_mode(host):
    """
    Verifies that the custom Emacs site-start file which disables
    electric indent mode has been deployed with expected ownership,
    permissions, and content.
    """

    site_start_path = '/etc/emacs/site-start.d/01disable-electric-indent-mode.el'
    site_start_file = host.file(site_start_path)

    # Regular file, owned by root:root, world-readable.
    assert site_start_file.is_file
    assert site_start_file.user == 'root'
    assert site_start_file.group == 'root'
    assert site_start_file.mode == 0o644

    # The actual directive that turns the mode off.
    assert "(electric-indent-mode -1)" in site_start_file.content_string


def test_os_groups(host):
    """
    Verifies that user-supplied system groups, as well as the groups
    named after the user-supplied users, have the expected GIDs.
    """

    # (group name, expected gid) pairs, checked in the listed order.
    expected_gids = (
        ('group1', 1001),
        ('group2', 3001),
        ('group3', 3002),
        ('user1', 3003),
        ('user2', 2001),
        ('user3', 2002),
    )

    for group_name, expected_gid in expected_gids:
        assert host.group(group_name).gid == expected_gid


def _read_public_key(path):
    """
    Returns content of the local file at the given path, stripped of
    leading and trailing whitespace. The file is closed before
    returning.
    """

    with open(path, 'r') as key_file:
        return key_file.read().strip()


def test_os_users(host):
    """
    Tests if user-supplied system users have been created correctly.

    For every user the uid, primary group, group membership, shell,
    and password entry are verified. In addition, user1 must not have
    an authorized_keys file, while the authorized_keys files of user2
    and user3 must contain the expected public keys read from the
    local tests/data/ssh/ directory.
    """

    with host.sudo():
        user1 = host.user('user1')
        assert user1.uid == 1001
        assert user1.group == 'user1'
        assert user1.groups == ['user1']
        assert user1.shell == '/bin/bash'
        assert user1.password == '!'

        user1_authorized_keys = host.file(os.path.join(user1.home, '.ssh', 'authorized_keys'))
        assert not user1_authorized_keys.exists

        user2 = host.user('user2')
        assert user2.uid == 2001
        assert user2.group == 'user2'
        assert sorted(user2.groups) == sorted(['group1', 'group2', 'user2'])
        assert user2.shell == '/bin/bash'
        assert user2.password == '$6$wdXOQiMe09ugh0$VRIph2XA2QQyEYlAlH7zT4TPACDUalf/4FKpqG9JRHfKxANTcTug2ANCt450htcs0LikJfHLWofLP54jraFU61'

        user2_authorized_keys = host.file(os.path.join(user2.home, '.ssh', 'authorized_keys'))
        # Fetch the remote file content once and reuse it for both keys.
        user2_authorized_keys_content = user2_authorized_keys.content_string
        assert _read_public_key('tests/data/ssh/clientkey1.pub') in user2_authorized_keys_content
        assert _read_public_key('tests/data/ssh/clientkey2.pub') in user2_authorized_keys_content

        user3 = host.user('user3')
        assert user3.uid == 2002
        assert user3.group == 'user3'
        assert sorted(user3.groups) == sorted(['group3', 'user3'])
        assert user3.shell == '/bin/bash'
        assert user3.password == '$6$nmx.21uLqT$9LrUqNUgUwIM.l0KFKgr2.kDEwe2lo7IbBIhnG70AGW7GTFdWBUFnGAxH15YxikTXhDJD/uxd.NNgojEOjRvx1'

        user3_authorized_keys = host.file(os.path.join(user3.home, '.ssh', 'authorized_keys'))
        assert _read_public_key('tests/data/ssh/clientkey3.pub') in user3_authorized_keys.content_string


def test_authorized_keys_login(host):
    """
    Tests if authorized SSH keys for user-provided system users have been set-up
    correctly.

    Logs in over SSH to the first IPv4 address found on eth1 with each
    (user, private key) combination. The SSH agent and default key
    lookup are disabled so only the listed key file is offered.
    """

    class IgnorePolicy(paramiko.client.MissingHostKeyPolicy):
        """
        Host key policy that silently accepts unknown host keys.
        """

        def missing_host_key(self, client, hostname, key):
            pass

    # (username, private key file) combinations that must be able to log in.
    logins = (
        ("user2", 'tests/data/ssh/clientkey1'),
        ("user2", 'tests/data/ssh/clientkey2'),
        ("user3", 'tests/data/ssh/clientkey3'),
    )

    # Extract first non-IPv6 IP. Crude test, but it should work.
    remote_ip = next(a for a in host.interface("eth1").addresses if ":" not in a)

    for username, key_filename in logins:
        # Use a fresh client per login, and always close it, so that
        # connections do not pile up on a single re-used client.
        client = paramiko.client.SSHClient()
        client.set_missing_host_key_policy(IgnorePolicy())

        try:
            # No exception will be raised if connection is successful.
            client.connect(remote_ip, username=username, allow_agent=False, look_for_keys=False, key_filename=key_filename)
        finally:
            client.close()


@pytest.mark.parametrize('ca_certificate_basename', [
    'cacert1',
    'cacert2',
])
def test_ca_certificates(host, ca_certificate_basename):
    """
    Verifies deployment of a CA certificate: the certificate file
    itself, the symlink named after the certificate, and the symlink
    named after the certificate hash as reported by openssl.
    """

    certificate_path = '/usr/local/share/ca-certificates/{}.crt'.format(ca_certificate_basename)
    named_symlink_path = '/etc/ssl/certs/{}.pem'.format(ca_certificate_basename)

    # Hash-based symlink name is derived from the deployed certificate.
    hash_result = host.run('openssl x509 -hash -noout -in %s', certificate_path)
    certificate_hash = hash_result.stdout.strip()
    hash_symlink_path = '/etc/ssl/certs/{}.0'.format(certificate_hash)

    certificate = host.file(certificate_path)
    named_symlink = host.file(named_symlink_path)
    hash_symlink = host.file(hash_symlink_path)

    # Certificate file ownership and permissions.
    assert certificate.is_file
    assert certificate.user == 'root'
    assert certificate.group == 'root'
    assert certificate.mode == 0o644

    # Both symlinks must point at the certificate file.
    assert named_symlink.is_symlink
    assert named_symlink.linked_to == certificate_path

    assert hash_symlink.is_symlink
    assert hash_symlink.linked_to == certificate_path


def test_ferm_base_rules(host):
    """
    Verifies that the base ferm configuration contains the
    user-provided rate limit, and that the matching flood rules are
    present in the output of iptables-save and ip6tables-save.
    """

    expected_ferm_limit = "mod hashlimit hashlimit 5/second hashlimit-burst 5"

    expected_icmp_rule = (
        "-A flood -p icmp -m icmp --icmp-type 8 -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 "
        "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN"
    )
    expected_tcp_syn_rule = (
        "-A flood -p tcp -m tcp --tcp-flags FIN,SYN,RST,ACK SYN -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 "
        "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN"
    )
    expected_icmpv6_rule = (
        "-A flood -p ipv6-icmp -m icmp6 --icmpv6-type 128 -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 "
        "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN"
    )

    with host.sudo():
        base_config = host.file('/etc/ferm/conf.d/00-base.conf')
        assert expected_ferm_limit in base_config.content_string

        # IPv4 rules.
        ipv4_rules = host.command('iptables-save')
        assert ipv4_rules.rc == 0
        assert expected_icmp_rule in ipv4_rules.stdout
        assert expected_tcp_syn_rule in ipv4_rules.stdout

        # IPv6 rules.
        ipv6_rules = host.command('ip6tables-save')
        assert ipv6_rules.rc == 0
        # NOTE(review): this assertion checks the IPv4 output again and
        # repeats the ICMP check from above - possibly a copy/paste
        # leftover. Confirm which IPv6 rule was meant to be verified.
        assert expected_icmp_rule in ipv4_rules.stdout
        assert expected_icmpv6_rule in ipv6_rules.stdout


def test_pipreqcheck_virtualenv_user(host):
    """
    Verifies that the pipreqcheck group and user exist, and that they
    carry the user-provided gid/uid, home directory, and group
    membership.
    """

    account_name = 'pipreqcheck'

    pipreqcheck_group = host.group(account_name)
    assert pipreqcheck_group.exists
    assert pipreqcheck_group.gid == 2500

    pipreqcheck_user = host.user(account_name)
    assert pipreqcheck_user.exists
    assert pipreqcheck_user.home == '/var/lib/pipreqcheck'
    assert pipreqcheck_user.uid == 2500
    assert pipreqcheck_user.group == 'pipreqcheck'
    assert pipreqcheck_user.groups == ['pipreqcheck']


def test_backup_configuration(host):
    """
    Tests if backup configuration has been deployed correctly.

    Verifies that the duply pattern files exist, and that each
    expected path is listed on a line of its own.
    """

    with host.sudo():

        common = host.file('/etc/duply/main/patterns/common')
        assert common.is_file

        # Read and split the file once, instead of fetching it from
        # the host again for every single assertion.
        common_lines = common.content_string.split("\n")
        assert "/var/log" in common_lines
        assert "/etc/shadow" in common_lines
        assert "/var/mail" in common_lines
        assert "/var/spool/cron" in common_lines

        common_extra = host.file('/etc/duply/main/patterns/common_extra')
        assert common_extra.is_file

        common_extra_lines = common_extra.content_string.split("\n")
        assert "/home/user1" in common_extra_lines
        assert "/home/user2" in common_extra_lines


def test_ntp_software_installed(host):
    """
    Verifies that the NTPsec packages are installed.
    """

    for package_name in ('ntpsec', 'ntpsec-ntpdate'):
        assert host.package(package_name).is_installed


def test_ntp_server_configuration(host):
    """
    Verifies the NTP server configuration file: its ownership and
    permissions, the configured pools, and the access restrictions.
    """

    expected_pools = [
        "pool 0.debian.pool.ntp.org iburst",
        "pool 1.debian.pool.ntp.org iburst",
        "pool 2.debian.pool.ntp.org iburst",
    ]

    expected_restrictions = [
        "restrict default kod nomodify nopeer noquery limited",
        "restrict 127.0.0.1",
        "restrict ::1",
    ]

    with host.sudo():

        ntp_conf = host.file("/etc/ntpsec/ntp.conf")

        # Configuration file must be present, owned by root:root, and
        # world-readable.
        assert ntp_conf.exists
        assert ntp_conf.user == 'root'
        assert ntp_conf.group == 'root'
        assert ntp_conf.mode == 0o644

        # Keep only effective directives - drop blank lines and
        # comment-only lines, and strip surrounding whitespace.
        directives = []
        for raw_line in ntp_conf.content_string.split("\n"):
            if re.match(r'^\s*(|#.*)$', raw_line) is None:
                directives.append(raw_line.strip())

        # Pools are compared irrespective of their order in the file.
        configured_pools = [d for d in directives if d.startswith('pool')]
        assert sorted(configured_pools) == sorted(expected_pools)

        # Querying of the server must be disabled for untrusted
        # clients. Order in the file is not relevant here either.
        configured_restrictions = [d for d in directives if d.startswith('restrict')]
        assert sorted(configured_restrictions) == sorted(expected_restrictions)


def test_ntp_runtime_pool_count(host):
    """
    Verifies that the running NTP server reports the expected number
    of pool entries in its peer listing.
    """

    peer_listing = host.command("ntpq -p -n")
    assert peer_listing.rc == 0

    # Three pools are expected, as requested via role parameter. Pool
    # entries are recognised by the '.POOL.' marker in their line.
    pool_lines = []
    for output_line in peer_listing.stdout.split("\n"):
        if ".POOL." in output_line:
            pool_lines.append(output_line)

    assert len(pool_lines) == 3


def test_ntp_listening_interfaces(host):
    """
    Verifies that the NTP server is listening on UDP port 123.
    """

    ntp_socket = host.socket('udp://:::123')

    assert ntp_socket.is_listening


def test_pipreqcheck_input_content(host):
    """
    Verifies content of the requirements input file for the virtual
    environment used by the script that performs pip requirements
    upgrade checks. Comparison ignores letter case and line order.
    """

    requirements_in = '/etc/pip_check_requirements_upgrades/pipreqcheck/requirements.in'

    wanted = [
        "pip >= 0.3.1",
        "pip-tools >= 0.3.2",
        "setuptools >= 0.3.3",
        "wheel >= 0.3.4"
    ]

    with host.sudo():
        file_content = host.file(requirements_in).content_string

        # Normalise both sides: lower-case everything and sort.
        wanted_normalised = sorted(requirement.lower() for requirement in wanted)

        deployed_lines = file_content.lower().strip().split("\n")
        deployed_lines.sort()

        assert deployed_lines == wanted_normalised