Changeset - 45c0a6805fac
[Not reviewed]
0 5 0
Branko Majic (branko) - 6 years ago 2018-04-11 23:24:23
branko@majic.rs
GC-22: Implemented issuance of server certificates using passed-in CSR:

- Added functional test.
- Expanded server command to accept path to custom CSR file and handle
it appropriatelly.
- Updated existing unit tests to fix command server invocation.
- Added new unit tests.
5 files changed with 318 insertions and 36 deletions:
0 comments (0 inline, 0 general)
functional_tests/test_csr.py
Show inline comments
 
@@ -95,3 +95,45 @@ def test_client_certificate_issuance_by_passing_csr_as_file(tmpdir):
 

	
 
    # To his delight, they are identical.
 
    assert certificate_public_key == public_key
 

	
 

	
 
def test_server_certificate_issuance_by_passing_csr_as_file(tmpdir):
 
    # John is working on a project where he has already generated
 
    # server private key.
 
    tmpdir.chdir()
 
    run_command("openssl", "genrsa", "-out", "myserver1.key.pem", "2048")
 

	
 
    # However, he still needs to have a CA as a trustpoint, so he goes
 
    # ahead and initialises Gimmecert for this purpose.
 
    run_command("gimmecert", "init")
 

	
 
    # Before issuing the certificate, he goes ahead and generates a
 
    # CSR for the server private key
 
    run_command("openssl", "req", "-new", "-key", "myserver1.key.pem", "-subj", "/CN=myserver1", "-out", "myserver1.csr.pem")
 

	
 
    # John issues server certificate using CSR.
 
    stdout, stderr, exit_code = run_command("gimmecert", "server", "--csr", "myserver1.csr.pem", "myserver1")
 

	
 
    # The operation is successful, and he is presented with
 
    # information about generated artefacts.
 
    assert exit_code == 0
 
    assert stderr == ""
 
    assert ".gimmecert/server/myserver1.cert.pem" in stdout
 
    assert ".gimmecert/server/myserver1.csr.pem" in stdout
 

	
 
    # John also notices that there is no mention of a private key.
 
    assert ".gimmecert/server/myserver1.key.pem" not in stdout
 

	
 
    # John notices that the content of stored CSR is identical to the
 
    # one he provided.
 
    original_csr = tmpdir.join("myserver1.csr.pem").read()
 
    stored_csr = tmpdir.join(".gimmecert", "server", "myserver1.csr.pem").read()
 
    assert original_csr == stored_csr
 

	
 
    # John then quickly has a look at the public key associated with
 
    # the private key, and public key stored in certificate.
 
    public_key, _, _ = run_command("openssl", "rsa", "-pubout", "-in", "myserver1.key.pem")
 
    certificate_public_key, _, _ = run_command("openssl", "x509", "-pubkey", "-noout", "-in", ".gimmecert/server/myserver1.cert.pem")
 

	
 
    # To his delight, they are identical.
 
    assert certificate_public_key == public_key
gimmecert/cli.py
Show inline comments
 
@@ -114,7 +114,7 @@ def setup_server_subcommand_parser(parser, subparsers):
 
    def server_wrapper(args):
 
        project_directory = os.getcwd()
 

	
 
        return server(sys.stdout, sys.stderr, project_directory, args.entity_name, args.dns_name, args.update_dns_names)
 
        return server(sys.stdout, sys.stderr, project_directory, args.entity_name, args.dns_name, args.update_dns_names, args.csr)
 

	
 
    subparser.set_defaults(func=server_wrapper)
 

	
gimmecert/commands.py
Show inline comments
 
@@ -98,10 +98,17 @@ def init(stdout, stderr, project_directory, ca_base_name, ca_hierarchy_depth):
 
    return ExitCode.SUCCESS
 

	
 

	
 
def server(stdout, stderr, project_directory, entity_name, extra_dns_names, update_dns_names):
 
