Changeset - 693d6960b1a0
[Not reviewed]
0 2 0
Branko Majic (branko) - 6 years ago 2020-01-06 16:54:36
branko@majic.rs
MAR-148: Fix test for the common role:

- Do some additional stripping on command outputs to get rid of extra
new lines.
2 files changed with 3 insertions and 3 deletions:
0 comments (0 inline, 0 general)
roles/common/molecule/default/tests/test_default.py
Show inline comments
 
import os
 

	
 
import testinfra.utils.ansible_runner
 

	
 
import pytest
 

	
 

	
 
testinfra_hosts = testinfra.utils.ansible_runner.AnsibleRunner(
 
    os.environ['MOLECULE_INVENTORY_FILE']).get_hosts('parameters-*')
 

	
 

	
 
def test_pam_umask(host):
 
    """
 
    Tests configuration of PAM umask module.
 
    """
 

	
 
    pam_auth_update_config = host.file('/usr/share/pam-configs/umask')
 
    assert pam_auth_update_config.exists
 
    assert pam_auth_update_config.user == 'root'
 
    assert pam_auth_update_config.group == 'root'
 
    assert pam_auth_update_config.mode == 0o644
 

	
 
    assert host.file('/etc/pam.d/common-session').contains(r'session[[:blank:]]\+required[[:blank:]]\+pam_umask.so')
 
    assert host.file('/etc/pam.d/common-session-noninteractive').contains(r'session[[:blank:]]\+required[[:blank:]]\+pam_umask.so')
 

	
 

	
 
def test_login_umask(host):
 
    """
 
    Tests set-up of default UMASK via /etc/login.defs.
 
    """
 

	
 
    assert host.file('/etc/login.defs').contains(r'UMASK[[:blank:]]\+027')
 

	
 

	
 
def test_adduser_umask(host):
 
    """
 
    Tests UMASK configuration used for creating user home directory.
 
    """
 

	
 
    assert host.file('/etc/adduser.conf').contains('DIR_MODE=0750')
 

	
 

	
 
def test_bash_prompt(host):
 
    """
 
    Tests file permissions on custom bash prompt configuration.
 
    """
 

	
 
    bash_prompt = host.file('/etc/profile.d/bash_prompt.sh')
 

	
 
    assert bash_prompt.exists
 
    assert bash_prompt.user == 'root'
 
    assert bash_prompt.group == 'root'
 
    assert bash_prompt.mode == 0o644
 

	
 

	
 
def test_home_profile_d(host):
 
    """
 
    Tests deployment of special profile file used for enabling profile.d-like
 
    capability in user's home directory.
 
    """
 

	
 
    home_profile_d = host.file('/etc/profile.d/z99-user_profile_d.sh')
 

	
 
    assert home_profile_d.is_file
 
    assert home_profile_d.user == 'root'
 
    assert home_profile_d.group == 'root'
 
    assert home_profile_d.mode == 0o644
 

	
 

	
 
def test_home_skeleton_bashrc(host):
 
    """
 
    Tests deployment of home directory skeleton bashrc.
 
    """
 

	
 
    bashrc = host.file('/etc/skel/.bashrc')
 

	
 
    assert bashrc.is_file
 
    assert bashrc.user == 'root'
 
    assert bashrc.group == 'root'
 
    assert bashrc.mode == 0o644
 
    assert bashrc.sha256sum == '4f946fb387a413c8d7633787d8e8a7785c256d77f7c6a692822ffdb439c78277'
 

	
 

	
 
def test_default_bashrc(host):
 
    """
 
    Tests deployment of default bashrc file.
 
    """
 

	
 
    bashrc = host.file('/etc/bash.bashrc')
 

	
 
    assert bashrc.is_file
 
    assert bashrc.user == 'root'
 
    assert bashrc.group == 'root'
 
    assert bashrc.mode == 0o644
 

	
 

	
 
