Skip to content

asyncio.Future leaks after connection to the broker resets. #675

@UltimateLobster

Description

@UltimateLobster

When running a long, synchronous logic, aio-pika is prevented from processing heartbeats, which results in the connection being closed by the broker. (Which is to be expected).

However, it will also result in a future leaking without being properly handled.

The following code will reproduce the issue.

import asyncio
import time

import aio_pika
from aio_pika import Message, DeliveryMode
from pamqp.commands import Basic


async def main():
    # Low heartbeat will help us reproduce the issue
    connection = await aio_pika.connect("amqp://SOMETHING/%2F?heartbeat=5")
    
    async with connection:
        async with connection.channel() as channel:
            # This simulates a long, synchronous logic which will block aio_pika from processing heartbeats.
            time.sleep(20)
            message = Message(b"", delivery_mode=DeliveryMode.PERSISTENT)
            response = await channel.default_exchange.publish(message, routing_key="SomeQueue")
            assert isinstance(response, Basic.Ack)

if __name__ == '__main__':
    asyncio.run(main())

Expected Result

This code will result in an AMQPConnectionError. and nothing more.

Actual Result

This code will ALSO produce a Future exception was never retrieved error.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions