Changeset - 84ba90064e85
[Not reviewed]
0 5 0
Branko Majic (branko) - 6 years ago 2018-03-06 16:54:17
branko@majic.rs
GC-17: Refactored init command to handle output and return exit code:

- Introduced a holder class for command exit codes.
- Moved output for the cli module to commands module.
- Implemented new tests for verifying the command output.
- Updated existing tests for verifying return value of command output.
- Updated existing code and tests to use the new signature for init
command.
5 files changed with 117 insertions and 38 deletions:
0 comments (0 inline, 0 general)
gimmecert/cli.py
Show inline comments
 
@@ -24,7 +24,7 @@ import os
 
import sys
 

	
 
from .decorators import subcommand_parser, get_subcommand_parser_setup_functions
 
from .commands import init, server
 
from .commands import init, server, ExitCode
 

	
 

	
 
ERROR_GENERIC = 10
 
@@ -61,15 +61,9 @@ def setup_init_subcommand_parser(parser, subparsers):
 
        if args.ca_base_name is None:
 
            args.ca_base_name = os.path.basename(project_directory)
 

	
 
        if init(project_directory, args.ca_base_name, args.ca_hierarchy_depth):
 
            print("CA hierarchy initialised. Generated artefacts:")
 
            for level in range(1, args.ca_hierarchy_depth+1):
 
                print("    CA Level %d private key: .gimmecert/ca/level%d.key.pem" % (level, level))
 
                print("    CA Level %d certificate: .gimmecert/ca/level%d.cert.pem" % (level, level))
 
            print("    Full certificate chain: .gimmecert/ca/chain-full.cert.pem")
 
        else:
 
            print("CA hierarchy has already been initialised.", file=sys.stderr)
 
            exit(ERROR_GENERIC)
 
        status_code = init(sys.stdout, sys.stderr, project_directory, args.ca_base_name, args.ca_hierarchy_depth)
 
        if status_code != ExitCode.SUCCESS:
 
            exit(status_code)
 

	
 
    subparser.set_defaults(func=init_wrapper)
 

	
gimmecert/commands.py
Show inline comments
 
@@ -24,11 +24,26 @@ import gimmecert.crypto
 
import gimmecert.storage
 

	
 

	
 
def init(project_directory, ca_base_name, ca_hierarchy_depth):
 
class ExitCode:
 
    """
 
    Convenience class for storing exit codes in central location.
 
    """
 

	
 
    SUCCESS = 0
 
    ERROR_ALREADY_INITIALISED = 10
 

	
 

	
 
def init(stdout, stderr, project_directory, ca_base_name, ca_hierarchy_depth):
 
    """
 
    Initialises the necessary directory and CA hierarchies for use in
 
    the specified directory.
 

	
 
    :param stdout: Output stream where the informative messages should be written-out.
 
    :type stdout: io.IOBase
 

	
 
    :param stderr: Output stream where the error messages should be written-out.
 
    :type stderr: io.IOBase
 

	
 
    :param project_directory: Path to directory where the structure should be initialised. Should be top-level project directory normally.
 
    :type project_directory: str
 

	
 
@@ -38,8 +53,8 @@ def init(project_directory, ca_base_name, ca_hierarchy_depth):
 
    :param ca_hierarchy_depth: Length/depths of CA hierarchy that should be initialised. E.g. total number of CAs in chain.
 
    :type ca_hierarchy_depth: int
 

	
 
    :returns: False, if directory has been initialised in previous run, True if project has been initialised in this run.
 
    :rtype: bool
 
    :returns: Status code, one from gimmecert.commands.ExitCode.
 
    :rtype: int
 
    """
 

	
 
    # Set-up various paths.
 
@@ -47,7 +62,8 @@ def init(project_directory, ca_base_name, ca_hierarchy_depth):
 
    ca_directory = os.path.join(base_directory, 'ca')
 

	
 
    if os.path.exists(base_directory):
 
        return False
 
        print("CA hierarchy has already been initialised.", file=stderr)
 
        return ExitCode.ERROR_ALREADY_INITIALISED
 

	
 
    # Initialise the directory.
 
    gimmecert.storage.initialise_storage(project_directory)
 
