RobustConnection and QueueIterator #312

@ivonindza

Description

This is about the scenario where RabbitMQ fails and is later restarted, specifically how to handle messages that were prefetched with a QueueIterator before the connection broke.

Let's take a simple application that consumes from one queue and publishes to another, using connect_robust to make the connection. When I restart RabbitMQ while the consuming/publishing is happening, one of two things happens:

  1. An aiormq.exceptions.ChannelInvalidStateError is raised, because the channel is closed.
  2. Or, if the application tries to call message.ack() (or message.reject()), a RuntimeError with the message "Writer is None" is raised.

In the background, aio_pika gets a ConnectionResetError (Errno 104, Connection reset by peer) and starts reconnecting. Eventually it reconnects successfully and restores channels, exchanges, and queues. However, the messages that were prefetched before the connection broke end up in a broken state: they can no longer be acked or rejected, and every attempt fails with the above-mentioned RuntimeError ("Writer is None"). As a consequence, the queue iterator cannot be closed cleanly either, because closing it calls message.reject() for the messages remaining in its internal asyncio.Queue and fails on the first one.
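
For illustration, closing such an iterator only works if those errors are swallowed explicitly. A minimal sketch, assuming QueueIterator.close() is what the async context manager calls on exit:

import aiormq.exceptions

async def close_iterator_quietly(qiter):
    # Sketch: QueueIterator.close() rejects every message still sitting in
    # its internal asyncio.Queue; after the connection broke those rejects
    # fail, so swallow the errors instead of letting them propagate out of
    # the `async with` block.
    try:
        await qiter.close()
    except (aiormq.exceptions.ChannelInvalidStateError, RuntimeError):
        pass  # the stale messages cannot be rejected anyway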

There are two possibilities from here. The first is to close the iterator and create a new one, but then the prefetched messages remain unacknowledged: the application never processes them, and RabbitMQ won't be able to deliver them again (to this or any other application). Example code for this:

import asyncio

import aio_pika
import aiormq.exceptions

RABBITMQ_URL = 'amqp://guest:guest@localhost/'
QUEUE_IN = 'input'
QUEUE_OUT = 'output'

async def main():
    conn = await aio_pika.connect_robust(RABBITMQ_URL)
    async with conn:
        channel = await conn.channel()
        await channel.set_qos(prefetch_count=10)
        queue_in = await channel.declare_queue(QUEUE_IN, durable=True)
        queue_out = await channel.declare_queue(QUEUE_OUT, durable=True)

        while True:
            try:
                async with queue_in.iterator() as qiter:
                    async for message in qiter:
                        await channel.default_exchange.publish(
                            aio_pika.Message(
                                message.body,
                                delivery_mode=aio_pika.DeliveryMode.PERSISTENT
                            ),
                            routing_key=QUEUE_OUT
                        )
                        await message.ack()
                        await asyncio.sleep(1) # Slow down so as not to empty the queue too quickly
            except (aiormq.exceptions.ChannelInvalidStateError, RuntimeError):
                # Wait for the robust connection to re-establish itself
                await conn.connected.wait()

The second possibility is to continue consuming from the same queue iterator after the reconnection. The prefetched messages all fail with the RuntimeError ("Writer is None"), and asyncio additionally issues the warning "socket.send() raised exception." This releases them back to RabbitMQ, so they are consumed again later. However, it means my application still processes them needlessly even though they have no chance of being acked, and it has no way of knowing this (it cannot distinguish these messages from the new ones consumed after the reconnection). Example code for this scenario:

async def main():
    conn = await aio_pika.connect_robust(RABBITMQ_URL)
    async with conn:
        channel = await conn.channel()
        await channel.set_qos(prefetch_count=10)
        queue_in = await channel.declare_queue(QUEUE_IN, durable=True)
        queue_out = await channel.declare_queue(QUEUE_OUT, durable=True)

        async with queue_in.iterator() as qiter:
            while True:
                try:
                    async for message in qiter:
                        await channel.default_exchange.publish(
                            aio_pika.Message(
                                message.body,
                                delivery_mode=aio_pika.DeliveryMode.PERSISTENT
                            ),
                            routing_key=QUEUE_OUT
                        )
                        await message.ack()
                        await asyncio.sleep(1) # Slow down so as not to empty the queue too quickly
                except (aiormq.exceptions.ChannelInvalidStateError, RuntimeError):
                    # Wait for the robust connection to re-establish itself
                    await conn.connected.wait()

I noticed that the prefetched messages keep a reference to the obsolete aiormq.Channel instance from before the connection broke. However, simply updating them with the new channel reference only breaks things further.
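
The closest I got to a workaround is to check that stale reference before touching a message, and skip the ack/reject when the message was delivered on a channel other than the one currently in use. This is only a sketch and assumes that both IncomingMessage and Channel expose their underlying aiormq channel as a .channel attribute (the same reference mentioned above); I'm not sure this is a supported way to use the library:

async def ack_if_current(message, channel):
    # Sketch of a guard around ack(): only touch the message if it was
    # delivered on the aiormq channel that `channel` currently wraps.
    # Assumes both objects expose the underlying aiormq.Channel as
    # `.channel`.
    if message.channel is not channel.channel:
        # Prefetched before the reconnect; acking it would raise
        # RuntimeError("Writer is None"), so skip it.
        return False
    await message.ack()
    return True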

I would expect either to be able to use the prefetched messages with the new connection so they can be acked, or, alternatively, to have them purged from the iterator's internal queue so the application does not process them needlessly when acking them on the new connection is impossible.

I am using aio_pika version 6.4.1.
