Files @ 84a71d898692
Branch filter:

Location: majic-ansible-roles/roles/common/molecule/default/tests/test_parameters_optional.py - annotation

branko
MAR-129: Updated backup_server tests to use the group_vars directory.
ea69b2719d8e
d6a8b9523eb6
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
eb4d09d4abd3
1640ad5b4cac
1640ad5b4cac
eb4d09d4abd3
eb4d09d4abd3
eb4d09d4abd3
1640ad5b4cac
eb4d09d4abd3
1640ad5b4cac
eb4d09d4abd3
eb4d09d4abd3
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
1640ad5b4cac
1640ad5b4cac
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
eb4d09d4abd3
eb4d09d4abd3
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
1640ad5b4cac
ea69b2719d8e
eb4d09d4abd3
eb4d09d4abd3
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
5f4a2f7d9aca
5f4a2f7d9aca
5f4a2f7d9aca
5f4a2f7d9aca
5f4a2f7d9aca
ea69b2719d8e
5f4a2f7d9aca
ea69b2719d8e
1640ad5b4cac
1640ad5b4cac
1640ad5b4cac
ea69b2719d8e
1640ad5b4cac
1640ad5b4cac
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
eb4d09d4abd3
eb4d09d4abd3
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
eb4d09d4abd3
eb4d09d4abd3
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
1640ad5b4cac
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
3d3f7f804487
3d3f7f804487
1640ad5b4cac
3d3f7f804487
3d3f7f804487
3d3f7f804487
3d3f7f804487
1640ad5b4cac
3d3f7f804487
1640ad5b4cac
3d3f7f804487
3d3f7f804487
3d3f7f804487
3d3f7f804487
3d3f7f804487
3d3f7f804487
1640ad5b4cac
3d3f7f804487
3d3f7f804487
3d3f7f804487
d6a8b9523eb6
d6a8b9523eb6
1640ad5b4cac
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
1640ad5b4cac
1640ad5b4cac
d6a8b9523eb6
d6a8b9523eb6
1640ad5b4cac
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
1640ad5b4cac
d6a8b9523eb6
d6a8b9523eb6
1640ad5b4cac
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
5bc6b7fb4cb5
5bc6b7fb4cb5
5bc6b7fb4cb5
5bc6b7fb4cb5
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
1640ad5b4cac
5bc6b7fb4cb5
5bc6b7fb4cb5
5bc6b7fb4cb5
5bc6b7fb4cb5
1640ad5b4cac
5bc6b7fb4cb5
5bc6b7fb4cb5
5bc6b7fb4cb5
5bc6b7fb4cb5
5bc6b7fb4cb5
1640ad5b4cac
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
d6a8b9523eb6
1640ad5b4cac
import os
import re
import socket

import paramiko

import testinfra.utils.ansible_runner


# Hosts targeted by the tests in this module: members of the
# 'parameters-optional' group from the inventory file found at
# .molecule/ansible_inventory.yml.
testinfra_hosts = (
    testinfra.utils.ansible_runner
    .AnsibleRunner('.molecule/ansible_inventory.yml')
    .get_hosts('parameters-optional')
)


def test_apt_proxy(host):
    """
    Tests if the apt proxy configuration file is present, and if it has
    the expected ownership and permissions.
    """

    apt_proxy_file = host.file('/etc/apt/apt.conf.d/00proxy')

    assert apt_proxy_file.exists

    # Attributes are verified one at a time, in the listed order.
    expected_attributes = (
        ('user', 'root'),
        ('group', 'root'),
        ('mode', 0o644),
    )

    for attribute, expected_value in expected_attributes:
        assert getattr(apt_proxy_file, attribute) == expected_value


def test_bash_prompt_content(host):
    """
    Tests that the deployed bash prompt script contains both the coloured
    and the non-coloured variant of the customised prompt.
    """

    prompt_script = host.file('/etc/profile.d/bash_prompt.sh')

    # Prompt definition wrapped in terminal colour escape sequences.
    coloured_prompt = "export PS1='\\[\\e]0;\\u@\\h: \\w\\a\\]${debian_chroot:+($debian_chroot)}\\[\\033[0;36m\\]\\u@\\h[test]:\\w\\$ \\[\\033[0m\\]'"

    # Same prompt definition without the colour escape sequences.
    plain_prompt = "export PS1='\\[\\e]0;\\u@\\h: \\w\\a\\]${debian_chroot:+($debian_chroot)}\\u@\\h[test]:\\w\\$ '"

    for expected_prompt in (coloured_prompt, plain_prompt):
        assert expected_prompt in prompt_script.content


