|
| 1 | +import datetime |
| 2 | +import json |
1 | 3 | import socket |
2 | 4 | import sys |
3 | 5 | from contextlib import contextmanager |
|
7 | 9 | import pytest |
8 | 10 | from cryptography import x509 |
9 | 11 | from cryptography.hazmat.backends import default_backend |
| 12 | +from cryptography.hazmat.primitives import hashes, serialization |
| 13 | +from cryptography.hazmat.primitives.asymmetric import rsa |
10 | 14 | from cryptography.x509.oid import ExtensionOID |
11 | 15 | from ops import CharmBase |
12 | 16 | from scenario import Context, PeerRelation, Relation, State |
@@ -43,6 +47,71 @@ def _mock_san(self): |
43 | 47 | return None |
44 | 48 |
|
45 | 49 |
|
| 50 | +def generate_certificate_and_key(): |
| 51 | + """Generate certificate and CA to use for tests.""" |
| 52 | + # Generate private key |
| 53 | + private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) |
| 54 | + |
| 55 | + # Generate CA certificate |
| 56 | + ca_subject = issuer = x509.Name( |
| 57 | + [ |
| 58 | + x509.NameAttribute(x509.NameOID.COUNTRY_NAME, "US"), |
| 59 | + x509.NameAttribute(x509.NameOID.STATE_OR_PROVINCE_NAME, "California"), |
| 60 | + x509.NameAttribute(x509.NameOID.LOCALITY_NAME, "San Francisco"), |
| 61 | + x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, "Example CA"), |
| 62 | + x509.NameAttribute(x509.NameOID.COMMON_NAME, "example.com"), |
| 63 | + ] |
| 64 | + ) |
| 65 | + |
| 66 | + ca_cert = ( |
| 67 | + x509.CertificateBuilder() |
| 68 | + .subject_name(ca_subject) |
| 69 | + .issuer_name(issuer) |
| 70 | + .public_key(private_key.public_key()) |
| 71 | + .serial_number(x509.random_serial_number()) |
| 72 | + .not_valid_before(datetime.datetime.utcnow()) |
| 73 | + .not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=365)) |
| 74 | + .add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True) |
| 75 | + .sign(private_key, hashes.SHA256()) |
| 76 | + ) |
| 77 | + |
| 78 | + # Generate server certificate |
| 79 | + server_subject = x509.Name( |
| 80 | + [ |
| 81 | + x509.NameAttribute(x509.NameOID.COUNTRY_NAME, "US"), |
| 82 | + x509.NameAttribute(x509.NameOID.STATE_OR_PROVINCE_NAME, "California"), |
| 83 | + x509.NameAttribute(x509.NameOID.LOCALITY_NAME, "San Francisco"), |
| 84 | + x509.NameAttribute(x509.NameOID.ORGANIZATION_NAME, "Example Server"), |
| 85 | + x509.NameAttribute(x509.NameOID.COMMON_NAME, "server.example.com"), |
| 86 | + ] |
| 87 | + ) |
| 88 | + |
| 89 | + server_cert = ( |
| 90 | + x509.CertificateBuilder() |
| 91 | + .subject_name(server_subject) |
| 92 | + .issuer_name(ca_subject) |
| 93 | + .public_key(private_key.public_key()) |
| 94 | + .serial_number(x509.random_serial_number()) |
| 95 | + .not_valid_before(datetime.datetime.utcnow()) |
| 96 | + .not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=30)) |
| 97 | + .add_extension( |
| 98 | + x509.SubjectAlternativeName([x509.DNSName("server.example.com")]), critical=False |
| 99 | + ) |
| 100 | + .sign(private_key, hashes.SHA256()) |
| 101 | + ) |
| 102 | + |
| 103 | + # Convert to PEM format |
| 104 | + ca_cert_pem = ca_cert.public_bytes(serialization.Encoding.PEM).decode("utf-8") |
| 105 | + server_cert_pem = server_cert.public_bytes(serialization.Encoding.PEM).decode("utf-8") |
| 106 | + private_key_pem = private_key.private_bytes( |
| 107 | + encoding=serialization.Encoding.PEM, |
| 108 | + format=serialization.PrivateFormat.TraditionalOpenSSL, |
| 109 | + encryption_algorithm=serialization.NoEncryption(), |
| 110 | + ).decode("utf-8") |
| 111 | + |
| 112 | + return ca_cert_pem, server_cert_pem, private_key_pem |
| 113 | + |
| 114 | + |
46 | 115 | def get_csr_obj(csr: str): |
47 | 116 | return x509.load_pem_x509_csr(csr.encode(), default_backend()) |
48 | 117 |
|
@@ -174,3 +243,30 @@ def test_csr_no_change(ctx, certificates): |
174 | 243 | csr = get_csr_obj(charm.ch._csr) |
175 | 244 | assert get_sans_from_csr(csr) == {socket.getfqdn()} |
176 | 245 | assert renew_patch.call_count == 0 |
| 246 | + |
| 247 | + |
| 248 | +def test_chain_contains_server_cert(ctx, certificates): |
| 249 | + ca_cert_pem, server_cert_pem, _ = generate_certificate_and_key() |
| 250 | + |
| 251 | + certificates = certificates.replace( |
| 252 | + remote_app_data={ |
| 253 | + "certificates": json.dumps( |
| 254 | + [ |
| 255 | + { |
| 256 | + "certificate": server_cert_pem, |
| 257 | + "ca": ca_cert_pem, |
| 258 | + "chain": [ca_cert_pem], |
| 259 | + "certificate_signing_request": "csr", |
| 260 | + } |
| 261 | + ], |
| 262 | + ) |
| 263 | + }, |
| 264 | + local_unit_data={ |
| 265 | + "certificate_signing_requests": json.dumps([{"certificate_signing_request": "csr"}]) |
| 266 | + }, |
| 267 | + ) |
| 268 | + |
| 269 | + with ctx.manager("update_status", State(leader=True, relations=[certificates])) as mgr: |
| 270 | + mgr.run() |
| 271 | + assert server_cert_pem in mgr.charm.ch.chain |
| 272 | + assert x509.load_pem_x509_certificate(mgr.charm.ch.chain.encode(), default_backend()) |
0 commit comments