Changeset - 861fc9c9d668
[Not reviewed]
0 5 0
Branko Majic (branko) - 6 years ago 2018-03-25 12:18:47
branko@majic.rs
GC-18: Added functional renew command implementation:

- Added functional test covering the use of renew command for server
and client certificates.
- Added new crypto function for renewing a certificate based on
existing certificate, issuer private key, and issuer certificate.
- Fixed use of incorrect output stream in one of the existing tests
for the renew command.
- Added unit tests covering new functionality.
5 files changed with 345 insertions and 2 deletions:
0 comments (0 inline, 0 general)
functional_tests/test_renew.py
Show inline comments
 
@@ -101,3 +101,108 @@ def test_renew_command_reports_error_if_entity_does_not_exist(tmpdir):
 
    assert exit_code != 0
 
    assert stdout == ''
 
    assert stderr == "Cannot renew certificate. No existing certificate found for client myclient.\n"
 

	
 

	
 
def test_renew_command_renews_certificate(tmpdir):
 
    # At the end of his wits, John finally finds the correct project
 
    # directory where he has previuosly set-up the CA hierarchy and
 
    # issued a couple of certificates.
 
    tmpdir.chdir()
 
    run_command("gimmecert", "init")
 
    run_command("gimmecert", "server", "myserver", "myserver.local")
 
    run_command("gimmecert", "client", "myclient")
 

	
 
    # He fetches some information about the existing certificates.
 
    old_server_private_key = tmpdir.join(".gimmecert", "server", "myserver.key.pem").read()
 
    old_server_issuer_dn, _, _ = run_command('openssl', 'x509', '-noout', '-issuer', '-in', '.gimmecert/server/myserver.cert.pem')
 
    old_server_subject_dn, _, _ = run_command('openssl', 'x509', '-noout', '-subject', '-in', '.gimmecert/server/myserver.cert.pem')
 
    old_server_public_key, _, _ = run_command('openssl', 'x509', '-noout', '-pubkey', '-in', '.gimmecert/server/myserver.cert.pem')
 
    old_server_certificate_info, _, _ = run_command('openssl', 'x509', '-noout', '-text', '-in', '.gimmecert/server/myserver.cert.pem')
 
    old_server_issuer_dn = old_server_issuer_dn.replace('issuer=', '', 1).rstrip().replace(' /CN=', 'CN = ', 1)  # OpenSSL 1.0 vs 1.1 formatting
 
    old_server_subject_dn = old_server_subject_dn.replace('subject=', '', 1).rstrip().replace(' /CN=', 'CN = ', 1)  # OpenSSL 1.0 vs 1.1 formatting
 

	
 
    old_client_private_key = tmpdir.join(".gimmecert", "client", "myclient.key.pem").read()
 
    old_client_issuer_dn, _, _ = run_command('openssl', 'x509', '-noout', '-issuer', '-in', '.gimmecert/client/myclient.cert.pem')
 
    old_client_subject_dn, _, _ = run_command('openssl', 'x509', '-noout', '-subject', '-in', '.gimmecert/client/myclient.cert.pem')
 
    old_client_public_key, _, _ = run_command('openssl', 'x509', '-noout', '-pubkey', '-in', '.gimmecert/client/myclient.cert.pem')
 
    old_client_certificate_info, _, _ = run_command('openssl', 'x509', '-noout', '-text', '-in', '.gimmecert/client/myclient.cert.pem')
 
    old_client_issuer_dn = old_client_issuer_dn.replace('issuer=', '', 1).rstrip().replace(' /CN=', 'CN = ', 1)  # OpenSSL 1.0 vs 1.1 formatting
 
    old_client_subject_dn = old_client_subject_dn.replace('subject=', '', 1).rstrip().replace(' /CN=', 'CN = ', 1)  # OpenSSL 1.0 vs 1.1 formatting
 

	
 
    # He runs the renewal command for server certificate.
 
    stdout, stderr, exit_code = run_command('gimmecert', 'renew', 'server', 'myserver')
 

	
 
    # No errors are reported, and he is presented with a nice
 
    # informative message about certificate being renewed, as well as
 
    # paths to artifacts.
 
    assert exit_code == 0
 
    assert stderr == ""
 
    assert "Renewed certificate for server myserver." in stdout
 
    assert ".gimmecert/server/myserver.key.pem" in stdout
 
    assert ".gimmecert/server/myserver.cert.pem" in stdout
 

	
 
    # He does the same for the client certificate.
 
    stdout, stderr, exit_code = run_command('gimmecert', 'renew', 'client', 'myclient')
 

	
 
    # No errors are reported, and he is presented with a nice
 
    # informative message about certificate being renewed, as well as
 
    # paths to artifacts.
 
    assert exit_code == 0
 
    assert stderr == ""
 
    assert "Renewed certificate for client myclient." in stdout
 
    assert ".gimmecert/client/myclient.key.pem" in stdout
 
    assert ".gimmecert/client/myclient.cert.pem" in stdout
 

	
 
    # John has a look at generated certificates.
 
    new_server_private_key = tmpdir.join(".gimmecert", "server", "myserver.key.pem").read()
 
    new_server_issuer_dn, _, _ = run_command('openssl', 'x509', '-noout', '-issuer', '-in', '.gimmecert/server/myserver.cert.pem')
 
    new_server_subject_dn, _, _ = run_command('openssl', 'x509', '-noout', '-subject', '-in', '.gimmecert/server/myserver.cert.pem')
 
    new_server_public_key, _, _ = run_command('openssl', 'x509', '-noout', '-pubkey', '-in', '.gimmecert/server/myserver.cert.pem')
 
    new_server_certificate_info, _, _ = run_command('openssl', 'x509', '-noout', '-text', '-in', '.gimmecert/server/myserver.cert.pem')
 
    new_server_issuer_dn = new_server_issuer_dn.replace('issuer=', '', 1).rstrip().replace(' /CN=', 'CN = ', 1)  # OpenSSL 1.0 vs 1.1 formatting
 
    new_server_subject_dn = new_server_subject_dn.replace('subject=', '', 1).rstrip().replace(' /CN=', 'CN = ', 1)  # OpenSSL 1.0 vs 1.1 formatting
 

	
 
    new_client_private_key = tmpdir.join(".gimmecert", "client", "myclient.key.pem").read()
 
    new_client_issuer_dn, _, _ = run_command('openssl', 'x509', '-noout', '-issuer', '-in', '.gimmecert/client/myclient.cert.pem')
 
    new_client_subject_dn, _, _ = run_command('openssl', 'x509', '-noout', '-subject', '-in', '.gimmecert/client/myclient.cert.pem')
 
    new_client_public_key, _, _ = run_command('openssl', 'x509', '-noout', '-pubkey', '-in', '.gimmecert/client/myclient.cert.pem')
 
    new_client_certificate_info, _, _ = run_command('openssl', 'x509', '-noout', '-text', '-in', '.gimmecert/client/myclient.cert.pem')
 
    new_client_issuer_dn = new_client_issuer_dn.replace('issuer=', '', 1).rstrip().replace(' /CN=', 'CN = ', 1)  # OpenSSL 1.0 vs 1.1 formatting
 
    new_client_subject_dn = new_client_subject_dn.replace('subject=', '', 1).rstrip().replace(' /CN=', 'CN = ', 1)  # OpenSSL 1.0 vs 1.1 formatting
 

	
 
    # John compares the values from old certificates and new
 
    # certificates. To his delight, the same private key and naming
 
    # have been reused, but the certificates have definitively been
 
    # replaced.
 
    assert new_server_private_key == old_server_private_key
 
    assert new_server_issuer_dn == old_server_issuer_dn
 
    assert new_server_subject_dn == old_server_subject_dn
 
    assert new_server_public_key == old_server_public_key
 
    assert "DNS:myserver, DNS:myserver.local\n" in new_server_certificate_info
 
    assert new_server_certificate_info != old_server_certificate_info
 

	
 
    assert new_client_private_key == old_client_private_key
 
    assert new_client_issuer_dn == old_client_issuer_dn
 
    assert new_client_subject_dn == old_client_subject_dn
 
    assert new_client_public_key == old_client_public_key
 
    assert new_client_certificate_info != old_client_certificate_info
 

	
 
    # Finally, he runs a check to ensure the certificates can be
 
    # verified using the CA certificate chain.
 
    _, _, verify_server_error_code = run_command(
 
        "openssl", "verify",
 
        "-CAfile",
 
        ".gimmecert/ca/chain-full.cert.pem",
 
        ".gimmecert/server/myserver.cert.pem"
 
    )
 

	
 
    _, _, verify_client_error_code = run_command(
 
        "openssl", "verify",
 
        "-CAfile",
 
        ".gimmecert/ca/chain-full.cert.pem",
 
        ".gimmecert/server/myserver.cert.pem"
 
    )
 

	
 
    # He is happy to see that verification succeeds.
 
    assert verify_server_error_code == 0
 
    assert verify_client_error_code == 0