def test_root_bashrc(host):
 
    """
 
    Tests overwriting of root's bashrc configuration with default one.
 
    """
 

	
 
    with host.sudo():
 
        bashrc = host.file('/root/.bashrc')
 

	
 
        assert bashrc.is_file
 
        assert bashrc.user == 'root'
 
        assert bashrc.group == 'root'
 
        assert bashrc.mode == 0o640
 
        assert bashrc.sha256sum == '4f946fb387a413c8d7633787d8e8a7785c256d77f7c6a692822ffdb439c78277'
 

	
 

	
 
def test_installed_packages(host):
 
    """
 
    Tests installation of required packages.
 
    """
 

	
 
    assert host.package('sudo').is_installed
 
    assert host.package('ssl-cert').is_installed
 
    assert host.package('rcconf').is_installed
 
    assert host.package('ferm').is_installed
 
    assert host.package('apticron').is_installed
 
    assert host.package('python-setuptools').is_installed
 
    assert host.package('python3-setuptools').is_installed
 
    assert host.package('virtualenv').is_installed
 

	
 

	
 
def test_root_remote_login_disabled(host):
 
    """
 
    Tests if SSH server has been configured to prevent remote root logins.
 
    """
 

	
 
    assert 'PermitRootLogin no' in host.file('/etc/ssh/sshd_config').content
 

	
 

	
 
def test_remote_login_via_password_disabled(host):
 
    """
 
    Tests if SSH server has been configured to disable password-based
 
    authentication.
 
    """
 

	
 
    assert 'PasswordAuthentication no' in host.file('/etc/ssh/sshd_config').content
 

	
 

	
 
def test_ferm_service_configuration(host):
 

	
 
    ferm_service_config = host.file('/etc/default/ferm')
 

	
 
    assert ferm_service_config.is_file
 
    assert ferm_service_config.user == 'root'
 
    assert ferm_service_config.group == 'root'
 
    assert ferm_service_config.mode == 0o644
 
    assert 'FAST=yes' in ferm_service_config.content
 
    assert 'CACHE=no' in ferm_service_config.content
 
    assert 'ENABLED="yes"' in ferm_service_config.content
 

	
 

	
 
def test_ferm_configuration_directory(host):
 
    """
 
    Tests creation of ferm configuration directory.
 
    """
 

	
 
    with host.sudo():
 
        ferm_dir = host.file('/etc/ferm/conf.d')
 

	
 
        assert ferm_dir.is_directory
 
        assert ferm_dir.user == 'root'
 
        assert ferm_dir.group == 'root'
 
        assert ferm_dir.mode == 0o750
 

	
 

	
 
def test_ferm_configuration(host):
 
    """
 
    Tests deployment of basic ferm configuration files.
 
    """
 

	
 
    with host.sudo():
 

	
 
        ferm_configuration = host.file('/etc/ferm/ferm.conf')
 
        assert ferm_configuration.is_file
 
        assert ferm_configuration.user == 'root'
 
        assert ferm_configuration.group == 'root'
 
        assert ferm_configuration.mode == 0o640
 
        assert "@include '/etc/ferm/conf.d/';" in ferm_configuration.content
 

	
 
        ferm_base = host.file('/etc/ferm/conf.d/00-base.conf')
 
        assert ferm_base.is_file
 
        assert ferm_base.user == 'root'
 
        assert ferm_base.group == 'root'
 
        assert ferm_base.mode == 0o640
 

	
 

	
 
def test_ferm_service(host):
 
    """
 
    Tests if ferm is started and enabled to start automatically on boot.
 
    """
 

	
 
    ferm = host.service('ferm')
 

	
 
    assert ferm.is_running
 
    assert ferm.is_enabled
 

	
 

	
 
def test_check_certificate_script(host):
 

	
 
    check_certificate = host.file('/usr/local/bin/check_certificate.sh')
 

	
 
    assert check_certificate.is_file
 
    assert check_certificate.user == 'root'
 
    assert check_certificate.group == 'root'
 
    assert check_certificate.mode == 0o755
 

	
 

	
 