def server(stdout, stderr, project_directory, entity_name, extra_dns_names, update_dns_names, custom_csr_path):
 
    """
 
    Generates a server private key and issues a server certificate
 
    using the CA hierarchy initialised within the specified directory.
 
    Issues a server certificate using the CA hierarchy initialised
 
    within the specified directory.
 

	
 
    If custom CSR path is not passed-in, a private key will be
 
    generated and stored.
 

	
 
    If custom CSR is passed-in, no private key will be generated, and
 
    the CSR will be stored instead. Only the public key will be used
 
    from the CSR - no naming information is taken from it.
 

	
 
    :param stdout: Output stream where the informative messages should be written-out.
 
    :type stdout: io.IOBase
 
@@ -121,6 +128,9 @@ def server(stdout, stderr, project_directory, entity_name, extra_dns_names, upda
 
    :param update_dns_names: Whether the certificate should be renewed using the existing private key, but with new DNS subject alternative names.
 
    :type update: bool
 

	
 
    :param custom_csr_path: Path to custom certificate signing request to use for issuing client certificate. Set to None or "" to generate private key.
 
    :type custom_csr_path: str or None
 

	
 
    :returns: Status code, one from gimmecert.commands.ExitCode.
 
    :rtype: int
 
    """
 
@@ -128,6 +138,7 @@ def server(stdout, stderr, project_directory, entity_name, extra_dns_names, upda
 
    # Set-up some paths for outputting artefacts.
 
    private_key_path = os.path.join(project_directory, '.gimmecert', 'server', '%s.key.pem' % entity_name)
 
    certificate_path = os.path.join(project_directory, '.gimmecert', 'server', '%s.cert.pem' % entity_name)
 
    csr_path = os.path.join(project_directory, '.gimmecert', 'server', '%s.csr.pem' % entity_name)
 

	
 
    # Ensure hierarchy is initialised.
 
    if not gimmecert.storage.is_initialised(project_directory):
 
@@ -138,21 +149,33 @@ def server(stdout, stderr, project_directory, entity_name, extra_dns_names, upda
 
    # names has been requested.
 
    if not update_dns_names and (
 
            os.path.exists(private_key_path) or
 
            os.path.exists(certificate_path)
 
            os.path.exists(certificate_path) or
 
            os.path.exists(csr_path)
 
    ):
 
        print("Refusing to overwrite existing data. Certificate has already been issued for server %s." % entity_name, file=stderr)
 
        return ExitCode.ERROR_CERTIFICATE_ALREADY_ISSUED
 

	
 
    # Read or generate the private key.
 
    # Grab the private key or CSR, and extract public key.
 
    if update_dns_names and os.path.exists(private_key_path):
 
        renew_certificate = True
 
        private_key = gimmecert.storage.read_private_key(private_key_path)
 
        public_key = private_key.public_key()
 
        csr = None
 
    elif update_dns_names and os.path.exists(csr_path):
 
        renew_certificate = True
 
        csr = gimmecert.storage.read_csr(csr_path)
 
        public_key = csr.public_key()
 
        private_key = None
 
    elif custom_csr_path:
 
        renew_certificate = False
 
        csr = gimmecert.storage.read_csr(custom_csr_path)
 
        public_key = csr.public_key()
 
        private_key = None
 
    else:
 
        renew_certificate = False
 
        private_key = gimmecert.crypto.generate_private_key()
 

	
 
    # Extract the public key.
 
    public_key = private_key.public_key()
 
        public_key = private_key.public_key()
 
        csr = None
 

	
 
    # Grab the issuing CA private key and certificate.
 
    ca_hierarchy = gimmecert.storage.read_ca_hierarchy(os.path.join(project_directory, '.gimmecert', 'ca'))
 
@@ -161,17 +184,30 @@ def server(stdout, stderr, project_directory, entity_name, extra_dns_names, upda
 
    # Issue the certificate.
 
    certificate = gimmecert.crypto.issue_server_certificate(entity_name, public_key, issuer_private_key, issuer_certificate, extra_dns_names)
 

	
 
    gimmecert.storage.write_private_key(private_key, private_key_path)
 
    # Output CSR or private key depending on what has been passed-in.
 
    if csr:
 
        gimmecert.storage.write_csr(csr, csr_path)
 
    else:
 
        gimmecert.storage.write_private_key(private_key, private_key_path)
 

	
 
    gimmecert.storage.write_certificate(certificate, certificate_path)
 

	
 
    # Show user information about generated artefacts.
 
    if renew_certificate:
 
        print("Server certificate renewed with new DNS subject alternative names.", file=stdout)
 
        print("Server private key has remained unchanged.", file=stdout)
 

	
 
        if csr:
 
            print("Server CSR has remained unchanged.", file=stdout)
 
        else:
 
            print("Server private key has remained unchanged.", file=stdout)
 
    else:
 
        print("Server certificate issued.", file=stdout)
 

	
 
    print("Server private key: .gimmecert/server/%s.key.pem" % entity_name, file=stdout)
 
    if csr:
 
        print("Server CSR: .gimmecert/server/%s.csr.pem" % entity_name, file=stdout)
 
    else:
 
        print("Server private key: .gimmecert/server/%s.key.pem" % entity_name, file=stdout)
 

	
 
    print("Server certificate: .gimmecert/server/%s.cert.pem" % entity_name, file=stdout)
 

	
 
    return ExitCode.SUCCESS
tests/test_cli.py
Show inline comments
 
@@ -377,7 +377,7 @@ def test_server_command_invoked_with_correct_parameters_without_extra_dns_names(
 

	
 
    gimmecert.cli.main()
 

	
 
    mock_server.assert_called_once_with(sys.stdout, sys.stderr, tmpdir.strpath, 'myserver', [], False)
 
    mock_server.assert_called_once_with(sys.stdout, sys.stderr, tmpdir.strpath, 'myserver', [], False, None)
 

	
 

	
 
@mock.patch('sys.argv', ['gimmecert', 'server', 'myserver', 'service.local', 'service.example.com'])
 
@@ -391,7 +391,7 @@ def test_server_command_invoked_with_correct_parameters_with_extra_dns_names(moc
 

	
 
    gimmecert.cli.main()
 

	
 
    mock_server.assert_called_once_with(sys.stdout, sys.stderr, tmpdir.strpath, 'myserver', ['service.local', 'service.example.com'], False)
 
    mock_server.assert_called_once_with(sys.stdout, sys.stderr, tmpdir.strpath, 'myserver', ['service.local', 'service.example.com'], False, None)
 

	
 

	
 
@mock.patch('sys.argv', ['gimmecert', 'help'])
 
@@ -509,7 +509,7 @@ def test_server_command_invoked_with_correct_parameters_with_update_option(mock_
 

	
 
    gimmecert.cli.main()
 

	
 
    mock_server.assert_called_once_with(sys.stdout, sys.stderr, tmpdir.strpath, 'myserver', ['service.local'], True)
 
    mock_server.assert_called_once_with(sys.stdout, sys.stderr, tmpdir.strpath, 'myserver', ['service.local'], True, None)
 

	
 

	
 
@mock.patch('sys.argv', ['gimmecert', 'renew'])
tests/test_commands.py
Show inline comments
 
@@ -129,7 +129,7 @@ def test_init_does_not_overwrite_artifcats_if_already_initialised(tmpdir):
 

	
 

	
 
def test_server_returns_status_code(tmpdir):
 
    status_code = gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None, False)
 
    status_code = gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None, False, None)
 

	
 
    assert isinstance(status_code, int)
 

	
 
@@ -139,7 +139,7 @@ def test_server_reports_error_if_directory_is_not_initialised(tmpdir):
 
    stdout_stream = io.StringIO()
 
    stderr_stream = io.StringIO()
 

	
 
    status_code = gimmecert.commands.server(stdout_stream, stderr_stream, tmpdir.strpath, 'myserver', None, False)
 
    status_code = gimmecert.commands.server(stdout_stream, stderr_stream, tmpdir.strpath, 'myserver', None, False, None)
 

	
 
    stdout = stdout_stream.getvalue()
 
    stderr = stderr_stream.getvalue()
 
@@ -157,7 +157,7 @@ def test_server_reports_success_and_paths_to_generated_artifacts(tmpdir):
 

	
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 

	
 
    status_code = gimmecert.commands.server(stdout_stream, stderr_stream, tmpdir.strpath, 'myserver', None, False)
 
    status_code = gimmecert.commands.server(stdout_stream, stderr_stream, tmpdir.strpath, 'myserver', None, False, None)
 

	
 
    stdout = stdout_stream.getvalue()
 
    stderr = stderr_stream.getvalue()
 
@@ -165,18 +165,21 @@ def test_server_reports_success_and_paths_to_generated_artifacts(tmpdir):
 
    assert status_code == gimmecert.commands.ExitCode.SUCCESS
 
    assert ".gimmecert/server/myserver.key.pem" in stdout
 
    assert ".gimmecert/server/myserver.cert.pem" in stdout
 
    assert ".gimmecert/server/myserver.csr.pem" not in stdout
 
    assert stderr == ""
 

	
 

	
 
def test_server_outputs_private_key_to_file(tmpdir):
 
def test_server_outputs_private_key_to_file_without_csr(tmpdir):
 
    depth = 1
 
    private_key_file = tmpdir.join('.gimmecert', 'server', 'myserver.key.pem')
 
    csr_file = tmpdir.join('.gimmecert', 'server', 'myserver.csr.pem')
 

	
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 

	
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None, False)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None, False, None)
 

	
 
    assert private_key_file.check(file=1)
 
    assert not csr_file.check()
 

	
 
    private_key_file_content = private_key_file.read()
 

	
 
@@ -187,12 +190,14 @@ def test_server_outputs_private_key_to_file(tmpdir):
 
def test_server_outputs_certificate_to_file(tmpdir):
 
    depth = 1
 
    certificate_file = tmpdir.join('.gimmecert', 'server', 'myserver.cert.pem')
 
    csr_file = tmpdir.join('.gimmecert', 'client', 'myclient.csr.pem')
 

	
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 

	
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None, False)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None, False, None)
 

	
 
    assert certificate_file.check(file=1)
 
    assert not csr_file.check()
 

	
 
    certificate_file_content = certificate_file.read()
 

	
 
