[RayCluster][Feature] skip suspending worker groups if the in-tree autoscaler is enabled (ray-project#2748)
rueian authored and win5923 committed Jan 20, 2025
1 parent 26b7dbe commit 7430be3
Showing 3 changed files with 112 additions and 0 deletions.
10 changes: 10 additions & 0 deletions ray-operator/controllers/ray/raycluster_controller.go
@@ -261,6 +261,16 @@ func validateRayClusterSpec(instance *rayv1.RayCluster) error {
	// TODO (kevin85421): If GcsFaultToleranceOptions is set, users should use `GcsFaultToleranceOptions.RedisAddress` instead of `RAY_REDIS_ADDRESS`.
	// TODO (kevin85421): If GcsFaultToleranceOptions is set, users should use `GcsFaultToleranceOptions.ExternalStorageNamespace` instead of
	// the annotation `ray.io/external-storage-namespace`.

	enableInTreeAutoscaling := (instance.Spec.EnableInTreeAutoscaling != nil) && (*instance.Spec.EnableInTreeAutoscaling)
	if enableInTreeAutoscaling {
		for _, workerGroup := range instance.Spec.WorkerGroupSpecs {
			if workerGroup.Suspend != nil && *workerGroup.Suspend {
				// TODO (rueian): This can be supported in future Ray. We should check the RayVersion once we know the version.
				return fmt.Errorf("suspending worker groups is not currently supported with Autoscaler enabled")
			}
		}
	}
	return nil
}
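Validation like this runs before the controller acts on the RayCluster, so an invalid spec blocks reconciliation rather than leaving the cluster half-suspended. A minimal sketch of that wiring, using a hypothetical receiver and event reason (the actual KubeRay reconciler may differ):

func (r *RayClusterReconciler) reconcileRayCluster(ctx context.Context, instance *rayv1.RayCluster) error {
	if err := validateRayClusterSpec(instance); err != nil {
		// Surface the problem as a warning event and stop; nothing is
		// created or deleted until the user fixes the spec.
		r.Recorder.Event(instance, corev1.EventTypeWarning, "InvalidSpec", err.Error())
		return err
	}
	// ... continue reconciling head and worker Pods ...
	return nil
}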

41 changes: 41 additions & 0 deletions ray-operator/controllers/ray/raycluster_controller_test.go
@@ -861,6 +861,47 @@ var _ = Context("Inside the default namespace", func() {
		})
	})

Describe("Suspend RayCluster worker group with Autoscaler enabled", Ordered, func() {
ctx := context.Background()
namespace := "default"
rayCluster := rayClusterTemplate("raycluster-suspend-workergroup-autoscaler", namespace)
rayCluster.Spec.EnableInTreeAutoscaling = ptr.To(true)
allPods := corev1.PodList{}
allFilters := common.RayClusterAllPodsAssociationOptions(rayCluster).ToListOptions()
workerFilters := common.RayClusterGroupPodsAssociationOptions(rayCluster, rayCluster.Spec.WorkerGroupSpecs[0].GroupName).ToListOptions()
headFilters := common.RayClusterHeadPodsAssociationOptions(rayCluster).ToListOptions()

It("Create a RayCluster custom resource", func() {
err := k8sClient.Create(ctx, rayCluster)
Expect(err).NotTo(HaveOccurred(), "Failed to create RayCluster")
Eventually(getResourceFunc(ctx, client.ObjectKey{Name: rayCluster.Name, Namespace: namespace}, rayCluster),
time.Second*3, time.Millisecond*500).Should(BeNil(), "Should be able to see RayCluster: %v", rayCluster.Name)
})

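		// Expect 4 Pods in total: 1 head plus 3 workers (matching the
		// per-group checks below; rayClusterTemplate presumably defaults
		// to 3 worker replicas).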
It("Check the number of Pods", func() {
Eventually(listResourceFunc(ctx, &allPods, allFilters...), time.Second*3, time.Millisecond*500).
Should(Equal(4), fmt.Sprintf("all pods %v", allPods.Items))
})

It("Setting suspend=true in first worker group should not fail", func() {
// suspend the Raycluster worker group
err := updateRayClusterWorkerGroupSuspendField(ctx, rayCluster, true)
Expect(err).NotTo(HaveOccurred(), "Failed to update RayCluster")
})

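		// validateRayClusterSpec now rejects this spec, so the reconciler
		// never acts on the Suspend flag and the existing Pods stay up.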
It("Worker pods should not be deleted and head pod should still be running", func() {
Consistently(listResourceFunc(ctx, &allPods, workerFilters...), time.Second*5, time.Millisecond*500).
Should(Equal(3), fmt.Sprintf("all pods %v", allPods.Items))
Consistently(listResourceFunc(ctx, &allPods, headFilters...), time.Second*5, time.Millisecond*500).
Should(Equal(1), fmt.Sprintf("all pods %v", allPods.Items))
})

It("Delete the cluster", func() {
err := k8sClient.Delete(ctx, rayCluster)
Expect(err).NotTo(HaveOccurred())
})
})
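The spec above relies on updateRayClusterWorkerGroupSuspendField, a test helper that is not part of this diff. A minimal sketch of what such a helper might look like, assuming it flips Suspend on the first worker group and retries on conflicts via k8s.io/client-go/util/retry:

func updateRayClusterWorkerGroupSuspendField(ctx context.Context, rayCluster *rayv1.RayCluster, suspend bool) error {
	// Hypothetical sketch; the real helper lives elsewhere in the test suite.
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		// Re-fetch first so the update is not based on a stale object.
		if err := k8sClient.Get(ctx, client.ObjectKeyFromObject(rayCluster), rayCluster); err != nil {
			return err
		}
		rayCluster.Spec.WorkerGroupSpecs[0].Suspend = ptr.To(suspend)
		return k8sClient.Update(ctx, rayCluster)
	})
}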

Describe("RayCluster with a multi-host worker group", Ordered, func() {
ctx := context.Background()
namespace := "default"
61 changes: 61 additions & 0 deletions ray-operator/controllers/ray/raycluster_controller_unit_test.go
@@ -3729,6 +3729,67 @@ func TestValidateRayClusterSpecEmptyContainers(t *testing.T) {
	}
}

func TestValidateRayClusterSpecSuspendingWorkerGroup(t *testing.T) {
	headGroupSpec := rayv1.HeadGroupSpec{
		Template: corev1.PodTemplateSpec{
			Spec: corev1.PodSpec{
				Containers: []corev1.Container{{Name: "ray-head"}},
			},
		},
	}
	workerGroupSpecSuspended := rayv1.WorkerGroupSpec{
		Template: corev1.PodTemplateSpec{
			Spec: corev1.PodSpec{
				Containers: []corev1.Container{{Name: "ray-worker"}},
			},
		},
	}
	workerGroupSpecSuspended.Suspend = ptr.To[bool](true)

	tests := []struct {
		rayCluster   *rayv1.RayCluster
		name         string
		errorMessage string
		expectError  bool
	}{
		{
			name: "suspend without autoscaler",
			rayCluster: &rayv1.RayCluster{
				Spec: rayv1.RayClusterSpec{
					HeadGroupSpec:    headGroupSpec,
					WorkerGroupSpecs: []rayv1.WorkerGroupSpec{workerGroupSpecSuspended},
				},
			},
			expectError: false,
		},
		{
			// TODO (rueian): This can be supported in future Ray. We should check the RayVersion once we know the version.
			name: "suspend with autoscaler",
			rayCluster: &rayv1.RayCluster{
				Spec: rayv1.RayClusterSpec{
					HeadGroupSpec:           headGroupSpec,
					WorkerGroupSpecs:        []rayv1.WorkerGroupSpec{workerGroupSpecSuspended},
					EnableInTreeAutoscaling: ptr.To[bool](true),
				},
			},
			expectError:  true,
			errorMessage: "suspending worker groups is not currently supported with Autoscaler enabled",
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			err := validateRayClusterSpec(tt.rayCluster)
			if tt.expectError {
				assert.Error(t, err)
				assert.EqualError(t, err, tt.errorMessage)
			} else {
				assert.Nil(t, err)
			}
		})
	}
}

func TestValidateRayClusterStatus(t *testing.T) {
	tests := []struct {
		name string