Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4112,6 +4112,18 @@ public void deleteOneStoreVersion(String clusterName, String storeName, int vers
deleteOneStoreVersion(clusterName, storeName, versionNumber, false);
}

/**
 * Checks whether version-topic truncation should be skipped by this controller.
 * Truncation is skipped only when this is the parent controller AND the cluster's
 * concurrent-push detection strategy does not require topic writes
 * (i.e. {@code isTopicWriteNeeded()} is false). Child controllers never skip.
 *
 * @param clusterName the cluster whose controller config supplies the detection strategy
 * @return true if topic truncation should be skipped (parent controller with a strategy
 *         that needs no topic writes); false otherwise
 */
public boolean shouldSkipTruncatingTopic(String clusterName) {
  return isParent() && !getMultiClusterConfigs().getControllerConfig(clusterName)
      .getConcurrentPushDetectionStrategy()
      .isTopicWriteNeeded();
}

private void deleteOneStoreVersion(String clusterName, String storeName, int versionNumber, boolean isForcedDelete) {
HelixVeniceClusterResources resources = getHelixVeniceClusterResources(clusterName);
try (AutoCloseableLock ignore = resources.getClusterLockManager().createStoreWriteLock(storeName)) {
Expand Down Expand Up @@ -4145,10 +4157,14 @@ private void deleteOneStoreVersion(String clusterName, String storeName, int ver
if (!store.isMigrating()) {
// Not using deletedVersion.get().kafkaTopicName() because it's incorrect for Zk shared stores.
String versionTopicName = Version.composeKafkaTopic(storeName, deletedVersion.get().getNumber());
if (fatalDataValidationFailureRetentionMs != -1 && hasFatalDataValidationError) {
truncateKafkaTopic(versionTopicName, fatalDataValidationFailureRetentionMs);
} else {
truncateKafkaTopic(versionTopicName);

// skip truncating topic if it's parent controller and topic write is not needed
if (!shouldSkipTruncatingTopic(clusterName)) {
if (fatalDataValidationFailureRetentionMs != -1 && hasFatalDataValidationError) {
truncateKafkaTopic(versionTopicName, fatalDataValidationFailureRetentionMs);
} else {
truncateKafkaTopic(versionTopicName);
}
}

if (deletedVersion.get().getPushType().isStreamReprocessing()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2400,7 +2400,7 @@ public void rollForwardToFutureVersion(String clusterName, String storeName, Str
Version futureVersion = getStore(clusterName, storeName).getVersion(futureVersionBeforeRollForward);
boolean onlyDeferredSwap =
futureVersion.isVersionSwapDeferred() && StringUtils.isEmpty(futureVersion.getTargetSwapRegion());
if (onlyDeferredSwap) {
if (onlyDeferredSwap && !getVeniceHelixAdmin().shouldSkipTruncatingTopic(clusterName)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the parent controller, checking getConcurrentPushDetectionStrategy alone should be enough.

LOGGER.info(
"Truncating topic {} after child controllers tried to roll forward to not block new versions",
kafkaTopic);
Expand Down Expand Up @@ -4449,7 +4449,8 @@ private void truncateTopicsOptionally(

if ((failedBatchPush || nonIncPushBatchSuccess && !isDeferredVersionSwap || incPushEnabledBatchPushSuccess
|| isTargetRegionPushWithDeferredSwap)
&& !getMultiClusterConfigs().getCommonConfig().disableParentTopicTruncationUponCompletion()) {
&& !getMultiClusterConfigs().getCommonConfig().disableParentTopicTruncationUponCompletion()
&& !getVeniceHelixAdmin().shouldSkipTruncatingTopic(clusterName)) {
LOGGER.info("Truncating kafka topic: {} with job status: {}", kafkaTopic, currentReturnStatus);
truncateKafkaTopic(kafkaTopic);
if (version != null && version.getPushType().isStreamReprocessing()) {
Expand Down Expand Up @@ -4813,7 +4814,9 @@ public void killOfflinePush(String clusterName, String kafkaTopic, boolean isFor
if (maxErroredTopicNumToKeep == 0) {
// Truncate Kafka topic
LOGGER.info("Truncating topic when kill offline push job, topic: {}", kafkaTopic);
truncateKafkaTopic(kafkaTopic);
if (!getVeniceHelixAdmin().shouldSkipTruncatingTopic(clusterName)) {
truncateKafkaTopic(kafkaTopic);
}
PubSubTopic correspondingStreamReprocessingTopic =
pubSubTopicRepository.getTopic(Version.composeStreamReprocessingTopicFromVersionTopic(kafkaTopic));
if (getTopicManager().containsTopic(correspondingStreamReprocessingTopic)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import com.linkedin.venice.helix.SafeHelixDataAccessor;
import com.linkedin.venice.helix.SafeHelixManager;
import com.linkedin.venice.ingestion.control.RealTimeTopicSwitcher;
import com.linkedin.venice.meta.ConcurrentPushDetectionStrategy;
import com.linkedin.venice.meta.HybridStoreConfig;
import com.linkedin.venice.meta.Instance;
import com.linkedin.venice.meta.MaterializedViewParameters;
Expand Down Expand Up @@ -1658,4 +1659,57 @@ public void testCheckStoreGraveyardForRecreation() {
assertTrue(customException.getMessage().contains("Required waiting period: 3600 seconds"));
}

@Test
public void testShouldSkipTruncatingTopicForChildControllers() {
  VeniceControllerClusterConfig clusterConfig = mock(VeniceControllerClusterConfig.class);
  Map<String, VeniceControllerClusterConfig> clusterConfigMap = new HashMap<>();
  clusterConfigMap.put(clusterName, clusterConfig);

  VeniceHelixAdmin mockAdmin = mock(VeniceHelixAdmin.class);
  doReturn(new VeniceControllerMultiClusterConfig(clusterConfigMap)).when(mockAdmin).getMultiClusterConfigs();
  doReturn(false).when(mockAdmin).isParent();
  doCallRealMethod().when(mockAdmin).shouldSkipTruncatingTopic(clusterName);

  // A child controller must never skip topic truncation, regardless of strategy.
  assertFalse(mockAdmin.shouldSkipTruncatingTopic(clusterName));
  verify(mockAdmin, times(1)).isParent();
}

@Test
public void testShouldSkipTruncatingTopicForParentControllersTopicWriteNeeded() {
  VeniceControllerClusterConfig clusterConfig = mock(VeniceControllerClusterConfig.class);
  doReturn(ConcurrentPushDetectionStrategy.TOPIC_BASED_ONLY).when(clusterConfig).getConcurrentPushDetectionStrategy();
  Map<String, VeniceControllerClusterConfig> clusterConfigMap = new HashMap<>();
  clusterConfigMap.put(clusterName, clusterConfig);

  VeniceHelixAdmin mockAdmin = mock(VeniceHelixAdmin.class);
  doReturn(true).when(mockAdmin).isParent();
  doReturn(new VeniceControllerMultiClusterConfig(clusterConfigMap)).when(mockAdmin).getMultiClusterConfigs();
  doCallRealMethod().when(mockAdmin).shouldSkipTruncatingTopic(clusterName);

  // Parent controller with a topic-write-based detection strategy must still truncate.
  assertFalse(mockAdmin.shouldSkipTruncatingTopic(clusterName));
  verify(mockAdmin, times(1)).isParent();
  verify(mockAdmin, times(1)).getMultiClusterConfigs();
}

@Test
public void testShouldSkipTruncatingTopicForParentControllersTopicWriteNotNeeded() {
  VeniceControllerClusterConfig clusterConfig = mock(VeniceControllerClusterConfig.class);
  doReturn(ConcurrentPushDetectionStrategy.PARENT_VERSION_STATUS_ONLY).when(clusterConfig)
      .getConcurrentPushDetectionStrategy();
  Map<String, VeniceControllerClusterConfig> clusterConfigMap = new HashMap<>();
  clusterConfigMap.put(clusterName, clusterConfig);

  VeniceHelixAdmin mockAdmin = mock(VeniceHelixAdmin.class);
  doReturn(true).when(mockAdmin).isParent();
  doReturn(new VeniceControllerMultiClusterConfig(clusterConfigMap)).when(mockAdmin).getMultiClusterConfigs();
  doCallRealMethod().when(mockAdmin).shouldSkipTruncatingTopic(clusterName);

  // Parent controller whose strategy needs no topic writes should skip truncation.
  assertTrue(mockAdmin.shouldSkipTruncatingTopic(clusterName));
  verify(mockAdmin, times(1)).isParent();
  verify(mockAdmin, times(1)).getMultiClusterConfigs();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,7 @@ public void testKillOfflinePushJob() {
Optional.of(LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION)));
Store store = mock(Store.class);
doReturn(store).when(internalAdmin).getStore(clusterName, pubSubTopic.getStoreName());
doReturn(false).when(internalAdmin).shouldSkipTruncatingTopic(clusterName);

parentAdmin.initStorageCluster(clusterName);
parentAdmin.killOfflinePush(clusterName, pubSubTopic.getName(), false);
Expand Down Expand Up @@ -1746,6 +1747,7 @@ public void testGetExecutionStatus() {
HelixVeniceClusterResources resources = mock(HelixVeniceClusterResources.class);
doReturn(mock(ClusterLockManager.class)).when(resources).getClusterLockManager();
doReturn(resources).when(internalAdmin).getHelixVeniceClusterResources(anyString());
doReturn(false).when(internalAdmin).shouldSkipTruncatingTopic(clusterName);
ReadWriteStoreRepository repository = mock(ReadWriteStoreRepository.class);
doReturn(repository).when(resources).getStoreMetadataRepository();
doReturn(store).when(repository).getStore(anyString());
Expand Down Expand Up @@ -3228,6 +3230,7 @@ public void testRollForwardSuccess() {

doNothing().when(adminSpy)
.sendAdminMessageAndWaitForConsumed(eq(clusterName), eq(storeName), any(AdminOperation.class));
doReturn(false).when(internalAdmin).shouldSkipTruncatingTopic(clusterName);
doReturn(true).when(adminSpy).truncateKafkaTopic(Version.composeKafkaTopic(storeName, 5));

Map<String, Integer> after = Collections.singletonMap("r1", 5);
Expand Down Expand Up @@ -3259,6 +3262,8 @@ public void testRollForwardPartialFailure() {

doNothing().when(adminSpy)
.sendAdminMessageAndWaitForConsumed(eq(clusterName), eq(storeName), any(AdminOperation.class));

doReturn(false).when(internalAdmin).shouldSkipTruncatingTopic(clusterName);
doReturn(true).when(adminSpy).truncateKafkaTopic(anyString());

for (Map.Entry<String, ControllerClient> entry: controllerClients.entrySet()) {
Expand Down
Loading