@@ -208,12 +213,12 @@ def test_server_errors_out_if_certificate_already_issued(tmpdir):
 

	
 
    # Previous run.
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None, False)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None, False, None)
 
    existing_private_key = tmpdir.join('.gimmecert', 'server', 'myserver.key.pem').read()
 
    certificate = tmpdir.join('.gimmecert', 'server', 'myserver.cert.pem').read()
 

	
 
    # New run.
 
    status_code = gimmecert.commands.server(stdout_stream, stderr_stream, tmpdir.strpath, 'myserver', None, False)
 
    status_code = gimmecert.commands.server(stdout_stream, stderr_stream, tmpdir.strpath, 'myserver', None, False, None)
 

	
 
    stdout = stdout_stream.getvalue()
 
    stderr = stderr_stream.getvalue()
 
@@ -431,12 +436,12 @@ def test_server_reports_success_if_certificate_already_issued_but_update_was_req
 

	
 
    # Previous run.
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None, False)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None, False, None)
 
    existing_private_key = tmpdir.join('.gimmecert', 'server', 'myserver.key.pem').read()
 
    certificate = tmpdir.join('.gimmecert', 'server', 'myserver.cert.pem').read()
 

	
 
    # New run.
 
    status_code = gimmecert.commands.server(stdout_stream, stderr_stream, tmpdir.strpath, 'myserver', None, True)
 
    status_code = gimmecert.commands.server(stdout_stream, stderr_stream, tmpdir.strpath, 'myserver', None, True, None)
 

	
 
    stdout = stdout_stream.getvalue()
 
    stderr = stderr_stream.getvalue()
 
