
Contributor

@xunyin8 xunyin8 commented Nov 13, 2025

Problem Statement

This is part 2/2 for the same problem described in #2245. The client needs to use a completely different mechanism to perform version swap. The old client's version swap mechanism of periodically polling and seeking to the new version's EOP will not work.

Solution

  1. Introduce a new class, VersionSwapMessageState, to keep track of all relevant state and info about an ongoing version swap. This class is NOT thread safe, and all interactions with this state (both reads and writes) should be protected. In VeniceChangelogConsumerImpl and VeniceAfterImageConsumerImpl we use the subscriptionLock.writeLock().
  2. Upon encountering relevant version swap messages during a user poll, we either start the version swap process by initializing a new VersionSwapMessageState or use the existing VersionSwapMessageState to handle the message by updating its internal state. If a partition has consumed all version swap messages in the old topic, we unsubscribe from it. This reduces duplicate consumption when we go to the new topic and allows the remaining partitions to finish consuming all the version swap messages.
  3. When we start a new version swap, we also start looking asynchronously for the checkpoints to go to in the new topic, using the internalSeekConsumer in VeniceAfterImageConsumerImpl, which was also used to seek to EOP in the old version swap mechanism. Progress is tracked via a CompletableFuture, which we also set in the VersionSwapMessageState.
  4. If the consumer is undergoing a version swap, at the beginning of every poll we check whether all partitions are ready to move to the new topic. The branching is as follows:
  • Some partitions are still not ready: continue polling those partitions from the old topic.
  • All partitions are ready but we haven't finished finding all the checkpoints to go to in the new topic yet: return an empty collection.
  • All partitions are ready and we have the checkpoints to go to in the new topic: subscribe and seek to those checkpoints in the new topic and resume regular consumption (clear version swap state).
  5. To prevent users from seeking in between version swap messages, we also replace the returned VCC with a low watermark pub sub position during version swap. There is also a timeout for version swap to limit the time window. If we reach the timeout, we forcefully seek to the new topic using the EOP positions for any remaining partitions as a backup plan, which should cover a variety of edge cases (e.g. the consumer is not polling fast enough, the consumer's starting position was in between version swaps, a region is down, etc.). In all these edge cases it is better to go to the new topic and consume a lot of duplicate messages than to stay on the old topic, which will eventually be deleted. Any partitions that were forcefully switched to the new version's EOP are listed in the completion log message. Example:
2025-11-16 17:49:23 - [] INFO [VeniceChangelogConsumerImpl] [TestNG-method=testCDCMultiDataCenterVersionSwapMessageHandling-1] Version swap completed after timeout from topic: test-store_a1e58fa0dfa9_8310349f_v1 to topic: test-store_a1e58fa0dfa9_8310349f_v2, generation id: 1763344153822. Partitions: [0, 1, 2] are seeked to EOP positions.
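
For reviewers, the per-poll decision described in steps 4 and 5 can be sketched roughly as below. This is only an illustration, not the code in this PR: the class VersionSwapDecisionSketch, the Action enum, the decide and partitionsNeedingEop methods, and the use of Map<Integer, Long> as a stand-in for per-partition checkpoints are all hypothetical names and types. The order in which the timeout is checked relative to the other branches is also an assumption.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of the per-poll version swap decision; not the PR's actual classes. */
final class VersionSwapDecisionSketch {
  enum Action {
    KEEP_POLLING_OLD_TOPIC, // some partitions still have version swap messages to consume
    RETURN_EMPTY,           // all partitions ready, checkpoint lookup still in flight
    SWITCH_TO_NEW_TOPIC,    // all partitions ready and checkpoints found
    FORCE_SWITCH_WITH_EOP   // timeout reached: fall back to EOP for remaining partitions
  }

  /**
   * Decides what a poll should do while a version swap is in progress.
   * The Map<Integer, Long> value type stands in for real pub sub positions (assumption).
   */
  static Action decide(
      Set<Integer> assignedPartitions,
      Set<Integer> readyPartitions,
      CompletableFuture<Map<Integer, Long>> checkpointsFuture,
      long swapStartMs,
      long nowMs,
      long timeoutMs) {
    boolean allReady = readyPartitions.containsAll(assignedPartitions);
    boolean checkpointsFound =
        checkpointsFuture.isDone() && !checkpointsFuture.isCompletedExceptionally();

    if (allReady && checkpointsFound) {
      return Action.SWITCH_TO_NEW_TOPIC;
    }
    // Assumption: the timeout is evaluated before the "keep waiting" branches.
    if (nowMs - swapStartMs >= timeoutMs) {
      return Action.FORCE_SWITCH_WITH_EOP;
    }
    if (!allReady) {
      return Action.KEEP_POLLING_OLD_TOPIC;
    }
    return Action.RETURN_EMPTY;
  }

  /** Partitions that would be seeked to EOP on a forced switch, sorted for logging. */
  static Set<Integer> partitionsNeedingEop(Set<Integer> assigned, Set<Integer> ready) {
    Set<Integer> remaining = new TreeSet<>(assigned);
    remaining.removeAll(ready);
    return remaining;
  }
}
```

In the real consumer this check would run while holding subscriptionLock.writeLock(), since VersionSwapMessageState is not thread safe. Keeping the decision as a pure function of the state makes each branch easy to cover in unit tests.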

Code changes

  • Added new code behind a config. If so list the config names and their default values in the PR description.
  • Introduced new log lines.
    • Confirmed if logs need to be rate limited to avoid excessive logging.

Concurrency-Specific Checks

Both reviewer and PR author to verify

  • Code has no race conditions or thread safety issues.
  • Proper synchronization mechanisms (e.g., synchronized, RWLock) are used where needed.
  • No blocking calls inside critical sections that could lead to deadlocks or performance degradation.
  • Verified thread-safe collections are used (e.g., ConcurrentHashMap, CopyOnWriteArrayList).
  • Validated proper exception handling in multi-threaded code to avoid silent thread termination.

How was this PR tested?

Integration tests
TODO: new integration test to also verify deferred version swap in a multi-colo environment
TODO: new unit tests

  • New unit tests added.
  • New integration tests added.
  • Modified or extended existing tests.
  • Verified backward compatibility (if applicable).

Does this PR introduce any user-facing or breaking changes?

  • No. You can skip the rest of this section.
  • Yes. Clearly explain the behavior change and its impact.

@xunyin8 xunyin8 force-pushed the new-version-swap-client-change branch from 28f76b0 to 4907fa4 Compare November 17, 2025 01:53
@xunyin8 xunyin8 force-pushed the new-version-swap-client-change branch from 4907fa4 to 03d399a Compare November 25, 2025 05:46
@xunyin8 xunyin8 force-pushed the new-version-swap-client-change branch from 63eb55d to d6ee740 Compare November 26, 2025 06:09
