Skip to content

Commit 85fd0af

Browse files
committed
Add Stream options
Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 7626be6 commit 85fd0af

File tree

3 files changed

+104
-4
lines changed

3 files changed

+104
-4
lines changed

RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public IConsumerBuilder.IStreamOptions Offset(string interval)
8686

8787
public IConsumerBuilder.IStreamOptions FilterValues(string[] values)
8888
{
89-
filters[new Symbol("rabbitmq:stream-filter")] = values;
89+
filters[new Symbol("rabbitmq:stream-filter")] = values.ToList();
9090
return this;
9191
}
9292

RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using Amqp;
22
using Amqp.Framing;
3+
using Amqp.Types;
34

45
namespace RabbitMQ.AMQP.Client.Impl;
56

@@ -117,7 +118,7 @@ public IMessage Subject(string subject)
117118
public IMessage Annotation(string key, object value)
118119
{
119120
EnsureAnnotations();
120-
NativeMessage.MessageAnnotations[key] = value;
121+
NativeMessage.MessageAnnotations[new Symbol(key)] = value;
121122
return this;
122123
}
123124

Tests/ConsumerTests.cs

Lines changed: 101 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System.Collections.Generic;
2+
using System.Linq;
23
using System.Threading;
34
using System.Threading.Tasks;
45
using RabbitMQ.AMQP.Client;
@@ -146,7 +147,7 @@ await publisher.Publish(new AmqpMessage($"message_{i}"),
146147
await connection.CloseAsync();
147148
}
148149

149-
150+
150151
/// <summary>
151152
/// Test the consumer for a stream queue with offset
152153
/// The test is not deterministic because we don't know how many messages will be consumed
@@ -185,6 +186,104 @@ public async Task ConsumerForStreamQueueWithOffset(StreamOffsetSpecification off
185186
await connection.CloseAsync();
186187
}
187188

189+
190+
/// <summary>
191+
/// Test for stream filtering
192+
/// There are two consumers:
193+
/// - one with a filter that should receive only the messages with the filter
194+
/// - one without filter that should receive all messages
195+
/// </summary>
196+
/// <param name="filter"></param>
197+
/// <param name="expected"></param>
198+
[Theory]
199+
[InlineData("pizza,beer,pasta,wine", 4)]
200+
[InlineData("pizza,beer", 2)]
201+
[InlineData("pizza", 1)]
202+
public async Task ConsumerWithStreamFilterShouldReceiveOnlyPartOfTheMessages(string filter, int expected)
203+
{
204+
string[] filters = filter.Split(",");
205+
206+
IConnection connection = await AmqpConnection.CreateAsync(ConnectionSettingBuilder.Create().Build());
207+
IManagement management = connection.Management();
208+
string queueName = $"ConsumerWithStreamFilterShouldReceiveOnlyPartOfTheMessages_{filter}";
209+
await management.Queue().Name(queueName).Type(QueueType.STREAM).Declare();
210+
foreach (string se in filters)
211+
{
212+
await Publish(connection, queueName, 1, se);
213+
}
214+
215+
// wait for the messages to be published and the chunks to be created
216+
await Task.Delay(1000);
217+
// publish extra messages without filter and these messages should be always excluded
218+
// by the consumer with the filter
219+
await Publish(connection, queueName, 10);
220+
221+
List<IMessage> receivedMessages = [];
222+
IConsumer consumer = connection.ConsumerBuilder().Queue(queueName).InitialCredits(100)
223+
.MessageHandler((context, message) =>
224+
{
225+
receivedMessages.Add(message);
226+
context.Accept();
227+
}).Stream().FilterValues(filters).FilterMatchUnfiltered(false)
228+
.Offset(StreamOffsetSpecification.First).Builder()
229+
.Build();
230+
231+
int receivedWithoutFilters = 0;
232+
IConsumer consumerWithoutFilters = connection.ConsumerBuilder().Queue(queueName).InitialCredits(100)
233+
.MessageHandler((context, message) =>
234+
{
235+
Interlocked.Increment(ref receivedWithoutFilters);
236+
context.Accept();
237+
}).Stream()
238+
.Offset(StreamOffsetSpecification.First).Builder()
239+
.Build();
240+
241+
// wait for the consumer to consume all messages
242+
await Task.Delay(500);
243+
Assert.Equal(expected, receivedMessages.Count);
244+
Assert.Equal(filters.Length + 10, receivedWithoutFilters);
245+
246+
await consumer.CloseAsync();
247+
await consumerWithoutFilters.CloseAsync();
248+
await management.QueueDeletion().Delete(queueName);
249+
await connection.CloseAsync();
250+
}
251+
252+
253+
/// <summary>
254+
/// Test the offset value for the stream queue
255+
/// </summary>
256+
/// <param name="offsetStart"></param>
257+
/// <param name="numberOfMessagesExpected"></param>
258+
259+
[Theory]
260+
[InlineData(0, 100)]
261+
[InlineData(50, 50)]
262+
[InlineData(99, 1)]
263+
public async Task ConsumerForStreamQueueWithOffsetValue(int offsetStart, int numberOfMessagesExpected)
264+
{
265+
IConnection connection = await AmqpConnection.CreateAsync(ConnectionSettingBuilder.Create().Build());
266+
IManagement management = connection.Management();
267+
string queueName = $"ConsumerForStreamQueueWithOffsetValue_{offsetStart}";
268+
await management.Queue().Name(queueName).Type(QueueType.STREAM).Declare();
269+
await Publish(connection, queueName, 100);
270+
int consumed = 0;
271+
IConsumer consumer = connection.ConsumerBuilder().Queue(queueName).InitialCredits(100)
272+
.MessageHandler((context, message) => { Interlocked.Increment(ref consumed); }).Stream().Offset(offsetStart)
273+
.Builder().Build();
274+
275+
// wait for the consumer to consume all messages
276+
// we can't use the TaskCompletionSource here because we don't know how many messages will be consumed
277+
// In two seconds, the consumer should consume all messages
278+
await Task.Delay(2000);
279+
280+
Assert.Equal(consumed, numberOfMessagesExpected);
281+
await consumer.CloseAsync();
282+
await management.QueueDeletion().Delete(queueName);
283+
await connection.CloseAsync();
284+
}
285+
286+
188287
private static async Task Publish(IConnection connection, string queue, int numberOfMessages,
189288
string filter = null)
190289
{
@@ -197,7 +296,7 @@ private static async Task Publish(IConnection connection, string queue, int numb
197296
message.Annotation("x-stream-filter-value", filter);
198297
}
199298

200-
await publisher.Publish(new AmqpMessage($"message_{i}"),
299+
await publisher.Publish(message,
201300
(_, descriptor) => { Assert.Equal(OutcomeState.Accepted, descriptor.State); });
202301
}
203302
}

0 commit comments

Comments
 (0)