diff --git a/docs/source/examples/pooling-recycled.py b/docs/source/examples/pooling-recycled.py new file mode 100644 index 00000000..65343778 --- /dev/null +++ b/docs/source/examples/pooling-recycled.py @@ -0,0 +1,76 @@ +import asyncio +import aio_pika +from aio_pika.pool import Pool + + +class NonRestoringRobustChannel(aio_pika.RobustChannel): + """ + Custom robust channel that when reopened, does not restore any + queues or exchanges. + """ + async def reopen(self) -> None: + # Clear out exchanges and queues when reopened + self._exchanges.clear() + self._queues.clear() + await super().reopen() + + +class NonRestoringRobustConnection(aio_pika.RobustConnection): + """ + Robust connection that uses a custom channel class + """ + CHANNEL_CLASS = NonRestoringRobustChannel + + +async def main(): + loop = asyncio.get_event_loop() + + async def get_connection(): + return await aio_pika.connect_robust( + "amqp://guest:guest@localhost/", + # Use the connection class that does not restore connections + connection_class=NonRestoringRobustConnection, + ) + + connection_pool = Pool(get_connection, max_size=2, loop=loop) + + async def get_channel() -> aio_pika.Channel: + async with connection_pool.acquire() as connection: + return await connection.channel() + + channel_pool = Pool(get_channel, max_size=10, loop=loop) + queue_name = "pool_queue" + + async def consume(): + async with channel_pool.acquire() as channel: # type: aio_pika.Channel + await channel.set_qos(10) + + queue = await channel.declare_queue( + queue_name, durable=False, auto_delete=False + ) + + async with queue.iterator() as queue_iter: + async for message in queue_iter: + print(message) + await message.ack() + + async def publish(): + async with channel_pool.acquire() as channel: # type: aio_pika.Channel + # Reopen channels that have been closed previously + if channel.is_closed: + await channel.reopen() + await channel.default_exchange.publish( + aio_pika.Message(("Channel: %r" % channel).encode()), + queue_name, + ) + + async with connection_pool, channel_pool: + task = loop.create_task(consume()) + await asyncio.wait([publish() for _ in range(10000)]) + await task + + +if __name__ == "__main__": + loop = asyncio.get_event_loop() + loop.run_until_complete(main()) + loop.close() diff --git a/docs/source/quick-start.rst b/docs/source/quick-start.rst index 26759aa1..9c25fe1b 100644 --- a/docs/source/quick-start.rst +++ b/docs/source/quick-start.rst @@ -51,3 +51,16 @@ Connection pooling .. literalinclude:: examples/pooling.py :language: python + +Connection pooling with recycled channels +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +When channels are re-used from a pool where queues and exchanges are not +consistent (i.e. the first time a channel is checked out it creates and uses +queue1 and the next time it is checked out it creates and uses queue2), it +is necessary to clear queues and exchanges before attempting to use the +channel. This is done with custom Connection and Channel classes and by +calling ``reopen`` on channel checkout if it was closed. + +.. literalinclude:: examples/pooling-recycled.py + :language: python