Skip to content
This repository was archived by the owner on Oct 9, 2023. It is now read-only.

Commit b1e5482

Browse files
authored
Including all upstream node deps on BranchNode subnode execution (#543)
* waiting for upstream nodes on branch subnode evaluation Signed-off-by: Daniel Rammer <[email protected]> * removed dead comments Signed-off-by: Daniel Rammer <[email protected]> * added unit test Signed-off-by: Daniel Rammer <[email protected]> * fixed lint issues Signed-off-by: Daniel Rammer <[email protected]> --------- Signed-off-by: Daniel Rammer <[email protected]>
1 parent e462412 commit b1e5482

File tree

8 files changed

+156
-27
lines changed

8 files changed

+156
-27
lines changed

pkg/controller/executors/mocks/node_lookup.go

Lines changed: 82 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/controller/executors/node_lookup.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,21 +12,27 @@ import (
1212
type NodeLookup interface {
1313
GetNode(nodeID v1alpha1.NodeID) (v1alpha1.ExecutableNode, bool)
1414
GetNodeExecutionStatus(ctx context.Context, id v1alpha1.NodeID) v1alpha1.ExecutableNodeStatus
15+
// Lookup for upstream edges, find all node ids from which this node can be reached.
16+
ToNode(id v1alpha1.NodeID) ([]v1alpha1.NodeID, error)
17+
// Lookup for downstream edges, find all node ids that can be reached from the given node id.
18+
FromNode(id v1alpha1.NodeID) ([]v1alpha1.NodeID, error)
1519
}
1620

1721
// Implements a contextual NodeLookup that can be composed of a disparate NodeGetter and a NodeStatusGetter
1822
type contextualNodeLookup struct {
1923
v1alpha1.NodeGetter
2024
v1alpha1.NodeStatusGetter
25+
DAGStructure
2126
}
2227

2328
// Returns a Contextual NodeLookup using the given NodeGetter and a separate NodeStatusGetter.
2429
// Very useful in Subworkflows where the Subworkflow is the reservoir of the nodes, but the status for these nodes
2530
// maybe stored int he Top-level workflow node itself.
26-
func NewNodeLookup(n v1alpha1.NodeGetter, s v1alpha1.NodeStatusGetter) NodeLookup {
31+
func NewNodeLookup(n v1alpha1.NodeGetter, s v1alpha1.NodeStatusGetter, d DAGStructure) NodeLookup {
2732
return contextualNodeLookup{
2833
NodeGetter: n,
2934
NodeStatusGetter: s,
35+
DAGStructure: d,
3036
}
3137
}
3238

@@ -45,6 +51,14 @@ func (s staticNodeLookup) GetNodeExecutionStatus(_ context.Context, id v1alpha1.
4551
return s.status[id]
4652
}
4753

54+
func (s staticNodeLookup) ToNode(id v1alpha1.NodeID) ([]v1alpha1.NodeID, error) {
55+
return nil, nil
56+
}
57+
58+
func (s staticNodeLookup) FromNode(id v1alpha1.NodeID) ([]v1alpha1.NodeID, error) {
59+
return nil, nil
60+
}
61+
4862
// Returns a new NodeLookup useful in Testing. Not recommended to be used in production
4963
func NewTestNodeLookup(nodes map[v1alpha1.NodeID]v1alpha1.ExecutableNode, status map[v1alpha1.NodeID]v1alpha1.ExecutableNodeStatus) NodeLookup {
5064
return staticNodeLookup{

pkg/controller/executors/node_lookup_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,20 @@ type nsg struct {
1818
v1alpha1.NodeStatusGetter
1919
}
2020

21+
type dag struct {
22+
DAGStructure
23+
}
24+
2125
func TestNewNodeLookup(t *testing.T) {
2226
n := ng{}
2327
ns := nsg{}
24-
nl := NewNodeLookup(n, ns)
28+
d := dag{}
29+
nl := NewNodeLookup(n, ns, d)
2530
assert.NotNil(t, nl)
2631
typed := nl.(contextualNodeLookup)
2732
assert.Equal(t, n, typed.NodeGetter)
2833
assert.Equal(t, ns, typed.NodeStatusGetter)
34+
assert.Equal(t, d, typed.DAGStructure)
2935
}
3036

3137
func TestNewTestNodeLookup(t *testing.T) {

pkg/controller/nodes/branch/handler.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,11 @@ func (b *branchHandler) recurseDownstream(ctx context.Context, nCtx handler.Node
136136
childNodeStatus := nl.GetNodeExecutionStatus(ctx, branchTakenNode.GetID())
137137
childNodeStatus.SetDataDir(nodeStatus.GetDataDir())
138138
childNodeStatus.SetOutputDir(nodeStatus.GetOutputDir())
139-
dag := executors.NewLeafNodeDAGStructure(branchTakenNode.GetID(), nCtx.NodeID())
139+
upstreamNodeIds, err := nCtx.ContextualNodeLookup().ToNode(branchTakenNode.GetID())
140+
if err != nil {
141+
return handler.UnknownTransition, err
142+
}
143+
dag := executors.NewLeafNodeDAGStructure(branchTakenNode.GetID(), append(upstreamNodeIds, nCtx.NodeID())...)
140144
execContext, err := b.getExecutionContextForDownstream(nCtx)
141145
if err != nil {
142146
return handler.UnknownTransition, err
@@ -196,7 +200,11 @@ func (b *branchHandler) Abort(ctx context.Context, nCtx handler.NodeExecutionCon
196200
// TODO we should replace the call to RecursiveNodeHandler with a call to SingleNode Handler. The inputs are also already known ahead of time
197201
// There is no DAGStructure for the branch nodes, the branch taken node is the leaf node. The node itself may be arbitrarily complex, but in that case the node should reference a subworkflow etc
198202
// The parent of the BranchTaken Node is the actual Branch Node and all the data is just forwarded from the Branch to the executed node.
199-
dag := executors.NewLeafNodeDAGStructure(branchTakenNode.GetID(), nCtx.NodeID())
203+
upstreamNodeIds, err := nCtx.ContextualNodeLookup().ToNode(branchTakenNode.GetID())
204+
if err != nil {
205+
return err
206+
}
207+
dag := executors.NewLeafNodeDAGStructure(branchTakenNode.GetID(), append(upstreamNodeIds, nCtx.NodeID())...)
200208
execContext, err := b.getExecutionContextForDownstream(nCtx)
201209
if err != nil {
202210
return err
@@ -236,7 +244,11 @@ func (b *branchHandler) Finalize(ctx context.Context, nCtx handler.NodeExecution
236244
// TODO we should replace the call to RecursiveNodeHandler with a call to SingleNode Handler. The inputs are also already known ahead of time
237245
// There is no DAGStructure for the branch nodes, the branch taken node is the leaf node. The node itself may be arbitrarily complex, but in that case the node should reference a subworkflow etc
238246
// The parent of the BranchTaken Node is the actual Branch Node and all the data is just forwarded from the Branch to the executed node.
239-
dag := executors.NewLeafNodeDAGStructure(branchTakenNode.GetID(), nCtx.NodeID())
247+
upstreamNodeIds, err := nCtx.ContextualNodeLookup().ToNode(branchTakenNode.GetID())
248+
if err != nil {
249+
return err
250+
}
251+
dag := executors.NewLeafNodeDAGStructure(branchTakenNode.GetID(), append(upstreamNodeIds, nCtx.NodeID())...)
240252
execContext, err := b.getExecutionContextForDownstream(nCtx)
241253
if err != nil {
242254
return err

pkg/controller/nodes/branch/handler_test.go

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -158,24 +158,34 @@ func TestBranchHandler_RecurseDownstream(t *testing.T) {
158158
isErr bool
159159
expectedPhase handler.EPhase
160160
childPhase v1alpha1.NodePhase
161-
nl *execMocks.NodeLookup
161+
upstreamNodeID string
162162
}{
163+
{"upstreamNodeExists", executors.NodeStatusPending, nil,
164+
&mocks2.ExecutableNodeStatus{}, bn, false, handler.EPhaseRunning, v1alpha1.NodePhaseQueued, "n2"},
163165
{"childNodeError", executors.NodeStatusUndefined, fmt.Errorf("err"),
164-
&mocks2.ExecutableNodeStatus{}, bn, true, handler.EPhaseUndefined, v1alpha1.NodePhaseFailed, &execMocks.NodeLookup{}},
166+
&mocks2.ExecutableNodeStatus{}, bn, true, handler.EPhaseUndefined, v1alpha1.NodePhaseFailed, ""},
165167
{"childPending", executors.NodeStatusPending, nil,
166-
&mocks2.ExecutableNodeStatus{}, bn, false, handler.EPhaseRunning, v1alpha1.NodePhaseQueued, &execMocks.NodeLookup{}},
168+
&mocks2.ExecutableNodeStatus{}, bn, false, handler.EPhaseRunning, v1alpha1.NodePhaseQueued, ""},
167169
{"childStillRunning", executors.NodeStatusRunning, nil,
168-
&mocks2.ExecutableNodeStatus{}, bn, false, handler.EPhaseRunning, v1alpha1.NodePhaseRunning, &execMocks.NodeLookup{}},
170+
&mocks2.ExecutableNodeStatus{}, bn, false, handler.EPhaseRunning, v1alpha1.NodePhaseRunning, ""},
169171
{"childFailure", executors.NodeStatusFailed(expectedError), nil,
170-
&mocks2.ExecutableNodeStatus{}, bn, false, handler.EPhaseFailed, v1alpha1.NodePhaseFailed, &execMocks.NodeLookup{}},
172+
&mocks2.ExecutableNodeStatus{}, bn, false, handler.EPhaseFailed, v1alpha1.NodePhaseFailed, ""},
171173
{"childComplete", executors.NodeStatusComplete, nil,
172-
&mocks2.ExecutableNodeStatus{}, bn, false, handler.EPhaseSuccess, v1alpha1.NodePhaseSucceeded, &execMocks.NodeLookup{}},
174+
&mocks2.ExecutableNodeStatus{}, bn, false, handler.EPhaseSuccess, v1alpha1.NodePhaseSucceeded, ""},
173175
}
174176
for _, test := range tests {
175177
t.Run(test.name, func(t *testing.T) {
176178
eCtx := &execMocks.ExecutionContext{}
177179
eCtx.OnGetParentInfo().Return(parentInfo{})
178-
nCtx, _ := createNodeContext(v1alpha1.BranchNodeNotYetEvaluated, &childNodeID, n, nil, test.nl, eCtx)
180+
181+
mockNodeLookup := &execMocks.NodeLookup{}
182+
if len(test.upstreamNodeID) > 0 {
183+
mockNodeLookup.OnToNodeMatch(childNodeID).Return([]string{test.upstreamNodeID}, nil)
184+
} else {
185+
mockNodeLookup.OnToNodeMatch(childNodeID).Return(nil, nil)
186+
}
187+
188+
nCtx, _ := createNodeContext(v1alpha1.BranchNodeNotYetEvaluated, &childNodeID, n, nil, mockNodeLookup, eCtx)
179189
newParentInfo, _ := common.CreateParentInfo(parentInfo{}, nCtx.NodeID(), nCtx.CurrentAttempt())
180190
expectedExecContext := executors.NewExecutionContextWithParentInfo(nCtx.ExecutionContext(), newParentInfo)
181191
mockNodeExecutor := &execMocks.Node{}
@@ -187,23 +197,27 @@ func TestBranchHandler_RecurseDownstream(t *testing.T) {
187197
fList, err1 := d.FromNode("x")
188198
dList, err2 := d.ToNode(childNodeID)
189199
b := assert.NoError(t, err1)
190-
b = b && assert.Equal(t, fList, []v1alpha1.NodeID{})
200+
b = b && assert.Equal(t, []v1alpha1.NodeID{}, fList)
191201
b = b && assert.NoError(t, err2)
192-
b = b && assert.Equal(t, dList, []v1alpha1.NodeID{nodeID})
202+
dListExpected := []v1alpha1.NodeID{nodeID}
203+
if len(test.upstreamNodeID) > 0 {
204+
dListExpected = append([]string{test.upstreamNodeID}, dListExpected...)
205+
}
206+
b = b && assert.Equal(t, dListExpected, dList)
193207
return b
194208
}
195209
return false
196210
}),
197-
mock.MatchedBy(func(lookup executors.NodeLookup) bool { return assert.Equal(t, lookup, test.nl) }),
211+
mock.MatchedBy(func(lookup executors.NodeLookup) bool { return assert.Equal(t, lookup, mockNodeLookup) }),
198212
mock.MatchedBy(func(n v1alpha1.ExecutableNode) bool { return assert.Equal(t, n.GetID(), childNodeID) }),
199213
).Return(test.ns, test.err)
200214

201215
childNodeStatus := &mocks2.ExecutableNodeStatus{}
202-
if test.nl != nil {
216+
if mockNodeLookup != nil {
203217
childNodeStatus.OnGetOutputDir().Return("parent-output-dir")
204218
test.nodeStatus.OnGetDataDir().Return("parent-data-dir")
205219
test.nodeStatus.OnGetOutputDir().Return("parent-output-dir")
206-
test.nl.OnGetNodeExecutionStatus(ctx, childNodeID).Return(childNodeStatus)
220+
mockNodeLookup.OnGetNodeExecutionStatus(ctx, childNodeID).Return(childNodeStatus)
207221
childNodeStatus.On("SetDataDir", storage.DataReference("parent-data-dir")).Once()
208222
childNodeStatus.On("SetOutputDir", storage.DataReference("parent-output-dir")).Once()
209223
}
@@ -295,17 +309,18 @@ func TestBranchHandler_AbortNode(t *testing.T) {
295309

296310
t.Run("BranchNodeSuccess", func(t *testing.T) {
297311
mockNodeExecutor := &execMocks.Node{}
298-
nl := &execMocks.NodeLookup{}
312+
mockNodeLookup := &execMocks.NodeLookup{}
313+
mockNodeLookup.OnToNodeMatch(mock.Anything).Return(nil, nil)
299314
eCtx := &execMocks.ExecutionContext{}
300315
eCtx.OnGetParentInfo().Return(parentInfo{})
301-
nCtx, s := createNodeContext(v1alpha1.BranchNodeSuccess, &n1, n, nil, nl, eCtx)
316+
nCtx, s := createNodeContext(v1alpha1.BranchNodeSuccess, &n1, n, nil, mockNodeLookup, eCtx)
302317
newParentInfo, _ := common.CreateParentInfo(parentInfo{}, nCtx.NodeID(), nCtx.CurrentAttempt())
303318
expectedExecContext := executors.NewExecutionContextWithParentInfo(nCtx.ExecutionContext(), newParentInfo)
304319
mockNodeExecutor.OnAbortHandlerMatch(mock.Anything,
305320
mock.MatchedBy(func(e executors.ExecutionContext) bool { return assert.Equal(t, e, expectedExecContext) }),
306321
mock.Anything,
307322
mock.Anything, mock.Anything, mock.Anything).Return(nil)
308-
nl.OnGetNode(*s.s.FinalizedNodeID).Return(n, true)
323+
mockNodeLookup.OnGetNode(*s.s.FinalizedNodeID).Return(n, true)
309324
branch := New(mockNodeExecutor, eventConfig, promutils.NewTestScope())
310325
err := branch.Abort(ctx, nCtx, "")
311326
assert.NoError(t, err)

pkg/controller/nodes/dynamic/dynamic_workflow.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C
183183
subWorkflow: compiledWf,
184184
subWorkflowClosure: workflowCacheContents.CompiledWorkflow,
185185
execContext: executors.NewExecutionContext(nCtx.ExecutionContext(), compiledWf, compiledWf, newParentInfo, nCtx.ExecutionContext()),
186-
nodeLookup: executors.NewNodeLookup(compiledWf, dynamicNodeStatus),
186+
nodeLookup: executors.NewNodeLookup(compiledWf, dynamicNodeStatus, compiledWf),
187187
dynamicJobSpecURI: string(f.GetLoc()),
188188
}, nil
189189
}
@@ -216,7 +216,7 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C
216216
subWorkflow: dynamicWf,
217217
subWorkflowClosure: closure,
218218
execContext: executors.NewExecutionContext(nCtx.ExecutionContext(), dynamicWf, dynamicWf, newParentInfo, nCtx.ExecutionContext()),
219-
nodeLookup: executors.NewNodeLookup(dynamicWf, dynamicNodeStatus),
219+
nodeLookup: executors.NewNodeLookup(dynamicWf, dynamicNodeStatus, dynamicWf),
220220
dynamicJobSpecURI: string(f.GetLoc()),
221221
}, nil
222222
}

pkg/controller/nodes/subworkflow/subworkflow.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ func (s *subworkflowHandler) HandleFailingSubWorkflow(ctx context.Context, nCtx
209209
}
210210

211211
status := nCtx.NodeStatus()
212-
nodeLookup := executors.NewNodeLookup(subWorkflow, status)
212+
nodeLookup := executors.NewNodeLookup(subWorkflow, status, subWorkflow)
213213
return s.HandleFailureNodeOfSubWorkflow(ctx, nCtx, subWorkflow, nodeLookup)
214214
}
215215

@@ -220,7 +220,7 @@ func (s *subworkflowHandler) StartSubWorkflow(ctx context.Context, nCtx handler.
220220
}
221221

222222
status := nCtx.NodeStatus()
223-
nodeLookup := executors.NewNodeLookup(subWorkflow, status)
223+
nodeLookup := executors.NewNodeLookup(subWorkflow, status, subWorkflow)
224224

225225
// assert startStatus.IsComplete() == true
226226
return s.startAndHandleSubWorkflow(ctx, nCtx, subWorkflow, nodeLookup)
@@ -233,7 +233,7 @@ func (s *subworkflowHandler) CheckSubWorkflowStatus(ctx context.Context, nCtx ha
233233
}
234234

235235
status := nCtx.NodeStatus()
236-
nodeLookup := executors.NewNodeLookup(subWorkflow, status)
236+
nodeLookup := executors.NewNodeLookup(subWorkflow, status, subWorkflow)
237237
return s.handleSubWorkflow(ctx, nCtx, subWorkflow, nodeLookup)
238238
}
239239

@@ -243,7 +243,7 @@ func (s *subworkflowHandler) HandleAbort(ctx context.Context, nCtx handler.NodeE
243243
return err
244244
}
245245
status := nCtx.NodeStatus()
246-
nodeLookup := executors.NewNodeLookup(subWorkflow, status)
246+
nodeLookup := executors.NewNodeLookup(subWorkflow, status, subWorkflow)
247247
execContext, err := s.getExecutionContextForDownstream(nCtx)
248248
if err != nil {
249249
return err

pkg/controller/workflow/executor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ func (c *workflowExecutor) handleReadyWorkflow(ctx context.Context, w *v1alpha1.
124124
nodeStatus.SetDataDir(dataDir)
125125
nodeStatus.SetOutputDir(outputDir)
126126
execcontext := executors.NewExecutionContext(w, w, w, nil, executors.InitializeControlFlow())
127-
s, err := c.nodeExecutor.SetInputsForStartNode(ctx, execcontext, w, executors.NewNodeLookup(w, w.GetExecutionStatus()), inputs)
127+
s, err := c.nodeExecutor.SetInputsForStartNode(ctx, execcontext, w, executors.NewNodeLookup(w, w.GetExecutionStatus(), w), inputs)
128128
if err != nil {
129129
return StatusReady, err
130130
}

0 commit comments

Comments
 (0)