From 22531b221b342659730c0dfdf1a63bf44cdf6d82 Mon Sep 17 00:00:00 2001 From: Stefan Tatschner Date: Tue, 13 Aug 2024 15:55:46 +0200 Subject: [PATCH] feat: Add support for the HSFZ protocol The High Speed Fahrzeugzugang (HSFZ) protocol is an automotive protocol used to tunnel UDS traffic through TCP. Since version 4.1, Wireshark contains [1] a protocol dissector for HSFZ enabling own implementations of this protocol. Scapy also contains an implementation of HSFZ [2]. [1]: https://github.com/wireshark/wireshark/commit/e5ced7ad7904cac016a965a7cbd2cce3348e9267 [2]: https://github.com/secdev/scapy/blob/master/scapy/contrib/automotive/bmw/hsfz.py Co-authored-by: Ferdinand Jarisch Co-authored-by: Tobias Specht --- src/gallia/commands/__init__.py | 39 +-- src/gallia/commands/discover/hsfz.py | 141 ++++++++++ src/gallia/transports/__init__.py | 3 + src/gallia/transports/hsfz.py | 369 +++++++++++++++++++++++++++ src/gallia/transports/schemes.py | 1 + tests/pytest/test_hsfz.py | 313 +++++++++++++++++++++++ 6 files changed, 848 insertions(+), 18 deletions(-) create mode 100644 src/gallia/commands/discover/hsfz.py create mode 100644 src/gallia/transports/hsfz.py create mode 100644 tests/pytest/test_hsfz.py diff --git a/src/gallia/commands/__init__.py b/src/gallia/commands/__init__.py index c6f7c1252..0f9416594 100644 --- a/src/gallia/commands/__init__.py +++ b/src/gallia/commands/__init__.py @@ -6,6 +6,7 @@ from gallia.command.base import BaseCommand from gallia.commands.discover.doip import DoIPDiscoverer +from gallia.commands.discover.hsfz import HSFZDiscoverer from gallia.commands.primitive.generic.pdu import GenericPDUPrimitive from gallia.commands.primitive.uds.dtc import DTCPrimitive from gallia.commands.primitive.uds.ecu_reset import ECUResetPrimitive @@ -26,46 +27,48 @@ from gallia.commands.scan.uds.sessions import SessionsScanner registry: list[type[BaseCommand]] = [ + DTCPrimitive, DoIPDiscoverer, + ECUResetPrimitive, + GenericPDUPrimitive, + HSFZDiscoverer, + IOCBIPrimitive, MemoryFunctionsScanner, + PingPrimitive, + RMBAPrimitive, + RTCLPrimitive, ReadByIdentifierPrimitive, ResetScanner, SASeedsDumper, ScanIdentifiers, - SessionsScanner, + SendPDUPrimitive, ServicesScanner, - DTCPrimitive, - ECUResetPrimitive, + SessionsScanner, VINPrimitive, - IOCBIPrimitive, - PingPrimitive, - RMBAPrimitive, - RTCLPrimitive, - GenericPDUPrimitive, - SendPDUPrimitive, WMBAPrimitive, WriteByIdentifierPrimitive, ] # TODO: Investigate why linters didn't catch faulty strings in here. __all__ = [ + "DTCPrimitive", "DoIPDiscoverer", + "ECUResetPrimitive", + "GenericPDUPrimitive", + "HSFZDiscoverer", + "IOCBIPrimitive", "MemoryFunctionsScanner", + "PingPrimitive", + "RMBAPrimitive", + "RTCLPrimitive", "ReadByIdentifierPrimitive", "ResetScanner", "SASeedsDumper", "ScanIdentifiers", - "SessionsScanner", + "SendPDUPrimitive", "ServicesScanner", - "DTCPrimitive", - "ECUResetPrimitive", + "SessionsScanner", "VINPrimitive", - "IOCBIPrimitive", - "PingPrimitive", - "RMBAPrimitive", - "RTCLPrimitive", - "GenericPDUPrimitive", - "SendPDUPrimitive", "WMBAPrimitive", "WriteByIdentifierPrimitive", ] diff --git a/src/gallia/commands/discover/hsfz.py b/src/gallia/commands/discover/hsfz.py new file mode 100644 index 000000000..3986894be --- /dev/null +++ b/src/gallia/commands/discover/hsfz.py @@ -0,0 +1,141 @@ +import asyncio +from argparse import Namespace + +from gallia.command import UDSDiscoveryScanner +from gallia.log import get_logger +from gallia.services.uds.core.service import ( + DiagnosticSessionControlRequest, + DiagnosticSessionControlResponse, + UDSRequest, +) +from gallia.services.uds.helpers import raise_for_mismatch +from gallia.transports.base import TargetURI +from gallia.transports.hsfz import HSFZConnection +from gallia.utils import auto_int, write_target_list + +logger = get_logger(__name__) + + +class HSFZDiscoverer(UDSDiscoveryScanner): + """ECU and routing discovery scanner for HSFZ""" + + COMMAND = "hsfz" + SHORT_HELP = "" + + def configure_parser(self) -> None: + self.parser.add_argument( + "--reversed", + action="store_true", + help="scan in reversed order", + ) + self.parser.add_argument( + "--src-addr", + type=auto_int, + default=0xF4, + help="HSFZ source address", + ) + self.parser.add_argument( + "--start", + metavar="INT", + type=auto_int, + default=0x00, + help="set start address", + ) + self.parser.add_argument( + "--stop", + metavar="INT", + type=auto_int, + default=0xFF, + help="set end address", + ) + + async def _probe( + self, + conn: HSFZConnection, + req: UDSRequest, + timeout: float, + ) -> bool: + data = req.pdu + result = False + + await asyncio.wait_for(conn.write_diag_request(data), timeout) + + # Broadcast endpoints deliver more responses. + # Make sure to flush the receive queue properly. + while True: + try: + raw_resp = await asyncio.wait_for(conn.read_diag_request(), timeout) + except TimeoutError: + return result + + resp = DiagnosticSessionControlResponse.parse_static(raw_resp) + raise_for_mismatch(req, resp) + result = True + + async def probe( + self, + host: str, + port: int, + src_addr: int, + dst_addr: int, + timeout: float, + ack_timeout: float = 1.0, + ) -> TargetURI | None: + req = DiagnosticSessionControlRequest(0x01) + + try: + conn = await HSFZConnection.connect( + host, + port, + src_addr, + dst_addr, + ack_timeout, + ) + except TimeoutError: + return None + + try: + result = await self._probe(conn, req, timeout) + except (TimeoutError, ConnectionError): + return None + finally: + await conn.close() + + if result: + return TargetURI.from_parts( + "hsfz", + host, + port, + { + "src_addr": f"{src_addr:#02x}", + "dst_addr": f"{dst_addr:#02x}", + "ack_timeout": int(ack_timeout) * 1000, + }, + ) + return None + + async def main(self, args: Namespace) -> None: + found = [] + gen = ( + range(args.stop + 1, args.start) if args.reversed else range(args.start, args.stop + 1) + ) + + for dst_addr in gen: + logger.info(f"testing target {dst_addr:#02x}") + + target = await self.probe( + args.target.hostname, + args.target.port, + args.src_addr, + dst_addr, + args.timeout, + ) + + if target is not None: + logger.info(f"found {dst_addr:#02x}") + found.append(target) + + logger.result(f"Found {len(found)} targets") + ecus_file = self.artifacts_dir.joinpath("ECUs.txt") + logger.result(f"Writing urls to file: {ecus_file}") + await write_target_list(ecus_file, found, self.db_handler) diff --git a/src/gallia/transports/__init__.py b/src/gallia/transports/__init__.py index 72ee72aec..be50b33b3 100644 --- a/src/gallia/transports/__init__.py +++ b/src/gallia/transports/__init__.py @@ -6,11 +6,13 @@ from gallia.transports.base import BaseTransport, TargetURI from gallia.transports.doip import DoIPTransport +from gallia.transports.hsfz import HSFZTransport from gallia.transports.schemes import TransportScheme from gallia.transports.tcp import TCPLinesTransport, TCPTransport registry: list[type[BaseTransport]] = [ DoIPTransport, + HSFZTransport, TCPLinesTransport, TCPTransport, ] @@ -18,6 +20,7 @@ __all__ = [ "BaseTransport", "DoIPTransport", + "HSFZTransport", "TCPLinesTransport", "TCPTransport", "TargetURI", diff --git a/src/gallia/transports/hsfz.py b/src/gallia/transports/hsfz.py new file mode 100644 index 000000000..f0c883488 --- /dev/null +++ b/src/gallia/transports/hsfz.py @@ -0,0 +1,369 @@ +# SPDX-FileCopyrightText: AISEC Pentesting Team +# +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import asyncio +import errno +import struct +import sys +from dataclasses import dataclass +from enum import IntEnum +from typing import Any, Self + +from pydantic import BaseModel, field_validator + +from gallia.log import get_logger +from gallia.transports.base import BaseTransport, TargetURI +from gallia.utils import auto_int + +logger = get_logger(__name__) + + +class HSFZStatus(IntEnum): + UNDEFINED = -0x01 + Data = 0x01 + Ack = 0x02 + Klemme15 = 0x10 + Vin = 0x11 + AliveCheck = 0x12 + StatusDataInquiry = 0x13 + IncorrectTesterAddressError = 0x40 + IncorrectControlWordError = 0x41 + IncorrectFormatError = 0x42 + IncorrectDestinationAddressError = 0x43 + MessageTooLarge = 0x44 + ApplicationNotReady = 0x45 + OutOfMemory = 0xFF + + @classmethod + def _missing_(cls, value: Any) -> HSFZStatus: + return cls.UNDEFINED + + +@dataclass +class HSFZHeader: + Len: int + CWord: int + + def pack(self) -> bytes: + return struct.pack("!IH", self.Len, self.CWord) + + @classmethod + def unpack(cls, data: bytes) -> Self: + len_, cword = struct.unpack("!IH", data) + return cls(len_, cword) + + +@dataclass +class HSFZDiagReqHeader: + src_addr: int + dst_addr: int + + def pack(self) -> bytes: + return struct.pack("!BB", self.src_addr, self.dst_addr) + + @classmethod + def unpack(cls, data: bytes) -> HSFZDiagReqHeader: + src_addr, dst_addr = struct.unpack("!BB", data) + return cls(src_addr, dst_addr) + + +HSFZFrame = tuple[HSFZHeader, HSFZDiagReqHeader | None, bytes | None] +HSFZDiagFrame = tuple[HSFZHeader, HSFZDiagReqHeader, bytes] + + +class HSFZConnection: + def __init__( + self, + reader: asyncio.StreamReader, + writer: asyncio.StreamWriter, + src_addr: int, + dst_addr: int, + ack_timeout: float = 1.0, + ): + self.reader = reader + self.writer = writer + self.src_addr = src_addr + self.dst_addr = dst_addr + self.ack_timeout = ack_timeout + self._read_queue: asyncio.Queue[HSFZDiagFrame | int] = asyncio.Queue() + self._read_task = asyncio.create_task(self._read_worker()) + self._closed = False + self._mutex = asyncio.Lock() + + @classmethod + async def connect( + cls, + host: str, + port: int, + src_addr: int, + dst_addr: int, + ack_timeout: float, + ) -> HSFZConnection: + reader, writer = await asyncio.open_connection(host, port) + return cls( + reader, + writer, + src_addr, + dst_addr, + ack_timeout, + ) + + async def _read_frame(self) -> HSFZFrame: + # Header is fixed size 6 byte. + hdr_buf = await self.reader.readexactly(6) + hdr = HSFZHeader.unpack(hdr_buf) + + # If a message without a RequestHeader is received, + # the whole message must be read before erroring out. + # Otherwise the partial read packet stays in the receive + # buffer and causes further breakage… + if hdr.Len < 2: + data = None + if hdr.Len > 0: + data = await self.reader.readexactly(hdr.Len) + data_str = data.hex() if data is not None else data + logger.trace(f"hdr: {hdr}, req_hdr: None, data: {data_str}", extra={"tags": ["read"]}) + return hdr, None, data + + # DiagReqHeader is fixed size 2 byte. + req_buf = await self.reader.readexactly(2) + req_hdr = HSFZDiagReqHeader.unpack(req_buf) + + data_len = hdr.Len - 2 + data = await self.reader.readexactly(data_len) + logger.trace( + f"hdr: {hdr}, req_hdr: {req_hdr}, data: {data.hex()}", + extra={"tags": ["read"]}, + ) + return hdr, req_hdr, data + + async def write_frame(self, frame: HSFZFrame) -> None: + hdr, req_hdr, data = frame + buf = b"" + buf += hdr.pack() + log_msg = f"hdr: {hdr}" + if req_hdr is not None: + buf += req_hdr.pack() + log_msg += f", req_hdr: {req_hdr}" + if data is not None: + buf += data + log_msg += f", data: {data.hex()}" + self.writer.write(buf) + await self.writer.drain() + + logger.trace(log_msg, extra={"tags": ["write"]}) + + async def _read_worker(self) -> None: + try: + while True: + hdr, req_hdr, data = await self._read_frame() + + match hdr.CWord: + case HSFZStatus.AliveCheck: + await self.send_alive_msg() + continue + case HSFZStatus.Ack | HSFZStatus.Data: + if req_hdr is None: + logger.warning("unexpected frame: no hsfz request header") + continue + if data is None: + logger.warning("unexpected frame: no payload") + continue + await self._read_queue.put((hdr, req_hdr, data)) + case _: + await self._read_queue.put(hdr.CWord) + continue + + except asyncio.CancelledError: + logger.debug("read worker cancelled") + except asyncio.IncompleteReadError as e: + logger.debug(f"read worker received EOF: {e}") + except Exception as e: + logger.critical(f"read worker died: {e}") + + async def _unpack_frame(self, frame: HSFZDiagFrame | int) -> HSFZDiagFrame: + # I little hack, but it is either a tuple or an int…. + match frame: + case tuple(): + return frame + case int(): + await self.close() + raise BrokenPipeError(f"I can't even: {HSFZStatus(frame).name}") + case _: + raise RuntimeError(f"unexpected frame: {frame}") + + async def read_frame(self) -> HSFZDiagFrame | int: + if self._closed: + if sys.platform != "win32": + raise OSError(errno.EBADFD) + else: + raise RuntimeError("connection already closed") + + return await self._read_queue.get() + + async def read_diag_request(self) -> bytes: + unexpected_packets = [] + while True: + hdr, req_hdr, data = await self._unpack_frame(await self.read_frame()) + if hdr.CWord != HSFZStatus.Data: + logger.warning( + f"expected HSFZ data, instead got: {HSFZStatus(hdr.CWord).name} with payload {data.hex()}" + ) + unexpected_packets.append((hdr, req_hdr, data)) + continue + if req_hdr.src_addr != self.dst_addr or req_hdr.dst_addr != self.src_addr: + logger.warning( + f"HSFZ Data has unexpected addresses (src:dst); should be {self.dst_addr:#04x}:{self.src_addr:#04x}, but is {req_hdr.src_addr:#04x}:{req_hdr.dst_addr:#04x}" + ) + unexpected_packets.append((hdr, req_hdr, data)) + continue + + # We do not want to consume packets that we were not expecting; add them to queue again + for item in unexpected_packets: + await self._read_queue.put(item) + + return data + + async def _read_ack(self, prev_data: bytes) -> None: + unexpected_packets = [] + while True: + hdr, req_hdr, data = await self._unpack_frame(await self.read_frame()) + if hdr.CWord != HSFZStatus.Ack: + logger.warning( + f"expected HSFZ Ack for {prev_data.hex()}, instead got: {HSFZStatus(hdr.CWord).name} with payload {data.hex()}" + ) + unexpected_packets.append((hdr, req_hdr, data)) + continue + if req_hdr.src_addr != self.src_addr or req_hdr.dst_addr != self.dst_addr: + logger.warning( + f"HSFZ Ack has unexpected addresses (src:dst); should be {self.src_addr:#04x}:{self.dst_addr:#04x}, but is {req_hdr.src_addr:#04x}:{req_hdr.dst_addr:#04x}" + ) + unexpected_packets.append((hdr, req_hdr, data)) + continue + if prev_data[:5] != data: + logger.warning( + f"HSFZ Ack has unexpected data of {data.hex()}, should be {prev_data[:5].hex()}" + ) + unexpected_packets.append((hdr, req_hdr, data)) + continue + + # We do not want to consume packets that we were not expecting; add them to queue again + for item in unexpected_packets: + await self._read_queue.put(item) + + return + + async def write_diag_request_raw( + self, + hdr: HSFZHeader, + req_hdr: HSFZDiagReqHeader, + data: bytes, + ) -> None: + async with self._mutex: + await self.write_frame((hdr, req_hdr, data)) + + try: + # Now an ACK message is expected. + await asyncio.wait_for(self._read_ack(data), self.ack_timeout) + except TimeoutError as e: + await self.close() + raise BrokenPipeError("no ack by gateway") from e + + async def write_diag_request(self, data: bytes) -> None: + hdr = HSFZHeader(Len=len(data) + 2, CWord=HSFZStatus.Data) + req_hdr = HSFZDiagReqHeader(src_addr=self.src_addr, dst_addr=self.dst_addr) + await self.write_diag_request_raw(hdr, req_hdr, data) + + async def send_alive_msg(self) -> None: + hdr = HSFZHeader(Len=2, CWord=HSFZStatus.AliveCheck) + buf = b"" + buf += hdr.pack() + # For reasons, the tester address is two bytes large in this path. + buf += struct.pack("!H", self.src_addr) + + self.writer.write(buf) + await self.writer.drain() + + async def close(self) -> None: + if self._closed: + return + + self._closed = True + self._read_task.cancel() + self.writer.close() + await self.writer.wait_closed() + + +class HSFZConfig(BaseModel): + src_addr: int + dst_addr: int + ack_timeout: int = 1000 + + @field_validator( + "src_addr", + "dst_addr", + mode="before", + ) + def auto_int(cls, v: str) -> int: + return auto_int(v) + + +class HSFZTransport(BaseTransport, scheme="hsfz"): + def __init__( + self, + target: TargetURI, + port: int, + config: HSFZConfig, + conn: HSFZConnection, + ): + super().__init__(target) + self._conn = conn + self.port = port + + @classmethod + async def connect( + cls, + target: str | TargetURI, + timeout: float | None = None, + ) -> HSFZTransport: + t = TargetURI(target) if isinstance(target, str) else target + if t.hostname is None: + raise ValueError("no hostname specified") + + port = t.port if t.port is not None else 6801 + config = HSFZConfig(**t.qs_flat) + conn = await HSFZConnection.connect( + t.hostname, + port, + config.src_addr, + config.dst_addr, + config.ack_timeout / 1000, + ) + return cls( + t, + port, + config, + conn, + ) + + async def close(self) -> None: + await self._conn.close() + + async def read( + self, + timeout: float | None = None, + tags: list[str] | None = None, + ) -> bytes: + return await asyncio.wait_for(self._conn.read_diag_request(), timeout) + + async def write( + self, + data: bytes, + timeout: float | None = None, + tags: list[str] | None = None, + ) -> int: + await asyncio.wait_for(self._conn.write_diag_request(data), timeout) + return len(data) diff --git a/src/gallia/transports/schemes.py b/src/gallia/transports/schemes.py index 7eefac1a6..3593f7528 100644 --- a/src/gallia/transports/schemes.py +++ b/src/gallia/transports/schemes.py @@ -19,6 +19,7 @@ class TransportScheme(StrEnum): TCP_LINES = TCP_LINES HTTP = HTTP DOIP = DOIP + HSFZ = "hsfz" UNIX = "unix" UNIX_LINES = "unix-lines" diff --git a/tests/pytest/test_hsfz.py b/tests/pytest/test_hsfz.py new file mode 100644 index 000000000..53349e361 --- /dev/null +++ b/tests/pytest/test_hsfz.py @@ -0,0 +1,313 @@ +# SPDX-FileCopyrightText: AISEC Pentesting Team +# +# SPDX-License-Identifier: Apache-2.0 + +import asyncio +from collections.abc import AsyncIterator + +import pytest +from gallia.services.uds.core.client import UDSClient +from gallia.services.uds.core.service import PositiveResponse +from gallia.transports.base import BaseTransport, TargetURI +from gallia.transports.hsfz import HSFZConnection, HSFZTransport +from gallia.transports.tcp import TCPTransport + +target = TargetURI("hsfz://localhost:6801?dst_addr=0x10&src_addr=0xf4") +listen_target = TargetURI("tcp://127.0.0.1:6801") + + +class TCPServer: + def __init__(self) -> None: + self.server: asyncio.Server + self.queue: asyncio.Queue[TCPTransport] = asyncio.Queue(1) + + async def _accept_cb( + self, + reader: asyncio.StreamReader, + writer: asyncio.StreamWriter, + ) -> None: + await self.queue.put(TCPTransport(TargetURI("tcp://"), reader, writer)) + + async def listen(self, target: TargetURI) -> None: + self.server = await asyncio.start_server( + self._accept_cb, + host=target.hostname, + port=target.port, + ) + + async def accept(self) -> TCPTransport: + return await self.queue.get() + + async def close(self) -> None: + self.server.close() + await self.server.wait_closed() + + +@pytest.fixture() +async def dummy_server() -> AsyncIterator[TCPServer]: + dummy_server = TCPServer() + await dummy_server.listen(listen_target) + yield dummy_server + await dummy_server.close() + + +@pytest.fixture() +async def transports(dummy_server: TCPServer) -> AsyncIterator[tuple[BaseTransport, BaseTransport]]: + hsfz_transport = await HSFZTransport.connect(target) + dummy_transport = await dummy_server.accept() + + yield hsfz_transport, dummy_transport + + await hsfz_transport.close() + await dummy_transport.close() + + +@pytest.mark.asyncio +async def test_reconnect_after_powercycle(dummy_server: TCPServer) -> None: + hsfz_transport = await HSFZTransport.connect(target) + dummy_transport = await dummy_server.accept() + + # Simulate powercycle. + await dummy_transport.close() + + await hsfz_transport.reconnect() + dummy_transport = await dummy_server.accept() + + await dummy_transport.close() + await hsfz_transport.close() + + +@pytest.mark.asyncio +async def test_hsfz_timeout(transports: tuple[BaseTransport, BaseTransport]) -> None: + hsfz_transport, _ = transports + u = UDSClient(hsfz_transport, timeout=5) + with pytest.raises(BrokenPipeError): + await u.read_data_by_identifier(0x1234) + + +@pytest.mark.asyncio +async def test_hsfz_alive_check(transports: tuple[BaseTransport, BaseTransport]) -> None: + _, dummy_transport = transports + await dummy_transport.write( + bytes([0x00, 0x00, 0x00, 0x05, 0x00, 0x12, 0xFF, 0xFF, 0xCA, 0xFF, 0xEE]) + ) + resp = await dummy_transport.read(4096) + assert resp == bytes([0x00, 0x00, 0x00, 0x02, 0x00, 0x12, 0x00, 0xF4]) + + +@pytest.mark.asyncio +async def test_hsfz_diagnose_request(transports: tuple[BaseTransport, BaseTransport]) -> None: + hsfz_transport, dummy_transport = transports + u = UDSClient(hsfz_transport, max_retry=1, timeout=1) + task = asyncio.create_task(u.read_data_by_identifier(0x1234)) + await asyncio.sleep(0.5) + # Send hsfz ack + await dummy_transport.write( + bytes([0x00, 0x00, 0x00, 0x05, 0x00, 0x02, 0xF4, 0x10, 0x22, 0x12, 0x34]) + ) + await asyncio.sleep(0.1) + # Send caffee back. + await dummy_transport.write( + bytes( + [ + 0x00, + 0x00, + 0x00, + 0x08, + 0x00, + 0x01, + 0x10, + 0xF4, + 0x62, + 0x12, + 0x34, + 0xCA, + 0xFF, + 0xEE, + ] + ) + ) + resp = await task + assert isinstance(resp, PositiveResponse) + assert resp.data_record == bytes([0xCA, 0xFF, 0xEE]) + + +@pytest.mark.asyncio +async def test_request_pdu_mutex(transports: tuple[BaseTransport, BaseTransport]) -> None: + # This test ensures that the uds send primitive request_pdu() + # stays task safe. In other words, requests and responses + # between task1 and task2 must not be mixed. + hsfz_transport, dummy_transport = transports + u = UDSClient(hsfz_transport, max_retry=1, timeout=1) + task1 = asyncio.create_task(u.read_data_by_identifier(0x1234)) + task2 = asyncio.create_task(u.read_data_by_identifier(0x4321)) + for _ in range(2): + req = await dummy_transport.read(4096) + # Stupid check, but ok for this unit test. + if req[9] == 0x12: + # Send hsfz ack for task 1 + await dummy_transport.write( + bytes([0x00, 0x00, 0x00, 0x05, 0x00, 0x02, 0xF4, 0x10, 0x22, 0x12, 0x34]) + ) + else: + # Send hsfz ack for task 2 + await dummy_transport.write( + bytes([0x00, 0x00, 0x00, 0x05, 0x00, 0x02, 0xF4, 0x10, 0x22, 0x43, 0x21]) + ) + await asyncio.sleep(0.1) + # We did not have our morning coffee yet; send a response pending. + await dummy_transport.write( + bytes([0x00, 0x00, 0x00, 0x05, 0x00, 0x01, 0x10, 0xF4, 0x7F, 0x22, 0x78]) + ) + await asyncio.sleep(0.3) + if req[9] == 0x12: + # Send caffee back. + await dummy_transport.write( + bytes( + [ + 0x00, + 0x00, + 0x00, + 0x08, + 0x00, + 0x01, + 0x10, + 0xF4, + 0x62, + 0x12, + 0x34, + 0xCA, + 0xFF, + 0xEE, + ] + ) + ) + else: + # Send caeeff back. + await dummy_transport.write( + bytes( + [ + 0x00, + 0x00, + 0x00, + 0x08, + 0x00, + 0x01, + 0x10, + 0xF4, + 0x62, + 0x43, + 0x21, + 0xCA, + 0xEE, + 0xFF, + ] + ) + ) + + resp = await task1 + assert isinstance(resp, PositiveResponse) + assert resp.data_record == bytes([0xCA, 0xFF, 0xEE]) + + resp = await task2 + assert isinstance(resp, PositiveResponse) + assert resp.data_record == bytes([0xCA, 0xEE, 0xFF]) + + +@pytest.mark.asyncio +async def test_unexpected_messages(transports: tuple[BaseTransport, BaseTransport]) -> None: + hsfz_transport, dummy_transport = transports + u = UDSClient(hsfz_transport, max_retry=1, timeout=1) + task = asyncio.create_task(u.read_data_by_identifier(0x1234)) + + await dummy_transport.read(4096) + await dummy_transport.write( + bytes([0x00, 0x00, 0x00, 0x05, 0x00, 0x02, 0xF4, 0x10, 0x22, 0x12, 0x34]) + ) + await dummy_transport.write( + bytes( + [ + # This message has the wrong hsfz dst address. + # Must be ignored. + 0x00, + 0x00, + 0x00, + 0x08, + 0x00, + 0x01, + 0x10, + 0xF5, + 0x62, + 0x12, + 0x34, + 0xCA, + 0xEE, + 0xFE, + ] + ) + ) + await dummy_transport.write( + bytes( + [ + 0x00, + 0x00, + 0x00, + 0x08, + 0x00, + 0x01, + 0x10, + 0xF4, + 0x62, + 0x12, + 0x34, + 0xCA, + 0xEE, + 0xFF, + ] + ) + ) + resp = await task + assert isinstance(resp, PositiveResponse) + assert resp.data_record == bytes([0xCA, 0xEE, 0xFF]) + + +@pytest.mark.asyncio +async def test_unread_messages(dummy_server: TCPServer) -> None: + hsfz_conn = await HSFZConnection.connect("127.0.0.1", 6801, 0xF4, 0x10, 1.0) + dummy_transport = await dummy_server.accept() + + tester_present = bytes([0x3E, 0x80]) + session_change = bytes([0x10, 0x01]) + + # Write HSFZ ACK. + await dummy_transport.write( + bytes([0x00, 0x00, 0x00, 0x04, 0x00, 0x02, 0xF4, 0x10]) + tester_present + ) + await dummy_transport.write( + bytes( + [ + 0x00, + 0x00, + 0x00, + 0x05, + 0x00, + 0x01, + 0x10, + 0xF4, + 0xCA, + 0xFF, + 0xEE, + ] + ) + ) + await hsfz_conn.write_diag_request(tester_present) + + # Write HSFZ ACK. + await dummy_transport.write( + bytes([0x00, 0x00, 0x00, 0x04, 0x00, 0x02, 0xF4, 0x10]) + session_change + ) + await dummy_transport.read(4096) + frame = await hsfz_conn.read_frame() + assert isinstance(frame, tuple) + assert frame[2] == bytes([0xCA, 0xFF, 0xEE]) + await hsfz_conn.close()