[server] Global RT DIV: Max Age + Size-Based Sync #2302

KaiSernLim merged 26 commits into linkedin:main
Conversation
Hi there. This pull request has been inactive for 30 days. To keep our review queue healthy, we plan to close it in 7 days unless there is new activity. If you are still working on this, please push a commit, leave a comment, or convert it to draft to signal intent. Thank you for your time and contributions.
…ded basic unit test for it. 😵
… server, during ingestion phase (before EOP) and after ingestion phase (after EOP, and consuming RT), and finally verifies that all ingested data can be successfully queried (no data loss). ❤️🩹
Force-pushed f551961 to 1533fa0
...ient/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java
...ts/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java
clients/da-vinci-client/src/main/java/com/linkedin/davinci/validation/PartitionTracker.java
clients/da-vinci-client/src/main/java/com/linkedin/davinci/validation/PartitionTracker.java
Claude Code PR Review

Pull Request Review: #2302
Title: [server] Global RT DIV: Max Age + Size-Based Sync
Status: Open (pending author response to review feedback)

Summary

This PR adds two synchronization mechanisms to Venice's Global Real-Time Data Integrity Validator (DIV):
The goal is to prevent unbounded state growth and enable proper cleanup during server restarts.

Files Changed
Critical Issues Found by Reviewers 🚨
Severity: High
Problem: shouldSyncOffsetFromSnapshot() checks the consumed-bytes counter in the consumer thread, while the counter is reset by the drainer thread.
Impact: There is a time window in which the size condition continuously triggers before the counter resets, causing excessive and redundant sync operations.
Fix Needed: Atomic counter operations or synchronized access to ensure the check and reset are coordinated between threads.
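As an illustration of the suggested fix (hypothetical names, not the actual Venice implementation), the check and the reset can be made one atomic step with an AtomicLong so a second thread cannot observe the stale counter and re-trigger:

```java
import java.util.concurrent.atomic.AtomicLong;

public class SizeBasedSyncSketch {
  private final AtomicLong bytesSinceLastSync = new AtomicLong(0);
  private final long syncBytesInterval;

  public SizeBasedSyncSketch(long syncBytesInterval) {
    this.syncBytesInterval = syncBytesInterval;
  }

  /**
   * Records a consumed record's size and reports whether a sync should be
   * queued. getAndSet(0) makes the check and the reset a single atomic step,
   * so a second thread observing the same crossing sees 0 and returns false.
   */
  public boolean recordAndShouldSync(long recordSizeBytes) {
    long total = bytesSinceLastSync.addAndGet(recordSizeBytes);
    if (syncBytesInterval > 0 && total >= 2 * syncBytesInterval) {
      // Only the thread that wins the reset reports true.
      return bytesSinceLastSync.getAndSet(0) >= 2 * syncBytesInterval;
    }
    return false;
  }

  public static void main(String[] args) {
    SizeBasedSyncSketch sketch = new SizeBasedSyncSketch(100);
    System.out.println(sketch.recordAndShouldSync(150)); // false: 150 < 200
    System.out.println(sketch.recordAndShouldSync(60));  // true: 210 >= 200, counter reset
    System.out.println(sketch.recordAndShouldSync(50));  // false: counter restarted at 0
  }
}
```

An alternative is a synchronized check-and-reset block; the lock-free atomic variant avoids contention on the per-record hot path.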
Severity: High
The code removes entries during map iteration.
Problem: This violates Java's iterator contract and will throw ConcurrentModificationException.
Fix Needed: Use Iterator.remove() or collect keys to remove first, then delete after iteration. Reviewer suggests looking at PartitionTracker.clearExpiredStateAndUpdateOffsetRecord as a reference.
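A generic sketch of the suggested fix (simplified types, not the actual PartitionTracker code): Iterator.remove() keeps the iteration valid where removing through the map would not.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class IteratorRemoveSketch {
  /** Removes entries with a timestamp older than minTs; returns how many were removed. */
  public static int pruneStale(Map<String, Long> timestamps, long minTs) {
    int removed = 0;
    Iterator<Map.Entry<String, Long>> it = timestamps.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, Long> entry = it.next();
      if (entry.getValue() < minTs) {
        // it.remove() keeps the iterator consistent; calling
        // timestamps.remove(entry.getKey()) here instead would make the next
        // it.next() throw ConcurrentModificationException on a HashMap.
        it.remove();
        removed++;
      }
    }
    return removed;
  }

  public static void main(String[] args) {
    Map<String, Long> producerTimestamps = new HashMap<>();
    producerTimestamps.put("producer-a", 100L);
    producerTimestamps.put("producer-b", 500L);
    System.out.println(pruneStale(producerTimestamps, 300L)); // 1
    System.out.println(producerTimestamps.keySet());          // [producer-b]
  }
}
```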
Severity: Medium
A question was raised about whether producerStateMaxAgeMs should be passed to the consumer DIV validator, which suggests incomplete dependency injection.
Fix Needed: Clarify the configuration flow and ensure all validators receive the necessary parameters.

What I Like ✓
Design Concerns
Testing Gaps

While unit/integration tests exist, they don't appear to catch:
Recommendation: Add multi-threaded stress tests that exercise concurrent reads/writes to the exposed state.

Code Quality: 5/10

The concept is solid and addresses real operational concerns (unbounded state growth). However, the implementation has blocking concurrency bugs that need resolution before merge.

Verdict: ❌ Changes Requested

Must Fix Before Merge:
Recommendation: The author should respond to reviewer lluwm's feedback and provide an updated implementation addressing the threading issues.
- Fix ConcurrentModificationException risk in PartitionTracker.cloneVtProducerStates and cloneRtProducerStates by using Iterator.remove() instead of Map.remove() during iteration. - Pass producerStateMaxAgeMs to consumerDiv so stale producer states are evicted when cloning VT producer states for the offset record snapshot. - Reset processedRecordSizeSinceLastSync in the consumer thread after queuing a snapshot sync in syncOffsetFromSnapshotIfNeeded to prevent the size-based condition from continuously re-triggering for every subsequent record until the drainer thread resets the counter. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Force-pushed 0f6f82a to 9dd6d41
Pull request overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated 4 comments.
internal/venice-common/src/main/java/com/linkedin/venice/kafka/validation/Segment.java
clients/da-vinci-client/src/main/java/com/linkedin/davinci/validation/PartitionTracker.java
clients/da-vinci-client/src/main/java/com/linkedin/davinci/validation/PartitionTracker.java
clients/da-vinci-client/src/test/java/com/linkedin/davinci/validation/TestPartitionTracker.java
- Use System.currentTimeMillis() instead of new Date().getTime() in cloneVtProducerStates and cloneRtProducerStates to avoid unnecessary object allocation on each call.
- Fix ConcurrentModificationException risk in cloneRtProducerStates by using an explicit iterator for the outer rtSegments loop and calling brokerIterator.remove() instead of rtSegments.remove().
- Initialize lastRecordTimestamp from state.messageTimestamp in the Segment(partition, ProducerPartitionState) constructor. Previously, segments loaded from disk had lastRecordTimestamp = -1, causing them to appear as immediately stale and be pruned on the first clone after a server restart.
- Increase the safety margin for the borderline segment in testCloneVtProducerStates and testCloneRtProducerStates from 1s to MAX_AGE_IN_MS/2 to avoid test flakiness under slow CI.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
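The restart-staleness fix described above can be illustrated with a stripped-down sketch (field and type names simplified from the real Segment and ProducerPartitionState classes):

```java
public class SegmentSketch {
  // Simplified stand-in for the ProducerPartitionState loaded from the offset record.
  public static class ProducerPartitionState {
    public long messageTimestamp;

    public ProducerPartitionState(long messageTimestamp) {
      this.messageTimestamp = messageTimestamp;
    }
  }

  private final int partition;
  private long lastRecordTimestamp;

  public SegmentSketch(int partition, ProducerPartitionState state) {
    this.partition = partition;
    // Seed from the checkpointed timestamp rather than -1; otherwise segments
    // restored from disk look immediately stale and get pruned on the first
    // clone after a server restart.
    this.lastRecordTimestamp = state.messageTimestamp;
  }

  public long getLastRecordProducerTimestamp() {
    return lastRecordTimestamp;
  }

  public static void main(String[] args) {
    SegmentSketch restored = new SegmentSketch(0, new ProducerPartitionState(1700000000000L));
    System.out.println(restored.getLastRecordProducerTimestamp()); // 1700000000000, not -1
  }
}
```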
Pull request overview
Copilot reviewed 9 out of 9 changed files in this pull request and generated 5 comments.
Comments suppressed due to low confidence (1)
clients/da-vinci-client/src/main/java/com/linkedin/davinci/validation/PartitionTracker.java:754
getSegments(type) returns a VeniceConcurrentHashMap (extends ConcurrentHashMap), but this loop uses iterator.remove() on its entrySet() iterator. ConcurrentHashMap iterators do not support remove() and will throw UnsupportedOperationException when expiration is triggered. Please switch to removing via the map (e.g., getSegments(type).remove(entry.getKey(), segment) after deciding) or another CHM-safe removal approach.
long minimumRequiredRecordProducerTimestamp = offsetRecord.calculateLatestMessageTimeInMs() - maxAgeInMs;
int numberOfClearedGUIDs = 0;
Iterator<Map.Entry<GUID, Segment>> iterator = getSegments(type).entrySet().iterator();
Map.Entry<GUID, Segment> entry;
Segment segment;
while (iterator.hasNext()) {
entry = iterator.next();
segment = entry.getValue();
if (segment.getLastRecordProducerTimestamp() < minimumRequiredRecordProducerTimestamp) {
iterator.remove();
removeProducerState(type, entry.getKey(), offsetRecord);
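For what it's worth, a plain ConcurrentHashMap entry-set iterator does support remove() (a wrapping subclass may not); either way, the map-based removal the comment proposes is safe during iteration because CHM iterators are weakly consistent, and remove(key, value) additionally guards against racing updates. A generic sketch (simplified types, not the actual PartitionTracker code):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ChmPruneSketch {
  /** Removes entries whose timestamp is older than minTs; returns how many were cleared. */
  public static int pruneStale(ConcurrentHashMap<String, Long> segments, long minTs) {
    int cleared = 0;
    // CHM iteration is weakly consistent: removing through the map while
    // iterating is safe and never throws ConcurrentModificationException.
    for (Map.Entry<String, Long> entry : segments.entrySet()) {
      Long ts = entry.getValue();
      if (ts < minTs) {
        // remove(key, value) only succeeds if the mapping is unchanged, so a
        // concurrent update to the same key is not silently dropped.
        if (segments.remove(entry.getKey(), ts)) {
          cleared++;
        }
      }
    }
    return cleared;
  }

  public static void main(String[] args) {
    ConcurrentHashMap<String, Long> segments = new ConcurrentHashMap<>();
    segments.put("guid-a", 100L);
    segments.put("guid-b", 900L);
    System.out.println(pruneStale(segments, 500L) + " " + segments.size()); // 1 1
  }
}
```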
...ient/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java
internal/venice-common/src/main/java/com/linkedin/venice/kafka/validation/Segment.java
...ts/da-vinci-client/src/main/java/com/linkedin/davinci/validation/DataIntegrityValidator.java
clients/da-vinci-client/src/main/java/com/linkedin/davinci/validation/PartitionTracker.java
clients/da-vinci-client/src/main/java/com/linkedin/davinci/validation/PartitionTracker.java
Pull request overview
Copilot reviewed 10 out of 10 changed files in this pull request and generated 1 comment.
internal/venice-common/src/main/java/com/linkedin/venice/kafka/validation/Segment.java
…p in PartitionTracker
…ent and remove redundant lastRecordTimestamp field
Pull request overview
Copilot reviewed 10 out of 10 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (1)
clients/da-vinci-client/src/main/java/com/linkedin/davinci/validation/PartitionTracker.java:186
setPartitionState(...) iterates over producerPartitionStateMap.entrySet().iterator() and calls iterator.remove() when pruning. When this map comes from OffsetRecord.getProducerPartitionStateMap(), it's a VeniceConcurrentHashMap / ConcurrentHashMap, whose iterators do not support remove() and will throw UnsupportedOperationException. Consider collecting stale keys during iteration and removing them from the map after the loop (or use producerPartitionStateMap.remove(key) without iterator.remove() only if you ensure the map type supports it).
Iterator<Map.Entry<CharSequence, ProducerPartitionState>> iterator =
producerPartitionStateMap.entrySet().iterator();
Map.Entry<CharSequence, ProducerPartitionState> entry;
GUID producerGuid;
ProducerPartitionState producerPartitionState;
while (iterator.hasNext()) {
entry = iterator.next();
producerGuid = GuidUtils.getGuidFromCharSequence(entry.getKey());
producerPartitionState = entry.getValue();
if (producerPartitionState.messageTimestamp >= earliestAllowableTimestamp) {
/**
* This {@link producerPartitionState} is eligible to be retained, so we'll set the state in the
* {@link PartitionTracker}.
*/
setSegment(type, producerGuid, new Segment(partition, producerPartitionState));
} else {
// The state is eligible to be cleared.
getSegments(type).remove(producerGuid);
iterator.remove();
}
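The collect-then-remove alternative suggested in the comment above can be sketched generically (simplified value types; the real map holds ProducerPartitionState values keyed by CharSequence):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CollectThenRemoveSketch {
  /**
   * Collects stale keys during iteration and removes them afterwards. This
   * works for any Map implementation, regardless of what its iterators
   * support, at the cost of a temporary key list.
   */
  public static int pruneStale(Map<String, Long> states, long earliestAllowableTimestamp) {
    List<String> staleKeys = new ArrayList<>();
    for (Map.Entry<String, Long> entry : states.entrySet()) {
      if (entry.getValue() < earliestAllowableTimestamp) {
        staleKeys.add(entry.getKey());
      }
    }
    for (String key : staleKeys) {
      states.remove(key); // safe: the iteration has already finished
    }
    return staleKeys.size();
  }

  public static void main(String[] args) {
    Map<String, Long> states = new HashMap<>();
    states.put("producer-a", 10L);
    states.put("producer-b", 99L);
    System.out.println(pruneStale(states, 50L) + " " + states.size()); // 1 1
  }
}
```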
clients/da-vinci-client/src/main/java/com/linkedin/davinci/validation/PartitionTracker.java
...enice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestGlobalRtDiv.java
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 1 comment.
Problem Statement
This PR extends the Global RT DIV feature with two improvements:
1. Producer State Max Age Pruning: Stale producer states accumulate indefinitely in the PartitionTracker, representing producers that have long since stopped writing. Without pruning, these entries grow without bound and consume memory, and can also be checkpointed to disk unnecessarily. A maxAge threshold is needed to evict state for producers whose last recorded timestamp exceeds a configurable window.

2. Size-Based Offset Sync for VT: The existing shouldSyncOffsetFromSnapshot() only triggered syncs on Global RT DIV messages and non-segment control messages. In scenarios such as the initial consumption of VT before reaching RT, and scenarios with infrequent control messages and no Global RT DIV activity, the VT offset record and producer states could go unsynced for an extended period, increasing the window for data inconsistency after a server crash/restart.

Solution
Max Age Pruning for Producer States
- PartitionTracker.cloneVtProducerStates(destTracker, maxAgeInMs) and cloneRtProducerStates(destTracker, brokerUrl, maxAgeInMs): signatures updated to accept maxAgeInMs. During cloning, entries whose lastRecordTimestamp is older than now - maxAgeInMs are removed from the source tracker (not just excluded from the clone). This opportunistically prunes stale state from memory during the snapshot process.
- DataIntegrityValidator.setPartitionState(): previously passed DISABLED for the age threshold when loading partition state from the offset record. Now correctly passes this.maxAgeInMs, so stale producer states are pruned when reloading from disk too.
- DataIntegrityValidator.cloneVtProducerStates() and cloneRtProducerStates(): updated to propagate maxAgeInMs to the underlying PartitionTracker calls.
- StoreIngestionTask: the consumerDiv (the DataIntegrityValidator for the consumer thread) is now initialized with producerStateMaxAgeMs, so age-based pruning is applied consistently.
- Rename
getMaxMessageTimeInMs() → calculateLatestMessageTimeInMs(): clarifies that this value is used as a reference timestamp for pruning, not a field accessor.
- ConcurrentModificationException fix: cloneVtProducerStates and cloneRtProducerStates now use Iterator.remove() instead of Map.remove() during iteration.

Size-Based Offset Sync
- shouldSyncOffsetFromSnapshot() extended: adds a third sync trigger: when the total VT bytes consumed since the last sync >= 2 x syncBytesInterval. The 2x factor ensures this condition only fires well after the RT DIV send interval, avoiding interference with the message-driven sync path.
- consumedBytesSinceLastSync key semantics: the map key is now the version topic name when consuming from VT (previously always the broker URL). This cleanly separates VT consumption tracking from RT consumption tracking, allowing shouldSyncOffsetFromSnapshot() to query only VT-sourced bytes for its size-based condition.
- Reset counter on sync: after queuing a snapshot sync in
syncOffsetFromSnapshotIfNeeded(), the consumer-side VT byte counter is reset to 0. Without this, the size-based condition in shouldSyncOffsetFromSnapshot() would re-trigger on every subsequent record until the drainer thread resets it.
- updateOffsetMetadataAndSyncOffset() guard: when Global RT DIV is enabled, the standard sync path is skipped to prevent it from overwriting the snapshot-based offset record with stale or out-of-order state.

Code changes
- producerStateMaxAgeMs (existing config, now propagated to consumerDiv)
- updateOffsetMetadataAndSyncOffset() has an info-level guard log (fires when Global RT DIV is enabled, since it is not expected to be called in that path); acceptable as-is.

Concurrency-Specific Checks
Both reviewer and PR author to verify
- Synchronization mechanisms (synchronized, RWLock) are used where needed.
- Thread-safe collections (ConcurrentHashMap, CopyOnWriteArrayList) are used where appropriate.
- VeniceConcurrentHashMap is used throughout.

How was this PR tested?
New tests:
- TestPartitionTracker.testCloneVtProducerStates(): verifies that cloneVtProducerStates with DISABLED max age clones all segments, and that with a max age threshold it removes old segments from the source tracker while not including them in the destination.
- TestPartitionTracker.testCloneRtProducerStates(): verifies that cloneRtProducerStates prunes stale entries from the source tracker and removes the broker entry entirely when it becomes empty after pruning.
- LeaderFollowerStoreIngestionTaskTest.testShouldSyncOffsetFromSnapshot() (extended): tests four size-based cases: below threshold, at threshold, above threshold, and disabled interval (syncBytesInterval=0).

Does this PR introduce any user-facing or breaking changes?