aiormq channel state after basic_publish fails #671

@PaulWasTaken

Greetings. I apologize in advance if this repo is not the right place for this issue. I ran into a problem where my producer stops sending messages. After some investigation I found that something goes wrong with aiormq channel restoration/configuration.

Minimal example

import asyncio

import aio_pika
import aiormq
from aiormq.tools import Countdown

CALL_COUNT = 0


class _TestCountdown(Countdown):
    def get_timeout(self):
        global CALL_COUNT

        if CALL_COUNT == 0:
            CALL_COUNT += 1
            return None

        if CALL_COUNT == 1:
            CALL_COUNT += 1
            raise asyncio.TimeoutError

        return None


async def aiormq_example():
    connection = await aiormq.connect('amqp://guest:guest@localhost//')

    channel = await connection.channel()
    await channel.exchange_declare('test_ex')
    aiormq.channel.Countdown = _TestCountdown
    try:
        await channel.queue_declare('test_q', auto_delete=True)
    except asyncio.TimeoutError:
        await channel.queue_declare('test_q', auto_delete=True)


async def aio_pika_example():
    connection = await aio_pika.connect_robust()

    channel = await connection.channel()
    exchange = await channel.declare_exchange('test_ex')
    queue = await channel.declare_queue('test_q', auto_delete=True)
    await queue.bind(exchange, 'test_rk')

    aiormq.channel.Countdown = _TestCountdown
    try:
        await exchange.publish(aio_pika.Message(b'123'), 'test_rk')
    except asyncio.TimeoutError:
        await exchange.publish(aio_pika.Message(b'123'), 'test_rk')

aiormq_example won't let you perform another operation because the internal lock is still held (I wonder, shouldn't we release it?). In aio_pika_example the channel is restored and is able to send messages, except that its delivery_tag is wrong. Let me explain: on publish, the delivery_tag counter is incremented and the message is put into the write queue. On timeout, however, the queued write operation is removed, while the counter stays incremented.

When we try to publish another message, the channel uses a different delivery_tag than the broker does, e.g. channel.delivery_tag = 2 while the real one is 1, so the confirmation future is stuck forever. As a result, every subsequent publish call ends with a timeout error.
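To make the mismatch concrete, here is a minimal sketch that needs no broker. It is a toy model of the behaviour described above, not aiormq's actual code: `ToyChannel`, `broker_tag`, `fail_write` and `_broker_receive` are hypothetical names, and the "broker" is assumed to confirm frames by counting only the ones it actually receives.

```python
import asyncio


class ToyChannel:
    """Hypothetical stand-in for a confirm-mode channel (not aiormq's code)."""

    def __init__(self):
        self.delivery_tag = 0    # client-side counter
        self.broker_tag = 0      # frames the toy broker has actually seen
        self.confirmations = {}  # client tag -> future awaiting a confirm

    async def publish(self, body, fail_write=False, timeout=0.05):
        self.delivery_tag += 1   # the counter is bumped before the write
        tag = self.delivery_tag
        if fail_write:
            # Models the timeout path: the write is dropped, so the broker
            # never sees the frame, but the counter is not rolled back.
            raise asyncio.TimeoutError
        future = asyncio.get_running_loop().create_future()
        self.confirmations[tag] = future
        self._broker_receive()
        await asyncio.wait_for(future, timeout)
        return tag

    def _broker_receive(self):
        # The toy broker numbers frames by what it received and confirms
        # the future registered under that number, if there is one.
        self.broker_tag += 1
        future = self.confirmations.get(self.broker_tag)
        if future is not None and not future.done():
            future.set_result(True)


async def demo():
    channel = ToyChannel()
    outcomes = []
    for fail in (True, False, False):
        try:
            await channel.publish(b"123", fail_write=fail)
            outcomes.append("confirmed")
        except asyncio.TimeoutError:
            outcomes.append("timeout")
    return outcomes, channel.delivery_tag, channel.broker_tag


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this model, once a single write is dropped, the client waits on tag N while the broker confirms tag N - 1, so no later publish is ever confirmed. Whether the right fix is to roll the counter back or to reset the channel is left open here.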

I suspect that consume can be blocked in the same way, since lock.release() is called in the same Countdown logic.
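The lock concern can be illustrated in isolation with plain asyncio. This is only a sketch of the general pattern, under the assumption that the timeout is raised between acquiring and releasing the lock; `leaky_rpc` and `safe_rpc` are hypothetical helpers, not aiormq functions.

```python
import asyncio


async def leaky_rpc(lock, fail):
    # Manual acquire/release: an exception in between skips release().
    await lock.acquire()
    if fail:
        raise asyncio.TimeoutError
    lock.release()


async def safe_rpc(lock, fail):
    # "async with" releases the lock even when the body raises.
    async with lock:
        if fail:
            raise asyncio.TimeoutError


async def demo():
    results = {}
    for name, rpc in (("leaky", leaky_rpc), ("safe", safe_rpc)):
        lock = asyncio.Lock()
        try:
            await rpc(lock, fail=True)
        except asyncio.TimeoutError:
            pass
        # True means the next caller would wait on this lock forever.
        results[name] = lock.locked()
    return results


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With the manual variant the lock stays held after the timeout, which matches the symptom seen in aiormq_example; the context-manager variant leaves it free for the retry.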
