batch consumer does not emit any event #730

Open
speech77 opened this issue Oct 18, 2024 · 4 comments

speech77 commented Oct 18, 2024

Hi all,

OldSound\RabbitMqBundle\RabbitMq\BatchConsumer does not emit any event.
Only OldSound\RabbitMqBundle\RabbitMq\Consumer and classes extending it emit OldSound\RabbitMqBundle\Event\AMQPEvents.

Thanks in advance.
Cheers!

speech77 changed the title from "batch:consume does not emit any event" to "batch consumer does not emit any event" on Oct 18, 2024
mihaileu (Collaborator) commented

https://github.com/php-amqplib/RabbitMqBundle/blob/master/RabbitMq/BatchConsumer.php#L198
Currently the normal events contain the message; in the batch case I think it would be too much to attach all the messages of a specific batch. What are your requirements for this case?

speech77 commented Oct 18, 2024

I would like to have support for OnConsumeEvent and OnIdleEvent.

At the moment, I don't need BeforeProcessingMessageEvent and AfterProcessingMessageEvent, but I suppose two specific events, BeforeProcessingMessagesEvent and AfterProcessingMessagesEvent, could be created to handle the 'before' and 'after'.
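
A minimal sketch of what the two batch-level events could look like, assuming they carry only the batch size rather than every message (which would address the concern about attaching a whole batch). The class names come from the suggestion above; the base class `BatchProcessingEvent`, the `NAME` values, and `getMessageCount()` are hypothetical and not part of the bundle. In the bundle itself such events would presumably extend its own event base class instead of standing alone.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical base class for batch-level events (not part of RabbitMqBundle).
 * It stores only the number of messages in the batch, not the messages.
 */
abstract class BatchProcessingEvent
{
    /**
     * @var int
     */
    private $messageCount;

    public function __construct(int $messageCount)
    {
        if ($messageCount < 0) {
            throw new InvalidArgumentException('Message count cannot be negative');
        }

        $this->messageCount = $messageCount;
    }

    public function getMessageCount(): int
    {
        return $this->messageCount;
    }
}

/**
 * Hypothetical event to dispatch right before a batch is processed.
 */
final class BeforeProcessingMessagesEvent extends BatchProcessingEvent
{
    // Assumed event name, chosen for illustration only.
    public const NAME = 'before_processing_messages';
}

/**
 * Hypothetical event to dispatch right after a batch has been processed.
 */
final class AfterProcessingMessagesEvent extends BatchProcessingEvent
{
    // Assumed event name, chosen for illustration only.
    public const NAME = 'after_processing_messages';
}
```

Keeping the payload down to a count means listeners that only need a hook (logging, metrics, heartbeat handling) do not pay for holding the whole batch.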

speech77 commented Oct 18, 2024

My requirements are to manage the heartbeat in all my consumers by injecting logic like this:

<?php

namespace Service;

use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\Heartbeat\PCNTLHeartbeatSender;
use Psr\Log\LoggerInterface;
use RuntimeException;

class RabbitMqHeartbeatService
{
    /**
     * @var PCNTLHeartbeatSender|null
     */
    private $sender;

    /**
     * @var LoggerInterface
     */
    private $logger;

    public function __construct(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @param AMQPChannel $channel
     * @param string $consumerTag
     * @throws RuntimeException
     */
    public function registerHeartbeat(AMQPChannel $channel, string $consumerTag): void
    {
        if ($this->sender === null) {
            $connection = $channel->getConnection();
            if ($connection === null) {
                throw new RuntimeException('Connection cannot be null');
            }

            $this->logger->debug('Registering PhpAmqpLib heartbeat', ['consumer_tag' => $consumerTag]);
            /*
             * This only works if the work is a series of blocking operations,
             * each of which completes within the 'heartbeat' timeframe.
             * https://github.com/php-amqplib/RabbitMqBundle/issues/301#issuecomment-805161267
             */
            $sender = new PCNTLHeartbeatSender($connection);
            $sender->register();
            $this->sender = $sender;
        }
    }
}

mihaileu (Collaborator) commented

Currently I cannot allocate time for this. A PR would help move this forward.
