Changeset - ef201fa5ec5f
[Not reviewed]
14 1 17
Branko Majic (branko) - 6 years ago 2017-11-25 20:18:55
branko@majic.rs
MAR-128: Upgraded tests for backup_server role:

- Switch to new Molecule configuration.
- Updated set-up playbook to use become: yes.
- Moved some preparatory steps outside of the main playbook (eases
idempotence tests).
- Updated tests to reference the yml inventory file.
- Updated tests to use new fixture (host instead of individual ones).
- Switched to extracting IP address instead of hard-coding it in a
couple of tests.
- Moved test for checking available authentication mechanisms for
backup SSH server to be part of testing of parameters_optional only
for now (it was hard coded to that IP, and fails on
parameters-mandatory due to iptables not opening correct ports).
20 files changed with 449 insertions and 317 deletions:
0 comments (0 inline, 0 general)
roles/backup_server/meta/main.yml
Show inline comments
 
---
 

	
 
dependencies:
 
  - common
 
\ No newline at end of file
 
  - common
roles/backup_server/molecule.yml
Show inline comments
 
deleted file
roles/backup_server/molecule/default/create.yml
Show inline comments
 
new file 100644
 
---
 
- name: Create
 
  hosts: localhost
 
  connection: local
 
  gather_facts: False
 
  no_log: "{{ not lookup('env', 'MOLECULE_DEBUG') | bool }}"
 
  vars:
 
    molecule_file: "{{ lookup('env', 'MOLECULE_FILE') }}"
 
    molecule_instance_config: "{{ lookup('env', 'MOLECULE_INSTANCE_CONFIG') }}"
 
    molecule_yml: "{{ lookup('file', molecule_file) | molecule_from_yaml }}"
 
  tasks:
 
    - name: Create molecule instance(s)
 
      molecule_vagrant:
 
        instance_name: "{{ item.name }}"
 
        instance_interfaces: "{{ item.interfaces | default(omit) }}"
 
        instance_raw_config_args: "{{ item.instance_raw_config_args | default(omit) }}"
 

	
 
        platform_box: "{{ item.box }}"
 
        platform_box_version: "{{ item.box_version | default(omit) }}"
 
        platform_box_url: "{{ item.box_url | default(omit) }}"
 

	
 
        provider_name: "{{ molecule_yml.driver.provider.name }}"
 
        provider_memory: "{{ item.memory | default(omit) }}"
 
        provider_cpus: "{{ item.cpus | default(omit) }}"
 
        provider_raw_config_args: "{{ item.raw_config_args | default(omit) }}"
 

	
 
        state: up
 
      register: server
 
      with_items: "{{ molecule_yml.platforms }}"
 

	
 
    # Mandatory configuration for Molecule to function.
 

	
 
    - name: Populate instance config dict
 
      set_fact:
 
        instance_conf_dict: {
 
          'instance': "{{ item.Host }}",
 
          'address': "{{ item.HostName }}",
 
          'user': "{{ item.User }}",
 
          'port': "{{ item.Port }}",
 
          'identity_file': "{{ item.IdentityFile }}", }
 
      with_items: "{{ server.results }}"
 
      register: instance_config_dict
 
      when: server.changed | bool
 

	
 
    - name: Convert instance config dict to a list
 
      set_fact:
 
        instance_conf: "{{ instance_config_dict.results | map(attribute='ansible_facts.instance_conf_dict') | list }}"
 
      when: server.changed | bool
 

	
 
    - name: Dump instance config
 
      copy:
 
        # NOTE(retr0h): Workaround for Ansible 2.2.
 
        #               https://github.com/ansible/ansible/issues/20885
 
        content: "{{ instance_conf | to_json | from_json | molecule_to_yaml | molecule_header }}"
 
        dest: "{{ molecule_instance_config }}"
 
      when: server.changed | bool
roles/backup_server/molecule/default/destroy.yml
Show inline comments
 
new file 100644
 
---
 

	
 
