From c33ad32f4a89811f3ade6581f07c1a116d3b85b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 7 Jun 2023 15:56:54 +0200 Subject: [PATCH 1/2] Add test for super stream exchange References rabbitmq/rabbitmq-server#8398 --- .github/workflows/test-pr.yml | 3 + .../stream/impl/SuperStreamExchangeTest.java | 140 ++++++++++++++++++ .../com/rabbitmq/stream/impl/TestUtils.java | 34 ++++- 3 files changed, 175 insertions(+), 2 deletions(-) create mode 100644 src/test/java/com/rabbitmq/stream/impl/SuperStreamExchangeTest.java diff --git a/.github/workflows/test-pr.yml b/.github/workflows/test-pr.yml index aaf664e873..b55305d67f 100644 --- a/.github/workflows/test-pr.yml +++ b/.github/workflows/test-pr.yml @@ -5,6 +5,9 @@ on: branches: - main +env: + RABBITMQ_IMAGE: 'pivotalrabbitmq/rabbitmq:stream-chunk-filtering-otp-max-bazel' + jobs: build: runs-on: ubuntu-22.04 diff --git a/src/test/java/com/rabbitmq/stream/impl/SuperStreamExchangeTest.java b/src/test/java/com/rabbitmq/stream/impl/SuperStreamExchangeTest.java new file mode 100644 index 0000000000..9d27c99195 --- /dev/null +++ b/src/test/java/com/rabbitmq/stream/impl/SuperStreamExchangeTest.java @@ -0,0 +1,140 @@ +// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved. +// +// This software, the RabbitMQ Stream Java client library, is dual-licensed under the +// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL"). +// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL, +// please see LICENSE-APACHE2. +// +// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, +// either express or implied. See the LICENSE file for specific language governing +// rights and limitations of this software. +// +// If you have any questions regarding licensing, please contact us at +// info@rabbitmq.com. +package com.rabbitmq.stream.impl; + +import static com.rabbitmq.stream.impl.TestUtils.*; +import static com.rabbitmq.stream.impl.TestUtils.SuperStreamExchangeType.SUPER; +import static org.assertj.core.api.Assertions.assertThat; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.stream.*; +import io.netty.channel.EventLoopGroup; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.function.BiConsumer;import java.util.stream.IntStream; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class) +public class SuperStreamExchangeTest { + + EventLoopGroup eventLoopGroup; + + Environment environment; + + Connection connection; + int partitions = 3; + int messageCount = 10_000; + String superStream; + + @BeforeEach + void init(TestInfo info) throws Exception { + EnvironmentBuilder environmentBuilder = + Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder(); + environmentBuilder.addressResolver(add -> localhost()); + environment = environmentBuilder.build(); + connection = new ConnectionFactory().newConnection(); + superStream = TestUtils.streamName(info); + } + + @AfterEach + void tearDown() throws Exception { + environment.close(); + deleteSuperStreamTopology(connection, superStream, partitions); + connection.close(); + } + + @Test + void publish() throws Exception { + declareSuperStreamTopology(connection, superStream, SUPER, partitions); + List routingKeys = new ArrayList<>(messageCount); + IntStream.range(0, messageCount) + .forEach(ignored -> routingKeys.add(UUID.randomUUID().toString())); + + CountDownLatch publishLatch = new CountDownLatch(messageCount); + try (Producer producer = + environment + .producerBuilder() + .superStream(superStream) + .routing(msg -> msg.getProperties().getMessageIdAsString()) + .producerBuilder() + .build()) { + ConfirmationHandler confirmationHandler = status -> publishLatch.countDown(); + routingKeys.forEach( + rk -> + producer.send( + producer.messageBuilder().properties().messageId(rk).messageBuilder().build(), + confirmationHandler)); + latchAssert(publishLatch).completes(); + } + + java.util.function.Consumer>> consumeMessages = + receivedMessages -> { + CountDownLatch consumeLatch = new CountDownLatch(messageCount); + try (Consumer ignored = + environment + .consumerBuilder() + .superStream(superStream) + .offset(OffsetSpecification.first()) + .messageHandler( + (ctx, msg) -> { + receivedMessages + .computeIfAbsent(ctx.stream(), k -> ConcurrentHashMap.newKeySet()) + .add(msg.getProperties().getMessageIdAsString()); + consumeLatch.countDown(); + }) + .build()) { + + latchAssert(consumeLatch).completes(); + assertThat(receivedMessages.values().stream().mapToInt(Set::size).sum()) + .isEqualTo(messageCount); + } + }; + + Map> streamProducerMessages = new ConcurrentHashMap<>(partitions); + consumeMessages.accept(streamProducerMessages); + + deleteSuperStreamTopology(connection, superStream, partitions); + declareSuperStreamTopology(connection, superStream, SUPER, partitions); + + try (Channel channel = connection.createChannel()) { + channel.confirmSelect(); + for (String rk : routingKeys) { + channel.basicPublish( + superStream, rk, new AMQP.BasicProperties.Builder().messageId(rk).build(), null); + } + channel.waitForConfirmsOrDie(); + } + + Map> amqpProducerMessages = new ConcurrentHashMap<>(partitions); + consumeMessages.accept(amqpProducerMessages); + assertThat(amqpProducerMessages).hasSameSizeAs(streamProducerMessages) + .containsKeys(streamProducerMessages.keySet().toArray(new String[]{})); + + BiConsumer, Set> compareSets = (s1, s2) -> { + assertThat(s1).hasSameSizeAs(s2); + s1.forEach(rk -> assertThat(s2).contains(rk)); + }; + + amqpProducerMessages.forEach( + (key, value) -> compareSets.accept(value, streamProducerMessages.get(key))); + } +} diff --git a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java index 873a6454cb..2a37bc45a2 100644 --- a/src/test/java/com/rabbitmq/stream/impl/TestUtils.java +++ b/src/test/java/com/rabbitmq/stream/impl/TestUtils.java @@ -13,13 +13,13 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; +import static com.rabbitmq.stream.impl.TestUtils.SuperStreamExchangeType.DIRECT; import static java.lang.String.format; import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.fail; import ch.qos.logback.classic.Level; -import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.stream.Address; @@ -272,18 +272,37 @@ static void doIfNotNull(T obj, Consumer action) { static void declareSuperStreamTopology(Connection connection, String superStream, int partitions) throws Exception { + declareSuperStreamTopology(connection, superStream, DIRECT, partitions); + } + + static void declareSuperStreamTopology( + Connection connection, + String superStream, + SuperStreamExchangeType exchangeType, + int partitions) + throws Exception { declareSuperStreamTopology( connection, superStream, + exchangeType, IntStream.range(0, partitions).mapToObj(String::valueOf).toArray(String[]::new)); } static void declareSuperStreamTopology(Connection connection, String superStream, String... rks) throws Exception { + declareSuperStreamTopology(connection, superStream, DIRECT, rks); + } + + static void declareSuperStreamTopology( + Connection connection, + String superStream, + SuperStreamExchangeType exchangeType, + String... rks) + throws Exception { try (Channel ch = connection.createChannel()) { ch.exchangeDeclare( superStream, - BuiltinExchangeType.DIRECT, + exchangeType.value, true, false, Collections.singletonMap("x-super-stream", true)); @@ -309,6 +328,17 @@ static void declareSuperStreamTopology(Connection connection, String superStream } } + public enum SuperStreamExchangeType { + DIRECT("direct"), + SUPER("x-super-stream"); + + final String value; + + SuperStreamExchangeType(String value) { + this.value = value; + } + } + static void deleteSuperStreamTopology(Connection connection, String superStream, int partitions) throws Exception { deleteSuperStreamTopology( From 5d6927561ad5415c95fe0a73c11a74b8586735bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 7 Jun 2023 16:03:37 +0200 Subject: [PATCH 2/2] Fix broker Docker image for PR testing --- .github/workflows/test-pr.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test-pr.yml b/.github/workflows/test-pr.yml index b55305d67f..afe1ab7153 100644 --- a/.github/workflows/test-pr.yml +++ b/.github/workflows/test-pr.yml @@ -6,7 +6,7 @@ on: - main env: - RABBITMQ_IMAGE: 'pivotalrabbitmq/rabbitmq:stream-chunk-filtering-otp-max-bazel' + RABBITMQ_IMAGE: 'pivotalrabbitmq/rabbitmq:super-stream-exchange-type-otp-max-bazel' jobs: build: