Skip to content

Commit cf076b1

Browse files
committed
temporary: Debugging variations of streaming to identify the anyio task group exception.
1 parent 6b61e19 commit cf076b1

File tree

1 file changed

+19
-9
lines changed

1 file changed

+19
-9
lines changed

src/dqa/executor/mhqa.py

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from a2a.types import TaskState
1212

1313

14-
from dqa import EnvVars
14+
from dqa import EnvVars, ic
1515
from dqa.actor.mhqa import MHQAActor, MHQAActorInterface, MHQAActorMethods
1616
from dqa.actor.pubsub_topics import PubSubTopics
1717
from dqa.model.mhqa import (
@@ -43,6 +43,8 @@ async def do_mhqa_respond(self, data: MHQAInput):
4343
EnvVars.APP_DAPR_PUBSUB_MEMORY_STREAM_BUFFER_SIZE
4444
)
4545

46+
dc = DaprClient()
47+
4648
def pubsub_message_handler(message: SubscriptionMessage) -> TopicEventResponse:
4749
# TODO: Is this a reasonable way to drop stale messages?
4850
parsed_timestamp = message.extensions().get("time", None)
@@ -73,20 +75,28 @@ async def invoke_actor():
7375
raw_body=data.model_dump_json().encode(),
7476
)
7577

76-
with DaprClient() as dc:
77-
async with anyio.create_task_group() as tg:
78-
pubsub_topic_name = f"{PubSubTopics.MHQA_RESPONSE}/{data.thread_id}"
78+
pubsub_topic_name = f"{PubSubTopics.MHQA_RESPONSE}/{data.thread_id}"
79+
80+
async with anyio.create_task_group() as tg:
81+
try:
7982
dc.subscribe_with_handler(
8083
pubsub_name=EnvVars.DAPR_PUBSUB_NAME,
8184
topic=pubsub_topic_name,
8285
handler_fn=pubsub_message_handler,
8386
)
84-
85-
tg.start_soon(invoke_actor)
87+
tg.start_soon(invoke_actor, name=invoke_actor.__name__)
8688
# FIXME: Error "Attempted to exit cancel scope in a different task than it was entered in".
87-
async with receive_stream:
88-
async for item in receive_stream:
89-
yield item
89+
async for item in receive_stream:
90+
yield item
91+
tg.cancel_scope.cancel()
92+
ic("Cancelled anyio task group")
93+
except Exception as e:
94+
logger.exception(e)
95+
finally:
96+
await receive_stream.aclose()
97+
await send_stream.aclose()
98+
dc.close()
99+
ic("Receive and send streams closed")
90100

91101
async def do_mhqa_get_history(self, data: MHQAHistoryInput) -> str:
92102
proxy = ActorProxy.create(

0 commit comments

Comments
 (0)