Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v3.5.2-atlan-0.1 #1

Open
wants to merge 4 commits into
base: release-3.5.2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions workflow/controller/container_set_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)

func (woc *wfOperationCtx) executeContainerSet(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts) (*wfv1.NodeStatus, error) {
func (woc *wfOperationCtx) executeContainerSet(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts, localParams map[string]string) (*wfv1.NodeStatus, error) {
node, err := woc.wf.GetNodeByName(nodeName)
if err != nil {
node = woc.initializeExecutableNode(nodeName, wfv1.NodeTypePod, templateScope, tmpl, orgTmpl, opts.boundaryID, wfv1.NodePending, opts.nodeFlag)
Expand All @@ -21,7 +21,7 @@ func (woc *wfOperationCtx) executeContainerSet(ctx context.Context, nodeName str
includeScriptOutput: includeScriptOutput,
onExitPod: opts.onExitTemplate,
executionDeadline: opts.executionDeadline,
})
}, localParams)
if err != nil {
return woc.requeueIfTransientErr(err, node.Name)
}
Expand Down
29 changes: 14 additions & 15 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1057,9 +1057,9 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate
retryOnFailed = false
retryOnError = true
case wfv1.RetryPolicyOnTransientError:
retryOnError = true
if (lastChildNode.Phase == wfv1.NodeFailed || lastChildNode.Phase == wfv1.NodeError) && errorsutil.IsTransientErr(errors.InternalError(lastChildNode.Message)) {
retryOnFailed = true
retryOnError = true
}
case wfv1.RetryPolicyOnFailure:
retryOnFailed = true
Expand Down Expand Up @@ -2082,7 +2082,6 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
woc.addChildNode(retryNodeName, nodeName)
node = nil

