diff --git a/flyteadmin/pkg/manager/impl/execution_manager.go b/flyteadmin/pkg/manager/impl/execution_manager.go index ba5dcbb38c..3e0bba1204 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager.go +++ b/flyteadmin/pkg/manager/impl/execution_manager.go @@ -209,8 +209,10 @@ func (m *ExecutionManager) setCompiledTaskDefaults(ctx context.Context, task *co // and represent them as comparable quantities rather than strings. taskResourceRequirements := util.GetCompleteTaskResourceRequirements(ctx, task.GetTemplate().GetId(), task) + // Only for CPU + doNotMatchCPULimit := m.config.TaskResourceConfiguration().GetAllowCPULimitToFloatFromRequest() cpu := flytek8s.AdjustOrDefaultResource(taskResourceRequirements.Defaults.CPU, taskResourceRequirements.Limits.CPU, - platformTaskResources.Defaults.CPU, platformTaskResources.Limits.CPU) + platformTaskResources.Defaults.CPU, platformTaskResources.Limits.CPU, doNotMatchCPULimit) finalizedResourceRequests = append(finalizedResourceRequests, &core.Resources_ResourceEntry{ Name: core.Resources_CPU, Value: cpu.Request.String(), @@ -221,7 +223,7 @@ func (m *ExecutionManager) setCompiledTaskDefaults(ctx context.Context, task *co }) memory := flytek8s.AdjustOrDefaultResource(taskResourceRequirements.Defaults.Memory, taskResourceRequirements.Limits.Memory, - platformTaskResources.Defaults.Memory, platformTaskResources.Limits.Memory) + platformTaskResources.Defaults.Memory, platformTaskResources.Limits.Memory, false) finalizedResourceRequests = append(finalizedResourceRequests, &core.Resources_ResourceEntry{ Name: core.Resources_MEMORY, Value: memory.Request.String(), @@ -237,7 +239,7 @@ func (m *ExecutionManager) setCompiledTaskDefaults(ctx context.Context, task *co !taskResourceRequirements.Limits.EphemeralStorage.IsZero() || !platformTaskResources.Defaults.EphemeralStorage.IsZero() { ephemeralStorage := flytek8s.AdjustOrDefaultResource(taskResourceRequirements.Defaults.EphemeralStorage, 
taskResourceRequirements.Limits.EphemeralStorage, - platformTaskResources.Defaults.EphemeralStorage, platformTaskResources.Limits.EphemeralStorage) + platformTaskResources.Defaults.EphemeralStorage, platformTaskResources.Limits.EphemeralStorage, false) finalizedResourceRequests = append(finalizedResourceRequests, &core.Resources_ResourceEntry{ Name: core.Resources_EPHEMERAL_STORAGE, Value: ephemeralStorage.Request.String(), @@ -253,7 +255,7 @@ func (m *ExecutionManager) setCompiledTaskDefaults(ctx context.Context, task *co !taskResourceRequirements.Limits.GPU.IsZero() || !platformTaskResources.Defaults.GPU.IsZero() { gpu := flytek8s.AdjustOrDefaultResource(taskResourceRequirements.Defaults.GPU, taskResourceRequirements.Limits.GPU, - platformTaskResources.Defaults.GPU, platformTaskResources.Limits.GPU) + platformTaskResources.Defaults.GPU, platformTaskResources.Limits.GPU, false) finalizedResourceRequests = append(finalizedResourceRequests, &core.Resources_ResourceEntry{ Name: core.Resources_GPU, Value: gpu.Request.String(), diff --git a/flyteadmin/pkg/manager/impl/execution_manager_test.go b/flyteadmin/pkg/manager/impl/execution_manager_test.go index fae95bf659..9df8befc5b 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager_test.go +++ b/flyteadmin/pkg/manager/impl/execution_manager_test.go @@ -172,7 +172,7 @@ func getMockExecutionsConfigProvider() runtimeInterfaces.Configuration { runtimeMocks.NewMockQueueConfigurationProvider( []runtimeInterfaces.ExecutionQueue{}, []runtimeInterfaces.WorkflowConfig{}), nil, - runtimeMocks.NewMockTaskResourceConfiguration(resourceDefaults, resourceLimits), nil, getMockNamespaceMappingConfig()) + runtimeMocks.NewMockTaskResourceConfiguration(resourceDefaults, resourceLimits, false), nil, getMockNamespaceMappingConfig()) mockExecutionsConfigProvider.(*runtimeMocks.MockConfigurationProvider).AddRegistrationValidationConfiguration( runtimeMocks.NewMockRegistrationValidationProvider()) return mockExecutionsConfigProvider @@ 
-557,7 +557,7 @@ func TestCreateExecution_TaggedQueue(t *testing.T) { }, }), nil, - runtimeMocks.NewMockTaskResourceConfiguration(resourceDefaults, resourceLimits), nil, getMockNamespaceMappingConfig()) + runtimeMocks.NewMockTaskResourceConfiguration(resourceDefaults, resourceLimits, false), nil, getMockNamespaceMappingConfig()) configProvider.(*runtimeMocks.MockConfigurationProvider).AddRegistrationValidationConfiguration( runtimeMocks.NewMockRegistrationValidationProvider()) @@ -1267,7 +1267,7 @@ func TestCreateExecution_CustomNamespaceMappingConfig(t *testing.T) { runtimeMocks.NewMockQueueConfigurationProvider( []runtimeInterfaces.ExecutionQueue{}, []runtimeInterfaces.WorkflowConfig{}), nil, - runtimeMocks.NewMockTaskResourceConfiguration(resourceDefaults, resourceLimits), nil, &mockNs) + runtimeMocks.NewMockTaskResourceConfiguration(resourceDefaults, resourceLimits, false), nil, &mockNs) mockExecutionsConfigProvider.(*runtimeMocks.MockConfigurationProvider).AddRegistrationValidationConfiguration( runtimeMocks.NewMockRegistrationValidationProvider()) @@ -6029,3 +6029,322 @@ func TestQueryTemplate(t *testing.T) { assert.Error(t, err) }) } + +func TestSetDefaultsWithAllowFloatingCPULimit(t *testing.T) { + task := &core.CompiledTask{ + Template: &core.TaskTemplate{ + Target: &core.TaskTemplate_Container{ + Container: &core.Container{ + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + { + Name: core.Resources_CPU, + Value: "250m", + }, + }, + }, + }, + }, + Id: &core.Identifier{ + Project: "project", + Domain: "domain", + Name: "task_name", + Version: "version", + }, + }, + } + r := plugins.NewRegistry() + r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &defaultTestExecutor) + + mockConfigurationProvider, ok := getMockExecutionsConfigProvider().(*runtimeMocks.MockConfigurationProvider) + assert.True(t, ok) + mockTaskResourceConfiguration, ok := 
mockConfigurationProvider.TaskResourceConfiguration().(*runtimeMocks.MockTaskResourceConfiguration) + assert.True(t, ok) + mockTaskResourceConfiguration.AllowCPULimitToFloatFromRequest = true + assert.True(t, mockTaskResourceConfiguration.GetAllowCPULimitToFloatFromRequest()) + + execManager := NewExecutionManager(repositoryMocks.NewMockRepository(), r, mockConfigurationProvider, getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}) + + t.Run("tasks without cpu limit get a zero cpu limit", func(t *testing.T) { + execManager.(*ExecutionManager).setCompiledTaskDefaults(context.Background(), task, workflowengineInterfaces.TaskResources{ + Defaults: runtimeInterfaces.TaskResourceSet{ + CPU: resource.MustParse("200m"), + GPU: resource.MustParse("4"), + Memory: resource.MustParse("200Gi"), + EphemeralStorage: resource.MustParse("500Mi"), + }, + Limits: runtimeInterfaces.TaskResourceSet{ + CPU: resource.MustParse("300m"), + GPU: resource.MustParse("8"), + Memory: resource.MustParse("500Gi"), + EphemeralStorage: resource.MustParse("501Mi"), + }, + }) + assert.True(t, proto.Equal( + &core.Container{ + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + { + Name: core.Resources_CPU, + Value: "250m", + }, + { + Name: core.Resources_MEMORY, + Value: "200Gi", + }, + { + Name: core.Resources_EPHEMERAL_STORAGE, + Value: "500Mi", + }, + { + Name: core.Resources_GPU, + Value: "4", + }, + }, + Limits: []*core.Resources_ResourceEntry{ + { + Name: core.Resources_CPU, + Value: "0", + }, + { + Name: core.Resources_MEMORY, + Value: "200Gi", + }, + { + Name: core.Resources_EPHEMERAL_STORAGE, + Value: "500Mi", + }, + { + Name: core.Resources_GPU, + Value: "4", + }, + }, + }, + }, + task.Template.GetContainer()), fmt.Sprintf("%+v", task.Template.GetContainer())) + }) +} + +func 
TestSetDefaults_MissingRequests_AllowFloat(t *testing.T) { + task := &core.CompiledTask{ + Template: &core.TaskTemplate{ + Target: &core.TaskTemplate_Container{ + Container: &core.Container{ + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + { + Name: core.Resources_CPU, + Value: "250m", + }, + }, + }, + }, + }, + Id: &core.Identifier{ + Project: "project", + Domain: "domain", + Name: "task_name", + Version: "version", + }, + }, + } + + r := plugins.NewRegistry() + r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &defaultTestExecutor) + mockConfigurationProvider, ok := getMockExecutionsConfigProvider().(*runtimeMocks.MockConfigurationProvider) + assert.True(t, ok) + mockTaskResourceConfiguration, ok := mockConfigurationProvider.TaskResourceConfiguration().(*runtimeMocks.MockTaskResourceConfiguration) + assert.True(t, ok) + mockTaskResourceConfiguration.AllowCPULimitToFloatFromRequest = true + assert.True(t, mockTaskResourceConfiguration.GetAllowCPULimitToFloatFromRequest()) + + execManager := NewExecutionManager(repositoryMocks.NewMockRepository(), r, mockConfigurationProvider, getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}) + + t.Run("same functionality if default cpu limit is not set", func(t *testing.T) { + execManager.(*ExecutionManager).setCompiledTaskDefaults(context.Background(), task, workflowengineInterfaces.TaskResources{ + Defaults: runtimeInterfaces.TaskResourceSet{ + CPU: resource.MustParse("200m"), + GPU: resource.MustParse("4"), + Memory: resource.MustParse("200Gi"), + }, + Limits: runtimeInterfaces.TaskResourceSet{ + GPU: resource.MustParse("8"), + // Because only the limit is set, this resource should not be injected. 
+ EphemeralStorage: resource.MustParse("100"), + }, + }) + assert.True(t, proto.Equal( + &core.Container{ + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + { + Name: core.Resources_CPU, + Value: "250m", + }, + { + Name: core.Resources_MEMORY, + Value: "200Gi", + }, + { + Name: core.Resources_GPU, + Value: "4", + }, + }, + Limits: []*core.Resources_ResourceEntry{ + { + Name: core.Resources_CPU, + Value: "0", + }, + { + Name: core.Resources_MEMORY, + Value: "200Gi", + }, + { + Name: core.Resources_GPU, + Value: "4", + }, + }, + }, + }, + task.Template.GetContainer()), fmt.Sprintf("%+v", task.Template.GetContainer())) + }) +} + +func TestSetDefaults_OptionalRequiredResourcesAllowCPUToFloat(t *testing.T) { + taskConfigLimits := runtimeInterfaces.TaskResourceSet{ + CPU: resource.MustParse("300m"), + GPU: resource.MustParse("1"), + Memory: resource.MustParse("500Gi"), + EphemeralStorage: resource.MustParse("501Mi"), + } + + task := &core.CompiledTask{ + Template: &core.TaskTemplate{ + Target: &core.TaskTemplate_Container{ + Container: &core.Container{ + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + { + Name: core.Resources_CPU, + Value: "200m", + }, + }, + }, + }, + }, + Id: &taskIdentifier, + }, + } + // Set the config value + mockConfigurationProvider, ok := getMockExecutionsConfigProvider().(*runtimeMocks.MockConfigurationProvider) + assert.True(t, ok) + mockTaskResourceConfiguration, ok := mockConfigurationProvider.TaskResourceConfiguration().(*runtimeMocks.MockTaskResourceConfiguration) + assert.True(t, ok) + mockTaskResourceConfiguration.AllowCPULimitToFloatFromRequest = true + assert.True(t, mockTaskResourceConfiguration.GetAllowCPULimitToFloatFromRequest()) + + t.Run("don't inject ephemeral storage or gpu when only the limit is set in config", func(t *testing.T) { + r := plugins.NewRegistry() + r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &defaultTestExecutor) + + mockConfigurationProvider, ok := 
getMockExecutionsConfigProvider().(*runtimeMocks.MockConfigurationProvider) + assert.True(t, ok) + mockTaskResourceConfiguration, ok := mockConfigurationProvider.TaskResourceConfiguration().(*runtimeMocks.MockTaskResourceConfiguration) + assert.True(t, ok) + mockTaskResourceConfiguration.AllowCPULimitToFloatFromRequest = true + assert.True(t, mockTaskResourceConfiguration.GetAllowCPULimitToFloatFromRequest()) + + execManager := NewExecutionManager(repositoryMocks.NewMockRepository(), r, mockConfigurationProvider, getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}) + + execManager.(*ExecutionManager).setCompiledTaskDefaults(context.Background(), task, workflowengineInterfaces.TaskResources{ + Defaults: runtimeInterfaces.TaskResourceSet{ + CPU: resource.MustParse("200m"), + Memory: resource.MustParse("200Gi"), + }, + Limits: taskConfigLimits, + }) + assert.True(t, proto.Equal( + &core.Container{ + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + { + Name: core.Resources_CPU, + Value: "200m", + }, + { + Name: core.Resources_MEMORY, + Value: "200Gi", + }, + }, + Limits: []*core.Resources_ResourceEntry{ + { + Name: core.Resources_CPU, + Value: "0", + }, + { + Name: core.Resources_MEMORY, + Value: "200Gi", + }, + }, + }, + }, + task.Template.GetContainer()), fmt.Sprintf("%+v", task.Template.GetContainer())) + }) + + t.Run("respect non-required resources when defaults exist in config", func(t *testing.T) { + r := plugins.NewRegistry() + r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &defaultTestExecutor) + + mockConfigurationProvider, ok := getMockExecutionsConfigProvider().(*runtimeMocks.MockConfigurationProvider) + assert.True(t, ok) + mockTaskResourceConfiguration, ok := mockConfigurationProvider.TaskResourceConfiguration().(*runtimeMocks.MockTaskResourceConfiguration) + 
assert.True(t, ok) + mockTaskResourceConfiguration.AllowCPULimitToFloatFromRequest = true + assert.True(t, mockTaskResourceConfiguration.GetAllowCPULimitToFloatFromRequest()) + + execManager := NewExecutionManager(repositoryMocks.NewMockRepository(), r, mockConfigurationProvider, getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}) + + execManager.(*ExecutionManager).setCompiledTaskDefaults(context.Background(), task, workflowengineInterfaces.TaskResources{ + Limits: taskConfigLimits, + Defaults: runtimeInterfaces.TaskResourceSet{ + CPU: resource.MustParse("200m"), + Memory: resource.MustParse("200Gi"), + EphemeralStorage: resource.MustParse("1"), + }, + }) + assert.True(t, proto.Equal( + &core.Container{ + Resources: &core.Resources{ + Requests: []*core.Resources_ResourceEntry{ + { + Name: core.Resources_CPU, + Value: "200m", + }, + { + Name: core.Resources_MEMORY, + Value: "200Gi", + }, + { + Name: core.Resources_EPHEMERAL_STORAGE, + Value: "1", + }, + }, + Limits: []*core.Resources_ResourceEntry{ + { + Name: core.Resources_CPU, + Value: "0", + }, + { + Name: core.Resources_MEMORY, + Value: "200Gi", + }, + { + Name: core.Resources_EPHEMERAL_STORAGE, + Value: "1", + }, + }, + }, + }, + task.Template.GetContainer()), fmt.Sprintf("%+v", task.Template.GetContainer())) + }) +} diff --git a/flyteadmin/pkg/manager/impl/task_manager_test.go b/flyteadmin/pkg/manager/impl/task_manager_test.go index b0f36b7e48..4fce3ebba2 100644 --- a/flyteadmin/pkg/manager/impl/task_manager_test.go +++ b/flyteadmin/pkg/manager/impl/task_manager_test.go @@ -37,7 +37,7 @@ func getMockConfigForTaskTest() runtimeInterfaces.Configuration { whitelistConfiguration.EXPECT().GetTaskTypeWhitelist().Return(map[string][]runtimeInterfaces.WhitelistScope{}) mockConfig := runtimeMocks.NewMockConfigurationProvider( 
testutils.GetApplicationConfigWithDefaultDomains(), nil, nil, runtimeMocks.NewMockTaskResourceConfiguration( - runtimeInterfaces.TaskResourceSet{}, runtimeInterfaces.TaskResourceSet{}), whitelistConfiguration, nil) + runtimeInterfaces.TaskResourceSet{}, runtimeInterfaces.TaskResourceSet{}, false), whitelistConfiguration, nil) return mockConfig } diff --git a/flyteadmin/pkg/runtime/interfaces/task_resource_configuration.go b/flyteadmin/pkg/runtime/interfaces/task_resource_configuration.go index cbe8130376..f5de4afbbf 100644 --- a/flyteadmin/pkg/runtime/interfaces/task_resource_configuration.go +++ b/flyteadmin/pkg/runtime/interfaces/task_resource_configuration.go @@ -13,4 +13,5 @@ type TaskResourceSet struct { type TaskResourceConfiguration interface { GetDefaults() TaskResourceSet GetLimits() TaskResourceSet + GetAllowCPULimitToFloatFromRequest() bool } diff --git a/flyteadmin/pkg/runtime/mocks/mock_task_resource_provider.go b/flyteadmin/pkg/runtime/mocks/mock_task_resource_provider.go index 654ae6d381..c0a60ed6a6 100644 --- a/flyteadmin/pkg/runtime/mocks/mock_task_resource_provider.go +++ b/flyteadmin/pkg/runtime/mocks/mock_task_resource_provider.go @@ -3,8 +3,9 @@ package mocks import "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces" type MockTaskResourceConfiguration struct { - Defaults interfaces.TaskResourceSet - Limits interfaces.TaskResourceSet + Defaults interfaces.TaskResourceSet + Limits interfaces.TaskResourceSet + AllowCPULimitToFloatFromRequest bool } func (c *MockTaskResourceConfiguration) GetDefaults() interfaces.TaskResourceSet { @@ -13,10 +14,14 @@ func (c *MockTaskResourceConfiguration) GetDefaults() interfaces.TaskResourceSet func (c *MockTaskResourceConfiguration) GetLimits() interfaces.TaskResourceSet { return c.Limits } +func (c *MockTaskResourceConfiguration) GetAllowCPULimitToFloatFromRequest() bool { + return c.AllowCPULimitToFloatFromRequest +} -func NewMockTaskResourceConfiguration(defaults, limits interfaces.TaskResourceSet) 
interfaces.TaskResourceConfiguration { +func NewMockTaskResourceConfiguration(defaults, limits interfaces.TaskResourceSet, allowCPULimitToFloat bool) interfaces.TaskResourceConfiguration { return &MockTaskResourceConfiguration{ - Defaults: defaults, - Limits: limits, + Defaults: defaults, + Limits: limits, + AllowCPULimitToFloatFromRequest: allowCPULimitToFloat, } } diff --git a/flyteadmin/pkg/runtime/task_resource_provider.go b/flyteadmin/pkg/runtime/task_resource_provider.go index 7dd9c2dad7..37793d7c74 100644 --- a/flyteadmin/pkg/runtime/task_resource_provider.go +++ b/flyteadmin/pkg/runtime/task_resource_provider.go @@ -24,9 +24,12 @@ var taskResourceConfig = config.MustRegisterSection(taskResourceKey, &TaskResour type TaskResourceSpec struct { Defaults interfaces.TaskResourceSet `json:"defaults"` Limits interfaces.TaskResourceSet `json:"limits"` + + // If set here, make sure the matching K8sPluginConfig option is also set. + AllowCPULimitToFloatFromRequest bool `json:"allow-cpu-limit-to-float-from-request"` } -// Implementation of an interfaces.TaskResourceConfiguration +// TaskResourceProvider is an implementation of interfaces.TaskResourceConfiguration. type TaskResourceProvider struct{} func (p *TaskResourceProvider) GetDefaults() interfaces.TaskResourceSet { @@ -37,6 +40,10 @@ func (p *TaskResourceProvider) GetLimits() interfaces.TaskResourceSet { return taskResourceConfig.GetConfig().(*TaskResourceSpec).Limits } +func (p *TaskResourceProvider) GetAllowCPULimitToFloatFromRequest() bool { + return taskResourceConfig.GetConfig().(*TaskResourceSpec).AllowCPULimitToFloatFromRequest +} + func NewTaskResourceProvider() interfaces.TaskResourceConfiguration { return &TaskResourceProvider{} } diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go index 47a4d1b6a5..bb4bcf898c 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go +++ 
b/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go @@ -220,6 +220,10 @@ type K8sPluginConfig struct { AddTolerationsForExtendedResources []string `json:"add-tolerations-for-extended-resources" pflag:",Name of the extended resources for which tolerations should be added."` EnableDistributedErrorAggregation bool `json:"enable-distributed-error-aggregation" pflag:",If true, will aggregate errors of different worker pods for distributed tasks."` + + // AllowCPULimitToFloatFromRequest, when enabled, stops Flyte from copying the CPU request value into a missing CPU limit. + // If set, make sure the TaskResourceSpec config on the Admin side is also set. + AllowCPULimitToFloatFromRequest bool `json:"allow-cpu-limit-to-float-from-request" pflag:",Used to allow zero CPU limits, to get Burstable pods"` } // FlyteCoPilotConfig specifies configuration for the Flyte CoPilot system. FlyteCoPilot, allows running flytekit-less containers diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/container_helper.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/container_helper.go index eddc610e05..e520b38d7b 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/container_helper.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/container_helper.go @@ -101,7 +101,9 @@ func resolvePlatformDefaults(platformResources v1.ResourceRequirements, configCP // AdjustOrDefaultResource validates resources conform to platform limits and assigns defaults for Request and Limit values by // using the Request when the Limit is unset, and vice versa. -func AdjustOrDefaultResource(request, limit, platformDefault, platformLimit resource.Quantity) ResourceRequirement { +// doNotMatchLimitToRequest should almost always be false. Only set it to true for CPU, and only when you are okay having +// different request and limit values (the pod will no longer be Guaranteed). 
+func AdjustOrDefaultResource(request, limit, platformDefault, platformLimit resource.Quantity, doNotMatchLimitToRequest bool) ResourceRequirement { if request.IsZero() { if !limit.IsZero() { request = limit @@ -110,7 +112,7 @@ func AdjustOrDefaultResource(request, limit, platformDefault, platformLimit reso } } - if limit.IsZero() { + if limit.IsZero() && !doNotMatchLimitToRequest { limit = request } @@ -144,21 +146,27 @@ func ensureResourceRange(request, limit, platformLimit resource.Quantity) Resour } } +// adjustResourceRequirement mutates the resourceRequirements for the given resourceName. func adjustResourceRequirement(resourceName v1.ResourceName, resourceRequirements, - platformResources v1.ResourceRequirements, assignIfUnset bool) { + platformResources v1.ResourceRequirements, assignIfUnset bool, doNotMatchLimitToRequest bool) { var resourceValue ResourceRequirement if assignIfUnset { resourceValue = AdjustOrDefaultResource(resourceRequirements.Requests[resourceName], resourceRequirements.Limits[resourceName], platformResources.Requests[resourceName], - platformResources.Limits[resourceName]) + platformResources.Limits[resourceName], doNotMatchLimitToRequest) } else { resourceValue = ensureResourceRange(resourceRequirements.Requests[resourceName], resourceRequirements.Limits[resourceName], platformResources.Limits[resourceName]) } resourceRequirements.Requests[resourceName] = resourceValue.Request - resourceRequirements.Limits[resourceName] = resourceValue.Limit + // If the limit is zero, we don't need to set it - there is no benefit in setting a 0 limit in K8s for any resource. + // Not setting this allows resources to float. Note that this shouldn't affect K8s QoS classes either, since something + // that has 0 requests is already just BestEffort. + if !resourceValue.Limit.IsZero() { + resourceRequirements.Limits[resourceName] = resourceValue.Limit + } } // Convert GPU resource requirements named 'gpu' the recognized 'nvidia.com/gpu' identifier. 
@@ -193,14 +201,16 @@ func ApplyResourceOverrides(resources, platformResources v1.ResourceRequirements platformResources = resolvePlatformDefaults(platformResources, config.GetK8sPluginConfig().DefaultCPURequest, config.GetK8sPluginConfig().DefaultMemoryRequest) - adjustResourceRequirement(v1.ResourceCPU, resources, platformResources, assignIfUnset) - adjustResourceRequirement(v1.ResourceMemory, resources, platformResources, assignIfUnset) + // Only used for CPU. + doNotMatchLimitToRequest := config.GetK8sPluginConfig().AllowCPULimitToFloatFromRequest + adjustResourceRequirement(v1.ResourceCPU, resources, platformResources, assignIfUnset, doNotMatchLimitToRequest) + adjustResourceRequirement(v1.ResourceMemory, resources, platformResources, assignIfUnset, false) _, ephemeralStorageRequested := resources.Requests[v1.ResourceEphemeralStorage] _, ephemeralStorageLimited := resources.Limits[v1.ResourceEphemeralStorage] if ephemeralStorageRequested || ephemeralStorageLimited { - adjustResourceRequirement(v1.ResourceEphemeralStorage, resources, platformResources, assignIfUnset) + adjustResourceRequirement(v1.ResourceEphemeralStorage, resources, platformResources, assignIfUnset, false) } // TODO: Make configurable. 1/15/2019 Flyte Cluster doesn't support setting storage requests/limits. 
@@ -215,7 +225,7 @@ func ApplyResourceOverrides(resources, platformResources v1.ResourceRequirements } if shouldAdjustGPU { - adjustResourceRequirement(gpuResourceName, resources, platformResources, assignIfUnset) + adjustResourceRequirement(gpuResourceName, resources, platformResources, assignIfUnset, false) } return resources diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/container_helper_test.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/container_helper_test.go index 420c098b82..c6b5a02174 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/container_helper_test.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/container_helper_test.go @@ -24,49 +24,49 @@ func TestAssignResource(t *testing.T) { t.Run("Leave valid requests and limits unchanged", func(t *testing.T) { res := AdjustOrDefaultResource( resource.MustParse("1"), resource.MustParse("2"), - resource.MustParse("10"), resource.MustParse("20")) + resource.MustParse("10"), resource.MustParse("20"), false) assert.True(t, res.Request.Equal(resource.MustParse("1"))) assert.True(t, res.Limit.Equal(resource.MustParse("2"))) }) t.Run("Assign unset Request from Limit", func(t *testing.T) { res := AdjustOrDefaultResource( zeroQuantity, resource.MustParse("2"), - resource.MustParse("10"), resource.MustParse("20")) + resource.MustParse("10"), resource.MustParse("20"), false) assert.True(t, res.Request.Equal(resource.MustParse("2"))) assert.True(t, res.Limit.Equal(resource.MustParse("2"))) }) t.Run("Assign unset Limit from Request", func(t *testing.T) { res := AdjustOrDefaultResource( resource.MustParse("2"), zeroQuantity, - resource.MustParse("10"), resource.MustParse("20")) + resource.MustParse("10"), resource.MustParse("20"), false) assert.Equal(t, resource.MustParse("2"), res.Request) assert.Equal(t, resource.MustParse("2"), res.Limit) }) t.Run("Assign from platform defaults", func(t *testing.T) { res := AdjustOrDefaultResource( zeroQuantity, zeroQuantity, - 
resource.MustParse("10"), resource.MustParse("20")) + resource.MustParse("10"), resource.MustParse("20"), false) assert.Equal(t, resource.MustParse("10"), res.Request) assert.Equal(t, resource.MustParse("10"), res.Limit) }) t.Run("Adjust Limit when Request > Limit", func(t *testing.T) { res := AdjustOrDefaultResource( resource.MustParse("10"), resource.MustParse("2"), - resource.MustParse("10"), resource.MustParse("20")) + resource.MustParse("10"), resource.MustParse("20"), false) assert.Equal(t, resource.MustParse("2"), res.Request) assert.Equal(t, resource.MustParse("2"), res.Limit) }) t.Run("Adjust Limit > platformLimit", func(t *testing.T) { res := AdjustOrDefaultResource( resource.MustParse("1"), resource.MustParse("40"), - resource.MustParse("10"), resource.MustParse("20")) + resource.MustParse("10"), resource.MustParse("20"), false) assert.True(t, res.Request.Equal(resource.MustParse("1"))) assert.True(t, res.Limit.Equal(resource.MustParse("20"))) }) t.Run("Adjust Request, Limit > platformLimit", func(t *testing.T) { res := AdjustOrDefaultResource( resource.MustParse("40"), resource.MustParse("50"), - resource.MustParse("10"), resource.MustParse("20")) + resource.MustParse("10"), resource.MustParse("20"), false) assert.True(t, res.Request.Equal(resource.MustParse("20"))) assert.True(t, res.Limit.Equal(resource.MustParse("20"))) }) @@ -1340,3 +1340,71 @@ func TestAddFlyteCustomizationsToContainerWithPodTemplate(t *testing.T) { assert.Equal(t, containerMemoryRequest, container.Resources.Limits[v1.ResourceMemory]) // Should be set to request value }) } + +func TestApplyResourceOverrides_OverrideCpuAllowFloat(t *testing.T) { + cfg := config.GetK8sPluginConfig() + cfg.AllowCPULimitToFloatFromRequest = true + err := config.SetK8sPluginConfig(cfg) + assert.NoError(t, err) + platformRequirements := v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("3"), + }, + Limits: v1.ResourceList{ + v1.ResourceCPU: 
resource.MustParse("10"), + }, + } + cpuRequest := resource.MustParse("1") + overrides := ApplyResourceOverrides(v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: cpuRequest, + }, + }, platformRequirements, assignIfUnset) + assert.EqualValues(t, cpuRequest, overrides.Requests[v1.ResourceCPU]) + _, ok := overrides.Limits[v1.ResourceCPU] + assert.False(t, ok) +} + +func TestApplyResourceOverrides_EmptyCpuLimitAllowFloat(t *testing.T) { + cfg := config.GetK8sPluginConfig() + cfg.AllowCPULimitToFloatFromRequest = true + err := config.SetK8sPluginConfig(cfg) + assert.NoError(t, err) + + { + platformRequirements := v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("3"), + }, + } + cpuRequest := resource.MustParse("1") + //tenCpuRequest := resource.MustParse("10") + overrides := ApplyResourceOverrides(v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: cpuRequest, + }, + }, platformRequirements, assignIfUnset) + assert.EqualValues(t, cpuRequest, overrides.Requests[v1.ResourceCPU]) + _, ok := overrides.Limits[v1.ResourceCPU] + assert.False(t, ok) + } + { + platformRequirements := v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("3"), + }, + Limits: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("10"), + }, + } + cpuRequest := resource.MustParse("1") + overrides := ApplyResourceOverrides(v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: cpuRequest, + }, + }, platformRequirements, assignIfUnset) + assert.EqualValues(t, cpuRequest, overrides.Requests[v1.ResourceCPU]) + _, ok := overrides.Limits[v1.ResourceCPU] + assert.False(t, ok) + } +} diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/utils_test.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/utils_test.go index 1642d726e6..574228a482 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/utils_test.go +++ 
b/flyteplugins/go/tasks/pluginmachinery/flytek8s/utils_test.go @@ -27,6 +27,29 @@ func TestToK8sEnvVar(t *testing.T) { assert.Empty(t, e) } +func TestToK8sDoesntFillIn(t *testing.T) { + { + // This shouldn't fill in resources that are not specified. + r, err := ToK8sResourceList([]*core.Resources_ResourceEntry{}) + + assert.NoError(t, err) + _, ok := r[v1.ResourceCPU] + assert.False(t, ok) + } + + { + // And also shouldn't fill in resources that are zero. + r, err := ToK8sResourceList([]*core.Resources_ResourceEntry{ + {Name: core.Resources_CPU, Value: "0"}, + {Name: core.Resources_MEMORY, Value: "1024Mi"}, + }) + + assert.NoError(t, err) + _, ok := r[v1.ResourceCPU] + assert.False(t, ok) + } +} + func TestToK8sResourceList(t *testing.T) { { r, err := ToK8sResourceList([]*core.Resources_ResourceEntry{