Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,10 @@ func (m *ExecutionManager) setCompiledTaskDefaults(ctx context.Context, task *co
// and represent them as comparable quantities rather than strings.
taskResourceRequirements := util.GetCompleteTaskResourceRequirements(ctx, task.GetTemplate().GetId(), task)

// Only for CPU
doNotMatchCPULimit := m.config.TaskResourceConfiguration().GetAllowCPULimitToFloatFromRequest()
cpu := flytek8s.AdjustOrDefaultResource(taskResourceRequirements.Defaults.CPU, taskResourceRequirements.Limits.CPU,
platformTaskResources.Defaults.CPU, platformTaskResources.Limits.CPU)
platformTaskResources.Defaults.CPU, platformTaskResources.Limits.CPU, doNotMatchCPULimit)
finalizedResourceRequests = append(finalizedResourceRequests, &core.Resources_ResourceEntry{
Name: core.Resources_CPU,
Value: cpu.Request.String(),
Expand All @@ -221,7 +223,7 @@ func (m *ExecutionManager) setCompiledTaskDefaults(ctx context.Context, task *co
})

memory := flytek8s.AdjustOrDefaultResource(taskResourceRequirements.Defaults.Memory, taskResourceRequirements.Limits.Memory,
platformTaskResources.Defaults.Memory, platformTaskResources.Limits.Memory)
platformTaskResources.Defaults.Memory, platformTaskResources.Limits.Memory, false)
finalizedResourceRequests = append(finalizedResourceRequests, &core.Resources_ResourceEntry{
Name: core.Resources_MEMORY,
Value: memory.Request.String(),
Expand All @@ -237,7 +239,7 @@ func (m *ExecutionManager) setCompiledTaskDefaults(ctx context.Context, task *co
!taskResourceRequirements.Limits.EphemeralStorage.IsZero() ||
!platformTaskResources.Defaults.EphemeralStorage.IsZero() {
ephemeralStorage := flytek8s.AdjustOrDefaultResource(taskResourceRequirements.Defaults.EphemeralStorage, taskResourceRequirements.Limits.EphemeralStorage,
platformTaskResources.Defaults.EphemeralStorage, platformTaskResources.Limits.EphemeralStorage)
platformTaskResources.Defaults.EphemeralStorage, platformTaskResources.Limits.EphemeralStorage, false)
finalizedResourceRequests = append(finalizedResourceRequests, &core.Resources_ResourceEntry{
Name: core.Resources_EPHEMERAL_STORAGE,
Value: ephemeralStorage.Request.String(),
Expand All @@ -253,7 +255,7 @@ func (m *ExecutionManager) setCompiledTaskDefaults(ctx context.Context, task *co
!taskResourceRequirements.Limits.GPU.IsZero() ||
!platformTaskResources.Defaults.GPU.IsZero() {
gpu := flytek8s.AdjustOrDefaultResource(taskResourceRequirements.Defaults.GPU, taskResourceRequirements.Limits.GPU,
platformTaskResources.Defaults.GPU, platformTaskResources.Limits.GPU)
platformTaskResources.Defaults.GPU, platformTaskResources.Limits.GPU, false)
finalizedResourceRequests = append(finalizedResourceRequests, &core.Resources_ResourceEntry{
Name: core.Resources_GPU,
Value: gpu.Request.String(),
Expand Down
325 changes: 322 additions & 3 deletions flyteadmin/pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func getMockExecutionsConfigProvider() runtimeInterfaces.Configuration {
runtimeMocks.NewMockQueueConfigurationProvider(
[]runtimeInterfaces.ExecutionQueue{}, []runtimeInterfaces.WorkflowConfig{}),
nil,
runtimeMocks.NewMockTaskResourceConfiguration(resourceDefaults, resourceLimits), nil, getMockNamespaceMappingConfig())
runtimeMocks.NewMockTaskResourceConfiguration(resourceDefaults, resourceLimits, false), nil, getMockNamespaceMappingConfig())
mockExecutionsConfigProvider.(*runtimeMocks.MockConfigurationProvider).AddRegistrationValidationConfiguration(
runtimeMocks.NewMockRegistrationValidationProvider())
return mockExecutionsConfigProvider
Expand Down Expand Up @@ -557,7 +557,7 @@ func TestCreateExecution_TaggedQueue(t *testing.T) {
},
}),
nil,
runtimeMocks.NewMockTaskResourceConfiguration(resourceDefaults, resourceLimits), nil, getMockNamespaceMappingConfig())
runtimeMocks.NewMockTaskResourceConfiguration(resourceDefaults, resourceLimits, false), nil, getMockNamespaceMappingConfig())
configProvider.(*runtimeMocks.MockConfigurationProvider).AddRegistrationValidationConfiguration(
runtimeMocks.NewMockRegistrationValidationProvider())

Expand Down Expand Up @@ -1267,7 +1267,7 @@ func TestCreateExecution_CustomNamespaceMappingConfig(t *testing.T) {
runtimeMocks.NewMockQueueConfigurationProvider(
[]runtimeInterfaces.ExecutionQueue{}, []runtimeInterfaces.WorkflowConfig{}),
nil,
runtimeMocks.NewMockTaskResourceConfiguration(resourceDefaults, resourceLimits), nil, &mockNs)
runtimeMocks.NewMockTaskResourceConfiguration(resourceDefaults, resourceLimits, false), nil, &mockNs)
mockExecutionsConfigProvider.(*runtimeMocks.MockConfigurationProvider).AddRegistrationValidationConfiguration(
runtimeMocks.NewMockRegistrationValidationProvider())

Expand Down Expand Up @@ -6029,3 +6029,322 @@ func TestQueryTemplate(t *testing.T) {
assert.Error(t, err)
})
}