gimmecert/commands.py
Show inline comments
 
@@ -267,4 +267,19 @@ def renew(stdout, stderr, project_directory, entity_type, entity_name):
 

	
 
        return ExitCode.ERROR_UNKNOWN_ENTITY
 

	
 
    ca_hierarchy = gimmecert.storage.read_ca_hierarchy(os.path.join(project_directory, '.gimmecert', 'ca'))
 
    issuer_private_key, issuer_certificate = ca_hierarchy[-1]
 

	
 
    old_certificate = gimmecert.storage.read_certificate(certificate_path)
 

	
 
    certificate = gimmecert.crypto.renew_certificate(old_certificate, issuer_private_key, issuer_certificate)
 
    gimmecert.storage.write_certificate(certificate, certificate_path)
 

	
 
    print("Renewed certificate for %s %s.\n" % (entity_type, entity_name), file=stdout)
 
    print("""{entity_type_titled} private key: .gimmecert/{entity_type}/{entity_name}.key.pem\n
 
    {entity_type_titled} certificate: .gimmecert/{entity_type}/{entity_name}.cert.pem""".format(entity_type_titled=entity_type.title(),
 
                                                                                                entity_type=entity_type,
 
                                                                                                entity_name=entity_name),
 
          file=stdout)
 

	
 
    return ExitCode.SUCCESS
gimmecert/crypto.py
Show inline comments
 
@@ -302,3 +302,41 @@ def issue_client_certificate(name, public_key, issuer_private_key, issuer_certif
 
    certificate = issue_certificate(issuer_certificate.issuer, dn, issuer_private_key, public_key, not_before, not_after, extensions)
 

	
 
    return certificate
 

	
 

	
 
def renew_certificate(old_certificate, issuer_private_key, issuer_certificate):
 
    """
 
    Renews an existing certificate, while preserving issuer and
 
    subject DNs, as well as public key and all extensions from the old
 
    certificate.
 

	
 
    :param old_certificate: Previously issued certificate.
 
    :type old_certificate: cryptography.x509.Certificate
 

	
 
    :param issuer_private_key: Private key of the issuer to use for signing the certificate structure.
 
    :type issuer_private_key: cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey
 

	
 
    :param issuer_certificate: Certificate of certificate issuer. Naming and validity constraints will be applied based on its content.
 
    :type issuer_certificate: cryptography.x509.Certificate
 

	
 
    :returns: New certificate, which preserves naming, extensions, and public key of the old one.
 
    :rtype: cryptography.x509.Certificate
 
    """
 

	
 
    not_before, not_after = get_validity_range()
 

	
 
    if not_before < issuer_certificate.not_valid_before:
 
        not_before = issuer_certificate.not_valid_before
 

	
 
    if not_after > issuer_certificate.not_valid_after:
 
        not_after = issuer_certificate.not_valid_after
 

	
 
    new_certificate = issue_certificate(issuer_certificate.subject,
 
                                        old_certificate.subject,
 
                                        issuer_private_key,
 
                                        old_certificate.public_key(),
 
                                        not_before,
 
                                        not_after,
 
                                        [(e.value, e.critical) for e in old_certificate.extensions])
 

	
 
    return new_certificate
tests/test_commands.py
Show inline comments
 
@@ -494,7 +494,7 @@ def test_renew_reports_error_if_no_existing_server_certificate_is_present(tmpdir
 
    stdout_stream = io.StringIO()
 
    stderr_stream = io.StringIO()
 

	
 
    status_code = gimmecert.commands.renew(stderr_stream, stderr_stream, tmpdir.strpath, 'server', 'myserver')
 
    status_code = gimmecert.commands.renew(stdout_stream, stderr_stream, tmpdir.strpath, 'server', 'myserver')
 

	
 
    stdout = stdout_stream.getvalue()
 
    stderr = stderr_stream.getvalue()
 
@@ -512,7 +512,7 @@ def test_renew_reports_error_if_no_existing_client_certificate_is_present(tmpdir
 
    stdout_stream = io.StringIO()
 
    stderr_stream = io.StringIO()
 

	
 
    status_code = gimmecert.commands.renew(stderr_stream, stderr_stream, tmpdir.strpath, 'client', 'myclient')
 
    status_code = gimmecert.commands.renew(stdout_stream, stderr_stream, tmpdir.strpath, 'client', 'myclient')
 

	
 
    stdout = stdout_stream.getvalue()
 
    stderr = stderr_stream.getvalue()
 
@@ -521,3 +521,109 @@ def test_renew_reports_error_if_no_existing_client_certificate_is_present(tmpdir
 
    assert "Cannot renew certificate" in stderr
 
    assert "client myclient" in stderr
 
    assert stdout == ""
 

	
 

	
 
def test_renew_reports_success_and_paths_to_server_artifacts(tmpdir):
 
    depth = 1
 

	
 
    stdout_stream = io.StringIO()
 
    stderr_stream = io.StringIO()
 

	
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None)
 

	
 
    status_code = gimmecert.commands.renew(stdout_stream, stderr_stream, tmpdir.strpath, 'server', 'myserver')
 

	
 
    stdout = stdout_stream.getvalue()
 
    stderr = stderr_stream.getvalue()
 

	
 
    assert status_code == gimmecert.commands.ExitCode.SUCCESS
 
    assert "Renewed certificate for server myserver." in stdout
 
    assert ".gimmecert/server/myserver.key.pem" in stdout
 
    assert ".gimmecert/server/myserver.cert.pem" in stdout
 
    assert stderr == ""
 

	
 

	
 
def test_renew_reports_success_and_paths_to_client_artifacts(tmpdir):
 
    depth = 1
 

	
 
    stdout_stream = io.StringIO()
 
    stderr_stream = io.StringIO()
 

	
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 
    gimmecert.commands.client(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myclient')
 

	
 
    status_code = gimmecert.commands.renew(stdout_stream, stderr_stream, tmpdir.strpath, 'client', 'myclient')
 

	
 
    stdout = stdout_stream.getvalue()
 
    stderr = stderr_stream.getvalue()
 

	
 
    assert status_code == gimmecert.commands.ExitCode.SUCCESS
 
    assert "Renewed certificate for client myclient." in stdout
 
    assert ".gimmecert/client/myclient.key.pem" in stdout
 
    assert ".gimmecert/client/myclient.cert.pem" in stdout
 
    assert stderr == ""
 

	
 

	
 
def test_renew_keeps_server_private_key(tmpdir):
 
    depth = 1
 
    private_key_file = tmpdir.join('.gimmecert', 'server', 'myserver.key.pem')
 

	
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 

	
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None)
 
    private_key_after_issuance = private_key_file.read()
 

	
 
    gimmecert.commands.renew(io.StringIO(), io.StringIO(), tmpdir.strpath, 'server', 'myserver')
 
    private_key_after_renewal = private_key_file.read()
 

	
 
    assert private_key_after_issuance == private_key_after_renewal
 

	
 

	
 
def test_renew_keeps_client_private_key(tmpdir):
 
    depth = 1
 
    private_key_file = tmpdir.join('.gimmecert', 'client', 'myclient.key.pem')
 

	
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 

	
 
    gimmecert.commands.client(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myclient')
 
    private_key_after_issuance = private_key_file.read()
 

	
 
    gimmecert.commands.renew(io.StringIO(), io.StringIO(), tmpdir.strpath, 'client', 'myclient')
 
    private_key_after_renewal = private_key_file.read()
 

	
 
    assert private_key_after_issuance == private_key_after_renewal
 

	
 

	
 
def test_renew_replaces_server_certificate(tmpdir):
 
    depth = 1
 
    certificate_file = tmpdir.join('.gimmecert', 'server', 'myserver.cert.pem')
 

	
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 

	
 
    gimmecert.commands.server(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myserver', None)
 
    certificate_after_issuance = certificate_file.read()
 

	
 
    gimmecert.commands.renew(io.StringIO(), io.StringIO(), tmpdir.strpath, 'server', 'myserver')
 
    certificate_after_renewal = certificate_file.read()
 

	
 
    assert certificate_after_issuance != certificate_after_renewal
 
    assert certificate_after_renewal.startswith('-----BEGIN CERTIFICATE')
 
    assert certificate_after_renewal.endswith('END CERTIFICATE-----\n')
 

	
 

	
 
def test_renew_replaces_client_certificate(tmpdir):
 
    depth = 1
 
    certificate_file = tmpdir.join('.gimmecert', 'client', 'myclient.cert.pem')
 

	
 
    gimmecert.commands.init(io.StringIO(), io.StringIO(), tmpdir.strpath, tmpdir.basename, depth)
 

	
 
    gimmecert.commands.client(io.StringIO(), io.StringIO(), tmpdir.strpath, 'myclient')
 
    certificate_after_issuance = certificate_file.read()
 

	
 
    gimmecert.commands.renew(io.StringIO(), io.StringIO(), tmpdir.strpath, 'client', 'myclient')
 
    certificate_after_renewal = certificate_file.read()
 

	
 
    assert certificate_after_issuance != certificate_after_renewal
 
    assert certificate_after_renewal.startswith('-----BEGIN CERTIFICATE')
 
    assert certificate_after_renewal.endswith('END CERTIFICATE-----\n')
tests/test_crypto.py
Show inline comments
 
@@ -509,3 +509,82 @@ def test_issue_client_certificate_not_after_does_not_exceed_ca_validity():
 
        certificate1 = gimmecert.crypto.issue_client_certificate('myclient', private_key.public_key(), issuer_private_key, issuer_certificate)
 

	
 
    assert certificate1.not_valid_after == issuer_certificate.not_valid_after
 

	
 

	
 
def test_renew_certificate_returns_certificate():
 
    ca_hierarchy = gimmecert.crypto.generate_ca_hierarchy('My Project', 1)
 
    issuer_private_key, issuer_certificate = ca_hierarchy[0]
 

	
 
    private_key = gimmecert.crypto.generate_private_key()
 
    old_certificate = gimmecert.crypto.issue_server_certificate('myserver', private_key.public_key(), issuer_private_key, issuer_certificate)
 

	
 
    new_certificate = gimmecert.crypto.renew_certificate(old_certificate, issuer_private_key, issuer_certificate)
 

	
 
    assert isinstance(new_certificate, cryptography.x509.Certificate)
 

	
 

	
 
def test_renew_certificate_has_correct_content():
 
    ca_hierarchy = gimmecert.crypto.generate_ca_hierarchy('My Project', 1)
 
    issuer_private_key, issuer_certificate = ca_hierarchy[0]
 

	
 
    private_key = gimmecert.crypto.generate_private_key()
 
    old_certificate = gimmecert.crypto.issue_server_certificate('myserver', private_key.public_key(), issuer_private_key, issuer_certificate)
 

	
 
    new_certificate = gimmecert.crypto.renew_certificate(old_certificate, issuer_private_key, issuer_certificate)
 

	
 
    assert old_certificate != new_certificate  # make sure we didn't get identical certificate.
 
    assert old_certificate.issuer == new_certificate.issuer
 
    assert old_certificate.subject == new_certificate.subject
 
    assert old_certificate.public_key().public_numbers() == new_certificate.public_key().public_numbers()
 
    assert [e for e in old_certificate.extensions] == [e for e in new_certificate.extensions]
 

	
 

	
 
def test_renew_certificate_not_before_is_15_minutes_in_past():
 

	
 
    # Initial server certificate.
 
    with freeze_time('2018-01-01 00:15:00'):
 
        ca_hierarchy = gimmecert.crypto.generate_ca_hierarchy('My Project', 1)
 
        issuer_private_key, issuer_certificate = ca_hierarchy[0]
 

	
 
        private_key = gimmecert.crypto.generate_private_key()
 
        old_certificate = gimmecert.crypto.issue_server_certificate('myserver', private_key.public_key(), issuer_private_key, issuer_certificate)
 

	
 
    # Renew certificate.
 
    with freeze_time('2018-06-01 00:15:00'):
 
        certificate = gimmecert.crypto.renew_certificate(old_certificate, issuer_private_key, issuer_certificate)
 

	
 
    assert certificate.not_valid_before == datetime.datetime(2018, 6, 1, 0, 0)
 

	
 

	
 
def test_renew_certificate_not_before_does_not_exceed_ca_validity():
 

	
 
    # Initial server certificate.
 
    with freeze_time('2018-01-01 00:15:00'):
 
        ca_hierarchy = gimmecert.crypto.generate_ca_hierarchy('My Project', 1)
 
        issuer_private_key, issuer_certificate = ca_hierarchy[0]
 

	
 
        private_key = gimmecert.crypto.generate_private_key()
 
        old_certificate = gimmecert.crypto.issue_server_certificate('myserver', private_key.public_key(), issuer_private_key, issuer_certificate)
 

	
 
    # Renew certificate.
 
    with freeze_time(issuer_certificate.not_valid_before - datetime.timedelta(seconds=1)):
 
        certificate = gimmecert.crypto.renew_certificate(old_certificate, issuer_private_key, issuer_certificate)
 

	
 
    assert certificate.not_valid_before == issuer_certificate.not_valid_before
 

	
 

	
 
def test_renew_certificate_not_after_does_not_exceed_ca_validity():
 

	
 
    # Initial server certificate.
 
    with freeze_time('2018-01-01 00:15:00'):
 
        ca_hierarchy = gimmecert.crypto.generate_ca_hierarchy('My Project', 1)
 
        issuer_private_key, issuer_certificate = ca_hierarchy[0]
 

	
 
        private_key = gimmecert.crypto.generate_private_key()
 
        old_certificate = gimmecert.crypto.issue_server_certificate('myserver', private_key.public_key(), issuer_private_key, issuer_certificate)
 

	
 
    # Renew certificate.
 
    with freeze_time(issuer_certificate.not_valid_after + datetime.timedelta(seconds=1)):
 
        certificate = gimmecert.crypto.renew_certificate(old_certificate, issuer_private_key, issuer_certificate)
 

	
 
    assert certificate.not_valid_after == issuer_certificate.not_valid_after
0 comments (0 inline, 0 general)