"""
Testinfra tests executed against hosts from the 'parameters-optional'
inventory group.
"""

import os
import re
import socket

import paramiko

import testinfra.utils.ansible_runner

import pytest


testinfra_hosts = testinfra.utils.ansible_runner.AnsibleRunner(
    os.environ['MOLECULE_INVENTORY_FILE']).get_hosts('parameters-optional')


def _read_local_file_stripped(path):
    """
    Returns content of a local (test runner side) file with surrounding
    whitespace stripped. File handle is closed deterministically.
    """

    with open(path, 'r') as local_file:
        return local_file.read().strip()


def test_apt_proxy(host):
    """
    Tests if proxy configuration for apt has been deployed correctly.
    """

    proxy_config = host.file('/etc/apt/apt.conf.d/00proxy')

    assert proxy_config.exists
    assert proxy_config.user == 'root'
    assert proxy_config.group == 'root'
    assert proxy_config.mode == 0o644


def test_bash_prompt_content(host):
    """
    Tests that custom bash prompt has been configured correctly with
    specified colour and prompt.
    """

    config = host.file('/etc/profile.d/bash_prompt.sh')

    # Both the coloured and the plain variant of the prompt are
    # expected to be present in the deployed script.
    assert "export PS1='\\[\\e]0;\\u@\\h: \\w\\a\\]${debian_chroot:+($debian_chroot)}\\[\\033[0;36m\\]\\u@\\h[test]:\\w\\$ \\[\\033[0m\\]'" in \
        config.content_string
    assert "export PS1='\\[\\e]0;\\u@\\h: \\w\\a\\]${debian_chroot:+($debian_chroot)}\\u@\\h[test]:\\w\\$ '" in config.content_string


def test_common_packages_are_installed(host):
    """
    Tests that user-provided common packages have been installed.
    """

    assert host.package('units').is_installed
    assert host.package('gnutls-bin').is_installed
    assert host.package('emacs-nox').is_installed


def test_ssh_login_mechanisms(host):
    """
    Tests available SSH login mechanisms (should be just public key).
    """

    # Extract first non-IPv6 IP. Crude test, but it should work.
    remote_ip = next(a for a in host.interface("eth1").addresses if ":" not in a)

    sock = socket.socket()
    try:
        sock.connect((remote_ip, 22))

        transport = paramiko.transport.Transport(sock)
        try:
            transport.connect()

            # Authentication of type "none" must be rejected. Using
            # pytest.raises makes the test fail if the server
            # unexpectedly accepts it, instead of passing silently.
            with pytest.raises(paramiko.transport.BadAuthenticationType) as excinfo:
                transport.auth_none('')

            assert excinfo.value.allowed_types == ['publickey']
        finally:
            transport.close()
    finally:
        # Closing an already closed socket is harmless; this covers
        # the case where the transport was never created.
        sock.close()


def test_emacs_electric_indent_mode(host):
    """
    Tests if Emacs electric indent mode has been disabled via custom
    configuration file.
    """

    emacs_config = host.file('/etc/emacs/site-start.d/01disable-electric-indent-mode.el')

    assert emacs_config.is_file
    assert emacs_config.user == 'root'
    assert emacs_config.group == 'root'
    assert emacs_config.mode == 0o644
    assert "(electric-indent-mode -1)" in emacs_config.content_string


def test_os_groups(host):
    """
    Tests if user-supplied system groups have been created correctly.
    """

    group1 = host.group('group1')
    assert group1.gid == 1001

    group2 = host.group('group2')
    assert group2.gid == 3001

    group3 = host.group('group3')
    assert group3.gid == 3002

    user1_group = host.group('user1')
    assert user1_group.gid == 3003

    user2_group = host.group('user2')
    assert user2_group.gid == 2001

    user3_group = host.group('user3')
    assert user3_group.gid == 2002


