feat: implement startup verification checks for KafkaSQL, related refactoring #6911
Pull Request Overview
This PR implements startup verification checks for KafkaSQL storage to prevent data loss due to misconfigured Kafka topics, along with significant refactoring of Kafka-related utilities and configuration management.
Key Changes:
- Startup verification: New `KafkaAdminUtil` validates topic configurations (cleanup policy, retention settings) and checks for v2/v3 compatibility issues in journal topics
- Refactoring: Moved utilities (`ConcurrentUtil`, `ProducerActions`, `AsyncProducer`) to appropriate packages, converted `KafkaSqlConfiguration` from interface to class, and introduced `LazyResource` for resource management
- Testing infrastructure: Added `extra-tests` module with comprehensive integration tests for KafkaSQL startup scenarios
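To make the startup verification idea concrete, here is a minimal sketch of a check over a topic's effective configuration. It is not the `KafkaAdminUtil` code from this PR: the class name `TopicConfigCheck`, the method `findProblems`, and the exact rule (flag a topic whose cleanup policy includes `delete` while `retention.ms` or `retention.bytes` is finite) are assumptions made for illustration. Only the Kafka topic config keys and their broker defaults are taken from Kafka itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

final class TopicConfigCheck {

    /**
     * Returns the problems found in a topic configuration; an empty list means
     * the topic passed this hypothetical check. Missing keys fall back to the
     * Kafka defaults (cleanup.policy=delete, retention.ms=604800000, retention.bytes=-1).
     */
    static List<String> findProblems(Map<String, String> config) {
        List<String> problems = new ArrayList<>();
        String policy = config.getOrDefault("cleanup.policy", "delete");
        // cleanup.policy is a comma-separated list, e.g. "compact,delete"
        boolean deletes = Arrays.stream(policy.split(","))
                .map(String::trim)
                .anyMatch("delete"::equals);
        if (deletes) {
            // Assumed rule: with a delete policy, any finite retention can drop records.
            String retentionMs = config.getOrDefault("retention.ms", "604800000");
            String retentionBytes = config.getOrDefault("retention.bytes", "-1");
            if (!"-1".equals(retentionMs)) {
                problems.add("retention.ms is " + retentionMs + ", expected -1");
            }
            if (!"-1".equals(retentionBytes)) {
                problems.add("retention.bytes is " + retentionBytes + ", expected -1");
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        // A topic left on a one-day time-based retention is reported.
        System.out.println(findProblems(Map.of("cleanup.policy", "delete", "retention.ms", "86400000")));
    }
}
```

In a real check the map would come from the broker (for example via the Kafka admin client's describe-configs call) rather than being built by hand.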
Reviewed Changes
Copilot reviewed 48 out of 48 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| app/src/main/java/io/apicurio/registry/storage/impl/util/KafkaAdminUtil.java | New utility for topic creation/verification with startup checks |
| app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlConfiguration.java | Converted from interface to concrete class with getter methods and topic-specific configuration |
| app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlFactory.java | Simplified producer/consumer creation using new configuration structure |
| app/src/main/java/io/apicurio/registry/types/LazyResource.java | New abstract class for lazy resource initialization and cleanup |
| utils/extra-tests/src/test/java/io/apicurio/registry/utils/tests/KafkaSqlStartupVerificationIT.java | Integration tests for startup verification scenarios |
| utils/kafka/src/main/java/io/apicurio/registry/utils/kafka/KafkaUtil.java | Simplified to single utility method, removed topic creation logic |
| common/src/main/java/io/apicurio/registry/utils/ConcurrentUtil.java | Moved from utils/kafka with simplified API |
| docs/modules/ROOT/partials/getting-started/ref-registry-all-configs.adoc | Updated documentation for new configuration properties |
app/src/main/java/io/apicurio/registry/storage/impl/kafkasql/KafkaSqlConfiguration.java
    props.putIfAbsent(TopicConfig.RETENTION_BYTES_CONFIG, "-1");

    tryToConfigureSecurity(props);
    getTopicProperties().forEach(props::putIfAbsent);
With this implementation, the three properties above cannot be overridden, correct? I assume that "getTopicProperties" returns whatever is configured by the user in the ENV.
Correct. The `getTopicProperties().forEach(props::putIfAbsent);` line implements the "IMPORTANT: As a temporary compatibility measure, configuration properties for this topic are also inherited from 'apicurio.kafkasql.topic' unless explicitly overridden by this property." part, since we used the same properties for all topics in the past.
I guess I'm just wondering if we should be preferring the user-configured properties over the defaults we have in code. I think with this change it's impossible to set e.g. retention.bytes to anything except -1 right?
Was there a resolution to this?
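For readers following this thread, the precedence question can be reproduced with plain maps. This is a sketch under assumptions, not the code from `KafkaSqlConfiguration`: it assumes `props` initially holds only the per-topic overrides (that part is not visible in the excerpt), that the in-code default is applied next, and that the properties inherited from `apicurio.kafkasql.topic` are merged last. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class TopicPropertyPrecedence {

    /**
     * Mirrors the order seen in the excerpt: per-topic overrides first (assumed),
     * then the in-code default, then the inherited shared properties.
     */
    static Map<String, String> merge(Map<String, String> topicSpecific,
                                     Map<String, String> inherited) {
        Map<String, String> props = new HashMap<>(topicSpecific);
        // The default only applies when the per-topic overrides did not set the key.
        props.putIfAbsent("retention.bytes", "-1");
        // By now retention.bytes is always present, so an inherited value is ignored.
        inherited.forEach(props::putIfAbsent);
        return props;
    }

    public static void main(String[] args) {
        // Inherited value loses to the in-code default.
        System.out.println(merge(Map.of(), Map.of("retention.bytes", "1024")));
        // A per-topic override still wins.
        System.out.println(merge(Map.of("retention.bytes", "2048"), Map.of("retention.bytes", "1024")));
    }
}
```

Under these assumptions the reviewer's concern holds for the inherited properties only: a value set through the shared prefix cannot replace a key that already has an in-code default, while a per-topic value can.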
    List.of(
        configuration.getTopic(),
        configuration.getSnapshotsTopic(),
        configuration.getEventsTopic()
    ).forEach(t -> kafkaAdmin.verifyTopicConfiguration(t));
This is sort of an odd way to structure three calls to the same method. It turns three lines of code into five. 🤔
yeah, I think it was a copy-paste thing:)
BTW The one test in extra tests is failing because
    } catch (TopicExistsException e) {
        log.info("Topic {} already exists, skipping.", configuration.topic());
    }
    // This is sequential instead of concurrent, but I think for 3 topics it's worth the simplicity.
Agree that concurrent is not needed. But wouldn't it be cleaner to just have a method called "createTopic" on the Kafka admin object that did all the work? Then just three calls to it for the three different topics. I'm not sure this "create a list of known # of items and then iterate over that list" is a pattern that adds simplicity. 🤷
Good point, I'll change it, I think I overthought it.
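The shape suggested in this thread, one `createTopic` method that does all the work and is simply called once per topic, might look like the sketch below. `InMemoryAdmin` is a hypothetical stand-in that keeps topic names in a set so the example runs without a broker. It does not use the Kafka admin client and is not the PR's `KafkaAdminUtil`, and the three topic names are made up.

```java
import java.util.LinkedHashSet;
import java.util.Set;

final class CreateTopicSketch {

    /** Hypothetical stand-in for the admin utility; a real one would talk to Kafka. */
    static final class InMemoryAdmin {
        private final Set<String> topics = new LinkedHashSet<>();

        /** Creates the topic if it is missing; returns false when it already exists. */
        boolean createTopic(String name) {
            boolean created = topics.add(name);
            if (!created) {
                System.out.println("Topic " + name + " already exists, skipping.");
            }
            return created;
        }

        Set<String> topics() {
            return topics;
        }
    }

    public static void main(String[] args) {
        InMemoryAdmin admin = new InMemoryAdmin();
        // Three plain calls instead of building a list and iterating over it.
        admin.createTopic("journal");
        admin.createTopic("snapshots");
        admin.createTopic("events");
        System.out.println(admin.topics());
    }
}
```

Keeping the "already exists" handling inside the method is what lets the call sites stay as three one-liners.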
…erties
Test `io.apicurio.registry.utils.tests.KafkaSqlStartupVerificationIT#testBadSnapshotsTopicConfig2Override` in extra tests is failing because the `apicurio.kafkasql.topic-configuration-verification-override-enabled` property has the prefix `apicurio.kafkasql.topic`, which propagates it into the topic configuration when the topic is created. The same issue exists with `apicurio.kafkasql.topic.auto-create`, but we did not notice it because nobody had passed `-Dapicurio.kafkasql.topic.auto-create=true` to Registry before, since it is the default. This fixes the issue.
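The prefix problem described in this commit message can be illustrated with a small sketch. It assumes topic properties are collected by matching keys against the `apicurio.kafkasql.topic` prefix and stripping the prefix plus one separator character; how the registry actually collects and strips them is not shown in this PR page. The exclusion set used as a fix here is one possible approach and not necessarily what the commit does. The class name, method name, and sample values are hypothetical.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class PrefixLeak {

    static final String PREFIX = "apicurio.kafkasql.topic";

    // Registry-level switches that happen to share the prefix (both named in the commit message).
    static final Set<String> REGISTRY_ONLY = Set.of(
            "apicurio.kafkasql.topic.auto-create",
            "apicurio.kafkasql.topic-configuration-verification-override-enabled");

    /** Collects topic properties by prefix; optionally skips the registry-level keys. */
    static Map<String, String> extract(Map<String, String> all, boolean excludeRegistryKeys) {
        Map<String, String> out = new TreeMap<>();
        all.forEach((key, value) -> {
            // Skip unrelated keys and the bare prefix itself.
            if (!key.startsWith(PREFIX) || key.length() == PREFIX.length()) {
                return;
            }
            if (excludeRegistryKeys && REGISTRY_ONLY.contains(key)) {
                return;
            }
            // Assumed stripping rule: drop the prefix and the separator that follows it.
            out.put(key.substring(PREFIX.length() + 1), value);
        });
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> all = Map.of(
                "apicurio.kafkasql.topic", "my-journal",
                "apicurio.kafkasql.topic.retention.ms", "-1",
                "apicurio.kafkasql.topic.auto-create", "true",
                "apicurio.kafkasql.topic-configuration-verification-override-enabled", "true");
        System.out.println("naive:    " + extract(all, false));
        System.out.println("filtered: " + extract(all, true));
    }
}
```

With the naive matching, both switches end up among the properties that would be sent to Kafka as topic configuration, which matches the failure described above.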
efe6542 to cd82cf4