Changeset - a1219ebc2188
[Not reviewed]
0 9 0
Branko Majic (branko) - 6 years ago 2018-04-27 21:57:30
branko@majic.rs
GC-22: Updated server command to allow reading of CSR from stdin:

- Implemented an additional helper for functional tests for running
interactive commands.
- Implemented functional test covering passing-in CSR to the server
command via stdin (interactively).
- Updated server command implementation.
- Implemented utility function for reading input from user.
- Implemented utility function for reading CSR from string in (in
OpenSSL-style PEM format)
- Fixed some missing imports in the custom pytest fixture.
- Implemented relevant unit tests.
9 files changed with 259 insertions and 3 deletions:
0 comments (0 inline, 0 general)
functional_tests/base.py
Show inline comments
 
@@ -19,8 +19,11 @@
 
#
 

	
 

	
 
import io
 
import subprocess
 

	
 
import pexpect
 

	
 

	
 
def run_command(command, *args):
 
    """
 
@@ -48,3 +51,70 @@ def run_command(command, *args):
 
    stdout, stderr = stdout.decode(), stderr.decode()
 

	
 
    return stdout, stderr, process.returncode
 

	
 

	
 
def run_interactive_command(prompt_answers, command, *args):
 
    """
 
    Helper function that runs the specified command, and takes care of
 
    providing answers to interactive prompts.
 

	
 
    This is a convenience wrapper around the pexpect library.
 

	
 
    Unlikes the run_command helper, this helper is not capable of
 
    separating the standard output from standard error
 
    (unfortunately).
 

	
 
    The failure message returned describes only issues related to
 
    command not providing the expected prompt for answer, or if
 
    command gets stuck in a prompt after all expected prompts were
 
    processed.
 

	
 
    :param prompt_answers: List of prompts and their correspnding answers. To send a control character, start the answer with 'Ctrl-' (for example 'Ctrl-d').
 
    :type prompt_answers: list[(str, str)]
 

	
 
    :param command: Command that should be run.
 
    :type command: str
 

	
 
    :param *args: Zero or more arguments to pass to the command.
 
    :type *args: str
 

	
 
    :returns: (failure, output, exit_code) -- Prompt failure message, combined standard output and error, and exit code (None if prompt failure happened).
 
    :rtype: (str or None, str, int)
 
    """
 

	
 
    # Assume that all prompts/answers worked as expected.
 
    failure = None
 

	
 
    # Spawn the process, use dedicated stream for capturin command
 
    # stdout/stderr.
 
    output_stream = io.StringIO()
 
    process = pexpect.spawnu(command, [*args], timeout=2)
 
    process.logfile_read = output_stream
 

	
 
    # Try to feed the interactive process with answers. Stop iteration
 
    # at first prompt that was not reached.
 
    for prompt, answer in prompt_answers:
 
        try:
 
            process.expect(prompt)
 
            if answer.startswith('Ctrl-'):
 
                process.sendcontrol(answer.lstrip('Ctrl-'))
 
            else:
 
                process.sendline(answer)
 
        except pexpect.TIMEOUT:
 
            failure = "Command never prompted us with: %s" % prompt
 
            process.terminate()
 
            break
 

	
 
    # If we were successful until now, wait for the process to exit.
 
    if failure is None:
 
        try:
 
            process.expect(pexpect.EOF)
 
        except pexpect.TIMEOUT:
 
            failure = "Command got stuck waiting for input."
 
            process.terminate()
 

	
 
    process.close()
 
    output = output_stream.getvalue()
 
    exit_code = process.exitstatus
 

	
 
    return failure, output, exit_code
functional_tests/test_csr.py
Show inline comments
 
@@ -19,7 +19,7 @@
 
#
 

	
 

	
 
from .base import run_command
 
from .base import run_command, run_interactive_command
 

	
 

	
 
def test_commands_report_csr_option_as_available():
 
@@ -412,3 +412,64 @@ def test_renew_certificate_originally_issued_with_csr_using_private_key(tmpdir):
 
    assert client_new_certificate != client_old_certificate
 
    assert client_old_certificate_public_key != client_generated_private_key_public_key
 
    assert client_new_certificate_public_key == client_generated_private_key_public_key
 

	
 

	
 
def test_server_command_accepts_csr_from_stdin(tmpdir):
 
    # John is working on a project where he has already generated
 
    # server private key.
 
    tmpdir.chdir()
 
    run_command("openssl", "genrsa", "-out", "myserver1.key.pem", "2048")
 

	
 
    # However, he still needs to have a CA as a trustpoint, so he goes
 
    # ahead and initialises Gimmecert for this purpose.
 
    run_command("gimmecert", "init")
 

	
 
    # Before issuing the certificate, he generates a CSR for the
 
    # server private key.
 
    custom_csr, _, exit_code = run_command("openssl", "req", "-new", "-key", "myserver1.key.pem", "-subj", "/CN=myserver1")
 

	
 
    # John realises that although the CSR generation was successful, he
 
    # forgot to output it to a file.
 
    assert exit_code == 0
 
    assert "BEGIN CERTIFICATE REQUEST" in custom_csr
 
    assert "END CERTIFICATE REQUEST" in custom_csr
 

	
 
    # He could output the CSR into a file, and feed that into
 
    # Gimmecert, but he feels a bit lazy. Instead, John tries to pass
 
    # in a dash ("-") as input, knowing that it is commonly used as
 
    # shorthand for reading from standard input.
 
    prompt_failure, output, exit_code = run_interactive_command([], "gimmecert", "server", "--csr", "-", "myserver1")
 

	
 
    # John sees that the application has prompted him to provide the
 
    # CSR interactively, and that it waits for his input.
 
    assert exit_code is None, "Output was: %s" % output
 
    assert prompt_failure == "Command got stuck waiting for input.", "Output was: %s" % output
 
    assert "Please enter the CSR (finish with Ctrl-D on an empty line):" in output
 

	
 
    # John reruns the command, this time passing-in the CSR and ending
 
    # the input with Ctrl-D.
 
    prompt_failure, output, exit_code = run_interactive_command([('Please enter the CSR \(finish with Ctrl-D on an empty line\):', custom_csr + '\n\004')],
 
                                                                "gimmecert", "server", "--csr", "-", "myserver1")
 

	
 
    # The operation is successful, and he is presented with
 
    # information about generated artefacts.
 
    assert prompt_failure is None
 
    assert exit_code == 0
 
    assert ".gimmecert/server/myserver1.cert.pem" in output
 
    assert ".gimmecert/server/myserver1.csr.pem" in output
 

	
 
    # John also notices that there is no mention of a private key.
 
    assert ".gimmecert/server/myserver1.key.pem" not in output
 

	
 
    # John notices that the content of stored CSR is identical to the
 
    # one he provided.
 
    stored_csr = tmpdir.join(".gimmecert", "server", "myserver1.csr.pem").read()
 
    assert custom_csr == stored_csr
 

	
 
    # John then quickly has a look at the public key associated with
 
    # the private key, and public key stored in certificate.
 
    public_key, _, _ = run_command("openssl", "rsa", "-pubout", "-in", "myserver1.key.pem")
 
    certificate_public_key, _, _ = run_command("openssl", "x509", "-pubkey", "-noout", "-in", ".gimmecert/server/myserver1.cert.pem")
 

	
 
    # To his delight, they are identical.
 
    assert certificate_public_key == public_key
gimmecert/cli.py
Show inline comments
 
@@ -109,7 +109,7 @@ def setup_server_subcommand_parser(parser, subparsers):
 
    the private key, but replacing the DNS subject alternative names with listed values (if any). \
 
    If entity does not exist, this option has no effect, and a new private key/certificate will be generated as usual.''')
 
    subparser.add_argument('--csr', '-c', type=str, default=None, help='''Do not generate server private key locally, and use the passed-in \
 
    certificate signing request (CSR) instead.''')
 
    certificate signing request (CSR) instead. Use dash (-) to read from standard input.''')
 

	
 
    def server_wrapper(args):
 
        project_directory = os.getcwd()
gimmecert/commands.py
Show inline comments
 
@@ -20,6 +20,7 @@
 

	
 
import os
 
import datetime
 
import sys
 

	
 
import gimmecert.crypto
 
import gimmecert.storage
 
@@ -174,6 +175,12 @@ def server(stdout, stderr, project_directory, entity_name, extra_dns_names, upda
 
        csr = gimmecert.storage.read_csr(csr_path)
 
        public_key = csr.public_key()
 
        private_key = None
 
    elif custom_csr_path == "-":
 
        renew_certificate = False
 
        csr_pem = gimmecert.utils.read_input(sys.stdin, stderr, "Please enter the CSR")
 
        csr = gimmecert.utils.csr_from_pem(csr_pem)
 
        public_key = csr.public_key()
 
        private_key = None
 
    elif custom_csr_path:
 
        renew_certificate = False
 
        csr = gimmecert.storage.read_csr(custom_csr_path)
gimmecert/utils.py
Show inline comments
 
@@ -111,3 +111,53 @@ def get_dns_names(certificate):
 
        dns_names = []
 

	
 
    return dns_names
 

	
 

	
 
def read_input(input_stream, prompt_stream, prompt):
 
    """
 
    Reads input from the passed-in input stream until Ctrl-D sequence
 
    is reached, while also providing a meaningful prompt to the user.
 

	
 
    The prompt will be extended with short information telling the
 
    user to end input with Ctrl-D.
 

	
 
    :param input_stream: Input stream to read from.
 
    :type input_stream: io.IOBase
 

	
 
    :param prompt_stream: Output stream where the prompt should be written-out.
 
    :type prompt_stream: io.IOBase
 

	
 
    :param prompt: Prompt message to show to the user.
 
    :type prompt: str
 
    """
 

	
 
    print("%s (finish with Ctrl-D on an empty line):\n" % prompt, file=prompt_stream)
 

	
 
    user_input = ""
 

	
 
    c = input_stream.read(1)
 
    while c != '':
 
        user_input += c
 
        c = input_stream.read(1)
 

	
 
    return user_input
 

	
 

	
 
def csr_from_pem(csr_pem):
 
    """
 
    Converts passed-in CSR in OpenSSL-style PEM format into a CSR
 
    object.
 

	
 
    :param csr_pem: CSR in OpenSSL-style PEM format.
 
    :type csr_pem: str
 

	
 
    :returns: CSR object.
 
    :rtype: cryptography.x509.CertificateSigningRequest
 
    """
 

	
 
    csr = cryptography.x509.load_pem_x509_csr(
 
        bytes(csr_pem, encoding='utf8'),
 
        cryptography.hazmat.backends.default_backend()
 
    )
 

	
 
    return csr
setup.py
Show inline comments
 
@@ -42,6 +42,7 @@ test_requirements = [
 
    'pytest>=3.4,<3.5',
 
    'pytest-cov>=2.5,<2.6',
 
    'tox>=2.9,<2.10',
 
    'pexpect>=4.5,<4.6',
 
]
 

	
 
setup_requirements = [
tests/conftest.py
Show inline comments
 
@@ -23,6 +23,8 @@ import collections
 
import io
 

	
 
import gimmecert
 
import gimmecert.crypto
 
import gimmecert.storage
 

	
 
import pytest
 

	
tests/test_commands.py
Show inline comments
 
@@ -21,11 +21,13 @@
 
import argparse
 
import io
 
import os
 
import sys
 

	
 
import gimmecert.commands
 

	
 
from freezegun import freeze_time
 
import pytest
 
from unittest import mock
 
from freezegun import freeze_time
 

	
 

	
 
def test_init_sets_up_directory_structure(tmpdir):
 
@@ -1467,3 +1469,32 @@ def test_renew_replaces_server_csr_with_private_key(tmpdir):
 

	
 
    assert not csr_file.check()
 
    assert certificate_public_numbers == private_key_public_numbers
 

	
 

	
 
@mock.patch('gimmecert.utils.read_input')
 
def test_server_reads_csr_from_stdin(mock_read_input, sample_project_directory, key_with_csr):
 
    entity_name = 'myserver'
 
    stored_csr_file = sample_project_directory.join('.gimmecert', 'server', '%s.csr.pem' % entity_name)
 
    certificate_file = sample_project_directory.join('.gimmecert', 'server', '%s.cert.pem' % entity_name)
 

	
 
    # Mock our util for reading input from user.
 
    mock_read_input.return_value = key_with_csr.csr_pem
 

	
 
    stdout_stream = io.StringIO()
 
    stderr_stream = io.StringIO()
 

	
 
    status_code = gimmecert.commands.server(stdout_stream, stderr_stream, sample_project_directory.strpath, entity_name, None, True, '-')
 
    assert status_code == 0
 

	
 
    # Read stored/generated artefacts.
 
    stored_csr = gimmecert.storage.read_csr(stored_csr_file.strpath)
 
    certificate = gimmecert.storage.read_certificate(certificate_file.strpath)
 

	
 
    custom_csr_public_numbers = key_with_csr.csr.public_key().public_numbers()
 
    stored_csr_public_numbers = stored_csr.public_key().public_numbers()
 
    certificate_public_numbers = certificate.public_key().public_numbers()
 

	
 
    mock_read_input.assert_called_once_with(sys.stdin, stderr_stream, "Please enter the CSR")
 
    assert stored_csr_public_numbers == custom_csr_public_numbers
 
    assert certificate_public_numbers == custom_csr_public_numbers
 
    assert certificate.subject != key_with_csr.csr.subject
tests/test_utils.py
Show inline comments
 
@@ -20,6 +20,7 @@
 

	
 

	
 
import datetime
 
import io
 

	
 
import cryptography.x509
 
import cryptography.hazmat.backends
 
@@ -102,3 +103,36 @@ def test_get_dns_names_returns_list_of_dns_names():
 

	
 
    assert isinstance(dns_names, list)
 
    assert dns_names == ['myserver', 'myservice1.example.com', 'myservice2.example.com']
 

	
 

	
 
def test_read_long_input():
 

	
 
    provided_input = """\
 
This is my input string that
 
spans multiple
 
lines.
 
"""
 

	
 
    input_stream = io.StringIO()
 
    prompt_stream = io.StringIO()
 

	
 
    # End the input with Ctrl-D.
 
    input_stream.write(provided_input)
 
    input_stream.seek(0)
 

	
 
    returned_input = gimmecert.utils.read_input(input_stream, prompt_stream, "My prompt")
 

	
 
    prompt = prompt_stream.getvalue()
 

	
 
    assert prompt == "My prompt (finish with Ctrl-D on an empty line):\n\n"
 
    assert isinstance(returned_input, str)
 
    assert returned_input == provided_input
 

	
 

	
 
def test_csr_from_pem(key_with_csr):
 

	
 
    csr = gimmecert.utils.csr_from_pem(key_with_csr.csr_pem)
 

	
 
    assert isinstance(csr, cryptography.x509.CertificateSigningRequest)
 
    assert csr.public_key().public_numbers() == key_with_csr.csr.public_key().public_numbers()
 
    assert csr.subject == key_with_csr.csr.subject
0 comments (0 inline, 0 general)