Changeset - 36d96a3fc472
[Not reviewed]
0 6 0
Branko Majic (branko) - 5 years ago 2021-01-13 00:07:32
branko@majic.rs
MAR-163: Ensure host.run is not suspectible to shell injection:

- Use host.run's built-in capability for handling parameter escaping
and insertion.
6 files changed with 9 insertions and 9 deletions:
0 comments (0 inline, 0 general)
roles/common/molecule/default/tests/test_default.py
Show inline comments
 
@@ -319,25 +319,25 @@ def test_pipreqcheck_requirements(host, requirements_in_path, requirements_txt_p
 
        "pip==20.2.4",
 
        "setuptools==50.3.2",
 
        "six==1.15.0",
 
        "wheel==0.35.1",
 
    ]),
 
])
 
def test_pipreqcheck_virtualenv_packages(host, pip_path, expected_packages):
 
    """
 
    Tests if correct packages are installed in virtualenv used for pip
 
    requirements checks..
 
    """
 

	
 
    packages = host.run("sudo -u %s %s freeze --all" % ('pipreqcheck', pip_path))
 
    packages = host.run("sudo -u pipreqcheck %s freeze --all", pip_path)
 

	
 
    # Normalise package names and order.
 
    expected_packages = sorted([p.lower() for p in expected_packages])
 
    actual_packages = sorted(packages.stdout.lower().strip().split("\n"))
 

	
 
    # This is a dummy distro-provided package ignored by the pip-tools.
 
    if "pkg-resources==0.0.0" in actual_packages:
 
        actual_packages.remove("pkg-resources==0.0.0")
 

	
 
    assert actual_packages == expected_packages
 

	
 

	
roles/mail_forwarder/molecule/default/tests/test_connectivity_from_client.py
Show inline comments
 