@@ -67,7 +83,13 @@ def init(project_directory, ca_base_name, ca_hierarchy_depth):
 
    full_chain_path = os.path.join(ca_directory, 'chain-full.cert.pem')
 
    gimmecert.storage.write_certificate_chain(full_chain, full_chain_path)
 

	
 
    return True
 
    print("CA hierarchy initialised. Generated artefacts:", file=stdout)
 
    for level in range(1, ca_hierarchy_depth+1):
 
        print("    CA Level %d private key: .gimmecert/ca/level%d.key.pem" % (level, level), file=stdout)
 
        print("    CA Level %d certificate: .gimmecert/ca/level%d.cert.pem" % (level, level), file=stdout)
 
        print("    Full certificate chain: .gimmecert/ca/chain-full.cert.pem", file=stdout)
 

	
 
    return ExitCode.SUCCESS
 

	
 

	
 
def server(project_directory, entity_name, extra_dns_names):
tests/test_cli.py
Show inline comments
 
@@ -20,6 +20,8 @@
 

	
 

	
 
import argparse
 
import io
 
import sys
 

	
 
import gimmecert.cli
 
import gimmecert.decorators
 
@@ -164,30 +166,35 @@ def test_setup_init_subcommand_sets_function_callback():
 
@mock.patch('sys.argv', ['gimmecert', 'init'])
 
@mock.patch('gimmecert.cli.init')
 
def test_init_command_invoked_with_correct_parameters_no_options(mock_init, tmpdir):
 
    mock_init.return_value = gimmecert.commands.ExitCode.SUCCESS
 

	
 
    default_depth = 1
 

	
 
    tmpdir.chdir()
 

	
 
    gimmecert.cli.main()
 

	
 
    mock_init.assert_called_once_with(tmpdir.strpath, tmpdir.basename, default_depth)
 
    mock_init.assert_called_once_with(sys.stdout, sys.stderr, tmpdir.strpath, tmpdir.basename, default_depth)
 

	
 

	
 
@mock.patch('sys.argv', ['gimmecert', 'init', '-b', 'My Project'])
 
@mock.patch('gimmecert.cli.init')
 
def test_init_command_invoked_with_correct_parameters_with_options(mock_init, tmpdir):
 
    mock_init.return_value = gimmecert.commands.ExitCode.SUCCESS
 

	
 
    default_depth = 1
 

	
 
    tmpdir.chdir()
 

	
 
    gimmecert.cli.main()
 

	
 
    mock_init.assert_called_once_with(tmpdir.strpath, 'My Project', default_depth)
 
    mock_init.assert_called_once_with(sys.stdout, sys.stderr, tmpdir.strpath, 'My Project', default_depth)
 

	
 

	
 
@mock.patch('sys.argv', ['gimmecert', 'init', '--ca-base-name', 'My Project'])
 
@mock.patch('gimmecert.cli.init')
 
def test_init_command_accepts_ca_base_name_option_long_form(mock_init):
 
    mock_init.return_value = gimmecert.commands.ExitCode.SUCCESS
 

	
 
    gimmecert.cli.main()  # Should not raise
 

	
 
@@ -195,6 +202,7 @@ def test_init_command_accepts_ca_base_name_option_long_form(mock_init):
 
@mock.patch('sys.argv', ['gimmecert', 'init', '-b', 'My Project'])
 
@mock.patch('gimmecert.cli.init')
 
def test_init_command_accepts_ca_base_name_option_short_form(mock_init):
 
    mock_init.return_value = gimmecert.commands.ExitCode.SUCCESS
 

	
 
    gimmecert.cli.main()  # Should not raise
 

	
 
@@ -202,6 +210,7 @@ def test_init_command_accepts_ca_base_name_option_short_form(mock_init):
 
@mock.patch('sys.argv', ['gimmecert', 'init', '--ca-hierarchy-depth', '3'])
 
@mock.patch('gimmecert.cli.init')
 
def test_init_command_accepts_ca_hierarchy_depth_option_long_form(mock_init):
 
    mock_init.return_value = gimmecert.commands.ExitCode.SUCCESS
 

	
 
    gimmecert.cli.main()  # Should not raise
 

	
 
