|
1 | 1 | from __future__ import annotations |
2 | 2 |
|
3 | | -from typing import TYPE_CHECKING, Union |
| 3 | +from typing import TYPE_CHECKING, Any, Union |
4 | 4 |
|
5 | 5 | from dissect.database.ese.tools import certlog |
6 | 6 | from dissect.database.exception import Error |
|
10 | 10 | from dissect.target.plugin import Plugin, export |
11 | 11 |
|
12 | 12 | if TYPE_CHECKING: |
13 | | - from collections.abc import Iterator |
| 13 | + from collections.abc import Callable, Iterator |
14 | 14 | from pathlib import Path |
15 | 15 |
|
16 | 16 | from dissect.target.target import Target |
|
43 | 43 | CertificateRecord = TargetRecordDescriptor( |
44 | 44 | "filesystem/windows/certlog/certificate", |
45 | 45 | [ |
46 | | - ("string", "certificate_hash2"), |
| 46 | + ("digest", "fingerprint"), |
47 | 47 | ("string", "certificate_template"), |
48 | 48 | ("string", "common_name"), |
49 | 49 | ("string", "country"), |
50 | 50 | ("string", "device_serial_number"), |
51 | | - ("string", "distinguished_name"), |
| 51 | + ("string", "subject_dn"), |
52 | 52 | ("string", "domain_component"), |
53 | 53 | ("string", "email"), |
54 | 54 | ("string", "given_name"), |
|
58 | 58 | ("string", "organizational_unit"), |
59 | 59 | ("string", "public_key_algorithm"), |
60 | 60 | ("string", "serial_number_hex"), |
| 61 | + ("varint", "serial_number"), |
61 | 62 | ("string", "state_or_province"), |
62 | 63 | ("string", "street_address"), |
63 | 64 | ("string", "subject_key_identifier"), |
|
69 | 70 | ("string", "enrollment_flags"), |
70 | 71 | ("string", "general_flags"), |
71 | 72 | ("string", "issuer_name_id"), |
72 | | - ("datetime", "not_after"), |
73 | | - ("datetime", "not_before"), |
| 73 | + ("datetime", "not_valid_after"), |
| 74 | + ("datetime", "not_valid_before"), |
74 | 75 | ("string", "private_key_flags"), |
75 | 76 | ("string", "public_key"), |
76 | 77 | ("varint", "public_key_length"), |
|
95 | 96 | ("string", "device_serial_number"), |
96 | 97 | ("string", "disposition"), |
97 | 98 | ("string", "disposition_message"), |
98 | | - ("string", "distinguished_name"), |
| 99 | + ("string", "subject_dn"), |
99 | 100 | ("string", "domain_component"), |
100 | 101 | ("string", "email"), |
101 | 102 | ("string", "endorsement_certificate_hash"), |
|
172 | 173 | "$AttributeValue": "common_name", |
173 | 174 | "$CRLPublishError": "crl_publish_error", |
174 | 175 | "$CallerName": "caller_name", |
175 | | - "$CertificateHash2": "certificate_hash2", |
| 176 | + "$CertificateHash": "fingerprint", |
| 177 | + "$CertificateHash2": "fingerprint", |
176 | 178 | "$CertificateTemplate": "certificate_template", |
177 | 179 | "$CommonName": "common_name", |
178 | 180 | "$Country": "country", |
179 | 181 | "$DeviceSerialNumber": "device_serial_number", |
180 | 182 | "$DispositionMessage": "disposition_message", |
181 | | - "$DistinguishedName": "distinguished_name", |
| 183 | + "$DistinguishedName": "subject_dn", |
182 | 184 | "$DomainComponent": "domain_component", |
183 | 185 | "$EMail": "email", |
184 | 186 | "$EndorsementCertificateHash": "endorsement_certificate_hash", |
|
221 | 223 | "NameId": "name_id", |
222 | 224 | "NextPublish": "next_publish", |
223 | 225 | "NextUpdate": "next_update", |
224 | | - "NotAfter": "not_after", |
225 | | - "NotBefore": "not_before", |
| 226 | + "NotAfter": "not_valid_after", |
| 227 | + "NotBefore": "not_valid_before", |
226 | 228 | "Number": "number", |
227 | 229 | "PrivateKeyFlags": "private_key_flags", |
228 | 230 | "PropagationComplete": "propagation_complete", |
|
254 | 256 | } |
255 | 257 |
|
256 | 258 |
|
| 259 | +def format_fingerprint(input_hash: str | None) -> tuple[str | None, str | None, str | None]: |
| 260 | + if input_hash: |
| 261 | + input_hash = input_hash.replace(" ", "") |
| 262 | + # hash is expected to be a sha1, but as it not documented, we make this function more flexible if hash is |
| 263 | + # in another standard format (md5/sha256), especially in the future |
| 264 | + match len(input_hash): |
| 265 | + case 32: |
| 266 | + return input_hash, None, None |
| 267 | + case 40: |
| 268 | + return None, input_hash, None |
| 269 | + case 64: |
| 270 | + return None, None, input_hash |
| 271 | + case _: |
| 272 | + raise ValueError( |
| 273 | + "Unexpected hash size found while processing certlog " |
| 274 | + f"$CertificateHash/$CertificateHash2 column: len {len(input_hash)}, content {input_hash}" |
| 275 | + ) |
| 276 | + return None, None, None |
| 277 | + |
| 278 | + |
| 279 | +def format_serial_number(serial_number_as_hex: str | None) -> str | None: |
| 280 | + if not serial_number_as_hex: |
| 281 | + return None |
| 282 | + return serial_number_as_hex.replace(" ", "") |
| 283 | + |
| 284 | + |
| 285 | +def serial_number_as_int(serial_number_as_hex: str | None) -> int | None: |
| 286 | + if not serial_number_as_hex: |
| 287 | + return None |
| 288 | + return int(serial_number_as_hex, 16) |
| 289 | + |
| 290 | + |
| 291 | +FORMATING_FUNC: dict[str, Callable[[Any], Any]] = { |
| 292 | + "fingerprint": format_fingerprint, |
| 293 | + "serial_number_hex": format_serial_number, |
| 294 | +} |
| 295 | + |
| 296 | + |
257 | 297 | class CertLogPlugin(Plugin): |
258 | 298 | """Return all available data stored in CertLog databases. |
259 | 299 |
|
@@ -302,8 +342,27 @@ def read_records(self, table_name: str, record_type: CertLogRecord) -> Iterator[ |
302 | 342 | record_values = {} |
303 | 343 | for column, value in column_values: |
304 | 344 | new_column = FIELD_MAPPINGS.get(column) |
305 | | - if new_column: |
| 345 | + if new_column in FORMATING_FUNC: |
| 346 | + try: |
| 347 | + value = FORMATING_FUNC[new_column](value) |
| 348 | + except Exception as e: |
| 349 | + self.target.log.warning("Error formatting column %s (%s): %s", new_column, column, value) |
| 350 | + self.target.log.debug("", exc_info=e) |
| 351 | + value = None |
| 352 | + if new_column and new_column not in record_values: |
306 | 353 | record_values[new_column] = value |
| 354 | + # Serial number is format as int and string, to ease search of a specific sn in both format |
| 355 | + if new_column == "serial_number_hex": |
| 356 | + record_values["serial_number"] = serial_number_as_int(value) |
| 357 | + elif new_column: |
| 358 | + self.target.log.debug( |
| 359 | + "Unexpected element while processing %s entries : %s column already exists " |
| 360 | + "(mapped from original column name %s). This may be cause by two column that were not" |
| 361 | + " expected to be present in the same time.", |
| 362 | + table_name, |
| 363 | + new_column, |
| 364 | + column, |
| 365 | + ) |
307 | 366 | else: |
308 | 367 | self.target.log.debug( |
309 | 368 | "Unexpected column for table %s in CA %s: %s", table_name, ca_name, column |
|
0 commit comments