def test_check_certificate_directory(host):
 

	
 
    check_certificate_dir = host.file('/etc/check_certificate')
 

	
 
    assert check_certificate_dir.is_directory
 
    assert check_certificate_dir.user == 'root'
 
    assert check_certificate_dir.group == 'root'
 
    assert check_certificate_dir.mode == 0o755
 

	
 

	
 
def test_check_certificate_crontab(host):
 
    """
 
    Tests deployment of cron job for checking certificates.
 
    """
 

	
 
    check_certificate_crontab = host.file('/etc/cron.d/check_certificate')
 

	
 
    assert check_certificate_crontab.is_file
 
    assert check_certificate_crontab.user == 'root'
 
    assert check_certificate_crontab.group == 'root'
 
    assert check_certificate_crontab.mode == 0o644
 
    assert "0 0 * * * nobody /usr/local/bin/check_certificate.sh -q expiration" in check_certificate_crontab.content
 

	
 

	
 
@pytest.mark.parametrize('virtualenv_activate_path', [
 
    '/var/lib/pipreqcheck/virtualenv/bin/activate',
 
    '/var/lib/pipreqcheck/virtualenv-py3/bin/activate',
 
])
 
def test_pipreqcheck_virtualenv(host, virtualenv_activate_path):
 
    """
 
    Tests creation of Python virtual environment used for performing pip
 
    requirements upgrade checks.
 
    """
 

	
 
    with host.sudo():
 
        virtualenv_activate = host.file(virtualenv_activate_path)
 

	
 
        assert virtualenv_activate.is_file
 
        assert virtualenv_activate.user == 'pipreqcheck'
 
        assert virtualenv_activate.group == 'pipreqcheck'
 
        # @TODO: Possibly due to some timing issues, this file might
 
        # sometimes end-up being 0640, sometimes 0644.
 
        # assert virtualenv_activate.mode == 0o644
 

	
 

	
 
@pytest.mark.parametrize('config_dir', [
 
    '/etc/pip_check_requirements_upgrades',
 
    '/etc/pip_check_requirements_upgrades-py3',
 
])
 
def test_pipreqcheck_directories(host, config_dir):
 
    """
 
    Tests creation of directories used for storing configuration used by script
 
    that performs pip requirements upgrade checks.
 
    """
 

	
 
    with host.sudo():
 
        pipreqcheck_config_directory = host.file(config_dir)
 
        assert pipreqcheck_config_directory.is_directory
 
        assert pipreqcheck_config_directory.user == 'root'
 
        assert pipreqcheck_config_directory.group == 'pipreqcheck'
 
        assert pipreqcheck_config_directory.mode == 0o750
 

	
 
        pipreqcheck_config_directory_pipreqcheck = host.file(os.path.join(config_dir, 'pipreqcheck'))
 
        assert pipreqcheck_config_directory_pipreqcheck.is_directory
 
        assert pipreqcheck_config_directory_pipreqcheck.user == 'root'
 
        assert pipreqcheck_config_directory_pipreqcheck.group == 'pipreqcheck'
 
        assert pipreqcheck_config_directory_pipreqcheck.mode == 0o750
 

	
 

	
 
@pytest.mark.parametrize('requirements_in_path, requirements_txt_path', [
 
    ('/etc/pip_check_requirements_upgrades/pipreqcheck/requirements.in',
 
     '/etc/pip_check_requirements_upgrades/pipreqcheck/requirements.txt'),
 
    ('/etc/pip_check_requirements_upgrades-py3/pipreqcheck/requirements.in',
 
     '/etc/pip_check_requirements_upgrades-py3/pipreqcheck/requirements.txt'),
 
])
 
def test_pipreqcheck_requirements(host, requirements_in_path, requirements_txt_path):
 
    """
 
    Tests deployment of requirements input and text file used for virtual
 
    environment utilised by script that perform pip requirements upgrade checks.
 
    """
 

	
 
    with host.sudo():
 
        requirements_in = host.file(requirements_in_path)
 
        assert requirements_in.is_file
 
        assert requirements_in.user == 'root'
 
        assert requirements_in.group == 'pipreqcheck'
 
        assert requirements_in.mode == 0o640
 

	
 
        requirements_txt = host.file(requirements_txt_path)
 
        requirements_txt.is_file
 
        assert requirements_txt.user == 'root'
 
        assert requirements_txt.group == 'pipreqcheck'
 
        assert requirements_txt.mode == 0o640
 

	
 

	
 
