Skip to content

Commit

Permalink
chore: Migrate to new context manager asyncio.timeout()
Browse files Browse the repository at this point in the history
  • Loading branch information
rumpelsepp committed Jul 18, 2024
1 parent a7e2695 commit e76510d
Show file tree
Hide file tree
Showing 10 changed files with 56 additions and 41 deletions.
13 changes: 8 additions & 5 deletions src/gallia/commands/discover/doip.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,8 @@ async def gather_doip_details(
await loop.sock_sendto(sock, hdr.pack(), (tgt_hostname, tgt_port))

try:
data, _ = await asyncio.wait_for(loop.sock_recvfrom(sock, 1024), 2)
async with asyncio.timeout(2):
data, _ = await loop.sock_recvfrom(sock, 1024)
except TimeoutError:
logger.info("[🐣] No response!")
continue
Expand Down Expand Up @@ -367,12 +368,13 @@ async def enumerate_target_addresses( # noqa: PLR0913
logger.info(f"[⏳] Waiting for reply of target {target_addr:#x}")
# Hardcoded loop to detect potential broadcasts
while True:
pot_broadcast, data = await asyncio.wait_for(
self.read_diag_request_custom(conn),
timeout = (
TimingAndCommunicationParameters.DiagnosticMessageMessageTimeout / 1000
if timeout is None
else timeout,
else timeout
)
async with asyncio.timeout(timeout):
pot_broadcast, data = await self.read_diag_request_custom(conn)
if pot_broadcast is None:
break

Expand Down Expand Up @@ -534,7 +536,8 @@ async def run_udp_discovery(self) -> list[tuple[str, int]]:
await loop.sock_sendto(sock, hdr.pack(), (ip.broadcast, 13400))
try:
while True:
data, addr = await asyncio.wait_for(loop.sock_recvfrom(sock, 1024), 2)
async with asyncio.timeout(2):
data, addr = await loop.sock_recvfrom(sock, 1024)
info = VehicleAnnouncementMessage.unpack(data[8:])
logger.notice(f"[💝]: {addr} responded: {info}")
found.append(addr)
Expand Down
3 changes: 2 additions & 1 deletion src/gallia/dumpcap.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ async def start(
return cls(proc, artifacts_dir, outfile)

async def sync(self, timeout: float = 1) -> None:
await asyncio.wait_for(self.ready_event.wait(), timeout)
async with asyncio.timeout(timeout):
await self.ready_event.wait()

async def stop(self) -> None:
logger.info(f"Waiting {self.cleanup}s for dumpcap to receive all packets")
Expand Down
3 changes: 2 additions & 1 deletion src/gallia/services/uds/ecu.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,8 @@ async def wait_for_ecu(

t = timeout if timeout is not None else self.timeout
try:
await asyncio.wait_for(self._wait_for_ecu(0.5), timeout=t)
async with asyncio.timeout(t):
await self._wait_for_ecu(0.5)
return True
except TimeoutError:
logger.critical("Timeout while waiting for ECU!")
Expand Down
6 changes: 4 additions & 2 deletions src/gallia/transports/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,15 +242,17 @@ async def write(

writer = self.get_writer()
writer.write(binascii.hexlify(data) + b"\n")
await asyncio.wait_for(writer.drain(), timeout)
async with asyncio.timeout(timeout):
await writer.drain()
return len(data)

async def read(
self: TransportProtocol,
timeout: float | None = None,
tags: list[str] | None = None,
) -> bytes:
data = await asyncio.wait_for(self.get_reader().readline(), timeout)
async with asyncio.timeout(timeout):
data = await self.get_reader().readline()
d = data.decode().strip()

t = tags + ["read"] if tags is not None else ["read"]
Expand Down
6 changes: 4 additions & 2 deletions src/gallia/transports/can.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,14 +189,16 @@ async def sendto(
logger.trace(f"{dst:03x}#{data.hex()}", extra={"tags": t})

loop = asyncio.get_running_loop()
await asyncio.wait_for(loop.sock_sendall(self._sock, msg.pack()), timeout)
async with asyncio.timeout(timeout):
await loop.sock_sendall(self._sock, msg.pack())
return len(data)

async def recvfrom(
self, timeout: float | None = None, tags: list[str] | None = None
) -> tuple[int, bytes]:
loop = asyncio.get_running_loop()
can_frame = await asyncio.wait_for(loop.sock_recv(self._sock, self.BUFSIZE), timeout)
async with asyncio.timeout(timeout):
can_frame = await loop.sock_recv(self._sock, self.BUFSIZE)
msg = CANMessage.unpack(can_frame)

t = tags + ["read"] if tags is not None else ["read"]
Expand Down
28 changes: 14 additions & 14 deletions src/gallia/transports/doip.py
Original file line number Diff line number Diff line change
Expand Up @@ -661,17 +661,17 @@ async def write_request_raw(self, hdr: GenericHeader, payload: DoIPOutData) -> N
match payload:
case DiagnosticMessage():
# Now an ACK message is expected.
await asyncio.wait_for(
self._read_ack(payload.UserData),
async with asyncio.timeout(
TimingAndCommunicationParameters.DiagnosticMessageMessageAckTimeout
/ 1000,
)
):
await self._read_ack(payload.UserData)
case RoutingActivationRequest():
await asyncio.wait_for(
self._read_routing_activation_response(),
TimingAndCommunicationParameters.RoutingActivationResponseTimeout
async with asyncio.timeout(
TimingAndCommunicationParameters.DiagnosticMessageMessageAckTimeout
/ 1000,
)
):
await self._read_routing_activation_response()
except TimeoutError as e:
await self.close()
raise BrokenPipeError("Timeout while waiting for DoIP ACK message") from e
Expand Down Expand Up @@ -793,17 +793,15 @@ async def connect(

port = t.port if t.port is not None else 13400
config = DoIPConfig(**t.qs_flat)
conn = await asyncio.wait_for(
cls._connect(
async with asyncio.timeout(timeout):
conn = await cls._connect(
t.hostname,
port,
config.src_addr,
config.target_addr,
config.activation_type,
config.protocol_version,
),
timeout,
)
)
return cls(t, port, config, conn)

async def close(self) -> None:
Expand All @@ -817,7 +815,8 @@ async def read(
timeout: float | None = None,
tags: list[str] | None = None,
) -> bytes:
data = await asyncio.wait_for(self._conn.read_diag_request(), timeout)
async with asyncio.timeout(timeout):
data = await self._conn.read_diag_request()

t = tags + ["read"] if tags is not None else ["read"]
logger.trace(data.hex(), extra={"tags": t})
Expand All @@ -833,7 +832,8 @@ async def write(
logger.trace(data.hex(), extra={"tags": t})

try:
await asyncio.wait_for(self._conn.write_diag_request(data), timeout)
async with asyncio.timeout(timeout):
await self._conn.write_diag_request(data)
except DoIPNegativeAckError as e:
if e.nack_code != DiagnosticMessageNegativeAckCodes.TargetUnreachable:
raise e
Expand Down
6 changes: 4 additions & 2 deletions src/gallia/transports/isotp.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,15 @@ async def write(
logger.trace(data.hex(), extra={"tags": t})

loop = asyncio.get_running_loop()
await asyncio.wait_for(loop.sock_sendall(self._sock, data), timeout)
async with asyncio.timeout(timeout):
await loop.sock_sendall(self._sock, data)
return len(data)

async def read(self, timeout: float | None = None, tags: list[str] | None = None) -> bytes:
loop = asyncio.get_running_loop()
try:
data = await asyncio.wait_for(loop.sock_recv(self._sock, self.BUFSIZE), timeout)
async with asyncio.timeout(timeout):
data = await loop.sock_recv(self._sock, self.BUFSIZE)
except OSError as e:
if e.errno == errno.ECOMM:
raise BrokenPipeError(f"isotp flow control frame missing: {e}") from e
Expand Down
11 changes: 6 additions & 5 deletions src/gallia/transports/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ async def connect(cls, target: str | TargetURI, timeout: float | None = None) ->
t = target if isinstance(target, TargetURI) else TargetURI(target)
cls.check_scheme(t)

reader, writer = await asyncio.wait_for(
asyncio.open_connection(t.hostname, t.port), timeout
)
async with asyncio.timeout(timeout):
reader, writer = await asyncio.open_connection(t.hostname, t.port)
return cls(t, reader, writer)

async def close(self) -> None:
Expand All @@ -51,15 +50,17 @@ async def write(
logger.trace(data.hex(), extra={"tags": t})

self.writer.write(data)
await asyncio.wait_for(self.writer.drain(), timeout)
async with asyncio.timeout(timeout):
await self.writer.drain()
return len(data)

async def read(
self,
timeout: float | None = None,
tags: list[str] | None = None,
) -> bytes:
data = await asyncio.wait_for(self.reader.read(self.BUFSIZE), timeout)
async with asyncio.timeout(timeout):
data = await self.reader.read(self.BUFSIZE)

t = tags + ["read"] if tags is not None else ["read"]
logger.trace(data.hex(), extra={"tags": t})
Expand Down
6 changes: 4 additions & 2 deletions src/gallia/transports/unix.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ async def connect(cls, target: str | TargetURI, timeout: float | None = None) ->
t = target if isinstance(target, TargetURI) else TargetURI(target)
cls.check_scheme(t)

reader, writer = await asyncio.wait_for(asyncio.open_unix_connection(t.path), timeout)
async with asyncio.timeout(timeout):
reader, writer = await asyncio.open_unix_connection(t.path)

return cls(t, reader, writer)

Expand All @@ -48,7 +49,8 @@ async def write(
t = tags + ["write"] if tags is not None else ["write"]
logger.trace(data.hex(), extra={"tags": t})
self.writer.write(data)
await asyncio.wait_for(self.writer.drain(), timeout)
async with asyncio.timeout(timeout):
await self.writer.drain()

return len(data)

Expand Down
15 changes: 8 additions & 7 deletions src/opennetzteil/devices/rs/hmc804.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,22 @@ class HMC804(BaseNetzteil):
async def _connect(
self,
) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]:
return await asyncio.wait_for(
asyncio.open_connection(self.target.hostname, self.target.port),
self.timeout,
)
async with asyncio.timeout(self.timeout):
return await asyncio.open_connection(self.target.hostname, self.target.port)

async def _send_line(self, writer: asyncio.StreamWriter, data: str) -> None:
writer.write(data.encode() + b"\n")
await asyncio.wait_for(writer.drain(), self.timeout)
async with asyncio.timeout(self.timeout):
await writer.drain()

async def _recv_line(self, reader: asyncio.StreamReader) -> str:
return (await asyncio.wait_for(reader.readline(), self.timeout)).decode().strip()
async with asyncio.timeout(self.timeout):
return (await reader.readline()).decode().strip()

async def _close_conn(self, writer: asyncio.StreamWriter) -> None:
writer.close()
await asyncio.wait_for(writer.wait_closed(), self.timeout)
async with asyncio.timeout(self.timeout):
await writer.wait_closed()

async def _request(self, data: str) -> str:
reader, writer = await self._connect()
Expand Down

0 comments on commit e76510d

Please sign in to comment.