- name: Destroy
 
  hosts: localhost
 
  connection: local
 
  gather_facts: False
 
  no_log: "{{ not lookup('env', 'MOLECULE_DEBUG') | bool }}"
 
  vars:
 
    molecule_file: "{{ lookup('env', 'MOLECULE_FILE') }}"
 
    molecule_instance_config: "{{ lookup('env',' MOLECULE_INSTANCE_CONFIG') }}"
 
    molecule_yml: "{{ lookup('file', molecule_file) | molecule_from_yaml }}"
 
  tasks:
 
    - name: Destroy molecule instance(s)
 
      molecule_vagrant:
 
        instance_name: "{{ item.name }}"
 
        platform_box: "{{ item.box }}"
 
        provider_name: "{{ molecule_yml.driver.provider.name }}"
 
        force_stop: "{{ item.force_stop | default(True) }}"
 

	
 
        state: destroy
 
      register: server
 
      with_items: "{{ molecule_yml.platforms }}"
 

	
 
    # Mandatory configuration for Molecule to function.
 

	
 
    - name: Populate instance config
 
      set_fact:
 
        instance_conf: {}
 

	
 
    - name: Dump instance config
 
      copy:
 
        # NOTE(retr0h): Workaround for Ansible 2.2.
 
        #               https://github.com/ansible/ansible/issues/20885
 
        content: "{{ instance_conf | to_json | from_json | molecule_to_yaml | molecule_header }}"
 
        dest: "{{ molecule_instance_config }}"
 
      when: server.changed | bool
roles/backup_server/molecule/default/molecule.yml
Show inline comments
 
new file 100644
 
---
 

	
 
dependency: {}
 

	
 
driver:
 
  name: vagrant
 
  provider:
 
    name: virtualbox
 

	
 
lint:
 
  name: yamllint
 

	
 
platforms:
 

	
 
  - name: parameters-mandatory-jessie64
 
    groups:
 
      - parameters-mandatory
 
    box: debian/contrib-jessie64
 
    memory: 256
 
    cpus: 1
 
    interfaces:
 
      - auto_config: true
 
        ip: 10.31.127.10
 
        network_name: private_network
 
        type: static
 

	
 
  - name: parameters-optional-jessie64
 
    groups:
 
      - parameters-optional
 
    box: debian/contrib-jessie64
 
    memory: 256
 
    cpus: 1
 
    interfaces:
 
      - auto_config: true
 
        ip: 10.31.127.11
 
        network_name: private_network
 
        type: static
 

	
 
provisioner:
 
  name: ansible
 
  config_options:
 
    ssh_connection:
 
      pipelining: "True"
 
  lint:
 
    name: ansible-lint
 

	
 
scenario:
 
  name: default
 

	
 
verifier:
 
  name: testinfra
 
  lint:
 
    name: flake8
roles/backup_server/molecule/default/playbook.yml
Show inline comments
 
file renamed from roles/backup_server/playbook.yml to roles/backup_server/molecule/default/playbook.yml
 
---
 

	
 
- hosts: all
 
  tasks:
 

	
 
    - name: Update all caches to avoid errors due to missing remote archives
 
      apt:
 
        update_cache: yes
 
      changed_when: False
 

	
 
- hosts: parameters-mandatory
 
  become: yes
 
  roles:
 
    - role: backup_server
 
      backup_host_ssh_private_keys:
 
@@ -18,6 +11,7 @@
 
        ecdsa: "{{ lookup('file', 'tests/data/ssh/server_ecdsa') }}"
 

	
 
- hosts: parameters-optional
 
  become: yes
 
  roles:
 
    - role: backup_server
 
      backup_host_ssh_private_keys:
roles/backup_server/molecule/default/prepare.yml
Show inline comments
 
new file 100644
 
---
 

	
 
- name: Prepare
 
  hosts: all
 
  gather_facts: False
 
  tasks:
 

	
 
- hosts: all
 
  become: yes
 
  tasks:
 

	
 
    - name: Install python for Ansible
 
      raw: test -e /usr/bin/python || (apt -y update && apt install -y python-minimal)
 
      become: True
 
      changed_when: False
 

	
 
    - name: Update all caches to avoid errors due to missing remote archives
 
      apt:
 
        update_cache: yes
roles/backup_server/molecule/default/tests/data/ssh/client1
Show inline comments
 
file renamed from roles/backup_server/tests/data/ssh/client1 to roles/backup_server/molecule/default/tests/data/ssh/client1
roles/backup_server/molecule/default/tests/data/ssh/client1.pub
Show inline comments
 
file renamed from roles/backup_server/tests/data/ssh/client1.pub to roles/backup_server/molecule/default/tests/data/ssh/client1.pub
roles/backup_server/molecule/default/tests/data/ssh/client2
Show inline comments
 
