diff --git a/limbo/testcases/online.py b/limbo/testcases/online.py index 5958212d..9bef0fba 100644 --- a/limbo/testcases/online.py +++ b/limbo/testcases/online.py @@ -32,24 +32,35 @@ ] +def _peer_chain(*, host: str, port: int = 443, cafile: str = certifi.where()) -> list[Certificate]: + """ + Returns the peer certificate and intermediate chain for a given TLS connection + on `(host, post)`. + """ + ctx = SSL.Context(method=SSL.TLS_METHOD) + ctx.load_verify_locations(cafile=cafile) + + conn = SSL.Connection(ctx, socket=socket.socket(socket.AF_INET, socket.SOCK_STREAM)) + conn.set_tlsext_host_name(host.encode()) + conn.connect((host, port)) + conn.do_handshake() + + chain = conn.get_verified_chain() + assert chain is not None + + return [Certificate(c.to_cryptography()) for c in chain] + + def compile() -> None: # NOTE: Uses `ASSETS_DIR_RW` instead of `ASSETS_PATH` since the latter # is a read-only API for package resources. online_assets = ASSETS_DIR_RW / "online" online_assets.mkdir(exist_ok=True) - for site in _TOPSITES: - logger.info(f"generating online testcase for {site}") - - ctx = SSL.Context(method=SSL.TLS_METHOD) - ctx.load_verify_locations(cafile=certifi.where()) - - conn = SSL.Connection(ctx, socket=socket.socket(socket.AF_INET, socket.SOCK_STREAM)) - conn.set_tlsext_host_name(site.encode()) - conn.connect((site, 443)) - conn.do_handshake() + for host in _TOPSITES: + logger.info(f"generating online testcase for {host}") - peer_chain = [Certificate(c.to_cryptography()) for c in (conn.get_verified_chain() or [])] + peer_chain = _peer_chain(host=host) # NOTE: We use the peer certificate's own state to produce our expected # validation time. This would be incorrect in a normal path validation operation, @@ -61,25 +72,25 @@ def compile() -> None: ) + timedelta(seconds=1) builder = ( - Builder(id=f"online::{site}", description=f"A valid chain for `{site}`.") + Builder(id=f"online::{host}", description=f"A valid chain for `{host}`.") .server_validation() .peer_certificate(peer_cert) .untrusted_intermediates(*peer_chain[1:-1]) .trusted_certs(*peer_chain[-1:]) - .expected_peer_name(PeerName(kind="DNS", value=site)) + .expected_peer_name(PeerName(kind="DNS", value=host)) .validation_time(peer_cert_validation_time) .succeeds() ) testcase = builder.build() - path = online_assets / f"{site}.limbo.json" - path.write_text(testcase.json(indent=2)) + path = online_assets / f"{host}.limbo.json" + path.write_text(testcase.model_dump_json(indent=2)) def register_testcases() -> None: online_assets = ASSETS_PATH / "online" for tc_path in online_assets.iterdir(): - testcase = Testcase.parse_raw(tc_path.read_text()) + testcase = Testcase.model_validate_json(tc_path.read_text()) print(f"loading pre-generated testcase: {testcase.id}")