def test_os_users(host):
    """
    Tests if user-supplied system users have been created correctly.
    """

    with host.sudo():

        user1 = host.user('user1')
        assert user1.uid == 1001
        assert user1.group == 'user1'
        assert user1.groups == ['user1']
        assert user1.shell == '/bin/bash'
        assert user1.password == '!'

        user1_authorized_keys = host.file(os.path.join(user1.home, '.ssh', 'authorized_keys'))
        assert not user1_authorized_keys.exists

        user2 = host.user('user2')
        assert user2.uid == 2001
        assert user2.group == 'user2'
        assert sorted(user2.groups) == sorted(['group1', 'group2', 'user2'])
        assert user2.shell == '/bin/bash'
        assert user2.password == '$6$wdXOQiMe09ugh0$VRIph2XA2QQyEYlAlH7zT4TPACDUalf/4FKpqG9JRHfKxANTcTug2ANCt450htcs0LikJfHLWofLP54jraFU61'

        user2_authorized_keys = host.file(os.path.join(user2.home, '.ssh', 'authorized_keys'))
        assert _read_local_file_stripped('tests/data/ssh/clientkey1.pub') in user2_authorized_keys.content_string
        assert _read_local_file_stripped('tests/data/ssh/clientkey2.pub') in user2_authorized_keys.content_string

        user3 = host.user('user3')
        assert user3.uid == 2002
        assert user3.group == 'user3'
        assert sorted(user3.groups) == sorted(['group3', 'user3'])
        assert user3.shell == '/bin/bash'
        assert user3.password == '$6$nmx.21uLqT$9LrUqNUgUwIM.l0KFKgr2.kDEwe2lo7IbBIhnG70AGW7GTFdWBUFnGAxH15YxikTXhDJD/uxd.NNgojEOjRvx1'

        user3_authorized_keys = host.file(os.path.join(user3.home, '.ssh', 'authorized_keys'))
        assert _read_local_file_stripped('tests/data/ssh/clientkey3.pub') in user3_authorized_keys.content_string


def test_authorized_keys_login(host):
    """
    Tests if authorized SSH keys for user-provided system users have
    been set-up correctly.
    """

    class IgnorePolicy(paramiko.client.MissingHostKeyPolicy):
        def missing_host_key(self, client, hostname, key):
            pass

    # Extract first non-IPv6 IP. Crude test, but it should work.
    remote_ip = next(a for a in host.interface("eth1").addresses if ":" not in a)

    logins = [
        ("user2", 'tests/data/ssh/clientkey1'),
        ("user2", 'tests/data/ssh/clientkey2'),
        ("user3", 'tests/data/ssh/clientkey3'),
    ]

    for username, key_filename in logins:
        # Use a fresh client for every login so each connection gets
        # closed instead of being leaked on reconnect.
        client = paramiko.client.SSHClient()
        client.set_missing_host_key_policy(IgnorePolicy())

        try:
            # No exception will be raised if connection is successful.
            client.connect(remote_ip, username=username, allow_agent=False, look_for_keys=False, key_filename=key_filename)
        finally:
            client.close()


@pytest.mark.parametrize('ca_certificate_basename', [
    'cacert1',
    'cacert2',
])
def test_ca_certificates(host, ca_certificate_basename):
    """
    Tests if CA certificates have been correctly deployed to the
    system.
    """

    ca_certificate_path = '/usr/local/share/ca-certificates/%s.crt' % ca_certificate_basename
    ca_certificate_symlink_path = '/etc/ssl/certs/%s.pem' % ca_certificate_basename

    # Fail early with a clear message if the hash cannot be
    # calculated, instead of checking a bogus symlink path later on.
    hash_command = host.run('openssl x509 -hash -noout -in %s', ca_certificate_path)
    assert hash_command.rc == 0

    ca_certificate_hash = hash_command.stdout.strip()
    ca_certificate_hash_symlink_path = '/etc/ssl/certs/%s.0' % ca_certificate_hash

    ca_certificate = host.file(ca_certificate_path)
    ca_certificate_symlink = host.file(ca_certificate_symlink_path)
    ca_certificate_hash_symlink = host.file(ca_certificate_hash_symlink_path)

    assert ca_certificate.is_file
    assert ca_certificate.user == 'root'
    assert ca_certificate.group == 'root'
    assert ca_certificate.mode == 0o644

    assert ca_certificate_symlink.is_symlink
    assert ca_certificate_symlink.linked_to == ca_certificate_path

    assert ca_certificate_hash_symlink.is_symlink
    assert ca_certificate_hash_symlink.linked_to == ca_certificate_path


def test_ferm_base_rules(host):
    """
    Tests if base ferm configuration has been deployed correctly with
    proper user-provided rate-limiting.
    """

    with host.sudo():

        ferm_base = host.file('/etc/ferm/conf.d/00-base.conf')
        assert "mod hashlimit hashlimit 5/second hashlimit-burst 5" in ferm_base.content_string

        iptables = host.command('iptables-save')

        assert iptables.rc == 0
        assert "-A flood -p icmp -m icmp --icmp-type 8 -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in iptables.stdout
        assert "-A flood -p tcp -m tcp --tcp-flags FIN,SYN,RST,ACK SYN -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in iptables.stdout

        ip6tables = host.command('ip6tables-save')

        assert ip6tables.rc == 0
        # IPv6 rules must be verified against the ip6tables output
        # (previously the IPv4 ICMP rule was re-checked against the
        # iptables output at this point).
        # NOTE(review): assumes the TCP SYN rate-limiting rule is
        # deployed for IPv6 as well - confirm against ferm template.
        assert "-A flood -p tcp -m tcp --tcp-flags FIN,SYN,RST,ACK SYN -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in ip6tables.stdout
        assert "-A flood -p ipv6-icmp -m icmp6 --icmpv6-type 128 -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in ip6tables.stdout