def test_common_installed_packages_common(host):
    """
    Tests that the user-provided common packages are installed, including
    the package whose name depends on the Debian release in use.

    Raises an exception if the release is neither jessie nor stretch.
    """

    facts = host.ansible("setup")["ansible_facts"]
    release = facts["ansible_distribution_release"]

    for package_name in ('units', 'gnutls-bin'):
        assert host.package(package_name).is_installed

    # Different name of package in different Debian releases.
    release_specific_package = {
        'jessie': 'libmariadb-client-lgpl-dev-compat',
        'stretch': 'libmariadbclient-dev-compat',
    }

    if release not in release_specific_package:
        raise Exception("Cannot run this test on debian release: %s" % release)

    assert host.package(release_specific_package[release]).is_installed


def test_ssh_login_mechanisms(host):
    """
    Tests available SSH login mechanisms (should be just public key).

    The "none" authentication method is attempted against port 22 of the
    host. The server is expected to reject it, and to advertise public
    key authentication as the only allowed method.
    """

    # Extract first non-IPv6 IP. Crude test, but it should work.
    remote_ip = next(a for a in host.interface("eth1").addresses if ":" not in a)

    sock = socket.socket()

    try:
        sock.connect((remote_ip, 22))

        transport = paramiko.transport.Transport(sock)

        try:
            transport.connect()

            try:
                transport.auth_none('')
            except paramiko.transport.BadAuthenticationType as err:
                assert err.allowed_types == ['publickey']
            else:
                # Without this branch the test would pass silently if the
                # server accepted the unauthenticated login.
                raise AssertionError("SSH server accepted the 'none' authentication method")
        finally:
            # Do not leave the SSH session behind once the test is done.
            transport.close()
    finally:
        sock.close()


def test_mariadb_mysql_config_symlink(host):
    """
    Tests if the mysql_config path is a symbolic link pointing to the
    mariadb_config binary.

    Only applicable to Debian Jessie - nothing is verified on other
    releases.
    """

    release = host.ansible("setup")["ansible_facts"]["ansible_distribution_release"]

    # Nothing to verify outside of Debian Jessie.
    if release != 'jessie':
        return

    symlink = host.file('/usr/bin/mysql_config')

    assert symlink.is_symlink
    assert symlink.linked_to == '/usr/bin/mariadb_config'


def test_emacs_electric_indent_mode(host):
    """
    Tests if the Emacs site-start file that disables electric indent mode
    has been deployed with expected ownership, permissions, and content.
    """

    site_start_path = '/etc/emacs/site-start.d/01disable-electric-indent-mode.el'
    site_start_file = host.file(site_start_path)

    assert site_start_file.is_file

    # Attributes are verified one at a time, in the listed order.
    for attribute, expected_value in (('user', 'root'), ('group', 'root'), ('mode', 0o644)):
        assert getattr(site_start_file, attribute) == expected_value

    assert "(electric-indent-mode -1)" in site_start_file.content


def test_os_groups(host):
    """
    Tests if the user-supplied system groups, and the groups named after
    the system users, have the expected group IDs.
    """

    # Pairs of (group name, expected gid), verified in the listed order.
    expected_gids = (
        ('group1', 1001),
        ('group2', 3001),
        ('group3', 3002),
        ('user1', 3003),
        ('user2', 2001),
        ('user3', 2002),
    )

    for group_name, expected_gid in expected_gids:
        assert host.group(group_name).gid == expected_gid


def test_os_users(host):
    """
    Tests if user-supplied system users have been created correctly.

    Verifies uid, primary group, group membership, shell, and password
    hash of each user, as well as the content of their authorized_keys
    files. Public keys are read from local files using paths relative to
    the current working directory.
    """

    def read_public_key(path):
        """
        Returns content of a local public key file with surrounding
        whitespace stripped. The file is closed before returning.
        """

        with open(path, 'r') as key_file:
            return key_file.read().strip()

    with host.sudo():
        user1 = host.user('user1')
        assert user1.uid == 1001
        assert user1.group == 'user1'
        assert user1.groups == ['user1']
        assert user1.shell == '/bin/bash'
        assert user1.password == '!'

        # No keys are expected for user1, so the file should be absent.
        user1_authorized_keys = host.file(os.path.join(user1.home, '.ssh', 'authorized_keys'))
        assert not user1_authorized_keys.exists

        user2 = host.user('user2')
        assert user2.uid == 2001
        assert user2.group == 'user2'
        assert sorted(user2.groups) == sorted(['group1', 'group2', 'user2'])
        assert user2.shell == '/bin/bash'
        assert user2.password == '$6$wdXOQiMe09ugh0$VRIph2XA2QQyEYlAlH7zT4TPACDUalf/4FKpqG9JRHfKxANTcTug2ANCt450htcs0LikJfHLWofLP54jraFU61'

        user2_authorized_keys = host.file(os.path.join(user2.home, '.ssh', 'authorized_keys'))
        assert read_public_key('tests/data/ssh/clientkey1.pub') in user2_authorized_keys.content
        assert read_public_key('tests/data/ssh/clientkey2.pub') in user2_authorized_keys.content

        user3 = host.user('user3')
        assert user3.uid == 2002
        assert user3.group == 'user3'
        assert sorted(user3.groups) == sorted(['group3', 'user3'])
        assert user3.shell == '/bin/bash'
        assert user3.password == '$6$nmx.21uLqT$9LrUqNUgUwIM.l0KFKgr2.kDEwe2lo7IbBIhnG70AGW7GTFdWBUFnGAxH15YxikTXhDJD/uxd.NNgojEOjRvx1'

        user3_authorized_keys = host.file(os.path.join(user3.home, '.ssh', 'authorized_keys'))
        assert read_public_key('tests/data/ssh/clientkey3.pub') in user3_authorized_keys.content


