RobustConnection does not reconnect after "Server connection was stuck"
TL;DR
RobustConnection does not try to recover a connection once it gets stuck. Here's my workaround:

```python
async def serve(self):
    async with self._connection_pool, self._channel_pool:
        keyboard_interrupt = False
        while not keyboard_interrupt:
            # The tasks are created inside the loop: reusing them threw
            # "RuntimeError: cannot reuse already awaited coroutine".
            tasks = [
                self._create_listener(
                    f"topic_name-1",
                    self._process_request_message,
                ),
                self._create_listener(
                    f"topic-name-2",
                    self._process_response_message,
                ),
            ]
            try:
                await asyncio.gather(*tasks)
            except KeyboardInterrupt:
                keyboard_interrupt = True
            except Exception as e:
                logger.error(e)
                logger.debug(traceback.format_exc())
```

Deep dive
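A side note on the `RuntimeError` mentioned in the workaround's comment: a coroutine object can be awaited only once, so the listeners have to be created anew on every pass of the loop. Below is a minimal stdlib-only sketch of that behavior. `listener` and `reuse_demo` are hypothetical stand-ins for illustration, not aio-pika code.

```python
import asyncio


async def listener(name: str) -> None:
    # Hypothetical stand-in for _create_listener: it fails right away,
    # the way a listener would when its connection is lost.
    raise ConnectionError(f"{name}: connection lost")


async def reuse_demo() -> list:
    """Await the same coroutine object twice and record what is raised."""
    errors = []
    coro = listener("queue-1")  # created once, outside the loop
    for _ in range(2):
        try:
            await asyncio.gather(coro)
        except Exception as exc:
            errors.append(type(exc).__name__)
    return errors


print(asyncio.run(reuse_demo()))
```

The first pass fails with the listener's own `ConnectionError`. The second pass fails with `RuntimeError`, because the coroutine object has already finished and cannot be awaited again.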
Now to the problem itself. My setup is not entirely straightforward because of the nature of my task, but it is taken from the documentation, so it should not be the cause of the issue, regardless of the workaround above.
First of all, I use the aio_pika.robust_connection.connect_robust:
```python
async def _get_connection(self):
    return await aio_pika.robust_connection.connect_robust(
        self._rabbit_endpoint
    )
```

I also use a channel pool, since I need to handle multiple channels concurrently in a truly asynchronous manner:
```python
async def _get_channel(
    self,
) -> aio_pika.Channel:
    async with self._connection_pool.acquire() as connection:
        return await connection.channel()
```

The pool declarations are pretty straightforward:
```python
self._connection_pool: Pool = Pool(
    self._get_connection, max_size=2
)
self._channel_pool: Pool = Pool(self._get_channel, max_size=10)
```

The way I connect to a queue:
```python
async def _create_listener(
    self,
    queue_name: str,
    callback: t.Callable[
        [aio_pika.abc.AbstractIncomingMessage], t.Awaitable[None]
    ],
) -> None:
    async with self._channel_pool.acquire() as channel:
        queue = await channel.declare_queue(queue_name, durable=True)
        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                asyncio.create_task(callback(message))
```

But the workaround is just a manual reconnection, so ideally this should be handled within RobustConnection itself.
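The manual reconnection can be factored into a small supervisor that rebuilds the listeners after every failure and waits between attempts. This is a sketch under stated assumptions: `supervise`, `make_listeners`, `max_restarts`, and `base_delay` are hypothetical names, it uses only `asyncio` from the standard library, and it is not part of aio-pika. It also cancels the listeners that are still running before restarting, since `asyncio.gather` does not cancel the remaining awaitables when one of them fails.

```python
import asyncio
import logging
from typing import Awaitable, Callable, Iterable

logger = logging.getLogger(__name__)


async def supervise(
    make_listeners: Callable[[], Iterable[Awaitable[None]]],
    max_restarts: int = 5,
    base_delay: float = 0.1,
) -> int:
    """Run the listeners, recreating them after each failure.

    Returns the number of restarts performed. Hypothetical helper,
    not an aio-pika API.
    """
    restarts = 0
    while True:
        # Fresh coroutine objects on every pass: a coroutine cannot
        # be awaited twice.
        tasks = [asyncio.ensure_future(c) for c in make_listeners()]
        try:
            await asyncio.gather(*tasks)
            return restarts  # every listener finished normally
        except Exception:
            logger.exception("listeners failed")
            # Stop the surviving listeners so they are not duplicated
            # by the next pass, and wait until they are really done.
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
            if restarts >= max_restarts:
                raise
            # Exponential backoff so the broker is not hammered.
            await asyncio.sleep(base_delay * 2 ** restarts)
            restarts += 1
```

With the class from this issue, `make_listeners` could be a lambda that returns the two `self._create_listener(...)` calls. `KeyboardInterrupt` and task cancellation are not caught by `except Exception`, so they still stop the loop.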
Related issues
This issue is related to these: #577 #563 #588 and #620
I want to reply to this:
> This can be solved by using aio_pika.robust_connection.connect_robust
> [connect_robust](https://aio-pika.readthedocs.io/en/latest/#aio-pika-connect-function-and-aio-pika-connection-class-specific)
>
> Originally posted by @lin-gooo in #620
No, it can't. I just tried aio_pika.robust_connection.connect_robust (which is the same function as aio_pika.connect_robust, by the way) and it led to the same error. The issue was closed after this reply, even though it didn't help me.