file renamed from roles/backup_server/tests/data/ssh/client2 to roles/backup_server/molecule/default/tests/data/ssh/client2
roles/backup_server/molecule/default/tests/data/ssh/client2.pub
Show inline comments
 
file renamed from roles/backup_server/tests/data/ssh/client2.pub to roles/backup_server/molecule/default/tests/data/ssh/client2.pub
roles/backup_server/molecule/default/tests/data/ssh/known_hosts
Show inline comments
 
file renamed from roles/backup_server/tests/data/ssh/known_hosts to roles/backup_server/molecule/default/tests/data/ssh/known_hosts
roles/backup_server/molecule/default/tests/data/ssh/server_dsa
Show inline comments
 
file renamed from roles/backup_server/tests/data/ssh/server_dsa to roles/backup_server/molecule/default/tests/data/ssh/server_dsa
roles/backup_server/molecule/default/tests/data/ssh/server_ecdsa
Show inline comments
 
file renamed from roles/backup_server/tests/data/ssh/server_ecdsa to roles/backup_server/molecule/default/tests/data/ssh/server_ecdsa
roles/backup_server/molecule/default/tests/data/ssh/server_ed25519
Show inline comments
 
file renamed from roles/backup_server/tests/data/ssh/server_ed25519 to roles/backup_server/molecule/default/tests/data/ssh/server_ed25519
roles/backup_server/molecule/default/tests/data/ssh/server_rsa
Show inline comments
 
file renamed from roles/backup_server/tests/data/ssh/server_rsa to roles/backup_server/molecule/default/tests/data/ssh/server_rsa
roles/backup_server/molecule/default/tests/test_default.py
Show inline comments
 
file renamed from roles/backup_server/tests/test_default.py to roles/backup_server/molecule/default/tests/test_default.py
 
import socket
 

	
 
import paramiko
 

	
 
import testinfra.utils.ansible_runner
 

	
 
testinfra_hosts = testinfra.utils.ansible_runner.AnsibleRunner(
 
    '.molecule/ansible_inventory').get_hosts('all')
 
    '.molecule/ansible_inventory.yml').get_hosts('all')
 

	
 

	
 
def test_installed_software(Package):
 
def test_installed_software(host):
 
    """
 
    Tests if the required packages have been installed.
 
    """
 

	
 
    assert Package('duplicity').is_installed
 
    assert Package('duply').is_installed
 
    assert host.package('duplicity').is_installed
 
    assert host.package('duply').is_installed
 

	
 

	
 
def test_backup_directory(File, Sudo):
 
def test_backup_directory(host):
 
    """
 
    Tests if the backup directory has been set-up correctly.
 
    """
 

	
 
    with Sudo():
 
    with host.sudo():
 

	
 
        backup_directory = File('/srv/backups')
 
        backup_directory = host.file('/srv/backups')
 

	
 
        assert backup_directory.is_directory
 
        assert backup_directory.user == 'root'
 
@@ -32,26 +28,26 @@ def test_backup_directory(File, Sudo):
 
        assert backup_directory.mode == 0o751
 

	
 

	
 
def test_regular_ssh_server_configuration(File, Sudo):
 
def test_regular_ssh_server_configuration(host):
 
    """
 
    Tests if the default SSH server has been configured correctly (to prevent
 
    access to it via backup users).
 
    """
 

	
 
    with Sudo():
 
    with host.sudo():
 

	
 
        assert "DenyGroups backup" in File('/etc/ssh/sshd_config').content
 
        assert "DenyGroups backup" in host.file('/etc/ssh/sshd_config').content
 

	
 

	
 
def test_backup_ssh_server_configuration_directory(File, Sudo):
 
def test_backup_ssh_server_configuration_directory(host):
 
    """
 
    Tests if the backup SSH server configuration directory has been created
 
    correctly.
 
    """
 

	
 
    with Sudo():
 
    with host.sudo():
 

	
 
        backup_ssh_server_directory = File('/etc/ssh-backup')
 
        backup_ssh_server_directory = host.file('/etc/ssh-backup')
 

	
 
        assert backup_ssh_server_directory.is_directory
 
        assert backup_ssh_server_directory.user == 'root'
 
@@ -59,13 +55,13 @@ def test_backup_ssh_server_configuration_directory(File, Sudo):
 
        assert backup_ssh_server_directory.mode == 0o700
 

	
 

	
 
def test_backup_ssh_server_service_configuration(File):
 
