Files @ 2381ba93d089
Branch filter:

Location: majic-ansible-roles/roles/common/molecule/default/tests/test_parameters_optional.py

branko
MAR-148: Better workaround for https://github.com/ansible/ansible/issues/64560 (override the module_utils/mysql.py instead of the module itself).
import os
import re
import socket

import paramiko

import testinfra.utils.ansible_runner


testinfra_hosts = testinfra.utils.ansible_runner.AnsibleRunner(
    os.environ['MOLECULE_INVENTORY_FILE']).get_hosts('parameters-optional')


def test_apt_proxy(host):
    """
    Tests if proxy configuration for apt has been deployed correctly.
    """

    proxy_config = host.file('/etc/apt/apt.conf.d/00proxy')

    assert proxy_config.exists
    assert proxy_config.user == 'root'
    assert proxy_config.group == 'root'
    assert proxy_config.mode == 0o644


def test_bash_prompt_content(host):
    """
    Tests that custom bash prompt has been configured correctly with specified
    colour and prompt.
    """

    config = host.file('/etc/profile.d/bash_prompt.sh')

    assert "export PS1='\\[\\e]0;\\u@\\h: \\w\\a\\]${debian_chroot:+($debian_chroot)}\\[\\033[0;36m\\]\\u@\\h[test]:\\w\\$ \\[\\033[0m\\]'" in config.content
    assert "export PS1='\\[\\e]0;\\u@\\h: \\w\\a\\]${debian_chroot:+($debian_chroot)}\\u@\\h[test]:\\w\\$ '" in config.content


def test_common_installed_packages_common(host):
    """
    Tests that user-provided common packages have been installed.
    """

    debian_release = host.ansible("setup")["ansible_facts"]["ansible_distribution_release"]

    assert host.package('units').is_installed
    assert host.package('gnutls-bin').is_installed

    # Different name of package in different Debian releases.
    if debian_release == 'jessie':
        assert host.package('libmariadb-client-lgpl-dev-compat').is_installed
    elif debian_release == 'stretch':
        assert host.package('libmariadbclient-dev-compat').is_installed
    else:
        raise Exception("Cannot run this test on debian release: %s" % debian_release)


def test_ssh_login_mechanisms(host):
    """
    Tests available SSH login mechanisms (should be just public key).
    """

    # Extract first non-IPv6 IP. Crude test, but it should work.
    remote_ip = next(a for a in host.interface("eth1").addresses if ":" not in a)

    sock = socket.socket()
    sock.connect((remote_ip, 22))

    transport = paramiko.transport.Transport(sock)
    transport.connect()

    try:
        transport.auth_none('')
    except paramiko.transport.BadAuthenticationType, err:
        assert err.allowed_types == ['publickey']


def test_mariadb_mysql_config_symlink(host):
    """
    Tests if symbolic link has been set-up for mariadb_config binary to be
    accessible as mysql_config as well.

    Only applicable to Debian Jessie.
    """

    if host.ansible("setup")["ansible_facts"]["ansible_distribution_release"] == 'jessie':
        mysql_config = host.file('/usr/bin/mysql_config')

        assert mysql_config.is_symlink
        assert mysql_config.linked_to == '/usr/bin/mariadb_config'


def test_emacs_electric_indent_mode(host):
    """
    Tests if Emacs electric indent mode has been disabled via custom
    configuration file.
    """

    emacs_config = host.file('/etc/emacs/site-start.d/01disable-electric-indent-mode.el')

    assert emacs_config.is_file
    assert emacs_config.user == 'root'
    assert emacs_config.group == 'root'
    assert emacs_config.mode == 0o644
    assert "(electric-indent-mode -1)" in emacs_config.content


def test_os_groups(host):
    """
    Tests if user-supplied system groups have been created correctly.
    """

    group1 = host.group('group1')
    assert group1.gid == 1001

    group2 = host.group('group2')
    assert group2.gid == 3001

    group3 = host.group('group3')
    assert group3.gid == 3002

    user1_group = host.group('user1')
    assert user1_group.gid == 3003

    user2_group = host.group('user2')
    assert user2_group.gid == 2001

    user3_group = host.group('user3')
    assert user3_group.gid == 2002


