Commit 8ae27fb

Address kafkasql minor problems (#6930)
1 parent e60bcce commit 8ae27fb

5 files changed: +56 -51 lines


app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlConfiguration.java

Lines changed: 5 additions & 1 deletion
@@ -159,7 +159,11 @@ public Map<String, String> getEventsTopicProperties() {
     // So we convert them as soon as possible.
     var props = toMap(eventsTopicProperties);

-    // TODO: Check if the events topic can support multiple partitions. Currently all events are explicitly sent to partition 0.
+    // Events topic is configured with a single partition to guarantee total ordering of all events.
+    // This ensures consumers see events in the exact order they occurred. If per-aggregate ordering
+    // is sufficient (rather than global ordering), multiple partitions could be supported by removing
+    // the explicit partition assignment in KafkaSqlEventsProcessor and using the aggregateId as the
+    // partition key. This would enable better scalability while maintaining ordering per aggregate.
     props.put(TOPIC_PARTITIONS_CONFIG, "1");
     props.putIfAbsent(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
     props.putIfAbsent(TopicConfig.RETENTION_MS_CONFIG, "-1");
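For context, these properties take effect when the events topic is created. A minimal sketch of how such settings map onto Kafka's Admin API; the class name, replication factor, and bootstrap address below are illustrative assumptions, not code from this commit:

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.common.config.TopicConfig;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import java.util.Set;

    public class EventsTopicCreatorSketch {

        // Illustrative only: create an events topic with a single partition so that
        // consumers observe one total order over all events.
        public static void createEventsTopic(String topicName, Map<String, String> topicProps)
                throws Exception {
            Properties adminProps = new Properties();
            adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption

            try (Admin admin = Admin.create(adminProps)) {
                // The partition count is a topic-creation argument, not a topic-level
                // config, so it is passed separately from cleanup/retention settings.
                NewTopic topic = new NewTopic(topicName, 1, (short) 1);

                Map<String, String> configs = new HashMap<>(topicProps);
                configs.putIfAbsent(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
                configs.putIfAbsent(TopicConfig.RETENTION_MS_CONFIG, "-1"); // keep events indefinitely
                topic.configs(configs);

                admin.createTopics(Set.of(topic)).all().get();
            }
        }
    }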

app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlCoordinator.java

Lines changed: 4 additions & 2 deletions
@@ -49,8 +49,10 @@ public Object waitForResponse(UUID uuid) {
         Object rval = returnValues.remove(uuid);
         if (rval == NULL) {
             return null;
-        } else if (rval instanceof RegistryException) {
-            throw (RegistryException) rval; // TODO: Any exception
+        } else if (rval instanceof RuntimeException) {
+            // Rethrow any RuntimeException to preserve the original exception type
+            // for proper handling by exception mappers.
+            throw (RuntimeException) rval;
         }
         return rval;
     } catch (InterruptedException e) {
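The surrounding mechanism is a request/response handoff: the HTTP thread parks on a UUID until the Kafka consumer thread applies the message and publishes either a result or the exception it hit. A minimal sketch of that pattern under assumed names (this is not the actual KafkaSqlCoordinator source):

    import java.util.Map;
    import java.util.UUID;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.CountDownLatch;

    public class ResponseCoordinatorSketch {

        private static final Object NULL = new Object(); // sentinel for a legitimate null result

        private final Map<UUID, CountDownLatch> latches = new ConcurrentHashMap<>();
        private final Map<UUID, Object> returnValues = new ConcurrentHashMap<>();

        // Called by the request thread before producing the Kafka message.
        public UUID createUUID() {
            UUID uuid = UUID.randomUUID();
            latches.put(uuid, new CountDownLatch(1));
            return uuid;
        }

        // Called from the Kafka consumer thread once the message has been applied;
        // value may be a result object or a RuntimeException.
        public void notifyResponse(UUID uuid, Object value) {
            returnValues.put(uuid, value == null ? NULL : value);
            latches.get(uuid).countDown();
        }

        // Called by the request thread; blocks until the consumer catches up.
        public Object waitForResponse(UUID uuid) throws InterruptedException {
            CountDownLatch latch = latches.get(uuid);
            latch.await();
            latches.remove(uuid);
            Object rval = returnValues.remove(uuid);
            if (rval == NULL) {
                return null;
            } else if (rval instanceof RuntimeException) {
                // Rethrow unchanged so exception mappers see the original type.
                throw (RuntimeException) rval;
            }
            return rval;
        }
    }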

app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlEventsProcessor.java

Lines changed: 2 additions & 1 deletion
@@ -24,7 +24,8 @@ public class KafkaSqlEventsProcessor {
 
     public void processEvent(@Observes KafkaSqlOutboxEvent event) {
         OutboxEvent outboxEvent = event.getOutboxEvent();
-        // TODO: Are we only allowing a single partition?
+        // Explicitly send to partition 0 to guarantee total ordering of all events.
+        // See KafkaSqlConfiguration.getEventsTopicProperties() for details on this design decision.
         ProducerRecord<String, String> record = new ProducerRecord<>(configuration.getEventsTopic(), 0,
                 outboxEvent.getAggregateId(), outboxEvent.getPayload().toString(), Collections.emptyList());
         blockOnResult(eventsProducer.apply(record));
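The new comments above sketch the alternative: drop the explicit partition number and key the record by aggregateId, so Kafka's default partitioner hashes the key and events for one aggregate stay ordered while the topic scales out. A sketch of that variant (the helper class and method are hypothetical, not part of this commit):

    import java.util.Collections;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class KeyedEventRecordSketch {

        // Hypothetical variant: per-aggregate ordering instead of global
        // ordering, which would allow the topic to have multiple partitions.
        public static ProducerRecord<String, String> keyedRecord(String topic,
                String aggregateId, String payload) {
            return new ProducerRecord<>(
                    topic,
                    null,          // partition: let the default partitioner hash the key
                    aggregateId,   // key: all events for one aggregate share a partition
                    payload,
                    Collections.emptyList());
        }
    }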

app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlRegistryStorage.java

Lines changed: 39 additions & 43 deletions
@@ -47,6 +47,7 @@
4747
import io.apicurio.registry.utils.impexp.v3.GlobalRuleEntity;
4848
import io.apicurio.registry.utils.impexp.v3.GroupEntity;
4949
import io.apicurio.registry.utils.impexp.v3.GroupRuleEntity;
50+
import io.quarkus.arc.lookup.LookupIfProperty;
5051
import jakarta.annotation.PreDestroy;
5152
import jakarta.enterprise.context.ApplicationScoped;
5253
import jakarta.enterprise.event.Event;
@@ -83,6 +84,7 @@
 @PersistenceTimeoutReadinessApply
 @StorageMetricsApply
 @Logged
+@LookupIfProperty(name = "storage.type", stringValue = "kafkasql")
 public class KafkaSqlRegistryStorage extends RegistryStorageDecoratorReadOnlyBase implements RegistryStorage {
 
     @Inject
@@ -137,6 +139,9 @@ public class KafkaSqlRegistryStorage extends RegistryStorageDecoratorReadOnlyBas
     // The snapshot id used to determine if this replica must process a snapshot message
     private volatile String lastTriggeredSnapshot = null;
 
+    // Reference to the consumer thread for health checks
+    private volatile Thread consumerThread = null;
+
     @Override
     public String storageName() {
         return "kafkasql";
@@ -180,8 +185,9 @@ public boolean isReady() {
 
     @Override
     public boolean isAlive() {
-        // TODO: Include readiness of Kafka consumers and producers? What happens if Kafka stops responding?
-        return bootstrapped && !stopped;
+        // Check that we've bootstrapped, haven't been stopped, and the consumer thread is still running.
+        // If the consumer thread dies (e.g., due to Kafka connectivity issues), the storage is no longer alive.
+        return bootstrapped && !stopped && consumerThread != null && consumerThread.isAlive();
     }
 
     @PreDestroy
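In Quarkus, a liveness flag like this is commonly surfaced through a MicroProfile Health check so Kubernetes can restart a pod whose consumer thread has died. The wiring below is a speculative sketch, not part of this commit; the check class and the injection point are assumptions:

    import jakarta.enterprise.context.ApplicationScoped;
    import jakarta.inject.Inject;
    import org.eclipse.microprofile.health.HealthCheck;
    import org.eclipse.microprofile.health.HealthCheckResponse;
    import org.eclipse.microprofile.health.Liveness;

    @Liveness
    @ApplicationScoped
    public class KafkaSqlLivenessCheckSketch implements HealthCheck {

        @Inject
        KafkaSqlRegistryStorage storage; // hypothetical injection point

        @Override
        public HealthCheckResponse call() {
            // Reports DOWN once the consumer thread has exited, prompting a restart.
            return HealthCheckResponse.named("kafkasql-consumer")
                    .status(storage.isAlive())
                    .build();
        }
    }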
@@ -192,22 +198,30 @@ void onDestroy() {
     }
 
     /**
-     * Consume the snapshots topic, looking for the most recent snapshots in the topic. Once found, it
-     * restores the internal h2 database using the snapshot's content. WARNING: This has the limitation of
-     * processing the first 500 snapshots, which should be enough for most deployments.
-     * TODO: ^ This is a serious limitation that should at least raise an error.
+     * Consume the snapshots topic, looking for the most recent snapshot in the topic. Once found, it
+     * restores the internal database using the snapshot's content. Polls in a loop until all messages
+     * are consumed from the topic.
      */
     private String consumeSnapshotsTopic(KafkaConsumer<String, String> snapshotsConsumer) {
         // Subscribe to the snapshots topic
         Collection<String> topics = Collections.singleton(configuration.getSnapshotsTopic());
         snapshotsConsumer.subscribe(topics);
-        // TODO: We have to poll in a loop, no? We don't know how many records we get...
-        ConsumerRecords<String, String> records = snapshotsConsumer.poll(configuration.getPollTimeout());
+
         List<ConsumerRecord<String, String>> snapshots = new ArrayList<>();
         String snapshotRecordKey = null;
-        if (records != null && !records.isEmpty()) {
-            // collect all snapshots into a list
-            records.forEach(snapshots::add);
+
+        // Poll in a loop until we get an empty result, indicating we've reached the end of the topic
+        ConsumerRecords<String, String> records;
+        do {
+            records = snapshotsConsumer.poll(configuration.getPollTimeout());
+            if (records != null && !records.isEmpty()) {
+                records.forEach(snapshots::add);
+                log.debug("Polled {} snapshot records, total collected: {}", records.count(), snapshots.size());
+            }
+        } while (records != null && !records.isEmpty());
+
+        if (!snapshots.isEmpty()) {
+            log.info("Found {} total snapshots in the snapshots topic.", snapshots.size());
 
             // sort snapshots by timestamp
             snapshots.sort(Comparator.comparingLong(ConsumerRecord::timestamp));
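The empty-poll heuristic above is simple but can terminate early if a single poll times out before the broker responds. A more deterministic variant, sketched below under the assumption that the first poll establishes the partition assignment, compares the consumer's position against the end offsets; the TopicDrainerSketch class is hypothetical, while the consumer APIs are standard Kafka:

    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;

    public class TopicDrainerSketch {

        // Drain every record currently in the subscribed topic, deciding "done"
        // by comparing positions to end offsets instead of trusting an empty poll.
        public static List<ConsumerRecord<String, String>> drain(
                Consumer<String, String> consumer, Duration pollTimeout) {
            List<ConsumerRecord<String, String>> collected = new ArrayList<>();

            // First poll joins the group; assumes an assignment exists afterwards.
            consumer.poll(pollTimeout).forEach(collected::add);

            // Snapshot the end offsets of the assigned partitions once.
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumer.assignment());

            while (!caughtUp(consumer, endOffsets)) {
                consumer.poll(pollTimeout).forEach(collected::add);
            }
            return collected;
        }

        private static boolean caughtUp(Consumer<?, ?> consumer,
                Map<TopicPartition, Long> endOffsets) {
            return endOffsets.entrySet().stream()
                    .allMatch(e -> consumer.position(e.getKey()) >= e.getValue());
        }
    }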
@@ -310,6 +324,7 @@ private void startConsumerThread(final KafkaConsumer<KafkaSqlMessageKey, KafkaSq
         Thread thread = new Thread(runner);
         thread.setDaemon(true);
         thread.setName("KSQL Kafka Consumer Thread");
+        consumerThread = thread;
         thread.start();
     }
 
@@ -357,8 +372,11 @@ private void processRecord(ConsumerRecord<KafkaSqlMessageKey, KafkaSqlMessage> r
             return;
         }
 
-        // TODO instead of processing the journal record directly on the consumer thread, instead queue them
-        // and have *another* thread process the queue
+        // Note: We process journal records directly on the consumer thread. Since all messages in KafkaSQL
+        // are in a single partition, ordering is guaranteed and processing must be sequential. Introducing
+        // a separate processing thread with a queue would add complexity without significant benefit, as
+        // we'd still need to ensure sequential processing for correctness. The coordinator mechanism already
+        // handles response synchronization for write operations.
         kafkaSqlSink.processMessage(record);
     }
 
@@ -714,21 +732,10 @@ public void importData(EntityInputStream entities, boolean preserveGlobalId, boo
             throws RegistryStorageException {
         DataImporter dataImporter = new SqlDataImporter(log, utils, this, preserveGlobalId,
                 preserveContentId);
-        dataImporter.importData(entities, () -> {
-            // TODO Re-visit this, since Apicurio Registry 3 all messages live in the same partition, so there
-            // should be no need to wait.
-            // Because importing just pushes a bunch of Kafka messages, we may need to
-            // wait for a few seconds before we send the reset messages. Due to partitioning,
-            // we can't guarantee ordering of these next two messages, and we NEED them to
-            // be consumed after all the import messages.
-            // TODO We can wait until the last message is read (a specific one),
-            // or create a new message type for this purpose (a sync message).
-            try {
-                Thread.sleep(2000);
-            } catch (Exception e) {
-                // Noop
-            }
-        });
+        // All messages use the same partition key (__GLOBAL_PARTITION__), so Kafka guarantees
+        // ordering within the partition. Reset messages sent after import messages will be
+        // consumed in the correct order without needing any sleep/wait mechanism.
+        dataImporter.importData(entities, () -> {});
     }
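The guarantee relied on here is core Kafka semantics: records sent by one producer with the same key land in the same partition, and a partition is consumed strictly in append order, so a reset message produced after the import messages is also consumed after them. A standalone sketch demonstrating the property (topic name and payloads are illustrative):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class OrderedImportSketch {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // enable.idempotence (default in recent clients) preserves ordering even on retries

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                String key = "__GLOBAL_PARTITION__"; // single shared key, as in KafkaSQL
                producer.send(new ProducerRecord<>("journal", key, "import-message-1"));
                producer.send(new ProducerRecord<>("journal", key, "import-message-2"));
                // Guaranteed to be consumed after the import messages above:
                producer.send(new ProducerRecord<>("journal", key, "reset-message"));
                producer.flush();
            }
        }
    }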
 
     /**
@@ -739,21 +746,10 @@ public void upgradeData(EntityInputStream entities, boolean preserveGlobalId, bo
             throws RegistryStorageException {
         DataImporter dataImporter = new SqlDataUpgrader(log, utils, this, preserveGlobalId,
                 preserveContentId);
-        dataImporter.importData(entities, () -> {
-            // TODO Re-visit this, since Apicurio Registry 3 all messages live in the same partition, so there
-            // should be no need to wait.
-            // Because importing just pushes a bunch of Kafka messages, we may need to
-            // wait for a few seconds before we send the reset messages. Due to partitioning,
-            // we can't guarantee ordering of these next two messages, and we NEED them to
-            // be consumed after all the import messages.
-            // TODO We can wait until the last message is read (a specific one),
-            // or create a new message type for this purpose (a sync message).
-            try {
-                Thread.sleep(2000);
-            } catch (Exception e) {
-                // Noop
-            }
-        });
+        // All messages use the same partition key (__GLOBAL_PARTITION__), so Kafka guarantees
+        // ordering within the partition. Reset messages sent after import messages will be
+        // consumed in the correct order without needing any sleep/wait mechanism.
+        dataImporter.importData(entities, () -> {});
     }
 
     /**

app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/sql/KafkaSqlSink.java

Lines changed: 6 additions & 4 deletions
@@ -55,13 +55,15 @@ public void processMessage(ConsumerRecord<KafkaSqlMessageKey, KafkaSqlMessage> r
                 result != null ? result.toString() : "");
         log.debug("Kafka message successfully processed. Notifying listeners of response.");
         coordinator.notifyResponse(requestId, result);
-    } catch (RegistryException e) {
-        log.debug("Registry exception detected: {}", e.getMessage());
+    } catch (RuntimeException e) {
+        // Pass RuntimeException (including RegistryException) directly without wrapping
+        // to preserve the original exception type for proper handling by exception mappers.
+        log.debug("Runtime exception detected: {}", e.getMessage());
         coordinator.notifyResponse(requestId, e);
     } catch (Throwable e) {
+        // Wrap checked exceptions and Errors in RegistryException
         log.debug("Unexpected exception detected: {}", e.getMessage());
-        coordinator.notifyResponse(requestId, new RegistryException(e)); // TODO: Any exception (no
-        // wrapping)
+        coordinator.notifyResponse(requestId, new RegistryException(e));
     }
 }
 