Skip to content

Commit 3526f8a

Browse files
committed
addressed comments
1 parent 08b8fdd commit 3526f8a

File tree

3 files changed

+5
-26
lines changed

3 files changed

+5
-26
lines changed

clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -140,36 +140,12 @@ public CompletableFuture<Void> subscribe(ComplementSet<Integer> partitions) {
140140
return subscribe(partitions, Optional.empty(), null);
141141
}
142142

143-
// public CompletableFuture<Void> seekToTimestamps(Long allPartitionTimestamp, Optional<Version> storeVersion) {
144-
// return subscribe(
145-
// ComplementSet.universalSet(),
146-
// storeVersion, null);
147-
// }
148-
149143
public CompletableFuture<Void> seekToCheckpoint(
150144
DaVinciSeekCheckpointInfo checkpointInfo,
151145
Optional<Version> storeVersion) {
152146
return subscribe(checkpointInfo.getPartitions(), storeVersion, checkpointInfo);
153147
}
154148

155-
// public CompletableFuture<Void> seekToTail(Optional<Version> storeVersion) {
156-
// return subscribe(ComplementSet.universalSet(), storeVersion, null);
157-
// }
158-
159-
// public CompletableFuture<Void> seekToCheckPoints(
160-
// Map<Integer, PubSubPosition> checkpoints,
161-
// Optional<Version> storeVersion) {
162-
// return subscribe(
163-
// ComplementSet.wrap(checkpoints.keySet()),
164-
// storeVersion, null);
165-
// }
166-
167-
// public CompletableFuture<Void> seekToTimestamps(Map<Integer, Long> timestamps, Optional<Version> storeVersion) {
168-
// return subscribe(
169-
// ComplementSet.wrap(timestamps.keySet()),
170-
// storeVersion, null);
171-
// }
172-
173149
private Version getCurrentVersion() {
174150
return backend.getVeniceCurrentVersion(storeName);
175151
}

clients/da-vinci-client/src/main/java/com/linkedin/davinci/consumer/VeniceChangelogConsumerDaVinciRecordTransformerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ public CompletableFuture<Void> seekToBeginningOfPush() {
318318
}
319319

320320
public CompletableFuture<Void> seekToEndOfPush(Set<Integer> partitions) {
321-
return daVinciClient.seekToTail(partitions);
321+
throw new VeniceClientException("seekToEndOfPush will not be supported");
322322
}
323323

324324
public CompletableFuture<Void> seekToEndOfPush() {

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2300,7 +2300,10 @@ protected void processCommonConsumerAction(ConsumerAction consumerAction) throws
23002300

23012301
// Subscribe to local version topic.
23022302
PubSubPosition subscribePosition;
2303-
if (consumerAction.getPubSubPosition() != null && recordTransformer != null) {
2303+
if (consumerAction.getPubSubPosition() != null) {
2304+
if (recordTransformer == null) {
2305+
throw new VeniceException("seekToCheckpoint will not be supported for non-transformed client");
2306+
}
23042307
skipValidationForSeekableClientEnabled = true;
23052308
subscribePosition = consumerAction.getPubSubPosition();
23062309
LOGGER.info("Subscribed to user given partition: {} position: {}", topicPartition, subscribePosition);

0 commit comments

Comments
 (0)