@@ -14,15 +14,15 @@ ansible_runner = testinfra.utils.ansible_runner.AnsibleRunner(
 
@pytest.mark.parametrize("server",
 
                         sorted(
 
                             set(ansible_runner.get_hosts('all')) -
 
                             set(ansible_runner.get_hosts('helper'))))
 
def test_connectivity_from_client(host, server):
 
    """
 
    Tests connectivity towards mail forwarder servers from client
 
    (non-relay). Connectivity should fail for both.
 
    """
 

	
 
    with host.sudo():
 

	
 
        ping = host.run('hping3 -S -p 25 -c 1 %s' % server)
 
        ping = host.run('hping3 -S -p 25 -c 1 %s', server)
 
        assert ping.rc != 0
 
        assert "100% packet loss" in ping.stderr
roles/mail_forwarder/molecule/default/tests/test_connectivity_from_relay.py
Show inline comments
 
@@ -12,41 +12,41 @@ ansible_runner = testinfra.utils.ansible_runner.AnsibleRunner(
 

	
 

	
 
@pytest.mark.parametrize("server",
 
                         ansible_runner.get_hosts('parameters-optional'))
 
def test_connectivity_from_authorised_relay(host, server):
 
    """
 
    Tests connectivity towards mail forwarder servers from authorised
 
    relay.
 
    """
 

	
 
    with host.sudo():
 

	
 
        ping = host.run('hping3 -S -p 25 -c 1 %s' % server)
 
        ping = host.run('hping3 -S -p 25 -c 1 %s', server)
 
        assert ping.rc == 0
 

	
 

	
 
@pytest.mark.parametrize("server",
 
                         sorted(
 
                             set(ansible_runner.get_hosts('parameters-mandatory')) |
 
                             set(ansible_runner.get_hosts('parameters-no-incoming'))))
 
def test_connectivity_from_unauthorised_relay(host, server):
 
    """
 
    Tests connectivity towards mail forwarder servers from unauthorised
 
    relay.
 
    """
 

	
 
    with host.sudo():
 

	
 
        ping = host.run('hping3 -S -p 25 -c 1 %s' % server)
 
        ping = host.run('hping3 -S -p 25 -c 1 %s', server)
 
        assert ping.rc != 0
 
        assert "100% packet loss" in ping.stderr
 

	
 

	
 
@pytest.mark.parametrize("server",
 
                         ansible_runner.get_hosts('parameters-optional'))
 
def test_mail_reception_from_authorised_relay(host, server):
 
    """
 
    Tests if mails can be sent from relay to servers configured to use the
 
    relay.
 
    """
 

	
 
@@ -54,15 +54,15 @@ def test_mail_reception_from_authorised_relay(host, server):
 
    assert send.rc == 0
 

	
 

	
 
@pytest.mark.parametrize("server",
 
                         ansible_runner.get_hosts('parameters-optional'))
 
def test_open_relay(host, server):
 
    """
 
    Tests if mail forwarder behaves as open relay.
 
    """
 

	
 
    no_recipients_accepted_error_code = 24
 

	
 
    send = host.run('swaks --suppress-data --to root@client1 --server %s' % server)
 
    send = host.run('swaks --suppress-data --to root@client1 --server %s', server)
 
    assert send.rc == no_recipients_accepted_error_code
 
    assert "Relay access denied" in send.stdout
roles/web_server/molecule/default/tests/test_client.py
Show inline comments
 
@@ -10,14 +10,14 @@ testinfra_hosts = testinfra.utils.ansible_runner.AnsibleRunner(
 
def test_connectivity(host):
 
    """
 
    Tests connectivity to the web server (ports that should be reachable).
 
    """
 

	
 
    with host.sudo():
 

	
 
        for server in ["parameters-mandatory",
 
                       "parameters-optional"]:
 
            # HTTP, HTTPS.
 
            for port in [80, 443]:
 

	
 
                ping = host.run('hping3 -S -p %d -c 1 %s' % (port, server))
 
                ping = host.run('hping3 -S -p %s -c 1 %s', str(port), server)
 
                assert ping.rc == 0
roles/wsgi_website/molecule/default/tests/test_default.py
Show inline comments
 
@@ -282,25 +282,25 @@ def test_python_virtualenv_wrapper_script(host, wrapper_script, expected_owner,
 
    """
 
    Tests if Python virtualenv wrapper script is functioning correctly.
 
    """
 

	
 
    with host.sudo():
 

	
 
        wrapper = host.file(wrapper_script)
 
        assert wrapper.is_file
 
        assert wrapper.user == expected_owner
 
        assert wrapper.group == expected_group
 
        assert wrapper.mode == 0o750
 

	
 
        command = host.run("sudo -u %s %s python -c 'import gunicorn'" % (expected_owner, wrapper_script))
 
        command = host.run("sudo -u %s %s python -c 'import gunicorn'", expected_owner, wrapper_script)
 
        assert command.rc == 0
 

	
 

	
 
@pytest.mark.parametrize("admin_user, pip_path, expected_packages",  [
 
    ('admin-parameters-mandatory', '/var/www/parameters-mandatory/virtualenv/bin/pip', [
 
        "futures==3.3.0",
 
        "gunicorn==19.10.0",
 
    ]),
 
    ('admin-parameters-optional_local', '/var/www/parameters-optional.local/virtualenv/bin/pip', [
 
        "Pygments==2.2.0",
 
        "dnspython==1.15.0",
 
        "docopt==0.6.2",
 
@@ -324,25 +324,25 @@ def test_python_virtualenv_wrapper_script(host, wrapper_script, expected_owner,
 
        "click==6.7",
 
        "futures==3.1.1",
 
        "gunicorn==19.8.1",
 
        "itsdangerous==0.24",
 
        "six==1.10.0",
 
    ]),
 
])
 
def test_virtualenv_packages(host, admin_user, pip_path, expected_packages):
 
    """
 
    Tests if correct packages are installed in virtualenv.
 
    """
 

	
 
    packages = host.run("sudo -u %s %s freeze" % (admin_user, pip_path))
 
    packages = host.run("sudo -u %s %s freeze", admin_user, pip_path)
 

	
 
    # Normalise package names and order.
 
    expected_packages = sorted([p.lower() for p in expected_packages])
 
    actual_packages = sorted(packages.stdout.lower().strip().split("\n"))
 

	
 
    assert actual_packages == expected_packages
 

	
 

	
 
@pytest.mark.parametrize("config_file, expected_website_name, expected_socket_file_path", [
 
    ('/etc/systemd/system/parameters-mandatory.socket', 'parameters-mandatory', '/run/wsgi/parameters-mandatory.sock'),
 
    ('/etc/systemd/system/parameters-optional.local.socket', 'parameters-optional.local', '/run/wsgi/parameters-optional.local.sock'),
 
    ('/etc/systemd/system/parameters-paste-req.socket', 'parameters-paste-req', '/run/wsgi/parameters-paste-req.sock'),
roles/xmpp_server/molecule/default/tests/test_client.py
Show inline comments
 
@@ -10,25 +10,25 @@ testinfra_hosts = testinfra.utils.ansible_runner.AnsibleRunner(
 
def test_connectivity(host):
 
    """
 
    Tests connectivity to the XMPP server (ports that should be reachable).
 
    """
 

	
 
    with host.sudo():
 

	
 
        for server in ["parameters-mandatory",
 
                       "parameters-optional"]:
 
            # c2s plaintext, c2s TLS, file proxy, s2s.
 
            for port in [5222, 5223, 5000, 5269]:
 

	
 
                ping = host.run('hping3 -S -p %d -c 1 %s' % (port, server))
 
                ping = host.run('hping3 -S -p %s -c 1 %s', str(port), server)
 
                assert ping.rc == 0
 

	
 

	
 
def test_tls(host):
 
    """
 
    Tests if TLS works as expected.
 
    """
 

	
 
    send = host.run("echo 'Hello' | sendxmpp --tls-ca-path /usr/local/share/ca-certificates/testca.crt "
 
                    "-t -u john.doe -p johnpassword -j domain1:5222 john.doe@domain1")
 
    assert send.rc == 0
 

	
0 comments (0 inline, 0 general)