
Commit 42f299a

[RayCluster][Fix] DesiredReplicas, MinReplicas and MaxReplicas should respect workerGroupSpec.Suspend (#2728)
1 parent: b15f5af
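In short: a worker group whose workerGroupSpec.Suspend field is set to true no longer contributes to DesiredWorkerReplicas, MinWorkerReplicas, MaxWorkerReplicas, or the desired CPU/memory totals in the RayCluster status. A minimal sketch of the expected behavior after this fix, not part of the commit (import paths assumed from the repo layout; GetWorkerGroupDesiredReplicas is the helper patched below):

    package main

    import (
        "context"
        "fmt"

        "k8s.io/utils/ptr"

        rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
        "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
    )

    func main() {
        // A suspended worker group that still declares large replica bounds.
        suspended := rayv1.WorkerGroupSpec{
            GroupName:   "suspended-group",
            Replicas:    ptr.To[int32](100),
            MinReplicas: ptr.To[int32](100),
            MaxReplicas: ptr.To[int32](100),
            Suspend:     ptr.To[bool](true),
        }
        // With this fix the suspended group is skipped entirely, so this prints 0.
        fmt.Println(utils.GetWorkerGroupDesiredReplicas(context.Background(), suspended))
    }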

3 files changed: +130 -0 lines changed

ray-operator/controllers/ray/raycluster_controller_unit_test.go (+67)
@@ -1864,6 +1864,73 @@ func TestCalculateStatusWithoutDesiredReplicas(t *testing.T) {
 	assert.Nil(t, newInstance.Status.StateTransitionTimes)
 }
 
+// TestCalculateStatusWithSuspendedWorkerGroups tests that the cluster CR should be marked as Ready without workers
+// and all desired resources are not counted with suspended workers
+func TestCalculateStatusWithSuspendedWorkerGroups(t *testing.T) {
+	setupTest(t)
+
+	testRayCluster.Spec.WorkerGroupSpecs[0].Suspend = ptr.To[bool](true)
+	testRayCluster.Spec.WorkerGroupSpecs[0].MinReplicas = ptr.To[int32](100)
+	testRayCluster.Spec.WorkerGroupSpecs[0].MaxReplicas = ptr.To[int32](100)
+	testRayCluster.Spec.WorkerGroupSpecs[0].Template.Spec.Containers[0].Resources.Requests = corev1.ResourceList{
+		corev1.ResourceCPU:    resource.MustParse("100m"),
+		corev1.ResourceMemory: resource.MustParse("100Mi"),
+	}
+
+	// Create a new scheme with CRDs, Pod, Service schemes.
+	newScheme := runtime.NewScheme()
+	_ = rayv1.AddToScheme(newScheme)
+	_ = corev1.AddToScheme(newScheme)
+
+	// Mock data
+	headServiceIP := "aaa.bbb.ccc.ddd"
+	headService, err := common.BuildServiceForHeadPod(context.Background(), *testRayCluster, nil, nil)
+	assert.Nil(t, err, "Failed to build head service.")
+	headService.Spec.ClusterIP = headServiceIP
+	headPod := &corev1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "headNode",
+			Namespace: namespaceStr,
+			Labels: map[string]string{
+				utils.RayClusterLabelKey:  instanceName,
+				utils.RayNodeTypeLabelKey: string(rayv1.HeadNode),
+			},
+		},
+		Status: corev1.PodStatus{
+			PodIP: headNodeIP,
+			Phase: corev1.PodRunning,
+			Conditions: []corev1.PodCondition{
+				{
+					Type:   corev1.PodReady,
+					Status: corev1.ConditionTrue,
+				},
+			},
+		},
+	}
+	runtimeObjects := []runtime.Object{headPod, headService}
+
+	// Initialize a fake client with newScheme and runtimeObjects.
+	fakeClient := clientFake.NewClientBuilder().WithScheme(newScheme).WithRuntimeObjects(runtimeObjects...).Build()
+	ctx := context.Background()
+
+	// Initialize a RayCluster reconciler.
+	r := &RayClusterReconciler{
+		Client:   fakeClient,
+		Recorder: &record.FakeRecorder{},
+		Scheme:   scheme.Scheme,
+	}
+
+	newInstance, err := r.calculateStatus(ctx, testRayCluster, nil)
+	assert.Nil(t, err)
+	assert.Equal(t, newInstance.Status.DesiredWorkerReplicas, int32(0))
+	assert.Equal(t, newInstance.Status.MinWorkerReplicas, int32(0))
+	assert.Equal(t, newInstance.Status.MaxWorkerReplicas, int32(0))
+	assert.Equal(t, newInstance.Status.DesiredCPU, resource.Quantity{})
+	assert.Equal(t, newInstance.Status.DesiredMemory, resource.Quantity{})
+	assert.Equal(t, newInstance.Status.State, rayv1.Ready) //nolint:staticcheck // https://github.com/ray-project/kuberay/pull/2288
+	assert.NotNil(t, newInstance.Status.StateTransitionTimes)
+}
+
 // TestCalculateStatusWithReconcileErrorBackAndForth tests that the cluster CR should not be marked as Ready if reconcileErr != nil
 // and the Ready state should not be removed after being Ready even if reconcileErr != nil
 func TestCalculateStatusWithReconcileErrorBackAndForth(t *testing.T) {

ray-operator/controllers/ray/utils/util.go (+12)
@@ -322,6 +322,9 @@ func GetWorkerGroupDesiredReplicas(ctx context.Context, workerGroupSpec rayv1.Wo
 	log := ctrl.LoggerFrom(ctx)
 	// Always adhere to min/max replicas constraints.
 	var workerReplicas int32
+	if workerGroupSpec.Suspend != nil && *workerGroupSpec.Suspend {
+		return 0
+	}
 	if *workerGroupSpec.MinReplicas > *workerGroupSpec.MaxReplicas {
 		log.Info("minReplicas is greater than maxReplicas, using maxReplicas as desired replicas. "+
 			"Please fix this to avoid any unexpected behaviors.", "minReplicas", *workerGroupSpec.MinReplicas, "maxReplicas", *workerGroupSpec.MaxReplicas)

@@ -352,6 +355,9 @@ func CalculateDesiredReplicas(ctx context.Context, cluster *rayv1.RayCluster) in
 func CalculateMinReplicas(cluster *rayv1.RayCluster) int32 {
 	count := int32(0)
 	for _, nodeGroup := range cluster.Spec.WorkerGroupSpecs {
+		if nodeGroup.Suspend != nil && *nodeGroup.Suspend {
+			continue
+		}
 		count += *nodeGroup.MinReplicas
 	}

@@ -362,6 +368,9 @@ func CalculateMinReplicas(cluster *rayv1.RayCluster) int32 {
 func CalculateMaxReplicas(cluster *rayv1.RayCluster) int32 {
 	count := int32(0)
 	for _, nodeGroup := range cluster.Spec.WorkerGroupSpecs {
+		if nodeGroup.Suspend != nil && *nodeGroup.Suspend {
+			continue
+		}
 		count += *nodeGroup.MaxReplicas
 	}

@@ -405,6 +414,9 @@ func CalculateDesiredResources(cluster *rayv1.RayCluster) corev1.ResourceList {
 	headPodResource := CalculatePodResource(cluster.Spec.HeadGroupSpec.Template.Spec)
 	desiredResourcesList = append(desiredResourcesList, headPodResource)
 	for _, nodeGroup := range cluster.Spec.WorkerGroupSpecs {
+		if nodeGroup.Suspend != nil && *nodeGroup.Suspend {
+			continue
+		}
 		podResource := CalculatePodResource(nodeGroup.Template.Spec)
 		for i := int32(0); i < *nodeGroup.Replicas; i++ {
 			desiredResourcesList = append(desiredResourcesList, podResource)
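The same nil-guarded check (Suspend != nil && *Suspend) now appears in four places in util.go; a hypothetical helper, not part of this commit, could express it once inside the same utils package (where rayv1 is already imported):

    // Hypothetical helper, not in this commit: reports whether a worker group is suspended.
    // The nil check matters because Suspend is an optional *bool on WorkerGroupSpec.
    func isWorkerGroupSuspended(spec rayv1.WorkerGroupSpec) bool {
        return spec.Suspend != nil && *spec.Suspend
    }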

ray-operator/controllers/ray/utils/util_test.go (+51)
@@ -513,6 +513,57 @@ func TestGetWorkerGroupDesiredReplicas(t *testing.T) {
 	workerGroupSpec.MinReplicas = &maxReplicas
 	workerGroupSpec.MaxReplicas = &minReplicas
 	assert.Equal(t, GetWorkerGroupDesiredReplicas(ctx, workerGroupSpec), *workerGroupSpec.MaxReplicas)
+
+	// Test 6: `WorkerGroupSpec.Suspend` is true.
+	suspend := true
+	workerGroupSpec.MinReplicas = &maxReplicas
+	workerGroupSpec.MaxReplicas = &minReplicas
+	workerGroupSpec.Suspend = &suspend
+	assert.Equal(t, GetWorkerGroupDesiredReplicas(ctx, workerGroupSpec), int32(0))
+}
+
+func TestCalculateMinReplicas(t *testing.T) {
+	// Test 1
+	minReplicas := int32(1)
+	rayCluster := &rayv1.RayCluster{
+		Spec: rayv1.RayClusterSpec{
+			WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
+				{
+					MinReplicas: &minReplicas,
+				},
+			},
+		},
+	}
+	assert.Equal(t, CalculateMinReplicas(rayCluster), minReplicas)
+
+	// Test 2
+	suspend := true
+	for i := range rayCluster.Spec.WorkerGroupSpecs {
+		rayCluster.Spec.WorkerGroupSpecs[i].Suspend = &suspend
+	}
+	assert.Equal(t, CalculateMinReplicas(rayCluster), int32(0))
+}
+
+func TestCalculateMaxReplicas(t *testing.T) {
+	// Test 1
+	maxReplicas := int32(1)
+	rayCluster := &rayv1.RayCluster{
+		Spec: rayv1.RayClusterSpec{
+			WorkerGroupSpecs: []rayv1.WorkerGroupSpec{
+				{
+					MaxReplicas: &maxReplicas,
+				},
+			},
+		},
+	}
+	assert.Equal(t, CalculateMaxReplicas(rayCluster), maxReplicas)
+
+	// Test 2
+	suspend := true
+	for i := range rayCluster.Spec.WorkerGroupSpecs {
+		rayCluster.Spec.WorkerGroupSpecs[i].Suspend = &suspend
+	}
+	assert.Equal(t, CalculateMaxReplicas(rayCluster), int32(0))
 }
 
 func TestCalculateDesiredReplicas(t *testing.T) {
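To run just the tests touched by this commit locally, something like the following should work from the ray-operator module root (test names are taken from the diffs above; the flags are standard go test options):

    go test ./controllers/ray/... -run 'TestCalculateStatusWithSuspendedWorkerGroups|TestGetWorkerGroupDesiredReplicas|TestCalculateMinReplicas|TestCalculateMaxReplicas'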
