Skip to content

Commit f629147

Browse files
ozangunalpcescoffier
authored andcommitted
Add test for Targeted with tombstone records
Closes #2687
1 parent 4ed4d5d commit f629147

File tree

1 file changed

+66
-0
lines changed
  • smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka

1 file changed

+66
-0
lines changed

smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSinkTest.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import io.smallrye.common.annotation.Identifier;
3131
import io.smallrye.mutiny.Multi;
3232
import io.smallrye.mutiny.Uni;
33+
import io.smallrye.reactive.messaging.Targeted;
3334
import io.smallrye.reactive.messaging.health.HealthReport;
3435
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
3536
import io.smallrye.reactive.messaging.kafka.base.KafkaCompanionTestBase;
@@ -694,4 +695,69 @@ public Flow.Publisher<Integer> source2() {
694695

695696
}
696697

698+
@Test
699+
public void testTargetedWithTombstoneRecords() {
700+
String topic1 = topic + "-1";
701+
String topic2 = topic + "-2";
702+
companion.topics().createAndWait(topic1, 1);
703+
companion.topics().createAndWait(topic2, 1);
704+
705+
KafkaMapBasedConfig config = kafkaConfig("mp.messaging.outgoing.output1")
706+
.with("topic", topic1)
707+
.with("key.serializer", IntegerSerializer.class.getName())
708+
.with("value.serializer", StringSerializer.class.getName())
709+
.withPrefix("mp.messaging.outgoing.output2")
710+
.with("topic", topic2)
711+
.with("key.serializer", IntegerSerializer.class.getName())
712+
.with("value.serializer", StringSerializer.class.getName());
713+
714+
ConsumerTask<Integer, String> consumed1 = companion.consume(Integer.class, String.class)
715+
.fromTopics(topic1, 10);
716+
ConsumerTask<Integer, String> consumed2 = companion.consume(Integer.class, String.class)
717+
.fromTopics(topic2, 10);
718+
719+
runApplication(config, BeanProducingTargetedWithTombstones.class);
720+
721+
await().until(this::isReady);
722+
await().until(this::isAlive);
723+
724+
assertThat(consumed1.awaitCompletion().count()).isEqualTo(10);
725+
assertThat(consumed2.awaitCompletion().count()).isEqualTo(10);
726+
727+
// Verify topic1 receives regular records
728+
assertThat(consumed1.getRecords())
729+
.extracting(ConsumerRecord::key)
730+
.containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
731+
assertThat(consumed1.getRecords())
732+
.extracting(ConsumerRecord::value)
733+
.containsExactly("value-0", "value-1", "value-2", "value-3", "value-4", "value-5", "value-6", "value-7", "value-8", "value-9");
734+
735+
// Verify topic2 receives tombstone records (null values)
736+
assertThat(consumed2.getRecords())
737+
.extracting(ConsumerRecord::key)
738+
.contains(9);
739+
assertThat(consumed2.getRecords())
740+
.extracting(ConsumerRecord::value)
741+
.containsOnlyNulls();
742+
}
743+
744+
@ApplicationScoped
745+
public static class BeanProducingTargetedWithTombstones {
746+
747+
@Incoming("data")
748+
@Outgoing("output1")
749+
@Outgoing("output2")
750+
public Targeted process(int input) {
751+
return Targeted.of(
752+
"output1", Record.of(input, "value-" + input),
753+
"output2", Record.of(input, null)); // Tombstone record
754+
}
755+
756+
@Outgoing("data")
757+
public Flow.Publisher<Integer> source() {
758+
return Multi.createFrom().range(0, 10);
759+
}
760+
761+
}
762+
697763
}

0 commit comments

Comments
 (0)