@@ -444,6 +449,7 @@ def test_server_reports_success_if_certificate_already_issued_but_update_was_req
 
    assert status_code == gimmecert.commands.ExitCode.SUCCESS
 
    assert ".gimmecert/server/myserver.key.pem" in stdout
 
    assert ".gimmecert/server/myserver.cert.pem" in stdout
 
    assert ".gimmecert/server/myserver.csr.pem" not in stdout
 
    assert "renewed with new DNS subject alternative names" in stdout
 
    assert "key has remained unchanged" in stdout
 
    assert stderr == ""
 
@@ -459,7 +465,7 @@ def test_server_reports_success_if_certificate_not_already_issued_but_update_was
 

	
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 

	
 
    status_code = gimmecert.commands.server(stdout_stream, stderr_stream, tmpdir.strpath, 'myserver', None, True)
 
    status_code = gimmecert.commands.server(stdout_stream, stderr_stream, tmpdir.strpath, 'myserver', None, True, None)
 

	
 
    stdout = stdout_stream.getvalue()
 
    stderr = stderr_stream.getvalue()
 
@@ -467,6 +473,7 @@ def test_server_reports_success_if_certificate_not_already_issued_but_update_was
 
    assert status_code == gimmecert.commands.ExitCode.SUCCESS
 
    assert ".gimmecert/server/myserver.key.pem" in stdout
 
    assert ".gimmecert/server/myserver.cert.pem" in stdout
 
    assert ".gimmecert/server/myserver.csr.pem" not in stdout
 
    assert stderr == ""
 

	
 

	
 