@pytest.mark.parametrize("pip_path, expected_packages",  [
 
    ('/var/lib/pipreqcheck/virtualenv/bin/pip', [
 
        "Click==7.0",
 
        "pip==19.2.3",
 
        "pip-tools==4.0.0",
 
        "setuptools==41.2.0",
 
        "six==1.12.0",
 
        "wheel==0.33.6",
 
    ]),
 
    ('/var/lib/pipreqcheck/virtualenv-py3/bin/pip', [
 
        "Click==7.0",
 
        "pip==19.1.1",
 
        "pip-tools==3.9.0",
 
        "setuptools==41.2.0",
 
        "six==1.12.0",
 
        "wheel==0.33.6",
 
    ]),
 
])
 
def test_pipreqcheck_virtualenv_packages(host, pip_path, expected_packages):
 
    """
 
    Tests if correct packages are installed in virtualenv used for pip
 
    requirements checks..
 
    """
 

	
 
    packages = host.run("sudo -u %s %s freeze --all" % ('pipreqcheck', pip_path))
 

	
 
    # Normalise package names and order.
 
    expected_packages = sorted([unicode(p.lower()) for p in expected_packages])
 
    actual_packages = sorted(packages.stdout.lower().split("\n"))
 
    actual_packages = sorted(packages.stdout.lower().strip().split("\n"))
 

	
 
    # This is a dummy distro-provided package ignored by the pip-tools.
 
    if "pkg-resources==0.0.0" in actual_packages:
 
        actual_packages.remove("pkg-resources==0.0.0")
 

	
 
    assert actual_packages == expected_packages
 

	
 

	
 
def test_pipreqcheck_script(host):
 
    """
 
    Tests script used for performing pip requirements upgrade checks.
 
    """
 

	
 
    pipreqcheck_script = host.file('/usr/local/bin/pip_check_requirements_upgrades.sh')
 

	
 
    assert pipreqcheck_script.is_file
 
    assert pipreqcheck_script.user == 'root'
 
    assert pipreqcheck_script.group == 'root'
 
    assert pipreqcheck_script.mode == 0o755
 

	
 

	
 
@pytest.mark.parametrize('crontab_path, virtualenv_path', [
 
    ('/etc/cron.d/check_pip_requirements', '/var/lib/pipreqcheck/virtualenv'),
 
    ('/etc/cron.d/check_pip_requirements-py3', '/var/lib/pipreqcheck/virtualenv-py3'),
 
])
 
def test_pipreqcheck_crontab(host, crontab_path, virtualenv_path):
 
    """
 
    Tests if crontab entry is set-up correctly for running the pip requirements
 
    upgrade checks.
 
    """
 

	
 
    crontab = host.file(crontab_path)
 

	
 
    assert crontab.is_file
 
    assert crontab.user == 'root'
 
    assert crontab.group == 'root'
 
    assert crontab.mode == 0o644
 
    assert "MAILTO=root" in crontab.content
 
    assert virtualenv_path in crontab.content.split(" ")
 

	
 

	
 
@pytest.mark.parametrize('python_path, expected_major_version', [
 
    ('/var/lib/pipreqcheck/virtualenv/bin/python', '2'),
 
    ('/var/lib/pipreqcheck/virtualenv-py3/bin/python', '3'),
 
])
 
def test_pipreqcheck_virtualenv_python_version(host, python_path, expected_major_version):
 
    """
 
    Tests if Python virtual environment for pipreqcheck has been
 
    set-up correctly.
 
    """
 

	
 
    with host.sudo('pipreqcheck'):
 
        major_version = host.run("%s -c %s", python_path, "import sys; print(sys.version_info.major)")
 

	
 
    assert major_version.rc == 0
 
    assert major_version.stdout == expected_major_version
 
    assert major_version.stdout.strip() == expected_major_version
 

	
 

	
 