def test_authorized_keys_login(host):
    """
    Tests if authorized SSH keys for user-provided system users have been set-up
    correctly.

    A key-based SSH login is attempted for each (user, private key) pair.
    The test fails if any of the connection attempts raises an exception.
    """

    class IgnorePolicy(paramiko.client.MissingHostKeyPolicy):
        """
        Host key policy that silently accepts unknown host keys.
        """

        def missing_host_key(self, client, hostname, key):
            pass

    # Extract first non-IPv6 IP. Crude test, but it should work.
    remote_ip = next(a for a in host.interface("eth1").addresses if ":" not in a)

    # Pairs of (username, private key file) that must be able to log in.
    logins = (
        ("user2", 'tests/data/ssh/clientkey1'),
        ("user2", 'tests/data/ssh/clientkey2'),
        ("user3", 'tests/data/ssh/clientkey3'),
    )

    for username, key_filename in logins:
        client = paramiko.client.SSHClient()
        client.set_missing_host_key_policy(IgnorePolicy())

        try:
            # No exception will be raised if connection is successful.
            client.connect(remote_ip, username=username, allow_agent=False, look_for_keys=False, key_filename=key_filename)
        finally:
            # Close the session so connections do not pile up between
            # login attempts.
            client.close()


def test_ca_certificates(host):
    """
    Tests if the CA certificate files are in place with expected
    ownership and permissions, and if the associated symbolic links in
    /etc/ssl/certs point to them.
    """

    # Each entry pairs a certificate path with the symbolic links that are
    # expected to point to it. Checks run in the listed order.
    expected_certificates = (
        (
            '/usr/local/share/ca-certificates/cacert1.crt',
            (
                '/etc/ssl/certs/cacert1.pem',
                '/etc/ssl/certs/3ce70b58.0',
                '/etc/ssl/certs/49f72a44.0',
            ),
        ),
        (
            '/usr/local/share/ca-certificates/cacert2.crt',
            (
                '/etc/ssl/certs/cacert2.pem',
                '/etc/ssl/certs/a52eec00.0',
                '/etc/ssl/certs/a0d2e9e4.0',
            ),
        ),
    )

    for certificate_path, symlink_paths in expected_certificates:
        certificate = host.file(certificate_path)
        assert certificate.is_file
        assert certificate.user == 'root'
        assert certificate.group == 'root'
        assert certificate.mode == 0o644

        for symlink_path in symlink_paths:
            symlink = host.file(symlink_path)
            assert symlink.is_symlink
            assert symlink.linked_to == certificate_path


def test_ferm_base_rules(host):
    """
    Tests if the base ferm configuration contains the user-provided
    rate-limiting settings, and if matching flood chain rules show up in
    the output of iptables-save and ip6tables-save.
    """

    with host.sudo():
        base_config = host.file('/etc/ferm/conf.d/00-base.conf')

        assert "mod hashlimit hashlimit 5/second hashlimit-burst 5" in base_config.content

        # Rules as they are expected to appear in the saved rule listings.
        icmp_rule = (
            "-A flood -p icmp -m icmp --icmp-type 8 -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 "
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN"
        )
        tcp_syn_rule = (
            "-A flood -p tcp -m tcp --tcp-flags FIN,SYN,RST,ACK SYN -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 "
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN"
        )
        icmpv6_rule = (
            "-A flood -p ipv6-icmp -m icmp6 --icmpv6-type 128 -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 "
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN"
        )

        ipv4_rules = host.command('iptables-save')

        assert ipv4_rules.rc == 0
        assert icmp_rule in ipv4_rules.stdout
        assert tcp_syn_rule in ipv4_rules.stdout

        ipv6_rules = host.command('ip6tables-save')
        assert ipv6_rules.rc == 0

        # NOTE(review): this assertion sits among the ip6tables checks, but
        # it inspects the iptables-save output again, repeating a check
        # already made above. Looks like a copy/paste slip - confirm which
        # rule was meant to be verified against the ip6tables-save output.
        assert icmp_rule in ipv4_rules.stdout
        assert icmpv6_rule in ipv6_rules.stdout