// TestSetDefaultsWithAllowFloatingCPULimit runs setCompiledTaskDefaults with
// AllowCPULimitToFloatFromRequest switched on in the mock task resource
// configuration. The task only requests 250m of CPU and declares no limits; the
// test expects the finalized container to keep that request, to carry a CPU limit
// of "0", and to have memory, ephemeral storage and GPU limits equal to the
// requests filled in from the platform defaults.
func TestSetDefaultsWithAllowFloatingCPULimit(t *testing.T) {
	ctx := context.Background()

	task := &core.CompiledTask{
		Template: &core.TaskTemplate{
			Target: &core.TaskTemplate_Container{
				Container: &core.Container{
					Resources: &core.Resources{
						Requests: []*core.Resources_ResourceEntry{
							{
								Name:  core.Resources_CPU,
								Value: "250m",
							},
						},
					},
				},
			},
			Id: &core.Identifier{
				Project: "project",
				Domain:  "domain",
				Name:    "task_name",
				Version: "version",
			},
		},
	}

	r := plugins.NewRegistry()
	r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &defaultTestExecutor)

	// Stop immediately when a mock is not of the expected concrete type: carrying on
	// with a nil pointer would panic on the field assignment below instead of
	// reporting an ordinary test failure.
	mockConfigurationProvider, ok := getMockExecutionsConfigProvider().(*runtimeMocks.MockConfigurationProvider)
	if !ok {
		t.Fatal("expected configuration provider to be a *runtimeMocks.MockConfigurationProvider")
	}
	mockTaskResourceConfiguration, ok := mockConfigurationProvider.TaskResourceConfiguration().(*runtimeMocks.MockTaskResourceConfiguration)
	if !ok {
		t.Fatal("expected task resource configuration to be a *runtimeMocks.MockTaskResourceConfiguration")
	}
	mockTaskResourceConfiguration.AllowCPULimitToFloatFromRequest = true
	assert.True(t, mockTaskResourceConfiguration.GetAllowCPULimitToFloatFromRequest())

	// setCompiledTaskDefaults is unexported, so the concrete manager type is needed.
	execManager, ok := NewExecutionManager(repositoryMocks.NewMockRepository(), r, mockConfigurationProvider, getMockStorageForExecTest(ctx), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}).(*ExecutionManager)
	if !ok {
		t.Fatal("expected NewExecutionManager to return an *ExecutionManager")
	}

	t.Run("tasks without cpu limit get a zero cpu limit", func(t *testing.T) {
		execManager.setCompiledTaskDefaults(ctx, task, workflowengineInterfaces.TaskResources{
			Defaults: runtimeInterfaces.TaskResourceSet{
				CPU:              resource.MustParse("200m"),
				GPU:              resource.MustParse("4"),
				Memory:           resource.MustParse("200Gi"),
				EphemeralStorage: resource.MustParse("500Mi"),
			},
			Limits: runtimeInterfaces.TaskResourceSet{
				CPU:              resource.MustParse("300m"),
				GPU:              resource.MustParse("8"),
				Memory:           resource.MustParse("500Gi"),
				EphemeralStorage: resource.MustParse("501Mi"),
			},
		})

		// The CPU request comes from the task itself (250m); every other request
		// is the platform default, and only the CPU limit is expected to be "0".
		expected := &core.Container{
			Resources: &core.Resources{
				Requests: []*core.Resources_ResourceEntry{
					{
						Name:  core.Resources_CPU,
						Value: "250m",
					},
					{
						Name:  core.Resources_MEMORY,
						Value: "200Gi",
					},
					{
						Name:  core.Resources_EPHEMERAL_STORAGE,
						Value: "500Mi",
					},
					{
						Name:  core.Resources_GPU,
						Value: "4",
					},
				},
				Limits: []*core.Resources_ResourceEntry{
					{
						Name:  core.Resources_CPU,
						Value: "0",
					},
					{
						Name:  core.Resources_MEMORY,
						Value: "200Gi",
					},
					{
						Name:  core.Resources_EPHEMERAL_STORAGE,
						Value: "500Mi",
					},
					{
						Name:  core.Resources_GPU,
						Value: "4",
					},
				},
			},
		}
		actual := task.Template.GetContainer()
		assert.True(t, proto.Equal(expected, actual), fmt.Sprintf("%+v", actual))
	})
}

