Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions flyteadmin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ require (
k8s.io/component-base v0.29.0 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
volcano.sh/apis v1.8.2 // indirect
)

require (
Expand Down
2 changes: 2 additions & 0 deletions flyteadmin/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2053,3 +2053,5 @@ sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+s
sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08=
sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E=
sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY=
volcano.sh/apis v1.8.2 h1:MJ1EXpdQeKG+XEhb/I3liWgMFzkgW3qCcj6qdhTuvfA=
volcano.sh/apis v1.8.2/go.mod h1:h+xbUpkjfRaHjktAi8h+7JNnNahjwhRSgpN9FUUwNXQ=
1 change: 1 addition & 0 deletions flyteplugins/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ require (
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
volcano.sh/apis v1.8.2 // indirect
)

replace (
Expand Down
2 changes: 2 additions & 0 deletions flyteplugins/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -804,3 +804,5 @@ sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+s
sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08=
sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E=
sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY=
volcano.sh/apis v1.8.2 h1:MJ1EXpdQeKG+XEhb/I3liWgMFzkgW3qCcj6qdhTuvfA=
volcano.sh/apis v1.8.2/go.mod h1:h+xbUpkjfRaHjktAi8h+7JNnNahjwhRSgpN9FUUwNXQ=
1 change: 1 addition & 0 deletions flyteplugins/go/tasks/config_load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func TestLoadConfig(t *testing.T) {
assert.True(t, expectedMemory.Equal(k8sConfig.DefaultMemoryRequest))
assert.Equal(t, map[string]string{"x/interruptible": "true"}, k8sConfig.InterruptibleNodeSelector)
assert.Equal(t, "x/flyte", k8sConfig.InterruptibleTolerations[0].Key)
assert.Equal(t, false, k8sConfig.EnableCreatePodGroupForPod)
assert.Equal(t, "interruptible", k8sConfig.InterruptibleTolerations[0].Value)
assert.NotNil(t, k8sConfig.DefaultPodSecurityContext)
assert.NotNil(t, k8sConfig.DefaultPodSecurityContext.FSGroup)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,9 @@ type K8sPluginConfig struct {
AddTolerationsForExtendedResources []string `json:"add-tolerations-for-extended-resources" pflag:",Name of the extended resources for which tolerations should be added."`

EnableDistributedErrorAggregation bool `json:"enable-distributed-error-aggregation" pflag:",If true, will aggregate errors of different worker pods for distributed tasks."`

// EnableCreatePodGroupForPod enables creating volcano podgroups for pods.
EnableCreatePodGroupForPod bool `json:"enable-create-pod-group-for-pod" pflag:",If true, propeller will create volcano podgroups for podTasks."`
}

// FlyteCoPilotConfig specifies configuration for the Flyte CoPilot system. FlyteCoPilot, allows running flytekit-less containers
Expand Down
8 changes: 8 additions & 0 deletions flyteplugins/go/tasks/plugins/k8s/pod/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client"
volcanov1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"

pluginserrors "github.com/flyteorg/flyte/flyteplugins/go/tasks/errors"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/logs"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery"
pluginsCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/flytek8s"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/k8s"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/tasklog"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/utils"
Expand Down Expand Up @@ -260,6 +263,11 @@
}

func init() {
if config.GetK8sPluginConfig().EnableCreatePodGroupForPod {
if err := volcanov1beta1.AddToScheme(scheme.Scheme); err != nil {
panic(err)

Check warning on line 268 in flyteplugins/go/tasks/plugins/k8s/pod/plugin.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/k8s/pod/plugin.go#L267-L268

Added lines #L267 - L268 were not covered by tests
}
}
// Register ContainerTaskType and SidecarTaskType plugin entries. These separate task types
// still exist within the system, only now both are evaluated using the same internal pod plugin
// instance. This simplifies migration as users may keep the same configuration but are
Expand Down
1 change: 1 addition & 0 deletions flyteplugins/go/tasks/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ plugins:
default-security-context:
allowPrivilegeEscalation: false
enable-host-networking-pod: true
enable-create-pod-group-for-pod: false
default-pod-dns-config:
options:
- name: "ndots"
Expand Down
1 change: 1 addition & 0 deletions flytepropeller/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ require (
k8s.io/klog v1.0.0
k8s.io/utils v0.0.0-20230726121419-3b25d923346b
sigs.k8s.io/controller-runtime v0.17.2
volcano.sh/apis v1.8.2
)

require (
Expand Down
2 changes: 2 additions & 0 deletions flytepropeller/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -872,3 +872,5 @@ sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+s
sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08=
sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E=
sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY=
volcano.sh/apis v1.8.2 h1:MJ1EXpdQeKG+XEhb/I3liWgMFzkgW3qCcj6qdhTuvfA=
volcano.sh/apis v1.8.2/go.mod h1:h+xbUpkjfRaHjktAi8h+7JNnNahjwhRSgpN9FUUwNXQ=
101 changes: 101 additions & 0 deletions flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"
volcanov1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/errors"
Expand Down Expand Up @@ -196,6 +197,93 @@
return podRequestedResources
}

// getRequestResource will get the actual request resource.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel this module is not the right place to add this.

// It will first check if there is a defined request resource, if not, it will use the limit resource as the request.
func getRequestResource(resources v1.ResourceRequirements) v1.ResourceList {
merged := v1.ResourceList{}
for name, req := range resources.Requests {
merged[name] = req
}

Check warning on line 206 in flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go#L205-L206

Added lines #L205 - L206 were not covered by tests
// If resources don't have requests, it defaults limit if that is explicitly specified.
for name, lim := range resources.Limits {
if _, ok := merged[name]; !ok {
merged[name] = lim
}
}
return merged
}

// createPodGroupForPod will build the associated volcano podgroup for a pod.
func createPodGroupForPod(taskCtx pluginsCore.TaskExecutionMetadata, pod *v1.Pod) *volcanov1beta1.PodGroup {
// minResources is a concept in Volcano. A PodGroup will be enqueued when the required minResources
// are available in the cluster. Enqueue is a pre-filtering step before resource allocation,
// so strict computation is not necessary as long as the PodGroup meets the criteria for enqueuing.
//
// The minResources value is determined by:
// - The the total resources requested by all main containers.
// - The highest individual resource request among the init containers.
minResources := v1.ResourceList{}
for _, c := range pod.Spec.Containers {
requestResources := getRequestResource(c.Resources)
for name, quantity := range requestResources {
if q, ok := minResources[name]; !ok {
minResources[name] = quantity.DeepCopy()
} else {
q.Add(quantity)
minResources[name] = q
}
}
}
// InitContainers are run sequentially before other containers start, so the highest
// init container resource is compared against the sum of app containers to determine
// the effective usage.
for _, c := range pod.Spec.InitContainers {
requestResources := getRequestResource(c.Resources)
for name, quantity := range requestResources {
if value, ok := minResources[name]; !ok || quantity.Cmp(value) > 0 {
minResources[name] = quantity.DeepCopy()
}

Check warning on line 245 in flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go#L241-L245

Added lines #L241 - L245 were not covered by tests
}
}
podGroup := &volcanov1beta1.PodGroup{
TypeMeta: metav1.TypeMeta{
APIVersion: volcanov1beta1.SchemeGroupVersion.String(),
Kind: "PodGroup",
},
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
OwnerReferences: []metav1.OwnerReference{taskCtx.GetOwnerReference()},
Annotations: map[string]string{},
Labels: map[string]string{},
},
Spec: volcanov1beta1.PodGroupSpec{
MinMember: 1,
PriorityClassName: pod.Spec.PriorityClassName,
MinResources: &minResources,
},
}
if pod.Labels != nil {
for k, v := range pod.Labels {
podGroup.Labels[k] = v
}

Check warning on line 269 in flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go#L267-L269

Added lines #L267 - L269 were not covered by tests
}
if pod.Annotations != nil {
for k, v := range pod.Annotations {
podGroup.Annotations[k] = v
}

Check warning on line 274 in flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go#L272-L274

Added lines #L272 - L274 were not covered by tests
}
if pod.Annotations == nil {
pod.Annotations = make(map[string]string)
}
if queueName, ok := pod.Annotations[volcanov1beta1.QueueNameAnnotationKey]; ok {
podGroup.Spec.Queue = queueName
}

Check warning on line 281 in flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go#L280-L281

Added lines #L280 - L281 were not covered by tests
// This is to prevent volcano from creating the podgroup for the pod
pod.Annotations[volcanov1beta1.KubeGroupNameAnnotationKey] = podGroup.Name
return podGroup
}

func (e *PluginManager) launchResource(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (pluginsCore.Transition, error) {
tmpl, err := tCtx.TaskReader().Read(ctx)
if err != nil {
Expand Down Expand Up @@ -232,6 +320,19 @@
cfg := nodeTaskConfig.GetConfig()
backOffHandler := e.backOffController.GetOrCreateHandler(ctx, key, cfg.BackOffConfig.BaseSecond, cfg.BackOffConfig.MaxDuration.Duration)

if config.GetK8sPluginConfig().EnableCreatePodGroupForPod {
podGroup := createPodGroupForPod(k8sTaskCtxMetadata, pod)
err := e.kubeClient.GetClient().Create(ctx, podGroup)
if err != nil {
if !k8serrors.IsAlreadyExists(err) {
reason := k8serrors.ReasonForError(err)
return pluginsCore.UnknownTransition, errors.Wrapf(stdErrors.ErrorCode(reason), err, "failed to create volcano podgroup for pod")
}

Check warning on line 330 in flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go#L324-L330

Added lines #L324 - L330 were not covered by tests
// The error is IsAlreadyExists
logger.Warnf(ctx, "PodGroup [%s] is already exists. Err: %v", podGroup.Name, err)

Check warning on line 332 in flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go#L332

Added line #L332 was not covered by tests
}
}

err = backOffHandler.Handle(ctx, func() error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about this case?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As before, this error will still be handled by the logic a few lines below.

return e.kubeClient.GetClient().Create(ctx, o)
}, podRequestedResources)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,41 @@ func TestResourceManagerConstruction(t *testing.T) {
assert.NotNil(t, rm)
}

func TestPluginManager_CreatePodGroupForPod(t *testing.T) {
t.Run("Build volcano podgroup for a pod", func(t *testing.T) {
tctx := getMockTaskContext(PluginPhaseNotStarted, PluginPhaseNotStarted)
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: tctx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(),
Namespace: tctx.TaskExecutionMetadata().GetNamespace(),
},
TypeMeta: metav1.TypeMeta{
Kind: flytek8s.PodKind,
APIVersion: v1.SchemeGroupVersion.String(),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")},
}},
{Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{v1.ResourceCPU: resource.MustParse("2"), v1.ResourceMemory: resource.MustParse("2Gi")},
}},
{Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{v1.ResourceCPU: resource.MustParse("3"), v1.ResourceMemory: resource.MustParse("3Gi")},
}},
},
},
}
podgroup := createPodGroupForPod(tctx.TaskExecutionMetadata(), pod)
assert.Equal(t, podgroup.Name, pod.Name)
assert.Equal(t, podgroup.Namespace, pod.Namespace)
assert.Equal(t, podgroup.Kind, "PodGroup")
assert.Equal(t, podgroup.Spec.MinResources.Cpu().Cmp(resource.MustParse("6")), 0)
assert.Equal(t, podgroup.Spec.MinResources.Memory().Cmp(resource.MustParse("6Gi")), 0)
})
}

func TestFinalize(t *testing.T) {
t.Run("DeleteResourceOnFinalize=True", func(t *testing.T) {
ctx := context.Background()
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ require (
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
volcano.sh/apis v1.8.2 // indirect
)

replace (
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2082,3 +2082,5 @@ sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+s
sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08=
sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E=
sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY=
volcano.sh/apis v1.8.2 h1:MJ1EXpdQeKG+XEhb/I3liWgMFzkgW3qCcj6qdhTuvfA=
volcano.sh/apis v1.8.2/go.mod h1:h+xbUpkjfRaHjktAi8h+7JNnNahjwhRSgpN9FUUwNXQ=
Loading