def test_pipreqcheck_virtualenv_user(host):
    """
    Tests if group and user for running pip requirements upgrade checks
    have been created correctly with user-provided uid/gid.
    """

    group = host.group('pipreqcheck')
    assert group.exists
    assert group.gid == 2500

    user = host.user('pipreqcheck')
    assert user.exists
    assert user.home == '/var/lib/pipreqcheck'
    assert user.uid == 2500
    assert user.group == 'pipreqcheck'
    assert user.groups == ['pipreqcheck']


def test_backup_configuration(host):
    """
    Tests if backup configuration has been deployed correctly.
    """

    with host.sudo():

        common = host.file('/etc/duply/main/patterns/common')
        assert common.is_file

        # Patterns are matched as whole lines, not as substrings.
        common_patterns = common.content_string.split("\n")
        assert "/var/log" in common_patterns
        assert "/etc/shadow" in common_patterns
        assert "/var/mail" in common_patterns
        assert "/var/spool/cron" in common_patterns

        common_extra = host.file('/etc/duply/main/patterns/common_extra')
        assert common_extra.is_file

        common_extra_patterns = common_extra.content_string.split("\n")
        assert "/home/user1" in common_extra_patterns
        assert "/home/user2" in common_extra_patterns


def test_ntp_software_installed(host):
    """
    Tests if NTP packages are installed.
    """

    assert host.package('ntpsec').is_installed
    assert host.package('ntpsec-ntpdate').is_installed


def test_ntp_server_configuration(host):
    """
    Tests if NTP server has been correctly configured.
    """

    with host.sudo():

        # Check for presence of the configuration file.
        configuration_file = host.file("/etc/ntpsec/ntp.conf")
        assert configuration_file.exists
        assert configuration_file.user == 'root'
        assert configuration_file.group == 'root'
        assert configuration_file.mode == 0o644

        # Extract relevant sections of configuration (exclude empty
        # lines and comments).
        configuration = configuration_file.content_string.split("\n")
        configuration = [c.strip() for c in configuration if re.match(r'^\s*(|#.*)$', c) is None]

        # Ensure correct pools have been configured.
        pools = [c for c in configuration if c.startswith('pool')]

        expected_pools = ["pool 0.debian.pool.ntp.org iburst",
                          "pool 1.debian.pool.ntp.org iburst",
                          "pool 2.debian.pool.ntp.org iburst"]

        assert sorted(pools) == sorted(expected_pools)

        # Ensure querying of server is disabled for untrusted clients.
        restrictions = [c for c in configuration if c.startswith('restrict')]
        expected_restrictions = ["restrict default kod nomodify nopeer noquery limited",
                                 "restrict 127.0.0.1",
                                 "restrict ::1"]

        assert sorted(restrictions) == sorted(expected_restrictions)


def test_ntp_runtime_pool_count(host):
    """
    Tests if the expected number of pools is reported by the NTP
    server at runtime.
    """

    ntpq = host.command("ntpq -p -n")
    assert ntpq.rc == 0

    # We expect 3 pools, as requested via role parameter.
    ntpq_pool_info = [line for line in ntpq.stdout.split("\n") if ".POOL." in line]
    assert len(ntpq_pool_info) == 3


def test_ntp_listening_interfaces(host):
    """
    Tests if NTP server is listening on correct ports.
    """

    assert host.socket('udp://:::123').is_listening


def test_pipreqcheck_input_content(host):
    """
    Tests content of requirements input file used for virtual
    environment utilised by script that performs pip requirements
    upgrade checks.
    """

    requirements_path = '/etc/pip_check_requirements_upgrades/pipreqcheck/requirements.in'

    expected_requirements = [
        "pip >= 0.3.1",
        "pip-tools >= 0.3.2",
        "setuptools >= 0.3.3",
        "wheel >= 0.3.4"
    ]

    with host.sudo():
        deployed_requirements = host.file(requirements_path).content_string

    # Comparison is case-insensitive and independent of line order.
    expected_requirements = sorted([line.lower() for line in expected_requirements])
    actual_requirements = sorted(deployed_requirements.lower().strip().split("\n"))

    assert actual_requirements == expected_requirements