-
Notifications
You must be signed in to change notification settings - Fork 697
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KEP-2170: Adding validation webhook for v2 trainjob #2307
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,9 +26,15 @@ const ( | |
// JobInitializer is the Job name for the initializer. | ||
JobInitializer string = "initializer" | ||
|
||
// JobExporter is the Job name for the exporter. | ||
JobExporter string = "exporter" | ||
|
||
// ContainerModelInitializer is the container name for the model initializer. | ||
ContainerModelInitializer string = "model-initializer" | ||
|
||
// ContainerModelExporter is the container name for the model exporter. | ||
ContainerModelExporter string = "model-exporter" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The same for the container. |
||
|
||
// ContainerDatasetInitializer is the container name for the dataset initializer. | ||
ContainerDatasetInitializer string = "dataset-initializer" | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,6 +25,7 @@ import ( | |
"k8s.io/apimachinery/pkg/util/validation/field" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
"sigs.k8s.io/controller-runtime/pkg/webhook/admission" | ||
jobsetv1alpha2 "sigs.k8s.io/jobset/api/jobset/v1alpha2" | ||
|
||
kubeflowv2 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v2alpha1" | ||
runtime "github.com/kubeflow/training-operator/pkg/runtime.v2" | ||
|
@@ -64,14 +65,19 @@ func (r *ClusterTrainingRuntime) EventHandlerRegistrars() []runtime.ReconcilerBu | |
} | ||
|
||
func (r *ClusterTrainingRuntime) ValidateObjects(ctx context.Context, old, new *kubeflowv2.TrainJob) (admission.Warnings, field.ErrorList) { | ||
clusterTrainingRuntime := &kubeflowv2.ClusterTrainingRuntime{} | ||
if err := r.client.Get(ctx, client.ObjectKey{ | ||
Namespace: old.Namespace, | ||
Name: old.Spec.RuntimeRef.Name, | ||
}, &kubeflowv2.ClusterTrainingRuntime{}); err != nil { | ||
Namespace: new.Namespace, | ||
Name: new.Spec.RuntimeRef.Name, | ||
Comment on lines
+70
to
+71
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Have you ever seen the isseus when we use the old object names? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we get |
||
}, clusterTrainingRuntime); err != nil { | ||
return nil, field.ErrorList{ | ||
field.Invalid(field.NewPath("spec", "RuntimeRef"), old.Spec.RuntimeRef, | ||
field.Invalid(field.NewPath("spec", "RuntimeRef"), new.Spec.RuntimeRef, | ||
fmt.Sprintf("%v: specified clusterTrainingRuntime must be created before the TrainJob is created", err)), | ||
} | ||
} | ||
return r.framework.RunCustomValidationPlugins(old, new) | ||
info := r.runtimeInfo(ctx, new, clusterTrainingRuntime.Spec.Template, clusterTrainingRuntime.Spec.MLPolicy, clusterTrainingRuntime.Spec.PodGroupPolicy) | ||
jobSetTemplate := jobsetv1alpha2.JobSet{ | ||
Spec: clusterTrainingRuntime.Spec.Template.Spec, | ||
} | ||
return r.framework.RunCustomValidationPlugins(jobSetTemplate.DeepCopy(), info, old, new) | ||
} |
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -82,6 +82,26 @@ func (r *TrainingRuntime) NewObjects(ctx context.Context, trainJob *kubeflowv2.T | |||
func (r *TrainingRuntime) buildObjects( | ||||
ctx context.Context, trainJob *kubeflowv2.TrainJob, jobSetTemplateSpec kubeflowv2.JobSetTemplateSpec, mlPolicy *kubeflowv2.MLPolicy, podGroupPolicy *kubeflowv2.PodGroupPolicy, | ||||
) ([]client.Object, error) { | ||||
|
||||
info := r.runtimeInfo(ctx, trainJob, jobSetTemplateSpec, mlPolicy, podGroupPolicy) | ||||
if err := r.framework.RunEnforceMLPolicyPlugins(info, trainJob); err != nil { | ||||
return nil, err | ||||
} | ||||
|
||||
if err := r.framework.RunEnforcePodGroupPolicyPlugins(info, trainJob); err != nil { | ||||
return nil, err | ||||
} | ||||
|
||||
jobSetTemplate := jobsetv1alpha2.JobSet{ | ||||
Spec: jobSetTemplateSpec.Spec, | ||||
} | ||||
|
||||
return r.framework.RunComponentBuilderPlugins(ctx, jobSetTemplate.DeepCopy(), info, trainJob) | ||||
} | ||||
|
||||
func (r *TrainingRuntime) runtimeInfo( | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be part of Runtime interface:
And should we name this API more explicit (e.g. getRuntimeInfo() or initializeRuntimeInfo() ) ?
|
||||
ctx context.Context, trainJob *kubeflowv2.TrainJob, jobSetTemplateSpec kubeflowv2.JobSetTemplateSpec, mlPolicy *kubeflowv2.MLPolicy, podGroupPolicy *kubeflowv2.PodGroupPolicy) *runtime.Info { | ||||
|
||||
propagationLabels := jobSetTemplateSpec.Labels | ||||
if propagationLabels == nil && trainJob.Spec.Labels != nil { | ||||
propagationLabels = make(map[string]string, len(trainJob.Spec.Labels)) | ||||
|
@@ -112,19 +132,7 @@ func (r *TrainingRuntime) buildObjects( | |||
|
||||
info := runtime.NewInfo(opts...) | ||||
|
||||
if err := r.framework.RunEnforceMLPolicyPlugins(info, trainJob); err != nil { | ||||
return nil, err | ||||
} | ||||
|
||||
if err := r.framework.RunEnforcePodGroupPolicyPlugins(info, trainJob); err != nil { | ||||
return nil, err | ||||
} | ||||
|
||||
jobSetTemplate := jobsetv1alpha2.JobSet{ | ||||
Spec: jobSetTemplateSpec.Spec, | ||||
} | ||||
|
||||
return r.framework.RunComponentBuilderPlugins(ctx, jobSetTemplate.DeepCopy(), info, trainJob) | ||||
return info | ||||
} | ||||
|
||||
func (r *TrainingRuntime) EventHandlerRegistrars() []runtime.ReconcilerBuilder { | ||||
|
@@ -136,14 +144,19 @@ func (r *TrainingRuntime) EventHandlerRegistrars() []runtime.ReconcilerBuilder { | |||
} | ||||
|
||||
func (r *TrainingRuntime) ValidateObjects(ctx context.Context, old, new *kubeflowv2.TrainJob) (admission.Warnings, field.ErrorList) { | ||||
trainingRuntime := &kubeflowv2.TrainingRuntime{} | ||||
if err := r.client.Get(ctx, client.ObjectKey{ | ||||
Namespace: old.Namespace, | ||||
Name: old.Spec.RuntimeRef.Name, | ||||
}, &kubeflowv2.TrainingRuntime{}); err != nil { | ||||
Namespace: new.Namespace, | ||||
Name: new.Spec.RuntimeRef.Name, | ||||
}, trainingRuntime); err != nil { | ||||
return nil, field.ErrorList{ | ||||
field.Invalid(field.NewPath("spec", "runtimeRef"), old.Spec.RuntimeRef, | ||||
field.Invalid(field.NewPath("spec", "runtimeRef"), new.Spec.RuntimeRef, | ||||
fmt.Sprintf("%v: specified trainingRuntime must be created before the TrainJob is created", err)), | ||||
} | ||||
} | ||||
return r.framework.RunCustomValidationPlugins(old, new) | ||||
info := r.runtimeInfo(ctx, new, trainingRuntime.Spec.Template, trainingRuntime.Spec.MLPolicy, trainingRuntime.Spec.PodGroupPolicy) | ||||
jobSetTemplate := jobsetv1alpha2.JobSet{ | ||||
Spec: trainingRuntime.Spec.Template.Spec, | ||||
} | ||||
return r.framework.RunCustomValidationPlugins(jobSetTemplate.DeepCopy(), info, old, new) | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the main goal to pass |
||||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -28,17 +28,20 @@ import ( | |||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||||
apiruntime "k8s.io/apimachinery/pkg/runtime" | ||||||
"k8s.io/apimachinery/pkg/runtime/schema" | ||||||
"k8s.io/apimachinery/pkg/util/validation/field" | ||||||
"k8s.io/utils/ptr" | ||||||
ctrl "sigs.k8s.io/controller-runtime" | ||||||
"sigs.k8s.io/controller-runtime/pkg/builder" | ||||||
"sigs.k8s.io/controller-runtime/pkg/client" | ||||||
ctrlutil "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" | ||||||
"sigs.k8s.io/controller-runtime/pkg/webhook/admission" | ||||||
jobsetv1alpha2 "sigs.k8s.io/jobset/api/jobset/v1alpha2" | ||||||
|
||||||
kubeflowv2 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v2alpha1" | ||||||
"github.com/kubeflow/training-operator/pkg/constants" | ||||||
runtime "github.com/kubeflow/training-operator/pkg/runtime.v2" | ||||||
"github.com/kubeflow/training-operator/pkg/runtime.v2/framework" | ||||||
util_v2 "github.com/kubeflow/training-operator/pkg/util.v2" | ||||||
) | ||||||
|
||||||
type JobSet struct { | ||||||
|
@@ -50,6 +53,7 @@ type JobSet struct { | |||||
|
||||||
var _ framework.WatchExtensionPlugin = (*JobSet)(nil) | ||||||
var _ framework.ComponentBuilderPlugin = (*JobSet)(nil) | ||||||
var _ framework.CustomValidationPlugin = (*JobSet)(nil) | ||||||
|
||||||
const Name = constants.JobSetKind | ||||||
|
||||||
|
@@ -140,3 +144,104 @@ func (j *JobSet) ReconcilerBuilders() []runtime.ReconcilerBuilder { | |||||
}, | ||||||
} | ||||||
} | ||||||
|
||||||
func (j *JobSet) Validate(runtimeJobTemplate client.Object, runtimeInfo *runtime.Info, oldObj, newObj *kubeflowv2.TrainJob) (admission.Warnings, field.ErrorList) { | ||||||
|
||||||
var allErrs field.ErrorList | ||||||
specPath := field.NewPath("spec") | ||||||
runtimeRefPath := specPath.Child("runtimeRef") | ||||||
|
||||||
jobSet, ok := runtimeJobTemplate.(*jobsetv1alpha2.JobSet) | ||||||
if !ok { | ||||||
return nil, nil | ||||||
} | ||||||
|
||||||
if newObj.Spec.ModelConfig != nil && newObj.Spec.ModelConfig.Input != nil { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think, for now we should check the initContainers in JobSet, as I mentioned here: https://github.com/kubeflow/training-operator/blob/master/pkg/runtime.v2/framework/plugins/jobset/builder.go#L87-L89 |
||||||
if !util_v2.JobExists(jobSet.Spec.ReplicatedJobs, constants.JobInitializer) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @tenzen-y Can we use Go-native libs to verify that Name exists in the slice ? |
||||||
allErrs = append(allErrs, field.Invalid(runtimeRefPath, newObj.Spec.RuntimeRef, "trainingRuntime should have initializer job when trainJob is configured with input modelConfig")) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's use name from var here.
Suggested change
|
||||||
} else { | ||||||
for _, job := range jobSet.Spec.ReplicatedJobs { | ||||||
if job.Name == constants.JobInitializer { | ||||||
if !util_v2.ContainerExists(job.Template.Spec.Template.Spec.InitContainers, constants.ContainerModelInitializer) { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same question. |
||||||
allErrs = append(allErrs, field.Invalid(runtimeRefPath, newObj.Spec.RuntimeRef, "trainingRuntime should have container with name - model-initializer in the initializer job")) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same suggestion for the Container and Job name. |
||||||
} | ||||||
} | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
if newObj.Spec.DatasetConfig != nil { | ||||||
if !util_v2.JobExists(jobSet.Spec.ReplicatedJobs, constants.JobInitializer) { | ||||||
allErrs = append(allErrs, field.Invalid(runtimeRefPath, newObj.Spec.RuntimeRef, "trainingRuntime should have initializer job when trainJob is configured with datasetConfig")) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto |
||||||
} else { | ||||||
for _, job := range jobSet.Spec.ReplicatedJobs { | ||||||
if job.Name == constants.JobInitializer { | ||||||
if !util_v2.ContainerExists(job.Template.Spec.Template.Spec.InitContainers, constants.ContainerDatasetInitializer) { | ||||||
allErrs = append(allErrs, field.Invalid(runtimeRefPath, newObj.Spec.RuntimeRef, "trainingRuntime should have container with name - dataset-initializer in the initializer job")) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto |
||||||
} | ||||||
} | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
if newObj.Spec.ModelConfig != nil && newObj.Spec.ModelConfig.Output != nil { | ||||||
if !util_v2.JobExists(jobSet.Spec.ReplicatedJobs, constants.JobExporter) { | ||||||
allErrs = append(allErrs, field.Invalid(runtimeRefPath, newObj.Spec.RuntimeRef, "trainingRuntime should have exporter job when trainJob is configured with output modelConfig")) | ||||||
} else { | ||||||
for _, job := range jobSet.Spec.ReplicatedJobs { | ||||||
if job.Name == constants.JobExporter { | ||||||
if !util_v2.ContainerExists(job.Template.Spec.Template.Spec.InitContainers, constants.ContainerModelExporter) { | ||||||
allErrs = append(allErrs, field.Invalid(runtimeRefPath, newObj.Spec.RuntimeRef, "trainingRuntime should have container with name - model-exporter in the exporter job")) | ||||||
} | ||||||
} | ||||||
} | ||||||
} | ||||||
} | ||||||
Comment on lines
+187
to
+199
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's implement it once we support Output. |
||||||
|
||||||
if len(newObj.Spec.PodSpecOverrides) != 0 { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we implement this validation when we support PodSpecOverride ? |
||||||
podSpecOverridesPath := specPath.Child("podSpecOverrides") | ||||||
jobsMap := map[string]bool{} | ||||||
for _, job := range jobSet.Spec.ReplicatedJobs { | ||||||
jobsMap[job.Name] = true | ||||||
} | ||||||
// validate if jobOverrides are valid | ||||||
for idx, override := range newObj.Spec.PodSpecOverrides { | ||||||
for _, job := range override.TargetJobs { | ||||||
if _, found := jobsMap[job.Name]; !found { | ||||||
allErrs = append(allErrs, field.Invalid(podSpecOverridesPath, newObj.Spec.PodSpecOverrides, fmt.Sprintf("job: %s, configured in the podOverride should be present in the referenced training runtime", job))) | ||||||
} | ||||||
} | ||||||
if len(override.Containers) != 0 { | ||||||
// validate if containerOverrides are valid | ||||||
containerMap := map[string]bool{} | ||||||
for _, job := range jobSet.Spec.ReplicatedJobs { | ||||||
for _, container := range job.Template.Spec.Template.Spec.Containers { | ||||||
containerMap[container.Name] = true | ||||||
} | ||||||
} | ||||||
containerOverridePath := podSpecOverridesPath.Index(idx) | ||||||
for _, container := range override.Containers { | ||||||
if _, found := containerMap[container.Name]; !found { | ||||||
allErrs = append(allErrs, field.Invalid(containerOverridePath, override.Containers, fmt.Sprintf("container: %s, configured in the containerOverride should be present in the referenced training runtime", container.Name))) | ||||||
} | ||||||
} | ||||||
} | ||||||
if len(override.InitContainers) != 0 { | ||||||
// validate if initContainerOverrides are valid | ||||||
initContainerMap := map[string]bool{} | ||||||
for _, job := range jobSet.Spec.ReplicatedJobs { | ||||||
for _, initContainer := range job.Template.Spec.Template.Spec.InitContainers { | ||||||
initContainerMap[initContainer.Name] = true | ||||||
} | ||||||
} | ||||||
initContainerOverridePath := podSpecOverridesPath.Index(idx) | ||||||
for _, container := range override.Containers { | ||||||
if _, found := initContainerMap[container.Name]; !found { | ||||||
allErrs = append(allErrs, field.Invalid(initContainerOverridePath, override.InitContainers, fmt.Sprintf("initContainer: %s, configured in the initContainerOverride should be present in the referenced training runtime", container.Name))) | ||||||
} | ||||||
} | ||||||
} | ||||||
} | ||||||
} | ||||||
return nil, allErrs | ||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ package mpi | |
|
||
import ( | ||
"context" | ||
"strconv" | ||
|
||
"k8s.io/apimachinery/pkg/util/validation/field" | ||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
|
@@ -55,7 +56,16 @@ func (m *MPI) EnforceMLPolicy(info *runtime.Info, trainJob *kubeflowv2.TrainJob) | |
return nil | ||
} | ||
|
||
// TODO: Need to implement validations for MPIJob. | ||
func (m *MPI) Validate(oldObj, newObj *kubeflowv2.TrainJob) (admission.Warnings, field.ErrorList) { | ||
return nil, nil | ||
func (m *MPI) Validate(runtimeJobTemplate client.Object, runtimeInfo *runtime.Info, oldJobObj, newJobObj *kubeflowv2.TrainJob) (admission.Warnings, field.ErrorList) { | ||
var allErrs field.ErrorList | ||
specPath := field.NewPath("spec") | ||
if newJobObj.Spec.Trainer != nil { | ||
numProcPerNodePath := specPath.Child("trainer").Child("numProcPerNode") | ||
if runtimeInfo.RuntimePolicy.MLPolicy.MPI != nil { | ||
if _, err := strconv.Atoi(*newJobObj.Spec.Trainer.NumProcPerNode); err != nil { | ||
allErrs = append(allErrs, field.Invalid(numProcPerNodePath, newJobObj.Spec.Trainer.NumProcPerNode, "should have an int value")) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Uhm, based on considering these string and integer conversion everywhere, we want to define the numProcPerNode as a typed IntOrString. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think so, is this value compatible with the k8s API conventions: https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md ? |
||
} | ||
} | ||
} | ||
return nil, allErrs | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please can we implement the validation for exporter in the future once we design it as part of: #2245 ?
We should discuss whether we want to use sidecar container or another ReplicatedJob for model checkpointing.
cc @saileshd1402 @akshaychitneni @tenzen-y