Skip to content

Commit e208666

Browse files
authored
Add MessageEvent.with_auto_ack() (#11)
1 parent f855696 commit e208666

File tree

2 files changed

+83
-1
lines changed

2 files changed

+83
-1
lines changed

stompman/listening_events.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1+
from collections.abc import Awaitable, Callable
12
from dataclasses import dataclass, field
2-
from typing import TYPE_CHECKING
3+
from typing import TYPE_CHECKING, Any, Self
34

45
from stompman.frames import (
56
AckFrame,
@@ -36,6 +37,24 @@ async def nack(self) -> None:
3637
)
3738
)
3839

40+
async def with_auto_ack(
41+
self,
42+
awaitable: Awaitable[None],
43+
*,
44+
on_suppressed_exception: Callable[[Exception, Self], Any],
45+
supressed_exception_classes: tuple[type[Exception], ...] = (Exception,),
46+
) -> None:
47+
called_nack = False
48+
try:
49+
await awaitable
50+
except supressed_exception_classes as exception:
51+
await self.nack()
52+
called_nack = True
53+
on_suppressed_exception(exception, self)
54+
finally:
55+
if not called_nack:
56+
await self.ack()
57+
3958

4059
@dataclass
4160
class ErrorEvent:

tests/test_client.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,69 @@ async def test_ack_nack() -> None:
346346
assert_frames_between_lifespan_match(collected_frames, [message_frame, nack_frame, ack_frame])
347347

348348

349+
def get_mocked_message_event() -> tuple[MessageEvent, mock.AsyncMock, mock.AsyncMock, mock.Mock]:
350+
ack_mock, nack_mock, on_suppressed_exception_mock = mock.AsyncMock(), mock.AsyncMock(), mock.Mock()
351+
352+
class CustomMessageEvent(MessageEvent):
353+
ack = ack_mock
354+
nack = nack_mock
355+
356+
return (
357+
CustomMessageEvent(
358+
_frame=MessageFrame(
359+
headers={"destination": "destination", "message-id": "message-id", "subscription": "subscription"},
360+
body=b"",
361+
),
362+
_client=mock.Mock(),
363+
),
364+
ack_mock,
365+
nack_mock,
366+
on_suppressed_exception_mock,
367+
)
368+
369+
370+
async def test_message_event_with_auto_ack_nack() -> None:
371+
event, ack, nack, on_suppressed_exception = get_mocked_message_event()
372+
exception = RuntimeError()
373+
374+
async def raises_runtime_error() -> None: # noqa: RUF029
375+
raise exception
376+
377+
await event.with_auto_ack(
378+
raises_runtime_error(),
379+
supressed_exception_classes=(Exception,),
380+
on_suppressed_exception=on_suppressed_exception,
381+
)
382+
383+
ack.assert_not_called()
384+
nack.assert_called_once_with()
385+
on_suppressed_exception.assert_called_once_with(exception, event)
386+
387+
388+
async def test_message_event_with_auto_ack_ack_raises() -> None:
389+
event, ack, nack, on_suppressed_exception = get_mocked_message_event()
390+
391+
async def func() -> None: # noqa: RUF029
392+
raise Exception # noqa: TRY002
393+
394+
with suppress(Exception):
395+
await event.with_auto_ack(
396+
func(), supressed_exception_classes=(RuntimeError,), on_suppressed_exception=on_suppressed_exception
397+
)
398+
399+
ack.assert_called_once_with()
400+
nack.assert_not_called()
401+
on_suppressed_exception.assert_not_called()
402+
403+
404+
async def test_message_event_with_auto_ack_ack_ok() -> None:
405+
event, ack, nack, on_suppressed_exception = get_mocked_message_event()
406+
await event.with_auto_ack(mock.AsyncMock()(), on_suppressed_exception=on_suppressed_exception)
407+
ack.assert_called_once_with()
408+
nack.assert_not_called()
409+
on_suppressed_exception.assert_not_called()
410+
411+
349412
async def test_send_message_and_enter_transaction_ok(monkeypatch: pytest.MonkeyPatch) -> None:
350413
body = b"hello"
351414
destination = "/queue/test"

0 commit comments

Comments
 (0)