Skip to content

Commit c7f2820

Browse files
committed
fixed test
1 parent 3926c11 commit c7f2820

File tree

11 files changed

+158
-100
lines changed

11 files changed

+158
-100
lines changed

clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java

Lines changed: 38 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.linkedin.davinci;
22

3+
import com.linkedin.davinci.client.DaVinciSeekCheckpointInfo;
34
import com.linkedin.davinci.config.StoreBackendConfig;
45
import com.linkedin.davinci.config.VeniceServerConfig;
56
import com.linkedin.venice.exceptions.VeniceException;
@@ -14,7 +15,6 @@
1415
import com.linkedin.venice.utils.ConcurrentRef;
1516
import com.linkedin.venice.utils.ReferenceCounted;
1617
import com.linkedin.venice.utils.RegionUtils;
17-
import java.util.Collections;
1818
import java.util.HashMap;
1919
import java.util.HashSet;
2020
import java.util.List;
@@ -137,44 +137,38 @@ private void setDaVinciFutureVersion(VersionBackend version) {
137137
}
138138

139139
public CompletableFuture<Void> subscribe(ComplementSet<Integer> partitions) {
140-
return subscribe(partitions, Optional.empty(), Collections.emptyMap(), null, Collections.emptyMap(), false);
140+
return subscribe(partitions, Optional.empty(), null);
141141
}
142142

143-
public CompletableFuture<Void> seekToTimestamps(Long allPartitionTimestamp, Optional<Version> storeVersion) {
144-
return subscribe(
145-
ComplementSet.universalSet(),
146-
storeVersion,
147-
new HashMap<>(),
148-
allPartitionTimestamp,
149-
Collections.emptyMap(),
150-
false);
151-
}
152-
153-
public CompletableFuture<Void> seekToTail(Optional<Version> storeVersion) {
154-
return subscribe(ComplementSet.universalSet(), storeVersion, Collections.emptyMap(), null, new HashMap<>(), true);
155-
}
143+
// public CompletableFuture<Void> seekToTimestamps(Long allPartitionTimestamp, Optional<Version> storeVersion) {
144+
// return subscribe(
145+
// ComplementSet.universalSet(),
146+
// storeVersion, null);
147+
// }
156148

