@@ -4112,6 +4112,18 @@ public void deleteOneStoreVersion(String clusterName, String storeName, int versionNumber) {
     deleteOneStoreVersion(clusterName, storeName, versionNumber, false);
   }
 
+  /**
+   * Check whether topic truncation is needed: returns true in child fabrics, or in the parent fabric
+   * when the concurrent push detection strategy requires topic writes; otherwise returns false.
+   * @param clusterName name of the Venice cluster
+   * @return true if the version topic should be truncated
+   */
+  public boolean isTruncatingTopicNeeded(String clusterName) {
+    return !multiClusterConfigs.isParent() || multiClusterConfigs.getControllerConfig(clusterName)
+        .getConcurrentPushDetectionStrategy()
+        .isTopicWriteNeeded();
+  }
+
   private void deleteOneStoreVersion(String clusterName, String storeName, int versionNumber, boolean isForcedDelete) {
     HelixVeniceClusterResources resources = getHelixVeniceClusterResources(clusterName);
     try (AutoCloseableLock ignore = resources.getClusterLockManager().createStoreWriteLock(storeName)) {
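
The new gate reduces to a two-variable boolean check. Below is a minimal sketch of the decision table, using plain booleans as stand-ins for the controller config objects; the helper and its parameters are hypothetical, for illustration only, since the real method reads both flags from multiClusterConfigs.

// Hypothetical stand-in for isTruncatingTopicNeeded; parameters replace the config lookups.
static boolean isTruncatingTopicNeeded(boolean isParentFabric, boolean isTopicWriteNeeded) {
  // Child fabrics always truncate; the parent fabric truncates only while the
  // concurrent push detection strategy still writes to the version topic.
  return !isParentFabric || isTopicWriteNeeded;
}

// Decision table:
//   isParentFabric=false, isTopicWriteNeeded=*     -> true  (child fabrics always truncate)
//   isParentFabric=true,  isTopicWriteNeeded=true  -> true  (parent still writes to the topic)
//   isParentFabric=true,  isTopicWriteNeeded=false -> false (parent skips truncation)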
@@ -4145,10 +4157,14 @@ private void deleteOneStoreVersion(String clusterName, String storeName, int versionNumber, boolean isForcedDelete) {
       if (!store.isMigrating()) {
         // Not using deletedVersion.get().kafkaTopicName() because it's incorrect for Zk shared stores.
         String versionTopicName = Version.composeKafkaTopic(storeName, deletedVersion.get().getNumber());
-        if (fatalDataValidationFailureRetentionMs != -1 && hasFatalDataValidationError) {
-          truncateKafkaTopic(versionTopicName, fatalDataValidationFailureRetentionMs);
-        } else {
-          truncateKafkaTopic(versionTopicName);
+
+        // Skip topic truncation on the parent controller when topic writes are not needed.
+        if (isTruncatingTopicNeeded(clusterName)) {
+          if (fatalDataValidationFailureRetentionMs != -1 && hasFatalDataValidationError) {
+            truncateKafkaTopic(versionTopicName, fatalDataValidationFailureRetentionMs);
+          } else {
+            truncateKafkaTopic(versionTopicName);
+          }
         }
 
         if (deletedVersion.get().getPushType().isStreamReprocessing()) {
@@ -2400,7 +2400,7 @@ public void rollForwardToFutureVersion(String clusterName, String storeName, Str
     Version futureVersion = getStore(clusterName, storeName).getVersion(futureVersionBeforeRollForward);
     boolean onlyDeferredSwap =
         futureVersion.isVersionSwapDeferred() && StringUtils.isEmpty(futureVersion.getTargetSwapRegion());
-    if (onlyDeferredSwap) {
+    if (onlyDeferredSwap && getVeniceHelixAdmin().isTruncatingTopicNeeded(clusterName)) {
       LOGGER.info(
           "Truncating topic {} after child controllers tried to roll forward to not block new versions",
           kafkaTopic);
@@ -4449,7 +4449,8 @@ private void truncateTopicsOptionally(
 
     if ((failedBatchPush || nonIncPushBatchSuccess && !isDeferredVersionSwap || incPushEnabledBatchPushSuccess
         || isTargetRegionPushWithDeferredSwap)
-        && !getMultiClusterConfigs().getCommonConfig().disableParentTopicTruncationUponCompletion()) {
+        && !getMultiClusterConfigs().getCommonConfig().disableParentTopicTruncationUponCompletion()
+        && getVeniceHelixAdmin().isTruncatingTopicNeeded(clusterName)) {
       LOGGER.info("Truncating kafka topic: {} with job status: {}", kafkaTopic, currentReturnStatus);
       truncateKafkaTopic(kafkaTopic);
       if (version != null && version.getPushType().isStreamReprocessing()) {
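
Since the condition now spans three concerns, an equivalent restructuring can make the gating order explicit. A sketch using the same calls as above; the local variables are invented for illustration and are not part of the change.

// Equivalent form of the condition above; && binds tighter than ||, matching the original grouping.
boolean pushOutcomeQualifies = failedBatchPush
    || (nonIncPushBatchSuccess && !isDeferredVersionSwap)
    || incPushEnabledBatchPushSuccess
    || isTargetRegionPushWithDeferredSwap;
boolean parentTruncationEnabled =
    !getMultiClusterConfigs().getCommonConfig().disableParentTopicTruncationUponCompletion();
if (pushOutcomeQualifies && parentTruncationEnabled
    && getVeniceHelixAdmin().isTruncatingTopicNeeded(clusterName)) {
  truncateKafkaTopic(kafkaTopic);
}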
@@ -4810,7 +4811,7 @@ public void killOfflinePush(String clusterName, String kafkaTopic, boolean isFor
      *
      * The reason is that every errored push will call this function.
      */
-    if (maxErroredTopicNumToKeep == 0) {
+    if (getVeniceHelixAdmin().isTruncatingTopicNeeded(clusterName) && maxErroredTopicNumToKeep == 0) {
       // Truncate Kafka topic
       LOGGER.info("Truncating topic when kill offline push job, topic: {}", kafkaTopic);
       truncateKafkaTopic(kafkaTopic);
@@ -847,6 +847,7 @@ public void testKillOfflinePushJob() {
         Optional.of(LATEST_SCHEMA_ID_FOR_ADMIN_OPERATION)));
     Store store = mock(Store.class);
     doReturn(store).when(internalAdmin).getStore(clusterName, pubSubTopic.getStoreName());
+    doReturn(true).when(internalAdmin).isTruncatingTopicNeeded(clusterName);
 
     parentAdmin.initStorageCluster(clusterName);
     parentAdmin.killOfflinePush(clusterName, pubSubTopic.getName(), false);
@@ -1746,6 +1747,7 @@ public void testGetExecutionStatus() {
     HelixVeniceClusterResources resources = mock(HelixVeniceClusterResources.class);
     doReturn(mock(ClusterLockManager.class)).when(resources).getClusterLockManager();
     doReturn(resources).when(internalAdmin).getHelixVeniceClusterResources(anyString());
+    doReturn(true).when(internalAdmin).isTruncatingTopicNeeded(clusterName);
     ReadWriteStoreRepository repository = mock(ReadWriteStoreRepository.class);
     doReturn(repository).when(resources).getStoreMetadataRepository();
     doReturn(store).when(repository).getStore(anyString());
@@ -3228,6 +3230,7 @@ public void testRollForwardSuccess() {
 
     doNothing().when(adminSpy)
         .sendAdminMessageAndWaitForConsumed(eq(clusterName), eq(storeName), any(AdminOperation.class));
+    doReturn(true).when(internalAdmin).isTruncatingTopicNeeded(clusterName);
     doReturn(true).when(adminSpy).truncateKafkaTopic(Version.composeKafkaTopic(storeName, 5));
 
     Map<String, Integer> after = Collections.singletonMap("r1", 5);
@@ -3259,6 +3262,8 @@ public void testRollForwardPartialFailure() {
 
     doNothing().when(adminSpy)
         .sendAdminMessageAndWaitForConsumed(eq(clusterName), eq(storeName), any(AdminOperation.class));
+
+    doReturn(true).when(internalAdmin).isTruncatingTopicNeeded(clusterName);
     doReturn(true).when(adminSpy).truncateKafkaTopic(anyString());
 
     for (Map.Entry<String, ControllerClient> entry: controllerClients.entrySet()) {
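
The stubs above pin isTruncatingTopicNeeded to true so the existing truncation assertions keep passing. A complementary negative-path test is sketched below; it is not part of this PR, and it assumes the same internalAdmin mock and parentAdmin fixture as testKillOfflinePushJob, plus that the parent admin delegates truncateKafkaTopic to internalAdmin.

@Test
public void testKillOfflinePushSkipsTruncationWhenNotNeeded() {
  // Parent fabric whose push detection strategy no longer requires topic writes.
  doReturn(false).when(internalAdmin).isTruncatingTopicNeeded(clusterName);

  parentAdmin.initStorageCluster(clusterName);
  parentAdmin.killOfflinePush(clusterName, pubSubTopic.getName(), false);

  // The version topic must be left untouched when the gate returns false.
  verify(internalAdmin, never()).truncateKafkaTopic(anyString());
}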