def test_backup_ssh_server_service_configuration(host):
 
    """
 
    Tests if the backup SSH server service configuration file has been set-up
 
    correctly.
 
    """
 

	
 
    config_file = File('/etc/default/ssh-backup')
 
    config_file = host.file('/etc/default/ssh-backup')
 

	
 
    assert config_file.is_file
 
    assert config_file.user == 'root'
 
@@ -74,14 +70,14 @@ def test_backup_ssh_server_service_configuration(File):
 
    assert 'SSHD_OPTS="-f /etc/ssh-backup/sshd_config"' in config_file.content
 

	
 

	
 
def test_backup_ssh_server_configuration(File, Sudo):
 
def test_backup_ssh_server_configuration(host):
 
    """
 
    Tests if the backup SSH server configuration file has been set-up correctly.
 
    """
 

	
 
    with Sudo():
 
    with host.sudo():
 

	
 
        config_file = File('/etc/ssh-backup/sshd_config')
 
        config_file = host.file('/etc/ssh-backup/sshd_config')
 

	
 
        assert config_file.is_file
 
        assert config_file.user == 'root'
 
@@ -100,35 +96,35 @@ def test_backup_ssh_server_configuration(File, Sudo):
 
        assert "HostKey /etc/ssh-backup/ssh_host_ed25519_key" in config_file.content
 

	
 

	
 
def test_backup_ssh_server_keys(File, Sudo):
 
def test_backup_ssh_server_keys(host):
 
    """
 
    Tests if the backup SSH server private keys have been deployed correctly.
 
    """
 

	
 
    with Sudo():
 
    with host.sudo():
 

	
 
        dsa = File('/etc/ssh-backup/ssh_host_dsa_key')
 
        dsa = host.file('/etc/ssh-backup/ssh_host_dsa_key')
 
        assert dsa.is_file
 
        assert dsa.user == 'root'
 
        assert dsa.group == 'root'
 
        assert dsa.mode == 0o600
 
        assert dsa.content == open('tests/data/ssh/server_dsa', 'r').read()
 

	
 
        rsa = File('/etc/ssh-backup/ssh_host_rsa_key')
 
        rsa = host.file('/etc/ssh-backup/ssh_host_rsa_key')
 
        assert rsa.is_file
 
        assert rsa.user == 'root'
 
        assert rsa.group == 'root'
 
        assert rsa.mode == 0o600
 
        assert rsa.content == open('tests/data/ssh/server_rsa', 'r').read()
 

	
 
        ed25519 = File('/etc/ssh-backup/ssh_host_ed25519_key')
 
        ed25519 = host.file('/etc/ssh-backup/ssh_host_ed25519_key')
 
        assert ed25519.is_file
 
        assert ed25519.user == 'root'
 
        assert ed25519.group == 'root'
 
        assert ed25519.mode == 0o600
 
        assert ed25519.content == open('tests/data/ssh/server_ed25519', 'r').read()
 

	
 
        ecdsa = File('/etc/ssh-backup/ssh_host_ecdsa_key')
 
        ecdsa = host.file('/etc/ssh-backup/ssh_host_ecdsa_key')
 
        assert ecdsa.is_file
 
        assert ecdsa.user == 'root'
 
        assert ecdsa.group == 'root'
 
@@ -136,13 +132,13 @@ def test_backup_ssh_server_keys(File, Sudo):
 
        assert ecdsa.content == open('tests/data/ssh/server_ecdsa', 'r').read()
 

	
 

	
 
def test_backup_ssh_server_systemd_service(File):
 
def test_backup_ssh_server_systemd_service(host):
 
    """
 
    Tests if the backup SSH server systemd service file has been deployed
 
    correctly.
 
    """
 

	
 
    service_file = File('/etc/systemd/system/ssh-backup.service')
 
    service_file = host.file('/etc/systemd/system/ssh-backup.service')
 

	
 
    assert service_file.is_file
 
    assert service_file.user == 'root'
 
@@ -151,32 +147,15 @@ def test_backup_ssh_server_systemd_service(File):
 
    assert "EnvironmentFile=-/etc/default/ssh-backup" in service_file.content
 

	
 

	
 
def test_backup_ssh_server_service(Service, Socket, Sudo):
 
