Files @ 36e1c9460cd6
Branch filter:

Location: majic-ansible-roles/roles/common/tests/test_parameters_optional.py

branko
MAR-27: Added initial scaffolding for testing mail_forwarder role:

- Fixed issues reported by Ansible linting check (some mode-related syntax and
one ignore).
- Added Molecule configuration for testing mandatory and optional
parameters. Covers both Debian Jessie and Debian Stretch.
- Added test playbook for setting-up the test instances. A helper relay mail
server.
- Updated both mail_server and mail_forwarder to fall-back to using
native (/etc/hosts) resolving if DNS fails. Solves issue with test environment
not having proper DNS set-up for all domains etc.
- Added a number of data/config files associated with tests.
- Added dummy test file.
import os
import socket

import paramiko

import testinfra.utils.ansible_runner


# Hosts targeted by the tests in this module: whatever the inventory at
# '.molecule/ansible_inventory' returns for 'parameters-optional'.
testinfra_hosts = (
    testinfra.utils.ansible_runner
    .AnsibleRunner('.molecule/ansible_inventory')
    .get_hosts('parameters-optional')
)


def test_apt_proxy(File):
    """
    Verifies that the apt proxy configuration file exists and that its owner,
    group and permission bits match the expected values.
    """

    apt_proxy_file = File('/etc/apt/apt.conf.d/00proxy')

    assert apt_proxy_file.exists

    # Checked in the same order as listed: owner, group, then mode.
    expected_attributes = (
        ('user', 'root'),
        ('group', 'root'),
        ('mode', 0o644),
    )
    for attribute, expected in expected_attributes:
        assert getattr(apt_proxy_file, attribute) == expected


def test_bash_prompt_content(File):
    """
    Verifies that the custom bash prompt script contains both expected PS1
    definitions for the '[test]' prompt: one with colour escape sequences and
    one without.
    """

    prompt_script = File('/etc/profile.d/bash_prompt.sh')

    expected_definitions = (
        # Variant wrapped in colour (\033[0;36m ... \033[0m) escape sequences.
        "export PS1='\\[\\e]0;\\u@\\h: \\w\\a\\]${debian_chroot:+($debian_chroot)}\\[\\033[0;36m\\]\\u@\\h[test]:\\w\\$ \\[\\033[0m\\]'",
        # Variant without any colour escape sequences.
        "export PS1='\\[\\e]0;\\u@\\h: \\w\\a\\]${debian_chroot:+($debian_chroot)}\\u@\\h[test]:\\w\\$ '",
    )

    for definition in expected_definitions:
        assert definition in prompt_script.content


def test_common_installed_packages_common(Ansible, Package):
    """
    Verifies that the user-provided common packages are installed, including
    the one whose name depends on the Debian release.

    Raises an Exception if the release reported by Ansible is neither jessie
    nor stretch.
    """

    facts = Ansible("setup")["ansible_facts"]
    release = facts["ansible_distribution_release"]

    for package_name in ('units', 'gnutls-bin'):
        assert Package(package_name).is_installed

    # Same package, but named differently depending on the Debian release.
    release_specific_package = {
        'jessie': 'libmariadb-client-lgpl-dev-compat',
        'stretch': 'libmariadbclient-dev-compat',
    }

    if release not in release_specific_package:
        raise Exception("Cannot run this test on debian release: %s" % release)

    assert Package(release_specific_package[release]).is_installed


def test_ssh_login_mechanisms():
    """
    Tests available SSH login mechanisms (should be just public key).

    Opens a raw SSH transport to the test host and attempts "none"
    authentication. The server is expected to reject it with
    BadAuthenticationType, whose allowed_types must be exactly ['publickey'].

    Raises AssertionError if the advertised mechanisms differ, or if the "none"
    authentication attempt does not get rejected with BadAuthenticationType.
    """

    sock = socket.socket()

    try:
        sock.connect(('10.31.127.4', 22))

        transport = paramiko.transport.Transport(sock)

        try:
            transport.connect()

            try:
                transport.auth_none('')
            except paramiko.transport.BadAuthenticationType as err:
                assert err.allowed_types == ['publickey']
            else:
                # Without this branch the test would pass silently when the
                # server accepts "none" authentication.
                raise AssertionError("SSH 'none' authentication was not rejected with a list of allowed mechanisms")
        finally:
            transport.close()
    finally:
        # Harmless if the transport has already closed the socket; needed when
        # connecting fails before the transport exists.
        sock.close()


