Changeset - 70b0277c55dc
[Not reviewed]
0 6 0
Branko Majic (branko) - 4 years ago 2020-06-07 23:09:40
branko@majic.rs
GC-37: Added support for requesting custom RSA key size when issuing server certificates:

- Added functional test.
- Added unit tests.
- Added new CLI option to the server command.
- Updated the server command to use the passed-in key specification.
- Updated existing tests to cope with changes to the server command
function signature.
6 files changed with 106 insertions and 41 deletions:
0 comments (0 inline, 0 general)
functional_tests/test_server.py
Show inline comments
 
@@ -233,6 +233,42 @@ def test_server_command_key_specification(tmpdir):
 
    # He then has a look at the certificate.
 
    stdout, _, _ = run_command('openssl', 'x509', '-noout', '-text', '-in', '.gimmecert/server/myserver1.cert.pem')
 

	
 
    # Likewise with the private key, the certificate is also using the
 
    # 1024-bit RSA key.
 
    assert "Public-Key: (1024 bit)" in stdout
 

	
 
    # At some point John realises that to cover all bases, he needs to
 
    # have a test with a server that uses 2048-bit RSA keys as
 
    # well. He does not want to regenerate all of the X.509 artefacts,
 
    # and would like to instead issues a single 2048-bit RSA key for a
 
    # specific server instead.
 

	
 
    # He starts off by having a look at the help for the server command.
 
    stdout, stderr, exit_code = run_command("gimmecert", "server", "-h")
 

	
 
    # John notices the option for passing-in a key specification.
 
    assert " --key-specification" in stdout
 
    assert " -k" in stdout
 

	
 
    # John goes ahead and tries to issue a server certificate using
 
    # key specification option.
 
    stdout, stderr, exit_code = run_command("gimmecert", "server", "--key-specification", "rsas:2048", "myserver2")
 

	
 
    # Unfortunately, the command fails due to John's typo.
 
    assert exit_code != 0
 
    assert "invalid key_specification" in stderr
 

	
 
    # John tries again, fixing his typo.
 
    stdout, stderr, exit_code = run_command("gimmecert", "server", "--key-specification", "rsa:2048", "myserver2")
 

	
 
    # This time around he succeeds.
 
    assert exit_code == 0
 
    assert stderr == ""
 

	
 
    # He runs a command to see details about the generated private
 
    # key.
 
    stdout, _, _ = run_command('openssl', 'rsa', '-noout', '-text', '-in', '.gimmecert/server/myserver2.key.pem')
 

	
 
    # He nods with his head, observing that the generated private key
 
    # uses the same key size as he has specified.
 
    assert "Private-Key: (2048 bit)" in stdout
gimmecert/cli.py
Show inline comments
 
@@ -142,17 +142,20 @@ def setup_help_subcommand_parser(parser, subparsers):
 
def setup_server_subcommand_parser(parser, subparsers):
 
    subparser = subparsers.add_parser('server', description='Issues server certificate.')
 
    subparser.add_argument('entity_name', help='Name of the server entity.')
 
    subparser.add_argument('dns_name', nargs='*', help='Additional DNS names to include in subject alternative name.')
 
    subparser.add_argument('--csr', '-c', type=str, default=None, help='''Do not generate server private key locally, and use the passed-in \
 
    certificate signing request (CSR) instead. Use dash (-) to read from standard input. Only the public key is taken from the CSR.''')
 
    subparser.add_argument('--key-specification', '-k', type=key_specification,
 
                           help='''Specification/parameters to use for private key generation. \
 
    For RSA keys, use format rsa:BIT_LENGTH. Default is to use same algorithm/parameters as used by CA hierarchy.''', default=None)
 

	
 
    def server_wrapper(args):
 
        project_directory = os.getcwd()
 

	
 
        return server(sys.stdout, sys.stderr, project_directory, args.entity_name, args.dns_name, args.csr)
 
        return server(sys.stdout, sys.stderr, project_directory, args.entity_name, args.dns_name, args.csr, args.key_specification)
 

	
 
    subparser.set_defaults(func=server_wrapper)
 

	
 
    return subparser
 

	
 

	
gimmecert/commands.py
Show inline comments
 
