Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v3.5.2-atlan-0.7 #8

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/apis/workflow/v1alpha1/cluster_workflow_template_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ func (cwftmpl *ClusterWorkflowTemplate) GetResourceScope() ResourceScope {
return ResourceScopeCluster
}

// GetPodMetadata returns the PodMetadata of cluster workflow template.
func (cwftmpl *ClusterWorkflowTemplate) GetPodMetadata() *Metadata {
return cwftmpl.Spec.PodMetadata
}

// GetWorkflowSpec returns the WorkflowSpec of cluster workflow template.
func (cwftmpl *ClusterWorkflowTemplate) GetWorkflowSpec() *WorkflowSpec {
return &cwftmpl.Spec
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/workflow/v1alpha1/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type TemplateHolder interface {
GroupVersionKind() schema.GroupVersionKind
GetTemplateByName(name string) *Template
GetResourceScope() ResourceScope
GetPodMetadata() *Metadata
}

// WorkflowSpecHolder is an object that holds a WorkflowSpec; e.g., WorkflowTemplate, and ClusterWorkflowTemplate
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_template_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ func (wftmpl *WorkflowTemplate) GetResourceScope() ResourceScope {
return ResourceScopeNamespaced
}

// GetPodMetadata returns the PodMetadata of workflow template.
func (wftmpl *WorkflowTemplate) GetPodMetadata() *Metadata {
return wftmpl.Spec.PodMetadata
}

// GetWorkflowSpec returns the WorkflowSpec of workflow template.
func (wftmpl *WorkflowTemplate) GetWorkflowSpec() *WorkflowSpec {
return &wftmpl.Spec
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3353,6 +3353,11 @@ func (wf *Workflow) GetResourceScope() ResourceScope {
return ResourceScopeLocal
}

// GetPodMetadata returns the PodMetadata of a workflow.
func (wf *Workflow) GetPodMetadata() *Metadata {
return wf.Spec.PodMetadata
}

// GetWorkflowSpec returns the Spec of a workflow.
func (wf *Workflow) GetWorkflowSpec() WorkflowSpec {
return wf.Spec
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (woc *wfOperationCtx) createAgentPod(ctx context.Context) (*apiv1.Pod, erro
}

tmpl := &wfv1.Template{}
addSchedulingConstraints(pod, woc.execWf.Spec.DeepCopy(), tmpl)
woc.addSchedulingConstraints(pod, woc.execWf.Spec.DeepCopy(), tmpl, "")
woc.addMetadata(pod, tmpl)

if woc.controller.Config.InstanceID != "" {
Expand Down
4 changes: 2 additions & 2 deletions workflow/controller/container_set_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)

func (woc *wfOperationCtx) executeContainerSet(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts) (*wfv1.NodeStatus, error) {
func (woc *wfOperationCtx) executeContainerSet(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts, localParams map[string]string) (*wfv1.NodeStatus, error) {
node, err := woc.wf.GetNodeByName(nodeName)
if err != nil {
node = woc.initializeExecutableNode(nodeName, wfv1.NodeTypePod, templateScope, tmpl, orgTmpl, opts.boundaryID, wfv1.NodePending, opts.nodeFlag)
Expand All @@ -21,7 +21,7 @@ func (woc *wfOperationCtx) executeContainerSet(ctx context.Context, nodeName str
includeScriptOutput: includeScriptOutput,
onExitPod: opts.onExitTemplate,
executionDeadline: opts.executionDeadline,
})
}, localParams)
if err != nil {
return woc.requeueIfTransientErr(err, node.Name)
}
Expand Down
51 changes: 37 additions & 14 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,13 +632,23 @@ func (wfc *WorkflowController) deleteOffloadedNodesForWorkflow(uid string, versi
if !ok {
return fmt.Errorf("object %+v is not an unstructured", workflows[0])
}
key := un.GetNamespace() + "/" + un.GetName()
wfc.workflowKeyLock.Lock(key)
defer wfc.workflowKeyLock.Unlock(key)

obj, ok := wfc.getWorkflowByKey(key)
if !ok {
return fmt.Errorf("failed to get workflow by key after locking")
}
un, ok = obj.(*unstructured.Unstructured)
if !ok {
return fmt.Errorf("object %+v is not an unstructured", obj)
}
wf, err = util.FromUnstructured(un)
if err != nil {
return err
}
key := wf.ObjectMeta.Namespace + "/" + wf.ObjectMeta.Name
wfc.workflowKeyLock.Lock(key)
defer wfc.workflowKeyLock.Unlock(key)

// workflow might still be hydrated
if wfc.hydrator.IsHydrated(wf) {
log.WithField("uid", wf.UID).Info("Hydrated workflow encountered")
Expand Down Expand Up @@ -712,20 +722,14 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool {
}
defer wfc.wfQueue.Done(key)

obj, exists, err := wfc.wfInformer.GetIndexer().GetByKey(key.(string))
if err != nil {
log.WithFields(log.Fields{"key": key, "error": err}).Error("Failed to get workflow from informer")
return true
}
if !exists {
// This happens after a workflow was labeled with completed=true
// or was deleted, but the work queue still had an entry for it.
return true
}

wfc.workflowKeyLock.Lock(key.(string))
defer wfc.workflowKeyLock.Unlock(key.(string))

obj, ok := wfc.getWorkflowByKey(key.(string))
if !ok {
return true
}

// The workflow informer receives unstructured objects to deal with the possibility of invalid
// workflow manifests that are unable to unmarshal to workflow objects
un, ok := obj.(*unstructured.Unstructured)
Expand Down Expand Up @@ -794,6 +798,20 @@ func (wfc *WorkflowController) processNextItem(ctx context.Context) bool {
return true
}

func (wfc *WorkflowController) getWorkflowByKey(key string) (interface{}, bool) {
obj, exists, err := wfc.wfInformer.GetIndexer().GetByKey(key)
if err != nil {
log.WithFields(log.Fields{"key": key, "error": err}).Error("Failed to get workflow from informer")
return nil, false
}
if !exists {
// This happens after a workflow was labeled with completed=true
// or was deleted, but the work queue still had an entry for it.
return nil, false
}
return obj, true
}

func reconciliationNeeded(wf metav1.Object) bool {
return wf.GetLabels()[common.LabelKeyCompleted] != "true" || slices.Contains(wf.GetFinalizers(), common.FinalizerArtifactGC)
}
Expand Down Expand Up @@ -929,6 +947,11 @@ func (wfc *WorkflowController) archiveWorkflow(ctx context.Context, obj interfac
}
wfc.workflowKeyLock.Lock(key)
defer wfc.workflowKeyLock.Unlock(key)
key, err = cache.MetaNamespaceKeyFunc(obj)
if err != nil {
log.Error("failed to get key for object after locking")
return
}
err = wfc.archiveWorkflowAux(ctx, obj)
if err != nil {
log.WithField("key", key).WithError(err).Error("failed to archive workflow")
Expand Down
88 changes: 42 additions & 46 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1057,9 +1057,9 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate
retryOnFailed = false
retryOnError = true
case wfv1.RetryPolicyOnTransientError:
retryOnError = true
if (lastChildNode.Phase == wfv1.NodeFailed || lastChildNode.Phase == wfv1.NodeError) && errorsutil.IsTransientErr(errors.InternalError(lastChildNode.Message)) {
retryOnFailed = true
retryOnError = true
}
case wfv1.RetryPolicyOnFailure:
retryOnFailed = true
Expand Down Expand Up @@ -1432,8 +1432,8 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, old *wfv1.NodeStatus
}
}

// if we are transitioning from Pending to a different state, clear out unchanged message
if old.Phase == wfv1.NodePending && new.Phase != wfv1.NodePending && old.Message == new.Message {
// if we are transitioning from Pending to a different state (except Fail), clear out unchanged message
if old.Phase == wfv1.NodePending && new.Phase != wfv1.NodePending && new.Phase != wfv1.NodeFailed && old.Message == new.Message {
new.Message = ""
}

Expand Down Expand Up @@ -1777,6 +1777,10 @@ func getRetryNodeChildrenIds(node *wfv1.NodeStatus, nodes wfv1.Nodes) []string {

func buildRetryStrategyLocalScope(node *wfv1.NodeStatus, nodes wfv1.Nodes) map[string]interface{} {
localScope := make(map[string]interface{})
localScope[common.LocalVarRetriesLastExitCode] = "0"
localScope[common.LocalVarRetriesLastStatus] = ""
localScope[common.LocalVarRetriesLastDuration] = "0"
localScope[common.LocalVarRetriesLastMessage] = ""

// `retries` variable
childNodeIds, lastChildNode := getChildNodeIdsAndLastRetriedNode(node, nodes)
Expand Down Expand Up @@ -2071,24 +2075,28 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
localScope, realTimeScope := woc.prepareMetricScope(lastChildNode)
woc.computeMetrics(processedTmpl.Metrics.Prometheus, localScope, realTimeScope, false)
}
localScope := buildRetryStrategyLocalScope(retryParentNode, woc.wf.Status.Nodes)
for key, value := range localScope {
strKey := fmt.Sprintf("%v", key)
strValue := fmt.Sprintf("%v", value)
localParams[strKey] = strValue
}
retryNum := len(childNodeIDs)
localParams[common.LocalVarRetries] = strconv.Itoa(retryNum)
if lastChildNode != nil && !lastChildNode.Fulfilled() {
// Last child node is still running.
nodeName = lastChildNode.Name
node = lastChildNode
} else {
retryNum := len(childNodeIDs)
// Create a new child node and append it to the retry node.
nodeName = fmt.Sprintf("%s(%d)", retryNodeName, retryNum)
woc.addChildNode(retryNodeName, nodeName)
node = nil

localParams := make(map[string]string)
// Change the `pod.name` variable to the new retry node name
if processedTmpl.IsPodType() {
localParams[common.LocalVarPodName] = woc.getPodName(nodeName, processedTmpl.Name)
}
// Inject the retryAttempt number
localParams[common.LocalVarRetries] = strconv.Itoa(retryNum)

processedTmpl, err = common.SubstituteParams(processedTmpl, map[string]string{}, localParams)
if errorsutil.IsTransientErr(err) {
Expand All @@ -2102,21 +2110,21 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,

switch processedTmpl.GetType() {
case wfv1.TemplateTypeContainer:
node, err = woc.executeContainer(ctx, nodeName, templateScope, processedTmpl, orgTmpl, opts)
node, err = woc.executeContainer(ctx, nodeName, templateScope, processedTmpl, orgTmpl, opts, localParams)
case wfv1.TemplateTypeContainerSet:
node, err = woc.executeContainerSet(ctx, nodeName, templateScope, processedTmpl, orgTmpl, opts)
node, err = woc.executeContainerSet(ctx, nodeName, templateScope, processedTmpl, orgTmpl, opts, localParams)
case wfv1.TemplateTypeSteps:
node, err = woc.executeSteps(ctx, nodeName, newTmplCtx, templateScope, processedTmpl, orgTmpl, opts)
case wfv1.TemplateTypeScript:
node, err = woc.executeScript(ctx, nodeName, templateScope, processedTmpl, orgTmpl, opts)
node, err = woc.executeScript(ctx, nodeName, templateScope, processedTmpl, orgTmpl, opts, localParams)
case wfv1.TemplateTypeResource:
node, err = woc.executeResource(ctx, nodeName, templateScope, processedTmpl, orgTmpl, opts)
node, err = woc.executeResource(ctx, nodeName, templateScope, processedTmpl, orgTmpl, opts, localParams)
case wfv1.TemplateTypeDAG:
node, err = woc.executeDAG(ctx, nodeName, newTmplCtx, templateScope, processedTmpl, orgTmpl, opts)
case wfv1.TemplateTypeSuspend:
node, err = woc.executeSuspend(nodeName, templateScope, processedTmpl, orgTmpl, opts)
case wfv1.TemplateTypeData:
node, err = woc.executeData(ctx, nodeName, templateScope, processedTmpl, orgTmpl, opts)
node, err = woc.executeData(ctx, nodeName, templateScope, processedTmpl, orgTmpl, opts, localParams)
case wfv1.TemplateTypeHTTP:
node = woc.executeHTTPTemplate(nodeName, templateScope, processedTmpl, orgTmpl, opts)
case wfv1.TemplateTypePlugin:
Expand Down Expand Up @@ -2690,15 +2698,11 @@ func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.Node
// if we are about to execute a pod, make sure our parent hasn't reached it's limit
if boundaryID != "" && (node == nil || (node.Phase != wfv1.NodePending && node.Phase != wfv1.NodeRunning)) {
boundaryNode, err := woc.wf.Status.Nodes.Get(boundaryID)
if err != nil {
woc.log.Errorf("was unable to obtain node for %s", boundaryID)
return errors.InternalError("boundaryNode not found")
}
tmplCtx, err := woc.createTemplateContext(boundaryNode.GetTemplateScope())
if err != nil {
return err
}
_, boundaryTemplate, templateStored, err := tmplCtx.ResolveTemplate(boundaryNode)

boundaryTemplate, templateStored, err := woc.GetTemplateByBoundaryID(boundaryID)
if err != nil {
return err
}
Expand All @@ -2722,7 +2726,7 @@ func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.Node
return nil
}

func (woc *wfOperationCtx) executeContainer(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts) (*wfv1.NodeStatus, error) {
func (woc *wfOperationCtx) executeContainer(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts, localParams map[string]string) (*wfv1.NodeStatus, error) {
node, err := woc.wf.GetNodeByName(nodeName)
if err != nil {
node = woc.initializeExecutableNode(nodeName, wfv1.NodeTypePod, templateScope, tmpl, orgTmpl, opts.boundaryID, wfv1.NodePending, opts.nodeFlag)
Expand All @@ -2740,7 +2744,7 @@ func (woc *wfOperationCtx) executeContainer(ctx context.Context, nodeName string
includeScriptOutput: includeScriptOutput,
onExitPod: opts.onExitTemplate,
executionDeadline: opts.executionDeadline,
})
}, localParams)

if err != nil {
return woc.requeueIfTransientErr(err, node.Name)
Expand Down Expand Up @@ -2926,7 +2930,7 @@ loop:
return nodeName
}

func (woc *wfOperationCtx) executeScript(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts) (*wfv1.NodeStatus, error) {
func (woc *wfOperationCtx) executeScript(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts, localParams map[string]string) (*wfv1.NodeStatus, error) {
node, err := woc.wf.GetNodeByName(nodeName)
if err != nil {
node = woc.initializeExecutableNode(nodeName, wfv1.NodeTypePod, templateScope, tmpl, orgTmpl, opts.boundaryID, wfv1.NodePending, opts.nodeFlag)
Expand All @@ -2951,7 +2955,7 @@ func (woc *wfOperationCtx) executeScript(ctx context.Context, nodeName string, t
includeScriptOutput: includeScriptOutput,
onExitPod: opts.onExitTemplate,
executionDeadline: opts.executionDeadline,
})
}, localParams)
if err != nil {
return woc.requeueIfTransientErr(err, node.Name)
}
Expand Down Expand Up @@ -3197,7 +3201,7 @@ func (woc *wfOperationCtx) addChildNode(parent string, child string) {
}

// executeResource is runs a kubectl command against a manifest
func (woc *wfOperationCtx) executeResource(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts) (*wfv1.NodeStatus, error) {
func (woc *wfOperationCtx) executeResource(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts, localParams map[string]string) (*wfv1.NodeStatus, error) {
node, err := woc.wf.GetNodeByName(nodeName)

if err != nil {
Expand Down Expand Up @@ -3226,15 +3230,15 @@ func (woc *wfOperationCtx) executeResource(ctx context.Context, nodeName string,

mainCtr := woc.newExecContainer(common.MainContainerName, tmpl)
mainCtr.Command = []string{"argoexec", "resource", tmpl.Resource.Action}
_, err = woc.createWorkflowPod(ctx, nodeName, []apiv1.Container{*mainCtr}, tmpl, &createWorkflowPodOpts{onExitPod: opts.onExitTemplate, executionDeadline: opts.executionDeadline})
_, err = woc.createWorkflowPod(ctx, nodeName, []apiv1.Container{*mainCtr}, tmpl, &createWorkflowPodOpts{onExitPod: opts.onExitTemplate, executionDeadline: opts.executionDeadline}, localParams)
if err != nil {
return woc.requeueIfTransientErr(err, node.Name)
}

return node, err
}

func (woc *wfOperationCtx) executeData(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts) (*wfv1.NodeStatus, error) {
func (woc *wfOperationCtx) executeData(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts, localParams map[string]string) (*wfv1.NodeStatus, error) {
node, err := woc.wf.GetNodeByName(nodeName)
if err != nil {
node = woc.initializeExecutableNode(nodeName, wfv1.NodeTypePod, templateScope, tmpl, orgTmpl, opts.boundaryID, wfv1.NodePending, opts.nodeFlag)
Expand All @@ -3249,7 +3253,7 @@ func (woc *wfOperationCtx) executeData(ctx context.Context, nodeName string, tem

mainCtr := woc.newExecContainer(common.MainContainerName, tmpl)
mainCtr.Command = []string{"argoexec", "data", string(dataTemplate)}
_, err = woc.createWorkflowPod(ctx, nodeName, []apiv1.Container{*mainCtr}, tmpl, &createWorkflowPodOpts{onExitPod: opts.onExitTemplate, executionDeadline: opts.executionDeadline, includeScriptOutput: true})
_, err = woc.createWorkflowPod(ctx, nodeName, []apiv1.Container{*mainCtr}, tmpl, &createWorkflowPodOpts{onExitPod: opts.onExitTemplate, executionDeadline: opts.executionDeadline, includeScriptOutput: true}, localParams)
if err != nil {
return woc.requeueIfTransientErr(err, node.Name)
}
Expand Down Expand Up @@ -3719,29 +3723,21 @@ func (woc *wfOperationCtx) deletePDBResource(ctx context.Context) error {
// Check if the output of this node is referenced elsewhere in the Workflow. If so, make sure to include it during
// execution.
func (woc *wfOperationCtx) includeScriptOutput(nodeName, boundaryID string) (bool, error) {
if boundaryID != "" {
if boundaryNode, err := woc.wf.Status.Nodes.Get(boundaryID); err == nil {
tmplCtx, err := woc.createTemplateContext(boundaryNode.GetTemplateScope())
if err != nil {
return false, err
}
_, parentTemplate, templateStored, err := tmplCtx.ResolveTemplate(boundaryNode)
if err != nil {
return false, err
}
// A new template was stored during resolution, persist it
if templateStored {
woc.updated = true
}
if boundaryID == "" {
return false, nil
}

name := getStepOrDAGTaskName(nodeName)
return hasOutputResultRef(name, parentTemplate), nil
} else {
woc.log.Errorf("was unable to obtain node for %s", boundaryID)
}
parentTemplate, templateStored, err := woc.GetTemplateByBoundaryID(boundaryID)
if err != nil {
return false, err
}
// A new template was stored during resolution, persist it
if templateStored {
woc.updated = true
}

return false, nil
name := getStepOrDAGTaskName(nodeName)
return hasOutputResultRef(name, parentTemplate), nil
}

func (woc *wfOperationCtx) fetchWorkflowSpec() (wfv1.WorkflowSpecHolder, error) {
Expand Down
Loading
Loading