Changeset - d6a8b9523eb6
[Not reviewed]
0 3 0
Branko Majic (branko) - 8 years ago 2017-11-19 18:21:04
branko@majic.rs
MAR-127: Added tests for time synchronisation (NTP) implementation:

- Updated test playbook.
- Added tests related to time synchronisation.
3 files changed with 76 insertions and 0 deletions:
0 comments (0 inline, 0 general)
roles/common/playbook.yml
Show inline comments
 
@@ -23,55 +23,61 @@
 
- hosts: parameters-optional
 
  roles:
 
    - role: common
 
      enable_backup: yes
 
      apt_proxy: "http://10.31.127.2:3142/"
 
      os_users:
 
        - name: user1
 
        - name: user2
 
          uid: 2001
 
          additional_groups:
 
            - group1
 
            - group2
 
          authorized_keys:
 
            - "{{ lookup('file', 'tests/data/ssh/clientkey1.pub') }}"
 
            - "{{ lookup('file', 'tests/data/ssh/clientkey2.pub') }}"
 
          # Password is 'user2'.
 
          password: "$6$wdXOQiMe09ugh0$VRIph2XA2QQyEYlAlH7zT4TPACDUalf/4FKpqG9JRHfKxANTcTug2ANCt450htcs0LikJfHLWofLP54jraFU61"
 
        - name: user3
 
          uid: 2002
 
          additional_groups:
 
            - group3
 
          authorized_keys:
 
            - "{{ lookup('file', 'tests/data/ssh/clientkey3.pub') }}"
 
          # Password is 'user3'.
 
          password: "$6$nmx.21uLqT$9LrUqNUgUwIM.l0KFKgr2.kDEwe2lo7IbBIhnG70AGW7GTFdWBUFnGAxH15YxikTXhDJD/uxd.NNgojEOjRvx1"
 
      os_groups:
 
        - name: group1
 
        - name: group2
 
          gid: 3001
 
        - name: group3
 
          gid: 3002
 
      common_packages:
 
        - units
 
        - gnutls-bin
 
        - "{{ 'libmariadb-client-lgpl-dev-compat' if ansible_distribution_release == 'jessie' else 'libmariadbclient-dev-compat' if ansible_distribution_release == 'stretch' }}"
 
        - emacs24-nox
 
      ca_certificates:
 
        cacert1: "{{ lookup('file', 'tests/data/x509/ca1.cert.pem') }}"
 
        cacert2: "{{ lookup('file', 'tests/data/x509/ca2.cert.pem') }}"
 
      extra_backup_patterns:
 
        - /home/user1
 
        - /home/user2
 
      incoming_connection_limit: 5/second
 
      incoming_connection_limit_burst: 5
 
      pipreqcheck_uid: 2500
 
      pipreqcheck_gid: 2500
 
      prompt_colour: cyan
 
      prompt_id: test
 
      # Purposefully set this to 3 servers to make sure we are
 
      # overriding the default configuration.
 
      ntp_servers:
 
        - "0.debian.pool.ntp.org"
 
        - "1.debian.pool.ntp.org"
 
        - "2.debian.pool.ntp.org"
 
      # From backup_client role meta dependency.
 
      backup_encryption_key: "{{ lookup('file', 'tests/data/gnupg/backup_encryption_key') }}"
 
      backup_server: backup-server
 
      backup_server_host_ssh_public_keys:
 
        - bougs-backup-server-key-1
 
        - bougs-backup-server-key-2
 
      backup_ssh_key: "bogus-backup-client-key"
roles/common/tests/test_parameters_mandatory.py
Show inline comments
 
