Description
I've observed a gradually increasing memory footprint in my production environment and began investigating the root cause. After experimenting with various configurations, I created a minimal producer/consumer test setup to isolate the issue. While I’m open to the possibility of incorrect usage, this appears to be a memory leak, specifically linked to aiormq.channel.
Since this behavior surfaces through usage of aio_pika (not aiormq directly), I’m filing the issue here.
Reproduction Scenario
The test case involves a simple consumer that uses queue.iterator(timeout=5) inside an outer while True loop. This mirrors our production design, where consumers handle bursts of messages followed by idle periods. The timeout and looping logic allow us to perform other tasks during downtime.
The key issue is:
- If the iterator processes at least one message, that last message appears to be retained in memory.
- From RabbitMQ’s perspective, the message is fully handled. It is acked and removed from the queue.
- However, memory analysis using gc and tracemalloc reveals that the message remains in memory even after explicitly closing the connection.
- I’ve also observed the same memory retention behavior when using queue.get() instead of the iterator, suggesting the issue is not limited to just the iterator interface.
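To narrow down what keeps the last message alive, a check along the following lines could help. This is only a sketch built on the standard library gc module: `live_instances` and `referrer_summary` are hypothetical helper names, and `Payload` and `Holder` are stand-ins for the incoming message and whatever object retains it, not aio_pika classes.

```python
import gc


def live_instances(cls):
    """Return all gc-tracked objects that are instances of cls."""
    gc.collect()  # drop unreachable garbage first so only live objects remain
    return [obj for obj in gc.get_objects() if isinstance(obj, cls)]


def referrer_summary(obj):
    """Return the sorted type names of objects that directly refer to obj."""
    return sorted({type(ref).__name__ for ref in gc.get_referrers(obj)})


# Hypothetical stand-ins used only to exercise the helpers
class Payload:
    def __init__(self, body):
        self.body = body


class Holder:
    def __init__(self):
        self.last = None  # mimics an object that keeps the last message
```

In the consumer, calling `referrer_summary(messages[0])` from `debug_logging()` would list the types of the objects referencing the stuck message. The `messages` list itself will appear there as a `list` referrer, so that entry can be ignored.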
To further confirm:
- Run the producer.py script, then wait for the consumer to time out.
- Run the producer again.
- Each cycle increases memory usage and the number of AbstractIncomingMessage instances by one, regardless of how many messages were handled in that cycle.
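The per-cycle growth described above can also be measured by comparing tracemalloc snapshots taken before and after a cycle. The following is a minimal sketch using only the standard library; `top_growth` is a hypothetical helper name, and where to take the two snapshots in the consumer is left as an assumption.

```python
import tracemalloc


def top_growth(before, after, limit=3):
    """Return the largest allocation increases between two snapshots."""
    # compare_to() returns StatisticDiff entries, largest absolute change first
    stats = after.compare_to(before, "lineno")
    return [stat for stat in stats if stat.size_diff > 0][:limit]
```

Taking one snapshot per timeout and passing consecutive pairs to `top_growth` should show whether the same allocation site grows by roughly one message body each cycle.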
Environment
- aio_pika: 9.5.5
- aiormq: 6.8.1
- Python: 3.12.3
I’ve included comments in the code to clarify assumptions and reproduction steps.
Let me know if this issue would be better directed at the aiormq project, or if there's anything further I can provide to assist in troubleshooting.
File: consumer.py
import asyncio
import gc
import tracemalloc
import aio_pika
from aio_pika.abc import AbstractIncomingMessage
tracemalloc.start(1)
def debug_logging():
    gc.collect()
    messages = [
        obj for obj in gc.get_objects() if isinstance(obj, AbstractIncomingMessage)
    ]
    # this shows that there is an IncomingMessage stuck. My assumption is that it is the last message handled
    # by the iterator before it times out
    print(f"[GC Check] Live aio_pika IncomingMessages: {len(messages)}")
    # in order to see this as the number 1 memory hog, make sure the messages are of sufficient size
    # or there are enough of them by publishing, then wait for timeout, then publish again, and repeat
    snapshot = tracemalloc.take_snapshot()
    top = snapshot.statistics("traceback")
    print(top[0].traceback.format())
    print(top[0])
async def async_main():
    # this "while True" is needed in my code as there is some additional handling that we
    # do whenever this consumer is idle
    while True:
        try:
            connection = await aio_pika.connect_robust()
            channel = await connection.channel()
            await channel.set_qos(prefetch_count=1)
            queue = await channel.declare_queue(
                "my_test_queue",
                auto_delete=False,
                exclusive=False,
                passive=False,
                durable=True,
            )
            exchange = await channel.declare_exchange(
                "test_exchange", aio_pika.ExchangeType.TOPIC
            )
            await queue.bind(exchange, "test.routing.#")
            debug_logging()
            async with queue.iterator(timeout=5) as queue_iter:
                async for raw_message in queue_iter:
                    async with raw_message.process():
                        print("handling message")
                        await asyncio.sleep(0.3)
        except asyncio.TimeoutError:
            print("queue iterator timeout")
            try:
                await connection.close()
                print("connection closed")
            except Exception:
                pass
        # do additional work
if __name__ == "__main__":
    asyncio.run(async_main())
File: producer.py
import asyncio
import random
import string
import aio_pika
# doesn't really matter how many you send
MESSAGE_COUNT = 10
# seems to be a good size to show up as top memory for me
MSG_CHAR_COUNT = 1000000
async def async_main():
    connection = await aio_pika.connect_robust()
    channel = await connection.channel()
    exchange = await channel.declare_exchange(
        "test_exchange", aio_pika.ExchangeType.TOPIC
    )
    message_str = "".join(
        random.choice(string.ascii_letters) for _ in range(MSG_CHAR_COUNT)
    )
    for _ in range(0, MESSAGE_COUNT):
        await exchange.publish(
            aio_pika.Message(body=message_str.encode()),
            routing_key="test.routing",
        )
        print("published message")
    await connection.close()
if __name__ == "__main__":
    asyncio.run(async_main())