Changeset - 693d6960b1a0
[Not reviewed]
0 2 0
Branko Majic (branko) - 6 years ago 2020-01-06 16:54:36
branko@majic.rs
MAR-148: Fix test for the common role:

- Do some additional stripping on command outputs to get rid of extra
new lines.
2 files changed with 3 insertions and 3 deletions:
0 comments (0 inline, 0 general)
roles/common/molecule/default/tests/test_default.py
Show inline comments
 
@@ -144,264 +144,264 @@ def test_remote_login_via_password_disabled(host):
 
def test_ferm_service_configuration(host):
 

	
 
    ferm_service_config = host.file('/etc/default/ferm')
 

	
 
    assert ferm_service_config.is_file
 
    assert ferm_service_config.user == 'root'
 
    assert ferm_service_config.group == 'root'
 
    assert ferm_service_config.mode == 0o644
 
    assert 'FAST=yes' in ferm_service_config.content
 
    assert 'CACHE=no' in ferm_service_config.content
 
    assert 'ENABLED="yes"' in ferm_service_config.content
 

	
 

	
 
def test_ferm_configuration_directory(host):
 
    """
 
    Tests creation of ferm configuration directory.
 
    """
 

	
 
    with host.sudo():
 
        ferm_dir = host.file('/etc/ferm/conf.d')
 

	
 
        assert ferm_dir.is_directory
 
        assert ferm_dir.user == 'root'
 
        assert ferm_dir.group == 'root'
 
        assert ferm_dir.mode == 0o750
 

	
 

	
 
def test_ferm_configuration(host):
 
    """
 
    Tests deployment of basic ferm configuration files.
 
    """
 

	
 
    with host.sudo():
 

	
 
        ferm_configuration = host.file('/etc/ferm/ferm.conf')
 
        assert ferm_configuration.is_file
 
        assert ferm_configuration.user == 'root'
 
        assert ferm_configuration.group == 'root'
 
        assert ferm_configuration.mode == 0o640
 
        assert "@include '/etc/ferm/conf.d/';" in ferm_configuration.content
 

	
 
        ferm_base = host.file('/etc/ferm/conf.d/00-base.conf')
 
        assert ferm_base.is_file
 
        assert ferm_base.user == 'root'
 
        assert ferm_base.group == 'root'
 
        assert ferm_base.mode == 0o640
 

	
 

	
 
def test_ferm_service(host):
 
    """
 
    Tests if ferm is started and enabled to start automatically on boot.
 
    """
 

	
 
    ferm = host.service('ferm')
 

	
 
    assert ferm.is_running
 
    assert ferm.is_enabled
 

	
 

	
 
def test_check_certificate_script(host):
 

	
 
    check_certificate = host.file('/usr/local/bin/check_certificate.sh')
 

	
 
    assert check_certificate.is_file
 
    assert check_certificate.user == 'root'
 
    assert check_certificate.group == 'root'
 
    assert check_certificate.mode == 0o755
 

	
 

	
 
def test_check_certificate_directory(host):
 

	
 
    check_certificate_dir = host.file('/etc/check_certificate')
 

	
 
    assert check_certificate_dir.is_directory
 
    assert check_certificate_dir.user == 'root'
 
    assert check_certificate_dir.group == 'root'
 
    assert check_certificate_dir.mode == 0o755
 

	
 

	
 
def test_check_certificate_crontab(host):
 
    """
 
    Tests deployment of cron job for checking certificates.
 
    """
 

	
 
    check_certificate_crontab = host.file('/etc/cron.d/check_certificate')
 

	
 
    assert check_certificate_crontab.is_file
 
    assert check_certificate_crontab.user == 'root'
 
    assert check_certificate_crontab.group == 'root'
 
    assert check_certificate_crontab.mode == 0o644
 
    assert "0 0 * * * nobody /usr/local/bin/check_certificate.sh -q expiration" in check_certificate_crontab.content
 

	
 

	
 
@pytest.mark.parametrize('virtualenv_activate_path', [
 
    '/var/lib/pipreqcheck/virtualenv/bin/activate',
 
    '/var/lib/pipreqcheck/virtualenv-py3/bin/activate',
 
])
 