@@ -77,48 +77,69 @@ def test_ferm_base_rules(Command, File, Sudo):
 

	
 
    with Sudo():
 
        ferm_base = File('/etc/ferm/conf.d/00-base.conf')
 

	
 
        assert "mod hashlimit hashlimit 3/second hashlimit-burst 9" in ferm_base.content
 

	
 
        iptables = Command('iptables-save')
 

	
 
        assert iptables.rc == 0
 
        assert "-A flood -p icmp -m icmp --icmp-type 8 -m hashlimit --hashlimit-upto 3/sec --hashlimit-burst 9 " \
 
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in iptables.stdout
 
        assert "-A flood -p tcp -m tcp --tcp-flags FIN,SYN,RST,ACK SYN -m hashlimit --hashlimit-upto 3/sec --hashlimit-burst 9 " \
 
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in iptables.stdout
 

	
 
        ip6tables = Command('ip6tables-save')
 
        assert ip6tables.rc == 0
 
        assert "-A flood -p icmp -m icmp --icmp-type 8 -m hashlimit --hashlimit-upto 3/sec --hashlimit-burst 9 " \
 
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in iptables.stdout
 
        assert "-A flood -p ipv6-icmp -m icmp6 --icmpv6-type 128 -m hashlimit --hashlimit-upto 3/sec --hashlimit-burst 9 " \
 
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in ip6tables.stdout
 

	
 

	
 
def test_pipreqcheck_virtualenv_user(Group, User):
 
    """
 
    Tests if user/group for running the pip requirements upgrade checks have
 
    been created correctly.
 
    """
 

	
 
    group = Group('pipreqcheck')
 
    assert group.exists
 
    assert group.gid == 1001
 

	
 
    user = User('pipreqcheck')
 
    assert user.exists
 
    assert user.home == '/var/lib/pipreqcheck'
 
    assert user.uid == 1001
 
    assert user.group == 'pipreqcheck'
 
    assert user.groups == ['pipreqcheck']
 

	
 

	
 
def test_backup_configuration_absent(File, Sudo):
 
    """
 
    Tests if backup configuration is absent. This should be the case when only
 
    mandatory parameters are provided.
 
    """
 

	
 
    with Sudo():
 
        assert not File('/etc/duply/main/patterns/common').exists
 

	
 

	
 
def test_ntp_software_not_installed(Package):
 
    """
 
    Tests if NTP packages are absent.
 
    """
 

	
 
    # @TODO: This throws an exception. It seems version of Testinfra
 
    # used cannot properly check for absence of package.
 
    # assert not Package('ntp').is_installed
 
    # assert not Package('ntpdate').is_installed
 

	
 
    pass
 

	
 

	
 
def test_ntp_listening_interfaces(Socket):
 
    """
 
    Tests if NTP server is not listening.
 
    """
 

	
 
    assert not Socket('udp://:::123').is_listening
roles/common/tests/test_parameters_optional.py
Show inline comments
 
import os
 
import re
 
import socket
 

	
 
import paramiko
 

	
 
import testinfra.utils.ansible_runner
 

	
 

	
 
testinfra_hosts = testinfra.utils.ansible_runner.AnsibleRunner(
 
    '.molecule/ansible_inventory').get_hosts('parameters-optional')
 

	
 

	
 
def test_apt_proxy(File):
 
    """
 
    Tests if proxy configuration for apt has been deployed correctly.
 
    """
 

	
 
    proxy_config = File('/etc/apt/apt.conf.d/00proxy')
 

	
 
    assert proxy_config.exists
 
    assert proxy_config.user == 'root'
 
    assert proxy_config.group == 'root'
 
    assert proxy_config.mode == 0o644
 

	
 

	
 
def test_bash_prompt_content(File):
 
    """
 
    Tests that custom bash prompt has been configured correctly with specified
 
    colour and prompt.
 
    """
 

	
 
    config = File('/etc/profile.d/bash_prompt.sh')
 

	
 
    assert "export PS1='\\[\\e]0;\\u@\\h: \\w\\a\\]${debian_chroot:+($debian_chroot)}\\[\\033[0;36m\\]\\u@\\h[test]:\\w\\$ \\[\\033[0m\\]'" in config.content
 
    assert "export PS1='\\[\\e]0;\\u@\\h: \\w\\a\\]${debian_chroot:+($debian_chroot)}\\u@\\h[test]:\\w\\$ '" in config.content
 

	
 

	
 
def test_common_installed_packages_common(Ansible, Package):
 
    """
 
    Tests that user-provided common packages have been installed.
 
    """
 

	
 
    debian_release = Ansible("setup")["ansible_facts"]["ansible_distribution_release"]
 

	
 
    assert Package('units').is_installed
 
    assert Package('gnutls-bin').is_installed
 

	
 
    # Different name of package in different Debian releases.
 
    if debian_release == 'jessie':
 