@pytest.mark.parametrize('wrong_python_path', [
 
    '/var/lib/pipreqcheck/virtualenv/bin/python3',
 
    '/var/lib/pipreqcheck/virtualenv-py3/bin/python2',
 
])
 
def test_pipreqcheck_virtualenv_wrong_python_version_not_present(host, wrong_python_path):
 
    """
 
    Tests if wrong version of Python 2 is absent or not.
 
    """
 

	
 
    with host.sudo():
 
        wrong_python_path_file = host.file(wrong_python_path)
 

	
 
        assert not wrong_python_path_file.exists
roles/common/molecule/default/tests/test_parameters_optional.py
Show inline comments
 
import os
 
import re
 
import socket
 

	
 
import paramiko
 

	
 
import testinfra.utils.ansible_runner
 

	
 

	
 
testinfra_hosts = testinfra.utils.ansible_runner.AnsibleRunner(
 
    os.environ['MOLECULE_INVENTORY_FILE']).get_hosts('parameters-optional')
 

	
 

	
 
def test_apt_proxy(host):
 
    """
 
    Tests if proxy configuration for apt has been deployed correctly.
 
    """
 

	
 
    proxy_config = host.file('/etc/apt/apt.conf.d/00proxy')
 

	
 
    assert proxy_config.exists
 
    assert proxy_config.user == 'root'
 
    assert proxy_config.group == 'root'
 
    assert proxy_config.mode == 0o644
 

	
 

	
 
def test_bash_prompt_content(host):
 
    """
 
    Tests that custom bash prompt has been configured correctly with specified
 
    colour and prompt.
 
    """
 

	
 
    config = host.file('/etc/profile.d/bash_prompt.sh')
 

	
 
    assert "export PS1='\\[\\e]0;\\u@\\h: \\w\\a\\]${debian_chroot:+($debian_chroot)}\\[\\033[0;36m\\]\\u@\\h[test]:\\w\\$ \\[\\033[0m\\]'" in config.content
 
    assert "export PS1='\\[\\e]0;\\u@\\h: \\w\\a\\]${debian_chroot:+($debian_chroot)}\\u@\\h[test]:\\w\\$ '" in config.content
 

	
 

	
 
def test_common_installed_packages_common(host):
 
    """
 
    Tests that user-provided common packages have been installed.
 
    """
 

	
 
    debian_release = host.ansible("setup")["ansible_facts"]["ansible_distribution_release"]
 

	
 
    assert host.package('units').is_installed
 
    assert host.package('gnutls-bin').is_installed
 

	
 
    # Different name of package in different Debian releases.
 
    if debian_release == 'jessie':
 
        assert host.package('libmariadb-client-lgpl-dev-compat').is_installed
 
    elif debian_release == 'stretch':
 
        assert host.package('libmariadbclient-dev-compat').is_installed
 
    else:
 
        raise Exception("Cannot run this test on debian release: %s" % debian_release)
 

	
 

	
 
def test_ssh_login_mechanisms(host):
 
    """
 
    Tests available SSH login mechanisms (should be just public key).
 
    """
 

	
 
    # Extract first non-IPv6 IP. Crude test, but it should work.
 
    remote_ip = next(a for a in host.interface("eth1").addresses if ":" not in a)
 

	
 
    sock = socket.socket()
 
    sock.connect((remote_ip, 22))
 

	
 
    transport = paramiko.transport.Transport(sock)
 
    transport.connect()
 

	
 
    try:
 
        transport.auth_none('')
 
    except paramiko.transport.BadAuthenticationType, err:
 
        assert err.allowed_types == ['publickey']
 

	
 

	
 
def test_mariadb_mysql_config_symlink(host):
 
    """
 
    Tests if symbolic link has been set-up for mariadb_config binary to be
 
    accessible as mysql_config as well.
 

	
 
    Only applicable to Debian Jessie.
 
    """
 

	
 
    if host.ansible("setup")["ansible_facts"]["ansible_distribution_release"] == 'jessie':
 
        mysql_config = host.file('/usr/bin/mysql_config')
 

	
 
        assert mysql_config.is_symlink
 
        assert mysql_config.linked_to == '/usr/bin/mariadb_config'
 

	
 

	
 