def test_pipreqcheck_virtualenv(host, virtualenv_activate_path):
 
    """
 
    Tests creation of Python virtual environment used for performing pip
 
    requirements upgrade checks.
 
    """
 

	
 
    with host.sudo():
 
        virtualenv_activate = host.file(virtualenv_activate_path)
 

	
 
        assert virtualenv_activate.is_file
 
        assert virtualenv_activate.user == 'pipreqcheck'
 
        assert virtualenv_activate.group == 'pipreqcheck'
 
        # @TODO: Possibly due to some timing issues, this file might
 
        # sometimes end-up being 0640, sometimes 0644.
 
        # assert virtualenv_activate.mode == 0o644
 

	
 

	
 
@pytest.mark.parametrize('config_dir', [
 
    '/etc/pip_check_requirements_upgrades',
 
    '/etc/pip_check_requirements_upgrades-py3',
 
])
 
def test_pipreqcheck_directories(host, config_dir):
 
    """
 
    Tests creation of directories used for storing configuration used by script
 
    that performs pip requirements upgrade checks.
 
    """
 

	
 
    with host.sudo():
 
        pipreqcheck_config_directory = host.file(config_dir)
 
        assert pipreqcheck_config_directory.is_directory
 
        assert pipreqcheck_config_directory.user == 'root'
 
        assert pipreqcheck_config_directory.group == 'pipreqcheck'
 
        assert pipreqcheck_config_directory.mode == 0o750
 

	
 
        pipreqcheck_config_directory_pipreqcheck = host.file(os.path.join(config_dir, 'pipreqcheck'))
 
        assert pipreqcheck_config_directory_pipreqcheck.is_directory
 
        assert pipreqcheck_config_directory_pipreqcheck.user == 'root'
 
        assert pipreqcheck_config_directory_pipreqcheck.group == 'pipreqcheck'
 
        assert pipreqcheck_config_directory_pipreqcheck.mode == 0o750
 

	
 

	
 
@pytest.mark.parametrize('requirements_in_path, requirements_txt_path', [
 
    ('/etc/pip_check_requirements_upgrades/pipreqcheck/requirements.in',
 
     '/etc/pip_check_requirements_upgrades/pipreqcheck/requirements.txt'),
 
    ('/etc/pip_check_requirements_upgrades-py3/pipreqcheck/requirements.in',
 
     '/etc/pip_check_requirements_upgrades-py3/pipreqcheck/requirements.txt'),
 
])
 
def test_pipreqcheck_requirements(host, requirements_in_path, requirements_txt_path):
 
    """
 
    Tests deployment of requirements input and text file used for virtual
 
    environment utilised by script that perform pip requirements upgrade checks.
 
    """
 

	
 
    with host.sudo():
 
        requirements_in = host.file(requirements_in_path)
 
        assert requirements_in.is_file
 
        assert requirements_in.user == 'root'
 
        assert requirements_in.group == 'pipreqcheck'
 
        assert requirements_in.mode == 0o640
 

	
 
        requirements_txt = host.file(requirements_txt_path)
 
        requirements_txt.is_file
 
        assert requirements_txt.user == 'root'
 
        assert requirements_txt.group == 'pipreqcheck'
 
        assert requirements_txt.mode == 0o640
 

	
 

	
 
@pytest.mark.parametrize("pip_path, expected_packages",  [
 
    ('/var/lib/pipreqcheck/virtualenv/bin/pip', [
 
        "Click==7.0",
 
        "pip==19.2.3",
 
        "pip-tools==4.0.0",
 
        "setuptools==41.2.0",
 
        "six==1.12.0",
 
        "wheel==0.33.6",
 
    ]),
 
    ('/var/lib/pipreqcheck/virtualenv-py3/bin/pip', [
 
        "Click==7.0",
 
        "pip==19.1.1",
 
        "pip-tools==3.9.0",
 
        "setuptools==41.2.0",
 
        "six==1.12.0",
 
        "wheel==0.33.6",
 
    ]),
 
])
 