def test_backup_ssh_server_service(host):
 
    """
 
    Tests if the backup SSH server service is running and listening on correct
 
    port.
 
    """
 

	
 
    with Sudo():
 
    with host.sudo():
 

	
 
        service = Service('ssh-backup')
 
        service = host.service('ssh-backup')
 
        assert service.is_running
 
        assert service.is_enabled
 
        assert Socket('tcp://0.0.0.0:2222').is_listening
 

	
 

	
 
def test_backup_ssh_server_login_mechanisms():
 
    """
 
    Tests available SSH login mechanisms (should be just public key).
 
    """
 

	
 
    sock = socket.socket()
 
    sock.connect(('10.31.127.11', 2222))
 

	
 
    transport = paramiko.transport.Transport(sock)
 
    transport.connect()
 

	
 
    try:
 
        transport.auth_none('')
 
    except paramiko.transport.BadAuthenticationType, err:
 
        assert err.allowed_types == ['publickey']
 
        assert host.socket('tcp://0.0.0.0:2222').is_listening
roles/backup_server/molecule/default/tests/test_parameters_mandatory.py
Show inline comments
 
file renamed from roles/backup_server/tests/test_parameters_mandatory.py to roles/backup_server/molecule/default/tests/test_parameters_mandatory.py
 
@@ -2,17 +2,17 @@ import testinfra.utils.ansible_runner
 

	
 

	
 
testinfra_hosts = testinfra.utils.ansible_runner.AnsibleRunner(
 
    '.molecule/ansible_inventory').get_hosts('parameters-mandatory')
 
    '.molecule/ansible_inventory.yml').get_hosts('parameters-mandatory')
 

	
 

	
 
def test_firewall_configuration(File, Sudo):
 
def test_firewall_configuration(host):
 
    """
 
    Tests if the firewall configuration file has been deployed correctly.
 
    """
 

	
 
    with Sudo():
 
    with host.sudo():
 

	
 
        firewall_config = File('/etc/ferm/conf.d/40-backup.conf')
 
        firewall_config = host.file('/etc/ferm/conf.d/40-backup.conf')
 

	
 
        assert firewall_config.is_file
 
        assert firewall_config.user == 'root'
roles/backup_server/molecule/default/tests/test_parameters_optional.py
Show inline comments
 
new file 100644
 
import os
 
import socket
 

	
 
import paramiko
 

	
 
import testinfra.utils.ansible_runner
 

	
 

	
 
testinfra_hosts = testinfra.utils.ansible_runner.AnsibleRunner(
 
    '.molecule/ansible_inventory.yml').get_hosts('parameters-optional')
 

	
 

	
 
def test_backup_client_users_and_groups(host):
 
    """
 
    Tests if the system groups and users for backup clients have been set-up
 
    correctly.
 
    """
 

	
 
    with host.sudo():
 

	
 
        client1_group = host.group('bak-client1_backup')
 
        assert client1_group.exists
 
        assert client1_group.gid < 1000
 

	
 
        client1_user = host.user('bak-client1_backup')
 
        assert client1_user.exists
 
        assert client1_user.group == 'bak-client1_backup'
 
        assert sorted(client1_user.groups) == sorted(['bak-client1_backup', 'backup'])
 
        assert client1_user.home == '/srv/backups/client1.backup'
 
        assert client1_user.uid < 1000
 
        assert client1_user.password == '!'
 

	
 
        client2_group = host.group('bak-client2-backup')
 
        assert client2_group.exists
 
        assert client2_group.gid == 5001
 

	
 
        client2_user = host.user('bak-client2-backup')
 
        assert client2_user.exists
 
        assert client2_user.group == 'bak-client2-backup'
 
        assert sorted(client2_user.groups) == sorted(['bak-client2-backup', 'backup'])
 
        assert client2_user.home == '/srv/backups/client2-backup'
 
        assert client2_user.uid == 5001
 
        assert client2_user.password == '!'
 

	
 

	
 
