Files @ 490c7b153f52
Branch filter:

Location: majic-ansible-roles/roles/common/tests/test_parameters_optional.py - annotation

branko
Noticket: Improved logic behind a couple of tests that use commands to ensure a specific error is happening for tests to pass.
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
eb4d09d4abd3
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
eb4d09d4abd3
eb4d09d4abd3
ea69b2719d8e
ea69b2719d8e
eb4d09d4abd3
eb4d09d4abd3
eb4d09d4abd3
eb4d09d4abd3
eb4d09d4abd3
eb4d09d4abd3
eb4d09d4abd3
eb4d09d4abd3
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
eb4d09d4abd3
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
eb4d09d4abd3
eb4d09d4abd3
ea69b2719d8e
ea69b2719d8e
eb4d09d4abd3
eb4d09d4abd3
ea69b2719d8e
eb4d09d4abd3
eb4d09d4abd3
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
eb4d09d4abd3
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
eb4d09d4abd3
eb4d09d4abd3
eb4d09d4abd3
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
eb4d09d4abd3
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
eb4d09d4abd3
eb4d09d4abd3
eb4d09d4abd3
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
ea69b2719d8e
3d3f7f804487
3d3f7f804487
3d3f7f804487
3d3f7f804487
3d3f7f804487
3d3f7f804487
3d3f7f804487
3d3f7f804487
3d3f7f804487
3d3f7f804487
3d3f7f804487
3d3f7f804487
3d3f7f804487
3d3f7f804487
3d3f7f804487
3d3f7f804487
3d3f7f804487
3d3f7f804487
3d3f7f804487
3d3f7f804487
import os
import socket

import paramiko

import testinfra.utils.ansible_runner


testinfra_hosts = testinfra.utils.ansible_runner.AnsibleRunner(
    '.molecule/ansible_inventory').get_hosts('parameters-optional')


def test_apt_proxy(File):
    """
    Tests if proxy configuration for apt has been deployed correctly.
    """

    proxy_config = File('/etc/apt/apt.conf.d/00proxy')

    assert proxy_config.exists
    assert proxy_config.user == 'root'
    assert proxy_config.group == 'root'
    assert proxy_config.mode == 0o644


def test_bash_prompt_content(File):
    """
    Tests that custom bash prompt has been configured correctly with specified
    colour and prompt.
    """

    config = File('/etc/profile.d/bash_prompt.sh')

    assert "export PS1='\\[\\e]0;\\u@\\h: \\w\\a\\]${debian_chroot:+($debian_chroot)}\\[\\033[0;36m\\]\\u@\\h[test]:\\w\\$ \\[\\033[0m\\]'" in config.content
    assert "export PS1='\\[\\e]0;\\u@\\h: \\w\\a\\]${debian_chroot:+($debian_chroot)}\\u@\\h[test]:\\w\\$ '" in config.content


def test_common_installed_packages_common(Ansible, Package):
    """
    Tests that user-provided common packages have been installed.
    """

    debian_release = Ansible("setup")["ansible_facts"]["ansible_distribution_release"]

    assert Package('units').is_installed
    assert Package('gnutls-bin').is_installed

    # Different name of package in different Debian releases.
    if debian_release == 'jessie':
        assert Package('libmariadb-client-lgpl-dev-compat').is_installed
    elif debian_release == 'stretch':
        assert Package('libmariadbclient-dev-compat').is_installed
    else:
        raise Exception("Cannot run this test on debian release: %s" % debian_release)


def test_ssh_login_mechanisms():
    """
    Tests available SSH login mechanisms (should be just public key).
    """

    sock = socket.socket()
    sock.connect(('10.31.127.4', 22))

    transport = paramiko.transport.Transport(sock)
    transport.connect()

    try:
        transport.auth_none('')
    except paramiko.transport.BadAuthenticationType, err:
        assert err.allowed_types == ['publickey']


def test_mariadb_mysql_config_symlink(Ansible, File):
    """
    Tests if symbolic link has been set-up for mariadb_config binary to be
    accessible as mysql_config as well.

    Only applicable to Debian Jessie.
    """

    if Ansible("setup")["ansible_facts"]["ansible_distribution_release"] == 'jessie':
        mysql_config = File('/usr/bin/mysql_config')

        assert mysql_config.is_symlink
        assert mysql_config.linked_to == '/usr/bin/mariadb_config'


def test_emacs_electric_indent_mode(File):
    """
    Tests if Emacs electric indent mode has been disabled via custom
    configuration file.
    """

    emacs_config = File('/etc/emacs/site-start.d/01disable-electric-indent-mode.el')

    assert emacs_config.is_file
    assert emacs_config.user == 'root'
    assert emacs_config.group == 'root'
    assert emacs_config.mode == 0o644
    assert "(electric-indent-mode -1)" in emacs_config.content


def test_os_groups(Group):
    """
    Tests if user-supplied system groups have been created correctly.
    """

    group1 = Group('group1')
    assert group1.gid == 1001

    group2 = Group('group2')
    assert group2.gid == 3001

    group3 = Group('group3')
    assert group3.gid == 3002

    user1_group = Group('user1')
    assert user1_group.gid == 3003

    user2_group = Group('user2')
    assert user2_group.gid == 2001

    user3_group = Group('user3')
    assert user3_group.gid == 2002


def test_os_users(File, Sudo, User):
    """
    Tests if user-supplied system users have been created correctly.
    """

    with Sudo():
        user1 = User('user1')
        assert user1.uid == 1001
        assert user1.group == 'user1'
        assert user1.groups == ['user1']
        assert user1.shell == '/bin/bash'
        assert user1.password == '!'

        user1_authorized_keys = File(os.path.join(user1.home, '.ssh', 'authorized_keys'))
        assert not user1_authorized_keys.exists

        user2 = User('user2')
        assert user2.uid == 2001
        assert user2.group == 'user2'
        assert sorted(user2.groups) == sorted(['group1', 'group2', 'user2'])
        assert user2.shell == '/bin/bash'
        assert user2.password == '$6$wdXOQiMe09ugh0$VRIph2XA2QQyEYlAlH7zT4TPACDUalf/4FKpqG9JRHfKxANTcTug2ANCt450htcs0LikJfHLWofLP54jraFU61'

        user2_authorized_keys = File(os.path.join(user2.home, '.ssh', 'authorized_keys'))
        assert open('tests/data/ssh/clientkey1.pub', 'r').read().strip() in user2_authorized_keys.content
        assert open('tests/data/ssh/clientkey2.pub', 'r').read().strip() in user2_authorized_keys.content

        user3 = User('user3')
        assert user3.uid == 2002
        assert user3.group == 'user3'
        assert sorted(user3.groups) == sorted(['group3', 'user3'])
        assert user3.shell == '/bin/bash'
        assert user3.password == '$6$nmx.21uLqT$9LrUqNUgUwIM.l0KFKgr2.kDEwe2lo7IbBIhnG70AGW7GTFdWBUFnGAxH15YxikTXhDJD/uxd.NNgojEOjRvx1'

        user3_authorized_keys = File(os.path.join(user3.home, '.ssh', 'authorized_keys'))
        assert open('tests/data/ssh/clientkey3.pub', 'r').read().strip() in user3_authorized_keys.content


def test_authorized_keys_login():
    """
    Tests if authorized SSH keys for user-provided system users have been set-up
    correctly.
    """

    client = paramiko.client.SSHClient()
    client.set_missing_host_key_policy(paramiko.client.WarningPolicy())

    # No exception will be raised if connection is successful.
    client.connect("10.31.127.4", username="user2", allow_agent=False, look_for_keys=False, key_filename='tests/data/ssh/clientkey1')
    client.connect("10.31.127.4", username="user2", allow_agent=False, look_for_keys=False, key_filename='tests/data/ssh/clientkey2')
    client.connect("10.31.127.4", username="user3", allow_agent=False, look_for_keys=False, key_filename='tests/data/ssh/clientkey3')


