-
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
Copy pathRabbitMqEventBus.php
56 lines (47 loc) · 1.33 KB
/
RabbitMqEventBus.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
<?php
declare(strict_types=1);
namespace CodelyTv\Shared\Infrastructure\Bus\Event\RabbitMq;
use AMQPException;
use CodelyTv\Shared\Domain\Bus\Event\DomainEvent;
use CodelyTv\Shared\Domain\Bus\Event\EventBus;
use CodelyTv\Shared\Infrastructure\Bus\Event\DomainEventJsonSerializer;
use CodelyTv\Shared\Infrastructure\Bus\Event\MySql\MySqlDoctrineEventBus;
use function Lambdish\Phunctional\each;
/**
 * Event bus that publishes domain events to a named exchange obtained from a
 * RabbitMqConnection.
 *
 * When publishing a single event raises an AMQPException, that event is handed
 * to the failover publisher instead; the remaining events are still processed.
 */
final readonly class RabbitMqEventBus implements EventBus
{
    /**
     * @param RabbitMqConnection    $connection        Source of the exchange to publish on.
     * @param string                $exchangeName      Name passed to the connection to look up the exchange.
     * @param MySqlDoctrineEventBus $failoverPublisher Receives every event whose publication threw an AMQPException.
     */
    public function __construct(
        private RabbitMqConnection $connection,
        private string $exchangeName,
        private MySqlDoctrineEventBus $failoverPublisher
    ) {}

    /**
     * Publishes each given event individually, in the order received.
     */
    public function publish(DomainEvent ...$events): void
    {
        foreach ($events as $domainEvent) {
            $this->publishOrFailOver($domainEvent);
        }
    }

    /**
     * Tries the exchange first and delegates to the failover publisher when the
     * attempt raises an AMQPException. Any other exception type propagates.
     */
    private function publishOrFailOver(DomainEvent $domainEvent): void
    {
        try {
            $this->sendToExchange($domainEvent);
        } catch (AMQPException) {
            $this->failoverPublisher->publish($domainEvent);
        }
    }

    /**
     * Serializes the event and publishes it on the configured exchange, using
     * the event name as routing key and the event id as message id.
     */
    private function sendToExchange(DomainEvent $domainEvent): void
    {
        // The payload and metadata are built before the exchange is requested,
        // so a serialization failure never touches the connection.
        $payload = DomainEventJsonSerializer::serialize($domainEvent);
        $eventName = $domainEvent::eventName();
        $attributes = [
            'message_id' => $domainEvent->eventId(),
            'content_type' => 'application/json',
            'content_encoding' => 'utf-8',
        ];

        $exchange = $this->connection->exchange($this->exchangeName);
        $exchange->publish($payload, $eventName, AMQP_NOPARAM, $attributes);
    }
}