@@ -536,7 +543,7 @@ def test_renew_reports_success_and_paths_to_server_artifacts(tmpdir):
 
    stderr_stream = io.StringIO()
 

	
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None, False)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None, False, None)
 

	
 
    status_code = gimmecert.commands.renew(stdout_stream, stderr_stream, tmpdir.strpath, 'server', 'myserver', False)
 

	
 
@@ -577,7 +584,7 @@ def test_renew_keeps_server_private_key(tmpdir):
 

	
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 

	
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None, False)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None, False, None)
 
    private_key_after_issuance = private_key_file.read()
 

	
 
    gimmecert.commands.renew(io.StringIO(), io.StringIO(), tmpdir.strpath, 'server', 'myserver', False)
 
@@ -607,7 +614,7 @@ def test_renew_replaces_server_certificate(tmpdir):
 

	
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 

	
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None, False)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None, False, None)
 
    certificate_after_issuance = certificate_file.read()
 

	
 
    gimmecert.commands.renew(io.StringIO(), io.StringIO(), tmpdir.strpath, 'server', 'myserver', False)
 
@@ -642,7 +649,7 @@ def test_renew_reports_success_and_paths_to_server_artifacts_with_new_key(tmpdir
 
    stderr_stream = io.StringIO()
 

	
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None, False)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None, False, None)
 

	
 
    status_code = gimmecert.commands.renew(stdout_stream, stderr_stream, tmpdir.strpath, 'server', 'myserver', True)
 

	
 
@@ -662,7 +669,7 @@ def test_renew_generates_new_private_key_if_requested(tmpdir):
 

	
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 

	
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None, False)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None, False, None)
 
    private_key_after_issuance = private_key_file.read()
 

	
 
    gimmecert.commands.renew(io.StringIO(), io.StringIO(), tmpdir.strpath, 'server', 'myserver', True)
 
@@ -746,10 +753,10 @@ def test_status_reports_server_certificate_information(tmpdir):
 
        gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 

	
 
    with freeze_time('2018-02-01 00:15:00'):
 
        gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver1', None, False)
 
        gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver1', None, False, None)
 

	
 
    with freeze_time('2018-03-01 00:15:00'):
 
        gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver2', ['myservice1.example.com', 'myservice2.example.com'], False)
 
        gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver2', ['myservice1.example.com', 'myservice2.example.com'], False, None)
 

	
 
    status_code = gimmecert.commands.status(stdout_stream, stderr_stream, tmpdir.strpath)
 

	
 
@@ -864,8 +871,8 @@ def test_status_reports_no_client_certificates_were_issued(tmpdir):
 
    # Just create some sample data, but no client certificates.
 
    with freeze_time('2018-01-01 00:15:00'):
 
        gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 
        gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver1', None, False)
 
        gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver2', None, False)
 
        gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver1', None, False, None)
 
        gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver2', None, False, None)
 

	
 
    status_code = gimmecert.commands.status(stdout_stream, stderr_stream, tmpdir.strpath)
 

	
 
@@ -905,7 +912,7 @@ def test_certificate_marked_as_not_valid_or_expired_as_appropriate(tmpdir, subje
 
    # Perform action on our fixed issuance date.
 
    with freeze_time(issuance_date):
 
        gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, "My Project", depth)
 
        gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None, False)
 
        gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None, False, None)
 
        gimmecert.commands.client(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myclient', None)
 

	
 
    # Move to specific date in future/past for different validity checks.
 
@@ -1015,3 +1022,200 @@ def test_client_uses_correct_public_key_but_no_naming_with_csr(tmpdir):
 

	
 
    assert certificate_public_numbers == csr_public_numbers
 
    assert csr.subject != certificate.subject
 

	
 

	
 
def test_server_reports_success_and_paths_to_generated_artifacts_with_csr(tmpdir):
 
    depth = 1
 
    custom_csr_file = tmpdir.join('mycustom.csr.pem')
 

	
 
    stdout_stream = io.StringIO()
 
    stderr_stream = io.StringIO()
 

	
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 

	
 
    private_key = gimmecert.crypto.generate_private_key()
 
    custom_csr = gimmecert.crypto.generate_csr('blah', private_key)
 
    gimmecert.storage.write_csr(custom_csr, custom_csr_file.strpath)
 

	
 
    status_code = gimmecert.commands.server(stdout_stream, stderr_stream, tmpdir.strpath, 'myserver', None, False, custom_csr_file.strpath)
 

	
 
    stdout = stdout_stream.getvalue()
 
    stderr = stderr_stream.getvalue()
 

	
 
    assert status_code == gimmecert.commands.ExitCode.SUCCESS
 
    assert ".gimmecert/server/myserver.csr.pem" in stdout
 
    assert ".gimmecert/server/myserver.cert.pem" in stdout
 
    assert ".gimmecert/server/myserver.key.pem" not in stdout
 
    assert stderr == ""
 

	
 

	
 
def test_server_outputs_passed_in_csr_to_file_without_private_key(tmpdir):
 
    depth = 1
 

	
 
    private_key_file = tmpdir.join('.gimmecert', 'server', 'myserver.key.pem')
 
    csr_file = tmpdir.join('.gimmecert', 'server', 'myserver.csr.pem')
 
    custom_csr_file = tmpdir.join('mycustom.csr.pem')
 

	
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 

	
 
    private_key = gimmecert.crypto.generate_private_key()
 
    csr = gimmecert.crypto.generate_csr('mycustomcsr', private_key)
 
    gimmecert.storage.write_csr(csr, custom_csr_file.strpath)
 
    custom_csr_file_content = custom_csr_file.read()
 

	
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None, False, custom_csr_file.strpath)
 

	
 
    assert csr_file.check(file=1)
 
    assert not private_key_file.check()
 

	
 
    csr_file_content = csr_file.read()
 

	
 
    assert csr_file_content == custom_csr_file_content
 

	
 

	
 
