aiormq.exceptions.ChannelInvalidStateError: No active transport in channel on wind-down #668

@loucadufault

We are seeing this error intermittently on app shutdown:

Traceback (most recent call last):
  File "/root/.pyenv/versions/3.10.14/lib/python3.10/site-packages/aio_pika/tools.py", line 306, in __task_inner
    await self.callback(*args, **kwargs)
  File "/root/.pyenv/versions/3.10.14/lib/python3.10/site-packages/aio_pika/robust_channel.py", line 120, in _on_close
    await self.restore()
  File "/root/.pyenv/versions/3.10.14/lib/python3.10/site-packages/aio_pika/robust_channel.py", line 95, in restore
    await self.reopen()
  File "/root/.pyenv/versions/3.10.14/lib/python3.10/site-packages/aio_pika/robust_channel.py", line 133, in reopen
    await super().reopen()
  File "/root/.pyenv/versions/3.10.14/lib/python3.10/site-packages/aio_pika/channel.py", line 244, in reopen
    await self._open()
  File "/root/.pyenv/versions/3.10.14/lib/python3.10/site-packages/aio_pika/channel.py", line 169, in _open
    raise ChannelInvalidStateError("No active transport in channel")
aiormq.exceptions.ChannelInvalidStateError: No active transport in channel

Followed almost immediately by:

Traceback (most recent call last):
  File "/root/.pyenv/versions/3.10.14/lib/python3.10/site-packages/aio_pika/tools.py", line 306, in __task_inner
    await self.callback(*args, **kwargs)
  File "/root/.pyenv/versions/3.10.14/lib/python3.10/site-packages/aio_pika/robust_channel.py", line 120, in _on_close
    await self.restore()
  File "/root/.pyenv/versions/3.10.14/lib/python3.10/site-packages/aio_pika/robust_channel.py", line 95, in restore
    await self.reopen()
  File "/root/.pyenv/versions/3.10.14/lib/python3.10/site-packages/aio_pika/robust_channel.py", line 133, in reopen
    await super().reopen()
  File "/root/.pyenv/versions/3.10.14/lib/python3.10/site-packages/aio_pika/channel.py", line 244, in reopen
    await self._open()
  File "/root/.pyenv/versions/3.10.14/lib/python3.10/site-packages/aio_pika/channel.py", line 173, in _open
    channel = await UnderlayChannel.create(
  File "/root/.pyenv/versions/3.10.14/lib/python3.10/site-packages/aio_pika/abc.py", line 485, in create
    channel = await connection.channel(**kwargs)
  File "/root/.pyenv/versions/3.10.14/lib/python3.10/site-packages/aiormq/connection.py", line 830, in channel
    raise RuntimeError("%r closed" % self)
RuntimeError: <Connection: "amqp://srv_user:******@rmq-host:5672//" at 0x736310f0ed40> closed

Connection code in FastAPI looks like:

consume_connection = await aio_pika.connect_robust(
    host=rabbitmq_consume_server_config.hostname,
    login=rabbitmq_consume_server_config.username,
    password=rabbitmq_consume_server_config.password,
)
await consume_connection.channel()
publish_connection = await aio_pika.connect_robust(
    host=rabbitmq_publish_server_config.hostname,
    login=rabbitmq_publish_server_config.username,
    password=rabbitmq_publish_server_config.password,
)
await publish_connection.channel()

# ... setup some exchanges
# ... setup some queue

async def process_message(message: aio_pika.abc.AbstractIncomingMessage):
    body = message.body.decode()

    try:
        # ...some processing
        await rabbitmq_publish_exchange.publish(
            aio_pika.Message(body=orjson.dumps(publish_message)), routing_key=""
        )
    except Exception as e:
        logger.error(f"Error occurred processing message: {e}")

await queue.consume(process_message)

The shutdown code, which runs in the FastAPI lifespan function after the yield, just calls Connection#close on both connections.
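
For reference, the shape of the shutdown described above is roughly the following. This is a minimal sketch and not our exact code: `make_lifespan`, `connect_consume` and `connect_publish` are hypothetical names standing in for the `aio_pika.connect_robust(...)` calls shown earlier, and the exchange, queue and consumer setup is left out.

```python
from contextlib import asynccontextmanager


def make_lifespan(connect_consume, connect_publish):
    """Build a FastAPI-style lifespan from two connection factories.

    Both factories are hypothetical stand-ins: each is an async callable
    returning an object with an async close() method, such as the
    connection returned by aio_pika.connect_robust(...).
    """

    @asynccontextmanager
    async def lifespan(app):
        # Startup: open both connections (channel/exchange/queue setup omitted).
        consume_connection = await connect_consume()
        publish_connection = await connect_publish()

        yield  # the application serves requests while suspended here

        # Shutdown: only the connections are closed, as described above.
        # No consumer is cancelled and no channel is closed explicitly.
        await consume_connection.close()
        await publish_connection.close()

    return lifespan
```

In a real app this would presumably be wired up as `FastAPI(lifespan=make_lifespan(...))`. The point is only that nothing but the two `close()` calls runs on wind-down.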

I'm wondering why this shutdown causes the RobustChannel to attempt to reconnect. If this is not a bug, how can we properly cancel the RabbitMQ tasks to avoid this issue?
