diff --git a/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java b/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java index f0e9ef58ff..1c13d547e8 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java @@ -70,6 +70,25 @@ public List> evaluateCriteria(Criteria recipientCriteria, String resourceName = recipientCriteria.getResource(); String instanceName = recipientCriteria.getInstanceName(); + // If only instance name is specified (no resource/partition/state), + // use LIVEINSTANCES for efficient lookup. This is safe because we filter by + // liveParticipants anyway, so querying LIVEINSTANCES directly is more efficient. + if (Strings.isNullOrEmpty(resourceName) + && Strings.isNullOrEmpty(recipientCriteria.getPartition()) + && Strings.isNullOrEmpty(recipientCriteria.getPartitionState()) + && dataSource == DataSource.EXTERNALVIEW) { + dataSource = DataSource.LIVEINSTANCES; + } + + // Since we always filter by liveParticipants anyway, if the dataSource + // is INSTANCES and we're querying for all instances, just use + // LIVEINSTANCES directly to avoid redundant ZK reads and filtering + if (dataSource == DataSource.INSTANCES + && (Strings.isNullOrEmpty(instanceName) || instanceName.equals(MATCH_ALL_SYM) + || instanceName.equals("*"))) { + dataSource = DataSource.LIVEINSTANCES; + } + switch (dataSource) { case EXTERNALVIEW: properties = getProperty(accessor, resourceName, keyBuilder.externalViews(), @@ -93,18 +112,32 @@ public List> evaluateCriteria(Criteria recipientCriteria, // flatten the data List allRows = ZNRecordRow.flatten(HelixProperty.convertToList(properties)); - // save the matches - // TODO: Apply strict check on the getChildValuesMap() call. - // TODO: For backward compatibility, allow partial read for now. This may reduce the - // TODO: match result eventually. - Set liveParticipants = - accessor.getChildValuesMap(keyBuilder.liveInstances(), false).keySet(); + // Only fetch liveParticipants if dataSource is not already LIVEINSTANCES + // When dataSource is LIVEINSTANCES, all rows are already live by definition + Set liveParticipants = null; + if (dataSource != DataSource.LIVEINSTANCES) { + // TODO: Apply strict check on the getChildValuesMap() call. + // TODO: For backward compatibility, allow partial read for now. This may reduce the + // TODO: match result eventually. + Map liveInstanceMap = + accessor.getChildValuesMap(keyBuilder.liveInstances(), false); + liveParticipants = liveInstanceMap != null ? liveInstanceMap.keySet() : Collections.emptySet(); + + //if there are no live instances, no point in filtering + if (liveParticipants.isEmpty()) { + logger.info("No live participants found"); + return Lists.newArrayList(); + } + } + List result = Lists.newArrayList(); for (ZNRecordRow row : allRows) { // The participant instance name is stored in the return value of either getRecordId() or // getMapSubKey() - if (rowMatches(recipientCriteria, row) && (liveParticipants.contains(row.getRecordId()) - || liveParticipants.contains(row.getMapSubKey()))) { + boolean isLive = (liveParticipants == null) + || liveParticipants.contains(row.getRecordId()) + || liveParticipants.contains(row.getMapSubKey()); + if (rowMatches(recipientCriteria, row) && isLive) { result.add(row); } } @@ -125,7 +158,7 @@ public List> evaluateCriteria(Criteria recipientCriteria, !recipientCriteria.getPartitionState().equals("") ? row.getMapValue() : ""); selected.add(resultRow); } - logger.info("Query returned " + selected.size() + " rows"); + logger.info("Query returned {} rows", selected.size()); return Lists.newArrayList(selected); } @@ -153,7 +186,7 @@ && stringMatches(partitionName, Strings.nullToEmpty(row.getMapKey())) * @return Java matches expression (i.e. contains ".*?"s and '.'s) */ private String normalizePattern(String pattern) { - if (pattern == null || pattern.equals("") || pattern.equals("*")) { + if (Strings.isNullOrEmpty(pattern) || pattern.equals("*")) { pattern = "%"; } StringBuilder builder = new StringBuilder(); diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java index e32f34c7e8..3147d35e42 100644 --- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java +++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java @@ -74,7 +74,30 @@ public void updateWorkflowStatus(String workflow, WorkflowConfig workflowCfg, // Clean up if workflow marked for deletion TargetState targetState = workflowCfg.getTargetState(); if (targetState == TargetState.DELETE) { - LOG.debug("Workflow is marked as deleted {} cleaning up the workflow context.", workflow); + // Always validate DELETE operations against fresh ZooKeeper state to prevent stale cache issues + // This is critical because events like MessageChange don't refresh ResourceConfig cache + // The extra ZK read is acceptable since DELETE operations are relatively rare and destructive + WorkflowConfig freshConfig = getFreshWorkflowConfig(workflow); + + if (freshConfig == null) { + LOG.info("Workflow {} already deleted from ZooKeeper, skipping cleanup", workflow); + return; + } + + if (freshConfig.getTargetState() != TargetState.DELETE) { + // This likely indicates a race condition where the workflow was recreated or modified + // between cache refresh cycles + LOG.warn("Stale DELETE state detected in cache for workflow {}. " + + "Fresh state from ZooKeeper: {}. Skipping deletion to prevent data loss.", + workflow, freshConfig.getTargetState()); + // force cache refresh for next pipeline run to get the latest state + _clusterDataCache.requireFullRefresh(); + return; + } + + // DELETE state confirmed from fresh read - safe to proceed + LOG.info("DELETE state confirmed for workflow {} from fresh ZooKeeper read. Proceeding with cleanup.", + workflow); updateInflightJobs(workflow, workflowCtx, currentStateOutput, bestPossibleOutput); cleanupWorkflow(workflow); return; @@ -170,6 +193,15 @@ && isTimeout(workflowCtx.getStartTime(), workflowCfg.getTimeout())) { _clusterDataCache.updateWorkflowContext(workflow, workflowCtx); } + /** + * Get fresh workflow config from ZooKeeper. Protected for testability. + * @param workflow the workflow name + * @return fresh WorkflowConfig from ZooKeeper, or null if not found + */ + protected WorkflowConfig getFreshWorkflowConfig(String workflow) { + return TaskUtil.getWorkflowConfig(_manager.getHelixDataAccessor(), workflow); + } + private void updateInflightJobs(String workflow, WorkflowContext workflowCtx, CurrentStateOutput currentStateOutput, BestPossibleStateOutput bestPossibleOutput) { // Update jobs already inflight diff --git a/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java b/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java index bf98d5353e..807e9819a0 100644 --- a/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java +++ b/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java @@ -60,6 +60,15 @@ public T getProperty(PropertyKey key) { PropertyType type = key.getType(); if (type == PropertyType.EXTERNALVIEW || type == PropertyType.IDEALSTATES) { return (T) new ExternalView(_externalView); + } else if (type == PropertyType.LIVEINSTANCES) { + // Support querying for a specific live instance + String path = key.getPath(); + String instanceName = path.substring(path.lastIndexOf('/') + 1); + for (ZNRecord record : _liveInstances) { + if (record.getId().equals(instanceName)) { + return (T) HelixProperty.convertToTypedInstance(key.getTypeClass(), record); + } + } } return null; } diff --git a/helix-core/src/test/java/org/apache/helix/task/TestWorkflowDispatcherStaleCacheFix.java b/helix-core/src/test/java/org/apache/helix/task/TestWorkflowDispatcherStaleCacheFix.java new file mode 100644 index 0000000000..e7bb53eb34 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/task/TestWorkflowDispatcherStaleCacheFix.java @@ -0,0 +1,227 @@ +package org.apache.helix.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.lang.reflect.Field; +import java.util.Collections; + +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixManager; +import org.apache.helix.PropertyKey; +import org.apache.helix.common.caches.TaskDataCache; +import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider; +import org.apache.helix.controller.stages.BestPossibleStateOutput; +import org.apache.helix.controller.stages.CurrentStateOutput; +import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +/** + * Unit tests for WorkflowDispatcher to verify the stale cache fix. + * Tests focus on verifying behavior differences between MessageChange and ResourceConfigChange events. + */ +public class TestWorkflowDispatcherStaleCacheFix { + + @Mock private HelixManager manager; + @Mock private HelixDataAccessor accessor; + @Mock private PropertyKey.Builder keyBuilder; + @Mock private WorkflowControllerDataProvider clusterDataCache; + @Mock private CurrentStateOutput currentStateOutput; + @Mock private BestPossibleStateOutput bestPossibleOutput; + @Mock private ClusterStatusMonitor clusterStatusMonitor; + @Mock private TaskDataCache taskDataCache; + @Mock private ZkHelixPropertyStore propertyStore; + + private WorkflowDispatcher dispatcher; + private AutoCloseable mocks; + + @BeforeMethod + public void setUp() throws Exception { + mocks = MockitoAnnotations.openMocks(this); + + dispatcher = spy(new WorkflowDispatcher()); + dispatcher.init(manager); + + Field cacheField = WorkflowDispatcher.class.getDeclaredField("_clusterDataCache"); + cacheField.setAccessible(true); + cacheField.set(dispatcher, clusterDataCache); + + Field monitorField = AbstractTaskDispatcher.class.getDeclaredField("_clusterStatusMonitor"); + monitorField.setAccessible(true); + monitorField.set(dispatcher, clusterStatusMonitor); + + when(manager.getHelixDataAccessor()).thenReturn(accessor); + when(accessor.keyBuilder()).thenReturn(keyBuilder); + when(manager.getClusterName()).thenReturn("TestCluster"); + when(clusterDataCache.getTaskDataCache()).thenReturn(taskDataCache); + when(manager.getHelixPropertyStore()).thenReturn(propertyStore); + } + + @AfterMethod + public void tearDown() throws Exception { + if (mocks != null) { + mocks.close(); + } + } + + /** + * MessageChange event with stale DELETE cache triggers requireFullRefresh. + * This is the primary bug scenario - MessageChange events don't refresh ResourceConfig cache. + */ + @Test + public void testMessageChangeEventWithStaleDelete() { + String workflowName = "TestWorkflow"; + + WorkflowConfig staleDeleteConfig = mock(WorkflowConfig.class); + when(staleDeleteConfig.getTargetState()).thenReturn(TargetState.DELETE); + when(staleDeleteConfig.getJobDag()).thenReturn(mock(JobDag.class)); + when(staleDeleteConfig.getJobDag().getAllNodes()).thenReturn(Collections.emptySet()); + + WorkflowConfig freshStartConfig = mock(WorkflowConfig.class); + when(freshStartConfig.getTargetState()).thenReturn(TargetState.START); + doReturn(freshStartConfig).when(dispatcher).getFreshWorkflowConfig(workflowName); + + WorkflowContext workflowContext = mock(WorkflowContext.class); + when(workflowContext.getFinishTime()).thenReturn(-1L); + when(workflowContext.getJobStates()).thenReturn(Collections.emptyMap()); + + dispatcher.updateWorkflowStatus(workflowName, staleDeleteConfig, workflowContext, + currentStateOutput, bestPossibleOutput); + + // Verify: Fresh read was performed + verify(dispatcher, times(1)).getFreshWorkflowConfig(workflowName); + + // Verify: requireFullRefresh was called due to stale DELETE detection + verify(clusterDataCache, times(1)).requireFullRefresh(); + + // Verify: Workflow was NOT deleted + verify(accessor, never()).removeProperty(any(PropertyKey.class)); + } + + /** + * ResourceConfigChange event with fresh cache doesn't need validation. + * ResourceConfigChange events refresh the cache, so no stale DELETE issue. + */ + @Test + public void testResourceConfigChangeEventWithFreshCache() { + String workflowName = "TestWorkflow"; + + WorkflowConfig freshStartConfig = mock(WorkflowConfig.class); + when(freshStartConfig.getTargetState()).thenReturn(TargetState.START); + when(freshStartConfig.getJobDag()).thenReturn(mock(JobDag.class)); + when(freshStartConfig.getJobDag().getAllNodes()).thenReturn(Collections.emptySet()); + + WorkflowContext workflowContext = mock(WorkflowContext.class); + when(workflowContext.getFinishTime()).thenReturn(-1L); + when(workflowContext.getJobStates()).thenReturn(Collections.emptyMap()); + + dispatcher.updateWorkflowStatus(workflowName, freshStartConfig, workflowContext, + currentStateOutput, bestPossibleOutput); + + // Verify: No fresh read needed (state is not DELETE) + verify(dispatcher, never()).getFreshWorkflowConfig(anyString()); + + // Verify: No requireFullRefresh needed + verify(clusterDataCache, never()).requireFullRefresh(); + + // Verify: No deletion attempted + verify(accessor, never()).removeProperty(any(PropertyKey.class)); + } + + /** + * Legitimate DELETE still works when fresh state confirms DELETE. + */ + @Test + public void testLegitimateDeleteWithFreshValidation() { + String workflowName = "TestWorkflow"; + + WorkflowConfig deleteConfig = mock(WorkflowConfig.class); + when(deleteConfig.getTargetState()).thenReturn(TargetState.DELETE); + when(deleteConfig.isTerminable()).thenReturn(true); + when(deleteConfig.getJobDag()).thenReturn(mock(JobDag.class)); + when(deleteConfig.getJobDag().getAllNodes()).thenReturn(Collections.emptySet()); + + doReturn(deleteConfig).when(dispatcher).getFreshWorkflowConfig(workflowName); + when(clusterDataCache.getWorkflowConfig(workflowName)).thenReturn(deleteConfig); + + WorkflowContext workflowContext = mock(WorkflowContext.class); + when(workflowContext.getFinishTime()).thenReturn(System.currentTimeMillis()); + when(workflowContext.getJobStates()).thenReturn(Collections.emptyMap()); + + PropertyKey workflowKey = mock(PropertyKey.class); + when(keyBuilder.resourceConfig(workflowName)).thenReturn(workflowKey); + when(keyBuilder.idealStates(workflowName)).thenReturn(mock(PropertyKey.class)); + when(keyBuilder.workflowContext(workflowName)).thenReturn(mock(PropertyKey.class)); + when(accessor.removeProperty(any(PropertyKey.class))).thenReturn(true); + when(accessor.getPropertyStat(any(PropertyKey.class))) + .thenReturn(mock(org.apache.helix.HelixProperty.Stat.class)); + + dispatcher.updateWorkflowStatus(workflowName, deleteConfig, workflowContext, + currentStateOutput, bestPossibleOutput); + + // Verify: Fresh read was performed + verify(dispatcher, times(1)).getFreshWorkflowConfig(workflowName); + + // Verify: Deletion proceeded (both states confirmed DELETE) + verify(accessor, atLeastOnce()).removeProperty(any(PropertyKey.class)); + + // Verify: No unnecessary full refresh + verify(clusterDataCache, never()).requireFullRefresh(); + } + + /** + * Workflow already deleted from ZK (fresh config null). + */ + @Test + public void testWorkflowAlreadyDeletedFromZK() { + String workflowName = "TestWorkflow"; + + WorkflowConfig deleteConfig = mock(WorkflowConfig.class); + when(deleteConfig.getTargetState()).thenReturn(TargetState.DELETE); + when(deleteConfig.getJobDag()).thenReturn(mock(JobDag.class)); + when(deleteConfig.getJobDag().getAllNodes()).thenReturn(Collections.emptySet()); + + doReturn(null).when(dispatcher).getFreshWorkflowConfig(workflowName); + + WorkflowContext workflowContext = mock(WorkflowContext.class); + when(workflowContext.getFinishTime()).thenReturn(System.currentTimeMillis()); + when(workflowContext.getJobStates()).thenReturn(Collections.emptyMap()); + + dispatcher.updateWorkflowStatus(workflowName, deleteConfig, workflowContext, + currentStateOutput, bestPossibleOutput); + + // Verify: Fresh read was performed + verify(dispatcher, times(1)).getFreshWorkflowConfig(workflowName); + + // Verify: No deletion attempted (already gone) + verify(accessor, never()).removeProperty(any(PropertyKey.class)); + + // Verify: No full refresh needed + verify(clusterDataCache, never()).requireFullRefresh(); + } +} \ No newline at end of file