def test_ca_certificates(File):
    """
    Tests if CA certificates have been correctly deployed to the system.
    """

    ca1_cert = File('/usr/local/share/ca-certificates/cacert1.crt')
    assert ca1_cert.is_file
    assert ca1_cert.user == 'root'
    assert ca1_cert.group == 'root'
    assert ca1_cert.mode == 0o644

    ca1_cert_symlink = File('/etc/ssl/certs/cacert1.pem')
    assert ca1_cert_symlink.is_symlink
    assert ca1_cert_symlink.linked_to == '/usr/local/share/ca-certificates/cacert1.crt'

    ca1_cert_hash_1 = File('/etc/ssl/certs/3ce70b58.0')
    assert ca1_cert_hash_1.is_symlink
    assert ca1_cert_hash_1.linked_to == '/usr/local/share/ca-certificates/cacert1.crt'

    ca1_cert_hash_1 = File('/etc/ssl/certs/49f72a44.0')
    assert ca1_cert_hash_1.is_symlink
    assert ca1_cert_hash_1.linked_to == '/usr/local/share/ca-certificates/cacert1.crt'

    ca2_cert = File('/usr/local/share/ca-certificates/cacert2.crt')
    assert ca2_cert.is_file
    assert ca2_cert.user == 'root'
    assert ca2_cert.group == 'root'
    assert ca2_cert.mode == 0o644

    ca2_cert_symlink = File('/etc/ssl/certs/cacert2.pem')
    assert ca2_cert_symlink.is_symlink
    assert ca2_cert_symlink.linked_to == '/usr/local/share/ca-certificates/cacert2.crt'

    ca2_cert_hash_1 = File('/etc/ssl/certs/a52eec00.0')
    assert ca2_cert_hash_1.is_symlink
    assert ca2_cert_hash_1.linked_to == '/usr/local/share/ca-certificates/cacert2.crt'

    ca2_cert_hash_1 = File('/etc/ssl/certs/a0d2e9e4.0')
    assert ca2_cert_hash_1.is_symlink
    assert ca2_cert_hash_1.linked_to == '/usr/local/share/ca-certificates/cacert2.crt'


def test_ferm_base_rules(Command, File, Sudo):
    """
    Tests if base ferm configuration has been deployed correctly with proper
    user-provided rate-limiting.
    """

    with Sudo():
        ferm_base = File('/etc/ferm/conf.d/00-base.conf')

        assert "mod hashlimit hashlimit 5/second hashlimit-burst 5" in ferm_base.content

        iptables = Command('iptables-save')

        assert iptables.rc == 0
        assert "-A flood -p icmp -m icmp --icmp-type 8 -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in iptables.stdout
        assert "-A flood -p tcp -m tcp --tcp-flags FIN,SYN,RST,ACK SYN -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in iptables.stdout

        ip6tables = Command('ip6tables-save')
        assert ip6tables.rc == 0
        assert "-A flood -p icmp -m icmp --icmp-type 8 -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in iptables.stdout
        assert "-A flood -p ipv6-icmp -m icmp6 --icmpv6-type 128 -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in ip6tables.stdout


def test_pipreqcheck_virtualenv_user(Group, User):
    """
    Tests if group and user for running pip requirements upgrade checks have
    been created correctly with user-provided uid/gid.
    """

    group = Group('pipreqcheck')
    assert group.exists
    assert group.gid == 2500

    user = User('pipreqcheck')
    assert user.exists
    assert user.home == '/var/lib/pipreqcheck'
    assert user.uid == 2500
    assert user.group == 'pipreqcheck'
    assert user.groups == ['pipreqcheck']


def test_backup_configuration(File, Sudo):
    """
    Tests if backup configuration has been deployed correctly.
    """

    with Sudo():

        common = File('/etc/duply/main/patterns/common')
        assert common.is_file
        assert "/var/log" in common.content.split("\n")
        assert "/etc/shadow" in common.content.split("\n")
        assert "/var/mail" in common.content.split("\n")
        assert "/var/spool/cron" in common.content.split("\n")

        common_extra = File('/etc/duply/main/patterns/common_extra')
        assert common_extra.is_file
        assert "/home/user1" in common_extra.content.split("\n")
        assert "/home/user2" in common_extra.content.split("\n")