@@ -108,13 +108,13 @@ def init(stdout, stderr, project_directory, ca_base_name, ca_hierarchy_depth, ke
 

	
 
    print("    Full certificate chain: .gimmecert/ca/chain-full.cert.pem", file=stdout)
 

	
 
    return ExitCode.SUCCESS
 

	
 

	
 
def server(stdout, stderr, project_directory, entity_name, extra_dns_names, custom_csr_path):
 
def server(stdout, stderr, project_directory, entity_name, extra_dns_names, custom_csr_path, key_specification):
 
    """
 
    Issues a server certificate using the CA hierarchy initialised
 
    within the specified directory.
 

	
 
    If custom CSR path is not passed-in, a private key will be
 
    generated and stored.
 
@@ -136,14 +136,19 @@ def server(stdout, stderr, project_directory, entity_name, extra_dns_names, cust
 
    :type entity_name: str
 

	
 
    :param extra_dns_names: List of additional DNS names to include in the subject alternative name.
 
    :type extra_dns_names: list[str]
 

	
 
    :param custom_csr_path: Path to custom certificate signing request to use for issuing client certificate. Set to None or "" to generate private key.
 
                            Do not use together with key_specification.
 
    :type custom_csr_path: str or None
 

	
 
    :param key_specification: Key specification to use when generating private keys for the server. Ignored if custom_csr_path is specified. Set to None to
 
                              default to issuing CA hiearchy algorithm and parameters.
 
    :type key_specification: tuple(str, int) or None
 

	
 
    :returns: Status code, one from gimmecert.commands.ExitCode.
 
    :rtype: int
 
    """
 

	
 
    # Set-up some paths for outputting artefacts.
 
    private_key_path = os.path.join(project_directory, '.gimmecert', 'server', '%s.key.pem' % entity_name)
 
@@ -172,13 +177,14 @@ def server(stdout, stderr, project_directory, entity_name, extra_dns_names, cust
 
        private_key = None
 
    elif custom_csr_path:
 
        csr = gimmecert.storage.read_csr(custom_csr_path)
 
        public_key = csr.public_key()
 
        private_key = None
 
    else:
 
        key_specification = gimmecert.crypto.key_specification_from_public_key(issuer_private_key.public_key())
 
        if not key_specification:
 
            key_specification = gimmecert.crypto.key_specification_from_public_key(issuer_private_key.public_key())
 
        key_generator = gimmecert.crypto.KeyGenerator(key_specification[0], key_specification[1])
 
        private_key = key_generator()
 
        public_key = private_key.public_key()
 
        csr = None
 

	
 
    # Issue the certificate.
tests/conftest.py
Show inline comments
 
@@ -145,17 +145,17 @@ def sample_project_directory(tmpdir):
 
    # Initialise one-level deep hierarchy.
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, 1, ("rsa", 2048))
 

	
 
    # Issue a bunch of certificates.
 
    for i in range(1, per_type_count + 1):
 
        entity_name = "server-with-privkey-%d" % i
 
        gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, entity_name, None, None)
 
        gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, entity_name, None, None, None)
 

	
 
        entity_name = "server-with-csr-%d" % i
 
        custom_csr_path = custom_csr_dir.join("server-with-csr-%d.csr.pem" % i).strpath
 
        gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, entity_name, None, custom_csr_path)
 
        gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, entity_name, None, custom_csr_path, None)
 

	
 
        entity_name = "client-with-privkey-%d" % i
 
        gimmecert.commands.client(io.StringIO(), io.StringIO(), tmpdir.strpath, entity_name, None)
 

	
 
        entity_name = "client-with-csr-%d" % i
 
        custom_csr_path = custom_csr_dir.join("client-with-csr-%d.csr.pem" % i).strpath
tests/test_cli.py
Show inline comments
 
@@ -239,12 +239,16 @@ VALID_CLI_INVOCATIONS = [
 
                              "myserver3.example.com", "myserver4.example.com"]),
 

	
 
    # server, CSR long and short option
 
    ("gimmecert.cli.server", ["gimmecert", "server", "--csr", "myserver.csr.pem", "myserver"]),
 
    ("gimmecert.cli.server", ["gimmecert", "server", "-c", "myserver.csr.pem", "myserver"]),
 

	
 
    # server, key specification long and short option
 
    ("gimmecert.cli.server", ["gimmecert", "server", "--key-specification", "rsa:4096", "myserver"]),
 
    ("gimmecert.cli.server", ["gimmecert", "server", "-k", "rsa:1024", "myserver"]),
 

	
 
    # client, no options
 
    ("gimmecert.cli.client", ["gimmecert", "client", "myclient"]),
 

	
 
    # client, CSR long and short option
 
    ("gimmecert.cli.client", ["gimmecert", "client", "--csr", "myclient.csr.pem", "myclient"]),
 
    ("gimmecert.cli.client", ["gimmecert", "client", "-c", "myclient.csr.pem", "myclient"]),
 