def test_pipreqcheck_virtualenv_packages(host, pip_path, expected_packages):
 
    """
 
    Tests if correct packages are installed in virtualenv used for pip
 
    requirements checks..
 
    """
 

	
 
    packages = host.run("sudo -u %s %s freeze --all" % ('pipreqcheck', pip_path))
 

	
 
    # Normalise package names and order.
 
    expected_packages = sorted([unicode(p.lower()) for p in expected_packages])
 
    actual_packages = sorted(packages.stdout.lower().split("\n"))
 
    actual_packages = sorted(packages.stdout.lower().strip().split("\n"))
 

	
 
    # This is a dummy distro-provided package ignored by the pip-tools.
 
    if "pkg-resources==0.0.0" in actual_packages:
 
        actual_packages.remove("pkg-resources==0.0.0")
 

	
 
    assert actual_packages == expected_packages
 

	
 

	
 
def test_pipreqcheck_script(host):
 
    """
 
    Tests script used for performing pip requirements upgrade checks.
 
    """
 

	
 
    pipreqcheck_script = host.file('/usr/local/bin/pip_check_requirements_upgrades.sh')
 

	
 
    assert pipreqcheck_script.is_file
 
    assert pipreqcheck_script.user == 'root'
 
    assert pipreqcheck_script.group == 'root'
 
    assert pipreqcheck_script.mode == 0o755
 

	
 

	
 
@pytest.mark.parametrize('crontab_path, virtualenv_path', [
 
    ('/etc/cron.d/check_pip_requirements', '/var/lib/pipreqcheck/virtualenv'),
 
    ('/etc/cron.d/check_pip_requirements-py3', '/var/lib/pipreqcheck/virtualenv-py3'),
 
])
 
def test_pipreqcheck_crontab(host, crontab_path, virtualenv_path):
 
    """
 
    Tests if crontab entry is set-up correctly for running the pip requirements
 
    upgrade checks.
 
    """
 

	
 
    crontab = host.file(crontab_path)
 

	
 
    assert crontab.is_file
 
    assert crontab.user == 'root'
 
    assert crontab.group == 'root'
 
    assert crontab.mode == 0o644
 
    assert "MAILTO=root" in crontab.content
 
    assert virtualenv_path in crontab.content.split(" ")
 

	
 

	
 
@pytest.mark.parametrize('python_path, expected_major_version', [
 
    ('/var/lib/pipreqcheck/virtualenv/bin/python', '2'),
 
    ('/var/lib/pipreqcheck/virtualenv-py3/bin/python', '3'),
 
])
 
def test_pipreqcheck_virtualenv_python_version(host, python_path, expected_major_version):
 
    """
 
    Tests if Python virtual environment for pipreqcheck has been
 
    set-up correctly.
 
    """
 

	
 
    with host.sudo('pipreqcheck'):
 
        major_version = host.run("%s -c %s", python_path, "import sys; print(sys.version_info.major)")
 

	
 
    assert major_version.rc == 0
 
    assert major_version.stdout == expected_major_version
 
    assert major_version.stdout.strip() == expected_major_version
 

	
 

	
 
@pytest.mark.parametrize('wrong_python_path', [
 
    '/var/lib/pipreqcheck/virtualenv/bin/python3',
 
    '/var/lib/pipreqcheck/virtualenv-py3/bin/python2',
 
])
 
def test_pipreqcheck_virtualenv_wrong_python_version_not_present(host, wrong_python_path):
 
    """
 
    Tests if wrong version of Python 2 is absent or not.
 
    """
 

	
 
    with host.sudo():
 
        wrong_python_path_file = host.file(wrong_python_path)
 

	
 
        assert not wrong_python_path_file.exists
roles/common/molecule/default/tests/test_parameters_optional.py
Show inline comments
 
@@ -158,201 +158,201 @@ def test_os_users(host):
 

	
 
        user3 = host.user('user3')
 
        assert user3.uid == 2002
 
        assert user3.group == 'user3'
 
        assert sorted(user3.groups) == sorted(['group3', 'user3'])
 
        assert user3.shell == '/bin/bash'
 
        assert user3.password == '$6$nmx.21uLqT$9LrUqNUgUwIM.l0KFKgr2.kDEwe2lo7IbBIhnG70AGW7GTFdWBUFnGAxH15YxikTXhDJD/uxd.NNgojEOjRvx1'
 

	
 
        user3_authorized_keys = host.file(os.path.join(user3.home, '.ssh', 'authorized_keys'))
 
        assert open('tests/data/ssh/clientkey3.pub', 'r').read().strip() in user3_authorized_keys.content
 

	
 

	
 