def test_os_users(host):
    """
    Tests if user-supplied system users have been created correctly.
    """

    with host.sudo():
        user1 = host.user('user1')
        assert user1.uid == 1001
        assert user1.group == 'user1'
        assert user1.groups == ['user1']
        assert user1.shell == '/bin/bash'
        assert user1.password == '!'

        user1_authorized_keys = host.file(os.path.join(user1.home, '.ssh', 'authorized_keys'))
        assert not user1_authorized_keys.exists

        user2 = host.user('user2')
        assert user2.uid == 2001
        assert user2.group == 'user2'
        assert sorted(user2.groups) == sorted(['group1', 'group2', 'user2'])
        assert user2.shell == '/bin/bash'
        assert user2.password == '$6$wdXOQiMe09ugh0$VRIph2XA2QQyEYlAlH7zT4TPACDUalf/4FKpqG9JRHfKxANTcTug2ANCt450htcs0LikJfHLWofLP54jraFU61'

        user2_authorized_keys = host.file(os.path.join(user2.home, '.ssh', 'authorized_keys'))
        assert open('tests/data/ssh/clientkey1.pub', 'r').read().strip() in user2_authorized_keys.content
        assert open('tests/data/ssh/clientkey2.pub', 'r').read().strip() in user2_authorized_keys.content

        user3 = host.user('user3')
        assert user3.uid == 2002
        assert user3.group == 'user3'
        assert sorted(user3.groups) == sorted(['group3', 'user3'])
        assert user3.shell == '/bin/bash'
        assert user3.password == '$6$nmx.21uLqT$9LrUqNUgUwIM.l0KFKgr2.kDEwe2lo7IbBIhnG70AGW7GTFdWBUFnGAxH15YxikTXhDJD/uxd.NNgojEOjRvx1'

        user3_authorized_keys = host.file(os.path.join(user3.home, '.ssh', 'authorized_keys'))
        assert open('tests/data/ssh/clientkey3.pub', 'r').read().strip() in user3_authorized_keys.content


def test_authorized_keys_login(host):
    """
    Tests if authorized SSH keys for user-provided system users have been set-up
    correctly.
    """

    class IgnorePolicy(paramiko.client.MissingHostKeyPolicy):

        def missing_host_key(self, client, hostname, key):
            pass

    client = paramiko.client.SSHClient()
    client.set_missing_host_key_policy(IgnorePolicy())

    # Extract first non-IPv6 IP. Crude test, but it should work.
    remote_ip = next(a for a in host.interface("eth1").addresses if ":" not in a)

    # No exception will be raised if connection is successful.
    client.connect(remote_ip, username="user2", allow_agent=False, look_for_keys=False, key_filename='tests/data/ssh/clientkey1')
    client.connect(remote_ip, username="user2", allow_agent=False, look_for_keys=False, key_filename='tests/data/ssh/clientkey2')
    client.connect(remote_ip, username="user3", allow_agent=False, look_for_keys=False, key_filename='tests/data/ssh/clientkey3')


def test_ca_certificates(host):
    """
    Tests if CA certificates have been correctly deployed to the system.
    """

    ca1_cert = host.file('/usr/local/share/ca-certificates/cacert1.crt')
    assert ca1_cert.is_file
    assert ca1_cert.user == 'root'
    assert ca1_cert.group == 'root'
    assert ca1_cert.mode == 0o644

    ca1_cert_symlink = host.file('/etc/ssl/certs/cacert1.pem')
    assert ca1_cert_symlink.is_symlink
    assert ca1_cert_symlink.linked_to == '/usr/local/share/ca-certificates/cacert1.crt'

    ca1_cert_hash_1 = host.file('/etc/ssl/certs/3ce70b58.0')
    assert ca1_cert_hash_1.is_symlink
    assert ca1_cert_hash_1.linked_to == '/usr/local/share/ca-certificates/cacert1.crt'

    ca1_cert_hash_1 = host.file('/etc/ssl/certs/49f72a44.0')
    assert ca1_cert_hash_1.is_symlink
    assert ca1_cert_hash_1.linked_to == '/usr/local/share/ca-certificates/cacert1.crt'

    ca2_cert = host.file('/usr/local/share/ca-certificates/cacert2.crt')
    assert ca2_cert.is_file
    assert ca2_cert.user == 'root'
    assert ca2_cert.group == 'root'
    assert ca2_cert.mode == 0o644

    ca2_cert_symlink = host.file('/etc/ssl/certs/cacert2.pem')
    assert ca2_cert_symlink.is_symlink
    assert ca2_cert_symlink.linked_to == '/usr/local/share/ca-certificates/cacert2.crt'

    ca2_cert_hash_1 = host.file('/etc/ssl/certs/a52eec00.0')
    assert ca2_cert_hash_1.is_symlink
    assert ca2_cert_hash_1.linked_to == '/usr/local/share/ca-certificates/cacert2.crt'

    ca2_cert_hash_1 = host.file('/etc/ssl/certs/a0d2e9e4.0')
    assert ca2_cert_hash_1.is_symlink
    assert ca2_cert_hash_1.linked_to == '/usr/local/share/ca-certificates/cacert2.crt'