def test_emacs_electric_indent_mode(host):
 
    """
 
    Tests if Emacs electric indent mode has been disabled via custom
 
    configuration file.
 
    """
 

	
 
    emacs_config = host.file('/etc/emacs/site-start.d/01disable-electric-indent-mode.el')
 

	
 
    assert emacs_config.is_file
 
    assert emacs_config.user == 'root'
 
    assert emacs_config.group == 'root'
 
    assert emacs_config.mode == 0o644
 
    assert "(electric-indent-mode -1)" in emacs_config.content
 

	
 

	
 
def test_os_groups(host):
 
    """
 
    Tests if user-supplied system groups have been created correctly.
 
    """
 

	
 
    group1 = host.group('group1')
 
    assert group1.gid == 1001
 

	
 
    group2 = host.group('group2')
 
    assert group2.gid == 3001
 

	
 
    group3 = host.group('group3')
 
    assert group3.gid == 3002
 

	
 
    user1_group = host.group('user1')
 
    assert user1_group.gid == 3003
 

	
 
    user2_group = host.group('user2')
 
    assert user2_group.gid == 2001
 

	
 
    user3_group = host.group('user3')
 
    assert user3_group.gid == 2002
 

	
 

	
 
def test_os_users(host):
 
    """
 
    Tests if user-supplied system users have been created correctly.
 
    """
 

	
 
    with host.sudo():
 
        user1 = host.user('user1')
 
        assert user1.uid == 1001
 
        assert user1.group == 'user1'
 
        assert user1.groups == ['user1']
 
        assert user1.shell == '/bin/bash'
 
        assert user1.password == '!'
 

	
 
        user1_authorized_keys = host.file(os.path.join(user1.home, '.ssh', 'authorized_keys'))
 
        assert not user1_authorized_keys.exists
 

	
 
        user2 = host.user('user2')
 
        assert user2.uid == 2001
 
        assert user2.group == 'user2'
 
        assert sorted(user2.groups) == sorted(['group1', 'group2', 'user2'])
 
        assert user2.shell == '/bin/bash'
 
        assert user2.password == '$6$wdXOQiMe09ugh0$VRIph2XA2QQyEYlAlH7zT4TPACDUalf/4FKpqG9JRHfKxANTcTug2ANCt450htcs0LikJfHLWofLP54jraFU61'
 

	
 
        user2_authorized_keys = host.file(os.path.join(user2.home, '.ssh', 'authorized_keys'))
 
        assert open('tests/data/ssh/clientkey1.pub', 'r').read().strip() in user2_authorized_keys.content
 
        assert open('tests/data/ssh/clientkey2.pub', 'r').read().strip() in user2_authorized_keys.content
 

	
 
        user3 = host.user('user3')
 
        assert user3.uid == 2002
 
        assert user3.group == 'user3'
 
        assert sorted(user3.groups) == sorted(['group3', 'user3'])
 
        assert user3.shell == '/bin/bash'
 
        assert user3.password == '$6$nmx.21uLqT$9LrUqNUgUwIM.l0KFKgr2.kDEwe2lo7IbBIhnG70AGW7GTFdWBUFnGAxH15YxikTXhDJD/uxd.NNgojEOjRvx1'
 

	
 
        user3_authorized_keys = host.file(os.path.join(user3.home, '.ssh', 'authorized_keys'))
 
        assert open('tests/data/ssh/clientkey3.pub', 'r').read().strip() in user3_authorized_keys.content
 

	
 

	
 