@@ -209,6 +218,7 @@ def test_init_command_accepts_ca_hierarchy_depth_option_long_form(mock_init):
 
@mock.patch('sys.argv', ['gimmecert', 'init', '-d', '3'])
 
@mock.patch('gimmecert.cli.init')
 
def test_init_command_accepts_ca_hierarchy_depth_option_short_form(mock_init):
 
    mock_init.return_value = gimmecert.commands.ExitCode.SUCCESS
 

	
 
    gimmecert.cli.main()  # Should not raise
 

	
 
@@ -216,7 +226,7 @@ def test_init_command_accepts_ca_hierarchy_depth_option_short_form(mock_init):
 
@mock.patch('sys.argv', ['gimmecert', 'init'])
 
def test_init_command_exists_with_error_if_hierarchy_already_initialised(tmpdir):
 
    tmpdir.chdir()
 
    gimmecert.commands.init(tmpdir.strpath, tmpdir.basename, 1)
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, 1)
 

	
 
    with pytest.raises(SystemExit) as e_info:
 
        gimmecert.cli.main()
tests/test_commands.py
Show inline comments
 
@@ -18,6 +18,7 @@
 
# Gimmecert.  If not, see <http://www.gnu.org/licenses/>.
 
#
 

	
 
import io
 
import os
 

	
 
import gimmecert.commands
 
@@ -31,7 +32,7 @@ def test_init_sets_up_directory_structure(tmpdir):
 

	
 
    tmpdir.chdir()
 

	
 
    gimmecert.commands.init(tmpdir.strpath, tmpdir.basename, depth)
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 

	
 
    assert os.path.exists(base_dir.strpath)
 
    assert os.path.exists(ca_dir.strpath)
 
@@ -43,7 +44,7 @@ def test_init_generates_single_ca_artifact_for_depth_1(tmpdir):
 

	
 
    tmpdir.chdir()
 

	
 
    gimmecert.commands.init(tmpdir.strpath, tmpdir.basename, depth)
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 

	
 
    assert os.path.exists(tmpdir.join('.gimmecert', 'ca', 'level1.key.pem').strpath)
 
    assert os.path.exists(tmpdir.join('.gimmecert', 'ca', 'level1.cert.pem').strpath)
 
@@ -55,7 +56,7 @@ def test_init_generates_three_ca_artifacts_for_depth_3(tmpdir):
 

	
 
    tmpdir.chdir()
 

	
 
    gimmecert.commands.init(tmpdir.strpath, tmpdir.basename, depth)
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 

	
 
    assert os.path.exists(tmpdir.join('.gimmecert', 'ca', 'level1.key.pem').strpath)
 
    assert os.path.exists(tmpdir.join('.gimmecert', 'ca', 'level1.cert.pem').strpath)
 
@@ -71,7 +72,7 @@ def test_init_outputs_full_chain_for_depth_1(tmpdir):
 

	
 
    tmpdir.chdir()
 

	
 
    gimmecert.commands.init(tmpdir.strpath, tmpdir.basename, depth)
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 

	
 
    level1_certificate = tmpdir.join('.gimmecert', 'ca', 'level1.cert.pem').read()
 
    full_chain = tmpdir.join('.gimmecert', 'ca', 'chain-full.cert.pem').read()
 
@@ -84,7 +85,7 @@ def test_init_outputs_full_chain_for_depth_3(tmpdir):
 

	
 
    tmpdir.chdir()
 

	
 
    gimmecert.commands.init(tmpdir.strpath, tmpdir.basename, depth)
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 

	
 
    level1_certificate = tmpdir.join('.gimmecert', 'ca', 'level1.cert.pem').read()
 
    level2_certificate = tmpdir.join('.gimmecert', 'ca', 'level2.cert.pem').read()
 
@@ -96,25 +97,25 @@ def test_init_outputs_full_chain_for_depth_3(tmpdir):
 
    assert full_chain == "%s\n%s\n%s" % (level1_certificate, level2_certificate, level3_certificate)
 

	
 

	
 
