diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs b/RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs index 5eb1fee..281166e 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs @@ -86,7 +86,7 @@ public IConsumerBuilder.IStreamOptions Offset(string interval) public IConsumerBuilder.IStreamOptions FilterValues(string[] values) { - filters[new Symbol("rabbitmq:stream-filter")] = values; + filters[new Symbol("rabbitmq:stream-filter")] = values.ToList(); return this; } diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs b/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs index ce505a0..f20dc02 100644 --- a/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs +++ b/RabbitMQ.AMQP.Client/Impl/AmqpMessage.cs @@ -1,5 +1,6 @@ using Amqp; using Amqp.Framing; +using Amqp.Types; namespace RabbitMQ.AMQP.Client.Impl; @@ -117,7 +118,7 @@ public IMessage Subject(string subject) public IMessage Annotation(string key, object value) { EnsureAnnotations(); - NativeMessage.MessageAnnotations[key] = value; + NativeMessage.MessageAnnotations[new Symbol(key)] = value; return this; } diff --git a/Tests/ConsumerTests.cs b/Tests/ConsumerTests.cs index 7e41401..cbee30c 100644 --- a/Tests/ConsumerTests.cs +++ b/Tests/ConsumerTests.cs @@ -1,4 +1,5 @@ using System.Collections.Generic; +using System.Linq; using System.Threading; using System.Threading.Tasks; using RabbitMQ.AMQP.Client; @@ -146,7 +147,7 @@ await publisher.Publish(new AmqpMessage($"message_{i}"), await connection.CloseAsync(); } - + /// /// Test the consumer for a stream queue with offset /// The test is not deterministic because we don't know how many messages will be consumed @@ -185,6 +186,104 @@ public async Task ConsumerForStreamQueueWithOffset(StreamOffsetSpecification off await connection.CloseAsync(); } + + /// + /// Test for stream filtering + /// There are two consumers: + /// - one with a filter that should receive only the messages with the filter + /// - one without filter that should receive all messages + /// + /// + /// + [Theory] + [InlineData("pizza,beer,pasta,wine", 4)] + [InlineData("pizza,beer", 2)] + [InlineData("pizza", 1)] + public async Task ConsumerWithStreamFilterShouldReceiveOnlyPartOfTheMessages(string filter, int expected) + { + string[] filters = filter.Split(","); + + IConnection connection = await AmqpConnection.CreateAsync(ConnectionSettingBuilder.Create().Build()); + IManagement management = connection.Management(); + string queueName = $"ConsumerWithStreamFilterShouldReceiveOnlyPartOfTheMessages_{filter}"; + await management.Queue().Name(queueName).Type(QueueType.STREAM).Declare(); + foreach (string se in filters) + { + await Publish(connection, queueName, 1, se); + } + + // wait for the messages to be published and the chunks to be created + await Task.Delay(1000); + // publish extra messages without filter and these messages should be always excluded + // by the consumer with the filter + await Publish(connection, queueName, 10); + + List receivedMessages = []; + IConsumer consumer = connection.ConsumerBuilder().Queue(queueName).InitialCredits(100) + .MessageHandler((context, message) => + { + receivedMessages.Add(message); + context.Accept(); + }).Stream().FilterValues(filters).FilterMatchUnfiltered(false) + .Offset(StreamOffsetSpecification.First).Builder() + .Build(); + + int receivedWithoutFilters = 0; + IConsumer consumerWithoutFilters = connection.ConsumerBuilder().Queue(queueName).InitialCredits(100) + .MessageHandler((context, message) => + { + Interlocked.Increment(ref receivedWithoutFilters); + context.Accept(); + }).Stream() + .Offset(StreamOffsetSpecification.First).Builder() + .Build(); + + // wait for the consumer to consume all messages + await Task.Delay(500); + Assert.Equal(expected, receivedMessages.Count); + Assert.Equal(filters.Length + 10, receivedWithoutFilters); + + await consumer.CloseAsync(); + await consumerWithoutFilters.CloseAsync(); + await management.QueueDeletion().Delete(queueName); + await connection.CloseAsync(); + } + + + /// + /// Test the offset value for the stream queue + /// + /// + /// + + [Theory] + [InlineData(0, 100)] + [InlineData(50, 50)] + [InlineData(99, 1)] + public async Task ConsumerForStreamQueueWithOffsetValue(int offsetStart, int numberOfMessagesExpected) + { + IConnection connection = await AmqpConnection.CreateAsync(ConnectionSettingBuilder.Create().Build()); + IManagement management = connection.Management(); + string queueName = $"ConsumerForStreamQueueWithOffsetValue_{offsetStart}"; + await management.Queue().Name(queueName).Type(QueueType.STREAM).Declare(); + await Publish(connection, queueName, 100); + int consumed = 0; + IConsumer consumer = connection.ConsumerBuilder().Queue(queueName).InitialCredits(100) + .MessageHandler((context, message) => { Interlocked.Increment(ref consumed); }).Stream().Offset(offsetStart) + .Builder().Build(); + + // wait for the consumer to consume all messages + // we can't use the TaskCompletionSource here because we don't know how many messages will be consumed + // In two seconds, the consumer should consume all messages + await Task.Delay(2000); + + Assert.Equal(consumed, numberOfMessagesExpected); + await consumer.CloseAsync(); + await management.QueueDeletion().Delete(queueName); + await connection.CloseAsync(); + } + + private static async Task Publish(IConnection connection, string queue, int numberOfMessages, string filter = null) { @@ -197,7 +296,7 @@ private static async Task Publish(IConnection connection, string queue, int numb message.Annotation("x-stream-filter-value", filter); } - await publisher.Publish(new AmqpMessage($"message_{i}"), + await publisher.Publish(message, (_, descriptor) => { Assert.Equal(OutcomeState.Accepted, descriptor.State); }); } }