def test_pipreqcheck_virtualenv_user(host):
    """
    Tests if the pipreqcheck group and user exist, and if they have the
    expected gid, uid, home directory, and group membership.
    """

    account_name = 'pipreqcheck'

    pipreqcheck_group = host.group(account_name)
    assert pipreqcheck_group.exists
    assert pipreqcheck_group.gid == 2500

    pipreqcheck_user = host.user(account_name)
    assert pipreqcheck_user.exists

    # Attributes are verified one at a time, in the listed order.
    expected_attributes = (
        ('home', '/var/lib/pipreqcheck'),
        ('uid', 2500),
        ('group', 'pipreqcheck'),
        ('groups', ['pipreqcheck']),
    )

    for attribute, expected_value in expected_attributes:
        assert getattr(pipreqcheck_user, attribute) == expected_value


def test_backup_configuration(host):
    """
    Tests if the duply backup pattern files exist, and if each of them
    lists the expected paths (one path per line).
    """

    # Each entry pairs a pattern file with lines that must be present in
    # it. Checks run in the listed order.
    expected_patterns = (
        (
            '/etc/duply/main/patterns/common',
            ("/var/log", "/etc/shadow", "/var/mail", "/var/spool/cron"),
        ),
        (
            '/etc/duply/main/patterns/common_extra',
            ("/home/user1", "/home/user2"),
        ),
    )

    with host.sudo():

        for pattern_path, expected_entries in expected_patterns:
            pattern_file = host.file(pattern_path)
            assert pattern_file.is_file

            for entry in expected_entries:
                assert entry in pattern_file.content.split("\n")


def test_ntp_software_installed(host):
    """
    Tests if both the ntp and the ntpdate package are installed.
    """

    for package_name in ('ntp', 'ntpdate'):
        assert host.package(package_name).is_installed


def test_ntp_server_configuration(host):
    """
    Tests if NTP server has been correctly configured.

    Reads /etc/ntp.conf, ignores blank lines and comments, and verifies
    that the remaining server and restrict directives match the expected
    ones (irrespective of their order).
    """

    # Matches lines that are blank or consist only of a comment. A raw
    # string is used so that \s reaches the regex engine without relying
    # on an invalid string escape sequence.
    blank_or_comment = re.compile(r'^\s*(|#.*)$')

    with host.sudo():

        # Read the configuration file.
        configuration = host.file("/etc/ntp.conf").content.split("\n")

        # Extract only the relevant sections of the file (exclude empty
        # lines and comments).
        configuration = [c.strip() for c in configuration if blank_or_comment.match(c) is None]

        # Ensure correct servers have been configured in the pool.
        servers = [c for c in configuration if c.startswith('server')]

        expected_servers = ["server 0.debian.pool.ntp.org iburst",
                            "server 1.debian.pool.ntp.org iburst",
                            "server 2.debian.pool.ntp.org iburst"]

        assert sorted(servers) == sorted(expected_servers)

        # Ensure querying of server is disabled for untrusted clients.
        restrictions = [c for c in configuration if c.startswith('restrict')]
        expected_restrictions = ["restrict -4 default kod notrap nomodify nopeer noquery",
                                 "restrict -6 default kod notrap nomodify nopeer noquery",
                                 "restrict 127.0.0.1",
                                 "restrict ::1"]

        assert sorted(restrictions) == sorted(expected_restrictions)


def test_ntp_query_server_count(host):
    """
    Tests if the peer listing command (ntpq -p -n) succeeds, and if its
    output has the expected number of lines.
    """

    peer_listing = host.command("ntpq -p -n")

    assert peer_listing.rc == 0

    # Two lines for headers, and one line per configured server.
    output_lines = peer_listing.stdout.split("\n")

    assert len(output_lines) == 5


def test_ntp_listening_interfaces(host):
    """
    Tests if a listening socket matching the 'udp://:::123' specification
    is present on the host (the port NTP server is expected to use).
    """

    ntp_socket = host.socket('udp://:::123')

    assert ntp_socket.is_listening