diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java index 27087413587..a170bfdba94 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java @@ -1,5 +1,6 @@ package com.linkedin.davinci; +import com.linkedin.davinci.client.DaVinciSeekCheckpointInfo; import com.linkedin.davinci.config.StoreBackendConfig; import com.linkedin.davinci.config.VeniceServerConfig; import com.linkedin.venice.exceptions.VeniceException; @@ -7,15 +8,16 @@ import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.VersionStatus; import com.linkedin.venice.pubsub.api.PubSubPosition; +import com.linkedin.venice.pubsub.api.PubSubSymbolicPosition; import com.linkedin.venice.serialization.AvroStoreDeserializerCache; import com.linkedin.venice.serialization.StoreDeserializerCache; import com.linkedin.venice.utils.ComplementSet; import com.linkedin.venice.utils.ConcurrentRef; import com.linkedin.venice.utils.ReferenceCounted; import com.linkedin.venice.utils.RegionUtils; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -135,26 +137,13 @@ private void setDaVinciFutureVersion(VersionBackend version) { } public CompletableFuture subscribe(ComplementSet partitions) { - return subscribe(partitions, Optional.empty(), Collections.emptyMap(), null, Collections.emptyMap()); + return subscribe(partitions, Optional.empty(), null); } - public CompletableFuture seekToTimestamps(Long allPartitionTimestamp, Optional storeVersion) { - return subscribe( - ComplementSet.universalSet(), - storeVersion, - new HashMap<>(), - allPartitionTimestamp, - Collections.emptyMap()); - } - - public CompletableFuture seekToCheckPoints( - Map checkpoints, + public CompletableFuture seekToCheckpoint( + DaVinciSeekCheckpointInfo checkpointInfo, Optional storeVersion) { - return subscribe(ComplementSet.wrap(checkpoints.keySet()), storeVersion, Collections.emptyMap(), null, checkpoints); - } - - public CompletableFuture seekToTimestamps(Map timestamps, Optional storeVersion) { - return subscribe(ComplementSet.wrap(timestamps.keySet()), storeVersion, timestamps, null, Collections.emptyMap()); + return subscribe(checkpointInfo.getPartitions(), storeVersion, checkpointInfo); } private Version getCurrentVersion() { @@ -168,9 +157,7 @@ private Version getLatestNonFaultyVersion() { public synchronized CompletableFuture subscribe( ComplementSet partitions, Optional bootstrapVersion, - Map timestamps, - Long allPartitionsTimestamp, - Map positionMap) { + DaVinciSeekCheckpointInfo checkpointInfo) { if (daVinciCurrentVersion == null) { setDaVinciCurrentVersion(new VersionBackend(backend, bootstrapVersion.orElseGet(() -> { Version version = getCurrentVersion(); @@ -207,33 +194,44 @@ public synchronized CompletableFuture subscribe( if (daVinciFutureVersion == null) { trySubscribeDaVinciFutureVersion(); } else { - daVinciFutureVersion.subscribe(partitions, timestamps, allPartitionsTimestamp, positionMap) - .whenComplete((v, e) -> trySwapDaVinciCurrentVersion(e)); + daVinciFutureVersion.subscribe(partitions, null).whenComplete((v, e) -> trySwapDaVinciCurrentVersion(e)); } } VersionBackend savedVersion = daVinciCurrentVersion; - return daVinciCurrentVersion.subscribe(partitions, timestamps, 
allPartitionsTimestamp, positionMap) - .exceptionally(e -> { - synchronized (this) { - addFaultyVersion(savedVersion, e); - // Don't propagate failure to subscribe() caller, if future version has become current and is ready to - // serve. - if (daVinciCurrentVersion != null && daVinciCurrentVersion.isReadyToServe(subscription)) { - return null; - } - } - throw (e instanceof CompletionException) ? (CompletionException) e : new CompletionException(e); - }) - .whenComplete((v, e) -> { - synchronized (this) { - if (e == null) { - LOGGER.info("Ready to serve partitions {} of {}", subscription, daVinciCurrentVersion); - } else { - LOGGER.warn("Failed to subscribe to partitions {} of {}", subscription, savedVersion, e); - } - } - }); + List partitionList = daVinciCurrentVersion.getPartitions(partitions); + if (checkpointInfo != null && checkpointInfo.getAllPartitionsTimestamp() != null) { + Map timestamps = new HashMap<>(); + for (int partition: partitionList) { + timestamps.put(partition, checkpointInfo.getAllPartitionsTimestamp()); + } + checkpointInfo.setTimestampsMap(timestamps); + } else if (checkpointInfo != null && checkpointInfo.isSeekToTail()) { + Map positionMap = new HashMap<>(); + for (int partition: partitionList) { + positionMap.put(partition, PubSubSymbolicPosition.LATEST); + } + checkpointInfo.setPositionMap(positionMap); + } + return daVinciCurrentVersion.subscribe(partitions, checkpointInfo).exceptionally(e -> { + synchronized (this) { + addFaultyVersion(savedVersion, e); + // Don't propagate failure to subscribe() caller, if future version has become current and is ready to + // serve. + if (daVinciCurrentVersion != null && daVinciCurrentVersion.isReadyToServe(subscription)) { + return null; + } + } + throw (e instanceof CompletionException) ? 
(CompletionException) e : new CompletionException(e); + }).whenComplete((v, e) -> { + synchronized (this) { + if (e == null) { + LOGGER.info("Ready to serve partitions {} of {}", subscription, daVinciCurrentVersion); + } else { + LOGGER.warn("Failed to subscribe to partitions {} of {}", subscription, savedVersion, e); + } + } + }); } public synchronized void unsubscribe(ComplementSet partitions) { @@ -302,8 +300,7 @@ synchronized void trySubscribeDaVinciFutureVersion() { LOGGER.info("Subscribing to future version {}", targetVersion.kafkaTopicName()); setDaVinciFutureVersion(new VersionBackend(backend, targetVersion, stats)); // For future version subscription, we don't need to pass any timestamps or position map - daVinciFutureVersion.subscribe(subscription, Collections.emptyMap(), null, Collections.emptyMap()) - .whenComplete((v, e) -> trySwapDaVinciCurrentVersion(e)); + daVinciFutureVersion.subscribe(subscription, null).whenComplete((v, e) -> trySwapDaVinciCurrentVersion(e)); } else { LOGGER.info( "Skipping subscribe to future version: {} in region: {} because the target version status is: {} and the target regions are: {}", diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/VersionBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/VersionBackend.java index f14d8a630dc..0453000d4ca 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/VersionBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/VersionBackend.java @@ -4,6 +4,7 @@ import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_STORE_HEARTBEAT_INTERVAL_IN_SECONDS; import static com.linkedin.venice.ConfigKeys.SERVER_STOP_CONSUMPTION_TIMEOUT_IN_SECONDS; +import com.linkedin.davinci.client.DaVinciSeekCheckpointInfo; import com.linkedin.davinci.client.InternalDaVinciRecordTransformerConfig; import com.linkedin.davinci.config.VeniceStoreVersionConfig; import com.linkedin.davinci.listener.response.NoOpReadResponseStats; @@ -365,23 +366,8 @@ synchronized boolean isReadyToServe(ComplementSet partitions) { synchronized CompletableFuture subscribe( ComplementSet partitions, - Map timestamps, - Long allPartitionTimestamp, - Map positionMap) { + DaVinciSeekCheckpointInfo checkpointInfo) { Instant startTime = Instant.now(); - int validCheckPointCount = 0; - if (!timestamps.isEmpty()) { - validCheckPointCount++; - } - if (!positionMap.isEmpty()) { - validCheckPointCount++; - } - if (allPartitionTimestamp != null) { - validCheckPointCount++; - } - if (validCheckPointCount > 1) { - throw new VeniceException("Multiple checkpoint types are not supported"); - } List partitionList = getPartitions(partitions); if (partitionList.isEmpty()) { LOGGER.error("No partitions to subscribe to for {}", this); @@ -407,9 +393,6 @@ synchronized CompletableFuture subscribe( } else { partitionFutures.computeIfAbsent(partition, k -> new CompletableFuture<>()); partitionsToStartConsumption.add(partition); - if (allPartitionTimestamp != null) { - timestamps.put(partition, allPartitionTimestamp); - } } partitionToBatchReportEOIPEnabled.put(partition, batchReportEOIPStatusEnabled); futures.add(partitionFutures.get(partition)); @@ -425,8 +408,14 @@ synchronized CompletableFuture subscribe( backend.getHeartbeatMonitoringService() .updateLagMonitor(version.kafkaTopicName(), partition, HeartbeatLagMonitorAction.SET_FOLLOWER_MONITOR); // AtomicReference of storage engine will be updated internally. 
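// Note: checkpointInfo carries at most one seek mechanism (enforced by the DaVinciSeekCheckpointInfo
// constructor added below): a per-partition position map (seekToCheckpoint / seekToTail) or a
// per-partition timestamp map (seekToTimestamp(s)). getPubSubPosition() below resolves either one
// into a concrete PubSubPosition that startConsumption() uses as the starting point.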
- Optional pubSubPosition = backend.getIngestionService() - .getPubSubPosition(config, partition, timestamps.get(partition), positionMap.get(partition)); + Optional pubSubPosition = checkpointInfo == null + ? Optional.empty() + : backend.getIngestionService() + .getPubSubPosition( + config, + partition, + checkpointInfo.getTimestampsMap(), + checkpointInfo.getPositionMap()); backend.getIngestionBackend().startConsumption(config, partition, pubSubPosition); tryStartHeartbeat(); } @@ -573,7 +562,7 @@ Map> getPartitionToPendingReportIncrementalPushList() { return partitionToPendingReportIncrementalPushList; } - private List getPartitions(ComplementSet partitions) { + public List getPartitions(ComplementSet partitions) { return IntStream.range(0, version.getPartitionCount()) .filter(partitions::contains) .boxed() diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java index cc2c851dd34..1133a5a3707 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java @@ -65,7 +65,6 @@ import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -262,6 +261,24 @@ private Optional getVersion() { return Optional.of(version); } + protected CompletableFuture seekToTail() { + if (getBackend().isIsolatedIngestion()) { + throw new VeniceClientException("Isolated Ingestion is not supported with seekToTail"); + } + throwIfNotReady(); + addPartitionsToSubscription(ComplementSet.universalSet()); + return getStoreBackend().seekToCheckpoint(new DaVinciSeekCheckpointInfo(null, null, null, true), getVersion()); + } + + protected CompletableFuture seekToTail(Set partitionSet) { + if (getBackend().isIsolatedIngestion()) { + throw new VeniceClientException("Isolated Ingestion is not supported with seekToTail"); + } + throwIfNotReady(); + addPartitionsToSubscription(ComplementSet.wrap(partitionSet)); + return getStoreBackend().seekToCheckpoint(new DaVinciSeekCheckpointInfo(null, null, null, true), getVersion()); + } + protected CompletableFuture seekToCheckpoint(Set checkpoints) { if (getBackend().isIsolatedIngestion()) { throw new VeniceClientException("Isolated Ingestion is not supported with seekToCheckpoint"); @@ -276,7 +293,8 @@ protected CompletableFuture seekToCheckpoint(Set c positionMap.put(changeCoordinate.getPartition(), changeCoordinate.getPosition()); } addPartitionsToSubscription(ComplementSet.wrap(positionMap.keySet())); - return getStoreBackend().seekToCheckPoints(positionMap, getVersion()); + return getStoreBackend() + .seekToCheckpoint(new DaVinciSeekCheckpointInfo(positionMap, null, null, false), getVersion()); } protected CompletableFuture seekToTimestamps(Map timestamps) { @@ -285,22 +303,24 @@ protected CompletableFuture seekToTimestamps(Map timestamps) { if (getBackend().isIsolatedIngestion()) { throw new VeniceClientException("Isolated Ingestion is not supported with seekToTimestamps"); } throwIfNotReady(); addPartitionsToSubscription(ComplementSet.wrap(timestamps.keySet())); - return getStoreBackend().seekToTimestamps(timestamps, getVersion()); + return getStoreBackend() + .seekToCheckpoint(new DaVinciSeekCheckpointInfo(null, timestamps, null, false), getVersion()); } - protected CompletableFuture seekToTimestamps(Long timestamps) { + protected
CompletableFuture seekToTimestamps(Long timestamp) { if (getBackend().isIsolatedIngestion()) { throw new VeniceClientException("Isolated Ingestion is not supported with seekToTimestamps"); } throwIfNotReady(); addPartitionsToSubscription(ComplementSet.universalSet()); - return getStoreBackend().seekToTimestamps(timestamps, getVersion()); + return getStoreBackend() + .seekToCheckpoint(new DaVinciSeekCheckpointInfo(null, null, timestamp, false), getVersion()); } protected CompletableFuture subscribe(ComplementSet partitions) { throwIfNotReady(); addPartitionsToSubscription(partitions); - return getStoreBackend().subscribe(partitions, getVersion(), Collections.emptyMap(), null, Collections.emptyMap()); + return getStoreBackend().subscribe(partitions, getVersion(), null); } @Override diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericSeekableDaVinciClient.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericSeekableDaVinciClient.java index 5b5e0cdbd66..1b029309bef 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericSeekableDaVinciClient.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericSeekableDaVinciClient.java @@ -2,7 +2,6 @@ import com.linkedin.davinci.consumer.VeniceChangeCoordinate; import com.linkedin.davinci.storage.chunking.GenericChunkingAdapter; -import com.linkedin.venice.client.exceptions.VeniceClientException; import com.linkedin.venice.client.store.ClientConfig; import com.linkedin.venice.service.ICProvider; import com.linkedin.venice.utils.VeniceProperties; @@ -57,11 +56,11 @@ public CompletableFuture seekToCheckpoint(Set chec @Override public CompletableFuture seekToTail() { - throw new VeniceClientException("seekToTail is not supported yet"); + return super.seekToTail(); } @Override public CompletableFuture seekToTail(Set partitions) { - throw new VeniceClientException("seekToTail is not supported yet"); + return super.seekToTail(partitions); } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciSeekCheckpointInfo.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciSeekCheckpointInfo.java new file mode 100644 index 00000000000..d7d3d52af9d --- /dev/null +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciSeekCheckpointInfo.java @@ -0,0 +1,76 @@ +package com.linkedin.davinci.client; + +import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.pubsub.api.PubSubPosition; +import com.linkedin.venice.utils.ComplementSet; +import java.util.Map; + + +/** Describes a single seek target for a Da Vinci subscription: exactly one of a per-partition position map, a per-partition timestamp map, one timestamp applied to all partitions, or seek-to-tail. */ +public class DaVinciSeekCheckpointInfo { + private Long allPartitionsTimestamp; + private Map positionMap; + private Map timestampsMap; + private boolean seekToTail = false; + + public DaVinciSeekCheckpointInfo( + Map positionMap, + Map timestampsMap, + Long allPartitionsTimestamp, + boolean seekToTail) { + this.allPartitionsTimestamp = allPartitionsTimestamp; + this.positionMap = positionMap; + this.timestampsMap = timestampsMap; + this.seekToTail = seekToTail; + int validCheckpointCount = 0; + if (allPartitionsTimestamp != null) { + validCheckpointCount++; + } + if (seekToTail) { + validCheckpointCount++; + } + if (timestampsMap != null) { + validCheckpointCount++; + } + if (positionMap != null) { + validCheckpointCount++; + } + if (validCheckpointCount > 1) { + throw new VeniceException("Multiple checkpoint types are not supported"); + } + } + + public Long
getAllPartitionsTimestamp() { + return allPartitionsTimestamp; + } + + public Map getPositionMap() { + return positionMap; + } + + public void setPositionMap(Map positionMap) { + this.positionMap = positionMap; + } + + public void setTimestampsMap(Map timestampsMap) { + this.timestampsMap = timestampsMap; + } + + public Map getTimestampsMap() { + return timestampsMap; + } + + public boolean isSeekToTail() { + return seekToTail; + } + + public ComplementSet getPartitions() { + if (positionMap != null) { + return ComplementSet.newSet(positionMap.keySet()); + } else if (timestampsMap != null) { + return ComplementSet.newSet(timestampsMap.keySet()); + } else { + return ComplementSet.universalSet(); + } + } +} diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerDaVinciRecordTransformerImpl.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerDaVinciRecordTransformerImpl.java index 1b26bc4ac9c..a02db16c5b6 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerDaVinciRecordTransformerImpl.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerDaVinciRecordTransformerImpl.java @@ -326,12 +326,11 @@ public CompletableFuture seekToEndOfPush() { } public CompletableFuture seekToTail(Set partitions) { - // ToDo: Seek to latest - throw new VeniceClientException("seekToTail is not supported yet"); + return daVinciClient.seekToTail(partitions); } public CompletableFuture seekToTail() { - return this.seekToTail(Collections.emptySet()); + return daVinciClient.seekToTail(); } public CompletableFuture seekToCheckpoint(Set checkpoints) { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java index c4634520118..0ca0c959cc2 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java @@ -721,19 +721,18 @@ public void stopInner() { public Optional getPubSubPosition( VeniceStoreVersionConfig veniceStore, int partitionId, - Long timestamp, - PubSubPosition pubSubPosition) { - if (pubSubPosition != null) { - return Optional.of(pubSubPosition); + Map timestampMap, + Map pubSubPositionMap) { + if (pubSubPositionMap != null) { + return Optional.ofNullable(pubSubPositionMap.get(partitionId)); } final String topic = veniceStore.getStoreVersionName(); - + PubSubTopicPartition partition = new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(topic), partitionId); + TopicManager topicManager = + getPubSubContext().getTopicManagerRepository().getTopicManager(serverConfig.getKafkaBootstrapServers()); Optional position = Optional.empty(); - if (timestamp != null) { - PubSubTopicPartition partition = new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(topic), partitionId); - TopicManager topicManager = - getPubSubContext().getTopicManagerRepository().getTopicManager(serverConfig.getKafkaBootstrapServers()); - position = Optional.of(topicManager.getPositionByTime(partition, timestamp)); + if (timestampMap != null) { + position = Optional.of(topicManager.getPositionByTime(partition, timestampMap.get(partitionId))); } return position; } diff --git
a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/SharedKafkaConsumer.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/SharedKafkaConsumer.java index cd41901aadd..27e046fe547 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/SharedKafkaConsumer.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/SharedKafkaConsumer.java @@ -9,6 +9,7 @@ import com.linkedin.venice.pubsub.api.DefaultPubSubMessage; import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter; import com.linkedin.venice.pubsub.api.PubSubPosition; +import com.linkedin.venice.pubsub.api.PubSubSymbolicPosition; import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; import com.linkedin.venice.pubsub.api.exceptions.PubSubUnsubscribedTopicPartitionException; @@ -155,7 +156,11 @@ synchronized void subscribe( PubSubTopicPartition topicPartitionToSubscribe, PubSubPosition lastReadPosition) { long delegateSubscribeStartTime = System.currentTimeMillis(); - this.delegate.subscribe(topicPartitionToSubscribe, lastReadPosition); + boolean inclusive = false; + if (lastReadPosition == PubSubSymbolicPosition.LATEST) { + inclusive = true; + } + this.delegate.subscribe(topicPartitionToSubscribe, lastReadPosition, inclusive); PubSubTopic previousVersionTopic = subscribedTopicPartitionToVersionTopic.put(topicPartitionToSubscribe, versionTopic); if (previousVersionTopic != null && !previousVersionTopic.equals(versionTopic)) { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionService.java index f0cd7fbc541..5f2d44d6267 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionService.java @@ -7,6 +7,7 @@ import com.linkedin.davinci.stats.AggVersionedIngestionStats; import com.linkedin.venice.pubsub.api.PubSubPosition; import com.linkedin.venice.writer.VeniceWriterFactory; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -112,6 +113,6 @@ void demoteToStandby( Optional getPubSubPosition( VeniceStoreVersionConfig veniceStore, int partitionId, - Long timestamp, - PubSubPosition pubSubPosition); + Map timestampMap, + Map pubSubPosition); } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 67bb05c1b2b..b4e8677f236 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -2300,11 +2300,16 @@ protected void processCommonConsumerAction(ConsumerAction consumerAction) throws // Subscribe to local version topic. 
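// Note: a non-null position on the SUBSCRIBE consumer action marks a user-initiated seek. Validation
// is skipped and the partition is reported completed up front, because a seek can land mid-stream
// where the normal ready-to-serve signals (lag catch-up, end of push) do not apply.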
PubSubPosition subscribePosition; - if (consumerAction.getPubSubPosition() != null && isDaVinciClient) { + if (consumerAction.getPubSubPosition() != null) { + if (recordTransformer == null) { + throw new VeniceException("seekToCheckpoint is only supported for clients configured with a record transformer"); + } skipValidationForSeekableClientEnabled = true; subscribePosition = consumerAction.getPubSubPosition(); - LOGGER.info("Subscribed to user partition : {} position: {}", topicPartition, subscribePosition); - + LOGGER.info("Subscribed to user-specified partition: {} at position: {}", topicPartition, subscribePosition); + // Report completion immediately for a user-seek subscription + partitionConsumptionStateMap.get(partition).lagHasCaughtUp(); + reportCompleted(partitionConsumptionStateMap.get(partition), true); } else { subscribePosition = getLocalVtSubscribePosition(newPartitionConsumptionState); LOGGER.info("Subscribed to local: {} position: {}", topicPartition, subscribePosition); @@ -2314,6 +2319,7 @@ protected void processCommonConsumerAction(ConsumerAction consumerAction) throws newPartitionConsumptionState, subscribePosition, localKafkaServer); + LOGGER.info("Subscribed to: {} position: {}", topicPartition, subscribePosition); if (isGlobalRtDivEnabled()) { // TODO: remove. this is a temporary log for debugging while the feature is in its infancy diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/StoreBackendTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/StoreBackendTest.java index 0e79c94b330..3c7134d3813 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/StoreBackendTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/StoreBackendTest.java @@ -248,12 +248,8 @@ void testSubscribeBootstrapVersion() throws Exception { int partition = 2; // Expecting to subscribe to the specified version (version1), which is neither current nor latest. - CompletableFuture subscribeResult = storeBackend.subscribe( - ComplementSet.of(partition), - Optional.of(version1), - Collections.emptyMap(), - null, - Collections.emptyMap()); + CompletableFuture subscribeResult = + storeBackend.subscribe(ComplementSet.of(partition), Optional.of(version1), null); versionMap.get(version1.kafkaTopicName()).completePartition(partition); subscribeResult.get(3, TimeUnit.SECONDS); // Verify that subscribe selected the specified version as current.
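// Note: the third argument to StoreBackend.subscribe() is the DaVinciSeekCheckpointInfo; passing
// null means a plain subscription with no seek applied.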
@@ -284,16 +280,11 @@ void testSubscribeVersionSpecific() throws Exception { for (int partitionId = 0; partitionId < partitionCount; partitionId++) { // Subscribe to the specified version (version1) with version-specific client - CompletableFuture subscribeResult = storeBackend.subscribe( - ComplementSet.of(partitionId), - Optional.of(version1), - Collections.emptyMap(), - null, - Collections.emptyMap()); + CompletableFuture subscribeResult = + storeBackend.subscribe(ComplementSet.of(partitionId), Optional.of(version1), null); versionMap.get(version1.kafkaTopicName()).completePartition(partitionId); subscribeResult.get(3, TimeUnit.SECONDS); } - // Verify that subscribe selected the specified version as current try (ReferenceCounted versionRef = storeBackend.getDaVinciCurrentVersion()) { assertEquals(versionRef.get().getVersion().getNumber(), version1.getNumber()); @@ -302,14 +293,7 @@ void testSubscribeVersionSpecific() throws Exception { verify(storeBackend, never()).trySubscribeDaVinciFutureVersion(); // Try to subscribe to a new version - assertThrows( - VeniceException.class, - () -> storeBackend.subscribe( - ComplementSet.of(1), - Optional.of(version3), - Collections.emptyMap(), - null, - Collections.emptyMap())); + assertThrows(VeniceException.class, () -> storeBackend.subscribe(ComplementSet.of(1), Optional.of(version3), null)); } @Test diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/VersionBackendTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/VersionBackendTest.java index 9fb277785f1..aace0e0a4e7 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/VersionBackendTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/VersionBackendTest.java @@ -29,7 +29,6 @@ import com.linkedin.venice.meta.Version; import com.linkedin.venice.meta.VersionImpl; import com.linkedin.venice.meta.ZKStore; -import com.linkedin.venice.pubsub.api.PubSubPosition; import com.linkedin.venice.pushmonitor.ExecutionStatus; import com.linkedin.venice.pushstatushelper.PushStatusStoreWriter; import com.linkedin.venice.utils.ComplementSet; @@ -43,7 +42,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; @@ -209,9 +207,7 @@ public void testRecordTransformerSubscribe() { ComplementSet complementSet = ComplementSet.newSet(partitionList); // First subscription - Map emptyTimestamps = new HashMap<>(); - Map emptyPositionMap = new HashMap<>(); - versionBackend.subscribe(complementSet, emptyTimestamps, null, emptyPositionMap); + versionBackend.subscribe(complementSet, null); // Verify the latch count is set to 3 (number of partitions) verify(internalRecordTransformerConfig).setStartConsumptionLatchCount(3); @@ -228,7 +224,7 @@ public void testRecordTransformerSubscribe() { // Test with overlapping partitions partitionList = Arrays.asList(2, 3, 4); complementSet = ComplementSet.newSet(partitionList); - versionBackend.subscribe(complementSet, emptyTimestamps, 0L, emptyPositionMap); + versionBackend.subscribe(complementSet, null); // Shouldn't try to start consumption on already subscribed partition (2) verify(mockIngestionBackend, never()).startConsumption(any(), eq(2), any()); @@ -239,7 +235,7 @@ public void testRecordTransformerSubscribe() { verify(internalRecordTransformerConfig, never()).setStartConsumptionLatchCount(anyInt()); // Test empty subscription - versionBackend.subscribe(ComplementSet.emptySet(), 
emptyTimestamps, null, emptyPositionMap); + versionBackend.subscribe(ComplementSet.emptySet(), null); verify(mockIngestionBackend, never()).startConsumption(any(), eq(0), any()); verify(internalRecordTransformerConfig, never()).setStartConsumptionLatchCount(anyInt()); } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/client/AvroGenericDaVinciClientTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/client/AvroGenericDaVinciClientTest.java index f6ff9adf38c..08b0b1f5a21 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/client/AvroGenericDaVinciClientTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/client/AvroGenericDaVinciClientTest.java @@ -45,6 +45,7 @@ import java.security.AccessController; import java.security.PrivilegedAction; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -418,7 +419,7 @@ public void testSeekToCheckpoint() throws Exception { // Mock the seekToCheckpoint method doReturn(CompletableFuture.completedFuture(null)).when(mockStoreBackend) - .seekToCheckPoints(anyMap(), eq(Optional.empty())); + .seekToCheckpoint(any(DaVinciSeekCheckpointInfo.class), eq(Optional.empty())); doReturn(true).when(dvcClient).isReady(); when(dvcClient.getStoreBackend()).thenReturn(mockStoreBackend); @@ -443,12 +444,13 @@ public void testSeekToTimestamp() throws Exception { Field backendField = AvroGenericDaVinciClient.class.getDeclaredField("daVinciBackend"); backendField.setAccessible(true); // Mock the seek method - doReturn(CompletableFuture.completedFuture(null)).when(mockStoreBackend) - .seekToTimestamps(anyMap(), eq(Optional.empty())); doReturn(true).when(dvcClient).isReady(); when(dvcClient.getStoreBackend()).thenReturn(mockStoreBackend); Map timestamps = new HashMap<>(); timestamps.put(1, 1000L); + doReturn(CompletableFuture.completedFuture(null)).when(mockStoreBackend) + .seekToCheckpoint(any(DaVinciSeekCheckpointInfo.class), eq(Optional.empty())); + // Test CompletableFuture future = dvcClient.seekToTimestamps(timestamps); future.get(); // Wait for completion @@ -457,6 +459,54 @@ public void testSeekToTimestamp() throws Exception { assertTrue(future.isDone() && !future.isCompletedExceptionally()); } + @Test + public void testSeekToTail() throws Exception { + // Setup + ClientConfig clientConfig = new ClientConfig(storeName); + AvroGenericSeekableDaVinciClient dvcClient = + (AvroGenericSeekableDaVinciClient) setUpSeekableClient(clientConfig, true); + + // Mock backend + StoreBackend mockStoreBackend = mock(StoreBackend.class); + // Use reflection to set the private daVinciBackend field + Field backendField = AvroGenericDaVinciClient.class.getDeclaredField("daVinciBackend"); + backendField.setAccessible(true); + // Mock the seek method + doReturn(CompletableFuture.completedFuture(null)).when(mockStoreBackend) + .seekToCheckpoint(any(DaVinciSeekCheckpointInfo.class), eq(Optional.empty())); + doReturn(true).when(dvcClient).isReady(); + when(dvcClient.getStoreBackend()).thenReturn(mockStoreBackend); + // Test + CompletableFuture future = dvcClient.seekToTail(); + future.get(); // Wait for completion + // Verify + verify(dvcClient).seekToTail(); + assertTrue(future.isDone() && !future.isCompletedExceptionally()); + + future = dvcClient.seekToTail(Collections.singleton(1)); + future.get(); // Wait for completion + // Verify + verify(dvcClient).seekToTail(Collections.singleton(1)); + assertTrue(future.isDone() && 
!future.isCompletedExceptionally()); + } + + @Test + public void testSeekToTailWhenNotReady() throws Exception { + // Setup + ClientConfig clientConfig = new ClientConfig(storeName); + AvroGenericSeekableDaVinciClient dvcClient = + (AvroGenericSeekableDaVinciClient) setUpSeekableClient(clientConfig, true); + + // Test and verify exception + try { + CompletableFuture future = dvcClient.seekToTail(Collections.emptySet()); + future.get(); + fail("Expected VeniceClientException to be thrown when client is not ready"); + } catch (VeniceClientException e) { + // Expected: the client is not ready. + } + } + @Test public void testSeekToTimestampWithException() throws Exception { // Setup @@ -471,14 +521,11 @@ public void testSeekToTimestampWithException() throws Exception { backendField.setAccessible(true); // Mock the seek method doReturn(CompletableFuture.completedFuture(null)).when(mockStoreBackend) - .seekToTimestamps(anyMap(), eq(Optional.empty())); + .seekToCheckpoint(any(DaVinciSeekCheckpointInfo.class), eq(Optional.empty())); doReturn(false).when(dvcClient).isReady(); when(dvcClient.getStoreBackend()).thenReturn(mockStoreBackend); Map timestamps = new HashMap<>(); timestamps.put(1, 1000L); - // Test - - // Test // Verify the exception is propagated try { CompletableFuture future = dvcClient.seekToTimestamps(timestamps); diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/client/VersionSpecificAvroGenericDaVinciClientTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/client/VersionSpecificAvroGenericDaVinciClientTest.java index 28bbbecf9dc..3bafdf96049 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/client/VersionSpecificAvroGenericDaVinciClientTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/client/VersionSpecificAvroGenericDaVinciClientTest.java @@ -17,7 +17,6 @@ import com.linkedin.venice.meta.Version; import com.linkedin.venice.utils.ComplementSet; import java.util.Arrays; -import java.util.Collections; import java.util.Optional; import org.mockito.MockedStatic; import org.testng.annotations.AfterMethod; @@ -62,8 +61,7 @@ public void testSubscribeWithExistingVersion() { versionSpecificAvroGenericDaVinciClient.subscribe(partitionsSet); verify(versionSpecificAvroGenericDaVinciClient).addPartitionsToSubscription(partitionsSet); - verify(storeBackend) - .subscribe(partitionsSet, Optional.of(version), Collections.emptyMap(), null, Collections.emptyMap()); + verify(storeBackend).subscribe(partitionsSet, Optional.of(version), null); } @Test diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java index b7cb2a46c9b..6f2c4bf439b 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java @@ -60,6 +60,7 @@ import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl; import com.linkedin.venice.pubsub.PubSubTopicRepository; import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter; +import com.linkedin.venice.pubsub.api.PubSubPosition; import com.linkedin.venice.pubsub.api.PubSubSecurityProtocol; import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; @@ -220,6 +221,53 @@ private void setupMockConfig() {
doReturn(mockVeniceClusterConfig).when(mockVeniceConfigLoader).getVeniceClusterConfig(); } + @Test + public void testGetPubSubPosition() { + // Setup + String storeName = "test-store"; + int partitionId = 0; + PubSubPosition expectedPosition = mock(PubSubPosition.class); + VeniceStoreVersionConfig storeConfig = mock(VeniceStoreVersionConfig.class); + when(storeConfig.getStoreVersionName()).thenReturn(storeName + "_v1"); + Map positionMap = new HashMap<>(); + positionMap.put(partitionId, expectedPosition); + + // Test case 1: When positionMap contains a position for the partition + Optional result = kafkaStoreIngestionService.getPubSubPosition( + storeConfig, + partitionId, + null, // timestampMap + positionMap); + + // Verify + assertTrue(result.isPresent()); + assertEquals(expectedPosition, result.get()); + // Test case 2: When positionMap is null + result = kafkaStoreIngestionService.getPubSubPosition(storeConfig, partitionId, null, null); + assertFalse(result.isPresent()); + + // Test case 3: When positionMap doesn't contain the partition + result = kafkaStoreIngestionService.getPubSubPosition( + storeConfig, + partitionId + 1, // different partition + null, + positionMap); + assertFalse(result.isPresent()); + + // Test case 4: When positionMap contains null position for the partition + positionMap.put(partitionId, null); + result = kafkaStoreIngestionService.getPubSubPosition(storeConfig, partitionId, null, positionMap); + assertFalse(result.isPresent()); + + // Test case 5: When timestamp is provided (should be ignored when positionMap has the position) + Map tsMap = new HashMap<>(); + tsMap.put(partitionId, System.currentTimeMillis()); + positionMap.put(partitionId, expectedPosition); + result = kafkaStoreIngestionService.getPubSubPosition(storeConfig, partitionId, tsMap, positionMap); + assertTrue(result.isPresent()); + assertEquals(expectedPosition, result.get()); + } + @Test public void testDisableMetricsEmission() { String mockStoreName = "test"; @@ -236,7 +284,7 @@ public void testDisableMetricsEmission() { for (int i = 1; i <= taskNum; i++) { StoreIngestionTask task = mock(StoreIngestionTask.class); - topicNameToIngestionTaskMap.put(mockStoreName + "_v" + String.valueOf(i), task); + topicNameToIngestionTaskMap.put(mockStoreName + "_v" + i, task); } topicNameToIngestionTaskMap.put(mockSimilarStoreName + "_v1", mock(StoreIngestionTask.class)); @@ -246,7 +294,7 @@ public void testDisableMetricsEmission() { doReturn(mockStore).when(mockMetadataRepo).getStore(mockStoreName); VeniceStoreVersionConfig mockStoreConfig = mock(VeniceStoreVersionConfig.class); - doReturn(mockStoreName + "_v" + String.valueOf(taskNum)).when(mockStoreConfig).getStoreVersionName(); + doReturn(mockStoreName + "_v" + taskNum).when(mockStoreConfig).getStoreVersionName(); kafkaStoreIngestionService.updateStatsEmission(topicNameToIngestionTaskMap, mockStoreName, maxVersionNumber); diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientRecordTransformerTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientRecordTransformerTest.java index 56025b9c3a4..a7b7457786b 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientRecordTransformerTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientRecordTransformerTest.java @@ -36,6 +36,7 @@ import static com.linkedin.venice.utils.ByteUtils.BYTES_PER_MB; import static
com.linkedin.venice.utils.IntegrationTestPushUtils.createStoreForJob; import static com.linkedin.venice.utils.IntegrationTestPushUtils.defaultVPJProps; +import static com.linkedin.venice.utils.IntegrationTestPushUtils.sendStreamingRecord; import static com.linkedin.venice.utils.SslUtils.LOCAL_KEYSTORE_JKS; import static com.linkedin.venice.utils.SslUtils.LOCAL_PASSWORD; import static com.linkedin.venice.utils.TestWriteUtils.DEFAULT_USER_DATA_RECORD_COUNT; @@ -80,6 +81,7 @@ import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer; import com.linkedin.venice.pubsub.api.PubSubSymbolicPosition; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; +import com.linkedin.venice.samza.VeniceSystemProducer; import com.linkedin.venice.store.rocksdb.RocksDBUtils; import com.linkedin.venice.utils.ForkedJavaProcess; import com.linkedin.venice.utils.IntegrationTestPushUtils; @@ -885,8 +887,15 @@ public void testRecordTransformerDaVinciWithSeeking() throws Exception { String customValue = "a"; int numKeys = 10; - setUpStore(storeName, false, false, CompressionStrategy.NO_OP, customValue, numKeys, 1); - + setUpStore(storeName, false, false, CompressionStrategy.NO_OP, customValue, 0, 1); + try (VeniceSystemProducer veniceProducer = + IntegrationTestPushUtils.getSamzaProducer(cluster, storeName, Version.PushType.STREAM)) { + for (int i = 1; i <= numKeys; ++i) { + String value = "a" + i; + sendStreamingRecord(veniceProducer, storeName, i, value); + Thread.sleep(500); + } + } VeniceProperties backendConfig = buildRecordTransformerBackendConfig(pushStatusStoreEnabled); MetricsRepository metricsRepository = new MetricsRepository(); @@ -930,27 +939,45 @@ public void testRecordTransformerDaVinciWithSeeking() throws Exception { SeekableDaVinciClient clientWithRecordTransformer = factory.getAndStartGenericSeekableAvroClient(storeName, clientConfig); - // test seek by timestamp - clientWithRecordTransformer.seekToTimestamp(pubSubMessage.getValue().producerMetadata.messageTimestamp).get(); + // test seek by change coordinate + clientWithRecordTransformer.seekToCheckpoint(Collections.singleton(changeCoordinate)).get(); TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT, TimeUnit.MILLISECONDS, () -> { for (int k = 1; k <= numKeys; ++k) { // Record shouldn't be stored in Da Vinci assertNull(clientWithRecordTransformer.get(k).get()); // Record should be stored in inMemoryDB String expectedValue = "a" + k + "Transformed"; - assertEquals(recordTransformer.get(k), k < 3 ? null : expectedValue); + assertEquals(recordTransformer.get(k), k < 7 ? null : expectedValue); } }); clientWithRecordTransformer.unsubscribeAll(); - // test seek by change coordinate - clientWithRecordTransformer.seekToCheckpoint(Collections.singleton(changeCoordinate)).get(); + clientWithRecordTransformer.close(); + recordTransformer.clearInMemoryDB(); + clientWithRecordTransformer.start(); + + // test seek by timestamp + clientWithRecordTransformer.seekToTimestamp(pubSubMessage.getValue().producerMetadata.messageTimestamp).get(); TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT, TimeUnit.MILLISECONDS, () -> { for (int k = 1; k <= numKeys; ++k) { // Record shouldn't be stored in Da Vinci assertNull(clientWithRecordTransformer.get(k).get()); // Record should be stored in inMemoryDB String expectedValue = "a" + k + "Transformed"; - assertEquals(recordTransformer.get(k), k < 3 ? null : expectedValue); + assertEquals(recordTransformer.get(k), k < 7 ? 
null : expectedValue); + } + }); + clientWithRecordTransformer.unsubscribeAll(); + clientWithRecordTransformer.close(); + recordTransformer.clearInMemoryDB(); + clientWithRecordTransformer.start(); + // test seek to tail + clientWithRecordTransformer.seekToTail().get(); + + TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT, TimeUnit.MILLISECONDS, () -> { + for (int k = 1; k <= numKeys; ++k) { + // Record shouldn't be stored in Da Vinci nor inMemoryDB + assertNull(clientWithRecordTransformer.get(k).get()); + assertNull(recordTransformer.get(k)); } }); } @@ -1025,7 +1052,8 @@ protected void setUpStore( writeAvroFileRunnable, valueSchema, inputDir, - numPartitions); + numPartitions, + numKeys == 0); } /* @@ -1062,7 +1090,8 @@ private void setUpStore( writeAvroFileRunnable, valueSchema, inputDir, - numPartitions); + numPartitions, + false); } protected void setUpStore( @@ -1075,7 +1104,8 @@ protected void setUpStore( Runnable writeAvroFileRunnable, String valueSchema, File inputDir, - int numPartitions) { + int numPartitions, + boolean emptyPush) { // Produce input data. writeAvroFileRunnable.run(); @@ -1099,7 +1129,11 @@ protected void setUpStore( cluster.createPushStatusSystemStore(storeName); } TestUtils.assertCommand(controllerClient.updateStore(storeName, params)); - runVPJ(vpjProperties, 1, cluster); + if (emptyPush) { + controllerClient.sendEmptyPushAndWait(storeName, "test-push", 1, 30 * Time.MS_PER_SECOND); + } else { + runVPJ(vpjProperties, 1, cluster); + } } } @@ -1136,7 +1170,8 @@ protected void setUpStore( writeAvroFileRunnable, valueSchema, inputDir, - 3); + 3, + false); } private static void runVPJ(Properties vpjProperties, int expectedVersionNumber, VeniceClusterWrapper cluster) {
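A minimal usage sketch of the seek API wired up by this patch (assuming a started seekable client with a record transformer, since StoreIngestionTask rejects user seeks without one; `factory`, `clientConfig`, `timestampMs`, and `changeCoordinate` are placeholders, and the method names are the ones exercised in DaVinciClientRecordTransformerTest above):

import java.util.Collections;

SeekableDaVinciClient client = factory.getAndStartGenericSeekableAvroClient(storeName, clientConfig);

// Seek to tail: StoreBackend.subscribe() expands seekToTail=true into a per-partition map of
// PubSubSymbolicPosition.LATEST, which SharedKafkaConsumer subscribes with the inclusive flag set.
client.seekToTail().get();

// Seek to a timestamp: the single timestamp is fanned out into a per-partition timestamp map, and
// KafkaStoreIngestionService.getPubSubPosition() resolves each entry via
// TopicManager.getPositionByTime().
client.seekToTimestamp(timestampMs).get();

// Seek to explicit checkpoints: the change coordinates become a per-partition position map.
client.seekToCheckpoint(Collections.singleton(changeCoordinate)).get();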