def test_backup_client_home_directories(host):
 
    """
 
    Tests if the home directory structure has been set-up correctly for the
 
    backup client system user.
 
    """
 

	
 
    with host.sudo():
 

	
 
        client1_user = host.user('bak-client1_backup')
 

	
 
        client1_user_home = host.file(client1_user.home)
 
        assert client1_user_home.is_directory
 
        assert client1_user_home.user == 'root'
 
        assert client1_user_home.group == 'bak-client1_backup'
 
        assert client1_user_home.mode == 0o750
 

	
 
        client1_user_duplicity = host.file(os.path.join(client1_user.home, 'duplicity'))
 
        assert client1_user_duplicity.is_directory
 
        assert client1_user_duplicity.user == 'bak-client1_backup'
 
        assert client1_user_duplicity.group == 'bak-client1_backup'
 
        assert client1_user_duplicity.mode == 0o770
 

	
 
        client1_user_ssh = host.file(os.path.join(client1_user.home, '.ssh'))
 
        assert client1_user_ssh.is_directory
 
        assert client1_user_ssh.user == 'root'
 
        assert client1_user_ssh.group == 'root'
 
        assert client1_user_ssh.mode == 0o751
 

	
 
        # This verifies /etc/skel was not used for setting-up home.
 
        assert not host.file(os.path.join(client1_user.home, '.bashrc')).exists
 

	
 
        client2_user = host.user('bak-client2-backup')
 

	
 
        client2_user_home = host.file(client2_user.home)
 
        assert client2_user_home.is_directory
 
        assert client2_user_home.user == 'root'
 
        assert client2_user_home.group == 'bak-client2-backup'
 
        assert client2_user_home.mode == 0o750
 

	
 
        client2_user_duplicity = host.file(os.path.join(client2_user.home, 'duplicity'))
 
        assert client2_user_duplicity.is_directory
 
        assert client2_user_duplicity.user == 'bak-client2-backup'
 
        assert client2_user_duplicity.group == 'bak-client2-backup'
 
        assert client2_user_duplicity.mode == 0o770
 

	
 
        client2_user_ssh = host.file(os.path.join(client2_user.home, '.ssh'))
 
        assert client2_user_ssh.is_directory
 
        assert client2_user_ssh.user == 'root'
 
        assert client2_user_ssh.group == 'root'
 
        assert client2_user_ssh.mode == 0o751
 

	
 
        # This verifies /etc/skel was not used for setting-up home.
 
        assert not host.file(os.path.join(client2_user.home, '.bashrc')).exists
 

	
 

	
 
def test_backup_client_authorized_keys(host):
 
    """
 
    Tests if the authorized keys for backup client system user have been set-up
 
    correctly.
 
    """
 

	
 
    with host.sudo():
 

	
 
        client1_user = host.user('bak-client1_backup')
 

	
 
        client1_user_authorized_keys = host.file(os.path.join(client1_user.home, '.ssh', 'authorized_keys'))
 
        assert client1_user_authorized_keys.is_file
 
        assert client1_user_authorized_keys.user == 'root'
 
        assert client1_user_authorized_keys.group == 'bak-client1_backup'
 
        assert client1_user_authorized_keys.mode == 0o640
 
        assert client1_user_authorized_keys.content == open('tests/data/ssh/client1.pub', 'r').read().strip()
 

	
 
        client2_user = host.user('bak-client2-backup')
 

	
 
        client2_user_authorized_keys = host.file(os.path.join(client2_user.home, '.ssh', 'authorized_keys'))
 
        assert client2_user_authorized_keys.is_file
 
        assert client2_user_authorized_keys.user == 'root'
 
        assert client2_user_authorized_keys.group == 'bak-client2-backup'
 
        assert client2_user_authorized_keys.mode == 0o640
 
        assert client2_user_authorized_keys.content == open('tests/data/ssh/client2.pub', 'r').read().strip()
 

	
 

	
 
def test_firewall_configuration(host):
 
    """
 
    Tests if the firewall configuration file has been deployed correctly.
 
    """
 

	
 
    with host.sudo():
 

	
 
        firewall_config = host.file('/etc/ferm/conf.d/40-backup.conf')
 

	
 
        assert firewall_config.is_file
 
        assert firewall_config.user == 'root'
 
        assert firewall_config.group == 'root'
 
        assert firewall_config.mode == 0o640
 
        assert 'saddr ( 10.31.127.1 10.31.127.3) @subchain "backup_in" {' in firewall_config.content
 

	
 

	
 