def test_authorized_keys_login(host):
 
    """
 
    Tests if authorized SSH keys for user-provided system users have been set-up
 
    correctly.
 
    """
 

	
 
    class IgnorePolicy(paramiko.client.MissingHostKeyPolicy):
 

	
 
        def missing_host_key(self, client, hostname, key):
 
            pass
 

	
 
    client = paramiko.client.SSHClient()
 
    client.set_missing_host_key_policy(IgnorePolicy())
 

	
 
    # Extract first non-IPv6 IP. Crude test, but it should work.
 
    remote_ip = next(a for a in host.interface("eth1").addresses if ":" not in a)
 

	
 
    # No exception will be raised if connection is successful.
 
    client.connect(remote_ip, username="user2", allow_agent=False, look_for_keys=False, key_filename='tests/data/ssh/clientkey1')
 
    client.connect(remote_ip, username="user2", allow_agent=False, look_for_keys=False, key_filename='tests/data/ssh/clientkey2')
 
    client.connect(remote_ip, username="user3", allow_agent=False, look_for_keys=False, key_filename='tests/data/ssh/clientkey3')
 

	
 

	
 
def test_ca_certificates(host):
 
    """
 
    Tests if CA certificates have been correctly deployed to the system.
 
    """
 

	
 
    ca1_cert = host.file('/usr/local/share/ca-certificates/cacert1.crt')
 
    assert ca1_cert.is_file
 
    assert ca1_cert.user == 'root'
 
    assert ca1_cert.group == 'root'
 
    assert ca1_cert.mode == 0o644
 

	
 
    ca1_cert_symlink = host.file('/etc/ssl/certs/cacert1.pem')
 
    assert ca1_cert_symlink.is_symlink
 
    assert ca1_cert_symlink.linked_to == '/usr/local/share/ca-certificates/cacert1.crt'
 

	
 
    ca1_cert_hash_1 = host.file('/etc/ssl/certs/3ce70b58.0')
 
    assert ca1_cert_hash_1.is_symlink
 
    assert ca1_cert_hash_1.linked_to == '/usr/local/share/ca-certificates/cacert1.crt'
 

	
 
    ca1_cert_hash_1 = host.file('/etc/ssl/certs/49f72a44.0')
 
    assert ca1_cert_hash_1.is_symlink
 
    assert ca1_cert_hash_1.linked_to == '/usr/local/share/ca-certificates/cacert1.crt'
 

	
 
    ca2_cert = host.file('/usr/local/share/ca-certificates/cacert2.crt')
 
    assert ca2_cert.is_file
 
    assert ca2_cert.user == 'root'
 
    assert ca2_cert.group == 'root'
 
    assert ca2_cert.mode == 0o644
 

	
 
    ca2_cert_symlink = host.file('/etc/ssl/certs/cacert2.pem')
 
    assert ca2_cert_symlink.is_symlink
 
    assert ca2_cert_symlink.linked_to == '/usr/local/share/ca-certificates/cacert2.crt'
 

	
 
    ca2_cert_hash_1 = host.file('/etc/ssl/certs/a52eec00.0')
 
    assert ca2_cert_hash_1.is_symlink
 
    assert ca2_cert_hash_1.linked_to == '/usr/local/share/ca-certificates/cacert2.crt'
 

	
 
    ca2_cert_hash_1 = host.file('/etc/ssl/certs/a0d2e9e4.0')
 
    assert ca2_cert_hash_1.is_symlink
 
    assert ca2_cert_hash_1.linked_to == '/usr/local/share/ca-certificates/cacert2.crt'
 

	
 

	
 
def test_ferm_base_rules(host):
 
    """
 
    Tests if base ferm configuration has been deployed correctly with proper
 
    user-provided rate-limiting.
 
    """
 

	
 
    with host.sudo():
 
        ferm_base = host.file('/etc/ferm/conf.d/00-base.conf')
 

	
 
        assert "mod hashlimit hashlimit 5/second hashlimit-burst 5" in ferm_base.content
 

	
 
        iptables = host.command('iptables-save')
 

	
 
        assert iptables.rc == 0
 
        assert "-A flood -p icmp -m icmp --icmp-type 8 -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
 
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in iptables.stdout
 
        assert "-A flood -p tcp -m tcp --tcp-flags FIN,SYN,RST,ACK SYN -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
 
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in iptables.stdout
 

	
 
        ip6tables = host.command('ip6tables-save')
 
        assert ip6tables.rc == 0
 
        assert "-A flood -p icmp -m icmp --icmp-type 8 -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
 
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in iptables.stdout
 
        assert "-A flood -p ipv6-icmp -m icmp6 --icmpv6-type 128 -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
 
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in ip6tables.stdout
 

	
 

	
 
