Skip to content

Commit c619d31

Browse files
committed
[server][dvc] Do not drop PCS from partitionConsumptionStateMap for non-Helix triggered actions
Remove PCS if it's dvc client Fix flaky test Fix another flaky test Address review comments Delete test that's already covered in other tests Redistributed tests
1 parent b62ce22 commit c619d31

File tree

1 file changed

+5
-4
lines changed

1 file changed

+5
-4
lines changed

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2368,11 +2368,12 @@ protected void processCommonConsumerAction(ConsumerAction consumerAction) throws
23682368
* {@link #kafkaDataValidationService}, we would like to drain all the buffered messages before cleaning up those
23692369
* two variables to avoid the race condition.
23702370
*/
2371-
partitionConsumptionStateMap.remove(partition);
2372-
if (consumerAction.isHelixTriggeredAction()) {
2371+
if (isDaVinciClient() || consumerAction.isHelixTriggeredAction()) {
23732372
LOGGER.info(
2374-
"Removing tracking of replica: {} from storage utilization manager as this UNSUBSCRIBE is helix triggered action",
2375-
topicPartition);
2373+
"Removing replica: {} from PCS map and storage utilization manager. Trigger: {}",
2374+
topicPartition,
2375+
isDaVinciClient() ? "DaVinci client unsubscribe" : "Helix-triggered unsubscribe");
2376+
partitionConsumptionStateMap.remove(partition);
23762377
storageUtilizationManager.removePartition(partition);
23772378
}
23782379
getDataIntegrityValidator().clearPartition(partition);

0 commit comments

Comments
 (0)