Skip to content

Commit b7f3f66

Browse files
authored
[server][dvc] report incremental push start/end status after completing blob transfer (#2304)
1 parent 0ed36c0 commit b7f3f66

File tree

11 files changed

+412
-22
lines changed

11 files changed

+412
-22
lines changed

clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,7 @@ public DaVinciBackend(
352352
.setAggVersionedBlobTransferStats(aggVersionedBlobTransferStats)
353353
.setBlobTransferSSLFactory(BlobTransferUtils.createSSLFactoryForBlobTransferInDVC(configLoader))
354354
.setBlobTransferAclHandler(BlobTransferUtils.createAclHandler(configLoader))
355+
.setPushStatusNotifierSupplier(() -> ingestionListener)
355356
.build();
356357
} else {
357358
aggVersionedBlobTransferStats = null;

clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/BlobTransferManagerBuilder.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import com.linkedin.davinci.blobtransfer.client.NettyFileTransferClient;
66
import com.linkedin.davinci.blobtransfer.server.P2PBlobTransferService;
7+
import com.linkedin.davinci.notifier.VeniceNotifier;
78
import com.linkedin.davinci.stats.AggVersionedBlobTransferStats;
89
import com.linkedin.davinci.storage.StorageEngineRepository;
910
import com.linkedin.davinci.storage.StorageMetadataService;
@@ -18,6 +19,7 @@
1819
import io.netty.handler.traffic.GlobalChannelTrafficShapingHandler;
1920
import java.util.Optional;
2021
import java.util.concurrent.CompletableFuture;
22+
import java.util.function.Supplier;
2123
import org.apache.logging.log4j.LogManager;
2224
import org.apache.logging.log4j.Logger;
2325

@@ -39,6 +41,7 @@ public class BlobTransferManagerBuilder {
3941
private Optional<BlobTransferAclHandler> aclHandler;
4042
private VeniceAdaptiveBlobTransferTrafficThrottler adaptiveBlobTransferWriteTrafficThrottler;
4143
private VeniceAdaptiveBlobTransferTrafficThrottler adaptiveBlobTransferReadTrafficThrottler;
44+
private Supplier<VeniceNotifier> veniceNotifier;
4245

4346
public BlobTransferManagerBuilder() {
4447
}
@@ -104,6 +107,11 @@ public BlobTransferManagerBuilder setAdaptiveBlobTransferReadTrafficThrottler(
104107
return this;
105108
}
106109

110+
public BlobTransferManagerBuilder setPushStatusNotifierSupplier(Supplier<VeniceNotifier> veniceNotifier) {
111+
this.veniceNotifier = veniceNotifier;
112+
return this;
113+
}
114+
107115
public BlobTransferManager<Void> build() {
108116
try {
109117
validateFields();
@@ -153,7 +161,8 @@ public BlobTransferManager<Void> build() {
153161
blobTransferConfig.getBlobReceiveTimeoutInMin(),
154162
blobTransferConfig.getBlobReceiveReaderIdleTimeInSeconds(),
155163
globalTrafficHandler,
156-
sslFactory),
164+
sslFactory,
165+
veniceNotifier),
157166
blobFinder,
158167
blobTransferConfig.getBaseDir(),
159168
aggVersionedBlobTransferStats);

clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/NettyFileTransferClient.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.linkedin.alpini.base.misc.ThreadPoolExecutor;
55
import com.linkedin.davinci.blobtransfer.BlobTransferUtils;
66
import com.linkedin.davinci.blobtransfer.BlobTransferUtils.BlobTransferTableFormat;
7+
import com.linkedin.davinci.notifier.VeniceNotifier;
78
import com.linkedin.davinci.storage.StorageMetadataService;
89
import com.linkedin.venice.exceptions.VenicePeersConnectionException;
910
import com.linkedin.venice.listener.VerifySslHandler;
@@ -44,6 +45,7 @@
4445
import java.util.concurrent.LinkedBlockingQueue;
4546
import java.util.concurrent.ScheduledExecutorService;
4647
import java.util.concurrent.TimeUnit;
48+
import java.util.function.Supplier;
4749
import org.apache.logging.log4j.LogManager;
4850
import org.apache.logging.log4j.Logger;
4951

@@ -73,6 +75,7 @@ public class NettyFileTransferClient {
7375
// format: host -> timestamp of the last connection attempt
7476
private final VeniceConcurrentHashMap<String, Long> unconnectableHostsToTimestamp = new VeniceConcurrentHashMap<>();
7577
private final VeniceConcurrentHashMap<String, Long> connectedHostsToTimestamp = new VeniceConcurrentHashMap<>();
78+
private final Supplier<VeniceNotifier> notifierSupplier;
7679

7780
private final VerifySslHandler verifySsl = new VerifySslHandler();
7881

@@ -85,10 +88,12 @@ public NettyFileTransferClient(
8588
int blobReceiveTimeoutInMin,
8689
int blobReceiveReaderIdleTimeInSeconds,
8790
GlobalChannelTrafficShapingHandler globalChannelTrafficShapingHandler,
88-
Optional<SSLFactory> sslFactory) {
91+
Optional<SSLFactory> sslFactory,
92+
Supplier<VeniceNotifier> notifierSupplier) {
8993
this.baseDir = baseDir;
9094
this.serverPort = serverPort;
9195
this.storageMetadataService = storageMetadataService;
96+
this.notifierSupplier = notifierSupplier;
9297
this.peersConnectivityFreshnessInSeconds = peersConnectivityFreshnessInSeconds;
9398
this.blobReceiveTimeoutInMin = blobReceiveTimeoutInMin;
9499
this.blobReceiveReaderIdleTimeInSeconds = blobReceiveReaderIdleTimeInSeconds;
@@ -309,7 +314,8 @@ public CompletionStage<InputStream> get(
309314
storeName,
310315
version,
311316
partition,
312-
requestedTableFormat));
317+
requestedTableFormat,
318+
notifierSupplier));
313319
// Send a GET request
314320
ChannelFuture requestFuture =
315321
ch.writeAndFlush(prepareRequest(storeName, version, partition, requestedTableFormat));

clients/da-vinci-client/src/main/java/com/linkedin/davinci/blobtransfer/client/P2PMetadataTransferHandler.java

Lines changed: 106 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,15 @@
55
import com.linkedin.davinci.blobtransfer.BlobTransferPartitionMetadata;
66
import com.linkedin.davinci.blobtransfer.BlobTransferPayload;
77
import com.linkedin.davinci.blobtransfer.BlobTransferUtils.BlobTransferTableFormat;
8+
import com.linkedin.davinci.notifier.VeniceNotifier;
89
import com.linkedin.davinci.storage.StorageMetadataService;
910
import com.linkedin.venice.exceptions.VeniceException;
11+
import com.linkedin.venice.kafka.protocol.state.IncrementalPushReplicaStatus;
1012
import com.linkedin.venice.kafka.protocol.state.PartitionState;
1113
import com.linkedin.venice.kafka.protocol.state.StoreVersionState;
14+
import com.linkedin.venice.meta.Version;
1215
import com.linkedin.venice.offsets.OffsetRecord;
16+
import com.linkedin.venice.pushmonitor.ExecutionStatus;
1317
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
1418
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
1519
import com.linkedin.venice.utils.ObjectMapperFactory;
@@ -21,6 +25,8 @@
2125
import io.netty.handler.codec.http.HttpHeaderNames;
2226
import io.netty.handler.codec.http.HttpResponseStatus;
2327
import java.io.IOException;
28+
import java.util.Map;
29+
import java.util.function.Supplier;
2430
import org.apache.logging.log4j.LogManager;
2531
import org.apache.logging.log4j.Logger;
2632

@@ -38,15 +44,18 @@ public class P2PMetadataTransferHandler extends SimpleChannelInboundHandler<Full
3844
private final BlobTransferPayload payload;
3945
private BlobTransferPartitionMetadata metadata;
4046
private StorageMetadataService storageMetadataService;
47+
private Supplier<VeniceNotifier> veniceNotifierSupplier;
4148

4249
public P2PMetadataTransferHandler(
4350
StorageMetadataService storageMetadataService,
4451
String baseDir,
4552
String storeName,
4653
int version,
4754
int partition,
48-
BlobTransferTableFormat tableFormat) {
55+
BlobTransferTableFormat tableFormat,
56+
Supplier<VeniceNotifier> veniceNotifierSupplier) {
4957
this.storageMetadataService = storageMetadataService;
58+
this.veniceNotifierSupplier = veniceNotifierSupplier;
5059
this.payload = new BlobTransferPayload(baseDir, storeName, version, partition, tableFormat);
5160
}
5261

@@ -103,15 +112,105 @@ public void updateStorePartitionMetadata(
103112
LOGGER.info(
104113
"Start updating store partition metadata for {}",
105114
Utils.getReplicaId(transferredPartitionMetadata.topicName, transferredPartitionMetadata.partitionId));
106-
// update the offset record in storage service
107-
storageMetadataService.put(
108-
transferredPartitionMetadata.topicName,
109-
transferredPartitionMetadata.partitionId,
110-
new OffsetRecord(transferredPartitionMetadata.offsetRecord.array(), partitionStateSerializer, null));
111-
// update the metadata SVS
115+
OffsetRecord transferredOffsetRecord =
116+
new OffsetRecord(transferredPartitionMetadata.offsetRecord.array(), partitionStateSerializer, null);
117+
// 1. update the offset incremental push job information
118+
updateIncrementalPushInfoToStore(transferredOffsetRecord, transferredPartitionMetadata);
119+
// 2. update the offset record in storage service
120+
storageMetadataService
121+
.put(transferredPartitionMetadata.topicName, transferredPartitionMetadata.partitionId, transferredOffsetRecord);
122+
// 3. update the metadata SVS
112123
updateStorageVersionState(storageMetadataService, transferredPartitionMetadata);
113124
}
114125

126+
/**
127+
* Update the incremental push info to push status store from the transferred offset record trackingIncrementalPushStatus
128+
* @param offsetRecord
129+
* @param transferredPartitionMetadata
130+
*/
131+
public void updateIncrementalPushInfoToStore(
132+
OffsetRecord offsetRecord,
133+
BlobTransferPartitionMetadata transferredPartitionMetadata) {
134+
String storeName = Version.parseStoreFromKafkaTopicName(transferredPartitionMetadata.getTopicName());
135+
int partitionId = transferredPartitionMetadata.getPartitionId();
136+
String kafkaTopic = transferredPartitionMetadata.getTopicName();
137+
Map<String, IncrementalPushReplicaStatus> incPushVersionToStatusMap =
138+
offsetRecord.getTrackingIncrementalPushStatus();
139+
140+
if (incPushVersionToStatusMap == null || incPushVersionToStatusMap.isEmpty()) {
141+
LOGGER.info(
142+
"No incremental push info to update to push status store for {}, partition: {}",
143+
storeName,
144+
partitionId);
145+
return;
146+
}
147+
148+
VeniceNotifier veniceNotifier = getVeniceNotifier();
149+
if (veniceNotifier == null) {
150+
LOGGER.error(
151+
"VeniceNotifier is not available, cannot write incremental push status for replica: {}",
152+
Utils.getReplicaId(kafkaTopic, partitionId));
153+
return;
154+
}
155+
156+
// update the incremental push info to push status store per inc push version
157+
for (Map.Entry<String, IncrementalPushReplicaStatus> entry: incPushVersionToStatusMap.entrySet()) {
158+
String incPushVersion = entry.getKey();
159+
IncrementalPushReplicaStatus replicaStatus = entry.getValue();
160+
writeIncrementalPushStatusToPushStatusStore(
161+
veniceNotifier,
162+
kafkaTopic,
163+
partitionId,
164+
incPushVersion,
165+
replicaStatus);
166+
}
167+
168+
LOGGER.info(
169+
"Successfully updated incremental push info to push status store for {}, partition: {}, incPushVersionToStatusMap: {}",
170+
storeName,
171+
partitionId,
172+
incPushVersionToStatusMap);
173+
}
174+
175+
/**
176+
* Helper method to safely get VeniceNotifier from supplier
177+
* @return VeniceNotifier instance or null if not available
178+
*/
179+
private VeniceNotifier getVeniceNotifier() {
180+
if (veniceNotifierSupplier == null) {
181+
return null;
182+
}
183+
184+
try {
185+
return veniceNotifierSupplier.get();
186+
} catch (Exception e) {
187+
return null;
188+
}
189+
}
190+
191+
private void writeIncrementalPushStatusToPushStatusStore(
192+
VeniceNotifier veniceNotifier,
193+
String kafkaTopic,
194+
int partitionId,
195+
String incPushVersion,
196+
IncrementalPushReplicaStatus incrementalPushReplicaStatus) {
197+
198+
ExecutionStatus executionStatus = ExecutionStatus.valueOf(incrementalPushReplicaStatus.status);
199+
200+
// Notify based on execution status
201+
if (executionStatus == ExecutionStatus.START_OF_INCREMENTAL_PUSH_RECEIVED) {
202+
veniceNotifier.startOfIncrementalPushReceived(kafkaTopic, partitionId, null, incPushVersion);
203+
} else if (executionStatus == ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED) {
204+
veniceNotifier.endOfIncrementalPushReceived(kafkaTopic, partitionId, null, incPushVersion);
205+
} else {
206+
LOGGER.warn(
207+
"Unexpected execution status {} for incremental push. Replica: {}, inc push version: {}",
208+
executionStatus,
209+
Utils.getReplicaId(kafkaTopic, partitionId),
210+
incPushVersion);
211+
}
212+
}
213+
115214
private void updateStorageVersionState(
116215
StorageMetadataService storageMetadataService,
117216
BlobTransferPartitionMetadata transferPartitionMetadata) {

clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/HelixParticipationService.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ public class HelixParticipationService extends AbstractVeniceService
8383
private final CompletableFuture<SafeHelixManager> managerFuture; // complete this future when the manager is connected
8484
private final CompletableFuture<HelixPartitionStatusAccessor> partitionPushStatusAccessorFuture;
8585
private PushStatusStoreWriter statusStoreWriter;
86+
private PushStatusNotifier pushStatusNotifier;
8687
private ZkClient zkClient;
8788
private SafeHelixManager helixManager;
8889
private AbstractStateModelFactory leaderFollowerParticipantModelFactory;
@@ -404,6 +405,7 @@ private void asyncStart() {
404405
helixReadOnlyStoreRepository,
405406
instance.getNodeId(),
406407
veniceServerConfig.getIncrementalPushStatusWriteMode());
408+
this.pushStatusNotifier = pushStatusNotifier;
407409

408410
ingestionBackend.getStoreIngestionService().addIngestionNotifier(pushStatusNotifier);
409411

@@ -457,6 +459,10 @@ public PushStatusStoreWriter getStatusStoreWriter() {
457459
return statusStoreWriter;
458460
}
459461

462+
public PushStatusNotifier getPushStatusNotifier() {
463+
return pushStatusNotifier;
464+
}
465+
460466
public ReadOnlyStoreRepository getHelixReadOnlyStoreRepository() {
461467
return helixReadOnlyStoreRepository;
462468
}

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3247,11 +3247,12 @@ protected void processStartOfIncrementalPush(
32473247
if (!batchReportIncPushStatusEnabled || partitionConsumptionState.isComplete()) {
32483248
ingestionNotificationDispatcher
32493249
.reportStartOfIncrementalPushReceived(partitionConsumptionState, startVersion.toString());
3250-
partitionConsumptionState.setTrackingIncrementalPushStatus(
3251-
startVersion.toString(),
3252-
ExecutionStatus.START_OF_INCREMENTAL_PUSH_RECEIVED.getValue(),
3253-
pubSubMessageTime);
32543250
}
3251+
3252+
partitionConsumptionState.setTrackingIncrementalPushStatus(
3253+
startVersion.toString(),
3254+
ExecutionStatus.START_OF_INCREMENTAL_PUSH_RECEIVED.getValue(),
3255+
pubSubMessageTime);
32553256
}
32563257

32573258
protected void processEndOfIncrementalPush(
@@ -3262,18 +3263,18 @@ protected void processEndOfIncrementalPush(
32623263
if (!batchReportIncPushStatusEnabled || partitionConsumptionState.isComplete()) {
32633264
ingestionNotificationDispatcher
32643265
.reportEndOfIncrementalPushReceived(partitionConsumptionState, endVersion.toString());
3265-
3266-
partitionConsumptionState.setTrackingIncrementalPushStatus(
3267-
endVersion.toString(),
3268-
ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED.getValue(),
3269-
pubSubMessageTime);
32703266
} else {
32713267
LOGGER.info(
32723268
"Adding incremental push: {} to pending batch report list for replica: {}.",
32733269
endVersion.toString(),
32743270
partitionConsumptionState.getReplicaId());
32753271
partitionConsumptionState.addIncPushVersionToPendingReportList(endVersion.toString());
32763272
}
3273+
3274+
partitionConsumptionState.setTrackingIncrementalPushStatus(
3275+
endVersion.toString(),
3276+
ExecutionStatus.END_OF_INCREMENTAL_PUSH_RECEIVED.getValue(),
3277+
pubSubMessageTime);
32773278
}
32783279

32793280
/**

clients/da-vinci-client/src/test/java/com/linkedin/davinci/blobtransfer/TestBlobTransferManagerBuilder.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static org.mockito.Mockito.mock;
44

5+
import com.linkedin.davinci.notifier.VeniceNotifier;
56
import com.linkedin.davinci.stats.AggVersionedBlobTransferStats;
67
import com.linkedin.davinci.storage.StorageEngineRepository;
78
import com.linkedin.davinci.storage.StorageMetadataService;
@@ -31,6 +32,7 @@ public void testInitBlobTransferManager() throws IOException {
3132
StorageEngineRepository storageEngineRepository = mock(StorageEngineRepository.class);
3233
ClientConfig clientConfig = mock(ClientConfig.class);
3334
SSLFactory sslFactory = SslUtils.getVeniceLocalSslFactory();
35+
VeniceNotifier notifier = mock(VeniceNotifier.class);
3436

3537
BlobTransferAclHandler blobTransferAclHandler = mock(BlobTransferAclHandler.class);
3638

@@ -58,6 +60,7 @@ public void testInitBlobTransferManager() throws IOException {
5860
.setAggVersionedBlobTransferStats(blobTransferStats)
5961
.setBlobTransferSSLFactory(Optional.of(sslFactory))
6062
.setBlobTransferAclHandler(Optional.of(blobTransferAclHandler))
63+
.setPushStatusNotifierSupplier(() -> notifier)
6164
.build();
6265

6366
Assert.assertNotNull(blobTransferManager);
@@ -100,6 +103,7 @@ public void testFailToCreateBlobTransferManager() throws IOException {
100103
.setReadOnlyStoreRepository(readOnlyStoreRepository)
101104
.setStorageEngineRepository(storageEngineRepository)
102105
.setAggVersionedBlobTransferStats(blobTransferStats)
106+
.setPushStatusNotifierSupplier(() -> null)
103107
.build();
104108
Assert.assertNull(blobTransferManager);
105109
} catch (IllegalArgumentException e) {
@@ -117,6 +121,7 @@ public void testFailToCreateBlobTransferManager() throws IOException {
117121
.setReadOnlyStoreRepository(readOnlyStoreRepository)
118122
.setStorageEngineRepository(storageEngineRepository)
119123
.setAggVersionedBlobTransferStats(blobTransferStats)
124+
.setPushStatusNotifierSupplier(() -> null)
120125
.build();
121126
Assert.assertNull(blobTransferManager1);
122127
} catch (IllegalArgumentException e) {
@@ -133,6 +138,7 @@ public void testFailToCreateBlobTransferManager() throws IOException {
133138
.setReadOnlyStoreRepository(readOnlyStoreRepository)
134139
.setStorageEngineRepository(storageEngineRepository)
135140
.setAggVersionedBlobTransferStats(blobTransferStats)
141+
.setPushStatusNotifierSupplier(() -> null)
136142
.build();
137143
Assert.assertNull(blobTransferManager2);
138144
} catch (IllegalArgumentException e) {
@@ -154,6 +160,7 @@ public void testFailToCreateBlobTransferManager() throws IOException {
154160
.setAggVersionedBlobTransferStats(blobTransferStats)
155161
.setBlobTransferAclHandler(null)
156162
.setBlobTransferSSLFactory(Optional.ofNullable(sslFactory))
163+
.setPushStatusNotifierSupplier(() -> null)
157164
.build();
158165
Assert.assertNull(blobTransferManager3);
159166
} catch (IllegalArgumentException e) {
@@ -173,6 +180,7 @@ public void testFailToCreateBlobTransferManager() throws IOException {
173180
.setAggVersionedBlobTransferStats(blobTransferStats)
174181
.setBlobTransferAclHandler(Optional.empty())
175182
.setBlobTransferSSLFactory(Optional.of(sslFactory))
183+
.setPushStatusNotifierSupplier(() -> null)
176184
.build();
177185
Assert.assertNull(blobTransferManager4);
178186
} catch (IllegalArgumentException e) {

0 commit comments

Comments
 (0)