def test_server_uses_correct_public_key_but_no_naming_with_csr(tmpdir):
 
    depth = 1
 

	
 
    custom_csr_file = tmpdir.join('customcsr.pem')
 
    certificate_file = tmpdir.join('.gimmecert', 'server', 'myserver.cert.pem')
 

	
 
    private_key = gimmecert.crypto.generate_private_key()
 
    csr = gimmecert.crypto.generate_csr('mycustomcsr', private_key)
 
    gimmecert.storage.write_csr(csr, custom_csr_file.strpath)
 

	
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 

	
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None, False, custom_csr_file.strpath)
 

	
 
    certificate = gimmecert.storage.read_certificate(certificate_file.strpath)
 

	
 
    csr_public_numbers = csr.public_key().public_numbers()
 
    certificate_public_numbers = certificate.public_key().public_numbers()
 

	
 
    assert certificate_public_numbers == csr_public_numbers
 
    assert csr.subject != certificate.subject
 

	
 

	
 
def test_server_reports_success_if_certificate_already_issued_but_update_was_requested_with_csr(tmpdir):
 
    depth = 1
 

	
 
    custom_csr_file = tmpdir.join('customcsr.pem')
 

	
 
    stdout_stream = io.StringIO()
 
    stderr_stream = io.StringIO()
 

	
 
    private_key = gimmecert.crypto.generate_private_key()
 
    csr = gimmecert.crypto.generate_csr('mycustomcsr', private_key)
 
    gimmecert.storage.write_csr(csr, custom_csr_file.strpath)
 

	
 
    # Previous run.
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None, False, custom_csr_file.strpath)
 
    existing_csr = tmpdir.join('.gimmecert', 'server', 'myserver.csr.pem').read()
 
    certificate = tmpdir.join('.gimmecert', 'server', 'myserver.cert.pem').read()
 

	
 
    # New run.
 
    status_code = gimmecert.commands.server(stdout_stream, stderr_stream, tmpdir.strpath, 'myserver', None, True, None)
 

	
 
    stdout = stdout_stream.getvalue()
 
    stderr = stderr_stream.getvalue()
 

	
 
    assert status_code == gimmecert.commands.ExitCode.SUCCESS
 
    assert ".gimmecert/server/myserver.csr.pem" in stdout
 
    assert ".gimmecert/server/myserver.cert.pem" in stdout
 
    assert ".gimmecert/server/myserver.key.pem" not in stdout
 
    assert "renewed with new DNS subject alternative names" in stdout
 
    assert "CSR has remained unchanged" in stdout
 
    assert stderr == ""
 
    assert tmpdir.join('.gimmecert', 'server', 'myserver.csr.pem').read() == existing_csr
 
    assert tmpdir.join('.gimmecert', 'server', 'myserver.cert.pem').read() != certificate
 

	
 

	
 