@@ -312,12 +316,17 @@ def test_parser_commands_and_options_are_available(tmpdir, command_function, cli
 
# command from CLI. See test documentation for more details.
 
INVALID_CLI_INVOCATIONS = [
 
    # init, invalid key specification
 
    ("gimmecert.cli.init", ["gimmecert", "init", "-k", "rsa"]),
 
    ("gimmecert.cli.init", ["gimmecert", "init", "-k", "rsa:not_a_number"]),
 
    ("gimmecert.cli.init", ["gimmecert", "init", "-k", "unsupported:algorithm"]),
 

	
 
    # server, invalid key specification
 
    ("gimmecert.cli.server", ["gimmecert", "server", "-k", "rsa", "myserver"]),
 
    ("gimmecert.cli.server", ["gimmecert", "server", "-k", "rsa:not_a_number", "myserver"]),
 
    ("gimmecert.cli.server", ["gimmecert", "server", "-k", "unsupported:algorithm", "myserver"]),
 
]
 

	
 

	
 
@pytest.mark.parametrize("command_function, cli_invocation", INVALID_CLI_INVOCATIONS)
 
def test_invalid_parser_commands_and_options_produce_error(tmpdir, command_function, cli_invocation):
 
    """
 
@@ -426,27 +435,27 @@ def test_server_command_invoked_with_correct_parameters_without_extra_dns_names(
 
    tmpdir.chdir()
 

	
 
    mock_server.return_value = gimmecert.commands.ExitCode.SUCCESS
 

	
 
    gimmecert.cli.main()
 

	
 
    mock_server.assert_called_once_with(sys.stdout, sys.stderr, tmpdir.strpath, 'myserver', [], None)
 
    mock_server.assert_called_once_with(sys.stdout, sys.stderr, tmpdir.strpath, 'myserver', [], None, None)
 

	
 

	
 
@mock.patch('sys.argv', ['gimmecert', 'server', 'myserver', 'service.local', 'service.example.com'])
 
@mock.patch('sys.argv', ['gimmecert', 'server', '-k', 'rsa:1024', 'myserver', 'service.local', 'service.example.com'])
 
@mock.patch('gimmecert.cli.server')
 
def test_server_command_invoked_with_correct_parameters_with_extra_dns_names(mock_server, tmpdir):
 
def test_server_command_invoked_with_correct_parameters_with_extra_dns_names_and_key_specification(mock_server, tmpdir):
 
    # This should ensure we don't accidentally create artifacts
 
    # outside of test directory.
 
    tmpdir.chdir()
 

	
 
    mock_server.return_value = gimmecert.commands.ExitCode.SUCCESS
 

	
 
    gimmecert.cli.main()
 

	
 
    mock_server.assert_called_once_with(sys.stdout, sys.stderr, tmpdir.strpath, 'myserver', ['service.local', 'service.example.com'], None)
 
    mock_server.assert_called_once_with(sys.stdout, sys.stderr, tmpdir.strpath, 'myserver', ['service.local', 'service.example.com'], None, ("rsa", 1024))
 

	
 

	
 
@mock.patch('sys.argv', ['gimmecert', 'help'])
 
@mock.patch('gimmecert.cli.help_')
 
def test_help_command_invoked_with_correct_parameters(mock_help_, tmpdir):
 
    # This should ensure we don't accidentally create artifacts
tests/test_commands.py
Show inline comments
 
@@ -116,23 +116,23 @@ def test_init_does_not_overwrite_artifcats_if_already_initialised(tmpdir):
 
    assert level1_private_key_before == level1_private_key_after
 
    assert level1_certificate_before == level1_certificate_after
 
    assert full_chain_before == full_chain_after
 

	
 

	
 
def test_server_returns_status_code(tmpdir):
 
    status_code = gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None, None)
 
    status_code = gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None, None, None)
 

	
 
    assert isinstance(status_code, int)
 

	
 

	
 
def test_server_reports_error_if_directory_is_not_initialised(tmpdir):
 

	
 
    stdout_stream = io.StringIO()
 
    stderr_stream = io.StringIO()
 

	
 
    status_code = gimmecert.commands.server(stdout_stream, stderr_stream, tmpdir.strpath, 'myserver', None, None)
 
    status_code = gimmecert.commands.server(stdout_stream, stderr_stream, tmpdir.strpath, 'myserver', None, None, None)
 

	
 
    stdout = stdout_stream.getvalue()
 
    stderr = stderr_stream.getvalue()
 

	
 
    assert "must be initialised" in stderr
 
    assert stdout == ""
 
@@ -175,13 +175,13 @@ def test_server_reports_success_and_outputs_correct_information(sample_project_d
 

	
 
    stdout_stream = io.StringIO()
 
    stderr_stream = io.StringIO()
 

	
 
    status_code = gimmecert.commands.server(stdout_stream, stderr_stream,
 
                                            sample_project_directory.strpath, entity_name, None,
 
                                            custom_csr_path)
 
                                            custom_csr_path, None)
 

	
 
    stdout = stdout_stream.getvalue()
 
    stderr = stderr_stream.getvalue()
 

	
 
    assert status_code == gimmecert.commands.ExitCode.SUCCESS
 
    assert stderr == ""
 
@@ -194,13 +194,13 @@ def test_server_reports_success_and_outputs_correct_information(sample_project_d
 

	
 

	
 
def test_server_outputs_private_key_to_file_without_csr(gctmpdir):
 
    private_key_file = gctmpdir.join('.gimmecert', 'server', 'myserver.key.pem')
 
    csr_file = gctmpdir.join('.gimmecert', 'server', 'myserver.csr.pem')
 

	
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', None, None)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', None, None, None)
 

	
 
    assert private_key_file.check(file=1)
 
    assert not csr_file.check()
 

	
 
    private_key_file_content = private_key_file.read()
 

	
 
@@ -209,13 +209,13 @@ def test_server_outputs_private_key_to_file_without_csr(gctmpdir):
 

	
 

	
 
def test_server_outputs_certificate_to_file(gctmpdir):
 
    certificate_file = gctmpdir.join('.gimmecert', 'server', 'myserver.cert.pem')
 
    csr_file = gctmpdir.join('.gimmecert', 'client', 'myclient.csr.pem')
 

	
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', None, None)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', None, None, None)
 

	
 
    assert certificate_file.check(file=1)
 
    assert not csr_file.check()
 

	
 
    certificate_file_content = certificate_file.read()
 

	
 
@@ -225,18 +225,18 @@ def test_server_outputs_certificate_to_file(gctmpdir):
 

	
 
def test_server_errors_out_if_certificate_already_issued(gctmpdir):
 
    stdout_stream = io.StringIO()
 
    stderr_stream = io.StringIO()
 

	
 
    # Previous run.
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', None, None)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', None, None, None)
 
    existing_private_key = gctmpdir.join('.gimmecert', 'server', 'myserver.key.pem').read()
 
    certificate = gctmpdir.join('.gimmecert', 'server', 'myserver.cert.pem').read()
 

	
 
    # New run.
 
    status_code = gimmecert.commands.server(stdout_stream, stderr_stream, gctmpdir.strpath, 'myserver', None, None)
 
    status_code = gimmecert.commands.server(stdout_stream, stderr_stream, gctmpdir.strpath, 'myserver', None, None, None)
 

	
 
    stdout = stdout_stream.getvalue()
 
    stderr = stderr_stream.getvalue()
 

	
 
    assert status_code == gimmecert.commands.ExitCode.ERROR_CERTIFICATE_ALREADY_ISSUED
 
    assert "already been issued" in stderr
 
@@ -484,13 +484,13 @@ def test_renew_reports_error_if_no_existing_client_certificate_is_present(gctmpd
 

	
 

	
 
def test_renew_reports_success_and_paths_to_server_artifacts(gctmpdir):
 
    stdout_stream = io.StringIO()
 
    stderr_stream = io.StringIO()
 

	
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', None, None)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', None, None, None)
 

	
 
    status_code = gimmecert.commands.renew(stdout_stream, stderr_stream, gctmpdir.strpath, 'server', 'myserver', False, None, None)
 

	
 
    stdout = stdout_stream.getvalue()
 
    stderr = stderr_stream.getvalue()
 

	
 
@@ -521,13 +521,13 @@ def test_renew_reports_success_and_paths_to_client_artifacts(gctmpdir):
 
    assert stderr == ""
 

	
 

	
 
def test_renew_keeps_server_private_key(gctmpdir):
 
    private_key_file = gctmpdir.join('.gimmecert', 'server', 'myserver.key.pem')
 

	
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', None, None)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', None, None, None)
 
    private_key_after_issuance = private_key_file.read()
 

	
 
    gimmecert.commands.renew(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'server', 'myserver', False, None, None)
 
    private_key_after_renewal = private_key_file.read()
 

	
 
    assert private_key_after_issuance == private_key_after_renewal
 
@@ -545,13 +545,13 @@ def test_renew_keeps_client_private_key(gctmpdir):
 
    assert private_key_after_issuance == private_key_after_renewal
 

	
 

	
 
def test_renew_replaces_server_certificate(gctmpdir):
 
    certificate_file = gctmpdir.join('.gimmecert', 'server', 'myserver.cert.pem')
 

	
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', None, None)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', None, None, None)
 
    certificate_after_issuance = certificate_file.read()
 

	
 
    gimmecert.commands.renew(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'server', 'myserver', False, None, None)
 
    certificate_after_renewal = certificate_file.read()
 

	
 
    assert certificate_after_issuance != certificate_after_renewal
 
@@ -574,13 +574,13 @@ def test_renew_replaces_client_certificate(gctmpdir):
 

	
 

	
 
def test_renew_reports_success_and_paths_to_server_artifacts_with_new_key(gctmpdir):
 
    stdout_stream = io.StringIO()
 
    stderr_stream = io.StringIO()
 

	
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', None, None)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', None, None, None)
 

	
 
    status_code = gimmecert.commands.renew(stdout_stream, stderr_stream, gctmpdir.strpath, 'server', 'myserver', True, None, None)
 

	
 
    stdout = stdout_stream.getvalue()
 
    stderr = stderr_stream.getvalue()
 

	
 
@@ -591,13 +591,13 @@ def test_renew_reports_success_and_paths_to_server_artifacts_with_new_key(gctmpd
 
    assert stderr == ""
 

	
 

	
 
def test_renew_generates_new_private_key_if_requested(gctmpdir):
 
    private_key_file = gctmpdir.join('.gimmecert', 'server', 'myserver.key.pem')
 

	
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', None, None)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', None, None, None)
 
    private_key_after_issuance = private_key_file.read()
 

	
 
    gimmecert.commands.renew(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'server', 'myserver', True, None, None)
 
    private_key_after_renewal = private_key_file.read()
 

	
 
    assert private_key_after_issuance != private_key_after_renewal
 
@@ -677,19 +677,19 @@ def test_status_reports_server_certificate_information(tmpdir):
 
    gimmecert.storage.write_csr(myserver3_csr, myserver3_csr_file.strpath)
 

	
 
    with freeze_time('2018-01-01 00:15:00'):
 
        gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, 3, ("rsa", 2048))
 

	
 
    with freeze_time('2018-02-01 00:15:00'):
 
        gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver1', None, None)
 
        gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver1', None, None, None)
 

	
 
    with freeze_time('2018-03-01 00:15:00'):
 
        gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver2', ['myservice1.example.com', 'myservice2.example.com'], None)
 
        gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver2', ['myservice1.example.com', 'myservice2.example.com'], None, None)
 

	
 
    with freeze_time('2018-04-01 00:15:00'):
 
        gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver3', None, myserver3_csr_file.strpath)
 
        gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver3', None, myserver3_csr_file.strpath, None)
 

	
 
    with freeze_time('2018-06-01 00:15:00'):
 
        status_code = gimmecert.commands.status(stdout_stream, stderr_stream, tmpdir.strpath)
 

	
 
    stdout = stdout_stream.getvalue()
 
    stdout_lines = stdout.split("\n")
 
@@ -822,14 +822,14 @@ def test_status_reports_no_client_certificates_were_issued(tmpdir):
 
    stdout_stream = io.StringIO()
 
    stderr_stream = io.StringIO()
 

	
 
    # Just create some sample data, but no client certificates.
 
    with freeze_time('2018-01-01 00:15:00'):
 
        gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, 1, ("rsa", 2048))
 
        gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver1', None, None)
 
        gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver2', None, None)
 
        gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver1', None, None, None)
 
        gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver2', None, None, None)
 

	
 
    status_code = gimmecert.commands.status(stdout_stream, stderr_stream, tmpdir.strpath)
 

	
 
    stdout = stdout_stream.getvalue()
 
    stderr = stderr_stream.getvalue()
 

	
 
@@ -861,13 +861,13 @@ def test_certificate_marked_as_not_valid_or_expired_as_appropriate(tmpdir, subje
 
    stdout_stream = io.StringIO()
 
    stderr_stream = io.StringIO()
 

	
 
    # Perform action on our fixed issuance date.
 
    with freeze_time(issuance_date):
 
        gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, "My Project", 1, ("rsa", 2048))
 
        gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None, None)
 
        gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None, None, None)
 
        gimmecert.commands.client(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myclient', None)
 

	
 
    # Move to specific date in future/past for different validity checks.
 
    with freeze_time(status_date):
 
        status_code = gimmecert.commands.status(stdout_stream, stderr_stream, tmpdir.strpath)
 

	
 
@@ -968,13 +968,13 @@ def test_server_outputs_passed_in_csr_to_file_without_private_key(gctmpdir):
 

	
 
    private_key = gimmecert.crypto.generate_private_key()
 
    csr = gimmecert.crypto.generate_csr('mycustomcsr', private_key)
 
    gimmecert.storage.write_csr(csr, custom_csr_file.strpath)
 
    custom_csr_file_content = custom_csr_file.read()
 

	
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', None, custom_csr_file.strpath)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', None, custom_csr_file.strpath, None)
 

	
 
    assert csr_file.check(file=1)
 
    assert not private_key_file.check()
 

	
 
    csr_file_content = csr_file.read()
 

	
 
@@ -986,13 +986,13 @@ def test_server_uses_correct_public_key_but_no_naming_with_csr(gctmpdir):
 
    certificate_file = gctmpdir.join('.gimmecert', 'server', 'myserver.cert.pem')
 

	
 
    private_key = gimmecert.crypto.generate_private_key()
 
    csr = gimmecert.crypto.generate_csr('mycustomcsr', private_key)
 
    gimmecert.storage.write_csr(csr, custom_csr_file.strpath)
 

	
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', None, custom_csr_file.strpath)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', None, custom_csr_file.strpath, None)
 

	
 
    certificate = gimmecert.storage.read_certificate(certificate_file.strpath)
 

	
 
    csr_public_numbers = csr.public_key().public_numbers()
 
    certificate_public_numbers = certificate.public_key().public_numbers()
 

	
 
@@ -1037,18 +1037,18 @@ def test_server_errors_out_if_certificate_already_issued_with_csr(gctmpdir):
 
    gimmecert.storage.write_csr(csr, custom_csr_file.strpath)
 

	
 
    stdout_stream = io.StringIO()
 
    stderr_stream = io.StringIO()
 

	
 
    # Previous run.
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', None, custom_csr_file.strpath)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', None, custom_csr_file.strpath, None)
 
    existing_csr = gctmpdir.join('.gimmecert', 'server', 'myserver.csr.pem').read()
 
    certificate = gctmpdir.join('.gimmecert', 'server', 'myserver.cert.pem').read()
 

	
 
    # New run.
 
    status_code = gimmecert.commands.server(stdout_stream, stderr_stream, gctmpdir.strpath, 'myserver', None, custom_csr_file.strpath)
 
    status_code = gimmecert.commands.server(stdout_stream, stderr_stream, gctmpdir.strpath, 'myserver', None, custom_csr_file.strpath, None)
 

	
 
    stdout = stdout_stream.getvalue()
 
    stderr = stderr_stream.getvalue()
 

	
 
    assert status_code == gimmecert.commands.ExitCode.ERROR_CERTIFICATE_ALREADY_ISSUED
 
    assert "already been issued" in stderr
 
@@ -1065,13 +1065,13 @@ def test_renew_reports_success_and_paths_to_server_artifacts_with_csr(gctmpdir):
 
    stderr_stream = io.StringIO()
 

	
 
    private_key = gimmecert.crypto.generate_private_key()
 
    csr = gimmecert.crypto.generate_csr("mytest", private_key)
 
    gimmecert.storage.write_csr(csr, csr_file.strpath)
 

	
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', None, csr_file.strpath)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', None, csr_file.strpath, None)
 

	
 
    status_code = gimmecert.commands.renew(stdout_stream, stderr_stream, gctmpdir.strpath, 'server', 'myserver', False, None, None)
 

	
 
    stdout = stdout_stream.getvalue()
 
    stderr = stderr_stream.getvalue()
 

	
 
@@ -1115,13 +1115,13 @@ def test_renew_reports_success_and_paths_to_server_artifacts_with_csr_when_repla
 
    stderr_stream = io.StringIO()
 

	
 
    private_key = gimmecert.crypto.generate_private_key()
 
    csr = gimmecert.crypto.generate_csr("mytest", private_key)
 
    gimmecert.storage.write_csr(csr, csr_file.strpath)
 

	
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', None, None)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', None, None, None)
 

	
 
    status_code = gimmecert.commands.renew(stdout_stream, stderr_stream, gctmpdir.strpath, 'server', 'myserver', False, csr_file.strpath, None)
 

	
 
    stdout = stdout_stream.getvalue()
 
    stderr = stderr_stream.getvalue()
 

	
 
@@ -1143,13 +1143,13 @@ def test_renew_replaces_server_private_key_with_csr(gctmpdir):
 

	
 
    custom_csr_private_key = gimmecert.crypto.generate_private_key()
 
    custom_csr = gimmecert.crypto.generate_csr("mycustom", custom_csr_private_key)
 
    gimmecert.storage.write_csr(custom_csr, custom_csr_file.strpath)
 
    custom_csr_file_content = custom_csr_file.read()
 

	
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', None, None)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', None, None, None)
 

	
 
    assert private_key_file.check(file=1)
 

	
 
    gimmecert.commands.renew(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'server', 'myserver', False, custom_csr_file.strpath, None)
 

	
 
    assert csr_file.check(file=1)
 
@@ -1196,13 +1196,13 @@ def test_renew_reports_success_and_paths_to_server_artifacts_with_private_key_wh
 
    stderr_stream = io.StringIO()
 

	
 
    custom_private_key = gimmecert.crypto.generate_private_key()
 
    custom_csr = gimmecert.crypto.generate_csr("mytest", custom_private_key)
 
    gimmecert.storage.write_csr(custom_csr, custom_csr_file.strpath)
 

	
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', None, custom_csr_file.strpath)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', None, custom_csr_file.strpath, None)
 

	
 
    status_code = gimmecert.commands.renew(stdout_stream, stderr_stream, gctmpdir.strpath, 'server', 'myserver', True, None, None)
 

	
 
    stdout = stdout_stream.getvalue()
 
    stderr = stderr_stream.getvalue()
 

	
 
@@ -1238,13 +1238,13 @@ def test_renew_reports_success_and_paths_to_artifacts_when_renewing_server_certi
 
    assert stderr == ""
 

	
 

	
 
def test_renew_replaces_dns_names(gctmpdir):
 
    certificate_file = gctmpdir.join(".gimmecert", "server", "myserver.cert.pem")
 

	
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', ['myservice1.local', 'myservice2.local'], None)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', ['myservice1.local', 'myservice2.local'], None, None)
 

	
 
    old_certificate_pem = certificate_file.read()
 
    old_certificate = gimmecert.storage.read_certificate(certificate_file.strpath)
 
    old_subject_alt_name = old_certificate.extensions.get_extension_for_class(cryptography.x509.SubjectAlternativeName).value
 

	
 
    gimmecert.commands.renew(io.StringIO(), io.StringIO(), gctmpdir.strpath,
 
@@ -1263,13 +1263,13 @@ def test_renew_replaces_dns_names(gctmpdir):
 
    assert new_subject_alt_name == expected_subject_alt_name
 

	
 

	
 
def test_renew_removes_dns_names(gctmpdir):
 
    certificate_file = gctmpdir.join(".gimmecert", "server", "myserver.cert.pem")
 

	
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', ['myservice1.local', 'myservice2.local'], None)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', ['myservice1.local', 'myservice2.local'], None, None)
 

	
 
    old_certificate_pem = certificate_file.read()
 
    old_certificate = gimmecert.storage.read_certificate(certificate_file.strpath)
 
    old_subject_alt_name = old_certificate.extensions.get_extension_for_class(cryptography.x509.SubjectAlternativeName).value
 

	
 
    gimmecert.commands.renew(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'server', 'myserver', False, None, [])
 
@@ -1293,13 +1293,13 @@ def test_renew_replaces_server_csr_with_private_key(gctmpdir):
 
    private_key_file = gctmpdir.join(".gimmecert", "server", "myserver.key.pem")
 

	
 
    custom_csr_private_key = gimmecert.crypto.generate_private_key()
 
    custom_csr = gimmecert.crypto.generate_csr("mycustom", custom_csr_private_key)
 
    gimmecert.storage.write_csr(custom_csr, custom_csr_file.strpath)
 

	
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', None, custom_csr_file.strpath)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', None, custom_csr_file.strpath, None)
 

	
 
    assert csr_file.check(file=1)
 

	
 
    gimmecert.commands.renew(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'server', 'myserver', True, None, None)
 

	
 
    assert private_key_file.check(file=1)
 
@@ -1322,13 +1322,13 @@ def test_server_reads_csr_from_stdin(mock_read_input, sample_project_directory,
 
    # Mock our util for reading input from user.
 
    mock_read_input.return_value = key_with_csr.csr_pem
 

	
 
    stdout_stream = io.StringIO()
 
    stderr_stream = io.StringIO()
 

	
 
    status_code = gimmecert.commands.server(stdout_stream, stderr_stream, sample_project_directory.strpath, entity_name, None, '-')
 
    status_code = gimmecert.commands.server(stdout_stream, stderr_stream, sample_project_directory.strpath, entity_name, None, '-', None)
 
    assert status_code == 0
 

	
 
    # Read stored/generated artefacts.
 
    stored_csr = gimmecert.storage.read_csr(stored_csr_file.strpath)
 
    certificate = gimmecert.storage.read_certificate(certificate_file.strpath)
 

	
 
@@ -1375,13 +1375,13 @@ def test_client_reads_csr_from_stdin(mock_read_input, sample_project_directory,
 
def test_renew_server_reads_csr_from_stdin(mock_read_input, sample_project_directory, key_with_csr):
 
    entity_name = 'myserver'
 
    stored_csr_file = sample_project_directory.join('.gimmecert', 'server', '%s.csr.pem' % entity_name)
 
    certificate_file = sample_project_directory.join('.gimmecert', 'server', '%s.cert.pem' % entity_name)
 

	
 
    # Generate server certificate that will be renewed.
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), sample_project_directory.strpath, entity_name, None, None)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), sample_project_directory.strpath, entity_name, None, None, None)
 

	
 
    # Mock our util for reading input from user.
 
    mock_read_input.return_value = key_with_csr.csr_pem
 

	
 
    stdout_stream = io.StringIO()
 
    stderr_stream = io.StringIO()
 
@@ -1437,11 +1437,22 @@ def test_renew_client_reads_csr_from_stdin(mock_read_input, sample_project_direc
 

	
 
def test_server_uses_same_private_key_algorithm_and_parameters_as_issuer_when_generating_private_key(tmpdir):
 

	
 
    private_key_file = tmpdir.join('.gimmecert', 'server', 'myserver.key.pem')
 

	
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, 1, ("rsa", 1024))
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None, None)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None, None, None)
 

	
 
    private_key = gimmecert.storage.read_private_key(private_key_file.strpath)
 

	
 
    assert private_key.key_size == 1024
 

	
 

	
 
def test_server_uses_passed_in_private_key_algorithm_and_parameters_when_generating_private_key(gctmpdir):
 

	
 
    private_key_file = gctmpdir.join('.gimmecert', 'server', 'myserver.key.pem')
 

	
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), gctmpdir.strpath, 'myserver', None, None, ("rsa", 1024))
 

	
 
    private_key = gimmecert.storage.read_private_key(private_key_file.strpath)
 

	
 
    assert private_key.key_size == 1024
0 comments (0 inline, 0 general)