Skip to content

Commit 2a1c612

Browse files
committed
Add reproducer for kafka companion channel names
1 parent 13eb367 commit 2a1c612

File tree

4 files changed

+73
-0
lines changed

4 files changed

+73
-0
lines changed
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package io.quarkus.it.kafka;
2+
3+
import jakarta.enterprise.context.ApplicationScoped;
4+
import jakarta.inject.Inject;
5+
6+
import org.eclipse.microprofile.reactive.messaging.Channel;
7+
import org.eclipse.microprofile.reactive.messaging.Emitter;
8+
9+
@ApplicationScoped
10+
public class MessageEmitter {
11+
12+
@Inject
13+
@Channel("foo.bar-topic")
14+
Emitter<String> emitter;
15+
16+
public void emit(String message) {
17+
emitter.send(message).toCompletableFuture().join();
18+
}
19+
}

integration-tests/reactive-messaging-kafka/src/test/java/io/quarkus/it/kafka/KafkaConnectorTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@
2929
@QuarkusTest
3030
@QuarkusTestResource(KafkaCompanionResource.class)
3131
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
32+
// this test class needs to be executed first, since it relies on number of kafka messages send
33+
// these messages are send during each test class execution and cannot be easily removed
34+
@Order(1)
3235
public class KafkaConnectorTest {
3336

3437
protected static final TypeRef<List<Person>> TYPE_REF = new TypeRef<List<Person>>() {
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package io.quarkus.it.kafka;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
5+
import java.time.Duration;
6+
import java.util.List;
7+
8+
import jakarta.inject.Inject;
9+
10+
import org.apache.kafka.clients.consumer.ConsumerRecord;
11+
import org.junit.jupiter.api.Order;
12+
import org.junit.jupiter.api.Tag;
13+
import org.junit.jupiter.api.Test;
14+
import org.junit.jupiter.api.condition.DisabledOnOs;
15+
import org.junit.jupiter.api.condition.OS;
16+
17+
import io.quarkus.test.common.WithTestResource;
18+
import io.quarkus.test.junit.QuarkusTest;
19+
import io.quarkus.test.kafka.InjectKafkaCompanion;
20+
import io.quarkus.test.kafka.KafkaCompanionResource;
21+
import io.smallrye.reactive.messaging.kafka.companion.KafkaCompanion;
22+
23+
@QuarkusTest
24+
@WithTestResource(KafkaCompanionResource.class)
25+
@DisabledOnOs({ OS.WINDOWS }) // Kafka requires docker to start
26+
@Tag("https://github.com/quarkusio/quarkus/issues/50751")
27+
@Order(2)
28+
class UsingKafkaCompanionTest {
29+
30+
@Inject
31+
MessageEmitter messageTransmitter;
32+
33+
@InjectKafkaCompanion
34+
KafkaCompanion companion;
35+
36+
@Test
37+
void dashedTopic() {
38+
String message = "Hello, Quarkus!";
39+
messageTransmitter.emit(message);
40+
41+
List<String> actual = companion.consumeStrings()
42+
.fromTopics("foo.bar-topic")
43+
.awaitRecords(1, Duration.ofSeconds(30))
44+
.stream()
45+
.map(ConsumerRecord::value)
46+
.toList();
47+
assertEquals(1, actual.size());
48+
assertEquals(message, actual.get(0));
49+
}
50+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
junit.jupiter.testclass.order.default=org.junit.jupiter.api.ClassOrderer$OrderAnnotation

0 commit comments

Comments
 (0)