diff --git a/go.mod b/go.mod index 91d90e327..0d1b11ecf 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/imdario/mergo v0.3.8 // indirect github.com/jmespath/go-jmespath v0.3.0 // indirect github.com/lyft/datacatalog v0.2.1 - github.com/lyft/flyteidl v0.17.24 + github.com/lyft/flyteidl v0.17.27 github.com/lyft/flyteplugins v0.3.21 github.com/lyft/flytestdlib v0.3.3 github.com/magiconair/properties v1.8.1 diff --git a/go.sum b/go.sum index da2ab9b78..99b7876b7 100644 --- a/go.sum +++ b/go.sum @@ -391,6 +391,8 @@ github.com/lyft/flyteidl v0.17.9 h1:JXT9PovHqS9V3YN74x9zWT0kvIEL48c2uNoujF1KMes= github.com/lyft/flyteidl v0.17.9/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= github.com/lyft/flyteidl v0.17.24 h1:N5mmk2/0062VjbIeUXLHWVZwkxGW20RdZtshaea2nL0= github.com/lyft/flyteidl v0.17.24/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= +github.com/lyft/flyteidl v0.17.27 h1:0EdSHauzdPEYmubYib/XC6fLb+srzP4yDRN1P9o4W/I= +github.com/lyft/flyteidl v0.17.27/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= github.com/lyft/flyteplugins v0.3.21 h1:0PaQ5CZkUY07cNiBPcxdL1Pm26A0QRwoFw1VT6ly8tU= github.com/lyft/flyteplugins v0.3.21/go.mod h1:NDhdkOAn2q6p7YLq9a0/lxyS0dburoAEgipOY5TiO8A= github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= diff --git a/pkg/apis/flyteworkflow/v1alpha1/iface.go b/pkg/apis/flyteworkflow/v1alpha1/iface.go index 9a11dfe95..3014ff694 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/iface.go +++ b/pkg/apis/flyteworkflow/v1alpha1/iface.go @@ -262,6 +262,8 @@ type ExecutableNodeStatus interface { GetWorkflowNodeStatus() ExecutableWorkflowNodeStatus GetTaskNodeStatus() ExecutableTaskNodeStatus + GetQueuingBudget() *metav1.Duration + IsCached() bool } diff --git a/pkg/apis/flyteworkflow/v1alpha1/node_status.go b/pkg/apis/flyteworkflow/v1alpha1/node_status.go index 9a9b7f6c0..634f43327 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/node_status.go +++ b/pkg/apis/flyteworkflow/v1alpha1/node_status.go @@ -183,6 +183,9 @@ type NodeStatus struct { // Not Persisted DataReferenceConstructor storage.ReferenceConstructor `json:"-"` + + // queuing budget a node can consume across at its attempts + QueuingBudget *metav1.Duration `json:"queuingBudget,omitempty"` } func (in *NodeStatus) IsDirty() bool { @@ -420,6 +423,10 @@ func (in *NodeStatus) UpdatePhase(p NodePhase, occurredAt metav1.Time, reason st in.SetDirty() } +func (in *NodeStatus) GetQueuingBudget() *metav1.Duration { + return in.QueuingBudget +} + func (in *NodeStatus) GetStartedAt() *metav1.Time { return in.StartedAt } diff --git a/pkg/apis/flyteworkflow/v1alpha1/workflow.go b/pkg/apis/flyteworkflow/v1alpha1/workflow.go index 1176b6469..4782f4b0e 100644 --- a/pkg/apis/flyteworkflow/v1alpha1/workflow.go +++ b/pkg/apis/flyteworkflow/v1alpha1/workflow.go @@ -47,6 +47,10 @@ type FlyteWorkflow struct { // non-Serialized fields DataReferenceConstructor storage.ReferenceConstructor `json:"-"` + + // Description + // +optional + QueuingBudgetSeconds *int64 } type NodeDefaults struct { diff --git a/pkg/compiler/transformers/k8s/workflow.go b/pkg/compiler/transformers/k8s/workflow.go index ff458ce86..892d6c446 100644 --- a/pkg/compiler/transformers/k8s/workflow.go +++ b/pkg/compiler/transformers/k8s/workflow.go @@ -164,6 +164,12 @@ func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.Li interruptible = wf.GetMetadataDefaults().GetInterruptible() } + var queuingBudgetSeconds *int64 + if wf.GetMetadata() != nil && wf.GetMetadata().GetQueuingBudget() != 
nil {
+		budgetSeconds := wf.GetMetadata().GetQueuingBudget().GetSeconds()
+		queuingBudgetSeconds = &budgetSeconds
+	}
+
 	obj := &v1alpha1.FlyteWorkflow{
 		TypeMeta: v1.TypeMeta{
 			Kind:       v1alpha1.FlyteWorkflowKind,
@@ -173,11 +179,12 @@ func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.Li
 			Namespace: namespace,
 			Labels:    map[string]string{},
 		},
-		Inputs:       &v1alpha1.Inputs{LiteralMap: inputs},
-		WorkflowSpec: primarySpec,
-		SubWorkflows: subwfs,
-		Tasks:        buildTasks(tasks, errs.NewScope()),
-		NodeDefaults: v1alpha1.NodeDefaults{Interruptible: interruptible},
+		Inputs:               &v1alpha1.Inputs{LiteralMap: inputs},
+		WorkflowSpec:         primarySpec,
+		SubWorkflows:         subwfs,
+		Tasks:                buildTasks(tasks, errs.NewScope()),
+		NodeDefaults:         v1alpha1.NodeDefaults{Interruptible: interruptible},
+		QueuingBudgetSeconds: queuingBudgetSeconds,
 	}
 
 	var err error
diff --git a/pkg/controller/executors/node.go b/pkg/controller/executors/node.go
index f8177ae19..3f8c09e91 100644
--- a/pkg/controller/executors/node.go
+++ b/pkg/controller/executors/node.go
@@ -70,7 +70,7 @@ type Node interface {
 	//  - 1. It finds a blocking node (not ready, or running)
 	//  - 2. A node fails and hence the workflow will fail
 	//  - 3. The final/end node has completed and the workflow should be stopped
-	RecursiveNodeHandler(ctx context.Context, execContext ExecutionContext, dag DAGStructure, nl NodeLookup, currentNode v1alpha1.ExecutableNode) (NodeStatus, error)
+	RecursiveNodeHandler(ctx context.Context, execContext ExecutionContext, dag DAGStructure, nl NodeLookup, queuingBudgetHandler QueuingBudgetHandler, currentNode v1alpha1.ExecutableNode) (NodeStatus, error)
 
 	// This aborts the given node. If the given node is complete then it recursively finds the running nodes and aborts them
 	AbortHandler(ctx context.Context, execContext ExecutionContext, dag DAGStructure, nl NodeLookup, currentNode v1alpha1.ExecutableNode, reason string) error
diff --git a/pkg/controller/executors/queue_budget_handler.go b/pkg/controller/executors/queue_budget_handler.go
new file mode 100644
index 000000000..f3d0f4199
--- /dev/null
+++ b/pkg/controller/executors/queue_budget_handler.go
@@ -0,0 +1,90 @@
+package executors
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
+)
+
+// QueuingBudgetHandler computes the queuing parameters (interruptibility and the remaining queuing budget) for a node.
+type QueuingBudgetHandler interface {
+	GetNodeQueuingParameters(ctx context.Context, id v1alpha1.NodeID) (*NodeQueuingParameters, error)
+}
+
+type NodeQueuingParameters struct {
+	IsInterruptible bool
+	MaxQueueTime    time.Duration
+}
+
+type defaultQueuingBudgetHandler struct {
+	dag      DAGStructure
+	nl       NodeLookup
+	wfBudget time.Duration
+}
+
+func (in *defaultQueuingBudgetHandler) GetNodeQueuingParameters(ctx context.Context, id v1alpha1.NodeID) (*NodeQueuingParameters, error) {
+	if id == v1alpha1.StartNodeID {
+		return nil, nil
+	}
+
+	upstreamNodes, err := in.dag.ToNode(id)
+	if err != nil {
+		return nil, err
+	}
+
+	nodeBudget := in.wfBudget
+	for _, upstreamNodeID := range upstreamNodes {
+		upstreamNodeStatus := in.nl.GetNodeExecutionStatus(ctx, upstreamNodeID)
+
+		if upstreamNodeStatus.GetPhase() == v1alpha1.NodePhaseSkipped {
+			// TODO handle skipped parent case: if parent doesn't have queue budget info then get it from its parent.
+			continue
+		}
+
+		var budget time.Duration
+		if upstreamNodeStatus.GetQueuingBudget() != nil && upstreamNodeStatus.GetQueuingBudget().Duration > 0 {
+			budget = upstreamNodeStatus.GetQueuingBudget().Duration
+		}
+
+		// If the parent has not started an attempt yet, it has consumed none of its budget.
+		if upstreamNodeStatus.GetQueuedAt() != nil && upstreamNodeStatus.GetLastAttemptStartedAt() != nil {
+			queuedAt := upstreamNodeStatus.GetQueuedAt().Time
+			lastAttemptStartedAt := upstreamNodeStatus.GetLastAttemptStartedAt().Time
+			queuingDelay := lastAttemptStartedAt.Sub(queuedAt)
+			parentRemainingBudget := budget - queuingDelay
+
+			if nodeBudget > parentRemainingBudget {
+				nodeBudget = parentRemainingBudget
+			}
+		}
+	}
+
+	currNode, exists := in.nl.GetNode(id)
+	if !exists {
+		// mtoledo: what should be the error here
+		return nil, fmt.Errorf("failed to find node [%v] in the node lookup", id)
+	}
+	var interruptible bool
+	if currNode.IsInterruptible() != nil {
+		interruptible = *currNode.IsInterruptible()
+	}
+	// TODO: Where to get config value from?
+	//// a node is not considered interruptible if the system failures have exceeded the configured threshold
+	// currNodeStatus := in.nl.GetNodeExecutionStatus(ctx, id)
+	//if interruptible && currNodeStatus.GetSystemFailures() >= c.interruptibleFailureThreshold {
+	//	interruptible = false
+	//	c.metrics.InterruptedThresholdHit.Inc(ctx)
+	//}
+
+	return &NodeQueuingParameters{IsInterruptible: interruptible, MaxQueueTime: nodeBudget}, nil
+}
+
+// instead of *int64 use duration?
+func NewDefaultQueuingBudgetHandler(dag DAGStructure, nl NodeLookup, queueingBudgetSeconds *int64) QueuingBudgetHandler {
+	wfBudget := time.Duration(0)
+	if queueingBudgetSeconds != nil {
+		wfBudget = time.Duration(*queueingBudgetSeconds) * time.Second
+	}
+	return &defaultQueuingBudgetHandler{dag: dag, nl: nl, wfBudget: wfBudget}
+}
diff --git a/pkg/controller/nodes/branch/handler.go b/pkg/controller/nodes/branch/handler.go
index e4b409f52..da4919635 100644
--- a/pkg/controller/nodes/branch/handler.go
+++ b/pkg/controller/nodes/branch/handler.go
@@ -32,6 +32,10 @@ func (b *branchHandler) Setup(ctx context.Context, setupContext handler.SetupCon
 }
 
 func (b *branchHandler) HandleBranchNode(ctx context.Context, branchNode v1alpha1.ExecutableBranchNode, nCtx handler.NodeExecutionContext, nl executors.NodeLookup) (handler.Transition, error) {
+	// Create Queue Budget Handler from nodeContext
+	// TODO(mtoledo): get dagStructure, ask about duration
+	queuingBudgetHandler := executors.NewDefaultQueuingBudgetHandler(nil, nl, nil)
+
 	if nCtx.NodeStateReader().GetBranchNode().FinalizedNodeID == nil {
 		nodeInputs, err := nCtx.InputReader().Get(ctx)
 		if err != nil {
@@ -64,7 +68,7 @@ func (b *branchHandler) HandleBranchNode(ctx context.Context, branchNode v1alpha
 		logger.Debugf(ctx, "Recursively executing branchNode's chosen path")
 		nodeStatus := nl.GetNodeExecutionStatus(ctx, nCtx.NodeID())
-		return b.recurseDownstream(ctx, nCtx, nodeStatus, finalNode)
+		return b.recurseDownstream(ctx, nCtx, nodeStatus, finalNode, queuingBudgetHandler)
 	}
 
 	// If the branchNodestatus was already evaluated i.e, Node is in Running status
@@ -89,7 +93,7 @@ func (b *branchHandler) HandleBranchNode(ctx context.Context, branchNode v1alpha
 	// Recurse downstream
 	nodeStatus := nl.GetNodeExecutionStatus(ctx, nCtx.NodeID())
-	return b.recurseDownstream(ctx, nCtx, nodeStatus, branchTakenNode)
+	return b.recurseDownstream(ctx, nCtx, nodeStatus, branchTakenNode, queuingBudgetHandler)
 }
 
 func (b *branchHandler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext) (handler.Transition, error) {
@@ -104,12 +108,13 @@
 	return
b.HandleBranchNode(ctx, branchNode, nCtx, nl) } -func (b *branchHandler) recurseDownstream(ctx context.Context, nCtx handler.NodeExecutionContext, nodeStatus v1alpha1.ExecutableNodeStatus, branchTakenNode v1alpha1.ExecutableNode) (handler.Transition, error) { +func (b *branchHandler) recurseDownstream(ctx context.Context, nCtx handler.NodeExecutionContext, nodeStatus v1alpha1.ExecutableNodeStatus, branchTakenNode v1alpha1.ExecutableNode, queuingBudgetHandler executors.QueuingBudgetHandler) (handler.Transition, error) { // TODO we should replace the call to RecursiveNodeHandler with a call to SingleNode Handler. The inputs are also already known ahead of time // There is no DAGStructure for the branch nodes, the branch taken node is the leaf node. The node itself may be arbitrarily complex, but in that case the node should reference a subworkflow etc // The parent of the BranchTaken Node is the actual Branch Node and all the data is just forwarded from the Branch to the executed node. dag := executors.NewLeafNodeDAGStructure(branchTakenNode.GetID(), nCtx.NodeID()) - downstreamStatus, err := b.nodeExecutor.RecursiveNodeHandler(ctx, nCtx.ExecutionContext(), dag, nCtx.ContextualNodeLookup(), branchTakenNode) + + downstreamStatus, err := b.nodeExecutor.RecursiveNodeHandler(ctx, nCtx.ExecutionContext(), dag, nCtx.ContextualNodeLookup(), queuingBudgetHandler, branchTakenNode) if err != nil { return handler.UnknownTransition, err } diff --git a/pkg/controller/nodes/dynamic/handler.go b/pkg/controller/nodes/dynamic/handler.go index 1c4ceae81..28a6b021b 100644 --- a/pkg/controller/nodes/dynamic/handler.go +++ b/pkg/controller/nodes/dynamic/handler.go @@ -102,7 +102,10 @@ func (d dynamicNodeTaskNodeHandler) handleDynamicSubNodes(ctx context.Context, n "DynamicWorkflowBuildFailed", err.Error(), nil)), handler.DynamicNodeState{Phase: v1alpha1.DynamicNodePhaseFailing, Reason: err.Error()}, nil } - trns, newState, err := d.progressDynamicWorkflow(ctx, execContext, dynamicWF, nl, nCtx, prevState) + // get queuing budget + queuingBudgetHandler := executors.NewDefaultQueuingBudgetHandler(dynamicWF, nl, nil) + + trns, newState, err := d.progressDynamicWorkflow(ctx, execContext, dynamicWF, nl, nCtx, prevState, queuingBudgetHandler) if err != nil { return handler.UnknownTransition, prevState, err } @@ -434,9 +437,9 @@ func (d dynamicNodeTaskNodeHandler) getLaunchPlanInterfaces(ctx context.Context, } func (d dynamicNodeTaskNodeHandler) progressDynamicWorkflow(ctx context.Context, execContext executors.ExecutionContext, dynamicWorkflow v1alpha1.ExecutableWorkflow, nl executors.NodeLookup, - nCtx handler.NodeExecutionContext, prevState handler.DynamicNodeState) (handler.Transition, handler.DynamicNodeState, error) { + nCtx handler.NodeExecutionContext, prevState handler.DynamicNodeState, queuingBudgetHandler executors.QueuingBudgetHandler) (handler.Transition, handler.DynamicNodeState, error) { - state, err := d.nodeExecutor.RecursiveNodeHandler(ctx, execContext, dynamicWorkflow, nl, dynamicWorkflow.StartNode()) + state, err := d.nodeExecutor.RecursiveNodeHandler(ctx, execContext, dynamicWorkflow, nl, queuingBudgetHandler, dynamicWorkflow.StartNode()) if err != nil { return handler.UnknownTransition, prevState, err } diff --git a/pkg/controller/nodes/executor.go b/pkg/controller/nodes/executor.go index 47e5e3b99..516ac707c 100644 --- a/pkg/controller/nodes/executor.go +++ b/pkg/controller/nodes/executor.go @@ -490,7 +490,7 @@ func (c *nodeExecutor) handleNode(ctx context.Context, dag executors.DAGStructur // The 
space search for the next node to execute is implemented like a DFS algorithm. handleDownstream visits all the nodes downstream from // the currentNode. Visit a node is the RecursiveNodeHandler. A visit may be partial, complete or may result in a failure. -func (c *nodeExecutor) handleDownstream(ctx context.Context, execContext executors.ExecutionContext, dag executors.DAGStructure, nl executors.NodeLookup, currentNode v1alpha1.ExecutableNode) (executors.NodeStatus, error) { +func (c *nodeExecutor) handleDownstream(ctx context.Context, execContext executors.ExecutionContext, dag executors.DAGStructure, nl executors.NodeLookup, queueingBudgetHandler executors.QueuingBudgetHandler, currentNode v1alpha1.ExecutableNode) (executors.NodeStatus, error) { logger.Debugf(ctx, "Handling downstream Nodes") // This node is success. Handle all downstream nodes downstreamNodes, err := dag.FromNode(currentNode.GetID()) @@ -512,7 +512,7 @@ func (c *nodeExecutor) handleDownstream(ctx context.Context, execContext executo if !ok { return executors.NodeStatusFailed(errors.Errorf(errors.BadSpecificationError, currentNode.GetID(), "Unable to find Downstream Node [%v]", downstreamNodeName)), nil } - state, err := c.RecursiveNodeHandler(ctx, execContext, dag, nl, downstreamNode) + state, err := c.RecursiveNodeHandler(ctx, execContext, dag, nl, queueingBudgetHandler, downstreamNode) if err != nil { return executors.NodeStatusUndefined, err } @@ -569,7 +569,7 @@ func (c *nodeExecutor) SetInputsForStartNode(ctx context.Context, execContext ex return executors.NodeStatusComplete, nil } -func (c *nodeExecutor) RecursiveNodeHandler(ctx context.Context, execContext executors.ExecutionContext, dag executors.DAGStructure, nl executors.NodeLookup, currentNode v1alpha1.ExecutableNode) (executors.NodeStatus, error) { +func (c *nodeExecutor) RecursiveNodeHandler(ctx context.Context, execContext executors.ExecutionContext, dag executors.DAGStructure, nl executors.NodeLookup, queuingBudgetHandler executors.QueuingBudgetHandler, currentNode v1alpha1.ExecutableNode) (executors.NodeStatus, error) { currentNodeCtx := contextutils.WithNodeID(ctx, currentNode.GetID()) nodeStatus := nl.GetNodeExecutionStatus(ctx, currentNode.GetID()) @@ -594,7 +594,7 @@ func (c *nodeExecutor) RecursiveNodeHandler(ctx context.Context, execContext exe return executors.NodeStatusRunning, nil } - nCtx, err := c.newNodeExecContextDefault(ctx, currentNode.GetID(), execContext, nl) + nCtx, err := c.newNodeExecContextDefault(ctx, currentNode.GetID(), execContext, nl, queuingBudgetHandler) if err != nil { // NodeExecution creation failure is a permanent fail / system error. // Should a system failure always return an err? @@ -612,7 +612,7 @@ func (c *nodeExecutor) RecursiveNodeHandler(ctx context.Context, execContext exe // Currently we treat either Skip or Success the same way. In this approach only one node will be skipped // at a time. 
As we iterate down, further nodes will be skipped case v1alpha1.NodePhaseSucceeded, v1alpha1.NodePhaseSkipped: - return c.handleDownstream(ctx, execContext, dag, nl, currentNode) + return c.handleDownstream(ctx, execContext, dag, nl, queuingBudgetHandler, currentNode) case v1alpha1.NodePhaseFailed: logger.Debugf(currentNodeCtx, "Node Failed") return executors.NodeStatusFailed(errors.Errorf(errors.RuntimeExecutionError, currentNode.GetID(), "Node Failed.")), nil @@ -636,7 +636,7 @@ func (c *nodeExecutor) FinalizeHandler(ctx context.Context, execContext executor return err } - nCtx, err := c.newNodeExecContextDefault(ctx, currentNode.GetID(), execContext, nl) + nCtx, err := c.newNodeExecContextDefault(ctx, currentNode.GetID(), execContext, nl, nil) if err != nil { return err } @@ -689,7 +689,7 @@ func (c *nodeExecutor) AbortHandler(ctx context.Context, execContext executors.E return err } - nCtx, err := c.newNodeExecContextDefault(ctx, currentNode.GetID(), execContext, nl) + nCtx, err := c.newNodeExecContextDefault(ctx, currentNode.GetID(), execContext, nl, nil) if err != nil { return err } diff --git a/pkg/controller/nodes/handler/node_exec_context.go b/pkg/controller/nodes/handler/node_exec_context.go index 187d8bb85..6523f96eb 100644 --- a/pkg/controller/nodes/handler/node_exec_context.go +++ b/pkg/controller/nodes/handler/node_exec_context.go @@ -2,6 +2,7 @@ package handler import ( "context" + "time" "github.com/lyft/flyteidl/clients/go/events" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" @@ -38,6 +39,7 @@ type NodeExecutionMetadata interface { GetAnnotations() map[string]string GetK8sServiceAccount() string IsInterruptible() bool + GetMaxQueueTime() time.Duration } type NodeExecutionContext interface { diff --git a/pkg/controller/nodes/mocks/output_resolver.go b/pkg/controller/nodes/mocks/output_resolver.go index 061e87687..405cb4f74 100644 --- a/pkg/controller/nodes/mocks/output_resolver.go +++ b/pkg/controller/nodes/mocks/output_resolver.go @@ -6,6 +6,8 @@ import ( context "context" core "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" + executors "github.com/lyft/flytepropeller/pkg/controller/executors" + mock "github.com/stretchr/testify/mock" v1alpha1 "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" @@ -24,8 +26,8 @@ func (_m OutputResolver_ExtractOutput) Return(values *core.Literal, err error) * return &OutputResolver_ExtractOutput{Call: _m.Call.Return(values, err)} } -func (_m *OutputResolver) OnExtractOutput(ctx context.Context, w v1alpha1.BaseWorkflowWithStatus, n v1alpha1.ExecutableNode, bindToVar string) *OutputResolver_ExtractOutput { - c := _m.On("ExtractOutput", ctx, w, n, bindToVar) +func (_m *OutputResolver) OnExtractOutput(ctx context.Context, nl executors.NodeLookup, n v1alpha1.ExecutableNode, bindToVar string) *OutputResolver_ExtractOutput { + c := _m.On("ExtractOutput", ctx, nl, n, bindToVar) return &OutputResolver_ExtractOutput{Call: c} } @@ -34,13 +36,13 @@ func (_m *OutputResolver) OnExtractOutputMatch(matchers ...interface{}) *OutputR return &OutputResolver_ExtractOutput{Call: c} } -// ExtractOutput provides a mock function with given fields: ctx, w, n, bindToVar -func (_m *OutputResolver) ExtractOutput(ctx context.Context, w v1alpha1.BaseWorkflowWithStatus, n v1alpha1.ExecutableNode, bindToVar string) (*core.Literal, error) { - ret := _m.Called(ctx, w, n, bindToVar) +// ExtractOutput provides a mock function with given fields: ctx, nl, n, bindToVar +func (_m *OutputResolver) ExtractOutput(ctx context.Context, nl executors.NodeLookup, n 
v1alpha1.ExecutableNode, bindToVar string) (*core.Literal, error) { + ret := _m.Called(ctx, nl, n, bindToVar) var r0 *core.Literal - if rf, ok := ret.Get(0).(func(context.Context, v1alpha1.BaseWorkflowWithStatus, v1alpha1.ExecutableNode, string) *core.Literal); ok { - r0 = rf(ctx, w, n, bindToVar) + if rf, ok := ret.Get(0).(func(context.Context, executors.NodeLookup, v1alpha1.ExecutableNode, string) *core.Literal); ok { + r0 = rf(ctx, nl, n, bindToVar) } else { if ret.Get(0) != nil { r0 = ret.Get(0).(*core.Literal) @@ -48,8 +50,8 @@ func (_m *OutputResolver) ExtractOutput(ctx context.Context, w v1alpha1.BaseWork } var r1 error - if rf, ok := ret.Get(1).(func(context.Context, v1alpha1.BaseWorkflowWithStatus, v1alpha1.ExecutableNode, string) error); ok { - r1 = rf(ctx, w, n, bindToVar) + if rf, ok := ret.Get(1).(func(context.Context, executors.NodeLookup, v1alpha1.ExecutableNode, string) error); ok { + r1 = rf(ctx, nl, n, bindToVar) } else { r1 = ret.Error(1) } diff --git a/pkg/controller/nodes/node_exec_context.go b/pkg/controller/nodes/node_exec_context.go index 225d712fb..9a5547ff9 100644 --- a/pkg/controller/nodes/node_exec_context.go +++ b/pkg/controller/nodes/node_exec_context.go @@ -4,15 +4,16 @@ import ( "context" "fmt" "strconv" + "time" "github.com/lyft/flyteidl/clients/go/events" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" + "github.com/lyft/flytestdlib/logger" "github.com/lyft/flytestdlib/storage" "k8s.io/apimachinery/pkg/types" "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/ioutils" - "github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" "github.com/lyft/flytepropeller/pkg/controller/executors" "github.com/lyft/flytepropeller/pkg/controller/nodes/handler" @@ -25,9 +26,12 @@ const NodeInterruptibleLabel = "interruptible" type nodeExecMetadata struct { v1alpha1.Meta - nodeExecID *core.NodeExecutionIdentifier - interrutptible bool - nodeLabels map[string]string + nodeExecID *core.NodeExecutionIdentifier + nodeLabels map[string]string + + // TODO ssingh merge these two parameters into QueuingParameters + interruptible bool + maxQueueTime time.Duration } func (e nodeExecMetadata) GetNodeExecutionID() *core.NodeExecutionIdentifier { @@ -43,7 +47,11 @@ func (e nodeExecMetadata) GetOwnerID() types.NamespacedName { } func (e nodeExecMetadata) IsInterruptible() bool { - return e.interrutptible + return e.interruptible +} + +func (e nodeExecMetadata) GetMaxQueueTime() time.Duration { + return e.maxQueueTime } func (e nodeExecMetadata) GetLabels() map[string]string { @@ -135,14 +143,31 @@ func (e nodeExecContext) MaxDatasetSizeBytes() int64 { return e.maxDatasetSizeBytes } -func newNodeExecContext(_ context.Context, store *storage.DataStore, execContext executors.ExecutionContext, nl executors.NodeLookup, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus, inputs io.InputReader, interruptible bool, maxDatasetSize int64, er events.TaskEventRecorder, tr handler.TaskReader, nsm *nodeStateManager, enqueueOwner func() error, rawOutputPrefix storage.DataReference, outputShardSelector ioutils.ShardSelector) *nodeExecContext { +func newNodeExecContext(ctx context.Context, store *storage.DataStore, execContext executors.ExecutionContext, nl executors.NodeLookup, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus, inputs io.InputReader, maxDatasetSize int64, er events.TaskEventRecorder, tr handler.TaskReader, nsm *nodeStateManager, enqueueOwner func() error, rawOutputPrefix 
storage.DataReference, outputShardSelector ioutils.ShardSelector, queuingBudgetHandler executors.QueuingBudgetHandler) *nodeExecContext {
+
+	// The queuing budget handler doesn't need to worry about the current attempt number or time spent in the queued state in previous attempts,
+	// as it only cares about the overall time spent between when the node was first queued and the start of this attempt.
+	isInterruptible := false
+	maxQueueTime := time.Duration(0)
+	if queuingBudgetHandler != nil {
+		param, err := queuingBudgetHandler.GetNodeQueuingParameters(ctx, node.GetID())
+		if err != nil || param == nil {
+			// TODO: return err instead of silently falling back to the defaults
+			logger.Error(ctx, err)
+		} else {
+			isInterruptible = param.IsInterruptible
+			maxQueueTime = param.MaxQueueTime
+		}
+	}
+
 	md := nodeExecMetadata{
 		Meta: execContext,
 		nodeExecID: &core.NodeExecutionIdentifier{
 			NodeId:      node.GetID(),
 			ExecutionId: execContext.GetExecutionID().WorkflowExecutionIdentifier,
 		},
-		interrutptible: interruptible,
+		interruptible: isInterruptible,
+		maxQueueTime:  maxQueueTime,
 	}
 
 	// Copy the wf labels before adding node specific labels.
@@ -154,7 +179,8 @@ func newNodeExecContext(_ context.Context, store *storage.DataStore, execContext
 	if tr != nil && tr.GetTaskID() != nil {
 		nodeLabels[TaskNameLabel] = utils.SanitizeLabelValue(tr.GetTaskID().Name)
 	}
-	nodeLabels[NodeInterruptibleLabel] = strconv.FormatBool(interruptible)
+
+	nodeLabels[NodeInterruptibleLabel] = strconv.FormatBool(isInterruptible)
 	md.nodeLabels = nodeLabels
 
 	return &nodeExecContext{
@@ -175,7 +201,7 @@ func newNodeExecContext(_ context.Context, store *storage.DataStore, execContext
 	}
 }
 
-func (c *nodeExecutor) newNodeExecContextDefault(ctx context.Context, currentNodeID v1alpha1.NodeID, executionContext executors.ExecutionContext, nl executors.NodeLookup) (*nodeExecContext, error) {
+func (c *nodeExecutor) newNodeExecContextDefault(ctx context.Context, currentNodeID v1alpha1.NodeID, executionContext executors.ExecutionContext, nl executors.NodeLookup, queuingBudgetHandler executors.QueuingBudgetHandler) (*nodeExecContext, error) {
 	n, ok := nl.GetNode(currentNodeID)
 	if !ok {
 		return nil, fmt.Errorf("failed to find node with ID [%s] in execution [%s]", currentNodeID, executionContext.GetID())
@@ -198,19 +224,8 @@
 		return nil
 	}
 
-	interrutible := executionContext.IsInterruptible()
-	if n.IsInterruptible() != nil {
-		interrutible = *n.IsInterruptible()
-	}
 	s := nl.GetNodeExecutionStatus(ctx, currentNodeID)
-
-	// a node is not considered interruptible if the system failures have exceeded the configured threshold
-	if interrutible && s.GetSystemFailures() >= c.interruptibleFailureThreshold {
-		interrutible = false
-		c.metrics.InterruptedThresholdHit.Inc(ctx)
-	}
-
 	return newNodeExecContext(ctx, c.store, executionContext, nl, n, s,
 		ioutils.NewCachedInputReader(
 			ctx,
@@ -224,7 +238,6 @@
 			),
 		),
 	),
-	interrutible,
 	c.maxDatasetSizeBytes,
 	&taskEventRecorder{TaskEventRecorder: c.taskRecorder},
 	tr,
@@ -234,5 +247,6 @@
 	// https://github.com/lyft/flyte/issues/211
 	c.defaultDataSandbox,
 	c.shardSelector,
+	queuingBudgetHandler,
 	), nil
 }
diff --git a/pkg/controller/nodes/subworkflow/subworkflow.go b/pkg/controller/nodes/subworkflow/subworkflow.go
index 082425d91..32c7bff2e 100644
--- a/pkg/controller/nodes/subworkflow/subworkflow.go
+++ b/pkg/controller/nodes/subworkflow/subworkflow.go
@@ -48,13 +48,17 @@ func (s
*subworkflowHandler) startAndHandleSubWorkflow(ctx context.Context, nCtx errorCode, _ := errors.GetErrorCode(startStatus.Err) return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure(errorCode, startStatus.Err.Error(), nil)), nil } - return s.handleSubWorkflow(ctx, nCtx, subWorkflow, nl) + + // Get queuing budget + queuingBudgetHandler := executors.NewDefaultQueuingBudgetHandler(subWorkflow, nl, nil) + + return s.handleSubWorkflow(ctx, nCtx, subWorkflow, nl, queuingBudgetHandler) } // Calls the recursive node executor to handle the SubWorkflow and translates the results after the success -func (s *subworkflowHandler) handleSubWorkflow(ctx context.Context, nCtx handler.NodeExecutionContext, subworkflow v1alpha1.ExecutableSubWorkflow, nl executors.NodeLookup) (handler.Transition, error) { +func (s *subworkflowHandler) handleSubWorkflow(ctx context.Context, nCtx handler.NodeExecutionContext, subworkflow v1alpha1.ExecutableSubWorkflow, nl executors.NodeLookup, queuingBudgetHandler executors.QueuingBudgetHandler) (handler.Transition, error) { - state, err := s.nodeExecutor.RecursiveNodeHandler(ctx, nCtx.ExecutionContext(), subworkflow, nl, subworkflow.StartNode()) + state, err := s.nodeExecutor.RecursiveNodeHandler(ctx, nCtx.ExecutionContext(), subworkflow, nl, queuingBudgetHandler, subworkflow.StartNode()) if err != nil { return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoUndefined), err } @@ -116,7 +120,9 @@ func (s *subworkflowHandler) handleSubWorkflow(ctx context.Context, nCtx handler // https://github.com/lyft/flyte/issues/265 func (s *subworkflowHandler) HandleFailureNodeOfSubWorkflow(ctx context.Context, nCtx handler.NodeExecutionContext, subworkflow v1alpha1.ExecutableSubWorkflow, nl executors.NodeLookup) (handler.Transition, error) { if subworkflow.GetOnFailureNode() != nil { - state, err := s.nodeExecutor.RecursiveNodeHandler(ctx, nCtx.ExecutionContext(), subworkflow, nl, subworkflow.GetOnFailureNode()) + + // TODO: pass queuingBudgetHandler when this method is invoked from caller. + state, err := s.nodeExecutor.RecursiveNodeHandler(ctx, nCtx.ExecutionContext(), subworkflow, nl, nil, subworkflow.GetOnFailureNode()) if err != nil { return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoUndefined), err } diff --git a/pkg/controller/nodes/task/handler.go b/pkg/controller/nodes/task/handler.go index c552a379a..28fcf9380 100644 --- a/pkg/controller/nodes/task/handler.go +++ b/pkg/controller/nodes/task/handler.go @@ -243,6 +243,7 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta } }() childCtx := context.WithValue(ctx, pluginContextKey, p.GetID()) + // this is where plugin is called and RQ is handled trns, err = p.Handle(childCtx, tCtx) return }() @@ -365,6 +366,12 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext) // So now we will derive this from the plugin phase // TODO @kumare re-evaluate this decision + // STEP 0: bookkeeping step just to accept the request in taskhandler, so any queueing from here onwards is measured in the task + // handler + if ts.PluginPhase == pluginCore.PhaseUndefined { + + } + // STEP 1: Check Cache if ts.PluginPhase == pluginCore.PhaseUndefined && checkCatalog { // This is assumed to be first time. 
we will check catalog and call handle
@@ -405,6 +412,8 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext)
 		// Lets check if this value in cache is less than or equal to one in the store
 		if barrierTick <= ts.BarrierClockTick {
 			var err error
+
+			// this is where the object is created the first time
 			pluginTrns, err = t.invokePlugin(ctx, p, tCtx, ts)
 			if err != nil {
 				return handler.UnknownTransition, errors.Wrapf(errors.RuntimeExecutionError, nCtx.NodeID(), err, "failed during plugin execution")
diff --git a/pkg/controller/nodes/task/k8s/plugin_manager.go b/pkg/controller/nodes/task/k8s/plugin_manager.go
index 998bbc42b..f999620e0 100644
--- a/pkg/controller/nodes/task/k8s/plugin_manager.go
+++ b/pkg/controller/nodes/task/k8s/plugin_manager.go
@@ -178,6 +178,7 @@ func (e *PluginManager) LaunchResource(ctx context.Context, tCtx pluginsCore.Tas
 	cfg := nodeTaskConfig.GetConfig()
 	backOffHandler := e.backOffController.GetOrCreateHandler(ctx, key, cfg.BackOffConfig.BaseSecond, cfg.BackOffConfig.MaxDuration.Duration)
 
+	// this is returning an error on RQ, which keeps the node in the same queued state. We should instead return PhaseNotReady
 	err = backOffHandler.Handle(ctx, func() error {
 		return e.kubeClient.GetClient().Create(ctx, o)
 	}, podRequestedResources)
@@ -282,6 +283,7 @@ func (e PluginManager) Handle(ctx context.Context, tCtx pluginsCore.TaskExecutio
 		return pluginsCore.UnknownTransition, errors.Wrapf(errors.CorruptedPluginState, err, "Failed to read unmarshal custom state")
 	}
 	if ps.Phase == PluginPhaseNotStarted {
+		// backoff on RQ (ResourceQuota)
 		t, err := e.LaunchResource(ctx, tCtx)
 		if err == nil && t.Info().Phase() == pluginsCore.PhaseQueued {
 			if err := tCtx.PluginStateWriter().Put(pluginStateVersion, &PluginState{Phase: PluginPhaseStarted}); err != nil {
diff --git a/pkg/controller/workflow/executor.go b/pkg/controller/workflow/executor.go
index 3c82d6863..95a447eba 100644
--- a/pkg/controller/workflow/executor.go
+++ b/pkg/controller/workflow/executor.go
@@ -119,7 +119,9 @@ func (c *workflowExecutor) handleRunningWorkflow(ctx context.Context, w *v1alpha
 	if startNode == nil {
 		return StatusFailed(errors.Errorf(errors.IllegalStateError, w.GetID(), "StartNode not found in running workflow?")), nil
 	}
-	state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, w, w, w, startNode)
+
+	queueingBudgetHandler := executors.NewDefaultQueuingBudgetHandler(w, w, w.QueuingBudgetSeconds)
+	state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, w, w, w, queueingBudgetHandler, startNode)
 	if err != nil {
 		return StatusRunning, err
 	}
@@ -146,9 +148,10 @@
 		logger.Errorf(ctx, "Failed to propagate Abort for workflow:%v. Error: %v", w.ExecutionID.WorkflowExecutionIdentifier, err)
 	}
 
+	queueingBudgetHandler := executors.NewDefaultQueuingBudgetHandler(w, w, w.QueuingBudgetSeconds)
 	errorNode := w.GetOnFailureNode()
 	if errorNode != nil {
-		state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, w, w, w, errorNode)
+		state, err := c.nodeExecutor.RecursiveNodeHandler(ctx, w, w, w, queueingBudgetHandler, errorNode)
 		if err != nil {
 			return StatusFailing(nil), err
 		}
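
Note on the budget computation: the heart of this change is GetNodeQueuingParameters, which derives a node's remaining queuing budget from the delay its upstream nodes have already absorbed. A minimal standalone sketch of that arithmetic follows; nodeState and remainingBudget are illustrative stand-ins, not types from flytepropeller, and the zero-clamp at the end is only there so the example prints a sane value.

package main

import (
	"fmt"
	"time"
)

// nodeState is a simplified, hypothetical stand-in for the fields the handler
// reads from ExecutableNodeStatus: when the node entered the queue, when its
// last attempt started, and the queuing budget it was granted.
type nodeState struct {
	queuedAt             *time.Time
	lastAttemptStartedAt *time.Time
	grantedBudget        time.Duration
}

// remainingBudget mirrors the intent of GetNodeQueuingParameters: start from
// the workflow-level budget and subtract the queuing delay already consumed by
// each upstream node, keeping the smallest remainder.
func remainingBudget(wfBudget time.Duration, upstream []nodeState) time.Duration {
	nodeBudget := wfBudget
	for _, up := range upstream {
		if up.queuedAt == nil || up.lastAttemptStartedAt == nil {
			// The parent has not started an attempt yet, so it has consumed nothing.
			continue
		}
		consumed := up.lastAttemptStartedAt.Sub(*up.queuedAt)
		if remaining := up.grantedBudget - consumed; remaining < nodeBudget {
			nodeBudget = remaining
		}
	}
	if nodeBudget < 0 {
		// Not in the patch: clamped here only so the example prints a sane value.
		nodeBudget = 0
	}
	return nodeBudget
}

func main() {
	base := time.Now()
	wfBudget := 10 * time.Minute
	queued := base.Add(-8 * time.Minute)
	started := base.Add(-5 * time.Minute) // the parent waited 3 minutes in the queue
	parents := []nodeState{{queuedAt: &queued, lastAttemptStartedAt: &started, grantedBudget: wfBudget}}
	fmt.Println(remainingBudget(wfBudget, parents)) // 7m0s left for the child node
}

This also makes the unit handling explicit: the workflow-level budget arrives as seconds (QueuingBudgetSeconds *int64) and should be converted to a time.Duration once, when the handler is constructed, rather than re-scaled at the return site.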
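The GetMaxQueueTime() value exposed through NodeExecutionMetadata is not consumed anywhere in this diff yet (the "STEP 0" bookkeeping block in the task handler is still an empty placeholder). One possible consumer is sketched below, assuming the handler can recover the time at which the node entered the queued state; checkQueuingBudget and ErrQueuingBudgetExhausted are hypothetical names, not part of this patch.

package main

import (
	"errors"
	"fmt"
	"time"
)

// ErrQueuingBudgetExhausted is a hypothetical sentinel; a real handler would
// surface this through its normal Transition/PhaseInfo machinery instead.
var ErrQueuingBudgetExhausted = errors.New("queuing budget exhausted")

// checkQueuingBudget is an illustrative helper, not part of this PR: given when
// the node entered the queue and the MaxQueueTime computed by the budget
// handler, decide whether the node may keep waiting.
func checkQueuingBudget(queuedAt time.Time, maxQueueTime time.Duration, now time.Time) error {
	if maxQueueTime <= 0 {
		// Zero means "no budget configured" in this sketch; never time out.
		return nil
	}
	if waited := now.Sub(queuedAt); waited > maxQueueTime {
		return fmt.Errorf("%w: waited %s, budget %s", ErrQueuingBudgetExhausted, waited, maxQueueTime)
	}
	return nil
}

func main() {
	queuedAt := time.Now().Add(-90 * time.Second)
	if err := checkQueuingBudget(queuedAt, time.Minute, time.Now()); err != nil {
		fmt.Println(err) // queuing budget exhausted: waited ~1m30s, budget 1m0s
	}
}

In the real handler such a check would presumably translate into a phase transition (for example failing the node, or returning PhaseNotReady as suggested in the plugin_manager.go comment) rather than a bare error.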