@@ -70,6 +70,25 @@ public List<Map<String, String>> evaluateCriteria(Criteria recipientCriteria,
String resourceName = recipientCriteria.getResource();
String instanceName = recipientCriteria.getInstanceName();

// If only the instance name is specified (no resource/partition/state),
// look it up via LIVEINSTANCES. This is safe because the results are filtered
// by liveParticipants anyway, and it avoids a full EXTERNALVIEW scan.
if (Strings.isNullOrEmpty(resourceName)
&& Strings.isNullOrEmpty(recipientCriteria.getPartition())
&& Strings.isNullOrEmpty(recipientCriteria.getPartitionState())
&& dataSource == DataSource.EXTERNALVIEW) {
dataSource = DataSource.LIVEINSTANCES;
}

// Since we always filter by liveParticipants anyway, if the dataSource
// is INSTANCES and we're querying for all instances, just use
// LIVEINSTANCES directly to avoid redundant ZK reads and filtering
if (dataSource == DataSource.INSTANCES
&& (Strings.isNullOrEmpty(instanceName) || instanceName.equals(MATCH_ALL_SYM)
|| instanceName.equals("*"))) {
dataSource = DataSource.LIVEINSTANCES;
}
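// Example triggers for the two rewrites above (illustrative, not part of the patch):
//   criteria.setInstanceName("%") with empty resource/partition/state -> first rewrite
//   criteria.setDataSource(DataSource.INSTANCES) with instanceName "" or "*" -> second rewrite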

switch (dataSource) {
case EXTERNALVIEW:
properties = getProperty(accessor, resourceName, keyBuilder.externalViews(),
@@ -93,18 +112,32 @@ public List<Map<String, String>> evaluateCriteria(Criteria recipientCriteria,
// flatten the data
List<ZNRecordRow> allRows = ZNRecordRow.flatten(HelixProperty.convertToList(properties));

// save the matches
// TODO: Apply strict check on the getChildValuesMap() call.
// TODO: For backward compatibility, allow partial read for now. This may reduce the
// TODO: match result eventually.
Set<String> liveParticipants =
accessor.getChildValuesMap(keyBuilder.liveInstances(), false).keySet();
// Only fetch liveParticipants if the dataSource is not already LIVEINSTANCES;
// when it is, every returned row is live by definition.
Set<String> liveParticipants = null;
if (dataSource != DataSource.LIVEINSTANCES) {
// TODO: Apply strict check on the getChildValuesMap() call.
// TODO: For backward compatibility, allow partial read for now. This may reduce the
// TODO: match result eventually.
Map<String, HelixProperty> liveInstanceMap =
accessor.getChildValuesMap(keyBuilder.liveInstances(), false);
liveParticipants = liveInstanceMap != null ? liveInstanceMap.keySet() : Collections.emptySet();

// If there are no live instances, nothing can match; return early.
if (liveParticipants.isEmpty()) {
logger.info("No live participants found");
return Lists.newArrayList();
}
}

List<ZNRecordRow> result = Lists.newArrayList();
for (ZNRecordRow row : allRows) {
// The participant instance name is stored in the return value of either getRecordId() or
// getMapSubKey()
if (rowMatches(recipientCriteria, row) && (liveParticipants.contains(row.getRecordId())
|| liveParticipants.contains(row.getMapSubKey()))) {
boolean isLive = (liveParticipants == null)
|| liveParticipants.contains(row.getRecordId())
|| liveParticipants.contains(row.getMapSubKey());
if (rowMatches(recipientCriteria, row) && isLive) {
result.add(row);
}
}
@@ -125,7 +158,7 @@ public List<Map<String, String>> evaluateCriteria(Criteria recipientCriteria,
!recipientCriteria.getPartitionState().equals("") ? row.getMapValue() : "");
selected.add(resultRow);
}
logger.info("Query returned " + selected.size() + " rows");
logger.info("Query returned {} rows", selected.size());
return Lists.newArrayList(selected);
}
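
Taken together, the two rewrites mean a broadcast-style criteria is now resolved from LIVEINSTANCES instead of a full EXTERNALVIEW scan. A minimal caller sketch, assuming a CriteriaEvaluator instance `evaluator`, a connected HelixManager `manager`, and Criteria's default EXTERNALVIEW data source (names are illustrative, not part of the patch):

    Criteria recipients = new Criteria();
    // Match all instances; resource, partition, and state are left empty,
    // so evaluateCriteria() silently switches to LIVEINSTANCES.
    recipients.setInstanceName("%");
    List<Map<String, String>> matches = evaluator.evaluateCriteria(recipients, manager);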

@@ -153,7 +186,7 @@ && stringMatches(partitionName, Strings.nullToEmpty(row.getMapKey()))
* @return Java matches expression (i.e. contains ".*?"s and '.'s)
*/
private String normalizePattern(String pattern) {
if (pattern == null || pattern.equals("") || pattern.equals("*")) {
if (Strings.isNullOrEmpty(pattern) || pattern.equals("*")) {
pattern = "%";
}
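// Resulting normalization, inferred from the doc comment above (illustrative):
//   null / "" / "*"  ->  "%"  ->  ".*?"     (match everything)
//   "myhost%"        ->  "myhost.*?"        (SQL LIKE-style wildcard)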
StringBuilder builder = new StringBuilder();
@@ -74,7 +74,30 @@ public void updateWorkflowStatus(String workflow, WorkflowConfig workflowCfg,
// Clean up if workflow marked for deletion
TargetState targetState = workflowCfg.getTargetState();
if (targetState == TargetState.DELETE) {
LOG.debug("Workflow is marked as deleted {} cleaning up the workflow context.", workflow);
// Always validate DELETE operations against fresh ZooKeeper state to prevent stale cache issues
// This is critical because events like MessageChange don't refresh the ResourceConfig cache
// The extra ZK read is acceptable since DELETE operations are relatively rare and destructive
WorkflowConfig freshConfig = getFreshWorkflowConfig(workflow);

if (freshConfig == null) {
LOG.info("Workflow {} already deleted from ZooKeeper, skipping cleanup", workflow);
return;
}

if (freshConfig.getTargetState() != TargetState.DELETE) {
// This likely indicates a race condition where the workflow was recreated or modified
// between cache refresh cycles
LOG.warn("Stale DELETE state detected in cache for workflow {}. " +
"Fresh state from ZooKeeper: {}. Skipping deletion to prevent data loss.",
workflow, freshConfig.getTargetState());
// Force a cache refresh for the next pipeline run to pick up the latest state
_clusterDataCache.requireFullRefresh();
return;
}

// DELETE state confirmed from fresh read - safe to proceed
LOG.info("DELETE state confirmed for workflow {} from fresh ZooKeeper read. Proceeding with cleanup.",
workflow);
updateInflightJobs(workflow, workflowCtx, currentStateOutput, bestPossibleOutput);
cleanupWorkflow(workflow);
return;
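// Why the fresh read matters (illustrative timeline, an assumed failure mode):
//   t0: controller caches WorkflowConfig with targetState=DELETE
//   t1: the workflow is deleted and re-created under the same name
//   t2: a MessageChange event runs this pipeline without refreshing the
//       ResourceConfig cache, so the cached DELETE state is now stale
//   t3: the fresh ZooKeeper read above detects the mismatch and aborts the
//       cleanup instead of deleting the re-created workflow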
@@ -170,6 +193,15 @@ && isTimeout(workflowCtx.getStartTime(), workflowCfg.getTimeout())) {
_clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
}

/**
* Get fresh workflow config from ZooKeeper. Protected for testability.
* @param workflow the workflow name
* @return fresh WorkflowConfig from ZooKeeper, or null if not found
*/
protected WorkflowConfig getFreshWorkflowConfig(String workflow) {
return TaskUtil.getWorkflowConfig(_manager.getHelixDataAccessor(), workflow);
}
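// Because the method is protected, a unit test can stub the fresh read without a
// live ZooKeeper, mirroring the new test class below (sketch):
//   WorkflowDispatcher dispatcher = spy(new WorkflowDispatcher());
//   doReturn(null).when(dispatcher).getFreshWorkflowConfig("myWorkflow");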

private void updateInflightJobs(String workflow, WorkflowContext workflowCtx,
CurrentStateOutput currentStateOutput, BestPossibleStateOutput bestPossibleOutput) {
// Update jobs already inflight
@@ -60,6 +60,15 @@ public <T extends HelixProperty> T getProperty(PropertyKey key) {
PropertyType type = key.getType();
if (type == PropertyType.EXTERNALVIEW || type == PropertyType.IDEALSTATES) {
return (T) new ExternalView(_externalView);
} else if (type == PropertyType.LIVEINSTANCES) {
// Support querying for a specific live instance
String path = key.getPath();
String instanceName = path.substring(path.lastIndexOf('/') + 1);
for (ZNRecord record : _liveInstances) {
if (record.getId().equals(instanceName)) {
return (T) HelixProperty.convertToTypedInstance(key.getTypeClass(), record);
}
}
}
return null;
}
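// With the new branch, a test can look up a single live instance through the mock
// accessor (sketch; assumes the standard PropertyKey.Builder#liveInstance API):
//   LiveInstance li = accessor.getProperty(keyBuilder.liveInstance("localhost_12918"));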
@@ -0,0 +1,227 @@
package org.apache.helix.task;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import java.lang.reflect.Field;
import java.util.Collections;

import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey;
import org.apache.helix.common.caches.TaskDataCache;
import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;

/**
* Unit tests for WorkflowDispatcher to verify the stale cache fix.
* Tests focus on verifying behavior differences between MessageChange and ResourceConfigChange events.
*/
public class TestWorkflowDispatcherStaleCacheFix {

@Mock private HelixManager manager;
@Mock private HelixDataAccessor accessor;
@Mock private PropertyKey.Builder keyBuilder;
@Mock private WorkflowControllerDataProvider clusterDataCache;
@Mock private CurrentStateOutput currentStateOutput;
@Mock private BestPossibleStateOutput bestPossibleOutput;
@Mock private ClusterStatusMonitor clusterStatusMonitor;
@Mock private TaskDataCache taskDataCache;
@Mock private ZkHelixPropertyStore<ZNRecord> propertyStore;

private WorkflowDispatcher dispatcher;
private AutoCloseable mocks;

@BeforeMethod
public void setUp() throws Exception {
mocks = MockitoAnnotations.openMocks(this);

dispatcher = spy(new WorkflowDispatcher());
dispatcher.init(manager);

Field cacheField = WorkflowDispatcher.class.getDeclaredField("_clusterDataCache");
cacheField.setAccessible(true);
cacheField.set(dispatcher, clusterDataCache);

Field monitorField = AbstractTaskDispatcher.class.getDeclaredField("_clusterStatusMonitor");
monitorField.setAccessible(true);
monitorField.set(dispatcher, clusterStatusMonitor);

when(manager.getHelixDataAccessor()).thenReturn(accessor);
when(accessor.keyBuilder()).thenReturn(keyBuilder);
when(manager.getClusterName()).thenReturn("TestCluster");
when(clusterDataCache.getTaskDataCache()).thenReturn(taskDataCache);
when(manager.getHelixPropertyStore()).thenReturn(propertyStore);
}

@AfterMethod
public void tearDown() throws Exception {
if (mocks != null) {
mocks.close();
}
}

/**
* MessageChange event with stale DELETE cache triggers requireFullRefresh.
* This is the primary bug scenario: MessageChange events don't refresh the ResourceConfig cache.
*/
@Test
public void testMessageChangeEventWithStaleDelete() {
String workflowName = "TestWorkflow";

WorkflowConfig staleDeleteConfig = mock(WorkflowConfig.class);
when(staleDeleteConfig.getTargetState()).thenReturn(TargetState.DELETE);
when(staleDeleteConfig.getJobDag()).thenReturn(mock(JobDag.class));
when(staleDeleteConfig.getJobDag().getAllNodes()).thenReturn(Collections.emptySet());

WorkflowConfig freshStartConfig = mock(WorkflowConfig.class);
when(freshStartConfig.getTargetState()).thenReturn(TargetState.START);
doReturn(freshStartConfig).when(dispatcher).getFreshWorkflowConfig(workflowName);

WorkflowContext workflowContext = mock(WorkflowContext.class);
when(workflowContext.getFinishTime()).thenReturn(-1L);
when(workflowContext.getJobStates()).thenReturn(Collections.emptyMap());

dispatcher.updateWorkflowStatus(workflowName, staleDeleteConfig, workflowContext,
currentStateOutput, bestPossibleOutput);

// Verify: Fresh read was performed
verify(dispatcher, times(1)).getFreshWorkflowConfig(workflowName);

// Verify: requireFullRefresh was called due to stale DELETE detection
verify(clusterDataCache, times(1)).requireFullRefresh();

// Verify: Workflow was NOT deleted
verify(accessor, never()).removeProperty(any(PropertyKey.class));
}

/**
* ResourceConfigChange event with fresh cache doesn't need validation.
* ResourceConfigChange events refresh the cache, so no stale DELETE issue.
*/
@Test
public void testResourceConfigChangeEventWithFreshCache() {
String workflowName = "TestWorkflow";

WorkflowConfig freshStartConfig = mock(WorkflowConfig.class);
when(freshStartConfig.getTargetState()).thenReturn(TargetState.START);
when(freshStartConfig.getJobDag()).thenReturn(mock(JobDag.class));
when(freshStartConfig.getJobDag().getAllNodes()).thenReturn(Collections.emptySet());

WorkflowContext workflowContext = mock(WorkflowContext.class);
when(workflowContext.getFinishTime()).thenReturn(-1L);
when(workflowContext.getJobStates()).thenReturn(Collections.emptyMap());

dispatcher.updateWorkflowStatus(workflowName, freshStartConfig, workflowContext,
currentStateOutput, bestPossibleOutput);

// Verify: No fresh read needed (state is not DELETE)
verify(dispatcher, never()).getFreshWorkflowConfig(anyString());

// Verify: No requireFullRefresh needed
verify(clusterDataCache, never()).requireFullRefresh();

// Verify: No deletion attempted
verify(accessor, never()).removeProperty(any(PropertyKey.class));
}

/**
* Legitimate DELETE still works when fresh state confirms DELETE.
*/
@Test
public void testLegitimateDeleteWithFreshValidation() {
String workflowName = "TestWorkflow";

WorkflowConfig deleteConfig = mock(WorkflowConfig.class);
when(deleteConfig.getTargetState()).thenReturn(TargetState.DELETE);
when(deleteConfig.isTerminable()).thenReturn(true);
when(deleteConfig.getJobDag()).thenReturn(mock(JobDag.class));
when(deleteConfig.getJobDag().getAllNodes()).thenReturn(Collections.emptySet());

doReturn(deleteConfig).when(dispatcher).getFreshWorkflowConfig(workflowName);
when(clusterDataCache.getWorkflowConfig(workflowName)).thenReturn(deleteConfig);

WorkflowContext workflowContext = mock(WorkflowContext.class);
when(workflowContext.getFinishTime()).thenReturn(System.currentTimeMillis());
when(workflowContext.getJobStates()).thenReturn(Collections.emptyMap());

PropertyKey workflowKey = mock(PropertyKey.class);
when(keyBuilder.resourceConfig(workflowName)).thenReturn(workflowKey);
when(keyBuilder.idealStates(workflowName)).thenReturn(mock(PropertyKey.class));
when(keyBuilder.workflowContext(workflowName)).thenReturn(mock(PropertyKey.class));
when(accessor.removeProperty(any(PropertyKey.class))).thenReturn(true);
when(accessor.getPropertyStat(any(PropertyKey.class)))
.thenReturn(mock(org.apache.helix.HelixProperty.Stat.class));

dispatcher.updateWorkflowStatus(workflowName, deleteConfig, workflowContext,
currentStateOutput, bestPossibleOutput);

// Verify: Fresh read was performed
verify(dispatcher, times(1)).getFreshWorkflowConfig(workflowName);

// Verify: Deletion proceeded (both states confirmed DELETE)
verify(accessor, atLeastOnce()).removeProperty(any(PropertyKey.class));

// Verify: No unnecessary full refresh
verify(clusterDataCache, never()).requireFullRefresh();
}

/**
* Workflow already deleted from ZK (fresh config null).
*/
@Test
public void testWorkflowAlreadyDeletedFromZK() {
String workflowName = "TestWorkflow";

WorkflowConfig deleteConfig = mock(WorkflowConfig.class);
when(deleteConfig.getTargetState()).thenReturn(TargetState.DELETE);
when(deleteConfig.getJobDag()).thenReturn(mock(JobDag.class));
when(deleteConfig.getJobDag().getAllNodes()).thenReturn(Collections.emptySet());

doReturn(null).when(dispatcher).getFreshWorkflowConfig(workflowName);

WorkflowContext workflowContext = mock(WorkflowContext.class);
when(workflowContext.getFinishTime()).thenReturn(System.currentTimeMillis());
when(workflowContext.getJobStates()).thenReturn(Collections.emptyMap());

dispatcher.updateWorkflowStatus(workflowName, deleteConfig, workflowContext,
currentStateOutput, bestPossibleOutput);

// Verify: Fresh read was performed
verify(dispatcher, times(1)).getFreshWorkflowConfig(workflowName);

// Verify: No deletion attempted (already gone)
verify(accessor, never()).removeProperty(any(PropertyKey.class));

// Verify: No full refresh needed
verify(clusterDataCache, never()).requireFullRefresh();
}
}