// TestSetDefaults_MissingRequests_AllowFloat runs setCompiledTaskDefaults with
// AllowCPULimitToFloatFromRequest switched on while the platform limits omit CPU
// and memory. The task only requests 250m of CPU; the test expects a CPU limit of
// "0", memory and GPU limits equal to their defaulted requests, and no ephemeral
// storage entry at all because only a platform limit exists for it.
func TestSetDefaults_MissingRequests_AllowFloat(t *testing.T) {
	ctx := context.Background()

	task := &core.CompiledTask{
		Template: &core.TaskTemplate{
			Target: &core.TaskTemplate_Container{
				Container: &core.Container{
					Resources: &core.Resources{
						Requests: []*core.Resources_ResourceEntry{
							{
								Name:  core.Resources_CPU,
								Value: "250m",
							},
						},
					},
				},
			},
			Id: &core.Identifier{
				Project: "project",
				Domain:  "domain",
				Name:    "task_name",
				Version: "version",
			},
		},
	}

	r := plugins.NewRegistry()
	r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &defaultTestExecutor)

	// Stop immediately when a mock is not of the expected concrete type: carrying on
	// with a nil pointer would panic on the field assignment below instead of
	// reporting an ordinary test failure.
	mockConfigurationProvider, ok := getMockExecutionsConfigProvider().(*runtimeMocks.MockConfigurationProvider)
	if !ok {
		t.Fatal("expected configuration provider to be a *runtimeMocks.MockConfigurationProvider")
	}
	mockTaskResourceConfiguration, ok := mockConfigurationProvider.TaskResourceConfiguration().(*runtimeMocks.MockTaskResourceConfiguration)
	if !ok {
		t.Fatal("expected task resource configuration to be a *runtimeMocks.MockTaskResourceConfiguration")
	}
	mockTaskResourceConfiguration.AllowCPULimitToFloatFromRequest = true
	assert.True(t, mockTaskResourceConfiguration.GetAllowCPULimitToFloatFromRequest())

	// setCompiledTaskDefaults is unexported, so the concrete manager type is needed.
	execManager, ok := NewExecutionManager(repositoryMocks.NewMockRepository(), r, mockConfigurationProvider, getMockStorageForExecTest(ctx), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}).(*ExecutionManager)
	if !ok {
		t.Fatal("expected NewExecutionManager to return an *ExecutionManager")
	}

	t.Run("same functionality if default cpu limit is not set", func(t *testing.T) {
		execManager.setCompiledTaskDefaults(ctx, task, workflowengineInterfaces.TaskResources{
			Defaults: runtimeInterfaces.TaskResourceSet{
				CPU:    resource.MustParse("200m"),
				GPU:    resource.MustParse("4"),
				Memory: resource.MustParse("200Gi"),
			},
			Limits: runtimeInterfaces.TaskResourceSet{
				GPU: resource.MustParse("8"),
				// Because only the limit is set, this resource should not be injected.
				EphemeralStorage: resource.MustParse("100"),
			},
		})

		// No ephemeral storage entries are expected in either list.
		expected := &core.Container{
			Resources: &core.Resources{
				Requests: []*core.Resources_ResourceEntry{
					{
						Name:  core.Resources_CPU,
						Value: "250m",
					},
					{
						Name:  core.Resources_MEMORY,
						Value: "200Gi",
					},
					{
						Name:  core.Resources_GPU,
						Value: "4",
					},
				},
				Limits: []*core.Resources_ResourceEntry{
					{
						Name:  core.Resources_CPU,
						Value: "0",
					},
					{
						Name:  core.Resources_MEMORY,
						Value: "200Gi",
					},
					{
						Name:  core.Resources_GPU,
						Value: "4",
					},
				},
			},
		}
		actual := task.Template.GetContainer()
		assert.True(t, proto.Equal(expected, actual), fmt.Sprintf("%+v", actual))
	})
}

