Changeset - 332380e47daf
[Not reviewed]
0 4 0
Branko Majic (branko) - 6 years ago 2018-03-06 17:22:32
branko@majic.rs
GC-17: Refactored server command to handle output and return exit code:

- Updated server command to return just a status code.
- Updated existing code and tests that call out to the server command
to use correct function signature.
- Moved output from cli module to commands module.
- Updated existing tests for the server command to test for output
being produced in correct streams.
4 files changed with 67 insertions and 41 deletions:
0 comments (0 inline, 0 general)
gimmecert/cli.py
Show inline comments
 
@@ -88,13 +88,10 @@ def setup_server_subcommand_parser(parser, subparsers):
 
    def server_wrapper(args):
 
        project_directory = os.getcwd()
 

	
 
        status, message = server(project_directory, args.entity_name, args.dns_name)
 
        status_code = server(sys.stdout, sys.stderr, project_directory, args.entity_name, args.dns_name)
 

	
 
        if status is False:
 
            print(message, file=sys.stderr)
 
            exit(ERROR_GENERIC)
 
        else:
 
            print(message)
 
        if status_code != ExitCode.SUCCESS:
 
            exit(status_code)
 

	
 
    subparser.set_defaults(func=server_wrapper)
 

	
gimmecert/commands.py
Show inline comments
 
@@ -31,6 +31,8 @@ class ExitCode:
 

	
 
    SUCCESS = 0
 
    ERROR_ALREADY_INITIALISED = 10
 
    ERROR_NOT_INITIALISED = 11
 
    ERROR_CERTIFICATE_ALREADY_ISSUED = 12
 

	
 

	
 
def init(stdout, stderr, project_directory, ca_base_name, ca_hierarchy_depth):
 
@@ -92,11 +94,17 @@ def init(stdout, stderr, project_directory, ca_base_name, ca_hierarchy_depth):
 
    return ExitCode.SUCCESS
 

	
 

	
 
def server(project_directory, entity_name, extra_dns_names):
 
def server(stdout, stderr, project_directory, entity_name, extra_dns_names):
 
    """
 
    Generates a server private key and issues a server certificate
 
    using the CA hierarchy initialised within the specified directory.
 

	
 
    :param stdout: Output stream where the informative messages should be written-out.
 
    :type stdout: io.IOBase
 

	
 
    :param stderr: Output stream where the error messages should be written-out.
 
    :type stderr: io.IOBase
 

	
 
    :param project_directory: Path to project directory under which the CA artifacats etc will be looked-up.
 
    :type project_directory: str
 

	
 
@@ -106,23 +114,24 @@ def server(project_directory, entity_name, extra_dns_names):
 
    :param extra_dns_names: List of additional DNS names to include in the subject alternative name.
 
    :type extra_dns_names: list[str]
 

	
 
    :returns: Tuple consisting out of status and message to show to user.
 
    :rtype: (bool, str)
 
    :returns: Status code, one from gimmecert.commands.ExitCode.
 
    :rtype: int
 
    """
 

	
 
    private_key_path = os.path.join('.gimmecert', 'server', '%s.key.pem' % entity_name)
 
    certificate_path = os.path.join('.gimmecert', 'server', '%s.cert.pem' % entity_name)
 

	
 
    if not gimmecert.storage.is_initialised(project_directory):
 
        return False, "CA hierarchy must be initialised prior to issuing server certificates. Run the gimmecert init command first."
 
        print("CA hierarchy must be initialised prior to issuing server certificates. Run the gimmecert init command first.", file=stderr)
 
        return ExitCode.ERROR_NOT_INITIALISED
 

	
 
    if os.path.exists(private_key_path) or os.path.exists(certificate_path):
 
        return False, "Refusing to overwrite existing data. Certificate has already been issued for server myserver."
 
        print("Refusing to overwrite existing data. Certificate has already been issued for server myserver.", file=stderr)
 
        return ExitCode.ERROR_CERTIFICATE_ALREADY_ISSUED
 

	
 
    message = """Server certificate issued.\n
 
    print("""Server certificate issued.\n
 
    Server private key: .gimmecert/server/%s.key.pem
 
    Server certificate: .gimmecert/server/%s.cert.pem
 
""" % (entity_name, entity_name)
 
    Server certificate: .gimmecert/server/%s.cert.pem""" % (entity_name, entity_name), file=stdout)
 

	
 
    ca_hierarchy = gimmecert.storage.read_ca_hierarchy(os.path.join(project_directory, '.gimmecert', 'ca'))
 
    issuer_private_key, issuer_certificate = ca_hierarchy[-1]
 
