[RayCluster][Feature] skip suspending worker groups if the in-tree autoscaler is enabled to prevent ray cluster from malfunctioning

Signed-off-by: Rueian <[email protected]>
rueian committed Jan 17, 2025
1 parent decfb5a commit cb8a866
Showing 2 changed files with 74 additions and 7 deletions.
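For context: the combination this commit starts rejecting at validation time is a RayCluster that both enables the in-tree autoscaler and suspends a worker group. The following is a minimal Go sketch of that shape using the API types exercised in the diff below; the package name, object name, namespace, and group name are illustrative and not taken from the commit.

package example

import (
	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/ptr"
)

// newSuspendedAutoscalingCluster builds the spec combination that the new
// validateRayClusterSpec check rejects: EnableInTreeAutoscaling set to true
// together with a worker group whose Suspend field is true.
func newSuspendedAutoscalingCluster() *rayv1.RayCluster {
	return &rayv1.RayCluster{
		ObjectMeta: metav1.ObjectMeta{Name: "example", Namespace: "default"},
		Spec: rayv1.RayClusterSpec{
			EnableInTreeAutoscaling: ptr.To(true),
			HeadGroupSpec: rayv1.HeadGroupSpec{
				Template: corev1.PodTemplateSpec{
					Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "ray-head"}}},
				},
			},
			WorkerGroupSpecs: []rayv1.WorkerGroupSpec{{
				GroupName: "suspended-group", // illustrative name
				Suspend:   ptr.To(true),      // this combination now fails validation
				Template: corev1.PodTemplateSpec{
					Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "ray-worker"}}},
				},
			}},
		},
	}
}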
19 changes: 12 additions & 7 deletions ray-operator/controllers/ray/raycluster_controller.go
@@ -261,6 +261,16 @@ func validateRayClusterSpec(instance *rayv1.RayCluster) error {
	// TODO (kevin85421): If GcsFaultToleranceOptions is set, users should use `GcsFaultToleranceOptions.RedisAddress` instead of `RAY_REDIS_ADDRESS`.
	// TODO (kevin85421): If GcsFaultToleranceOptions is set, users should use `GcsFaultToleranceOptions.ExternalStorageNamespace` instead of
	// the annotation `ray.io/external-storage-namespace`.

	enableInTreeAutoscaling := (instance.Spec.EnableInTreeAutoscaling != nil) && (*instance.Spec.EnableInTreeAutoscaling)
	if enableInTreeAutoscaling {
		for _, workerGroup := range instance.Spec.WorkerGroupSpecs {
			if workerGroup.Suspend != nil && *workerGroup.Suspend {
				// TODO(rueian): This can be supported in future Ray. We should check the RayVersion once we know the version.
				return fmt.Errorf("suspending worker groups is not supported with Autoscaler enabled")
			}
		}
	}
	return nil
}

@@ -824,14 +834,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
}

		// Delete all workers if worker group is suspended and skip reconcile if enableInTreeAutoscaling is not enabled.
		enableInTreeAutoscaling := (instance.Spec.EnableInTreeAutoscaling != nil) && (*instance.Spec.EnableInTreeAutoscaling)
		if worker.Suspend != nil && *worker.Suspend {
			if enableInTreeAutoscaling {
				// TODO: This can be supported in future Ray. We should check the RayVersion on the CR once we know the future version.
				r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.InvalidRayClusterSpec),
					"Suspending the worker group %s is not supported in RayCluster %s/%s because its Autoscaler is enabled", worker.GroupName, instance.Namespace, instance.Name)
				continue
			}
			if _, err := r.deleteAllPods(ctx, common.RayClusterGroupPodsAssociationOptions(instance, worker.GroupName)); err != nil {
				r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.FailedToDeleteWorkerPodCollection),
					"Failed deleting worker Pods for suspended group %s in RayCluster %s/%s, %v", worker.GroupName, instance.Namespace, instance.Name, err)
@@ -927,6 +930,8 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
			// diff < 0 indicates the need to delete some Pods to match the desired number of replicas. However,
			// randomly deleting Pods is certainly not ideal. So, if autoscaling is enabled for the cluster, we
			// will disable random Pod deletion, making Autoscaler the sole decision-maker for Pod deletions.
			enableInTreeAutoscaling := (instance.Spec.EnableInTreeAutoscaling != nil) && (*instance.Spec.EnableInTreeAutoscaling)

			// TODO (kevin85421): `enableRandomPodDelete` is a feature flag for KubeRay v0.6.0. If users want to use
			// the old behavior, they can set the environment variable `ENABLE_RANDOM_POD_DELETE` to `true`. When the
			// default behavior is stable enough, we can remove this feature flag.
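For readers unfamiliar with the `ENABLE_RANDOM_POD_DELETE` flag referenced above, the gate it describes can be summarized by the following self-contained Go sketch; this is an assumed simplification based on the comment, not the file's exact code.

package example

import (
	"os"
	"strings"
)

// shouldRandomlyDeletePods sketches the feature-flag gate described above:
// when the in-tree autoscaler is enabled, random worker Pod deletion is only
// used if ENABLE_RANDOM_POD_DELETE is explicitly set to "true"; otherwise the
// Autoscaler remains the sole decision-maker for Pod deletions. Without the
// autoscaler, the operator keeps the older random-deletion behavior.
func shouldRandomlyDeletePods(enableInTreeAutoscaling bool) bool {
	if !enableInTreeAutoscaling {
		return true
	}
	return strings.EqualFold(os.Getenv("ENABLE_RANDOM_POD_DELETE"), "true")
}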
62 changes: 62 additions & 0 deletions ray-operator/controllers/ray/raycluster_controller_unit_test.go
@@ -3729,6 +3729,68 @@ func TestValidateRayClusterSpecEmptyContainers(t *testing.T) {
}
}

func TestValidateRayClusterSpecSuspendingWorkerGroup(t *testing.T) {
	headGroupSpec := rayv1.HeadGroupSpec{
		Template: corev1.PodTemplateSpec{
			Spec: corev1.PodSpec{
				Containers: []corev1.Container{{Name: "ray-head"}},
			},
		},
	}
	workerGroupSpec := rayv1.WorkerGroupSpec{
		Template: corev1.PodTemplateSpec{
			Spec: corev1.PodSpec{
				Containers: []corev1.Container{{Name: "ray-worker"}},
			},
		},
	}
	workerGroupSpecSuspended := *workerGroupSpec.DeepCopy()
	workerGroupSpecSuspended.Suspend = ptr.To[bool](true)

	tests := []struct {
		rayCluster   *rayv1.RayCluster
		name         string
		errorMessage string
		expectError  bool
	}{
		{
			name: "suspend without autoscaler",
			rayCluster: &rayv1.RayCluster{
				Spec: rayv1.RayClusterSpec{
					HeadGroupSpec:    headGroupSpec,
					WorkerGroupSpecs: []rayv1.WorkerGroupSpec{workerGroupSpecSuspended},
				},
			},
			expectError: false,
		},
		{
			// TODO(rueian): This can be supported in future Ray. We should check the RayVersion once we know the version.
			name: "suspend with autoscaler",
			rayCluster: &rayv1.RayCluster{
				Spec: rayv1.RayClusterSpec{
					HeadGroupSpec:           headGroupSpec,
					WorkerGroupSpecs:        []rayv1.WorkerGroupSpec{workerGroupSpecSuspended},
					EnableInTreeAutoscaling: ptr.To[bool](true),
				},
			},
			expectError:  true,
			errorMessage: "suspending worker groups is not supported with Autoscaler enabled",
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			err := validateRayClusterSpec(tt.rayCluster)
			if tt.expectError {
				assert.Error(t, err)
				assert.EqualError(t, err, tt.errorMessage)
			} else {
				assert.Nil(t, err)
			}
		})
	}
}

func TestValidateRayClusterStatus(t *testing.T) {
	tests := []struct {
		name string
