Potential memory leak - pamqp\base.py #628

@ionutbaltariu

Description

I am investigating a memory leak in a service, and tracemalloc reports the following.

On the first run I get:
[..]\venv\Lib\site-packages\pamqp\base.py:60: size=2616 B, count=44, average=59 B

After submitting some messages to the queue, the size only increases:
[..]\venv\Lib\site-packages\pamqp\base.py:60: size=7746 B, count=129, average=60 B

Eventually it reaches 14.3 KiB.

I may be running the test incorrectly, so please let me know what information is needed to confirm whether this is actually a memory leak.
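
One way to gather firmer evidence, sketched here under assumptions: sample the memory attributed to a single file after forcing a garbage collection, so that objects merely awaiting collection are not mistaken for a leak. The helper names `traced_size` and `sample_growth`, the `work` callable, and the `"*pamqp*base.py"` pattern mentioned in the comments are hypothetical and not part of tracemalloc, aio_pika, or pamqp. The demo uses a deliberately leaky stand-in rather than a real consumer.

```python
import gc
import tracemalloc


def traced_size(snapshot, filename_pattern):
    """Return (total_bytes, block_count) for traces whose most recent frame matches."""
    filters = [tracemalloc.Filter(True, filename_pattern)]
    stats = snapshot.filter_traces(filters).statistics("filename")
    return sum(s.size for s in stats), sum(s.count for s in stats)


def sample_growth(work, filename_pattern, rounds=5, calls_per_round=100):
    """Call `work` repeatedly and record the traced size after each round."""
    samples = []
    for _ in range(rounds):
        for _ in range(calls_per_round):
            work()
        # Collect unreachable objects first so only live memory is measured.
        gc.collect()
        samples.append(traced_size(tracemalloc.take_snapshot(), filename_pattern))
    return samples


if __name__ == "__main__":
    tracemalloc.start()
    retained = []  # stand-in for whatever might be holding references

    def leaky():
        retained.append(bytearray(64))

    # In a real service the pattern could be something like "*pamqp*base.py"
    # (an assumption); "*" matches every file and keeps this demo self-contained.
    for size, count in sample_growth(leaky, "*"):
        print(f"size={size} B, count={count}")
```

If the numbers keep climbing round after round even after `gc.collect()`, something is still holding references; if they level off, the earlier growth was more likely caches or buffers filling up.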

The version in use is 9.4.1.

This is the code:

async def run_async(self) -> None:
    logger.info("Will start the worker..")
    # Start tracing allocations before anything else is created.
    tracemalloc.start()
    self.internal_events_session = aiohttp.ClientSession()
    self.google_req_session = aiohttp.ClientSession()

    asyncio.create_task([..])

    logger.info("Declared request sessions..")

    logger.info("Connecting to RabbitMQ..")
    connection = await aio_pika.connect_robust(
        host=os.getenv("RABBIT_MQ_HOST"),
        port=int(os.getenv("RABBIT_MQ_PORT")),
        virtualhost="/",
        login=os.getenv("RABBIT_MQ_USERNAME"),
        password=os.getenv("RABBIT_MQ_PASSWORD"),
    )

    queue_name = os.getenv("QNAME")

    logger.info("Creating channel..")
    channel = await connection.channel()

    logger.info("Setting QOS..")
    await channel.set_qos(prefetch_count=10)

    # get_queue looks up an existing queue rather than declaring a new one.
    logger.info("Declaring queue..")
    queue = await channel.get_queue(queue_name)

    logger.info("Will start consuming..")
    await queue.consume(self.process_message)

    try:
        # Wait until terminated
        await asyncio.Future()
    finally:
        await connection.close()

And self.process_message has the following structure:

async def process_message(self, message: aio_pika.abc.AbstractIncomingMessage) -> None:
    async with message.process():
        [.. code with async operations]

    # Print the 20 largest allocation sites after each processed message.
    snapshot = tracemalloc.take_snapshot()
    top_stats = snapshot.statistics('lineno')

    for stat in top_stats[:20]:
        print(stat)

The other stats shown do not differ significantly in memory size after 100+ runs of self.process_message; pamqp\base.py:60 is the only entry that keeps increasing.
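
To narrow down which caller keeps the objects allocated at that line alive, two snapshots can be compared with deeper tracebacks. The following is a minimal sketch, not a confirmed diagnosis: `top_growth` is a hypothetical helper, the frame depth of 25 is an arbitrary choice, and the leaky loop stands in for real message processing.

```python
import tracemalloc


def top_growth(before, after, filename_pattern, limit=5):
    """Return the largest positive StatisticDiff entries for matching tracebacks."""
    # all_frames=True matches the pattern against every frame of a traceback,
    # not only the most recent one.
    filters = [tracemalloc.Filter(True, filename_pattern, all_frames=True)]
    diffs = after.filter_traces(filters).compare_to(
        before.filter_traces(filters), "traceback"
    )
    return [d for d in diffs if d.size_diff > 0][:limit]


if __name__ == "__main__":
    # Keep up to 25 frames per allocation so the call chain is visible.
    tracemalloc.start(25)
    retained = []  # stand-in for a structure that never releases its items

    before = tracemalloc.take_snapshot()
    for _ in range(100):
        retained.append(bytearray(64))
    after = tracemalloc.take_snapshot()

    # In the service, a pattern such as "*pamqp*base.py" (an assumption)
    # would restrict the report to the file in question.
    for diff in top_growth(before, after, "*"):
        print(f"+{diff.size_diff} B in {diff.count_diff} new blocks")
        print("\n".join(diff.traceback.format()))
```

Grouping by `"traceback"` instead of `"lineno"` shows the full call chain for each growing allocation site, which helps tell whether the references are held by the library or by the consuming code.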