@@ -132,4 +141,4 @@ def server(project_directory, entity_name, extra_dns_names):
 
    gimmecert.storage.write_private_key(private_key, private_key_path)
 
    gimmecert.storage.write_certificate(certificate, certificate_path)
 

	
 
    return True, message
 
    return ExitCode.SUCCESS
tests/test_cli.py
Show inline comments
 
@@ -269,7 +269,7 @@ def test_setup_server_subcommand_fails_without_arguments():
 
@mock.patch('gimmecert.cli.server')
 
def test_setup_server_subcommand_succeeds_with_just_entity_name_argument(mock_server):
 
    # We are just testing the parsing here.
 
    mock_server.return_value = True, "Fake message"
 
    mock_server.return_value = gimmecert.commands.ExitCode.SUCCESS
 

	
 
    gimmecert.cli.main()  # Should not raise.
 

	
 
@@ -278,7 +278,7 @@ def test_setup_server_subcommand_succeeds_with_just_entity_name_argument(mock_se
 
@mock.patch('gimmecert.cli.server')
 
def test_setup_server_subcommand_succeeds_with_entity_name_argument_and_one_dns_name(mock_server):
 
    # We are just testing the parsing here.
 
    mock_server.return_value = True, "Fake message"
 
    mock_server.return_value = gimmecert.commands.ExitCode.SUCCESS
 

	
 
    gimmecert.cli.main()  # Should not raise.
 

	
 
@@ -287,7 +287,7 @@ def test_setup_server_subcommand_succeeds_with_entity_name_argument_and_one_dns_
 
@mock.patch('gimmecert.cli.server')
 
def test_setup_server_subcommand_succeeds_with_entity_name_argument_and_four_dns_names(mock_server):
 
    # We are just testing the parsing here.
 
    mock_server.return_value = True, "Fake message"
 
    mock_server.return_value = gimmecert.commands.ExitCode.SUCCESS
 

	
 
    gimmecert.cli.main()  # Should not raise.
 

	
 
@@ -304,25 +304,25 @@ def test_setup_server_subcommand_sets_function_callback():
 
@mock.patch('sys.argv', ['gimmecert', 'server', 'myserver'])
 
@mock.patch('gimmecert.cli.server')
 
def test_server_command_invoked_with_correct_parameters_without_extra_dns_names(mock_server, tmpdir):
 
    mock_server.return_value = True, "Bogus"
 
    mock_server.return_value = gimmecert.commands.ExitCode.SUCCESS
 

	
 
    tmpdir.chdir()
 

	
 
    gimmecert.cli.main()
 

	
 
    mock_server.assert_called_once_with(tmpdir.strpath, 'myserver', [])
 
    mock_server.assert_called_once_with(sys.stdout, sys.stderr, tmpdir.strpath, 'myserver', [])
 

	
 

	
 
@mock.patch('sys.argv', ['gimmecert', 'server', 'myserver', 'service.local', 'service.example.com'])
 
@mock.patch('gimmecert.cli.server')
 
def test_server_command_invoked_with_correct_parameters_with_extra_dns_names(mock_server, tmpdir):
 
    mock_server.return_value = True, "Bogus"
 
    mock_server.return_value = gimmecert.commands.ExitCode.SUCCESS
 

	
 
    tmpdir.chdir()
 

	
 
    gimmecert.cli.main()
 

	
 
    mock_server.assert_called_once_with(tmpdir.strpath, 'myserver', ['service.local', 'service.example.com'])
 
    mock_server.assert_called_once_with(sys.stdout, sys.stderr, tmpdir.strpath, 'myserver', ['service.local', 'service.example.com'])
 

	
 

	
 
@mock.patch('sys.argv', ['gimmecert', 'server', 'myserver'])
tests/test_commands.py
Show inline comments
 
@@ -140,35 +140,48 @@ def test_init_does_not_overwrite_artifcats_if_already_initialised(tmpdir):
 
    assert full_chain_before == full_chain_after
 

	
 

	
 
def test_server_returns_status_and_message(tmpdir):
 
def test_server_returns_status_code(tmpdir):
 
    tmpdir.chdir()
 

	
 
    status, message = gimmecert.commands.server(tmpdir.strpath, 'myserver', None)
 
    status_code = gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None)
 

	
 
    assert isinstance(status, bool)
 
    assert isinstance(message, str)
 
    assert isinstance(status_code, int)
 

	
 

	
 
