Skip to content

Commit a76feff

Browse files
committed
Custom timeout exception for Kafka request reply
1 parent fbe001d commit a76feff

File tree

3 files changed

+17
-5
lines changed

3 files changed

+17
-5
lines changed

smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -236,10 +236,9 @@ public Multi<Message<Rep>> requestMulti(Message<Req> request) {
236236
replyTopic,
237237
replyPartition,
238238
(MultiEmitter<Message<Rep>>) emitter));
239-
})
240-
.ifNoItem().after(replyTimeout).fail())
241-
.onTermination().invoke(() -> pendingReplies.remove(correlationId))
242-
.onItem().transformToUniAndMerge(m -> {
239+
}))
240+
.ifNoItem().after(replyTimeout).failWith(() -> new KafkaRequestReplyTimeoutException(correlationId))
241+
.onItem().transformToUniAndConcatenate(m -> {
243242
if (replyFailureHandler != null) {
244243
Throwable failure = replyFailureHandler.handleReply((KafkaRecord<?, ?>) m);
245244
if (failure != null) {
@@ -248,6 +247,7 @@ public Multi<Message<Rep>> requestMulti(Message<Req> request) {
248247
}
249248
return Uni.createFrom().item(m);
250249
})
250+
.onTermination().invoke(() -> pendingReplies.remove(correlationId))
251251
.plug(multi -> replyConverter != null ? multi.map(f -> replyConverter.apply(f)) : multi);
252252
}
253253

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package io.smallrye.reactive.messaging.kafka.reply;
2+
3+
/**
4+
* Exception thrown when a reply is not received within the configured timeout.
5+
*/
6+
public class KafkaRequestReplyTimeoutException extends RuntimeException {
7+
8+
public KafkaRequestReplyTimeoutException(CorrelationId correlationId) {
9+
super("Timeout waiting for a reply for request with correlation ID: " + correlationId);
10+
}
11+
12+
}

smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -628,7 +628,7 @@ void testReplyTimeout() {
628628

629629
app.requestReply().request(1)
630630
.subscribe().withSubscriber(UniAssertSubscriber.create())
631-
.awaitFailure().assertFailedWith(TimeoutException.class);
631+
.awaitFailure().assertFailedWith(KafkaRequestReplyTimeoutException.class);
632632
}
633633

634634
@Test

0 commit comments

Comments
 (0)