def test_authorized_keys_login(host):
 
    """
 
    Tests if authorized SSH keys for user-provided system users have been set-up
 
    correctly.
 
    """
 

	
 
    class IgnorePolicy(paramiko.client.MissingHostKeyPolicy):
 

	
 
        def missing_host_key(self, client, hostname, key):
 
            pass
 

	
 
    client = paramiko.client.SSHClient()
 
    client.set_missing_host_key_policy(IgnorePolicy())
 

	
 
    # Extract first non-IPv6 IP. Crude test, but it should work.
 
    remote_ip = next(a for a in host.interface("eth1").addresses if ":" not in a)
 

	
 
    # No exception will be raised if connection is successful.
 
    client.connect(remote_ip, username="user2", allow_agent=False, look_for_keys=False, key_filename='tests/data/ssh/clientkey1')
 
    client.connect(remote_ip, username="user2", allow_agent=False, look_for_keys=False, key_filename='tests/data/ssh/clientkey2')
 
    client.connect(remote_ip, username="user3", allow_agent=False, look_for_keys=False, key_filename='tests/data/ssh/clientkey3')
 

	
 

	
 
def test_ca_certificates(host):
 
    """
 
    Tests if CA certificates have been correctly deployed to the system.
 
    """
 

	
 
    ca1_cert = host.file('/usr/local/share/ca-certificates/cacert1.crt')
 
    assert ca1_cert.is_file
 
    assert ca1_cert.user == 'root'
 
    assert ca1_cert.group == 'root'
 
    assert ca1_cert.mode == 0o644
 

	
 
    ca1_cert_symlink = host.file('/etc/ssl/certs/cacert1.pem')
 
    assert ca1_cert_symlink.is_symlink
 
    assert ca1_cert_symlink.linked_to == '/usr/local/share/ca-certificates/cacert1.crt'
 

	
 
    ca1_cert_hash_1 = host.file('/etc/ssl/certs/3ce70b58.0')
 
    assert ca1_cert_hash_1.is_symlink
 
    assert ca1_cert_hash_1.linked_to == '/usr/local/share/ca-certificates/cacert1.crt'
 

	
 
    ca1_cert_hash_1 = host.file('/etc/ssl/certs/49f72a44.0')
 
    assert ca1_cert_hash_1.is_symlink
 
    assert ca1_cert_hash_1.linked_to == '/usr/local/share/ca-certificates/cacert1.crt'
 

	
 
    ca2_cert = host.file('/usr/local/share/ca-certificates/cacert2.crt')
 
    assert ca2_cert.is_file
 
    assert ca2_cert.user == 'root'
 
    assert ca2_cert.group == 'root'
 
    assert ca2_cert.mode == 0o644
 

	
 
    ca2_cert_symlink = host.file('/etc/ssl/certs/cacert2.pem')
 
    assert ca2_cert_symlink.is_symlink
 
    assert ca2_cert_symlink.linked_to == '/usr/local/share/ca-certificates/cacert2.crt'
 

	
 
    ca2_cert_hash_1 = host.file('/etc/ssl/certs/a52eec00.0')
 
    assert ca2_cert_hash_1.is_symlink
 
    assert ca2_cert_hash_1.linked_to == '/usr/local/share/ca-certificates/cacert2.crt'
 

	
 
    ca2_cert_hash_1 = host.file('/etc/ssl/certs/a0d2e9e4.0')
 
    assert ca2_cert_hash_1.is_symlink
 
    assert ca2_cert_hash_1.linked_to == '/usr/local/share/ca-certificates/cacert2.crt'
 

	
 

	
 
def test_ferm_base_rules(host):
 
    """
 
    Tests if base ferm configuration has been deployed correctly with proper
 
    user-provided rate-limiting.
 
    """
 

	
 
    with host.sudo():
 
        ferm_base = host.file('/etc/ferm/conf.d/00-base.conf')
 

	
 
        assert "mod hashlimit hashlimit 5/second hashlimit-burst 5" in ferm_base.content
 

	
 
        iptables = host.command('iptables-save')
 

	
 
        assert iptables.rc == 0
 
        assert "-A flood -p icmp -m icmp --icmp-type 8 -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
 
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in iptables.stdout
 
        assert "-A flood -p tcp -m tcp --tcp-flags FIN,SYN,RST,ACK SYN -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
 
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in iptables.stdout
 

	
 
        ip6tables = host.command('ip6tables-save')
 
        assert ip6tables.rc == 0
 
        assert "-A flood -p icmp -m icmp --icmp-type 8 -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
 
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in iptables.stdout
 
        assert "-A flood -p ipv6-icmp -m icmp6 --icmpv6-type 128 -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
 
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in ip6tables.stdout
 

	
 

	
 
