32
32
]
33
33
34
34
35
+ def _peer_chain (* , host : str , port : int = 443 , cafile : str = certifi .where ()) -> list [Certificate ]:
36
+ """
37
+ Returns the peer certificate and intermediate chain for a given TLS connection
38
+ on `(host, post)`.
39
+ """
40
+ ctx = SSL .Context (method = SSL .TLS_METHOD )
41
+ ctx .load_verify_locations (cafile = cafile )
42
+
43
+ conn = SSL .Connection (ctx , socket = socket .socket (socket .AF_INET , socket .SOCK_STREAM ))
44
+ conn .set_tlsext_host_name (host .encode ())
45
+ conn .connect ((host , port ))
46
+ conn .do_handshake ()
47
+
48
+ chain = conn .get_verified_chain ()
49
+ assert chain is not None
50
+
51
+ return [Certificate (c .to_cryptography ()) for c in chain ]
52
+
53
+
35
54
def compile () -> None :
36
55
# NOTE: Uses `ASSETS_DIR_RW` instead of `ASSETS_PATH` since the latter
37
56
# is a read-only API for package resources.
38
57
online_assets = ASSETS_DIR_RW / "online"
39
58
online_assets .mkdir (exist_ok = True )
40
59
41
- for site in _TOPSITES :
42
- logger .info (f"generating online testcase for { site } " )
43
-
44
- ctx = SSL .Context (method = SSL .TLS_METHOD )
45
- ctx .load_verify_locations (cafile = certifi .where ())
46
-
47
- conn = SSL .Connection (ctx , socket = socket .socket (socket .AF_INET , socket .SOCK_STREAM ))
48
- conn .set_tlsext_host_name (site .encode ())
49
- conn .connect ((site , 443 ))
50
- conn .do_handshake ()
60
+ for host in _TOPSITES :
61
+ logger .info (f"generating online testcase for { host } " )
51
62
52
- peer_chain = [ Certificate ( c . to_cryptography ()) for c in ( conn . get_verified_chain () or [])]
63
+ peer_chain = _peer_chain ( host = host )
53
64
54
65
# NOTE: We use the peer certificate's own state to produce our expected
55
66
# validation time. This would be incorrect in a normal path validation operation,
@@ -61,25 +72,25 @@ def compile() -> None:
61
72
) + timedelta (seconds = 1 )
62
73
63
74
builder = (
64
- Builder (id = f"online::{ site } " , description = f"A valid chain for `{ site } `." )
75
+ Builder (id = f"online::{ host } " , description = f"A valid chain for `{ host } `." )
65
76
.server_validation ()
66
77
.peer_certificate (peer_cert )
67
78
.untrusted_intermediates (* peer_chain [1 :- 1 ])
68
79
.trusted_certs (* peer_chain [- 1 :])
69
- .expected_peer_name (PeerName (kind = "DNS" , value = site ))
80
+ .expected_peer_name (PeerName (kind = "DNS" , value = host ))
70
81
.validation_time (peer_cert_validation_time )
71
82
.succeeds ()
72
83
)
73
84
74
85
testcase = builder .build ()
75
- path = online_assets / f"{ site } .limbo.json"
76
- path .write_text (testcase .json (indent = 2 ))
86
+ path = online_assets / f"{ host } .limbo.json"
87
+ path .write_text (testcase .model_dump_json (indent = 2 ))
77
88
78
89
79
90
def register_testcases () -> None :
80
91
online_assets = ASSETS_PATH / "online"
81
92
for tc_path in online_assets .iterdir ():
82
- testcase = Testcase .parse_raw (tc_path .read_text ())
93
+ testcase = Testcase .model_validate_json (tc_path .read_text ())
83
94
84
95
print (f"loading pre-generated testcase: { testcase .id } " )
85
96
0 commit comments