
Commit

[RayCluster][Feature] skip suspending worker groups if the in-tree autoscaler is enabled, to prevent the RayCluster from malfunctioning

Signed-off-by: Rueian <[email protected]>
rueian committed Jan 14, 2025
1 parent 42f299a commit a68ca2e
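For context, this change targets RayClusters that both enable the in-tree autoscaler and mark a worker group as suspended. Below is a minimal sketch of that spec combination using the rayv1 API types referenced in the diff; the object name, group name, and replica count are illustrative and not taken from this commit.

package example

import (
	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/ptr"
)

// suspendedGroupWithAutoscaler builds the kind of RayCluster this commit guards:
// the in-tree autoscaler is enabled while a worker group is marked as suspended.
// After this change the controller leaves the group's Pods alone and emits a
// warning event instead of deleting them.
func suspendedGroupWithAutoscaler() *rayv1.RayCluster {
	return &rayv1.RayCluster{
		ObjectMeta: metav1.ObjectMeta{Name: "raycluster-example", Namespace: "default"},
		Spec: rayv1.RayClusterSpec{
			EnableInTreeAutoscaling: ptr.To(true),
			WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
				{
					GroupName: "small-group",
					Replicas:  ptr.To(int32(3)),
					// Suspend is ignored while the autoscaler is enabled.
					Suspend: ptr.To(true),
				},
			},
		},
	}
}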
Showing 2 changed files with 57 additions and 10 deletions.
26 changes: 16 additions & 10 deletions ray-operator/controllers/ray/raycluster_controller.go
@@ -772,16 +772,24 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
 			return err
 		}
 
-		// Delete all workers if worker group is suspended and skip reconcile
+		// Delete all workers if worker group is suspended and skip reconcile if enableInTreeAutoscaling is not enabled.
+		enableInTreeAutoscaling := (instance.Spec.EnableInTreeAutoscaling != nil) && (*instance.Spec.EnableInTreeAutoscaling)
 		if worker.Suspend != nil && *worker.Suspend {
-			if _, err := r.deleteAllPods(ctx, common.RayClusterGroupPodsAssociationOptions(instance, worker.GroupName)); err != nil {
-				r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.FailedToDeleteWorkerPodCollection),
-					"Failed deleting worker Pods for suspended group %s in RayCluster %s/%s, %v", worker.GroupName, instance.Namespace, instance.Name, err)
-				return errstd.Join(utils.ErrFailedDeleteWorkerPod, err)
+			if enableInTreeAutoscaling {
+				// TODO: This can be supported in future Ray. We should check the RayVersion on the CR once we know the future version.
+				r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.InvalidRayClusterStatus),
+					"Suspending the worker group %s is not supported in RayCluster %s/%s because its Autoscaler is enabled", worker.GroupName, instance.Namespace, instance.Name)
+				continue
+			} else {
+				if _, err := r.deleteAllPods(ctx, common.RayClusterGroupPodsAssociationOptions(instance, worker.GroupName)); err != nil {
+					r.Recorder.Eventf(instance, corev1.EventTypeWarning, string(utils.FailedToDeleteWorkerPodCollection),
+						"Failed deleting worker Pods for suspended group %s in RayCluster %s/%s, %v", worker.GroupName, instance.Namespace, instance.Name, err)
+					return errstd.Join(utils.ErrFailedDeleteWorkerPod, err)
+				}
+				r.Recorder.Eventf(instance, corev1.EventTypeNormal, string(utils.DeletedWorkerPod),
+					"Deleted all pods for suspended worker group %s in RayCluster %s/%s", worker.GroupName, instance.Namespace, instance.Name)
+				continue
 			}
-			r.Recorder.Eventf(instance, corev1.EventTypeNormal, string(utils.DeletedWorkerPod),
-				"Deleted all pods for suspended worker group %s in RayCluster %s/%s", worker.GroupName, instance.Namespace, instance.Name)
-			continue
 		}
 
 		// Delete unhealthy worker Pods.
@@ -869,8 +877,6 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
 		// diff < 0 indicates the need to delete some Pods to match the desired number of replicas. However,
 		// randomly deleting Pods is certainly not ideal. So, if autoscaling is enabled for the cluster, we
 		// will disable random Pod deletion, making Autoscaler the sole decision-maker for Pod deletions.
-		enableInTreeAutoscaling := (instance.Spec.EnableInTreeAutoscaling != nil) && (*instance.Spec.EnableInTreeAutoscaling)
-
 		// TODO (kevin85421): `enableRandomPodDelete` is a feature flag for KubeRay v0.6.0. If users want to use
 		// the old behavior, they can set the environment variable `ENABLE_RANDOM_POD_DELETE` to `true`. When the
 		// default behavior is stable enough, we can remove this feature flag.
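The relocated enableInTreeAutoscaling declaration above now feeds both the new suspend guard and the existing random-Pod-deletion logic. As a minimal sketch, separate from the controller code, the decision the new branch encodes looks like the helper below; the function name and return values are illustrative, not part of the commit.

package example

// suspendAction mirrors the control flow added in reconcilePods: worker Pods of a
// suspended group are deleted only when the in-tree autoscaler is disabled;
// otherwise the group is skipped and a warning event is recorded.
func suspendAction(suspend, enableInTreeAutoscaling *bool) (deletePods, warnUnsupported bool) {
	suspended := suspend != nil && *suspend
	autoscaling := enableInTreeAutoscaling != nil && *enableInTreeAutoscaling
	switch {
	case !suspended:
		return false, false // nothing to do; normal reconciliation continues
	case autoscaling:
		return false, true // skip deletion and record the "not supported" warning
	default:
		return true, false // safe to delete all Pods of the suspended group
	}
}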
41 changes: 41 additions & 0 deletions ray-operator/controllers/ray/raycluster_controller_test.go
@@ -861,6 +861,47 @@ var _ = Context("Inside the default namespace", func() {
 		})
 	})
 
+	Describe("Suspend RayCluster worker group with Autoscaler enabled", Ordered, func() {
+		ctx := context.Background()
+		namespace := "default"
+		rayCluster := rayClusterTemplate("raycluster-suspend-workergroup-autoscaler", namespace)
+		rayCluster.Spec.EnableInTreeAutoscaling = ptr.To(true)
+		allPods := corev1.PodList{}
+		allFilters := common.RayClusterAllPodsAssociationOptions(rayCluster).ToListOptions()
+		workerFilters := common.RayClusterGroupPodsAssociationOptions(rayCluster, rayCluster.Spec.WorkerGroupSpecs[0].GroupName).ToListOptions()
+		headFilters := common.RayClusterHeadPodsAssociationOptions(rayCluster).ToListOptions()
+
+		It("Create a RayCluster custom resource", func() {
+			err := k8sClient.Create(ctx, rayCluster)
+			Expect(err).NotTo(HaveOccurred(), "Failed to create RayCluster")
+			Eventually(getResourceFunc(ctx, client.ObjectKey{Name: rayCluster.Name, Namespace: namespace}, rayCluster),
+				time.Second*3, time.Millisecond*500).Should(BeNil(), "Should be able to see RayCluster: %v", rayCluster.Name)
+		})
+
+		It("Check the number of Pods and add finalizers", func() {
+			Eventually(listResourceFunc(ctx, &allPods, allFilters...), time.Second*3, time.Millisecond*500).
+				Should(Equal(4), fmt.Sprintf("all pods %v", allPods.Items))
+		})
+
+		It("Setting suspend=true in first worker group should not fail", func() {
+			// suspend the Raycluster worker group
+			err := updateRayClusterWorkerGroupSuspendField(ctx, rayCluster, true)
+			Expect(err).NotTo(HaveOccurred(), "Failed to update RayCluster")
+		})
+
+		It("Worker pods should be not deleted and head pod should still be running", func() {
+			Consistently(listResourceFunc(ctx, &allPods, workerFilters...), time.Second*5, time.Millisecond*500).
+				Should(Equal(3), fmt.Sprintf("all pods %v", allPods.Items))
+			Consistently(listResourceFunc(ctx, &allPods, headFilters...), time.Second*5, time.Millisecond*500).
+				Should(Equal(1), fmt.Sprintf("all pods %v", allPods.Items))
+		})
+
+		It("Delete the cluster", func() {
+			err := k8sClient.Delete(ctx, rayCluster)
+			Expect(err).NotTo(HaveOccurred())
+		})
+	})
+
 	Describe("RayCluster with a multi-host worker group", Ordered, func() {
 		ctx := context.Background()
 		namespace := "default"
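The test above drives the suspend flag through updateRayClusterWorkerGroupSuspendField, a helper defined elsewhere in the test suite and not shown in this diff. A sketch of what such a helper presumably does, assuming the usual controller-runtime re-fetch-then-update pattern, follows; the actual implementation in the repository may differ.

package example

import (
	"context"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
	"k8s.io/utils/ptr"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// setFirstWorkerGroupSuspend re-fetches the latest RayCluster to avoid update
// conflicts, flips Suspend on the first worker group, and persists the change.
func setFirstWorkerGroupSuspend(ctx context.Context, c client.Client, rayCluster *rayv1.RayCluster, suspend bool) error {
	latest := &rayv1.RayCluster{}
	if err := c.Get(ctx, client.ObjectKeyFromObject(rayCluster), latest); err != nil {
		return err
	}
	latest.Spec.WorkerGroupSpecs[0].Suspend = ptr.To(suspend)
	return c.Update(ctx, latest)
}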
