diff --git a/integration-tests/reactive-messaging-kafka/src/main/java/io/quarkus/it/kafka/MessageEmitter.java b/integration-tests/reactive-messaging-kafka/src/main/java/io/quarkus/it/kafka/MessageEmitter.java new file mode 100644 index 0000000000000..a8a3bc9a16758 --- /dev/null +++ b/integration-tests/reactive-messaging-kafka/src/main/java/io/quarkus/it/kafka/MessageEmitter.java @@ -0,0 +1,19 @@ +package io.quarkus.it.kafka; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Emitter; + +@ApplicationScoped +public class MessageEmitter { + + @Inject + @Channel("foo.bar-topic") + Emitter emitter; + + public void emit(String message) { + emitter.send(message).toCompletableFuture().join(); + } +} diff --git a/integration-tests/reactive-messaging-kafka/src/test/java/io/quarkus/it/kafka/KafkaConnectorTest.java b/integration-tests/reactive-messaging-kafka/src/test/java/io/quarkus/it/kafka/KafkaConnectorTest.java index 4f158b1e70533..c6ff52d61500f 100644 --- a/integration-tests/reactive-messaging-kafka/src/test/java/io/quarkus/it/kafka/KafkaConnectorTest.java +++ b/integration-tests/reactive-messaging-kafka/src/test/java/io/quarkus/it/kafka/KafkaConnectorTest.java @@ -29,6 +29,9 @@ @QuarkusTest @QuarkusTestResource(KafkaCompanionResource.class) @TestMethodOrder(MethodOrderer.OrderAnnotation.class) +// this test class needs to be executed first, since it relies on number of kafka messages send +// these messages are send during each test class execution and cannot be easily removed +@Order(1) public class KafkaConnectorTest { protected static final TypeRef> TYPE_REF = new TypeRef>() { diff --git a/integration-tests/reactive-messaging-kafka/src/test/java/io/quarkus/it/kafka/UsingKafkaCompanionTest.java b/integration-tests/reactive-messaging-kafka/src/test/java/io/quarkus/it/kafka/UsingKafkaCompanionTest.java new file mode 100644 index 0000000000000..65ca62293d672 --- /dev/null +++ b/integration-tests/reactive-messaging-kafka/src/test/java/io/quarkus/it/kafka/UsingKafkaCompanionTest.java @@ -0,0 +1,50 @@ +package io.quarkus.it.kafka; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.time.Duration; +import java.util.List; + +import jakarta.inject.Inject; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; + +import io.quarkus.test.common.WithTestResource; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.kafka.InjectKafkaCompanion; +import io.quarkus.test.kafka.KafkaCompanionResource; +import io.smallrye.reactive.messaging.kafka.companion.KafkaCompanion; + +@QuarkusTest +@WithTestResource(KafkaCompanionResource.class) +@DisabledOnOs({ OS.WINDOWS }) // Kafka requires docker to start +@Tag("https://github.com/quarkusio/quarkus/issues/50751") +@Order(2) +class UsingKafkaCompanionTest { + + @Inject + MessageEmitter messageTransmitter; + + @InjectKafkaCompanion + KafkaCompanion companion; + + @Test + void dashedTopic() { + String message = "Hello, Quarkus!"; + messageTransmitter.emit(message); + + List actual = companion.consumeStrings() + .fromTopics("foo.bar-topic") + .awaitRecords(1, Duration.ofSeconds(30)) + .stream() + .map(ConsumerRecord::value) + .toList(); + assertEquals(1, actual.size()); + assertEquals(message, actual.get(0)); + } +} diff --git a/integration-tests/reactive-messaging-kafka/src/test/resources/junit-platform.properties b/integration-tests/reactive-messaging-kafka/src/test/resources/junit-platform.properties new file mode 100644 index 0000000000000..44d89005418df --- /dev/null +++ b/integration-tests/reactive-messaging-kafka/src/test/resources/junit-platform.properties @@ -0,0 +1 @@ +junit.jupiter.testclass.order.default=org.junit.jupiter.api.ClassOrderer$OrderAnnotation