157-
public CompletableFuture<Void> seekToCheckPoints(
158-
Map<Integer, PubSubPosition> checkpoints,
149+
public CompletableFuture<Void> seekToCheckpoint(
150+
DaVinciSeekCheckpointInfo checkpointInfo,
159151
Optional<Version> storeVersion) {
160-
return subscribe(
161-
ComplementSet.wrap(checkpoints.keySet()),
162-
storeVersion,
163-
Collections.emptyMap(),
164-
null,
165-
checkpoints,
166-
false);
152+
return subscribe(checkpointInfo.getPartitions(), storeVersion, checkpointInfo);
167153
}
168154

169-
public CompletableFuture<Void> seekToTimestamps(Map<Integer, Long> timestamps, Optional<Version> storeVersion) {
170-
return subscribe(
171-
ComplementSet.wrap(timestamps.keySet()),
172-
storeVersion,
173-
timestamps,
174-
null,
175-
Collections.emptyMap(),
176-
false);
177-
}
155+
// public CompletableFuture<Void> seekToTail(Optional<Version> storeVersion) {
156+
// return subscribe(ComplementSet.universalSet(), storeVersion, null);
157+
// }
158+
159+
// public CompletableFuture<Void> seekToCheckPoints(
160+
// Map<Integer, PubSubPosition> checkpoints,
161+
// Optional<Version> storeVersion) {
162+
// return subscribe(
163+
// ComplementSet.wrap(checkpoints.keySet()),
164+
// storeVersion, null);
165+
// }
166+
167+
// public CompletableFuture<Void> seekToTimestamps(Map<Integer, Long> timestamps, Optional<Version> storeVersion) {
168+
// return subscribe(
169+
// ComplementSet.wrap(timestamps.keySet()),
170+
// storeVersion, null);
171+
// }
178172

179173
private Version getCurrentVersion() {
180174
return backend.getVeniceCurrentVersion(storeName);
@@ -187,10 +181,7 @@ private Version getLatestNonFaultyVersion() {
187181
public synchronized CompletableFuture<Void> subscribe(
188182
ComplementSet<Integer> partitions,
189183
Optional<Version> bootstrapVersion,
190-
Map<Integer, Long> timestamps,
191-
Long allPartitionsTimestamp,
192-
Map<Integer, PubSubPosition> positionMap,
193-
boolean seekToTails) {
184+
DaVinciSeekCheckpointInfo checkpointInfo) {
194185
if (daVinciCurrentVersion == null) {
195186
setDaVinciCurrentVersion(new VersionBackend(backend, bootstrapVersion.orElseGet(() -> {
196187
Version version = getCurrentVersion();
@@ -226,23 +217,26 @@ public synchronized CompletableFuture<Void> subscribe(
226217
if (daVinciFutureVersion == null) {
227218
trySubscribeDaVinciFutureVersion();
228219
} else {
229-
daVinciFutureVersion.subscribe(partitions, timestamps, positionMap)
230-
.whenComplete((v, e) -> trySwapDaVinciCurrentVersion(e));
220+
daVinciFutureVersion.subscribe(partitions, null).whenComplete((v, e) -> trySwapDaVinciCurrentVersion(e));
231221
}
232222
}
233223

234224
VersionBackend savedVersion = daVinciCurrentVersion;
235225
List<Integer> partitionList = daVinciCurrentVersion.getPartitions(partitions);
236-
if (allPartitionsTimestamp != null) {
226+
if (checkpointInfo.getAllPartitionsTimestamp() != null) {
227+
Map<Integer, Long> timestamps = new HashMap<>();
237228
for (int partition: partitionList) {
238-
timestamps.put(partition, allPartitionsTimestamp);
229+
timestamps.put(partition, checkpointInfo.getAllPartitionsTimestamp());
239230
}
240-
} else if (seekToTails) {
231+
checkpointInfo.setTimestampsMap(timestamps);
232+
} else if (checkpointInfo.isSeekToTail()) {
233+
Map<Integer, PubSubPosition> positionMap = new HashMap<>();
241234
for (int partition: partitionList) {
242235
positionMap.put(partition, PubSubSymbolicPosition.LATEST);
243236
}
237+
checkpointInfo.setPositionMap(positionMap);
244238
}
245-
return daVinciCurrentVersion.subscribe(partitions, timestamps, positionMap).exceptionally(e -> {
239+
return daVinciCurrentVersion.subscribe(partitions, checkpointInfo).exceptionally(e -> {
246240
synchronized (this) {
247241
addFaultyVersion(savedVersion, e);
248242
// Don't propagate failure to subscribe() caller, if future version has become current and is ready to
@@ -329,8 +323,7 @@ synchronized void trySubscribeDaVinciFutureVersion() {
329323
LOGGER.info("Subscribing to future version {}", targetVersion.kafkaTopicName());
330324
setDaVinciFutureVersion(new VersionBackend(backend, targetVersion, stats));
331325
// For future version subscription, we don't need to pass any timestamps or position map
332-
daVinciFutureVersion.subscribe(subscription, Collections.emptyMap(), Collections.emptyMap())
333-
.whenComplete((v, e) -> trySwapDaVinciCurrentVersion(e));
326+
daVinciFutureVersion.subscribe(subscription, null).whenComplete((v, e) -> trySwapDaVinciCurrentVersion(e));
334327
} else {
335328
LOGGER.info(
336329
"Skipping subscribe to future version: {} in region: {} because the target version status is: {} and the target regions are: {}",

clients/da-vinci-client/src/main/java/com/linkedin/davinci/VersionBackend.java

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_STORE_HEARTBEAT_INTERVAL_IN_SECONDS;
55
import static com.linkedin.venice.ConfigKeys.SERVER_STOP_CONSUMPTION_TIMEOUT_IN_SECONDS;
66

7+
import com.linkedin.davinci.client.DaVinciSeekCheckpointInfo;
78
import com.linkedin.davinci.client.InternalDaVinciRecordTransformerConfig;
89
import com.linkedin.davinci.config.VeniceStoreVersionConfig;
910
import com.linkedin.davinci.listener.response.NoOpReadResponseStats;
@@ -365,19 +366,8 @@ synchronized boolean isReadyToServe(ComplementSet<Integer> partitions) {
365366

366367
synchronized CompletableFuture<Void> subscribe(
367368
ComplementSet<Integer> partitions,
368-
Map<Integer, Long> timestamps,
369-
Map<Integer, PubSubPosition> positionMap) {
369+
DaVinciSeekCheckpointInfo checkpointInfo) {
370370
Instant startTime = Instant.now();
371-
int validCheckPointCount = 0;
372-
if (!timestamps.isEmpty()) {
373-
validCheckPointCount++;
374-
}
375-
if (!positionMap.isEmpty()) {
376-
validCheckPointCount++;
377-
}
378-
if (validCheckPointCount > 1) {
379-
throw new VeniceException("Multiple checkpoint types are not supported");
380-
}
381371
List<Integer> partitionList = getPartitions(partitions);
382372
if (partitionList.isEmpty()) {
383373
LOGGER.error("No partitions to subscribe to for {}", this);
@@ -418,8 +408,14 @@ synchronized CompletableFuture<Void> subscribe(
418408
backend.getHeartbeatMonitoringService()
419409
.updateLagMonitor(version.kafkaTopicName(), partition, HeartbeatLagMonitorAction.SET_FOLLOWER_MONITOR);
420410
// AtomicReference of storage engine will be updated internally.
421-
Optional<PubSubPosition> pubSubPosition = backend.getIngestionService()
422-
.getPubSubPosition(config, partition, timestamps.get(partition), positionMap.get(partition));
411+
Optional<PubSubPosition> pubSubPosition = checkpointInfo == null
412+
? Optional.empty()
413+
: backend.getIngestionService()
414+
.getPubSubPosition(
415+
config,
416+
partition,
417+
checkpointInfo.getTimestampsMap(),
418+
checkpointInfo.getPostitionMap());
423419
backend.getIngestionBackend().startConsumption(config, partition, pubSubPosition);
424420
tryStartHeartbeat();
425421
}

clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@
6565
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
6666
import java.nio.ByteBuffer;
6767
import java.util.ArrayList;
68-
import java.util.Collections;
6968
import java.util.HashMap;
7069
import java.util.LinkedHashMap;
7170
import java.util.List;
@@ -268,7 +267,7 @@ protected CompletableFuture<Void> seekToTail() {
268267
}
269268
throwIfNotReady();
270269
addPartitionsToSubscription(ComplementSet.universalSet());
271-
return getStoreBackend().seekToTail(getVersion());
270+
return getStoreBackend().seekToCheckpoint(new DaVinciSeekCheckpointInfo(null, null, null, true), getVersion());
272271
}
273272

274273
protected CompletableFuture<Void> seekToTail(Set<Integer> partitionSet) {
@@ -277,7 +276,7 @@ protected CompletableFuture<Void> seekToTail(Set<Integer> partitionSet) {
277276
}
278277
throwIfNotReady();
279278
addPartitionsToSubscription(ComplementSet.wrap(partitionSet));
280-
return getStoreBackend().seekToTail(getVersion());
279+
return getStoreBackend().seekToCheckpoint(new DaVinciSeekCheckpointInfo(null, null, null, true), getVersion());
281280
}
282281

283282
protected CompletableFuture<Void> seekToCheckpoint(Set<VeniceChangeCoordinate> checkpoints) {
@@ -294,7 +293,8 @@ protected CompletableFuture<Void> seekToCheckpoint(Set<VeniceChangeCoordinate> c
294293
positionMap.put(changeCoordinate.getPartition(), changeCoordinate.getPosition());
295294
}
296295
addPartitionsToSubscription(ComplementSet.wrap(positionMap.keySet()));
297-
return getStoreBackend().seekToCheckPoints(positionMap, getVersion());
296+
return getStoreBackend()
297+
.seekToCheckpoint(new DaVinciSeekCheckpointInfo(positionMap, null, null, false), getVersion());
298298
}
299299

300300
protected CompletableFuture<Void> seekToTimestamps(Map<Integer, Long> timestamps) {
@@ -303,7 +303,8 @@ protected CompletableFuture<Void> seekToTimestamps(Map<Integer, Long> timestamps
303303
}
304304
throwIfNotReady();
305305
addPartitionsToSubscription(ComplementSet.wrap(timestamps.keySet()));
306-
return getStoreBackend().seekToTimestamps(timestamps, getVersion());
306+
return getStoreBackend()
307+
.seekToCheckpoint(new DaVinciSeekCheckpointInfo(null, timestamps, null, false), getVersion());
307308
}
308309

309310
protected CompletableFuture<Void> seekToTimestamps(Long timestamp) {
@@ -312,14 +313,14 @@ protected CompletableFuture<Void> seekToTimestamps(Long timestamp) {
312313
}
313314
throwIfNotReady();
314315
addPartitionsToSubscription(ComplementSet.universalSet());
315-
return getStoreBackend().seekToTimestamps(timestamp, getVersion());
316+
return getStoreBackend()
317+
.seekToCheckpoint(new DaVinciSeekCheckpointInfo(null, null, timestamp, false), getVersion());
316318
}
317319

318320
protected CompletableFuture<Void> subscribe(ComplementSet<Integer> partitions) {
319321
throwIfNotReady();
320322
addPartitionsToSubscription(partitions);
321-
return getStoreBackend()
322-
.subscribe(partitions, getVersion(), Collections.emptyMap(), null, Collections.emptyMap(), false);
323+
return getStoreBackend().subscribe(partitions, getVersion(), null);
323324
}
324325

325326
@Override
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package com.linkedin.davinci.client;
2+
3+
import com.linkedin.venice.exceptions.VeniceException;
4+
import com.linkedin.venice.pubsub.api.PubSubPosition;
5+
import com.linkedin.venice.utils.ComplementSet;
6+
import java.util.Map;
7+
8+
9+
public class DaVinciSeekCheckpointInfo {
10+
private Long allPartitionsTimestamp;
11+
private Map<Integer, PubSubPosition> postitionMap;
12+
private Map<Integer, Long> timestampsMap;
13+
private boolean seekToTail = false;
14+
15+
public DaVinciSeekCheckpointInfo(
16+
Map<Integer, PubSubPosition> postitionMap,
17+
Map<Integer, Long> timestampsMap,
18+
Long allPartitionsTimestamp,
19+
boolean seekToTail) {
20+
this.allPartitionsTimestamp = allPartitionsTimestamp;
21+
this.postitionMap = postitionMap;
22+
this.timestampsMap = timestampsMap;
23+
this.seekToTail = seekToTail;
24+
int validCheckPointCount = 0;
25+
if (allPartitionsTimestamp != null) {
26+
validCheckPointCount++;
27+
}
28+
if (seekToTail) {
29+
validCheckPointCount++;
30+
}
31+
if (timestampsMap != null) {
32+
validCheckPointCount++;
33+
}
34+
if (postitionMap != null) {
35+
validCheckPointCount++;
36+
}
37+
if (validCheckPointCount > 1) {
38+
throw new VeniceException("Multiple checkpoint types are not supported");
39+
}
40+
}
41+
42+
public Long getAllPartitionsTimestamp() {
43+
return allPartitionsTimestamp;
44+
}
45+
46+
public Map<Integer, PubSubPosition> getPostitionMap() {
47+
return postitionMap;
48+
}
49+
50+
public void setPositionMap(Map<Integer, PubSubPosition> postitionMap) {
51+
this.postitionMap = postitionMap;
52+
}
53+
54+
public void setTimestampsMap(Map<Integer, Long> timestampsMap) {
55+
this.timestampsMap = timestampsMap;
56+
}
57+
58+
public Map<Integer, Long> getTimestampsMap() {
59+
return timestampsMap;
60+
}
61+
62+
public boolean isSeekToTail() {
63+
return seekToTail;
64+
}
65+
66+
public ComplementSet<Integer> getPartitions() {
67+
if (postitionMap != null) {
68+
return ComplementSet.newSet(postitionMap.keySet());
69+
} else if (timestampsMap != null) {
70+
return ComplementSet.newSet(timestampsMap.keySet());
71+
} else {
72+
return ComplementSet.universalSet();
73+
}
74+
}
75+
}

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -706,18 +706,18 @@ public void stopInner() {
706706
public Optional<PubSubPosition> getPubSubPosition(
707707
VeniceStoreVersionConfig veniceStore,
708708
int partitionId,
709-
Long timestamp,
710-
PubSubPosition pubSubPosition) {
711-
if (pubSubPosition != null) {
712-
return Optional.of(pubSubPosition);
709+
Map<Integer, Long> timestampMap,
710+
Map<Integer, PubSubPosition> pubSubPositionMap) {
711+
if (pubSubPositionMap != null) {
712+
return Optional.of(pubSubPositionMap.get(partitionId));
713713
}
714714
final String topic = veniceStore.getStoreVersionName();
715715
PubSubTopicPartition partition = new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(topic), partitionId);
716716
TopicManager topicManager =
717717
getPubSubContext().getTopicManagerRepository().getTopicManager(serverConfig.getKafkaBootstrapServers());
718718
Optional<PubSubPosition> position = Optional.empty();
719-
if (timestamp != null) {
720-
position = Optional.of(topicManager.getPositionByTime(partition, timestamp));
719+
if (timestampMap != null) {
720+
position = Optional.of(topicManager.getPositionByTime(partition, timestampMap.get(partitionId)));
721721
}
722722
return position;
723723
}

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionService.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.linkedin.davinci.stats.AggVersionedIngestionStats;
88
import com.linkedin.venice.pubsub.api.PubSubPosition;
99
import com.linkedin.venice.writer.VeniceWriterFactory;
10+
import java.util.Map;
1011
import java.util.Optional;
1112
import java.util.Set;
1213
import java.util.concurrent.CompletableFuture;
@@ -112,6 +113,6 @@ void demoteToStandby(
112113
Optional<PubSubPosition> getPubSubPosition(
113114
VeniceStoreVersionConfig veniceStore,
114115
int partitionId,
115-
Long timestamp,
116-
PubSubPosition pubSubPosition);
116+
Map<Integer, Long> timestampMap,
117+
Map<Integer, PubSubPosition> pubSubPosition);
117118
}

0 commit comments

Comments
 (0)