localParams := make(map[string]string)
// Change the `pod.name` variable to the new retry node name
if processedTmpl.IsPodType() {
localParams[common.LocalVarPodName] = woc.getPodName(nodeName, processedTmpl.Name)
Expand All @@ -2102,21 +2101,21 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,

switch processedTmpl.GetType() {
case wfv1.TemplateTypeContainer:
node, err = woc.executeContainer(ctx, nodeName, templateScope, processedTmpl, orgTmpl, opts)
node, err = woc.executeContainer(ctx, nodeName, templateScope, processedTmpl, orgTmpl, opts, localParams)
case wfv1.TemplateTypeContainerSet:
node, err = woc.executeContainerSet(ctx, nodeName, templateScope, processedTmpl, orgTmpl, opts)
node, err = woc.executeContainerSet(ctx, nodeName, templateScope, processedTmpl, orgTmpl, opts, localParams)
case wfv1.TemplateTypeSteps:
node, err = woc.executeSteps(ctx, nodeName, newTmplCtx, templateScope, processedTmpl, orgTmpl, opts)
case wfv1.TemplateTypeScript:
node, err = woc.executeScript(ctx, nodeName, templateScope, processedTmpl, orgTmpl, opts)
node, err = woc.executeScript(ctx, nodeName, templateScope, processedTmpl, orgTmpl, opts, localParams)
case wfv1.TemplateTypeResource:
node, err = woc.executeResource(ctx, nodeName, templateScope, processedTmpl, orgTmpl, opts)
node, err = woc.executeResource(ctx, nodeName, templateScope, processedTmpl, orgTmpl, opts, localParams)
case wfv1.TemplateTypeDAG:
node, err = woc.executeDAG(ctx, nodeName, newTmplCtx, templateScope, processedTmpl, orgTmpl, opts)
case wfv1.TemplateTypeSuspend:
node, err = woc.executeSuspend(nodeName, templateScope, processedTmpl, orgTmpl, opts)
case wfv1.TemplateTypeData:
node, err = woc.executeData(ctx, nodeName, templateScope, processedTmpl, orgTmpl, opts)
node, err = woc.executeData(ctx, nodeName, templateScope, processedTmpl, orgTmpl, opts, localParams)
case wfv1.TemplateTypeHTTP:
node = woc.executeHTTPTemplate(nodeName, templateScope, processedTmpl, orgTmpl, opts)
case wfv1.TemplateTypePlugin:
Expand Down Expand Up @@ -2722,7 +2721,7 @@ func (woc *wfOperationCtx) checkParallelism(tmpl *wfv1.Template, node *wfv1.Node
return nil
}

func (woc *wfOperationCtx) executeContainer(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts) (*wfv1.NodeStatus, error) {
func (woc *wfOperationCtx) executeContainer(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts, localParams map[string]string) (*wfv1.NodeStatus, error) {
node, err := woc.wf.GetNodeByName(nodeName)
if err != nil {
node = woc.initializeExecutableNode(nodeName, wfv1.NodeTypePod, templateScope, tmpl, orgTmpl, opts.boundaryID, wfv1.NodePending, opts.nodeFlag)
Expand All @@ -2740,7 +2739,7 @@ func (woc *wfOperationCtx) executeContainer(ctx context.Context, nodeName string
includeScriptOutput: includeScriptOutput,
onExitPod: opts.onExitTemplate,
executionDeadline: opts.executionDeadline,
})
}, localParams)

if err != nil {
return woc.requeueIfTransientErr(err, node.Name)
Expand Down Expand Up @@ -2926,7 +2925,7 @@ loop:
return nodeName
}

func (woc *wfOperationCtx) executeScript(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts) (*wfv1.NodeStatus, error) {
func (woc *wfOperationCtx) executeScript(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts, localParams map[string]string) (*wfv1.NodeStatus, error) {
node, err := woc.wf.GetNodeByName(nodeName)
if err != nil {
node = woc.initializeExecutableNode(nodeName, wfv1.NodeTypePod, templateScope, tmpl, orgTmpl, opts.boundaryID, wfv1.NodePending, opts.nodeFlag)
Expand All @@ -2951,7 +2950,7 @@ func (woc *wfOperationCtx) executeScript(ctx context.Context, nodeName string, t
includeScriptOutput: includeScriptOutput,
onExitPod: opts.onExitTemplate,
executionDeadline: opts.executionDeadline,
})
}, localParams)
if err != nil {
return woc.requeueIfTransientErr(err, node.Name)
}
Expand Down Expand Up @@ -3197,7 +3196,7 @@ func (woc *wfOperationCtx) addChildNode(parent string, child string) {
}

// executeResource is runs a kubectl command against a manifest
func (woc *wfOperationCtx) executeResource(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts) (*wfv1.NodeStatus, error) {
func (woc *wfOperationCtx) executeResource(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts, localParams map[string]string) (*wfv1.NodeStatus, error) {
node, err := woc.wf.GetNodeByName(nodeName)

if err != nil {
Expand Down Expand Up @@ -3226,15 +3225,15 @@ func (woc *wfOperationCtx) executeResource(ctx context.Context, nodeName string,

mainCtr := woc.newExecContainer(common.MainContainerName, tmpl)
mainCtr.Command = []string{"argoexec", "resource", tmpl.Resource.Action}
_, err = woc.createWorkflowPod(ctx, nodeName, []apiv1.Container{*mainCtr}, tmpl, &createWorkflowPodOpts{onExitPod: opts.onExitTemplate, executionDeadline: opts.executionDeadline})
_, err = woc.createWorkflowPod(ctx, nodeName, []apiv1.Container{*mainCtr}, tmpl, &createWorkflowPodOpts{onExitPod: opts.onExitTemplate, executionDeadline: opts.executionDeadline}, localParams)
if err != nil {
return woc.requeueIfTransientErr(err, node.Name)
}

return node, err
}

func (woc *wfOperationCtx) executeData(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts) (*wfv1.NodeStatus, error) {
func (woc *wfOperationCtx) executeData(ctx context.Context, nodeName string, templateScope string, tmpl *wfv1.Template, orgTmpl wfv1.TemplateReferenceHolder, opts *executeTemplateOpts, localParams map[string]string) (*wfv1.NodeStatus, error) {
node, err := woc.wf.GetNodeByName(nodeName)
if err != nil {
node = woc.initializeExecutableNode(nodeName, wfv1.NodeTypePod, templateScope, tmpl, orgTmpl, opts.boundaryID, wfv1.NodePending, opts.nodeFlag)
Expand All @@ -3249,7 +3248,7 @@ func (woc *wfOperationCtx) executeData(ctx context.Context, nodeName string, tem

mainCtr := woc.newExecContainer(common.MainContainerName, tmpl)
mainCtr.Command = []string{"argoexec", "data", string(dataTemplate)}
_, err = woc.createWorkflowPod(ctx, nodeName, []apiv1.Container{*mainCtr}, tmpl, &createWorkflowPodOpts{onExitPod: opts.onExitTemplate, executionDeadline: opts.executionDeadline, includeScriptOutput: true})
_, err = woc.createWorkflowPod(ctx, nodeName, []apiv1.Container{*mainCtr}, tmpl, &createWorkflowPodOpts{onExitPod: opts.onExitTemplate, executionDeadline: opts.executionDeadline, includeScriptOutput: true}, localParams)
if err != nil {
return woc.requeueIfTransientErr(err, node.Name)
}
Expand Down
33 changes: 14 additions & 19 deletions workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,6 @@ var (
}
)

func (woc *wfOperationCtx) hasPodSpecPatch(tmpl *wfv1.Template) bool {
return woc.execWf.Spec.HasPodSpecPatch() || tmpl.HasPodSpecPatch()
}

// scheduleOnDifferentHost adds affinity to prevent retry on the same host when
// retryStrategy.affinity.nodeAntiAffinity{} is specified
func (woc *wfOperationCtx) scheduleOnDifferentHost(node *wfv1.NodeStatus, pod *apiv1.Pod) error {
Expand Down Expand Up @@ -77,7 +73,7 @@ type createWorkflowPodOpts struct {
executionDeadline time.Time
}

func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName string, mainCtrs []apiv1.Container, tmpl *wfv1.Template, opts *createWorkflowPodOpts) (*apiv1.Pod, error) {
func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName string, mainCtrs []apiv1.Container, tmpl *wfv1.Template, opts *createWorkflowPodOpts, localParams map[string]string) (*apiv1.Pod, error) {
nodeID := woc.wf.NodeID(nodeName)

// we must check to see if the pod exists rather than just optimistically creating the pod and see if we get
Expand Down Expand Up @@ -347,24 +343,23 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin
}
}

// Apply the patch string from template
if woc.hasPodSpecPatch(tmpl) {
tmpl.PodSpecPatch, err = util.PodSpecPatchMerge(woc.wf, tmpl)
if err != nil {
return nil, errors.Wrap(err, "", "Failed to merge the workflow PodSpecPatch with the template PodSpecPatch due to invalid format")
}

// Apply the patch string from workflow and template
var podSpecPatchs []string
if woc.execWf.Spec.HasPodSpecPatch() {
// Final substitution for workflow level PodSpecPatch
localParams := make(map[string]string)
if tmpl.IsPodType() {
localParams[common.LocalVarPodName] = pod.Name
}
tmpl, err := common.ProcessArgs(tmpl, &wfv1.Arguments{}, woc.globalParams, localParams, false, woc.wf.Namespace, woc.controller.configMapInformer.GetIndexer())
newTmpl := tmpl.DeepCopy()
newTmpl.PodSpecPatch = woc.execWf.Spec.PodSpecPatch
processedTmpl, err := common.ProcessArgs(newTmpl, &wfv1.Arguments{}, woc.globalParams, localParams, false, woc.wf.Namespace, woc.controller.configMapInformer.GetIndexer())
if err != nil {
return nil, errors.Wrap(err, "", "Failed to substitute the PodSpecPatch variables")
}

patchedPodSpec, err := util.ApplyPodSpecPatch(pod.Spec, tmpl.PodSpecPatch)
podSpecPatchs = append(podSpecPatchs, processedTmpl.PodSpecPatch)
}
if tmpl.HasPodSpecPatch() {
podSpecPatchs = append(podSpecPatchs, tmpl.PodSpecPatch)
}
if len(podSpecPatchs) > 0 {
patchedPodSpec, err := util.ApplyPodSpecPatch(pod.Spec, podSpecPatchs...)
if err != nil {
return nil, errors.Wrap(err, "", "Error applying PodSpecPatch")
}
Expand Down
Loading
Loading