def test_pipreqcheck_virtualenv_user(host):
 
    """
 
    Tests if group and user for running pip requirements upgrade checks have
 
    been created correctly with user-provided uid/gid.
 
    """
 

	
 
    group = host.group('pipreqcheck')
 
    assert group.exists
 
    assert group.gid == 2500
 

	
 
    user = host.user('pipreqcheck')
 
    assert user.exists
 
    assert user.home == '/var/lib/pipreqcheck'
 
    assert user.uid == 2500
 
    assert user.group == 'pipreqcheck'
 
    assert user.groups == ['pipreqcheck']
 

	
 

	
 
def test_backup_configuration(host):
 
    """
 
    Tests if backup configuration has been deployed correctly.
 
    """
 

	
 
    with host.sudo():
 

	
 
        common = host.file('/etc/duply/main/patterns/common')
 
        assert common.is_file
 
        assert "/var/log" in common.content.split("\n")
 
        assert "/etc/shadow" in common.content.split("\n")
 
        assert "/var/mail" in common.content.split("\n")
 
        assert "/var/spool/cron" in common.content.split("\n")
 

	
 
        common_extra = host.file('/etc/duply/main/patterns/common_extra')
 
        assert common_extra.is_file
 
        assert "/home/user1" in common_extra.content.split("\n")
 
        assert "/home/user2" in common_extra.content.split("\n")
 

	
 

	
 
def test_ntp_software_installed(host):
 
    """
 
    Tests if NTP packages are installed.
 
    """
 

	
 
    assert host.package('ntp').is_installed
 
    assert host.package('ntpdate').is_installed
 

	
 

	
 
def test_ntp_server_configuration(host):
 
    """
 
    Tests if NTP server has been correctly configured.
 
    """
 

	
 
    with host.sudo():
 

	
 
        # Read the configuration file.
 
        configuration = host.file("/etc/ntp.conf").content.split("\n")
 

	
 
        # Extract only the relevant sections of files (exculde empty
 
        # lines and comments).
 
        configuration = [c.strip() for c in configuration if re.match(r'^\s*(|#.*)$', c) is None]
 

	
 
        # Ensure correct servers have been configured in the pool.
 
        servers = [c for c in configuration if c.startswith('server')]
 

	
 
        expected_servers = ["server 0.debian.pool.ntp.org iburst",
 
                            "server 1.debian.pool.ntp.org iburst",
 
                            "server 2.debian.pool.ntp.org iburst"]
 

	
 
        assert sorted(servers) == sorted(expected_servers)
 

	
 
        # Ensure querying of server is disable for untrusted clients.
 
        restrictions = [c for c in configuration if c.startswith('restrict')]
 
        expected_restrictions = ["restrict -4 default kod notrap nomodify nopeer noquery",
 
                                 "restrict -6 default kod notrap nomodify nopeer noquery",
 
                                 "restrict 127.0.0.1",
 
                                 "restrict ::1"]
 

	
 
        assert sorted(restrictions) == sorted(expected_restrictions)
 

	
 

	
 
def test_ntp_query_server_count(host):
 

	
 
    # Two lines for headers, and one line per configured server.
 
    expected_stdout_line_count = 5
 

	
 
    ntpq = host.command("ntpq -p -n")
 

	
 
    assert ntpq.rc == 0
 
    assert len(ntpq.stdout.split("\n")) == expected_stdout_line_count
 
    assert len(ntpq.stdout.strip().split("\n")) == expected_stdout_line_count
 

	
 

	
 
def test_ntp_listening_interfaces(host):
 
    """
 
    Tests if NTP server is listening on correct ports.
 
    """
 

	
 
    assert host.socket('udp://:::123').is_listening
0 comments (0 inline, 0 general)