def test_ferm_base_rules(host):
    """
    Tests if base ferm configuration has been deployed correctly with proper
    user-provided rate-limiting.
    """

    with host.sudo():
        ferm_base = host.file('/etc/ferm/conf.d/00-base.conf')

        assert "mod hashlimit hashlimit 5/second hashlimit-burst 5" in ferm_base.content

        iptables = host.command('iptables-save')

        assert iptables.rc == 0
        assert "-A flood -p icmp -m icmp --icmp-type 8 -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in iptables.stdout
        assert "-A flood -p tcp -m tcp --tcp-flags FIN,SYN,RST,ACK SYN -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in iptables.stdout

        ip6tables = host.command('ip6tables-save')
        assert ip6tables.rc == 0
        assert "-A flood -p icmp -m icmp --icmp-type 8 -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in iptables.stdout
        assert "-A flood -p ipv6-icmp -m icmp6 --icmpv6-type 128 -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in ip6tables.stdout


def test_pipreqcheck_virtualenv_user(host):
    """
    Tests if group and user for running pip requirements upgrade checks have
    been created correctly with user-provided uid/gid.
    """

    group = host.group('pipreqcheck')
    assert group.exists
    assert group.gid == 2500

    user = host.user('pipreqcheck')
    assert user.exists
    assert user.home == '/var/lib/pipreqcheck'
    assert user.uid == 2500
    assert user.group == 'pipreqcheck'
    assert user.groups == ['pipreqcheck']


def test_backup_configuration(host):
    """
    Tests if backup configuration has been deployed correctly.
    """

    with host.sudo():

        common = host.file('/etc/duply/main/patterns/common')
        assert common.is_file
        assert "/var/log" in common.content.split("\n")
        assert "/etc/shadow" in common.content.split("\n")
        assert "/var/mail" in common.content.split("\n")
        assert "/var/spool/cron" in common.content.split("\n")

        common_extra = host.file('/etc/duply/main/patterns/common_extra')
        assert common_extra.is_file
        assert "/home/user1" in common_extra.content.split("\n")
        assert "/home/user2" in common_extra.content.split("\n")


def test_ntp_software_installed(host):
    """
    Tests if NTP packages are installed.
    """

    assert host.package('ntp').is_installed
    assert host.package('ntpdate').is_installed


def test_ntp_server_configuration(host):
    """
    Tests if NTP server has been correctly configured.
    """

    with host.sudo():

        # Read the configuration file.
        configuration = host.file("/etc/ntp.conf").content.split("\n")

        # Extract only the relevant sections of files (exculde empty
        # lines and comments).
        configuration = [c.strip() for c in configuration if re.match(r'^\s*(|#.*)$', c) is None]

        # Ensure correct servers have been configured in the pool.
        servers = [c for c in configuration if c.startswith('server')]

        expected_servers = ["server 0.debian.pool.ntp.org iburst",
                            "server 1.debian.pool.ntp.org iburst",
                            "server 2.debian.pool.ntp.org iburst"]

        assert sorted(servers) == sorted(expected_servers)

        # Ensure querying of server is disable for untrusted clients.
        restrictions = [c for c in configuration if c.startswith('restrict')]
        expected_restrictions = ["restrict -4 default kod notrap nomodify nopeer noquery",
                                 "restrict -6 default kod notrap nomodify nopeer noquery",
                                 "restrict 127.0.0.1",
                                 "restrict ::1"]

        assert sorted(restrictions) == sorted(expected_restrictions)


def test_ntp_query_server_count(host):

    # Two lines for headers, and one line per configured server.
    expected_stdout_line_count = 5

    ntpq = host.command("ntpq -p -n")

    assert ntpq.rc == 0
    assert len(ntpq.stdout.strip().split("\n")) == expected_stdout_line_count


def test_ntp_listening_interfaces(host):
    """
    Tests if NTP server is listening on correct ports.
    """

    assert host.socket('udp://:::123').is_listening