def test_init_returns_true_if_directory_has_not_been_previously_initialised(tmpdir):
 
def test_init_returns_success_if_directory_has_not_been_previously_initialised(tmpdir):
 
    depth = 1
 

	
 
    tmpdir.chdir()
 

	
 
    initialised = gimmecert.commands.init(tmpdir.strpath, tmpdir.basename, depth)
 
    status_code = gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 

	
 
    assert initialised is True
 
    assert status_code == gimmecert.commands.ExitCode.SUCCESS
 

	
 

	
 
def test_init_returns_false_if_directory_has_been_previously_initialised(tmpdir):
 
def test_init_returns_error_code_if_directory_has_been_previously_initialised(tmpdir):
 
    depth = 1
 

	
 
    tmpdir.chdir()
 

	
 
    gimmecert.commands.init(tmpdir.strpath, tmpdir.basename, depth)
 
    initialised = gimmecert.commands.init(tmpdir.strpath, tmpdir.basename, depth)
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 
    status_code = gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 

	
 
    assert initialised is False
 
    assert status_code == gimmecert.commands.ExitCode.ERROR_ALREADY_INITIALISED
 

	
 

	
 
def test_init_does_not_overwrite_artifcats_if_already_initialised(tmpdir):
 
@@ -122,13 +123,13 @@ def test_init_does_not_overwrite_artifcats_if_already_initialised(tmpdir):
 

	
 
    tmpdir.chdir()
 

	
 
    gimmecert.commands.init(tmpdir.strpath, tmpdir.basename, depth)
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 

	
 
    level1_private_key_before = tmpdir.join('.gimmecert', 'ca', 'level1.key.pem').read()
 
    level1_certificate_before = tmpdir.join('.gimmecert', 'ca', 'level1.cert.pem').read()
 
    full_chain_before = tmpdir.join('.gimmecert', 'ca', 'chain-full.cert.pem').read()
 

	
 
    gimmecert.commands.init(tmpdir.strpath, tmpdir.basename, depth)
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 

	
 
    level1_private_key_after = tmpdir.join('.gimmecert', 'ca', 'level1.key.pem').read()
 
    level1_certificate_after = tmpdir.join('.gimmecert', 'ca', 'level1.cert.pem').read()
 
@@ -161,7 +162,7 @@ def test_server_reports_paths_to_generated_artifacts(tmpdir):
 
    depth = 1
 

	
 
    tmpdir.chdir()
 
    gimmecert.commands.init(tmpdir.strpath, tmpdir.basename, depth)
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 

	
 
    status, message = gimmecert.commands.server(tmpdir.strpath, 'myserver', None)
 

	
 
@@ -175,7 +176,7 @@ def test_server_outputs_private_key_to_file(tmpdir):
 
    private_key_file = tmpdir.join('.gimmecert', 'server', 'myserver.key.pem')
 

	
 
    tmpdir.chdir()
 
    gimmecert.commands.init(tmpdir.strpath, tmpdir.basename, depth)
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 

	
 
    gimmecert.commands.server(tmpdir.strpath, 'myserver', None)
 

	
 
@@ -192,7 +193,7 @@ def test_server_outputs_certificate_to_file(tmpdir):
 
    certificate_file = tmpdir.join('.gimmecert', 'server', 'myserver.cert.pem')
 

	
 
    tmpdir.chdir()
 
    gimmecert.commands.init(tmpdir.strpath, tmpdir.basename, depth)
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 

	
 
    gimmecert.commands.server(tmpdir.strpath, 'myserver', None)
 

	
 
@@ -210,7 +211,7 @@ def test_server_errors_out_if_certificate_already_issued(tmpdir):
 
    tmpdir.chdir()
 

	
 
    # Previous run.
 
    gimmecert.commands.init(tmpdir.strpath, tmpdir.basename, depth)
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 
    gimmecert.commands.server(tmpdir.strpath, 'myserver', None)
 
    existing_private_key = tmpdir.join('.gimmecert', 'server', 'myserver.key.pem').read()
 
    certificate = tmpdir.join('.gimmecert', 'server', 'myserver.cert.pem').read()
 
