Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions flyteadmin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ require (
k8s.io/component-base v0.29.0 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
volcano.sh/apis v1.8.2 // indirect
)

require (
Expand Down
2 changes: 2 additions & 0 deletions flyteadmin/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2053,3 +2053,5 @@ sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+s
sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08=
sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E=
sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY=
volcano.sh/apis v1.8.2 h1:MJ1EXpdQeKG+XEhb/I3liWgMFzkgW3qCcj6qdhTuvfA=
volcano.sh/apis v1.8.2/go.mod h1:h+xbUpkjfRaHjktAi8h+7JNnNahjwhRSgpN9FUUwNXQ=
1 change: 1 addition & 0 deletions flyteplugins/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ require (
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
volcano.sh/apis v1.8.2 // indirect
)

replace (
Expand Down
2 changes: 2 additions & 0 deletions flyteplugins/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -804,3 +804,5 @@ sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+s
sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08=
sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E=
sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY=
volcano.sh/apis v1.8.2 h1:MJ1EXpdQeKG+XEhb/I3liWgMFzkgW3qCcj6qdhTuvfA=
volcano.sh/apis v1.8.2/go.mod h1:h+xbUpkjfRaHjktAi8h+7JNnNahjwhRSgpN9FUUwNXQ=
1 change: 1 addition & 0 deletions flyteplugins/go/tasks/config_load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func TestLoadConfig(t *testing.T) {
assert.True(t, expectedMemory.Equal(k8sConfig.DefaultMemoryRequest))
assert.Equal(t, map[string]string{"x/interruptible": "true"}, k8sConfig.InterruptibleNodeSelector)
assert.Equal(t, "x/flyte", k8sConfig.InterruptibleTolerations[0].Key)
assert.Equal(t, false, k8sConfig.EnableCreatePodGroupForPod)
assert.Equal(t, "interruptible", k8sConfig.InterruptibleTolerations[0].Value)
assert.NotNil(t, k8sConfig.DefaultPodSecurityContext)
assert.NotNil(t, k8sConfig.DefaultPodSecurityContext.FSGroup)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,9 @@ type K8sPluginConfig struct {
AddTolerationsForExtendedResources []string `json:"add-tolerations-for-extended-resources" pflag:",Name of the extended resources for which tolerations should be added."`

EnableDistributedErrorAggregation bool `json:"enable-distributed-error-aggregation" pflag:",If true, will aggregate errors of different worker pods for distributed tasks."`

// EnableCreatePodGroupForPod enables creating volcano podgroups for pods.
EnableCreatePodGroupForPod bool `json:"enable-create-pod-group-for-pod" pflag:",If true, propeller won't create volcano podgroups."`
}

// FlyteCoPilotConfig specifies configuration for the Flyte CoPilot system. FlyteCoPilot, allows running flytekit-less containers
Expand Down
8 changes: 8 additions & 0 deletions flyteplugins/go/tasks/plugins/k8s/pod/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client"
volcanov1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"

pluginserrors "github.com/flyteorg/flyte/flyteplugins/go/tasks/errors"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/logs"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery"
pluginsCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/flytek8s"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/k8s"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/tasklog"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/utils"
Expand Down Expand Up @@ -260,6 +263,11 @@
}

func init() {
if config.GetK8sPluginConfig().EnableCreatePodGroupForPod {
if err := volcanov1beta1.AddToScheme(scheme.Scheme); err != nil {
panic(err)

Check warning on line 268 in flyteplugins/go/tasks/plugins/k8s/pod/plugin.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/k8s/pod/plugin.go#L267-L268

Added lines #L267 - L268 were not covered by tests
}
}
// Register ContainerTaskType and SidecarTaskType plugin entries. These separate task types
// still exist within the system, only now both are evaluated using the same internal pod plugin
// instance. This simplifies migration as users may keep the same configuration but are
Expand Down
1 change: 1 addition & 0 deletions flyteplugins/go/tasks/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ plugins:
default-security-context:
allowPrivilegeEscalation: false
enable-host-networking-pod: true
enable-create-pod-group-for-pod: false
default-pod-dns-config:
options:
- name: "ndots"
Expand Down
1 change: 1 addition & 0 deletions flytepropeller/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ require (
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
volcano.sh/apis v1.8.2 // indirect
)

replace (
Expand Down
2 changes: 2 additions & 0 deletions flytepropeller/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -872,3 +872,5 @@ sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+s
sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08=
sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E=
sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY=
volcano.sh/apis v1.8.2 h1:MJ1EXpdQeKG+XEhb/I3liWgMFzkgW3qCcj6qdhTuvfA=
volcano.sh/apis v1.8.2/go.mod h1:h+xbUpkjfRaHjktAi8h+7JNnNahjwhRSgpN9FUUwNXQ=
101 changes: 101 additions & 0 deletions flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"
volcanov1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/errors"
Expand Down Expand Up @@ -196,6 +197,93 @@
return podRequestedResources
}

// getRequestResource will get the actual request resource.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel this module is not the right place to add this.

// It will first check if there is a defined request resource, if not, it will use the limit resource as the request.
func getRequestResource(resources v1.ResourceRequirements) v1.ResourceList {
merged := v1.ResourceList{}
for name, req := range resources.Requests {
merged[name] = req
}

Check warning on line 206 in flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go#L205-L206

Added lines #L205 - L206 were not covered by tests
// If resources don't have requests, it defaults limit if that is explicitly specified.
for name, lim := range resources.Limits {
if _, ok := merged[name]; !ok {
merged[name] = lim
}
}
return merged
}

// createPodGroupForPod will build the associated volcano podgroup for a pod.
func createPodGroupForPod(taskCtx pluginsCore.TaskExecutionMetadata, pod *v1.Pod) *volcanov1beta1.PodGroup {
// minResources is a concept in Volcano. A PodGroup will be enqueued when the required minResources
// are available in the cluster. Enqueue is a pre-filtering step before resource allocation,
// so strict computation is not necessary as long as the PodGroup meets the criteria for enqueuing.
//
// The minResources value is determined by:
// - The the total resources requested by all main containers.
// - The highest individual resource request among the init containers.
minResources := v1.ResourceList{}
for _, c := range pod.Spec.Containers {
requestResources := getRequestResource(c.Resources)
for name, quantity := range requestResources {
if q, ok := minResources[name]; !ok {
minResources[name] = quantity.DeepCopy()
} else {
q.Add(quantity)
minResources[name] = q
}
}
}
// InitContainers are run sequentially before other containers start, so the highest
// init container resource is compared against the sum of app containers to determine
// the effective usage.
for _, c := range pod.Spec.InitContainers {
requestResources := getRequestResource(c.Resources)
for name, quantity := range requestResources {
if value, ok := minResources[name]; !ok || quantity.Cmp(value) > 0 {
minResources[name] = quantity.DeepCopy()
}

Check warning on line 245 in flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go#L241-L245

Added lines #L241 - L245 were not covered by tests
}
}
podGroup := &volcanov1beta1.PodGroup{
TypeMeta: metav1.TypeMeta{
APIVersion: volcanov1beta1.SchemeGroupVersion.String(),
Kind: "PodGroup",
},
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
OwnerReferences: []metav1.OwnerReference{taskCtx.GetOwnerReference()},
Annotations: map[string]string{},
Labels: map[string]string{},
},
Spec: volcanov1beta1.PodGroupSpec{
MinMember: 1,
PriorityClassName: pod.Spec.PriorityClassName,
MinResources: &minResources,
},
}
if pod.Labels != nil {
for k, v := range pod.Labels {
podGroup.Labels[k] = v
}

Check warning on line 269 in flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go#L267-L269

Added lines #L267 - L269 were not covered by tests
}
if pod.Annotations != nil {
for k, v := range pod.Annotations {
podGroup.Annotations[k] = v
}

Check warning on line 274 in flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go#L272-L274

Added lines #L272 - L274 were not covered by tests
}
if pod.Annotations == nil {
pod.Annotations = make(map[string]string)
}
if queueName, ok := pod.Annotations[volcanov1beta1.QueueNameAnnotationKey]; ok {
podGroup.Spec.Queue = queueName
}

Check warning on line 281 in flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go#L280-L281

Added lines #L280 - L281 were not covered by tests
// This is to prevent volcano from creating the podgroup for the pod
pod.Annotations[volcanov1beta1.KubeGroupNameAnnotationKey] = podGroup.Name
return podGroup
}

func (e *PluginManager) launchResource(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (pluginsCore.Transition, error) {
tmpl, err := tCtx.TaskReader().Read(ctx)
if err != nil {
Expand Down Expand Up @@ -232,6 +320,19 @@
cfg := nodeTaskConfig.GetConfig()
backOffHandler := e.backOffController.GetOrCreateHandler(ctx, key, cfg.BackOffConfig.BaseSecond, cfg.BackOffConfig.MaxDuration.Duration)

if config.GetK8sPluginConfig().EnableCreatePodGroupForPod {
podGroup := createPodGroupForPod(k8sTaskCtxMetadata, pod)
err := e.kubeClient.GetClient().Create(ctx, podGroup)
if err != nil {
if !k8serrors.IsAlreadyExists(err) {
reason := k8serrors.ReasonForError(err)
return pluginsCore.UnknownTransition, errors.Wrapf(stdErrors.ErrorCode(reason), err, "failed to create volcano podgroup for pod")
}

Check warning on line 330 in flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go#L324-L330

Added lines #L324 - L330 were not covered by tests
// The error is IsAlreadyExists
logger.Warnf(ctx, "PodGroup [%s] is already exists. Err: %v", podGroup.Name, err)

Check warning on line 332 in flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go#L332

Added line #L332 was not covered by tests
}
}

err = backOffHandler.Handle(ctx, func() error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about this case?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As before, this error will still be handled by the logic a few lines below.

return e.kubeClient.GetClient().Create(ctx, o)
}, podRequestedResources)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,41 @@ func TestResourceManagerConstruction(t *testing.T) {
assert.NotNil(t, rm)
}

func TestPluginManager_CreatePodGroupForPod(t *testing.T) {
t.Run("Build volcano podgroup for a pod", func(t *testing.T) {
tctx := getMockTaskContext(PluginPhaseNotStarted, PluginPhaseNotStarted)
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: tctx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(),
Namespace: tctx.TaskExecutionMetadata().GetNamespace(),
},
TypeMeta: metav1.TypeMeta{
Kind: flytek8s.PodKind,
APIVersion: v1.SchemeGroupVersion.String(),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1"), v1.ResourceMemory: resource.MustParse("1Gi")},
}},
{Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{v1.ResourceCPU: resource.MustParse("2"), v1.ResourceMemory: resource.MustParse("2Gi")},
}},
{Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{v1.ResourceCPU: resource.MustParse("3"), v1.ResourceMemory: resource.MustParse("3Gi")},
}},
},
},
}
podgroup := createPodGroupForPod(tctx.TaskExecutionMetadata(), pod)
assert.Equal(t, podgroup.Name, pod.Name)
assert.Equal(t, podgroup.Namespace, pod.Namespace)
assert.Equal(t, podgroup.Kind, "PodGroup")
assert.Equal(t, podgroup.Spec.MinResources.Cpu().Cmp(resource.MustParse("6")), 0)
assert.Equal(t, podgroup.Spec.MinResources.Memory().Cmp(resource.MustParse("6Gi")), 0)
})
}

func TestFinalize(t *testing.T) {
t.Run("DeleteResourceOnFinalize=True", func(t *testing.T) {
ctx := context.Background()
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ require (
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
volcano.sh/apis v1.8.2 // indirect
)

replace (
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2082,3 +2082,5 @@ sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+s
sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08=
sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E=
sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY=
volcano.sh/apis v1.8.2 h1:MJ1EXpdQeKG+XEhb/I3liWgMFzkgW3qCcj6qdhTuvfA=
volcano.sh/apis v1.8.2/go.mod h1:h+xbUpkjfRaHjktAi8h+7JNnNahjwhRSgpN9FUUwNXQ=
Loading