Skip to content

Why does reusing TcpChannel reduce redelivered messages? #548

@mi-mi36

Description

@mi-mi36

I ran two RabbitMQ consumer programs. With one (A), a lot of redelivered messages occur; with the other (B), they rarely occur. Why is that?

The only difference between them is whether the TcpChannel is reused or not. Why does that affect redelivery behavior?

The diffs between them are shown below.

$ diff A/AMQPConsumer.h B/AMQPConsumer.h
19a20
>     AMQP::TcpChannel* _channel; //This is newly added

$ diff A/AMQPConsumer.cpp B/AMQPConsumer.cpp
8a9
>     _channel = new AMQP::TcpChannel(_connection); //This is newly added
20c21
<         AMQP::TCPChannel channel(_connection);
---
>         //AMQP::TCPChannel channel(_connection); //Commented out
25c26
<             channel.ack(deliveryTag);
---
>             _channel->ack(deliveryTag);
33c34
<         channel.setQos(1);
---
>         _channel->setQos(1);

The full code of (B) is here:

main.cpp

#include "AMQPConsumer.h"

// Entry point: builds one consumer and polls it forever, handing each
// batch of received message bodies to the processing step.
int main()
{
    AMQPConsumer consumer;
    for (;;)
    {
        // A fresh, empty batch for every polling round.
        std::vector<std::string> messages;
        consumer.consume(messages);

        // process messages
    }
    return 0;
}

AMQPConsumer.h

#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <ev.h>

// Consumer that reads message bodies from a queue through AMQP-CPP's
// libev integration, reusing a single channel across consume() calls.
//
// NOTE(review): the class holds raw pointers and an ev_loop that the
// destructor destroys; copying an instance would make two objects destroy
// the same loop. Do not copy instances.
class AMQPConsumer
{
public:
    AMQPConsumer();
    ~AMQPConsumer();

    // Runs the event loop and appends the bodies of received messages to
    // `messages`.
    void consume(std::vector<std::string>& messages);

    // libev timer callback: breaks out of the running event loop.
    static void timeoutCb(EV_P_ ev_timer* w, int revents);
    // Callback for a failed consume operation; logs `message`.
    static void errorCb(const char* message);
    // Callback for a started consume operation; logs the consumer tag.
    static void startCb(const std::string& consumertag);
private:
    std::string _queueName;           // name of the queue to consume from
    ev_timer _timer;                  // timer armed by consume() around each ev_run()
    struct ev_loop* _loop;            // event loop created in the constructor
    AMQP::LibEvHandler* _handler;     // handler bound to _loop
    AMQP::TcpConnection* _connection; // connection created with _handler
    AMQP::TcpChannel* _channel;       // channel reused by every consume() call (newly added)
};

AMQPConsumer.cpp

#include "AMQPConsumer.h"

// Builds the whole AMQP stack once: event loop -> libev handler ->
// TCP connection -> channel. Members are initialized in declaration
// order, so each one may use the member created before it.
AMQPConsumer::AMQPConsumer()
    : _queueName("MY_QUEUE"),
      _loop(ev_loop_new(0)),
      _handler(new AMQP::LibEvHandler(_loop)),
      _connection(new AMQP::TcpConnection(_handler, AMQP::Address("127.0.0.1", xxxx, AMQP::Login("user", "password"), "/"))),
      _channel(new AMQP::TcpChannel(_connection)) // newly added: one channel shared by all consume() calls
{
}

// Releases everything the constructor allocated, in reverse order of
// creation: the channel uses the connection, the connection uses the
// handler, and the handler uses the loop, so the loop goes last.
AMQPConsumer::~AMQPConsumer()
{
    delete _channel;
    delete _connection;
    delete _handler;
    ev_loop_destroy(_loop);
}

void AMQPConsumer::consume(std::vector<std::string>& messages)
{
    try
    {
        //AMQP::TCPChannel channel(_connection); //Commented out
        auto messageCb = [&channel, this](const AQP::Message& message, uint64_t deliveryTag, bool redelivered){
            std::string request(message.body(), message.bodySize());
            messages.push_back(request);
           
            _channel->ack(deliveryTag);

            if(!Manager.isWorking())
            {
                ev_break(_loop);
            }
        };
    
        _channel->setQos(1);
        channel.consume(_queueName).onReceived(messageCb).onSuccess(startCb).onError(errorCb);

        do
        {
            ev_timer_init(&_timer, timeoutCb, 1, 0.0);
            ev_timer_start(_loop, &_timer);
            ev_run(_loop, 0);

            double remain = ev_timer_remaining(_loop, &_timer);
            ev_timer_stop(_loop, &_timer);
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
    
            if(remain > 0.0)
                break;
        } while(Manager.isEnabled());

    } catch(const std::exception& e)
    {
        ErrorLog("%s¥n", e.what());
        throw;
    }
}

// Error callback for the consume operation: logs the reason reported in
// `message`, terminated by a newline.
void AMQPConsumer::errorCb(const char* message)
{
    ErrorLog("consume failed %s\n", message);
}

// Success callback for the consume operation: logs the consumer tag,
// terminated by a newline.
void AMQPConsumer::startCb(const std::string& consumertag)
{
    InfoLog("consume started %s\n", consumertag.c_str());
}

// Timer callback used by consume(): stops the innermost ev_run() call.
// The watcher and event mask are not needed here.
void AMQPConsumer::timeoutCb(EV_P_ ev_timer* w, int revents)
{
    (void)w;
    (void)revents;
    ev_break(EV_A_ EVBREAK_ONE);
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions