Skip to content

Commit 884c9ff

Browse files
committed
Remove the ordering decorator and place the ordering logic in the Kafka source, before the throttled commit strategy receives records
1 parent 749f411 commit 884c9ff

File tree

5 files changed

+89
-124
lines changed

5 files changed

+89
-124
lines changed

smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/IncomingKafkaRecord.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.time.Instant;
44
import java.util.ArrayList;
5+
import java.util.concurrent.CompletableFuture;
56
import java.util.concurrent.CompletionStage;
67
import java.util.function.BiFunction;
78
import java.util.function.Function;
@@ -26,6 +27,7 @@ public class IncomingKafkaRecord<K, T> implements KafkaRecord<K, T>, MetadataInj
2627
private final KafkaCommitHandler commitHandler;
2728
private final KafkaFailureHandler onNack;
2829
private final T payload;
30+
private Runnable afterProcessing;
2931

3032
public IncomingKafkaRecord(ConsumerRecord<K, T> record,
3133
String channel,
@@ -125,17 +127,23 @@ public BiFunction<Throwable, Metadata, CompletionStage<Void>> getNackWithMetadat
125127

126128
@Override
127129
public CompletionStage<Void> ack(Metadata metadata) {
128-
return commitHandler.handle(this).subscribeAsCompletionStage();
130+
CompletableFuture<Void> ack = commitHandler.handle(this).subscribeAsCompletionStage();
131+
return afterProcessing == null ? ack : ack.thenRun(afterProcessing);
129132
}
130133

131134
@Override
132135
public CompletionStage<Void> nack(Throwable reason, Metadata metadata) {
133-
return onNack.handle(this, reason, metadata).subscribeAsCompletionStage();
136+
CompletableFuture<Void> nack = onNack.handle(this, reason, metadata).subscribeAsCompletionStage();
137+
return afterProcessing == null ? nack : nack.thenRun(afterProcessing);
134138
}
135139

136140
@Override
137141
public synchronized void injectMetadata(Object metadata) {
138142
this.metadata = this.metadata.with(metadata);
139143
}
140144

145+
public void afterProcessing(Runnable runnable) {
146+
this.afterProcessing = runnable;
147+
}
148+
141149
}

smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import io.smallrye.reactive.messaging.ClientCustomizer;
2929
import io.smallrye.reactive.messaging.health.HealthReport;
3030
import io.smallrye.reactive.messaging.kafka.*;
31+
import io.smallrye.reactive.messaging.kafka.api.IncomingKafkaRecordMetadata;
3132
import io.smallrye.reactive.messaging.kafka.commit.ContextHolder;
3233
import io.smallrye.reactive.messaging.kafka.commit.KafkaCommitHandler;
3334
import io.smallrye.reactive.messaging.kafka.fault.KafkaDeadLetterQueue;
@@ -36,6 +37,7 @@
3637
import io.smallrye.reactive.messaging.kafka.health.KafkaSourceHealth;
3738
import io.smallrye.reactive.messaging.kafka.tracing.KafkaOpenTelemetryInstrumenter;
3839
import io.smallrye.reactive.messaging.kafka.tracing.KafkaTrace;
40+
import io.smallrye.reactive.messaging.providers.helpers.PausableMulti;
3941
import io.vertx.core.impl.ContextInternal;
4042
import io.vertx.core.impl.VertxInternal;
4143
import io.vertx.mutiny.core.Vertx;
@@ -166,14 +168,43 @@ public KafkaSource(Vertx vertx,
166168
log.unableToReadRecord(topics, t);
167169
reportFailure(t, false);
168170
});
169-
170-
Multi<IncomingKafkaRecord<K, V>> incomingMulti = multi.onItem().transformToUni(rec -> {
171-
IncomingKafkaRecord<K, V> record = new IncomingKafkaRecord<>(rec, channel, index, commitHandler,
172-
failureHandler, isCloudEventEnabled, isTracingEnabled);
171+
Multi<IncomingKafkaRecord<K, V>> incomingMulti = multi.onItem()
172+
.transform(rec -> new IncomingKafkaRecord<>(rec, channel, index, commitHandler,
173+
failureHandler, isCloudEventEnabled, isTracingEnabled));
174+
175+
// preserve order by key if configured
176+
ProcessingOrder processingOrder = ProcessingOrder.of(config.getThrottledProcessingOrder());
177+
incomingMulti = switch (processingOrder) {
178+
case UNORDERED -> incomingMulti;
179+
case ORDERED_BY_KEY -> incomingMulti.group()
180+
.by(message -> TopicPartitionKey.ofKey(
181+
message.getMetadata(IncomingKafkaRecordMetadata.class).get().getRecord()))
182+
.onItem().transformToMulti(g -> {
183+
PausableMulti<IncomingKafkaRecord<K, V>> pausable = new PausableMulti<>(g, false);
184+
return pausable.invoke(rec -> {
185+
pausable.pause();
186+
rec.afterProcessing(pausable::resume);
187+
});
188+
})
189+
.merge(128);
190+
case ORDERED_BY_PARTITION -> incomingMulti.group()
191+
.by(message -> TopicPartitionKey.ofPartition(
192+
message.getMetadata(IncomingKafkaRecordMetadata.class).get().getRecord()))
193+
.onItem().transformToMulti(g -> {
194+
PausableMulti<IncomingKafkaRecord<K, V>> pausable = new PausableMulti<>(g, false);
195+
return pausable.invoke(rec -> {
196+
pausable.pause();
197+
rec.afterProcessing(pausable::resume);
198+
});
199+
})
200+
.merge(128);
201+
};
202+
203+
incomingMulti = incomingMulti.onItem().transformToUni(record -> {
173204
if ((failureHandler instanceof KafkaDeadLetterQueue)
174-
&& rec.headers() != null
175-
&& rec.headers().lastHeader(DESERIALIZATION_FAILURE_DLQ) != null) {
176-
Header reasonMsgHeader = rec.headers().lastHeader(DESERIALIZATION_FAILURE_REASON);
205+
&& record.getHeaders() != null
206+
&& record.getHeaders().lastHeader(DESERIALIZATION_FAILURE_DLQ) != null) {
207+
Header reasonMsgHeader = record.getHeaders().lastHeader(DESERIALIZATION_FAILURE_REASON);
177208
String message = reasonMsgHeader != null ? new String(reasonMsgHeader.value()) : null;
178209
RecordDeserializationException reason = new RecordDeserializationException(
179210
TopicPartitions.getTopicPartition(record), record.getOffset(), message, null);

smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KeyBasedOrderingDecorator.java

Lines changed: 0 additions & 72 deletions
This file was deleted.

smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/ThrottledConcurrencyTest.java

Lines changed: 41 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import java.util.concurrent.TimeUnit;
1111
import java.util.concurrent.atomic.AtomicInteger;
1212
import java.util.stream.IntStream;
13-
import java.util.stream.Stream;
1413

1514
import jakarta.enterprise.context.ApplicationScoped;
1615

@@ -26,19 +25,20 @@
2625

2726
public class ThrottledConcurrencyTest extends KafkaCompanionTestBase {
2827

28+
int partitions = 10;
29+
int concurrency = 10;
30+
int recordsParPartition = 100;
31+
2932
@Test
3033
public void testUnorderedParallelProcessing() {
3134
// Create topic and produce messages with different keys
32-
companion.topics().createAndWait(topic, 5);
35+
companion.topics().createAndWait(topic, partitions);
3336

3437
// Produce messages to different partitions
3538
companion.produceStrings()
36-
.fromRecords(IntStream.range(0, 10).boxed().flatMap(i -> Stream.of(
37-
new ProducerRecord<>(topic, 0, "A", "A-" + i),
38-
new ProducerRecord<>(topic, 1, "B", "B-" + i),
39-
new ProducerRecord<>(topic, 2, "C", "C-" + i),
40-
new ProducerRecord<>(topic, 3, "D", "D-" + i),
41-
new ProducerRecord<>(topic, 4, "E", "E-" + i))).toList())
39+
.fromRecords(IntStream.range(0, recordsParPartition).boxed().flatMap(i -> IntStream.range(0, partitions).boxed()
40+
.map(p -> new ProducerRecord<>(topic, p, "key-" + p, "value-" + i)))
41+
.toList())
4242
.awaitCompletion();
4343

4444
MapBasedConfig config = kafkaConfig("mp.messaging.incoming.unordered")
@@ -49,18 +49,20 @@ public void testUnorderedParallelProcessing() {
4949
.with("key.deserializer", StringDeserializer.class.getName())
5050
.with("commit-strategy", "throttled")
5151
.withPrefix("")
52-
.with("smallrye.messaging.worker.my-pool.max-concurrency", 5);
52+
.with("smallrye.messaging.worker.my-pool.max-concurrency", concurrency);
5353

5454
UnorderedParallelConsumer app = runApplication(config, UnorderedParallelConsumer.class);
5555
long start = System.currentTimeMillis();
5656
System.out.println("Started processing messages " + start);
5757

5858
// Wait for all messages to be processed
59-
await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> assertThat(app.received()).hasSize(50));
59+
await().atMost(2, TimeUnit.MINUTES)
60+
.untilAsserted(() -> assertThat(app.received())
61+
.hasSize(partitions * recordsParPartition));
6062
long duration = System.currentTimeMillis() - start;
6163
System.out.println("Processing duration: " + duration + " ms");
6264

63-
assertThat(app.maxConcurrency()).isEqualTo(5);
65+
assertThat(app.maxConcurrency()).isEqualTo(concurrency);
6466
}
6567

6668
@ApplicationScoped
@@ -93,16 +95,13 @@ public int maxConcurrency() {
9395
@Test
9496
public void testOrderedByKeyProcessing() {
9597
// Create topic and produce messages with different keys
96-
companion.topics().createAndWait(topic, 5);
98+
companion.topics().createAndWait(topic, partitions);
9799

98100
// Produce messages to different keys
99101
companion.produceStrings()
100-
.fromRecords(IntStream.range(0, 10).boxed().flatMap(i -> Stream.of(
101-
new ProducerRecord<>(topic, 0, "A", "A-" + i),
102-
new ProducerRecord<>(topic, 1, "B", "B-" + i),
103-
new ProducerRecord<>(topic, 2, "C", "C-" + i),
104-
new ProducerRecord<>(topic, 3, "D", "D-" + i),
105-
new ProducerRecord<>(topic, 4, "E", "E-" + i))).toList())
102+
.fromRecords(IntStream.range(0, recordsParPartition).boxed().flatMap(i -> IntStream.range(0, partitions).boxed()
103+
.map(p -> new ProducerRecord<>(topic, p, "key-" + p, "value-" + i)))
104+
.toList())
106105
.awaitCompletion();
107106

108107
MapBasedConfig config = kafkaConfig("mp.messaging.incoming.key-ordered")
@@ -112,25 +111,27 @@ public void testOrderedByKeyProcessing() {
112111
.with("value.deserializer", StringDeserializer.class.getName())
113112
.with("key.deserializer", StringDeserializer.class.getName())
114113
.with("throttled.processing-order", "ordered_by_key")
115-
.with("commit-strategy", "throttled");
114+
.with("commit-strategy", "throttled")
115+
.withPrefix("")
116+
.with("smallrye.messaging.worker.my-pool.max-concurrency", concurrency);
116117

117118
OrderedByKeyParallelConsumer app = runApplication(config, OrderedByKeyParallelConsumer.class);
118119
long start = System.currentTimeMillis();
119120
System.out.println("Started processing messages " + start);
120121

121122
// Wait for all messages to be processed
122-
await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> assertThat(app.received()).hasSize(50));
123+
await().atMost(2, TimeUnit.MINUTES)
124+
.untilAsserted(() -> assertThat(app.received())
125+
.hasSize(partitions * recordsParPartition));
123126
long duration = System.currentTimeMillis() - start;
124127
System.out.println("Processing duration: " + duration + " ms");
125128

126129
// With ordered-by-key, messages with the same key should be processed sequentially
127130
// So max concurrent processing per key should be 1
128-
assertThat(app.keyMaxCounter("A")).hasValue(1);
129-
assertThat(app.keyMaxCounter("B")).hasValue(1);
130-
assertThat(app.keyMaxCounter("C")).hasValue(1);
131-
assertThat(app.keyMaxCounter("D")).hasValue(1);
132-
assertThat(app.keyMaxCounter("E")).hasValue(1);
133-
assertThat(app.maxConcurrency()).isEqualTo(5);
131+
for (int i = 0; i < partitions; i++) {
132+
assertThat(app.keyMaxCounter("key-" + i)).hasValue(1);
133+
}
134+
assertThat(app.maxConcurrency()).isEqualTo(partitions);
134135
}
135136

136137
@ApplicationScoped
@@ -177,16 +178,13 @@ public int maxConcurrency() {
177178
@Test
178179
public void testOrderedByPartitionProcessing() {
179180
// Create topic with 2 partitions
180-
companion.topics().createAndWait(topic, 5);
181+
companion.topics().createAndWait(topic, partitions);
181182

182183
// Produce messages to different partitions
183184
companion.produceStrings()
184-
.fromRecords(IntStream.range(0, 10).boxed().flatMap(i -> Stream.of(
185-
new ProducerRecord<>(topic, 0, "key-" + i, "p0-" + i),
186-
new ProducerRecord<>(topic, 1, "key-" + i, "p1-" + i),
187-
new ProducerRecord<>(topic, 2, "key-" + i, "p2-" + i),
188-
new ProducerRecord<>(topic, 3, "key-" + i, "p3-" + i),
189-
new ProducerRecord<>(topic, 4, "key-" + i, "p4-" + i))).toList())
185+
.fromRecords(IntStream.range(0, recordsParPartition).boxed().flatMap(i -> IntStream.range(0, partitions).boxed()
186+
.map(p -> new ProducerRecord<>(topic, p, "key-" + p, "value-" + i)))
187+
.toList())
190188
.awaitCompletion();
191189

192190
MapBasedConfig config = kafkaConfig("mp.messaging.incoming.partition-ordered")
@@ -196,25 +194,27 @@ public void testOrderedByPartitionProcessing() {
196194
.with("value.deserializer", StringDeserializer.class.getName())
197195
.with("key.deserializer", StringDeserializer.class.getName())
198196
.with("throttled.processing-order", "ordered_by_partition")
199-
.with("commit-strategy", "throttled");
197+
.with("commit-strategy", "throttled")
198+
.withPrefix("")
199+
.with("smallrye.messaging.worker.my-pool.max-concurrency", concurrency);
200200

201201
OrderedByPartitionParallelConsumer app = runApplication(config, OrderedByPartitionParallelConsumer.class);
202202
long start = System.currentTimeMillis();
203203
System.out.println("Started processing messages " + start);
204204

205205
// Wait for all messages to be processed
206-
await().atMost(1, TimeUnit.MINUTES).untilAsserted(() -> assertThat(app.received()).hasSize(50));
206+
await().atMost(2, TimeUnit.MINUTES)
207+
.untilAsserted(() -> assertThat(app.received())
208+
.hasSize(partitions * recordsParPartition));
207209
long duration = System.currentTimeMillis() - start;
208210
System.out.println("Processing duration: " + duration + " ms");
209211

210212
// With ordered-by-partition, messages from the same partition should be processed sequentially
211213
// So max concurrent processing per partition should be 1
212-
assertThat(app.partitionMaxCounter(0)).hasValue(1);
213-
assertThat(app.partitionMaxCounter(1)).hasValue(1);
214-
assertThat(app.partitionMaxCounter(2)).hasValue(1);
215-
assertThat(app.partitionMaxCounter(3)).hasValue(1);
216-
assertThat(app.partitionMaxCounter(4)).hasValue(1);
217-
assertThat(app.maxConcurrency()).isEqualTo(5);
214+
for (int i = 0; i < partitions; i++) {
215+
assertThat(app.partitionMaxCounter(i)).hasValue(1);
216+
}
217+
assertThat(app.maxConcurrency()).isEqualTo(partitions);
218218
}
219219

220220
@ApplicationScoped

smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/base/WeldTestBase.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import io.smallrye.reactive.messaging.kafka.fault.KafkaFailureHandler;
2727
import io.smallrye.reactive.messaging.kafka.fault.KafkaIgnoreFailure;
2828
import io.smallrye.reactive.messaging.kafka.impl.KafkaClientServiceImpl;
29-
import io.smallrye.reactive.messaging.kafka.impl.KeyBasedOrderingDecorator;
3029
import io.smallrye.reactive.messaging.kafka.impl.TopicPartitions;
3130
import io.smallrye.reactive.messaging.kafka.reply.KafkaRequestReplyFactory;
3231
import io.smallrye.reactive.messaging.kafka.reply.UUIDCorrelationIdHandler;
@@ -115,7 +114,6 @@ public void initWeld() {
115114
weld.addBeanClass(ObservationDecorator.class);
116115
weld.addBeanClass(OutgoingObservationDecorator.class);
117116
weld.addBeanClass(PausableChannelDecorator.class);
118-
weld.addBeanClass(KeyBasedOrderingDecorator.class);
119117
weld.disableDiscovery();
120118
}
121119

0 commit comments

Comments (0)