// TestSetDefaults_OptionalRequiredResourcesAllowCPUToFloat runs
// setCompiledTaskDefaults with AllowCPULimitToFloatFromRequest switched on and
// checks how the optional resources (ephemeral storage, GPU) are handled: they are
// expected to be left out when the platform configuration only has a limit for
// them, and to be included when a platform default exists. In both cases the CPU
// limit is expected to be "0".
func TestSetDefaults_OptionalRequiredResourcesAllowCPUToFloat(t *testing.T) {
	ctx := context.Background()

	taskConfigLimits := runtimeInterfaces.TaskResourceSet{
		CPU:              resource.MustParse("300m"),
		GPU:              resource.MustParse("1"),
		Memory:           resource.MustParse("500Gi"),
		EphemeralStorage: resource.MustParse("501Mi"),
	}

	// NOTE: this task is shared by both subtests and is modified in place by
	// setCompiledTaskDefaults, so the second subtest starts from the container
	// resources produced by the first one.
	task := &core.CompiledTask{
		Template: &core.TaskTemplate{
			Target: &core.TaskTemplate_Container{
				Container: &core.Container{
					Resources: &core.Resources{
						Requests: []*core.Resources_ResourceEntry{
							{
								Name:  core.Resources_CPU,
								Value: "200m",
							},
						},
					},
				},
			},
			Id: &taskIdentifier,
		},
	}

	// newManager builds an execution manager backed by a fresh mock configuration
	// whose AllowCPULimitToFloatFromRequest flag is set. It fails the calling
	// (sub)test immediately when a mock is not of the expected concrete type,
	// rather than continuing with a nil pointer.
	newManager := func(t *testing.T) *ExecutionManager {
		t.Helper()

		r := plugins.NewRegistry()
		r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &defaultTestExecutor)

		mockConfigurationProvider, ok := getMockExecutionsConfigProvider().(*runtimeMocks.MockConfigurationProvider)
		if !ok {
			t.Fatal("expected configuration provider to be a *runtimeMocks.MockConfigurationProvider")
		}
		mockTaskResourceConfiguration, ok := mockConfigurationProvider.TaskResourceConfiguration().(*runtimeMocks.MockTaskResourceConfiguration)
		if !ok {
			t.Fatal("expected task resource configuration to be a *runtimeMocks.MockTaskResourceConfiguration")
		}
		mockTaskResourceConfiguration.AllowCPULimitToFloatFromRequest = true
		assert.True(t, mockTaskResourceConfiguration.GetAllowCPULimitToFloatFromRequest())

		// setCompiledTaskDefaults is unexported, so the concrete type is needed.
		manager, ok := NewExecutionManager(repositoryMocks.NewMockRepository(), r, mockConfigurationProvider, getMockStorageForExecTest(ctx), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}).(*ExecutionManager)
		if !ok {
			t.Fatal("expected NewExecutionManager to return an *ExecutionManager")
		}
		return manager
	}

	t.Run("don't inject ephemeral storage or gpu when only the limit is set in config", func(t *testing.T) {
		execManager := newManager(t)

		execManager.setCompiledTaskDefaults(ctx, task, workflowengineInterfaces.TaskResources{
			Defaults: runtimeInterfaces.TaskResourceSet{
				CPU:    resource.MustParse("200m"),
				Memory: resource.MustParse("200Gi"),
			},
			Limits: taskConfigLimits,
		})

		expected := &core.Container{
			Resources: &core.Resources{
				Requests: []*core.Resources_ResourceEntry{
					{
						Name:  core.Resources_CPU,
						Value: "200m",
					},
					{
						Name:  core.Resources_MEMORY,
						Value: "200Gi",
					},
				},
				Limits: []*core.Resources_ResourceEntry{
					{
						Name:  core.Resources_CPU,
						Value: "0",
					},
					{
						Name:  core.Resources_MEMORY,
						Value: "200Gi",
					},
				},
			},
		}
		actual := task.Template.GetContainer()
		assert.True(t, proto.Equal(expected, actual), fmt.Sprintf("%+v", actual))
	})

	t.Run("respect non-required resources when defaults exist in config", func(t *testing.T) {
		execManager := newManager(t)

		execManager.setCompiledTaskDefaults(ctx, task, workflowengineInterfaces.TaskResources{
			Limits: taskConfigLimits,
			Defaults: runtimeInterfaces.TaskResourceSet{
				CPU:              resource.MustParse("200m"),
				Memory:           resource.MustParse("200Gi"),
				EphemeralStorage: resource.MustParse("1"),
			},
		})

		expected := &core.Container{
			Resources: &core.Resources{
				Requests: []*core.Resources_ResourceEntry{
					{
						Name:  core.Resources_CPU,
						Value: "200m",
					},
					{
						Name:  core.Resources_MEMORY,
						Value: "200Gi",
					},
					{
						Name:  core.Resources_EPHEMERAL_STORAGE,
						Value: "1",
					},
				},
				Limits: []*core.Resources_ResourceEntry{
					{
						Name:  core.Resources_CPU,
						Value: "0",
					},
					{
						Name:  core.Resources_MEMORY,
						Value: "200Gi",
					},
					{
						Name:  core.Resources_EPHEMERAL_STORAGE,
						Value: "1",
					},
				},
			},
		}
		actual := task.Template.GetContainer()
		assert.True(t, proto.Equal(expected, actual), fmt.Sprintf("%+v", actual))
	})
}
2 changes: 1 addition & 1 deletion flyteadmin/pkg/manager/impl/task_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func getMockConfigForTaskTest() runtimeInterfaces.Configuration {
whitelistConfiguration.EXPECT().GetTaskTypeWhitelist().Return(map[string][]runtimeInterfaces.WhitelistScope{})
mockConfig := runtimeMocks.NewMockConfigurationProvider(
testutils.GetApplicationConfigWithDefaultDomains(), nil, nil, runtimeMocks.NewMockTaskResourceConfiguration(
runtimeInterfaces.TaskResourceSet{}, runtimeInterfaces.TaskResourceSet{}), whitelistConfiguration, nil)
runtimeInterfaces.TaskResourceSet{}, runtimeInterfaces.TaskResourceSet{}, false), whitelistConfiguration, nil)
return mockConfig
}

Expand Down
Loading
Loading