def test_regular_ssh_server_inaccessible(host):
 
    """
 
    Tests if the default SSH server is inaccessible for the backup client system
 
    users.
 
    """
 

	
 
    # Extract first non-IPv6 IP. Crude test, but it should work.
 
    remote_ip = next(a for a in host.interface("eth1").addresses if ":" not in a)
 
    local = host.get_host("local://")
 

	
 
    # Test connectivity towards regular ssh (should fail).
 
    login_attempt = local.run("ssh "
 
                              "-o PasswordAuthentication=no "
 
                              "-o StrictHostKeyChecking=no "
 
                              "-o UserKnownHostsFile=/dev/null "
 
                              "-i tests/data/ssh/client1 "
 
                              "bak-client1_backup@%s "
 
                              "/bin/echo sshtest" % remote_ip)
 
    assert login_attempt.rc != 0
 
    assert "Permission denied (publickey)" in login_attempt.stderr
 

	
 
    login_attempt = local.run("ssh "
 
                              "-o PasswordAuthentication=no "
 
                              "-o StrictHostKeyChecking=no "
 
                              "-o UserKnownHostsFile=/dev/null "
 
                              "-i tests/data/ssh/client2 "
 
                              "bak-client2-backup@%s "
 
                              "/bin/echo sshtest" % remote_ip)
 
    assert login_attempt.rc != 0
 
    assert "Permission denied (publickey)" in login_attempt.stderr
 

	
 

	
 
def test_backup_ssh_service_connectivity(host):
 
    """
 
    Tests if SFTP (only) is availavble to system users used by backup clients.
 
    """
 

	
 
    # Extract first non-IPv6 IP. Crude test, but it should work.
 
    remote_ip = next(a for a in host.interface("eth1").addresses if ":" not in a)
 

	
 
    local = host.get_host("local://")
 

	
 
    # Test connectivity towards dedicated ssh (should be allowed, but only for sftp).
 
    login_attempt = local.run("ssh -p 2222 "
 
                              "-o PasswordAuthentication=no "
 
                              "-o StrictHostKeyChecking=no "
 
                              "-o UserKnownHostsFile=/dev/null "
 
                              "-i tests/data/ssh/client1 "
 
                              "bak-client1_backup@%s /bin/echo sshtest" % remote_ip)
 
    assert login_attempt.rc == 1
 
    assert "This service allows sftp connections only." in login_attempt.stdout
 

	
 
    # Test connectivity towards dedicated ssh (should be allowed, but only for sftp).
 
    login_attempt = local.run("ssh -p 2222 "
 
                              "-o PasswordAuthentication=no "
 
                              "-o StrictHostKeyChecking=no "
 
                              "-o UserKnownHostsFile=/dev/null "
 
                              "-i tests/data/ssh/client2 "
 
                              "bak-client2-backup@%s /bin/echo sshtest" % remote_ip)
 
    assert login_attempt.rc == 1
 
    assert "This service allows sftp connections only." in login_attempt.stdout
 

	
 

	
 
def test_backup_ssh_service_key_fingerprints(host):
 
    """
 
    Tests fingerprints of backup SSH server in order to ensure correct keys are
 
    in use.
 
    """
 

	
 
    # Extract first non-IPv6 IP. Crude test, but it should work.
 
    remote_ip = next(a for a in host.interface("eth1").addresses if ":" not in a)
 

	
 
    local = host.get_host("local://")
 

	
 
    for key_type in ['ssh-dss', 'ssh-rsa', 'ssh-ed25519', 'ecdsa-sha2-nistp256']:
 

	
 
        login_attempt = local.run("ssh -p 2222 "
 
                                  "-o PasswordAuthentication=no "
 
                                  "-o StrictHostKeyChecking=yes "
 
                                  "-o UserKnownHostsFile=tests/data/ssh/known_hosts "
 
                                  "-i tests/data/ssh/client1 "
 
                                  "-o HostKeyAlgorithms=%s "
 
                                  "bak-client1_backup@%s /bin/echo sshtest" % (key_type, remote_ip))
 
        assert login_attempt.rc == 1
 
        assert "This service allows sftp connections only." in login_attempt.stdout
 

	
 

	
 
def test_backup_ssh_server_login_mechanisms(host):
 
    """
 
    Tests available SSH login mechanisms (should be just public key).
 
    """
 

	
 
    # Extract first non-IPv6 IP. Crude test, but it should work.
 
    remote_ip = next(a for a in host.interface("eth1").addresses if ":" not in a)
 

	
 
    sock = socket.socket()
 
    sock.connect((remote_ip, 2222))
 

	
 
    transport = paramiko.transport.Transport(sock)
 
    transport.connect()
 

	
 
    try:
 
        transport.auth_none('')
 
    except paramiko.transport.BadAuthenticationType, err:
 
        assert err.allowed_types == ['publickey']
roles/backup_server/tests/test_parameters_optional.py
Show inline comments
 
deleted file
0 comments (0 inline, 0 general)