Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[RayCluster][Feature] skip suspending worker groups if the in-tree autoscaler is enabled #2748

Merged
merged 5 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,16 @@ func validateRayClusterSpec(instance *rayv1.RayCluster) error {
// TODO (kevin85421): If GcsFaultToleranceOptions is set, users should use `GcsFaultToleranceOptions.RedisAddress` instead of `RAY_REDIS_ADDRESS`.
// TODO (kevin85421): If GcsFaultToleranceOptions is set, users should use `GcsFaultToleranceOptions.ExternalStorageNamespace` instead of
// the annotation `ray.io/external-storage-namespace`.

enableInTreeAutoscaling := (instance.Spec.EnableInTreeAutoscaling != nil) && (*instance.Spec.EnableInTreeAutoscaling)
if enableInTreeAutoscaling {
for _, workerGroup := range instance.Spec.WorkerGroupSpecs {
if workerGroup.Suspend != nil && *workerGroup.Suspend {
// TODO (rueian): This can be supported in future Ray. We should check the RayVersion once we know the version.
return fmt.Errorf("suspending worker groups is not currently supported with Autoscaler enabled")
}
}
}
return nil
}

Expand Down
41 changes: 41 additions & 0 deletions ray-operator/controllers/ray/raycluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,47 @@ var _ = Context("Inside the default namespace", func() {
})
})

Describe("Suspend RayCluster worker group with Autoscaler enabled", Ordered, func() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A unit test seems sufficient if the validation happens at the very beginning of the reconciliation, so no other logic is involved.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this test case could still be useful when we later add the support to suspend worker groups in an autoscaler-enabled cluster. Do you think we should delete it for now?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense

ctx := context.Background()
namespace := "default"
rayCluster := rayClusterTemplate("raycluster-suspend-workergroup-autoscaler", namespace)
rayCluster.Spec.EnableInTreeAutoscaling = ptr.To(true)
allPods := corev1.PodList{}
allFilters := common.RayClusterAllPodsAssociationOptions(rayCluster).ToListOptions()
workerFilters := common.RayClusterGroupPodsAssociationOptions(rayCluster, rayCluster.Spec.WorkerGroupSpecs[0].GroupName).ToListOptions()
headFilters := common.RayClusterHeadPodsAssociationOptions(rayCluster).ToListOptions()

It("Create a RayCluster custom resource", func() {
err := k8sClient.Create(ctx, rayCluster)
Expect(err).NotTo(HaveOccurred(), "Failed to create RayCluster")
Eventually(getResourceFunc(ctx, client.ObjectKey{Name: rayCluster.Name, Namespace: namespace}, rayCluster),
time.Second*3, time.Millisecond*500).Should(BeNil(), "Should be able to see RayCluster: %v", rayCluster.Name)
})

It("Check the number of Pods", func() {
Eventually(listResourceFunc(ctx, &allPods, allFilters...), time.Second*3, time.Millisecond*500).
Should(Equal(4), fmt.Sprintf("all pods %v", allPods.Items))
})

It("Setting suspend=true in first worker group should not fail", func() {
// suspend the Raycluster worker group
err := updateRayClusterWorkerGroupSuspendField(ctx, rayCluster, true)
Expect(err).NotTo(HaveOccurred(), "Failed to update RayCluster")
})

It("Worker pods should not be deleted and head pod should still be running", func() {
Consistently(listResourceFunc(ctx, &allPods, workerFilters...), time.Second*5, time.Millisecond*500).
Should(Equal(3), fmt.Sprintf("all pods %v", allPods.Items))
Consistently(listResourceFunc(ctx, &allPods, headFilters...), time.Second*5, time.Millisecond*500).
Should(Equal(1), fmt.Sprintf("all pods %v", allPods.Items))
})

It("Delete the cluster", func() {
err := k8sClient.Delete(ctx, rayCluster)
Expect(err).NotTo(HaveOccurred())
})
})

Describe("RayCluster with a multi-host worker group", Ordered, func() {
ctx := context.Background()
namespace := "default"
Expand Down
61 changes: 61 additions & 0 deletions ray-operator/controllers/ray/raycluster_controller_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3729,6 +3729,67 @@ func TestValidateRayClusterSpecEmptyContainers(t *testing.T) {
}
}

// TestValidateRayClusterSpecSuspendingWorkerGroup checks that a suspended
// worker group is accepted when the in-tree autoscaler is disabled and
// rejected with a descriptive error when it is enabled.
func TestValidateRayClusterSpecSuspendingWorkerGroup(t *testing.T) {
	headGroupSpec := rayv1.HeadGroupSpec{
		Template: corev1.PodTemplateSpec{
			Spec: corev1.PodSpec{
				Containers: []corev1.Container{{Name: "ray-head"}},
			},
		},
	}
	workerGroupSpecSuspended := rayv1.WorkerGroupSpec{
		Template: corev1.PodTemplateSpec{
			Spec: corev1.PodSpec{
				Containers: []corev1.Container{{Name: "ray-worker"}},
			},
		},
		Suspend: ptr.To(true),
	}

	tests := []struct {
		rayCluster   *rayv1.RayCluster
		name         string
		errorMessage string
		expectError  bool
	}{
		{
			name: "suspend without autoscaler",
			rayCluster: &rayv1.RayCluster{
				Spec: rayv1.RayClusterSpec{
					HeadGroupSpec:    headGroupSpec,
					WorkerGroupSpecs: []rayv1.WorkerGroupSpec{workerGroupSpecSuspended},
				},
			},
			expectError: false,
		},
		{
			// TODO (rueian): This can be supported in future Ray. We should check the RayVersion once we know the version.
			name: "suspend with autoscaler",
			rayCluster: &rayv1.RayCluster{
				Spec: rayv1.RayClusterSpec{
					HeadGroupSpec:           headGroupSpec,
					WorkerGroupSpecs:        []rayv1.WorkerGroupSpec{workerGroupSpecSuspended},
					EnableInTreeAutoscaling: ptr.To(true),
				},
			},
			expectError:  true,
			errorMessage: "suspending worker groups is not currently supported with Autoscaler enabled",
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			err := validateRayClusterSpec(tt.rayCluster)
			if tt.expectError {
				assert.EqualError(t, err, tt.errorMessage)
			} else {
				// assert.NoError reports the error's message on failure,
				// unlike assert.Nil which only prints the value.
				assert.NoError(t, err)
			}
		})
	}
}

func TestValidateRayClusterStatus(t *testing.T) {
tests := []struct {
name string
Expand Down
Loading