def test_mariadb_mysql_config_symlink(Ansible, File):
    """
    Verifies that /usr/bin/mysql_config is a symbolic link pointing at
    /usr/bin/mariadb_config.

    The check is performed only when Ansible reports the distribution release
    as jessie; on any other release the test passes without checking anything.
    """

    release = Ansible("setup")["ansible_facts"]["ansible_distribution_release"]

    if release != 'jessie':
        return

    compat_link = File('/usr/bin/mysql_config')

    assert compat_link.is_symlink
    assert compat_link.linked_to == '/usr/bin/mariadb_config'


def test_emacs_electric_indent_mode(File):
    """
    Verifies that the Emacs site-start snippet disabling electric indent mode
    is a regular file with the expected ownership, permissions and content.
    """

    site_start_snippet = File('/etc/emacs/site-start.d/01disable-electric-indent-mode.el')

    assert site_start_snippet.is_file

    # Checked in the same order as listed: owner, group, then mode.
    expected_attributes = (
        ('user', 'root'),
        ('group', 'root'),
        ('mode', 0o644),
    )
    for attribute, expected in expected_attributes:
        assert getattr(site_start_snippet, attribute) == expected

    assert "(electric-indent-mode -1)" in site_start_snippet.content


def test_os_groups(Group):
    """
    Verifies that every user-supplied system group, and every per-user group,
    has the expected gid.
    """

    # (group name, expected gid), checked in this order.
    expected_gids = (
        ('group1', 1001),
        ('group2', 3001),
        ('group3', 3002),
        ('user1', 3003),
        ('user2', 2001),
        ('user3', 2002),
    )

    for group_name, expected_gid in expected_gids:
        assert Group(group_name).gid == expected_gid


def _read_public_key(path):
    """
    Returns the content of the local file at path with leading and trailing
    whitespace stripped. The file is closed before returning.
    """

    with open(path, 'r') as key_file:
        return key_file.read().strip()


def test_os_users(File, Sudo, User):
    """
    Tests if user-supplied system users have been created correctly.

    For every user the uid, primary group, group membership, shell and password
    entry are compared against the expected values. user1 must not have an
    authorized_keys file, while the authorized_keys files of user2 and user3
    must contain the public keys read from tests/data/ssh/ (paths are relative
    to the current working directory of the test run).
    """

    with Sudo():
        user1 = User('user1')
        assert user1.uid == 1001
        assert user1.group == 'user1'
        assert user1.groups == ['user1']
        assert user1.shell == '/bin/bash'
        assert user1.password == '!'

        user1_authorized_keys = File(os.path.join(user1.home, '.ssh', 'authorized_keys'))
        assert not user1_authorized_keys.exists

        user2 = User('user2')
        assert user2.uid == 2001
        assert user2.group == 'user2'
        # Group order is not significant, so compare sorted lists.
        assert sorted(user2.groups) == sorted(['group1', 'group2', 'user2'])
        assert user2.shell == '/bin/bash'
        assert user2.password == '$6$wdXOQiMe09ugh0$VRIph2XA2QQyEYlAlH7zT4TPACDUalf/4FKpqG9JRHfKxANTcTug2ANCt450htcs0LikJfHLWofLP54jraFU61'

        user2_authorized_keys = File(os.path.join(user2.home, '.ssh', 'authorized_keys'))
        assert _read_public_key('tests/data/ssh/clientkey1.pub') in user2_authorized_keys.content
        assert _read_public_key('tests/data/ssh/clientkey2.pub') in user2_authorized_keys.content

        user3 = User('user3')
        assert user3.uid == 2002
        assert user3.group == 'user3'
        assert sorted(user3.groups) == sorted(['group3', 'user3'])
        assert user3.shell == '/bin/bash'
        assert user3.password == '$6$nmx.21uLqT$9LrUqNUgUwIM.l0KFKgr2.kDEwe2lo7IbBIhnG70AGW7GTFdWBUFnGAxH15YxikTXhDJD/uxd.NNgojEOjRvx1'

        user3_authorized_keys = File(os.path.join(user3.home, '.ssh', 'authorized_keys'))
        assert _read_public_key('tests/data/ssh/clientkey3.pub') in user3_authorized_keys.content


def test_authorized_keys_login():
    """
    Tests if authorized SSH keys for user-provided system users have been set-up
    correctly.

    Logs in to the test host once per (user, private key) pair. A failed login
    surfaces as the exception raised by the connect call; there are no explicit
    assertions. Every connection is closed again before the next attempt.
    """

    # (username, private key file), tried in this order.
    logins = (
        ("user2", 'tests/data/ssh/clientkey1'),
        ("user2", 'tests/data/ssh/clientkey2'),
        ("user3", 'tests/data/ssh/clientkey3'),
    )

    for username, key_filename in logins:
        client = paramiko.client.SSHClient()
        client.set_missing_host_key_policy(paramiko.client.WarningPolicy())

        try:
            # No exception will be raised if connection is successful. The SSH
            # agent and default key locations are disabled so that only the
            # given key file is offered.
            client.connect("10.31.127.4", username=username, allow_agent=False, look_for_keys=False, key_filename=key_filename)
        finally:
            client.close()


def test_ca_certificates(File):
    """
    Verifies that both CA certificates are deployed as regular files with the
    expected ownership and permissions, and that the .pem and hash-named
    entries under /etc/ssl/certs are symbolic links pointing at them.
    """

    # (certificate file, symlinks expected to point at it), checked in order.
    deployed_certificates = (
        (
            '/usr/local/share/ca-certificates/cacert1.crt',
            (
                '/etc/ssl/certs/cacert1.pem',
                '/etc/ssl/certs/3ce70b58.0',
                '/etc/ssl/certs/49f72a44.0',
            ),
        ),
        (
            '/usr/local/share/ca-certificates/cacert2.crt',
            (
                '/etc/ssl/certs/cacert2.pem',
                '/etc/ssl/certs/a52eec00.0',
                '/etc/ssl/certs/a0d2e9e4.0',
            ),
        ),
    )

    for certificate_path, symlink_paths in deployed_certificates:
        certificate = File(certificate_path)
        assert certificate.is_file
        assert certificate.user == 'root'
        assert certificate.group == 'root'
        assert certificate.mode == 0o644

        for symlink_path in symlink_paths:
            symlink = File(symlink_path)
            assert symlink.is_symlink
            assert symlink.linked_to == certificate_path


def test_ferm_base_rules(Command, File, Sudo):
    """
    Tests if base ferm configuration has been deployed correctly with proper
    user-provided rate-limiting.

    Checks the hashlimit setting in the ferm configuration file, and then the
    flood chain rules reported by iptables-save (IPv4) and ip6tables-save
    (IPv6). Both commands must exit with return code 0.
    """

    with Sudo():
        ferm_base = File('/etc/ferm/conf.d/00-base.conf')

        assert "mod hashlimit hashlimit 5/second hashlimit-burst 5" in ferm_base.content

        iptables = Command('iptables-save')

        assert iptables.rc == 0
        assert "-A flood -p icmp -m icmp --icmp-type 8 -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in iptables.stdout
        assert "-A flood -p tcp -m tcp --tcp-flags FIN,SYN,RST,ACK SYN -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in iptables.stdout

        ip6tables = Command('ip6tables-save')

        assert ip6tables.rc == 0
        # Only the IPv6 echo-request rule is checked here. This section used to
        # repeat the IPv4 ICMP assertion against the IPv4 output, which is
        # already verified above and did not exercise the ip6tables output.
        assert "-A flood -p ipv6-icmp -m icmp6 --icmpv6-type 128 -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in ip6tables.stdout


def test_pipreqcheck_virtualenv_user(Group, User):
    """
    Verifies that the pipreqcheck group and user exist and carry the
    user-provided gid/uid, along with the expected home directory and group
    membership.
    """

    account_name = 'pipreqcheck'
    expected_id = 2500

    pipreqcheck_group = Group(account_name)
    assert pipreqcheck_group.exists
    assert pipreqcheck_group.gid == expected_id

    pipreqcheck_user = User(account_name)
    assert pipreqcheck_user.exists
    assert pipreqcheck_user.home == '/var/lib/pipreqcheck'
    assert pipreqcheck_user.uid == expected_id
    assert pipreqcheck_user.group == account_name
    assert pipreqcheck_user.groups == [account_name]


def test_backup_configuration(File, Sudo):
    """
    Verifies that the duply pattern files for the main backup are regular
    files and that each expected path appears in them as a line of its own.
    """

    with Sudo():

        common_patterns = File('/etc/duply/main/patterns/common')
        assert common_patterns.is_file

        for pattern in ("/var/log", "/etc/shadow", "/var/mail", "/var/spool/cron"):
            # Whole-line match: the pattern must be one of the file's lines.
            assert pattern in common_patterns.content.split("\n")

        extra_patterns = File('/etc/duply/main/patterns/common_extra')
        assert extra_patterns.is_file

        for pattern in ("/home/user1", "/home/user2"):
            assert pattern in extra_patterns.content.split("\n")