Commit 39bf77a

Merge branch 'main' into minnguye/system_store_admmin_op
2 parents: 0130542 + ac09987

54 files changed: +1260 −2004 lines


build.gradle

Lines changed: 2 additions & 1 deletion
@@ -328,7 +328,8 @@ subprojects {
     // project(':internal:venice-common').file('src/main/resources/avro/KafkaMessageEnvelope/v12', PathValidation.DIRECTORY)
     def versionOverrides = [
       project(':internal:venice-common').file('src/main/resources/avro/StoreMetaValue/v36', PathValidation.DIRECTORY),
-      project(':services:venice-controller').file('src/main/resources/avro/AdminOperation/v91', PathValidation.DIRECTORY)
+      project(':services:venice-controller').file('src/main/resources/avro/AdminOperation/v91', PathValidation.DIRECTORY),
+      project(':internal:venice-common').file('src/main/resources/avro/PartitionState/v19', PathValidation.DIRECTORY),
     ]

     def schemaDirs = [sourceDir]

clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java

Lines changed: 0 additions & 65 deletions
@@ -1,6 +1,5 @@
 package com.linkedin.davinci;
 
-import static com.linkedin.venice.ConfigKeys.DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY;
 import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_INSTANCE_NAME_SUFFIX;
 import static com.linkedin.venice.ConfigKeys.VALIDATE_VENICE_INTERNAL_SCHEMA_VERSION;
 import static com.linkedin.venice.pushmonitor.ExecutionStatus.DVC_INGESTION_ERROR_DISK_FULL;
@@ -71,16 +70,13 @@
 import com.linkedin.venice.service.AbstractVeniceService;
 import com.linkedin.venice.service.ICProvider;
 import com.linkedin.venice.stats.TehutiUtils;
-import com.linkedin.venice.utils.ComplementSet;
 import com.linkedin.venice.utils.DaemonThreadFactory;
 import com.linkedin.venice.utils.Utils;
 import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
 import io.tehuti.metrics.MetricsRepository;
 import java.io.Closeable;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
@@ -439,49 +435,6 @@ private Function<String, Boolean> functionToCheckWhetherStorageEngineShouldBeKep
 
   @VisibleForTesting
   final synchronized void bootstrap() {
-    List<StorageEngine> storageEngines = getStorageService().getStorageEngineRepository().getAllLocalStorageEngines();
-    LOGGER.info("Starting bootstrap, storageEngines: {}", storageEngines);
-    Map<String, Version> storeNameToBootstrapVersionMap = new HashMap<>();
-    Map<String, List<Integer>> storeNameToPartitionListMap = new HashMap<>();
-
-    for (StorageEngine storageEngine: storageEngines) {
-      String kafkaTopicName = storageEngine.getStoreVersionName();
-      String storeName = Version.parseStoreFromKafkaTopicName(kafkaTopicName);
-      if (VeniceSystemStoreType.META_STORE.isSystemStore(storeName)) {
-        // Do not bootstrap meta system store via DaVinci backend initialization since the operation is not supported by
-        // ThinClientMetaStoreBasedRepository. This shouldn't happen normally, but it's possible if the user was using
-        // DVC based metadata for the same store and switched to thin client based metadata.
-        continue;
-      }
-
-      try {
-        getStoreOrThrow(storeName); // throws VeniceNoStoreException
-      } catch (VeniceNoStoreException e) {
-        throw new VeniceException("Unexpected to encounter non-existing store here: " + storeName);
-      }
-
-      int versionNumber = Version.parseVersionFromKafkaTopicName(kafkaTopicName);
-
-      Version version = storeRepository.getStoreOrThrow(storeName).getVersion(versionNumber);
-      if (version == null) {
-        throw new VeniceException(
-            "Could not find version: " + versionNumber + " for store: " + storeName + " in storeRepository!");
-      }
-
-      /**
-       * Set the target bootstrap version for the store in the below order:
-       * 1. CURRENT_VERSION: store's CURRENT_VERSION exists locally.
-       * 2. FUTURE_VERSION: store's CURRENT_VERSION does not exist locally, but FUTURE_VERSION exists locally.
-       * In most case, we will choose 1, as the CURRENT_VERSION will always exists locally regardless of the FUTURE_VERSION
-       * Case 2 will only exist when store version retention policy is > 2, and rollback happens on Venice side.
-       */
-      if (!(storeNameToBootstrapVersionMap.containsKey(storeName)
-          && (storeNameToBootstrapVersionMap.get(storeName).getNumber() < versionNumber))) {
-        storeNameToBootstrapVersionMap.put(storeName, version);
-        storeNameToPartitionListMap.put(storeName, getStorageService().getUserPartitions(kafkaTopicName));
-      }
-    }
-
     /**
      * In order to make bootstrap logic compatible with ingestion isolation, we first scan all local storage engines,
      * record all store versions that are up-to-date and close all storage engines. This will make sure child process
@@ -518,24 +471,6 @@ final synchronized void bootstrap() {
         blobTransferManager,
         configLoader.getVeniceServerConfig());
     ingestionBackend.addIngestionNotifier(ingestionListener);
-
-    if (configLoader.getCombinedProperties().getBoolean(DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, true)) {
-      // Subscribe all bootstrap version partitions.
-      storeNameToBootstrapVersionMap.forEach((storeName, version) -> {
-        List<Integer> partitions = storeNameToPartitionListMap.get(storeName);
-        String versionTopic = version.kafkaTopicName();
-        LOGGER.info("Bootstrapping partitions {} for {}", partitions, versionTopic);
-        StorageEngine storageEngine = getStorageService().getStorageEngine(versionTopic);
-        aggVersionedStorageEngineStats.setStorageEngine(versionTopic, storageEngine);
-        StoreBackend storeBackend = getStoreOrThrow(storeName);
-        storeBackend.subscribe(
-            ComplementSet.newSet(partitions),
-            Optional.of(version),
-            Collections.emptyMap(),
-            null,
-            Collections.emptyMap());
-      });
-    }
   }
 
   @Override
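
The deleted block above encodes a subtle selection rule: the guard before the map put only skips the update when a lower version is already recorded, so the lowest locally present version number wins, which is how CURRENT_VERSION takes precedence over FUTURE_VERSION. A minimal, runnable sketch of just that rule, with plain ints standing in for Venice's Version objects (the store name and version numbers are hypothetical):

import java.util.HashMap;
import java.util.Map;

final class BootstrapVersionSelectionSketch {
  public static void main(String[] args) {
    Map<String, Integer> storeNameToBootstrapVersion = new HashMap<>();
    int[] onDiskVersions = { 5, 6 }; // hypothetical: v5 (CURRENT) and v6 (FUTURE) both on disk
    String storeName = "my_store"; // hypothetical store name
    for (int versionNumber: onDiskVersions) {
      // Same guard as the deleted code: overwrite unless a lower version is already recorded.
      if (!(storeNameToBootstrapVersion.containsKey(storeName)
          && storeNameToBootstrapVersion.get(storeName) < versionNumber)) {
        storeNameToBootstrapVersion.put(storeName, versionNumber);
      }
    }
    System.out.println(storeNameToBootstrapVersion); // prints {my_store=5}: the lowest version wins
  }
}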

clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java

Lines changed: 0 additions & 8 deletions
@@ -10,7 +10,6 @@
 import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_LEVEL0_STOPS_WRITES_TRIGGER_WRITE_ONLY_VERSION;
 import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED;
 import static com.linkedin.venice.ConfigKeys.CLUSTER_NAME;
-import static com.linkedin.venice.ConfigKeys.DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY;
 import static com.linkedin.venice.ConfigKeys.INGESTION_USE_DA_VINCI_CLIENT;
 import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS;
 import static com.linkedin.venice.ConfigKeys.ZOOKEEPER_ADDRESS;
@@ -203,13 +202,6 @@ protected AvroGenericDaVinciClient(
       throw new VeniceClientException("Ingestion Isolation is not supported with DaVinciRecordTransformer");
     }
 
-    if (backendConfig.getBoolean(DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, true)
-        && recordTransformerConfig != null) {
-      throw new VeniceClientException(
-          DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY
-              + " must be set to false when using DaVinciRecordTransformer");
-    }
-
     preValidation.run();
   }

clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/VersionSpecificAvroGenericDaVinciClient.java

Lines changed: 0 additions & 4 deletions
@@ -11,10 +11,6 @@
 /**
  * Version-specific DaVinci client implementation that subscribes to a specific store version.
  *
- * This is only intended for internal Venice use.
- * Must be used with {@link com.linkedin.venice.ConfigKeys#DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY
- * set to false. Otherwise, the client may subscribe to an unintended version based on what's on disk.
- *
  * Key features:
  * - Subscribes to a specific version (does not follow version swaps)
  * - Validates version existence when subscribing
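
With the on-disk auto-subscription config gone, the remaining contract is just what the feature list above states: pin to one version, validate that it exists at subscribe time, and ignore later version swaps. A toy sketch of that contract (hypothetical names throughout; this is not the Venice API):

import java.util.Set;

final class VersionPinningSketch {
  private static final Set<Integer> existingVersions = Set.of(7, 8, 9);
  private static int currentVersion = 8;

  // Version-specific behavior: validate once when subscribing, then never move.
  static int subscribeToSpecificVersion(int requestedVersion) {
    if (!existingVersions.contains(requestedVersion)) {
      throw new IllegalArgumentException("Version " + requestedVersion + " does not exist");
    }
    return requestedVersion;
  }

  // Default client behavior: always resolve to the store's current version.
  static int subscribeToCurrentVersion() {
    return currentVersion;
  }

  public static void main(String[] args) {
    int pinned = subscribeToSpecificVersion(7);
    currentVersion = 9; // a version swap happens on the Venice side
    // The pinned subscription stays on 7; the default subscription follows the swap to 9.
    System.out.println("pinned=" + pinned + ", current=" + subscribeToCurrentVersion());
  }
}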

clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/BootstrappingVeniceChangelogConsumer.java

Lines changed: 47 additions & 37 deletions
@@ -1,72 +1,82 @@
 package com.linkedin.davinci.consumer;
 
-import com.linkedin.venice.annotation.Experimental;
 import com.linkedin.venice.pubsub.api.PubSubMessage;
 import java.util.Collection;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
 
 /**
- * This interface is meant for users where local state must be built off of the entirety of a venice data set
- * (i.e. Non-idempotent event ingestion), rather than dealing with an event at a time. THIS IS EXPENSIVE.
- * It's highly recommended that users use the {@link VeniceChangelogConsumer} interface as a means to consume Venice
- * Change capture data.
+ * This interface is meant for users where local state must be built off of the entirety of a Venice data set.
  *
- * Implementations of this interface rely on access to a compacted view to the data and scanning the entirety of that
- * compacted view initial calls to poll(). This is the only supported pattern with this interface today. {@link VeniceChangelogConsumer}
- * enables finer control. This interface is intentionally limited as implementors rely on local checkpointing and
- * maintenance of state which might be easily corrupted with byzantine seek() calls.
- * @param <K>
- * @param <V>
+ * This interface provides automatic state management with local checkpointing and efficient data access
+ * through local compaction. It eliminates the need for manual checkpoint management and improves restart performance.
+ *
+ * KEY BENEFITS:
+ * - Automatic State Management: No need to manually manage checkpoints.
+ *   The client handles all state management automatically.
+ * - Efficient Restart: Resumes from the last checkpoint on restart, consuming only new changes since the last
+ *   Kafka checkpoint. This reduces recovery time and eliminates the need to re-consume every event from Kafka
+ *   on restart.
+ * - Local Compaction: All data is compacted locally, providing efficient access to the current state without consuming
+ *   duplicate events.
+ * - Fast Bootstrap on Fresh Nodes: On fresh nodes without local state, obtains a complete data snapshot from existing
+ *   nodes instead of consuming every Kafka event (requires blob transfer to be enabled).
+ *
+ * This interface intentionally does not expose seek() operations for simplicity.
+ * For more fine-grained control over seeking, see {@link VeniceChangelogConsumer}.
+ *
+ * @param <K> Key type
+ * @param <V> Value type
  */
-@Experimental
 public interface BootstrappingVeniceChangelogConsumer<K, V> {
   /**
-   * Start performs both a topic subscription and catch up. The client will look at the latest offset in the server and
-   * sync bootstrap data up to that point in changes. Once that is done for all partitions, the future will complete.
-   *
-   * NOTE: This future may take some time to complete depending on how much data needs to be ingested in order to catch
-   * up with the time that this client started.
-   *
-   * NOTE: In the experimental client, the future will complete when there is at least one message to be polled.
-   * We don't wait for all partitions to catch up, as loading every message into a buffer will result in an
-   * Out Of Memory error. Instead, use the {@link #isCaughtUp()} method to determine once all subscribed partitions have
-   * caught up.
+   * Starts the consumer by subscribing to the specified partitions. On restart, the client automatically resumes
+   * from the last checkpoint. On fresh start, it begins from the beginning of the topic or leverages blob transfer
+   * if available.
    *
-   * NOTE: In the experimental client, if you pass in an empty set, it will subscribe to all partitions for the store
+   * The returned future completes when there is at least one message available to be polled.
+   * Use {@link #isCaughtUp()} to determine when all subscribed partitions have caught up to the latest offset.
    *
-   * @param partitions which partition id's to catch up with
-   * @return a future that completes once catch up is complete for all passed in partitions.
+   * @param partitions Set of partition IDs to subscribe to. Pass empty set to subscribe to all partitions.
+   * @return A future that completes when at least one message is available to poll.
    */
   CompletableFuture<Void> start(Set<Integer> partitions);
 
+  /**
+   * Subscribes to every partition for the Venice store. See {@link #start(Set)} for more information.
+   */
   CompletableFuture<Void> start();
 
   void stop() throws Exception;
 
   /**
-   * polls for the next batch of change events. The first records returned following calling 'start()' will be from the bootstrap state.
-   * Once this state is consumed, subsequent calls to poll will be based off of recent updates to the Venice store.
+   * Polls for the next batch of change events. The first records returned after calling {@link #start(Set)} will be from the
+   * local compacted state. Once the local state is fully consumed, subsequent calls will return
+   * real-time updates made to the Venice store.
    *
-   * In the experimental client, records will be returned in batches configured to the MAX_BUFFER_SIZE. So the initial
-   * calls to poll will be from records from the bootstrap state, until the partitions have caught up.
-   * Additionally, if the buffer hits the MAX_BUFFER_SIZE before the timeout is hit, poll will return immediately.
+   * Records are returned in batches up to the configured MAX_BUFFER_SIZE. This method will return immediately if:
+   * 1. The buffer reaches MAX_BUFFER_SIZE before the timeout expires, OR
+   * 2. The timeout is reached
    *
-   * If the PubSubMessage came from disk (after restart), the following fields will be set to sentinel values since
-   * record metadata information is not available to reduce disk utilization:
+   * NOTE: If the PubSubMessage came from disk (after restart), the following fields will be set to sentinel values
+   * since record metadata information is not persisted to reduce disk utilization:
    * - PubSubMessageTime
   * - Position
    *
-   * @param timeoutInMs Maximum timeout of the poll invocation
-   * @return a collection of Venice PubSubMessages
+   * @param timeoutInMs Maximum timeout of the poll invocation in milliseconds
+   * @return A collection of Venice PubSubMessages containing change events
    */
   Collection<PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate>> poll(long timeoutInMs);
 
   /**
-   * In the experimental client, once this becomes true it will stay true even if we start to lag after the
-   * bootstrapping phase.
-   * @return True if all subscribed partitions have caught up.
+   * Indicates whether all subscribed partitions have caught up to the latest offset at the time of subscription.
+   * Once this becomes true, it will remain true even if the consumer begins to lag later on.
+   *
+   * This is for determining when the initial bootstrap phase has completed and the consumer has transitioned
+   * to consuming real-time events.
+   *
+   * @return True if all subscribed partitions have caught up to their target offsets.
    */
   boolean isCaughtUp();
 
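Given the rewritten contract above, a minimal consumption loop could look like the sketch below. How the concrete consumer is obtained is outside this interface (in practice it comes from a changelog consumer factory), so the sketch takes an already-built instance; only start/poll/isCaughtUp are taken from this diff, while the accessors on PubSubMessage and ChangeEvent (getKey, getValue, getCurrentValue) are assumptions:

import com.linkedin.davinci.consumer.BootstrappingVeniceChangelogConsumer;
import com.linkedin.davinci.consumer.ChangeEvent;
import com.linkedin.davinci.consumer.VeniceChangeCoordinate;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ChangelogBootstrapSketch {
  static <K, V> Map<K, V> materialize(BootstrappingVeniceChangelogConsumer<K, V> consumer)
      throws Exception {
    Map<K, V> localState = new ConcurrentHashMap<>();
    // An empty set subscribes to all partitions; the future completes once at least one
    // message is available to poll, not when all partitions have caught up.
    consumer.start(Collections.emptySet()).get();

    // Drain the locally compacted state; poll() transparently switches to real-time
    // updates once the bootstrap data has been consumed.
    while (!consumer.isCaughtUp()) {
      for (PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate> message: consumer.poll(1000L)) {
        ChangeEvent<V> event = message.getValue();
        V currentValue = event.getCurrentValue(); // assumed accessor; null would mean a delete
        if (currentValue == null) {
          localState.remove(message.getKey());
        } else {
          localState.put(message.getKey(), currentValue);
        }
      }
    }
    return localState;
  }
}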
clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/ChangelogClientConfig.java

Lines changed: 10 additions & 12 deletions
@@ -59,7 +59,7 @@ public class ChangelogClientConfig<T extends SpecificRecord> {
    */
   private boolean skipFailedToAssembleRecords = true;
 
-  private Boolean isExperimentalClientEnabled = false;
+  private Boolean isNewStatelessClientEnabled = false;
   private int maxBufferSize = 1000;
   private boolean useRequestBasedMetadataRepository = false;
 
@@ -257,8 +257,8 @@ public ChangelogClientConfig setRocksDBBlockCacheSizeInBytes(long rocksDBBlockCa
   }
 
   /**
-   * If you're using the experimental client, and you want to deserialize your keys into
-   * {@link org.apache.avro.specific.SpecificRecord} thenr set this configuration.
+   * If you're using the {@link BootstrappingVeniceChangelogConsumer}, and you want to deserialize your keys into
+   * {@link org.apache.avro.specific.SpecificRecord} then set this configuration.
    */
   public ChangelogClientConfig setSpecificKey(Class specificKey) {
     this.innerClientConfig.setSpecificKeyClass(specificKey);
@@ -271,7 +271,7 @@ public ChangelogClientConfig setSpecificValue(Class<T> specificValue) {
   }
 
   /**
-   * If you're using the experimental client, and you want to deserialize your values into
+   * If you're using the {@link BootstrappingVeniceChangelogConsumer}, and you want to deserialize your values into
    * {@link org.apache.avro.specific.SpecificRecord} then set this configuration.
    */
   public ChangelogClientConfig setSpecificValueSchema(Schema specificValueSchema) {
@@ -345,7 +345,7 @@ public static <V extends SpecificRecord> ChangelogClientConfig<V> cloneConfig(Ch
         .setDatabaseSyncBytesInterval(config.getDatabaseSyncBytesInterval())
         .setShouldCompactMessages(config.shouldCompactMessages())
         .setIsBeforeImageView(config.isBeforeImageView())
-        .setIsExperimentalClientEnabled(config.isExperimentalClientEnabled())
+        .setIsNewStatelessClientEnabled(config.isNewStatelessClientEnabled())
         .setMaxBufferSize(config.getMaxBufferSize())
         .setSeekThreadPoolSize(config.getSeekThreadPoolSize())
         .setShouldSkipFailedToAssembleRecords(config.shouldSkipFailedToAssembleRecords())
@@ -367,16 +367,15 @@ public ChangelogClientConfig setIsBeforeImageView(Boolean beforeImageView) {
     return this;
   }
 
-  protected Boolean isExperimentalClientEnabled() {
-    return isExperimentalClientEnabled;
+  protected Boolean isNewStatelessClientEnabled() {
+    return isNewStatelessClientEnabled;
   }
 
   /**
-   * This uses a highly experimental client.
-   * It is currently only supported for {@link BootstrappingVeniceChangelogConsumer}.
+   * Set this to true to use the new {@link VeniceChangelogConsumer}.
    */
-  public ChangelogClientConfig setIsExperimentalClientEnabled(Boolean experimentalClientEnabled) {
-    isExperimentalClientEnabled = experimentalClientEnabled;
+  public ChangelogClientConfig setIsNewStatelessClientEnabled(Boolean newStatelessClientEnabled) {
+    this.isNewStatelessClientEnabled = newStatelessClientEnabled;
     return this;
   }
 
@@ -388,7 +387,6 @@ protected int getMaxBufferSize() {
    * Sets the maximum number of records that can be buffered and returned to the user when calling poll.
    * When the maximum number of records is reached, ingestion will be paused until the buffer is drained.
    * Please note that this is separate from {@link com.linkedin.venice.ConfigKeys#SERVER_KAFKA_MAX_POLL_RECORDS}.
-   * In order for this feature to be used, {@link #setIsExperimentalClientEnabled(Boolean)} must be set to true.
    * It is currently only supported for {@link BootstrappingVeniceChangelogConsumer}.
    */
   public ChangelogClientConfig setMaxBufferSize(int maxBufferSize) {

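For reference, wiring up the renamed flag might look like the following sketch, assuming the existing ChangelogClientConfig builder pattern; setStoreName is assumed from the broader config API (it is not part of this diff), while setIsNewStatelessClientEnabled and setMaxBufferSize are the methods touched above:

import com.linkedin.davinci.consumer.ChangelogClientConfig;

final class NewStatelessClientConfigSketch {
  static ChangelogClientConfig buildConfig() {
    return new ChangelogClientConfig()
        .setStoreName("my_store") // hypothetical store name; setter assumed from the existing API
        .setIsNewStatelessClientEnabled(true) // renamed from setIsExperimentalClientEnabled in this commit
        .setMaxBufferSize(2000); // ingestion pauses when the poll buffer reaches this many records
  }
}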