def test_server_reports_success_if_certificate_not_already_issued_but_update_was_requested_with_csr(tmpdir):
 
    depth = 1
 

	
 
    custom_csr_file = tmpdir.join('customcsr.pem')
 

	
 
    stdout_stream = io.StringIO()
 
    stderr_stream = io.StringIO()
 

	
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 

	
 
    private_key = gimmecert.crypto.generate_private_key()
 
    csr = gimmecert.crypto.generate_csr('mycustomcsr', private_key)
 
    gimmecert.storage.write_csr(csr, custom_csr_file.strpath)
 

	
 
    status_code = gimmecert.commands.server(stdout_stream, stderr_stream, tmpdir.strpath, 'myserver', None, True, custom_csr_file.strpath)
 

	
 
    stdout = stdout_stream.getvalue()
 
    stderr = stderr_stream.getvalue()
 

	
 
    assert status_code == gimmecert.commands.ExitCode.SUCCESS
 
    assert ".gimmecert/server/myserver.csr.pem" in stdout
 
    assert ".gimmecert/server/myserver.cert.pem" in stdout
 
    assert ".gimmecert/server/myserver.key.pem" not in stdout
 
    assert stderr == ""
 

	
 

	
 
def test_client_errors_out_if_certificate_already_issued_with_csr(tmpdir):
 
    depth = 1
 

	
 
    custom_csr_file = tmpdir.join('mycustom.csr.pem')
 

	
 
    private_key = gimmecert.crypto.generate_private_key()
 
    csr = gimmecert.crypto.generate_csr('mycustomcsr', private_key)
 
    gimmecert.storage.write_csr(csr, custom_csr_file.strpath)
 

	
 
    stdout_stream = io.StringIO()
 
    stderr_stream = io.StringIO()
 

	
 
    # Previous run.
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 
    gimmecert.commands.client(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myclient', custom_csr_file.strpath)
 
    existing_csr = tmpdir.join('.gimmecert', 'client', 'myclient.csr.pem').read()
 
    certificate = tmpdir.join('.gimmecert', 'client', 'myclient.cert.pem').read()
 

	
 
    # New run.
 
    status_code = gimmecert.commands.client(stdout_stream, stderr_stream, tmpdir.strpath, 'myclient', custom_csr_file.strpath)
 

	
 
    stdout = stdout_stream.getvalue()
 
    stderr = stderr_stream.getvalue()
 

	
 
    assert status_code == gimmecert.commands.ExitCode.ERROR_CERTIFICATE_ALREADY_ISSUED
 
    assert "already been issued" in stderr
 
    assert "client myclient" in stderr
 
    assert stdout == ""
 
    assert tmpdir.join('.gimmecert', 'client', 'myclient.csr.pem').read() == existing_csr
 
    assert tmpdir.join('.gimmecert', 'client', 'myclient.cert.pem').read() == certificate
 

	
 

	
 
def test_server_errors_out_if_certificate_already_issued_with_csr(tmpdir):
 
    depth = 1
 

	
 
    custom_csr_file = tmpdir.join('mycustom.csr.pem')
 

	
 
    private_key = gimmecert.crypto.generate_private_key()
 
    csr = gimmecert.crypto.generate_csr('mycustomcsr', private_key)
 
    gimmecert.storage.write_csr(csr, custom_csr_file.strpath)
 

	
 
    stdout_stream = io.StringIO()
 
    stderr_stream = io.StringIO()
 

	
 
    # Previous run.
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None, False, custom_csr_file.strpath)
 
    existing_csr = tmpdir.join('.gimmecert', 'server', 'myserver.csr.pem').read()
 
    certificate = tmpdir.join('.gimmecert', 'server', 'myserver.cert.pem').read()
 

	
 
    # New run.
 
    status_code = gimmecert.commands.server(stdout_stream, stderr_stream, tmpdir.strpath, 'myserver', None, False, custom_csr_file.strpath)
 

	
 
    stdout = stdout_stream.getvalue()
 
    stderr = stderr_stream.getvalue()
 

	
 
    assert status_code == gimmecert.commands.ExitCode.ERROR_CERTIFICATE_ALREADY_ISSUED
 
    assert "already been issued" in stderr
 
    assert "server myserver" in stderr
 
    assert stdout == ""
 
    assert tmpdir.join('.gimmecert', 'server', 'myserver.csr.pem').read() == existing_csr
 
    assert tmpdir.join('.gimmecert', 'server', 'myserver.cert.pem').read() == certificate
0 comments (0 inline, 0 general)