Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/executor/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,5 @@ kubernetes:
reasonRegexp: ".*"
gracePeriod: 5m
action: Retry
resourcesToSanitize:
- "pods"
2 changes: 2 additions & 0 deletions config/scheduler/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ scheduling:
resolution: "1"
- name: nvidia.com/gpu
resolution: "1"
- name: pods
resolution: "1"
disableScheduling: false
enableAssertions: false
enablePreferLargeJobOrdering: false
Expand Down
1 change: 1 addition & 0 deletions config/server/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ submission:
cpu: "1"
memory: "1Gi"
ephemeral-storage: "8Gi"
pods: "1"
defaultJobTolerations:
- key: "armadaproject.io/armada"
operator: "Equal"
Expand Down
1 change: 1 addition & 0 deletions internal/executor/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ func setupExecutorApiComponents(
config.Kubernetes.PodDefaults,
config.Application.SubmitConcurrencyLimit,
config.Kubernetes.FatalPodSubmissionErrors,
config.Kubernetes.ResourcesToSanitize,
)

leaseRequester := service.NewJobLeaseRequester(executorApiClient, clusterContext)
Expand Down
1 change: 1 addition & 0 deletions internal/executor/configuration/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type KubernetesConfiguration struct {
FailedPodChecks podchecks.FailedChecks
PendingPodChecks *podchecks.Checks
FatalPodSubmissionErrors []string
ResourcesToSanitize []string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add docs for the new field?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In our docs, we often link to the config types on https://pkg.go.dev; for example, for the executor it will be https://pkg.go.dev/github.com/armadaproject/armada/internal/executor/configuration#ApplicationConfiguration. It is easier for end users if they can see the config field docs there.

// Minimum amount of resources marked as allocated to non-Armada pods on each node.
// I.e., if the total resources allocated to non-Armada pods on some node drops below this value,
// the executor adds a fictional allocation to make up the difference, such that the total is at least this.
Expand Down
29 changes: 29 additions & 0 deletions internal/executor/job/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,22 @@ type SubmitService struct {
podDefaults *configuration.PodDefaults
submissionThreadCount int
fatalPodSubmissionErrors []string
resourcesToSanitize []string
}

// NewSubmitter returns a SubmitService populated with the given cluster
// context, pod defaults, submission thread count, fatal pod submission errors
// and the names of the resources to sanitize from pods before submission.
func NewSubmitter(
	clusterContext context.ClusterContext,
	podDefaults *configuration.PodDefaults,
	submissionThreadCount int,
	fatalPodSubmissionErrors []string,
	resourcesToSanitize []string,
) *SubmitService {
	service := new(SubmitService)
	service.clusterContext = clusterContext
	service.podDefaults = podDefaults
	service.submissionThreadCount = submissionThreadCount
	service.fatalPodSubmissionErrors = fatalPodSubmissionErrors
	service.resourcesToSanitize = resourcesToSanitize
	return service
}


Expand Down Expand Up @@ -115,6 +118,9 @@ func (submitService *SubmitService) submitPod(job *SubmitJob) (*v1.Pod, error) {
// Ensure the K8SService and K8SIngress fields are populated
submitService.applyExecutorSpecificIngressDetails(job)

// Remove any resources used for scheduling that are invalid during submission
submitService.sanitizePodResources(pod)

if len(job.Ingresses) > 0 || len(job.Services) > 0 {
pod.Annotations = util.MergeMaps(pod.Annotations, map[string]string{
domain.HasIngress: "true",
Expand Down Expand Up @@ -147,6 +153,29 @@ func (submitService *SubmitService) submitPod(job *SubmitJob) (*v1.Pod, error) {
return pod, err
}

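// sanitizePodResources removes the configured resourcesToSanitize from the requests and limits
// of every init container and container in the pod. Such resources may be used during scheduling
// but are invalid at runtime.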
func (submitService *SubmitService) sanitizePodResources(pod *v1.Pod) {
Copy link
Member

@dejanzele dejanzele Nov 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: sanitizePodResources can be a plain function; there is no need for it to be a receiver method on SubmitService. The same applies to sanitizeResourceList.

for _, ic := range pod.Spec.InitContainers {
submitService.sanitizeResourceList(ic.Resources.Requests)
submitService.sanitizeResourceList(ic.Resources.Limits)
}

for _, c := range pod.Spec.Containers {
submitService.sanitizeResourceList(c.Resources.Requests)
submitService.sanitizeResourceList(c.Resources.Limits)
}
}

// sanitizeResourceList removes from resourceList, in place, every entry whose
// resource name equals one of the configured resourcesToSanitize.
func (submitService *SubmitService) sanitizeResourceList(resourceList v1.ResourceList) {
	// Delete by key instead of scanning the whole list once per configured name.
	// delete is a no-op when the key is absent or the map is nil, so no guards are needed.
	for _, name := range submitService.resourcesToSanitize {
		delete(resourceList, v1.ResourceName(name))
	}
}

// applyExecutorSpecificIngressDetails populates the executor specific details on ingresses
// These objects are mostly created server side however there will be details that are not known until submit time
// So the executor must fill them in before it creates the objects in kubernetes
Expand Down
83 changes: 78 additions & 5 deletions internal/executor/job/submit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ package job
import (
"testing"

"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
k8s_errors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -17,6 +21,7 @@ const (
AdmissionWebhookRegex = "admission webhook"
NamespaceNotFoundRegex = "namespaces \".*\" not found"
HelloRegex = "hello, [a-z]+!"
podsResource = "pods"
)

var testAppConfig = configuration.ApplicationConfiguration{ClusterId: "test", Pool: "pool"}
Expand All @@ -27,23 +32,23 @@ func TestIsRecoverable_ArbitraryErrorIsNotRecoverable(t *testing.T) {
AdmissionWebhookRegex,
HelloRegex,
NamespaceNotFoundRegex,
})
}, []string{})

recoverable := submitter.isRecoverable(newArbitraryError("some error"))
assert.False(t, recoverable)
}

func TestIsRecoverable_KubernetesStatusInvalidIsUnrecoverable(t *testing.T) {
clusterContext := context.NewFakeClusterContext(testAppConfig, "kubernetes.io/hostname", []*context.NodeSpec{})
submitter := NewSubmitter(clusterContext, &configuration.PodDefaults{}, 1, []string{})
submitter := NewSubmitter(clusterContext, &configuration.PodDefaults{}, 1, []string{}, []string{})

recoverable := submitter.isRecoverable(newK8sApiError("", metav1.StatusReasonInvalid))
assert.False(t, recoverable)
}

func TestIsRecoverable_KubernetesStatusForbiddenIsUnrecoverable(t *testing.T) {
clusterContext := context.NewFakeClusterContext(testAppConfig, "kubernetes.io/hostname", []*context.NodeSpec{})
submitter := NewSubmitter(clusterContext, &configuration.PodDefaults{}, 1, []string{})
submitter := NewSubmitter(clusterContext, &configuration.PodDefaults{}, 1, []string{}, []string{})

recoverable := submitter.isRecoverable(newK8sApiError("", metav1.StatusReasonForbidden))
assert.False(t, recoverable)
Expand All @@ -55,7 +60,7 @@ func TestIsRecoverable_K8sApiMatchingRegexIsUnrecoverable(t *testing.T) {
AdmissionWebhookRegex,
HelloRegex,
NamespaceNotFoundRegex,
})
}, []string{})

recoverable := submitter.isRecoverable(newK8sApiError("admission webhook failure: some webhook failed validation", "other status"))
assert.False(t, recoverable)
Expand All @@ -72,12 +77,80 @@ func TestIsRecoverable_K8sApiMatchingRegexIsUnrecoverable(t *testing.T) {

func TestIsRecoverable_ArmadaErrCreateResourceIsRecoverable(t *testing.T) {
clusterContext := context.NewFakeClusterContext(testAppConfig, "kubernetes.io/hostname", []*context.NodeSpec{})
submitter := NewSubmitter(clusterContext, &configuration.PodDefaults{}, 1, []string{})
submitter := NewSubmitter(clusterContext, &configuration.PodDefaults{}, 1, []string{}, []string{})

recoverable := submitter.isRecoverable(newArmadaErrCreateResource())
assert.True(t, recoverable)
}

func TestSanitizePodResources(t *testing.T) {
clusterContext := context.NewFakeClusterContext(testAppConfig, "kubernetes.io/hostname", []*context.NodeSpec{})
submitter := NewSubmitter(clusterContext, &configuration.PodDefaults{}, 1, []string{}, []string{podsResource})

initContainerResources := v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("100m"),
v1.ResourceMemory: resource.MustParse("100Mi"),
v1.ResourceEphemeralStorage: resource.MustParse("100Mi"),
podsResource: resource.MustParse("1"),
},
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("100m"),
v1.ResourceMemory: resource.MustParse("100Mi"),
v1.ResourceEphemeralStorage: resource.MustParse("100Mi"),
podsResource: resource.MustParse("1"),
},
}

containerResources := v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("100m"),
v1.ResourceMemory: resource.MustParse("100Mi"),
v1.ResourceEphemeralStorage: resource.MustParse("100Mi"),
podsResource: resource.MustParse("1"),
},
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("100m"),
v1.ResourceMemory: resource.MustParse("100Mi"),
v1.ResourceEphemeralStorage: resource.MustParse("100Mi"),
podsResource: resource.MustParse("1"),
},
}

pod := &v1.Pod{
Spec: v1.PodSpec{
InitContainers: []v1.Container{
{
Resources: initContainerResources,
},
},
Containers: []v1.Container{
{
Resources: containerResources,
},
},
},
}

expectedResources := v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("100m"),
v1.ResourceMemory: resource.MustParse("100Mi"),
v1.ResourceEphemeralStorage: resource.MustParse("100Mi"),
},
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("100m"),
v1.ResourceMemory: resource.MustParse("100Mi"),
v1.ResourceEphemeralStorage: resource.MustParse("100Mi"),
},
}

submitter.sanitizePodResources(pod)

require.Equal(t, expectedResources, pod.Spec.InitContainers[0].Resources)
require.Equal(t, expectedResources, pod.Spec.Containers[0].Resources)
}

func newK8sApiError(message string, reason metav1.StatusReason) *k8s_errors.StatusError {
return &k8s_errors.StatusError{
ErrStatus: metav1.Status{
Expand Down
Loading