@@ -222,3 +223,54 @@ def test_server_errors_out_if_certificate_already_issued(tmpdir):
 
    assert "already been issued" in message
 
    assert tmpdir.join('.gimmecert', 'server', 'myserver.key.pem').read() == existing_private_key
 
    assert tmpdir.join('.gimmecert', 'server', 'myserver.cert.pem').read() == certificate
 

	
 

	
 
def test_init_command_stdout_and_stderr_for_single_ca(tmpdir):
 
    stdout_stream = io.StringIO()
 
    stderr_stream = io.StringIO()
 

	
 
    gimmecert.commands.init(stdout_stream, stderr_stream, tmpdir.strpath, "myproject", 1)
 

	
 
    stdout = stdout_stream.getvalue()
 
    stderr = stderr_stream.getvalue()
 

	
 
    assert stderr == ""
 
    assert "CA hierarchy initialised" in stdout
 
    assert ".gimmecert/ca/level1.cert.pem" in stdout
 
    assert ".gimmecert/ca/level1.key.pem" in stdout
 
    assert ".gimmecert/ca/chain-full.cert.pem" in stdout
 

	
 

	
 
def test_init_command_stdout_and_stderr_for_multiple_cas(tmpdir):
 
    stdout_stream = io.StringIO()
 
    stderr_stream = io.StringIO()
 

	
 
    gimmecert.commands.init(stdout_stream, stderr_stream, tmpdir.strpath, "myproject", 3)
 

	
 
    stdout = stdout_stream.getvalue()
 
    stderr = stderr_stream.getvalue()
 

	
 
    assert stderr == ""
 
    assert "CA hierarchy initialised" in stdout
 
    assert ".gimmecert/ca/level1.cert.pem" in stdout
 
    assert ".gimmecert/ca/level1.key.pem" in stdout
 
    assert ".gimmecert/ca/level2.cert.pem" in stdout
 
    assert ".gimmecert/ca/level2.key.pem" in stdout
 
    assert ".gimmecert/ca/level3.cert.pem" in stdout
 
    assert ".gimmecert/ca/level3.key.pem" in stdout
 
    assert ".gimmecert/ca/chain-full.cert.pem" in stdout
 

	
 

	
 
def test_init_command_stdout_and_stderr_if_hierarchy_already_initialised(tmpdir):
 
    stdout_stream = io.StringIO()
 
    stderr_stream = io.StringIO()
 

	
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, "myproject", 1)
 

	
 
    gimmecert.commands.init(stdout_stream, stderr_stream, tmpdir.strpath, "myproject", 1)
 

	
 
    stdout = stdout_stream.getvalue()
 
    stderr = stderr_stream.getvalue()
 

	
 
    assert "CA hierarchy has already been initialised" in stderr
 
    assert stdout == ""
tests/test_storage.py
Show inline comments
 
@@ -19,6 +19,7 @@
 
#
 

	
 
import os
 
import io
 

	
 
import cryptography
 

	
 
@@ -104,7 +105,7 @@ def test_is_initialised_returns_false_if_directory_is_not_initialised(tmpdir):
 

	
 
def test_read_ca_hierarchy_returns_list_of_ca_private_key_and_certificate_pairs_for_single_ca(tmpdir):
 
    tmpdir.chdir()
 
    gimmecert.commands.init(tmpdir.strpath, 'My Project', 1)
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, 'My Project', 1)
 

	
 
    ca_hierarchy = gimmecert.storage.read_ca_hierarchy(tmpdir.join('.gimmecert', 'ca').strpath)
 

	
 
@@ -144,7 +145,7 @@ def test_read_certificate_returns_certificate(tmpdir):
 

	
 
def test_read_ca_hierarchy_returns_list_of_ca_private_key_and_certificate_pairs_in_hierarchy_order_for_multiple_cas(tmpdir):
 
    tmpdir.chdir()
 
    gimmecert.commands.init(tmpdir.strpath, 'My Project', 4)
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, 'My Project', 4)
 

	
 
    ca_hierarchy = gimmecert.storage.read_ca_hierarchy(tmpdir.join('.gimmecert', 'ca').strpath)
 

	
0 comments (0 inline, 0 general)