def test_pipreqcheck_virtualenv_user(host):
 
    """
 
    Tests if group and user for running pip requirements upgrade checks have
 
    been created correctly with user-provided uid/gid.
 
    """
 

	
 
    group = host.group('pipreqcheck')
 
    assert group.exists
 
    assert group.gid == 2500
 

	
 
    user = host.user('pipreqcheck')
 
    assert user.exists
 
    assert user.home == '/var/lib/pipreqcheck'
 
    assert user.uid == 2500
 
    assert user.group == 'pipreqcheck'
 
    assert user.groups == ['pipreqcheck']
 

	
 

	
 
def test_backup_configuration(host):
 
    """
 
    Tests if backup configuration has been deployed correctly.
 
    """
 

	
 
    with host.sudo():
 

	
 
        common = host.file('/etc/duply/main/patterns/common')
 
        assert common.is_file
 
        assert "/var/log" in common.content.split("\n")
 
        assert "/etc/shadow" in common.content.split("\n")
 
        assert "/var/mail" in common.content.split("\n")
 
        assert "/var/spool/cron" in common.content.split("\n")
 

	
 
        common_extra = host.file('/etc/duply/main/patterns/common_extra')
 
        assert common_extra.is_file
 
        assert "/home/user1" in common_extra.content.split("\n")
 
        assert "/home/user2" in common_extra.content.split("\n")
 

	
 

	
 
def test_ntp_software_installed(host):
 
    """
 
    Tests if NTP packages are installed.
 
    """
 

	
 
    assert host.package('ntp').is_installed
 
    assert host.package('ntpdate').is_installed
 

	
 

	
 
def test_ntp_server_configuration(host):
 
    """
 
    Tests if NTP server has been correctly configured.
 
    """
 

	
 
    with host.sudo():
 

	
 
        # Read the configuration file.
 
        configuration = host.file("/etc/ntp.conf").content.split("\n")
 

	
 
        # Extract only the relevant sections of files (exculde empty
 
        # lines and comments).
 
        configuration = [c.strip() for c in configuration if re.match(r'^\s*(|#.*)$', c) is None]
 

	
 
        # Ensure correct servers have been configured in the pool.
 
        servers = [c for c in configuration if c.startswith('server')]
 

	
 
        expected_servers = ["server 0.debian.pool.ntp.org iburst",
 
                            "server 1.debian.pool.ntp.org iburst",
 
                            "server 2.debian.pool.ntp.org iburst"]
 

	
 
        assert sorted(servers) == sorted(expected_servers)
 

	
 
        # Ensure querying of server is disable for untrusted clients.
 
        restrictions = [c for c in configuration if c.startswith('restrict')]
 
        expected_restrictions = ["restrict -4 default kod notrap nomodify nopeer noquery",
 
                                 "restrict -6 default kod notrap nomodify nopeer noquery",
 
                                 "restrict 127.0.0.1",
 
                                 "restrict ::1"]
 

	
 
        assert sorted(restrictions) == sorted(expected_restrictions)
 

	
 

	
 
def test_ntp_query_server_count(host):
 

	
 
    # Two lines for headers, and one line per configured server.
 
    expected_stdout_line_count = 5
 

	
 
    ntpq = host.command("ntpq -p -n")
 

	
 
    assert ntpq.rc == 0
 
    assert len(ntpq.stdout.split("\n")) == expected_stdout_line_count
 
    assert len(ntpq.stdout.strip().split("\n")) == expected_stdout_line_count
 

	
 

	
 
def test_ntp_listening_interfaces(host):
 
    """
 
    Tests if NTP server is listening on correct ports.
 
    """
 

	
 
    assert host.socket('udp://:::123').is_listening
0 comments (0 inline, 0 general)