def test_server_reports_error_if_directory_is_not_initialised(tmpdir):
 
    tmpdir.chdir()
 

	
 
    status, message = gimmecert.commands.server(tmpdir.strpath, 'myserver', None)
 
    stdout_stream = io.StringIO()
 
    stderr_stream = io.StringIO()
 

	
 
    status_code = gimmecert.commands.server(stdout_stream, stderr_stream, tmpdir.strpath, 'myserver', None)
 

	
 
    stdout = stdout_stream.getvalue()
 
    stderr = stderr_stream.getvalue()
 

	
 
    assert status is False
 
    assert "must be initialised" in message
 
    assert "must be initialised" in stderr
 
    assert stdout == ""
 
    assert status_code == gimmecert.commands.ExitCode.ERROR_NOT_INITIALISED
 

	
 

	
 
def test_server_reports_paths_to_generated_artifacts(tmpdir):
 
def test_server_reports_success_and_paths_to_generated_artifacts(tmpdir):
 
    depth = 1
 

	
 
    stdout_stream = io.StringIO()
 
    stderr_stream = io.StringIO()
 

	
 
    tmpdir.chdir()
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 

	
 
    status, message = gimmecert.commands.server(tmpdir.strpath, 'myserver', None)
 
    status_code = gimmecert.commands.server(stdout_stream, stderr_stream, tmpdir.strpath, 'myserver', None)
 

	
 
    stdout = stdout_stream.getvalue()
 
    stderr = stderr_stream.getvalue()
 

	
 
    assert status is True
 
    assert ".gimmecert/server/myserver.key.pem" in message
 
    assert ".gimmecert/server/myserver.cert.pem" in message
 
    assert status_code == gimmecert.commands.ExitCode.SUCCESS
 
    assert ".gimmecert/server/myserver.key.pem" in stdout
 
    assert ".gimmecert/server/myserver.cert.pem" in stdout
 
    assert stderr == ""
 

	
 

	
 
def test_server_outputs_private_key_to_file(tmpdir):
 
@@ -178,7 +191,7 @@ def test_server_outputs_private_key_to_file(tmpdir):
 
    tmpdir.chdir()
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 

	
 
    gimmecert.commands.server(tmpdir.strpath, 'myserver', None)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None)
 

	
 
    assert private_key_file.check(file=1)
 

	
 
@@ -195,7 +208,7 @@ def test_server_outputs_certificate_to_file(tmpdir):
 
    tmpdir.chdir()
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 

	
 
    gimmecert.commands.server(tmpdir.strpath, 'myserver', None)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None)
 

	
 
    assert certificate_file.check(file=1)
 

	
 
@@ -206,21 +219,28 @@ def test_server_outputs_certificate_to_file(tmpdir):
 

	
 

	
 
def test_server_errors_out_if_certificate_already_issued(tmpdir):
 
    tmpdir.chdir()
 

	
 
    depth = 1
 

	
 
    tmpdir.chdir()
 
    stdout_stream = io.StringIO()
 
    stderr_stream = io.StringIO()
 

	
 
    # Previous run.
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 
    gimmecert.commands.server(tmpdir.strpath, 'myserver', None)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None)
 
    existing_private_key = tmpdir.join('.gimmecert', 'server', 'myserver.key.pem').read()
 
    certificate = tmpdir.join('.gimmecert', 'server', 'myserver.cert.pem').read()
 

	
 
    # New run.
 
    status, message = gimmecert.commands.server(tmpdir.strpath, 'myserver', None)
 
    status_code = gimmecert.commands.server(stdout_stream, stderr_stream, tmpdir.strpath, 'myserver', None)
 

	
 
    stdout = stdout_stream.getvalue()
 
    stderr = stderr_stream.getvalue()
 

	
 
    assert status is False
 
    assert "already been issued" in message
 
    assert status_code == gimmecert.commands.ExitCode.ERROR_CERTIFICATE_ALREADY_ISSUED
 
    assert "already been issued" in stderr
 
    assert stdout == ""
 
    assert tmpdir.join('.gimmecert', 'server', 'myserver.key.pem').read() == existing_private_key
 
    assert tmpdir.join('.gimmecert', 'server', 'myserver.cert.pem').read() == certificate
 

	
0 comments (0 inline, 0 general)