[server][da-vinci][test] Optimize Heartbeat Messages by Excluding VTP Headers #2295
base: main
… Headers

Problem Statement
Heartbeat messages in Venice were including Venice Transport Protocol (VTP) headers unnecessarily, adding overhead to frequent heartbeat communications. Since heartbeats are primarily for liveness detection rather than data transport, this overhead was wasteful.

Solution
Implemented an optimization to conditionally exclude VTP headers from heartbeat messages when Venice's schema evolution infrastructure is available to handle compatibility.
When dependent features are enabled, heartbeat messages skip VTP headers because:
- KME Schema Registration: Servers register schemas during startup via ControllerClientBackedSystemSchemaInitializer
- Schema Reader: RouterBackedSchemaReader automatically fetches unknown schemas from the system store
- Graceful Handling: InternalAvroSpecificSerializer handles unknown protocol versions via the schema reader

Testing
- Updated existing tests to maintain compatibility
- Added a new integration test to validate that the optimization works correctly with Venice's KME schema evolution infrastructure (TestServerKMERegistrationFromMessageHeader.java)
Pull Request Overview
This PR optimizes heartbeat messages in Venice by conditionally excluding Venice Transport Protocol (VTP) headers when the system's schema evolution infrastructure can handle compatibility. The optimization reduces overhead for frequent heartbeat communications while maintaining schema evolution capabilities through KME schema registration and RouterBackedSchemaReader.
Key changes:
- Added logic to conditionally skip VTP headers in heartbeat messages based on dependent feature flags
- Updated VeniceWriter to accept a new parameter controlling VTP header inclusion
- Extended test coverage with new integration tests and updated existing mocks
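The first two changes can be pictured with a minimal sketch. It is not the PR's code: the class, method, and parameter names below are hypothetical, and the only behavior taken from the description is that a heartbeat skips the VTP header when both dependent features (schema initialization at startup and a KME schema reader) are available. What other message types do is left to a caller-supplied default, because this page does not show that part.

```java
// Hypothetical sketch: class, method, and parameter names are illustrative,
// not the actual VeniceWriter API.
final class HeartbeatHeaderSketch {
  private HeartbeatHeaderSketch() {
  }

  /** Both dependent features named in the PR description must be available. */
  static boolean dependentFeaturesEnabled(boolean schemaInitAtStartEnabled, boolean kmeSchemaReaderPresent) {
    return schemaInitAtStartEnabled && kmeSchemaReaderPresent;
  }

  /**
   * Heartbeats drop the VTP header only when receivers can recover the schema
   * through the schema reader; every other case keeps the caller's default.
   */
  static boolean shouldAttachVtpHeader(
      boolean isHeartbeat,
      boolean dependentFeaturesEnabled,
      boolean defaultDecision) {
    if (isHeartbeat && dependentFeaturesEnabled) {
      return false;
    }
    return defaultDecision;
  }
}
```

Keeping the decision in a pure function like this makes the flag combinations easy to enumerate in a unit test, which is the kind of coverage the PR lists for LeaderFollowerStoreIngestionTaskTest.java.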
Reviewed Changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| VeniceWriter.java | Refactored header logic to conditionally add VTP headers based on message type and feature flags |
| StoreIngestionTask.java | Added storage for system schema initialization flag |
| LeaderFollowerStoreIngestionTask.java | Integrated dependent feature check before sending heartbeats |
| KafkaStoreIngestionService.java | Added method to check KME schema reader presence and improved logging |
| HeartbeatMonitoringService.java | Added getter for KafkaStoreIngestionService |
| VeniceWriterUnitTest.java | Updated test to cover VTP header skip logic with additional test parameters |
| VeniceWriterHeartbeatHeaderTest.java | New test file validating VTP header behavior for heartbeats vs data messages |
| TestServerKMERegistrationFromMessageHeader.java | Added integration test for heartbeat optimization and refactored common setup |
| MaterializedViewWriterTest.java | Updated mocks to include new heartbeat parameter |
| StoreIngestionTaskTest.java | Updated mocks to include new heartbeat parameter |
| LeaderFollowerStoreIngestionTaskTest.java | Added comprehensive test for dependent feature flag combinations |
...nci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java
| * | ||
| * Server 1 deploys -> Registers KME v2 in system store during startup. (requires isSystemSchemaInitializationAtStartTimeEnabled = true) | ||
| * Server 1 sends heartbeat -> Serialized with KME v2 (no VTP header) | ||
| * Server 2 receives heartbeat -> Detects unknown protocol version, schema reader fetches KME v2. (requires KME scheme reader to be present) |
Copilot
AI
Nov 17, 2025
Corrected spelling of 'scheme' to 'schema' in comment.
| * Server 2 receives heartbeat -> Detects unknown protocol version, schema reader fetches KME v2. (requires KME scheme reader to be present) | |
| * Server 2 receives heartbeat -> Detects unknown protocol version, schema reader fetches KME v2. (requires KME schema reader to be present) |
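The receive-side step in the comment above (unknown protocol version, then a fetch through the schema reader) might look roughly like the following. This is a simplified stand-in, not InternalAvroSpecificSerializer: the class name, the `IntFunction<String>` reader type, and the use of plain strings for schemas are all assumptions made to keep the example self-contained.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.IntFunction;

// Hypothetical sketch of "fetch unknown protocol versions through a schema reader".
// Schemas are modeled as plain strings; the reader is any function from version to schema.
final class ProtocolSchemaLookupSketch {
  private final Map<Integer, String> knownSchemas = new HashMap<>();
  private final Optional<IntFunction<String>> schemaReader;

  ProtocolSchemaLookupSketch(Map<Integer, String> compiledInSchemas, Optional<IntFunction<String>> schemaReader) {
    this.knownSchemas.putAll(compiledInSchemas);
    this.schemaReader = schemaReader;
  }

  /** Returns the schema for a protocol version, fetching and caching it if it is not known locally. */
  String resolve(int protocolVersion) {
    String known = knownSchemas.get(protocolVersion);
    if (known != null) {
      return known;
    }
    // Without a reader there is no way to recover the schema once the header is gone.
    IntFunction<String> reader = schemaReader.orElseThrow(
        () -> new IllegalStateException("Unknown protocol version " + protocolVersion + " and no schema reader"));
    String fetched = reader.apply(protocolVersion);
    if (fetched == null) {
      throw new IllegalStateException("Schema reader has no schema for protocol version " + protocolVersion);
    }
    knownSchemas.put(protocolVersion, fetched);
    return fetched;
  }
}
```

The empty-reader branch is why the optimization is gated on the reader being present: a receiver with no reader would have nothing to fall back on for a header-less heartbeat written with a newer protocol version.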
Pull Request Overview
Copilot reviewed 12 out of 12 changed files in this pull request and generated 3 comments.
| for (int i = 0; i < 10; i++) { | ||
| veniceWriter.sendHeartbeat( | ||
| topicPartition, | ||
| null, // No callback needed for this test |
Copilot
AI
Nov 18, 2025
[nitpick] Passing null for the callback is acceptable here, but it would be more explicit to use a mock callback or a no-op callback to better demonstrate intent and avoid potential NPE issues if the implementation changes.
| null, // No callback needed for this test | |
| (metadata, exception) -> {}, // No-op callback for clarity and safety |
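To illustrate the concern behind this suggestion, here is a small sketch of why a no-op callback is safer than `null` if the sender ever starts invoking the callback unconditionally. The sender below is hypothetical, and `BiConsumer<String, Exception>` is only a stand-in for the real callback type, which this page does not show.

```java
import java.util.function.BiConsumer;

// Hypothetical sender used only to contrast a null callback with a no-op one.
final class HeartbeatCallbackSketch {
  /** Shared no-op callback: accepts the result and deliberately ignores it. */
  static final BiConsumer<String, Exception> NO_OP = (metadata, exception) -> {
  };

  private HeartbeatCallbackSketch() {
  }

  /** Invokes the callback once per simulated heartbeat and returns how many were sent. */
  static int sendHeartbeats(int count, BiConsumer<String, Exception> callback) {
    int sent = 0;
    for (int i = 0; i < count; i++) {
      // An unconditional call like this throws NullPointerException when callback is null.
      callback.accept("heartbeat-" + i, null);
      sent++;
    }
    return sent;
  }
}
```

Whether the real `sendHeartbeat` tolerates `null` depends on its implementation; the sketch only shows the failure mode the review comment is guarding against.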
| leaderCompleteState, | ||
| originTimeStampMs); | ||
| } | ||
|
|
Copilot
AI
Nov 18, 2025
[nitpick] Changing the visibility of 'sendIngestionHeartbeat' from private to package-private for testing purposes is acceptable, but consider adding a comment explaining that this is intentionally package-private for test access to make the design decision explicit.
| /** | |
| * Package-private for test access. This method's visibility is intentionally not private | |
| * to allow unit tests in the same package to invoke it. | |
| */ |
| kafkaMessageEnvelopeSchemaReader.ifPresent(reader -> { | ||
| LOGGER.info( | ||
| "Initialized KME schema reader. Type: {}, Latest value schema ID: {}", | ||
| reader.getClass().getSimpleName(), | ||
| reader.getLatestValueSchemaId()); | ||
| kafkaValueSerializer.setSchemaReader(reader); |
Copilot
AI
Nov 18, 2025
[nitpick] The log message at INFO level is logged every time the service initializes. Consider whether this should be rate-limited or moved to DEBUG level to avoid log pollution in environments with frequent restarts, or ensure it's only logged once during service lifetime.
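One way to act on the "only logged once" option is a compare-and-set guard, sketched below under assumptions: the class name is hypothetical and a `Consumer<String>` stands in for the logger so the example has no logging dependency. The guard is per instance, so it limits repeats within one process but does nothing about a fresh process logging again after a restart.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical helper: emits a message at most once per instance, even under concurrent callers.
final class LogOnceSketch {
  private final AtomicBoolean alreadyLogged = new AtomicBoolean(false);
  private final Consumer<String> sink;

  LogOnceSketch(Consumer<String> sink) {
    this.sink = sink;
  }

  /** Returns true if this call emitted the message, false if an earlier call already did. */
  boolean logOnce(String message) {
    // compareAndSet lets exactly one caller flip false -> true and emit.
    if (alreadyLogged.compareAndSet(false, true)) {
      sink.accept(message);
      return true;
    }
    return false;
  }
}
```

For the restart scenario the comment mentions, lowering the level to DEBUG is the simpler choice, since no in-memory guard survives a restart.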