Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@
import com.linkedin.venice.kafka.protocol.enums.ControlMessageType;
import com.linkedin.venice.meta.BackupStrategy;
import com.linkedin.venice.meta.BufferReplayPolicy;
import com.linkedin.venice.meta.ConcurrentPushDetectionStrategy;
import com.linkedin.venice.meta.DarkClusterConfig;
import com.linkedin.venice.meta.DataReplicationPolicy;
import com.linkedin.venice.meta.ETLStoreConfig;
Expand Down Expand Up @@ -4569,6 +4570,17 @@ private boolean truncateKafkaTopicInParentFabrics(String kafkaTopicName) {
boolean allTopicsAreDeleted = true;
Set<String> parentFabrics = multiClusterConfigs.getParentFabrics();
for (String parentFabric: parentFabrics) {
ConcurrentPushDetectionStrategy concurrentPushDetectionStrategy =
multiClusterConfigs.getControllerConfig(parentFabric).getConcurrentPushDetectionStrategy();
if (!concurrentPushDetectionStrategy.isTopicWriteNeeded()) {
LOGGER.info(
"Skipping truncation of topic: {} in parent fabric: {} since the topic was not created there due to"
+ "concurrent push detection strategy: {}",
kafkaTopicName,
parentFabric,
concurrentPushDetectionStrategy.name());
continue;
}
String kafkaBootstrapServerAddress = getPubSubBootstrapServersForRegion(parentFabric);
allTopicsAreDeleted &= truncateKafkaTopic(
getTopicManager(kafkaBootstrapServerAddress),
Expand Down
Loading