Skip to content
This repository was archived by the owner on Oct 9, 2023. It is now read-only.

Commit f4cadb0

Browse files
authored
Fast fail if task resource requests exceed k8s resource limits (#488)
* checking if task resource requests exceed k8s limits
  Signed-off-by: Daniel Rammer <[email protected]>
* added better message to task failure
  Signed-off-by: Daniel Rammer <[email protected]>
* added request checks
  Signed-off-by: Daniel Rammer <[email protected]>
* added tests for checking resource eligibility
  Signed-off-by: Daniel Rammer <[email protected]>
* fixed lint issues
  Signed-off-by: Daniel Rammer <[email protected]>
* updated comment
  Signed-off-by: Daniel Rammer <[email protected]>
---------
Signed-off-by: Daniel Rammer <[email protected]>
1 parent 5b50d88 commit f4cadb0

File tree

3 files changed

+153
-29
lines changed

3 files changed

+153
-29
lines changed

pkg/controller/nodes/task/backoff/handler.go

Lines changed: 30 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -9,7 +9,6 @@ import (
99

1010
"github.com/flyteorg/flyteplugins/go/tasks/errors"
1111
stdAtomic "github.com/flyteorg/flytestdlib/atomic"
12-
stdErrors "github.com/flyteorg/flytestdlib/errors"
1312
"github.com/flyteorg/flytestdlib/logger"
1413
v1 "k8s.io/api/core/v1"
1514
apiErrors "k8s.io/apimachinery/pkg/api/errors"
@@ -18,7 +17,10 @@ import (
1817
)
1918

2019
// Patterns that pull one section of a Kubernetes "exceeded quota" error
// message apart. A sample message looks like:
//
//	"requested: limits.cpu=7,limits.memory=64Gi, used: limits.cpu=249,limits.memory=2012730Mi, limited: limits.cpu=250,limits.memory=2000Gi"
//
// Each pattern anchors on one section label ("requested: " or "limited: ")
// and on one entry prefix ("limits." or "requests."), then matches a
// comma-separated run of `<prefix><name>=<quantity>` entries. The dot that
// terminates the prefix is escaped so that only a literal '.' is accepted;
// unescaped it would match any character (e.g. "limitsXcpu=7").
var (
	// limitedLimitsRegexp matches the quota ceiling for "limits.*" entries.
	limitedLimitsRegexp = regexp.MustCompile(`limited: (limits\.[a-zA-Z]+=[a-zA-Z0-9]+[,]*)+`)
	// limitedRequestsRegexp matches the quota ceiling for "requests.*" entries.
	limitedRequestsRegexp = regexp.MustCompile(`limited: (requests\.[a-zA-Z]+=[a-zA-Z0-9]+[,]*)+`)
	// requestedLimitsRegexp matches the amounts asked for in "limits.*" entries.
	requestedLimitsRegexp = regexp.MustCompile(`requested: (limits\.[a-zA-Z]+=[a-zA-Z0-9]+[,]*)+`)
	// requestedRequestsRegexp matches the amounts asked for in "requests.*" entries.
	requestedRequestsRegexp = regexp.MustCompile(`requested: (requests\.[a-zA-Z]+=[a-zA-Z0-9]+[,]*)+`)
)
2325

2426
// SimpleBackOffBlocker is a simple exponential back-off timer that keeps track of the back-off period
@@ -171,7 +173,7 @@ func (h *ComputeResourceAwareBackOffHandler) Handle(ctx context.Context, operati
171173
// It is necessary to parse the error message to get the actual constraints
172174
// in this case, if the error message indicates constraints on memory only, then it shouldn't be used to lower the CPU ceiling
173175
// even if CPU appears in requestedResourceList
174-
newCeiling := GetComputeResourceAndQuantityRequested(err)
176+
newCeiling := GetComputeResourceAndQuantity(err, requestedLimitsRegexp)
175177
h.ComputeResourceCeilings.updateAll(&newCeiling)
176178
}
177179

@@ -196,18 +198,18 @@ func IsBackOffError(err error) bool {
196198
return IsResourceQuotaExceeded(err) || apiErrors.IsTooManyRequests(err) || apiErrors.IsServerTimeout(err)
197199
}
198200

199-
func GetComputeResourceAndQuantityRequested(err error) v1.ResourceList {
201+
func GetComputeResourceAndQuantity(err error, resourceRegex *regexp.Regexp) v1.ResourceList {
200202
// Playground: https://play.golang.org/p/oOr6CMmW7IE
201203

202204
// Sample message:
203205
// "requested: limits.cpu=7,limits.memory=64Gi, used: limits.cpu=249,limits.memory=2012730Mi, limited: limits.cpu=250,limits.memory=2000Gi"
204206

205207
// Extracting "requested: limits.cpu=7,limits.memory=64Gi"
206-
matches := reqRegexp.FindAllStringSubmatch(err.Error(), -1)
207-
requestedComputeResources := v1.ResourceList{}
208+
matches := resourceRegex.FindAllStringSubmatch(err.Error(), -1)
209+
computeResources := v1.ResourceList{}
208210

209211
if len(matches) == 0 || len(matches[0]) == 0 {
210-
return requestedComputeResources
212+
return computeResources
211213
}
212214

213215
// Extracting "limits.cpu=7,limits.memory=64Gi"
@@ -226,11 +228,28 @@ func GetComputeResourceAndQuantityRequested(err error) v1.ResourceList {
226228
if len(tuple) < 2 {
227229
continue
228230
}
229-
requestedComputeResources[v1.ResourceName(tuple[0])] = resource.MustParse(tuple[1])
231+
computeResources[v1.ResourceName(tuple[0])] = resource.MustParse(tuple[1])
230232
}
231-
return requestedComputeResources
233+
return computeResources
232234
}
233235

234-
func IsBackoffError(err error) bool {
235-
return stdErrors.IsCausedBy(err, errors.BackOffError)
236+
func IsResourceRequestsEligible(err error) bool {
237+
limitedLimitsResourceList := GetComputeResourceAndQuantity(err, limitedLimitsRegexp)
238+
limitedRequestsResourceList := GetComputeResourceAndQuantity(err, limitedRequestsRegexp)
239+
requestedLimitsResourceList := GetComputeResourceAndQuantity(err, requestedLimitsRegexp)
240+
requestedRequestsResourceList := GetComputeResourceAndQuantity(err, requestedRequestsRegexp)
241+
242+
return isEligible(requestedLimitsResourceList, limitedLimitsResourceList) &&
243+
isEligible(requestedRequestsResourceList, limitedRequestsResourceList)
244+
}
245+
246+
func isEligible(requestedResourceList, quotaResourceList v1.ResourceList) (eligibility bool) {
247+
for resource, requestedQuantity := range requestedResourceList {
248+
quotaQuantity, exists := quotaResourceList[resource]
249+
if exists && requestedQuantity.Cmp(quotaQuantity) >= 0 {
250+
return false
251+
}
252+
}
253+
254+
return true
236255
}

pkg/controller/nodes/task/backoff/handler_test.go

Lines changed: 113 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"reflect"
7+
"regexp"
78
"testing"
89
"time"
910

@@ -344,32 +345,79 @@ func TestComputeResourceCeilings_updateAll(t *testing.T) {
344345

345346
func TestGetComputeResourceAndQuantityRequested(t *testing.T) {
346347
type args struct {
347-
err error
348+
err error
349+
regexp *regexp.Regexp
348350
}
349351
tests := []struct {
350352
name string
351353
args args
352354
want v1.ResourceList
353355
}{
354-
{name: "Memory request", args: args{err: apiErrors.NewForbidden(
356+
{name: "Limited memory limits", args: args{err: apiErrors.NewForbidden(
355357
schema.GroupResource{}, "", errors.New("is forbidden: "+
356-
"exceeded quota: project-quota, requested: limits.memory=3Gi, "+
357-
"used: limits.memory=7976Gi, limited: limits.memory=8000Gi"))},
358+
"exceeded quota: project-quota, requested: limits.memory=3Gi, used: limits.memory=7976Gi, limited: limits.memory=8000Gi")),
359+
regexp: limitedLimitsRegexp},
360+
want: v1.ResourceList{v1.ResourceMemory: resource.MustParse("8000Gi")}},
361+
{name: "Limited CPU limits", args: args{err: apiErrors.NewForbidden(
362+
schema.GroupResource{}, "", errors.New("is forbidden: "+
363+
"exceeded quota: project-quota, requested: limits.cpu=3640m, used: limits.cpu=6000m, limited: limits.cpu=8000m")),
364+
regexp: limitedLimitsRegexp},
365+
want: v1.ResourceList{v1.ResourceCPU: resource.MustParse("8000m")}},
366+
{name: "Limited multiple limits ", args: args{err: apiErrors.NewForbidden(
367+
schema.GroupResource{}, "", errors.New("is forbidden: "+
368+
"exceeded quota: project-quota, requested: limits.cpu=7,limits.memory=64Gi, used: limits.cpu=249,limits.memory=2012730Mi, limited: limits.cpu=250,limits.memory=2000Gi")),
369+
regexp: limitedLimitsRegexp},
370+
want: v1.ResourceList{v1.ResourceCPU: resource.MustParse("250"), v1.ResourceMemory: resource.MustParse("2000Gi")}},
371+
{name: "Limited memory requests", args: args{err: apiErrors.NewForbidden(
372+
schema.GroupResource{}, "", errors.New("is forbidden: "+
373+
"exceeded quota: project-quota, requested: requests.memory=3Gi, used: requests.memory=7976Gi, limited: requests.memory=8000Gi")),
374+
regexp: limitedRequestsRegexp},
375+
want: v1.ResourceList{v1.ResourceMemory: resource.MustParse("8000Gi")}},
376+
{name: "Limited CPU requests", args: args{err: apiErrors.NewForbidden(
377+
schema.GroupResource{}, "", errors.New("is forbidden: "+
378+
"exceeded quota: project-quota, requested: requests.cpu=3640m, used: requests.cpu=6000m, limited: requests.cpu=8000m")),
379+
regexp: limitedRequestsRegexp},
380+
want: v1.ResourceList{v1.ResourceCPU: resource.MustParse("8000m")}},
381+
{name: "Limited multiple requests ", args: args{err: apiErrors.NewForbidden(
382+
schema.GroupResource{}, "", errors.New("is forbidden: "+
383+
"exceeded quota: project-quota, requested: requests.cpu=7,requests.memory=64Gi, used: requests.cpu=249,requests.memory=2012730Mi, limited: requests.cpu=250,requests.memory=2000Gi")),
384+
regexp: limitedRequestsRegexp},
385+
want: v1.ResourceList{v1.ResourceCPU: resource.MustParse("250"), v1.ResourceMemory: resource.MustParse("2000Gi")}},
386+
{name: "Requested memory limits", args: args{err: apiErrors.NewForbidden(
387+
schema.GroupResource{}, "", errors.New("is forbidden: "+
388+
"exceeded quota: project-quota, requested: limits.memory=3Gi, used: limits.memory=7976Gi, limited: limits.memory=8000Gi")),
389+
regexp: requestedLimitsRegexp},
390+
want: v1.ResourceList{v1.ResourceMemory: resource.MustParse("3Gi")}},
391+
{name: "Requested CPU limits", args: args{err: apiErrors.NewForbidden(
392+
schema.GroupResource{}, "", errors.New("is forbidden: "+
393+
"exceeded quota: project-quota, requested: limits.cpu=3640m, used: limits.cpu=6000m, limited: limits.cpu=8000m")),
394+
regexp: requestedLimitsRegexp},
395+
want: v1.ResourceList{v1.ResourceCPU: resource.MustParse("3640m")}},
396+
{name: "Requested multiple limits ", args: args{err: apiErrors.NewForbidden(
397+
schema.GroupResource{}, "", errors.New("is forbidden: "+
398+
"exceeded quota: project-quota, requested: limits.cpu=7,limits.memory=64Gi, used: limits.cpu=249,limits.memory=2012730Mi, limited: limits.cpu=250,limits.memory=2000Gi")),
399+
regexp: requestedLimitsRegexp},
400+
want: v1.ResourceList{v1.ResourceCPU: resource.MustParse("7"), v1.ResourceMemory: resource.MustParse("64Gi")}},
401+
{name: "Requested memory requests", args: args{err: apiErrors.NewForbidden(
402+
schema.GroupResource{}, "", errors.New("is forbidden: "+
403+
"exceeded quota: project-quota, requested: requests.memory=3Gi, used: requests.memory=7976Gi, limited: requests.memory=8000Gi")),
404+
regexp: requestedRequestsRegexp},
358405
want: v1.ResourceList{v1.ResourceMemory: resource.MustParse("3Gi")}},
359-
{name: "CPU request", args: args{err: apiErrors.NewForbidden(
406+
{name: "Requested CPU requests", args: args{err: apiErrors.NewForbidden(
360407
schema.GroupResource{}, "", errors.New("is forbidden: "+
361-
"exceeded quota: project-quota, requested: limits.cpu=3640m, "+
362-
"used: limits.cpu=6000m, limited: limits.cpu=8000m"))},
408+
"exceeded quota: project-quota, requested: requests.cpu=3640m, used: requests.cpu=6000m, limited: requests.cpu=8000m")),
409+
regexp: requestedRequestsRegexp},
363410
want: v1.ResourceList{v1.ResourceCPU: resource.MustParse("3640m")}},
364-
{name: "Multiple resources ", args: args{err: apiErrors.NewForbidden(
411+
{name: "Requested multiple requests ", args: args{err: apiErrors.NewForbidden(
365412
schema.GroupResource{}, "", errors.New("is forbidden: "+
366-
"exceeded quota: project-quota, requested: limits.cpu=7,limits.memory=64Gi, used: limits.cpu=249,limits.memory=2012730Mi, limited: limits.cpu=250,limits.memory=2000Gi"))},
413+
"exceeded quota: project-quota, requested: requests.cpu=7,requests.memory=64Gi, used: requests.cpu=249,requests.memory=2012730Mi, limited: requests.cpu=250,requests.memory=2000Gi")),
414+
regexp: requestedRequestsRegexp},
367415
want: v1.ResourceList{v1.ResourceCPU: resource.MustParse("7"), v1.ResourceMemory: resource.MustParse("64Gi")}},
368416
}
369417
for _, tt := range tests {
370418
t.Run(tt.name, func(t *testing.T) {
371-
if got := GetComputeResourceAndQuantityRequested(tt.args.err); !reflect.DeepEqual(got, tt.want) {
372-
t.Errorf("GetComputeResourceAndQuantityRequested() = %v, want %v", got, tt.want)
419+
if got := GetComputeResourceAndQuantity(tt.args.err, tt.args.regexp); !reflect.DeepEqual(got, tt.want) {
420+
t.Errorf("GetComputeResourceAndQuantity() = %v, want %v", got, tt.want)
373421
}
374422
})
375423
}
@@ -390,7 +438,7 @@ func TestIsBackoffError(t *testing.T) {
390438
}
391439
for _, tt := range tests {
392440
t.Run(tt.name, func(t *testing.T) {
393-
if got := IsBackoffError(tt.args.err); got != tt.want {
441+
if got := stdlibErrors.IsCausedBy(tt.args.err, taskErrors.BackOffError); got != tt.want {
394442
t.Errorf("IsBackoffError() = %v, want %v", got, tt.want)
395443
}
396444
})
@@ -516,3 +564,56 @@ func TestErrorTypes(t *testing.T) {
516564
assert.True(t, res)
517565
})
518566
}
567+
568+
func TestIsEligible(t *testing.T) {
569+
type args struct {
570+
requested v1.ResourceList
571+
quota v1.ResourceList
572+
}
573+
tests := []struct {
574+
name string
575+
args args
576+
want bool
577+
}{
578+
{
579+
name: "CPUElgible",
580+
args: args{
581+
requested: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1")},
582+
quota: v1.ResourceList{v1.ResourceCPU: resource.MustParse("2")},
583+
},
584+
want: true,
585+
},
586+
{
587+
name: "CPUInelgible",
588+
args: args{
589+
requested: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1")},
590+
quota: v1.ResourceList{v1.ResourceCPU: resource.MustParse("200m")},
591+
},
592+
want: false,
593+
},
594+
{
595+
name: "MemoryElgible",
596+
args: args{
597+
requested: v1.ResourceList{v1.ResourceMemory: resource.MustParse("32Gi")},
598+
quota: v1.ResourceList{v1.ResourceMemory: resource.MustParse("64Gi")},
599+
},
600+
want: true,
601+
},
602+
{
603+
name: "MemoryInelgible",
604+
args: args{
605+
requested: v1.ResourceList{v1.ResourceMemory: resource.MustParse("64Gi")},
606+
quota: v1.ResourceList{v1.ResourceMemory: resource.MustParse("64Gi")},
607+
},
608+
want: false,
609+
},
610+
}
611+
612+
for _, tt := range tests {
613+
t.Run(tt.name, func(t *testing.T) {
614+
if got := isEligible(tt.args.requested, tt.args.quota); got != tt.want {
615+
t.Errorf("isEligible() = %v, want %v", got, tt.want)
616+
}
617+
})
618+
}
619+
}

pkg/controller/nodes/task/k8s/plugin_manager.go

Lines changed: 10 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"fmt"
66
"reflect"
7-
"strings"
87
"time"
98

109
"github.com/flyteorg/flyteplugins/go/tasks/errors"
@@ -224,14 +223,19 @@ func (e *PluginManager) LaunchResource(ctx context.Context, tCtx pluginsCore.Tas
224223
}
225224

226225
if err != nil && !k8serrors.IsAlreadyExists(err) {
227-
if backoff.IsBackoffError(err) {
226+
if backoff.IsResourceQuotaExceeded(err) && !backoff.IsResourceRequestsEligible(err) {
227+
// if task resources exceed resource quotas then permanently fail because the task will
228+
// be stuck waiting for resources until the `node-active-deadline` terminates the node.
229+
logger.Errorf(ctx, "task resource requests exceed k8s resource limits. err: %v", err)
230+
return pluginsCore.DoTransition(pluginsCore.PhaseInfoFailure("ResourceRequestsExceedLimits",
231+
fmt.Sprintf("requested resources exceed limits: %v", err.Error()), nil)), nil
232+
} else if stdErrors.IsCausedBy(err, errors.BackOffError) {
228233
logger.Warnf(ctx, "Failed to launch job, resource quota exceeded. err: %v", err)
229234
return pluginsCore.DoTransition(pluginsCore.PhaseInfoWaitingForResourcesInfo(time.Now(), pluginsCore.DefaultPhaseVersion, fmt.Sprintf("Exceeded resourcequota: %s", err.Error()), nil)), nil
235+
} else if e.backOffController == nil && backoff.IsResourceQuotaExceeded(err) {
236+
logger.Warnf(ctx, "Failed to launch job, resource quota exceeded and the operation is not guarded by back-off. err: %v", err)
237+
return pluginsCore.DoTransition(pluginsCore.PhaseInfoWaitingForResourcesInfo(time.Now(), pluginsCore.DefaultPhaseVersion, fmt.Sprintf("Exceeded resourcequota: %s", err.Error()), nil)), nil
230238
} else if k8serrors.IsForbidden(err) {
231-
if e.backOffController == nil && strings.Contains(err.Error(), "exceeded quota") {
232-
logger.Warnf(ctx, "Failed to launch job, resource quota exceeded and the operation is not guarded by back-off. err: %v", err)
233-
return pluginsCore.DoTransition(pluginsCore.PhaseInfoWaitingForResourcesInfo(time.Now(), pluginsCore.DefaultPhaseVersion, fmt.Sprintf("Exceeded resourcequota: %s", err.Error()), nil)), nil
234-
}
235239
return pluginsCore.DoTransition(pluginsCore.PhaseInfoRetryableFailure("RuntimeFailure", err.Error(), nil)), nil
236240
} else if k8serrors.IsBadRequest(err) || k8serrors.IsInvalid(err) {
237241
logger.Errorf(ctx, "Badly formatted resource for plugin [%s], err %s", e.id, err)

0 commit comments

Comments
 (0)