@@ -238,48 +239,96 @@ def test_ferm_base_rules(Command, File, Sudo):
 
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in iptables.stdout
 
        assert "-A flood -p tcp -m tcp --tcp-flags FIN,SYN,RST,ACK SYN -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
 
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in iptables.stdout
 

	
 
        ip6tables = Command('ip6tables-save')
 
        assert ip6tables.rc == 0
 
        assert "-A flood -p icmp -m icmp --icmp-type 8 -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
 
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in iptables.stdout
 
        assert "-A flood -p ipv6-icmp -m icmp6 --icmpv6-type 128 -m hashlimit --hashlimit-upto 5/sec --hashlimit-burst 5 " \
 
            "--hashlimit-mode srcip --hashlimit-name icmp -j RETURN" in ip6tables.stdout
 

	
 

	
 
def test_pipreqcheck_virtualenv_user(Group, User):
 
    """
 
    Tests if group and user for running pip requirements upgrade checks have
 
    been created correctly with user-provided uid/gid.
 
    """
 

	
 
    group = Group('pipreqcheck')
 
    assert group.exists
 
    assert group.gid == 2500
 

	
 
    user = User('pipreqcheck')
 
    assert user.exists
 
    assert user.home == '/var/lib/pipreqcheck'
 
    assert user.uid == 2500
 
    assert user.group == 'pipreqcheck'
 
    assert user.groups == ['pipreqcheck']
 

	
 

	
 
def test_backup_configuration(File, Sudo):
 
    """
 
    Tests if backup configuration has been deployed correctly.
 
    """
 

	
 
    with Sudo():
 

	
 
        common = File('/etc/duply/main/patterns/common')
 
        assert common.is_file
 
        assert "/var/log" in common.content.split("\n")
 
        assert "/etc/shadow" in common.content.split("\n")
 
        assert "/var/mail" in common.content.split("\n")
 
        assert "/var/spool/cron" in common.content.split("\n")
 

	
 
        common_extra = File('/etc/duply/main/patterns/common_extra')
 
        assert common_extra.is_file
 
        assert "/home/user1" in common_extra.content.split("\n")
 
        assert "/home/user2" in common_extra.content.split("\n")
 

	
 

	
 
def test_ntp_software_installed(Package):
 
    """
 
    Tests if NTP packages are installed.
 
    """
 

	
 
    assert Package('ntp').is_installed
 
    assert Package('ntpdate').is_installed
 

	
 

	
 
def test_ntp_server_configuration(File, Sudo):
 
    """
 
    Tests if NTP server has been correctly configured.
 
    """
 

	
 
    with Sudo():
 

	
 
        # Read the configuration file.
 
        configuration = File("/etc/ntp.conf").content.split("\n")
 

	
 
        # Extract only the relevant sections of files (exculde empty
 
        # lines and comments).
 
        configuration = [c.strip() for c in configuration if re.match('^\s*(|#.*)$', c) is None]
 

	
 
        # Ensure correct servers have been configured in the pool.
 
        servers = [c for c in configuration if c.startswith('server')]
 

	
 
        expected_servers = ["server 0.debian.pool.ntp.org iburst",
 
                            "server 1.debian.pool.ntp.org iburst",
 
                            "server 2.debian.pool.ntp.org iburst"]
 

	
 
        assert sorted(servers) == sorted(expected_servers)
 

	
 
        # Ensure querying of server is disable for untrusted clients.
 
        restrictions = [c for c in configuration if c.startswith('restrict')]
 
        expected_restrictions = ["restrict -4 default kod notrap nomodify nopeer noquery notrust",
 
                                 "restrict -6 default kod notrap nomodify nopeer noquery notrust"]
 

	
 
        assert sorted(restrictions) == sorted(expected_restrictions)
 

	
 

	
 
def test_ntp_listening_interfaces(Socket):
 
    """
 
    Tests if NTP server is listening on correct ports.
 
    """
 

	
 
    assert Socket('udp://:::123').is_listening
0 comments (0 inline, 0 general)