From 805e3e62d3fad0c91673af5927c371285fcbcea0 Mon Sep 17 00:00:00 2001
From: philippe PICHET
Date: Mon, 6 May 2024 11:25:31 +0200
Subject: [PATCH] feat(GPS): allow send attributes in Google PubSub message.
---
docs/transport/gps.md | 15 +++++++++++++++
pkg/gps/GpsMessage.php | 22 +++++++++++++++++-----
pkg/gps/GpsProducer.php | 7 ++++---
pkg/gps/Tests/GpsMessageTest.php | 11 ++++++++++-
pkg/gps/Tests/GpsProducerTest.php | 31 +++++++++++++++++++++++++++++++
5 files changed, 77 insertions(+), 9 deletions(-)
diff --git a/docs/transport/gps.md b/docs/transport/gps.md
index b56f5c949..0ed47afe5 100644
--- a/docs/transport/gps.md
+++ b/docs/transport/gps.md
@@ -62,6 +62,21 @@ $context->declareTopic($fooTopic);
$context->createProducer()->send($fooTopic, $message);
```
+You can send attributes using headers :
+
+```php
+createTopic('foo');
+$attributes = ['key1' => 'value1'];
+$message = $context->createMessage('Hello world!', [], ['attributes' => $attributes]);
+
+$context->declareTopic($fooTopic);
+
+$context->createProducer()->send($fooTopic, $message);
+```
+
## Consume message:
Before you can consume message you have to subscribe a queue to the topic.
diff --git a/pkg/gps/GpsMessage.php b/pkg/gps/GpsMessage.php
index b4a11dc5a..e638c26ad 100644
--- a/pkg/gps/GpsMessage.php
+++ b/pkg/gps/GpsMessage.php
@@ -34,10 +34,17 @@ class GpsMessage implements Message, \JsonSerializable
*/
private $nativeMessage;
+ /**
+ * @var array
+ */
+ private $attributes;
+
public function __construct(string $body = '', array $properties = [], array $headers = [])
{
$this->body = $body;
$this->properties = $properties;
+ $this->attributes = $headers['attributes'] ?? [];
+ unset($headers['attributes']);
$this->headers = $headers;
$this->redelivered = false;
@@ -103,7 +110,7 @@ public function isRedelivered(): bool
return $this->redelivered;
}
- public function setCorrelationId(string $correlationId = null): void
+ public function setCorrelationId(?string $correlationId = null): void
{
$this->setHeader('correlation_id', $correlationId);
}
@@ -113,7 +120,7 @@ public function getCorrelationId(): ?string
return $this->getHeader('correlation_id');
}
- public function setMessageId(string $messageId = null): void
+ public function setMessageId(?string $messageId = null): void
{
$this->setHeader('message_id', $messageId);
}
@@ -130,12 +137,12 @@ public function getTimestamp(): ?int
return null === $value ? null : (int) $value;
}
- public function setTimestamp(int $timestamp = null): void
+ public function setTimestamp(?int $timestamp = null): void
{
$this->setHeader('timestamp', $timestamp);
}
- public function setReplyTo(string $replyTo = null): void
+ public function setReplyTo(?string $replyTo = null): void
{
$this->setHeader('reply_to', $replyTo);
}
@@ -169,8 +176,13 @@ public function getNativeMessage(): ?GoogleMessage
return $this->nativeMessage;
}
- public function setNativeMessage(GoogleMessage $message = null): void
+ public function setNativeMessage(?GoogleMessage $message = null): void
{
$this->nativeMessage = $message;
}
+
+ public function getAttributes(): array
+ {
+ return $this->attributes;
+ }
}
diff --git a/pkg/gps/GpsProducer.php b/pkg/gps/GpsProducer.php
index 86c9052c0..e2e6d4046 100644
--- a/pkg/gps/GpsProducer.php
+++ b/pkg/gps/GpsProducer.php
@@ -39,10 +39,11 @@ public function send(Destination $destination, Message $message): void
$topic = $this->context->getClient()->topic($destination->getTopicName());
$topic->publish([
'data' => json_encode($message),
+ 'attributes' => $message->getAttributes(),
]);
}
- public function setDeliveryDelay(int $deliveryDelay = null): Producer
+ public function setDeliveryDelay(?int $deliveryDelay = null): Producer
{
if (null === $deliveryDelay) {
return $this;
@@ -56,7 +57,7 @@ public function getDeliveryDelay(): ?int
return null;
}
- public function setPriority(int $priority = null): Producer
+ public function setPriority(?int $priority = null): Producer
{
if (null === $priority) {
return $this;
@@ -70,7 +71,7 @@ public function getPriority(): ?int
return null;
}
- public function setTimeToLive(int $timeToLive = null): Producer
+ public function setTimeToLive(?int $timeToLive = null): Producer
{
if (null === $timeToLive) {
return $this;
diff --git a/pkg/gps/Tests/GpsMessageTest.php b/pkg/gps/Tests/GpsMessageTest.php
index a43a22315..c78372e88 100644
--- a/pkg/gps/Tests/GpsMessageTest.php
+++ b/pkg/gps/Tests/GpsMessageTest.php
@@ -29,7 +29,7 @@ public function testCouldBeUnserializedFromJson()
$json = json_encode($message);
- //guard
+ // guard
$this->assertNotEmpty($json);
$unserializedMessage = GpsMessage::jsonUnserialize($json);
@@ -70,4 +70,13 @@ public function testThrowIfMalformedJsonGivenOnUnsterilizedFromJson()
GpsMessage::jsonUnserialize('{]');
}
+
+ public function testGetAttributes()
+ {
+ $message = new GpsMessage('the body', [], ['attributes' => ['key1' => 'value1']]);
+
+ $attributes = $message->getAttributes();
+
+ $this->assertSame(['key1' => 'value1'], $attributes);
+ }
}
diff --git a/pkg/gps/Tests/GpsProducerTest.php b/pkg/gps/Tests/GpsProducerTest.php
index 1e1bfae41..9e39078e4 100644
--- a/pkg/gps/Tests/GpsProducerTest.php
+++ b/pkg/gps/Tests/GpsProducerTest.php
@@ -55,6 +55,37 @@ public function testShouldSendMessage()
$producer->send($topic, $message);
}
+ public function testShouldSendMessageWithAttributes()
+ {
+ $topic = new GpsTopic('topic-name');
+ $message = new GpsMessage('', [], ['attributes' => ['key1' => 'value1']]);
+
+ $gtopic = $this->createGTopicMock();
+ $gtopic
+ ->expects($this->once())
+ ->method('publish')
+ ->with($this->identicalTo(['data' => '{"body":"","properties":[],"headers":[]}', 'attributes' => ['key1' => 'value1']]))
+ ;
+
+ $client = $this->createPubSubClientMock();
+ $client
+ ->expects($this->once())
+ ->method('topic')
+ ->with('topic-name')
+ ->willReturn($gtopic)
+ ;
+
+ $context = $this->createContextMock();
+ $context
+ ->expects($this->once())
+ ->method('getClient')
+ ->willReturn($client)
+ ;
+
+ $producer = new GpsProducer($context);
+ $producer->send($topic, $message);
+ }
+
/**
* @return GpsContext|\PHPUnit\Framework\MockObject\MockObject|GpsContext
*/