Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
package com.linkedin.davinci;

import com.linkedin.davinci.client.DaVinciSeekCheckpointInfo;
import com.linkedin.davinci.config.StoreBackendConfig;
import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.meta.VersionStatus;
import com.linkedin.venice.pubsub.api.PubSubPosition;
import com.linkedin.venice.pubsub.api.PubSubSymbolicPosition;
import com.linkedin.venice.serialization.AvroStoreDeserializerCache;
import com.linkedin.venice.serialization.StoreDeserializerCache;
import com.linkedin.venice.utils.ComplementSet;
import com.linkedin.venice.utils.ConcurrentRef;
import com.linkedin.venice.utils.ReferenceCounted;
import com.linkedin.venice.utils.RegionUtils;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -135,26 +137,13 @@ private void setDaVinciFutureVersion(VersionBackend version) {
}

public CompletableFuture<Void> subscribe(ComplementSet<Integer> partitions) {
return subscribe(partitions, Optional.empty(), Collections.emptyMap(), null, Collections.emptyMap());
return subscribe(partitions, Optional.empty(), null);
}

public CompletableFuture<Void> seekToTimestamps(Long allPartitionTimestamp, Optional<Version> storeVersion) {
return subscribe(
ComplementSet.universalSet(),
storeVersion,
new HashMap<>(),
allPartitionTimestamp,
Collections.emptyMap());
}

public CompletableFuture<Void> seekToCheckPoints(
Map<Integer, PubSubPosition> checkpoints,
public CompletableFuture<Void> seekToCheckpoint(
DaVinciSeekCheckpointInfo checkpointInfo,
Optional<Version> storeVersion) {
return subscribe(ComplementSet.wrap(checkpoints.keySet()), storeVersion, Collections.emptyMap(), null, checkpoints);
}

public CompletableFuture<Void> seekToTimestamps(Map<Integer, Long> timestamps, Optional<Version> storeVersion) {
return subscribe(ComplementSet.wrap(timestamps.keySet()), storeVersion, timestamps, null, Collections.emptyMap());
return subscribe(checkpointInfo.getPartitions(), storeVersion, checkpointInfo);
}

private Version getCurrentVersion() {
Expand All @@ -168,9 +157,7 @@ private Version getLatestNonFaultyVersion() {
public synchronized CompletableFuture<Void> subscribe(
ComplementSet<Integer> partitions,
Optional<Version> bootstrapVersion,
Map<Integer, Long> timestamps,
Long allPartitionsTimestamp,
Map<Integer, PubSubPosition> positionMap) {
DaVinciSeekCheckpointInfo checkpointInfo) {
if (daVinciCurrentVersion == null) {
setDaVinciCurrentVersion(new VersionBackend(backend, bootstrapVersion.orElseGet(() -> {
Version version = getCurrentVersion();
Expand Down Expand Up @@ -207,33 +194,44 @@ public synchronized CompletableFuture<Void> subscribe(
if (daVinciFutureVersion == null) {
trySubscribeDaVinciFutureVersion();
} else {
daVinciFutureVersion.subscribe(partitions, timestamps, allPartitionsTimestamp, positionMap)
.whenComplete((v, e) -> trySwapDaVinciCurrentVersion(e));
daVinciFutureVersion.subscribe(partitions, null).whenComplete((v, e) -> trySwapDaVinciCurrentVersion(e));
}
}

VersionBackend savedVersion = daVinciCurrentVersion;
return daVinciCurrentVersion.subscribe(partitions, timestamps, allPartitionsTimestamp, positionMap)
.exceptionally(e -> {
synchronized (this) {
addFaultyVersion(savedVersion, e);
// Don't propagate failure to subscribe() caller, if future version has become current and is ready to
// serve.
if (daVinciCurrentVersion != null && daVinciCurrentVersion.isReadyToServe(subscription)) {
return null;
}
}
throw (e instanceof CompletionException) ? (CompletionException) e : new CompletionException(e);
})
.whenComplete((v, e) -> {
synchronized (this) {
if (e == null) {
LOGGER.info("Ready to serve partitions {} of {}", subscription, daVinciCurrentVersion);
} else {
LOGGER.warn("Failed to subscribe to partitions {} of {}", subscription, savedVersion, e);
}
}
});
List<Integer> partitionList = daVinciCurrentVersion.getPartitions(partitions);
if (checkpointInfo != null && checkpointInfo.getAllPartitionsTimestamp() != null) {
Map<Integer, Long> timestamps = new HashMap<>();
for (int partition: partitionList) {
timestamps.put(partition, checkpointInfo.getAllPartitionsTimestamp());
}
checkpointInfo.setTimestampsMap(timestamps);
} else if (checkpointInfo != null && checkpointInfo.isSeekToTail()) {
Map<Integer, PubSubPosition> positionMap = new HashMap<>();
for (int partition: partitionList) {
positionMap.put(partition, PubSubSymbolicPosition.LATEST);
}
checkpointInfo.setPositionMap(positionMap);
}
return daVinciCurrentVersion.subscribe(partitions, checkpointInfo).exceptionally(e -> {
synchronized (this) {
addFaultyVersion(savedVersion, e);
// Don't propagate failure to subscribe() caller, if future version has become current and is ready to
// serve.
if (daVinciCurrentVersion != null && daVinciCurrentVersion.isReadyToServe(subscription)) {
return null;
}
}
throw (e instanceof CompletionException) ? (CompletionException) e : new CompletionException(e);
}).whenComplete((v, e) -> {
synchronized (this) {
if (e == null) {
LOGGER.info("Ready to serve partitions {} of {}", subscription, daVinciCurrentVersion);
} else {
LOGGER.warn("Failed to subscribe to partitions {} of {}", subscription, savedVersion, e);
}
}
});
}

public synchronized void unsubscribe(ComplementSet<Integer> partitions) {
Expand Down Expand Up @@ -302,8 +300,7 @@ synchronized void trySubscribeDaVinciFutureVersion() {
LOGGER.info("Subscribing to future version {}", targetVersion.kafkaTopicName());
setDaVinciFutureVersion(new VersionBackend(backend, targetVersion, stats));
// For future version subscription, we don't need to pass any timestamps or position map
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Can we update this comment please

daVinciFutureVersion.subscribe(subscription, Collections.emptyMap(), null, Collections.emptyMap())
.whenComplete((v, e) -> trySwapDaVinciCurrentVersion(e));
daVinciFutureVersion.subscribe(subscription, null).whenComplete((v, e) -> trySwapDaVinciCurrentVersion(e));
} else {
LOGGER.info(
"Skipping subscribe to future version: {} in region: {} because the target version status is: {} and the target regions are: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_STORE_HEARTBEAT_INTERVAL_IN_SECONDS;
import static com.linkedin.venice.ConfigKeys.SERVER_STOP_CONSUMPTION_TIMEOUT_IN_SECONDS;

import com.linkedin.davinci.client.DaVinciSeekCheckpointInfo;
import com.linkedin.davinci.client.InternalDaVinciRecordTransformerConfig;
import com.linkedin.davinci.config.VeniceStoreVersionConfig;
import com.linkedin.davinci.listener.response.NoOpReadResponseStats;
Expand Down Expand Up @@ -365,23 +366,8 @@ synchronized boolean isReadyToServe(ComplementSet<Integer> partitions) {

synchronized CompletableFuture<Void> subscribe(
ComplementSet<Integer> partitions,
Map<Integer, Long> timestamps,
Long allPartitionTimestamp,
Map<Integer, PubSubPosition> positionMap) {
DaVinciSeekCheckpointInfo checkpointInfo) {
Instant startTime = Instant.now();
int validCheckPointCount = 0;
if (!timestamps.isEmpty()) {
validCheckPointCount++;
}
if (!positionMap.isEmpty()) {
validCheckPointCount++;
}
if (allPartitionTimestamp != null) {
validCheckPointCount++;
}
if (validCheckPointCount > 1) {
throw new VeniceException("Multiple checkpoint types are not supported");
}
List<Integer> partitionList = getPartitions(partitions);
if (partitionList.isEmpty()) {
LOGGER.error("No partitions to subscribe to for {}", this);
Expand All @@ -407,9 +393,6 @@ synchronized CompletableFuture<Void> subscribe(
} else {
partitionFutures.computeIfAbsent(partition, k -> new CompletableFuture<>());
partitionsToStartConsumption.add(partition);
if (allPartitionTimestamp != null) {
timestamps.put(partition, allPartitionTimestamp);
}
}
partitionToBatchReportEOIPEnabled.put(partition, batchReportEOIPStatusEnabled);
futures.add(partitionFutures.get(partition));
Expand All @@ -425,8 +408,14 @@ synchronized CompletableFuture<Void> subscribe(
backend.getHeartbeatMonitoringService()
.updateLagMonitor(version.kafkaTopicName(), partition, HeartbeatLagMonitorAction.SET_FOLLOWER_MONITOR);
// AtomicReference of storage engine will be updated internally.
Optional<PubSubPosition> pubSubPosition = backend.getIngestionService()
.getPubSubPosition(config, partition, timestamps.get(partition), positionMap.get(partition));
Optional<PubSubPosition> pubSubPosition = checkpointInfo == null
? Optional.empty()
: backend.getIngestionService()
.getPubSubPosition(
config,
partition,
checkpointInfo.getTimestampsMap(),
checkpointInfo.getPostitionMap());
backend.getIngestionBackend().startConsumption(config, partition, pubSubPosition);
tryStartHeartbeat();
}
Expand Down Expand Up @@ -573,7 +562,7 @@ Map<Integer, List<String>> getPartitionToPendingReportIncrementalPushList() {
return partitionToPendingReportIncrementalPushList;
}

private List<Integer> getPartitions(ComplementSet<Integer> partitions) {
public List<Integer> getPartitions(ComplementSet<Integer> partitions) {
return IntStream.range(0, version.getPartitionCount())
.filter(partitions::contains)
.boxed()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -262,6 +261,24 @@ private Optional<Version> getVersion() {
return Optional.of(version);
}

protected CompletableFuture<Void> seekToTail() {
if (getBackend().isIsolatedIngestion()) {
throw new VeniceClientException("Isolated Ingestion is not supported with seekToCheckpoint");
}
throwIfNotReady();
addPartitionsToSubscription(ComplementSet.universalSet());
return getStoreBackend().seekToCheckpoint(new DaVinciSeekCheckpointInfo(null, null, null, true), getVersion());
}

protected CompletableFuture<Void> seekToTail(Set<Integer> partitionSet) {
if (getBackend().isIsolatedIngestion()) {
throw new VeniceClientException("Isolated Ingestion is not supported with seekToCheckpoint");
}
throwIfNotReady();
addPartitionsToSubscription(ComplementSet.wrap(partitionSet));
return getStoreBackend().seekToCheckpoint(new DaVinciSeekCheckpointInfo(null, null, null, true), getVersion());
}

protected CompletableFuture<Void> seekToCheckpoint(Set<VeniceChangeCoordinate> checkpoints) {
if (getBackend().isIsolatedIngestion()) {
throw new VeniceClientException("Isolated Ingestion is not supported with seekToCheckpoint");
Expand All @@ -276,7 +293,8 @@ protected CompletableFuture<Void> seekToCheckpoint(Set<VeniceChangeCoordinate> c
positionMap.put(changeCoordinate.getPartition(), changeCoordinate.getPosition());
}
addPartitionsToSubscription(ComplementSet.wrap(positionMap.keySet()));
return getStoreBackend().seekToCheckPoints(positionMap, getVersion());
return getStoreBackend()
.seekToCheckpoint(new DaVinciSeekCheckpointInfo(positionMap, null, null, false), getVersion());
}

protected CompletableFuture<Void> seekToTimestamps(Map<Integer, Long> timestamps) {
Expand All @@ -285,22 +303,24 @@ protected CompletableFuture<Void> seekToTimestamps(Map<Integer, Long> timestamps
}
throwIfNotReady();
addPartitionsToSubscription(ComplementSet.wrap(timestamps.keySet()));
return getStoreBackend().seekToTimestamps(timestamps, getVersion());
return getStoreBackend()
.seekToCheckpoint(new DaVinciSeekCheckpointInfo(null, timestamps, null, false), getVersion());
}

protected CompletableFuture<Void> seekToTimestamps(Long timestamps) {
protected CompletableFuture<Void> seekToTimestamps(Long timestamp) {
if (getBackend().isIsolatedIngestion()) {
throw new VeniceClientException("Isolated Ingestion is not supported with seekToTimestamps");
}
throwIfNotReady();
addPartitionsToSubscription(ComplementSet.universalSet());
return getStoreBackend().seekToTimestamps(timestamps, getVersion());
return getStoreBackend()
.seekToCheckpoint(new DaVinciSeekCheckpointInfo(null, null, timestamp, false), getVersion());
}

protected CompletableFuture<Void> subscribe(ComplementSet<Integer> partitions) {
throwIfNotReady();
addPartitionsToSubscription(partitions);
return getStoreBackend().subscribe(partitions, getVersion(), Collections.emptyMap(), null, Collections.emptyMap());
return getStoreBackend().subscribe(partitions, getVersion(), null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.linkedin.davinci.consumer.VeniceChangeCoordinate;
import com.linkedin.davinci.storage.chunking.GenericChunkingAdapter;
import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.service.ICProvider;
import com.linkedin.venice.utils.VeniceProperties;
Expand Down Expand Up @@ -57,11 +56,11 @@ public CompletableFuture<Void> seekToCheckpoint(Set<VeniceChangeCoordinate> chec

@Override
public CompletableFuture<Void> seekToTail() {
throw new VeniceClientException("seekToTail is not supported yet");
return super.seekToTail();
}

@Override
public CompletableFuture<Void> seekToTail(Set<Integer> partitions) {
throw new VeniceClientException("seekToTail is not supported yet");
return super.seekToTail(partitions);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.linkedin.davinci.client;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.pubsub.api.PubSubPosition;
import com.linkedin.venice.utils.ComplementSet;
import java.util.Map;


public class DaVinciSeekCheckpointInfo {
private Long allPartitionsTimestamp;
private Map<Integer, PubSubPosition> postitionMap;
private Map<Integer, Long> timestampsMap;
private boolean seekToTail = false;

public DaVinciSeekCheckpointInfo(
Map<Integer, PubSubPosition> postitionMap,
Map<Integer, Long> timestampsMap,
Long allPartitionsTimestamp,
boolean seekToTail) {
this.allPartitionsTimestamp = allPartitionsTimestamp;
this.postitionMap = postitionMap;
this.timestampsMap = timestampsMap;
this.seekToTail = seekToTail;
int validCheckPointCount = 0;
if (allPartitionsTimestamp != null) {
validCheckPointCount++;
}
if (seekToTail) {
validCheckPointCount++;
}
if (timestampsMap != null) {
validCheckPointCount++;
}
if (postitionMap != null) {
validCheckPointCount++;
}
if (validCheckPointCount > 1) {
throw new VeniceException("Multiple checkpoint types are not supported");
}
}

public Long getAllPartitionsTimestamp() {
return allPartitionsTimestamp;
}

public Map<Integer, PubSubPosition> getPostitionMap() {
return postitionMap;
}

public void setPositionMap(Map<Integer, PubSubPosition> postitionMap) {
this.postitionMap = postitionMap;
}

public void setTimestampsMap(Map<Integer, Long> timestampsMap) {
this.timestampsMap = timestampsMap;
}

public Map<Integer, Long> getTimestampsMap() {
return timestampsMap;
}

public boolean isSeekToTail() {
return seekToTail;
}

public ComplementSet<Integer> getPartitions() {
if (postitionMap != null) {
return ComplementSet.newSet(postitionMap.keySet());
} else if (timestampsMap != null) {
return ComplementSet.newSet(timestampsMap.keySet());
} else {
return ComplementSet.universalSet();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -326,12 +326,11 @@ public CompletableFuture<Void> seekToEndOfPush() {
}

public CompletableFuture<Void> seekToTail(Set<Integer> partitions) {
// ToDo: Seek to latest
throw new VeniceClientException("seekToTail is not supported yet");
return daVinciClient.seekToTail(partitions);
}

public CompletableFuture<Void> seekToTail() {
return this.seekToTail(Collections.emptySet());
return daVinciClient.seekToTail();
}

public CompletableFuture<Void> seekToCheckpoint(Set<VeniceChangeCoordinate> checkpoints) {
Expand Down
Loading
Loading