Skip to content

Commit fec9b86

Browse files
committed
[RayCluster][Feature] Set up GCS FT annotations and the RAY_REDIS_ADDRESS env var from GcsFaultToleranceOptions
Signed-off-by: Rueian <[email protected]>
1 parent 7a768f9 commit fec9b86

File tree

7 files changed

+174
-21
lines changed

7 files changed

+174
-21
lines changed

ray-operator/controllers/ray/common/pod.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func GetHeadPort(headStartParams map[string]string) string {
6363
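// IsGCSFaultToleranceEnabled reports whether the RayCluster has GCS fault tolerance enabled: either the utils.RayFTEnabledAnnotationKey annotation is "true" (case-insensitive) or Spec.GcsFaultToleranceOptions is set.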
6464
func IsGCSFaultToleranceEnabled(instance rayv1.RayCluster) bool {
6565
v, ok := instance.Annotations[utils.RayFTEnabledAnnotationKey]
66-
return ok && strings.ToLower(v) == "true"
66+
return (ok && strings.ToLower(v) == "true") || instance.Spec.GcsFaultToleranceOptions != nil
6767
}
6868

6969
// Check if overwrites the container command.
@@ -96,6 +96,9 @@ func initTemplateAnnotations(instance rayv1.RayCluster, podTemplate *corev1.PodT
9696
podTemplate.Annotations[utils.RayExternalStorageNSAnnotationKey] = v
9797
}
9898
}
99+
if options := instance.Spec.GcsFaultToleranceOptions; options != nil && options.ExternalStorageNamespace != "" {
100+
podTemplate.Annotations[utils.RayExternalStorageNSAnnotationKey] = options.ExternalStorageNamespace
101+
}
99102
}
100103

101104
// DefaultHeadPodTemplate sets the config values
@@ -322,7 +325,7 @@ func initLivenessAndReadinessProbe(rayContainer *corev1.Container, rayNodeType r
322325
}
323326

324327
// BuildPod builds a pod config.
325-
func BuildPod(ctx context.Context, podTemplateSpec corev1.PodTemplateSpec, rayNodeType rayv1.RayNodeType, rayStartParams map[string]string, headPort string, enableRayAutoscaler *bool, creatorCRDType utils.CRDType, fqdnRayIP string) (aPod corev1.Pod) {
328+
func BuildPod(ctx context.Context, podTemplateSpec corev1.PodTemplateSpec, rayNodeType rayv1.RayNodeType, gcsOptions *rayv1.GcsFaultToleranceOptions, rayStartParams map[string]string, headPort string, enableRayAutoscaler *bool, creatorCRDType utils.CRDType, fqdnRayIP string) (aPod corev1.Pod) {
326329
log := ctrl.LoggerFrom(ctx)
327330

328331
// For Worker Pod: Traffic readiness is determined by the readiness probe.
@@ -396,7 +399,7 @@ func BuildPod(ctx context.Context, podTemplateSpec corev1.PodTemplateSpec, rayNo
396399
for index := range pod.Spec.InitContainers {
397400
setInitContainerEnvVars(&pod.Spec.InitContainers[index], fqdnRayIP)
398401
}
399-
setContainerEnvVars(&pod, rayNodeType, rayStartParams, fqdnRayIP, headPort, rayStartCmd, creatorCRDType)
402+
setContainerEnvVars(&pod, rayNodeType, gcsOptions, rayStartParams, fqdnRayIP, headPort, rayStartCmd, creatorCRDType)
400403

401404
// Inject probes into the Ray containers if the user has not explicitly disabled them.
402405
// The feature flag `ENABLE_PROBES_INJECTION` will be removed if this feature is stable enough.
@@ -559,7 +562,7 @@ func setInitContainerEnvVars(container *corev1.Container, fqdnRayIP string) {
559562
)
560563
}
561564

562-
func setContainerEnvVars(pod *corev1.Pod, rayNodeType rayv1.RayNodeType, rayStartParams map[string]string, fqdnRayIP string, headPort string, rayStartCmd string, creatorCRDType utils.CRDType) {
565+
func setContainerEnvVars(pod *corev1.Pod, rayNodeType rayv1.RayNodeType, gcsOptions *rayv1.GcsFaultToleranceOptions, rayStartParams map[string]string, fqdnRayIP string, headPort string, rayStartCmd string, creatorCRDType utils.CRDType) {
563566
// TODO: Audit all environment variables to identify which should not be modified by users.
564567
container := &pod.Spec.Containers[utils.RayContainerIndex]
565568
if len(container.Env) == 0 {
@@ -677,11 +680,19 @@ func setContainerEnvVars(pod *corev1.Pod, rayNodeType rayv1.RayNodeType, rayStar
677680
// 600s, the Raylet will exit the process. By default, the value is 60s, so the head node will
678681
// crash if the GCS server is down for more than 60s. Typically, the new GCS server will be available
679682
// in 120 seconds, so we set the timeout to 600s to avoid the worker nodes crashing.
680-
if ftEnabled := pod.Annotations[utils.RayFTEnabledAnnotationKey] == "true"; ftEnabled {
683+
if ftEnabled := pod.Annotations[utils.RayFTEnabledAnnotationKey] == "true"; ftEnabled || gcsOptions != nil {
681684
gcsTimeout := corev1.EnvVar{Name: utils.RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S, Value: utils.DefaultWorkerRayGcsReconnectTimeoutS}
682685
container.Env = append(container.Env, gcsTimeout)
683686
}
684687
}
688+
689+
if rayNodeType == rayv1.HeadNode && gcsOptions != nil {
690+
container.Env = utils.UpsertEnvVar(container.Env, corev1.EnvVar{
691+
Name: utils.RAY_REDIS_ADDRESS,
692+
Value: gcsOptions.RedisAddress,
693+
})
694+
}
695+
685696
if !utils.EnvVarExists(utils.RAY_DASHBOARD_ENABLE_K8S_DISK_USAGE, container.Env) {
686697
// This flag enables the display of disk usage. Without this flag, the dashboard will not show disk usage.
687698
container.Env = append(container.Env, corev1.EnvVar{Name: utils.RAY_DASHBOARD_ENABLE_K8S_DISK_USAGE, Value: "1"})

ray-operator/controllers/ray/common/pod_test.go

Lines changed: 55 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ func TestBuildPod(t *testing.T) {
305305
// Test head pod
306306
podName := strings.ToLower(cluster.Name + utils.DashSymbol + string(rayv1.HeadNode) + utils.DashSymbol + utils.FormatInt32(0))
307307
podTemplateSpec := DefaultHeadPodTemplate(ctx, *cluster, cluster.Spec.HeadGroupSpec, podName, "6379")
308-
pod := BuildPod(ctx, podTemplateSpec, rayv1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", nil, utils.GetCRDType(""), "")
308+
pod := BuildPod(ctx, podTemplateSpec, rayv1.HeadNode, cluster.Spec.GcsFaultToleranceOptions, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", nil, utils.GetCRDType(""), "")
309309

310310
// Check environment variables
311311
rayContainer := pod.Spec.Containers[utils.RayContainerIndex]
@@ -360,7 +360,7 @@ func TestBuildPod(t *testing.T) {
360360
podName = cluster.Name + utils.DashSymbol + string(rayv1.WorkerNode) + utils.DashSymbol + worker.GroupName + utils.DashSymbol + utils.FormatInt32(0)
361361
fqdnRayIP := utils.GenerateFQDNServiceName(ctx, *cluster, cluster.Namespace)
362362
podTemplateSpec = DefaultWorkerPodTemplate(ctx, *cluster, worker, podName, fqdnRayIP, "6379")
363-
pod = BuildPod(ctx, podTemplateSpec, rayv1.WorkerNode, worker.RayStartParams, "6379", nil, utils.GetCRDType(""), fqdnRayIP)
363+
pod = BuildPod(ctx, podTemplateSpec, rayv1.WorkerNode, cluster.Spec.GcsFaultToleranceOptions, worker.RayStartParams, "6379", nil, utils.GetCRDType(""), fqdnRayIP)
364364

365365
// Check resources
366366
rayContainer = pod.Spec.Containers[utils.RayContainerIndex]
@@ -423,7 +423,7 @@ func TestBuildPod_WithNoCPULimits(t *testing.T) {
423423
// Test head pod
424424
podName := strings.ToLower(cluster.Name + utils.DashSymbol + string(rayv1.HeadNode) + utils.DashSymbol + utils.FormatInt32(0))
425425
podTemplateSpec := DefaultHeadPodTemplate(ctx, *cluster, cluster.Spec.HeadGroupSpec, podName, "6379")
426-
pod := BuildPod(ctx, podTemplateSpec, rayv1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", nil, utils.GetCRDType(""), "")
426+
pod := BuildPod(ctx, podTemplateSpec, rayv1.HeadNode, cluster.Spec.GcsFaultToleranceOptions, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", nil, utils.GetCRDType(""), "")
427427
expectedCommandArg := splitAndSort("ulimit -n 65536; ray start --head --block --dashboard-agent-listen-port=52365 --memory=1073741824 --num-cpus=2 --metrics-export-port=8080 --dashboard-host=0.0.0.0")
428428
actualCommandArg := splitAndSort(pod.Spec.Containers[0].Args[0])
429429
if !reflect.DeepEqual(expectedCommandArg, actualCommandArg) {
@@ -435,7 +435,7 @@ func TestBuildPod_WithNoCPULimits(t *testing.T) {
435435
podName = cluster.Name + utils.DashSymbol + string(rayv1.WorkerNode) + utils.DashSymbol + worker.GroupName + utils.DashSymbol + utils.FormatInt32(0)
436436
fqdnRayIP := utils.GenerateFQDNServiceName(ctx, *cluster, cluster.Namespace)
437437
podTemplateSpec = DefaultWorkerPodTemplate(ctx, *cluster, worker, podName, fqdnRayIP, "6379")
438-
pod = BuildPod(ctx, podTemplateSpec, rayv1.WorkerNode, worker.RayStartParams, "6379", nil, utils.GetCRDType(""), fqdnRayIP)
438+
pod = BuildPod(ctx, podTemplateSpec, rayv1.WorkerNode, cluster.Spec.GcsFaultToleranceOptions, worker.RayStartParams, "6379", nil, utils.GetCRDType(""), fqdnRayIP)
439439
expectedCommandArg = splitAndSort("ulimit -n 65536; ray start --block --dashboard-agent-listen-port=52365 --memory=1073741824 --num-cpus=2 --num-gpus=3 --address=raycluster-sample-head-svc.default.svc.cluster.local:6379 --port=6379 --metrics-export-port=8080")
440440
actualCommandArg = splitAndSort(pod.Spec.Containers[0].Args[0])
441441
if !reflect.DeepEqual(expectedCommandArg, actualCommandArg) {
@@ -459,7 +459,7 @@ func TestBuildPod_WithOverwriteCommand(t *testing.T) {
459459

460460
podName := strings.ToLower(cluster.Name + utils.DashSymbol + string(rayv1.HeadNode) + utils.DashSymbol + utils.FormatInt32(0))
461461
podTemplateSpec := DefaultHeadPodTemplate(ctx, *cluster, cluster.Spec.HeadGroupSpec, podName, "6379")
462-
headPod := BuildPod(ctx, podTemplateSpec, rayv1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", nil, utils.GetCRDType(""), "")
462+
headPod := BuildPod(ctx, podTemplateSpec, rayv1.HeadNode, cluster.Spec.GcsFaultToleranceOptions, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", nil, utils.GetCRDType(""), "")
463463
headContainer := headPod.Spec.Containers[utils.RayContainerIndex]
464464
assert.Equal(t, headContainer.Command, []string{"I am head"})
465465
assert.Equal(t, headContainer.Args, []string{"I am head again"})
@@ -468,7 +468,7 @@ func TestBuildPod_WithOverwriteCommand(t *testing.T) {
468468
podName = cluster.Name + utils.DashSymbol + string(rayv1.WorkerNode) + utils.DashSymbol + worker.GroupName + utils.DashSymbol + utils.FormatInt32(0)
469469
fqdnRayIP := utils.GenerateFQDNServiceName(ctx, *cluster, cluster.Namespace)
470470
podTemplateSpec = DefaultWorkerPodTemplate(ctx, *cluster, worker, podName, fqdnRayIP, "6379")
471-
workerPod := BuildPod(ctx, podTemplateSpec, rayv1.WorkerNode, worker.RayStartParams, "6379", nil, utils.GetCRDType(""), fqdnRayIP)
471+
workerPod := BuildPod(ctx, podTemplateSpec, rayv1.WorkerNode, cluster.Spec.GcsFaultToleranceOptions, worker.RayStartParams, "6379", nil, utils.GetCRDType(""), fqdnRayIP)
472472
workerContainer := workerPod.Spec.Containers[utils.RayContainerIndex]
473473
assert.Equal(t, workerContainer.Command, []string{"I am worker"})
474474
assert.Equal(t, workerContainer.Args, []string{"I am worker again"})
@@ -480,7 +480,7 @@ func TestBuildPod_WithAutoscalerEnabled(t *testing.T) {
480480
cluster.Spec.EnableInTreeAutoscaling = &trueFlag
481481
podName := strings.ToLower(cluster.Name + utils.DashSymbol + string(rayv1.HeadNode) + utils.DashSymbol + utils.FormatInt32(0))
482482
podTemplateSpec := DefaultHeadPodTemplate(ctx, *cluster, cluster.Spec.HeadGroupSpec, podName, "6379")
483-
pod := BuildPod(ctx, podTemplateSpec, rayv1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", &trueFlag, utils.GetCRDType(""), "")
483+
pod := BuildPod(ctx, podTemplateSpec, rayv1.HeadNode, cluster.Spec.GcsFaultToleranceOptions, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", &trueFlag, utils.GetCRDType(""), "")
484484

485485
actualResult := pod.Labels[utils.RayClusterLabelKey]
486486
expectedResult := cluster.Name
@@ -537,7 +537,7 @@ func TestBuildPod_WithCreatedByRayService(t *testing.T) {
537537
cluster.Spec.EnableInTreeAutoscaling = &trueFlag
538538
podName := strings.ToLower(cluster.Name + utils.DashSymbol + string(rayv1.HeadNode) + utils.DashSymbol + utils.FormatInt32(0))
539539
podTemplateSpec := DefaultHeadPodTemplate(ctx, *cluster, cluster.Spec.HeadGroupSpec, podName, "6379")
540-
pod := BuildPod(ctx, podTemplateSpec, rayv1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", &trueFlag, utils.RayServiceCRD, "")
540+
pod := BuildPod(ctx, podTemplateSpec, rayv1.HeadNode, cluster.Spec.GcsFaultToleranceOptions, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", &trueFlag, utils.RayServiceCRD, "")
541541

542542
val, ok := pod.Labels[utils.RayClusterServingServiceLabelKey]
543543
assert.True(t, ok, "Expected serve label is not present")
@@ -548,7 +548,7 @@ func TestBuildPod_WithCreatedByRayService(t *testing.T) {
548548
podName = cluster.Name + utils.DashSymbol + string(rayv1.WorkerNode) + utils.DashSymbol + worker.GroupName + utils.DashSymbol + utils.FormatInt32(0)
549549
fqdnRayIP := utils.GenerateFQDNServiceName(ctx, *cluster, cluster.Namespace)
550550
podTemplateSpec = DefaultWorkerPodTemplate(ctx, *cluster, worker, podName, fqdnRayIP, "6379")
551-
pod = BuildPod(ctx, podTemplateSpec, rayv1.WorkerNode, worker.RayStartParams, "6379", nil, utils.RayServiceCRD, fqdnRayIP)
551+
pod = BuildPod(ctx, podTemplateSpec, rayv1.WorkerNode, cluster.Spec.GcsFaultToleranceOptions, worker.RayStartParams, "6379", nil, utils.RayServiceCRD, fqdnRayIP)
552552

553553
val, ok = pod.Labels[utils.RayClusterServingServiceLabelKey]
554554
assert.True(t, ok, "Expected serve label is not present")
@@ -567,7 +567,7 @@ func TestBuildPod_WithGcsFtEnabled(t *testing.T) {
567567
// Build a head Pod.
568568
podName := strings.ToLower(cluster.Name + utils.DashSymbol + string(rayv1.HeadNode) + utils.DashSymbol + utils.FormatInt32(0))
569569
podTemplateSpec := DefaultHeadPodTemplate(ctx, *cluster, cluster.Spec.HeadGroupSpec, podName, "6379")
570-
pod := BuildPod(ctx, podTemplateSpec, rayv1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", nil, utils.GetCRDType(""), "")
570+
pod := BuildPod(ctx, podTemplateSpec, rayv1.HeadNode, cluster.Spec.GcsFaultToleranceOptions, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", nil, utils.GetCRDType(""), "")
571571

572572
// Check environment variable "RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S"
573573
rayContainer := pod.Spec.Containers[utils.RayContainerIndex]
@@ -585,7 +585,7 @@ func TestBuildPod_WithGcsFtEnabled(t *testing.T) {
585585
cluster.Spec.HeadGroupSpec.Template.Spec.Containers[utils.RayContainerIndex].Env = append(cluster.Spec.HeadGroupSpec.Template.Spec.Containers[utils.RayContainerIndex].Env,
586586
corev1.EnvVar{Name: utils.RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S, Value: "60"})
587587
podTemplateSpec = DefaultHeadPodTemplate(ctx, *cluster, cluster.Spec.HeadGroupSpec, podName, "6379")
588-
pod = BuildPod(ctx, podTemplateSpec, rayv1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", nil, utils.GetCRDType(""), "")
588+
pod = BuildPod(ctx, podTemplateSpec, rayv1.HeadNode, cluster.Spec.GcsFaultToleranceOptions, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", nil, utils.GetCRDType(""), "")
589589
rayContainer = pod.Spec.Containers[utils.RayContainerIndex]
590590

591591
// Check environment variable "RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S"
@@ -602,7 +602,7 @@ func TestBuildPod_WithGcsFtEnabled(t *testing.T) {
602602
podName = cluster.Name + utils.DashSymbol + string(rayv1.WorkerNode) + utils.DashSymbol + worker.GroupName + utils.DashSymbol + utils.FormatInt32(0)
603603
fqdnRayIP := utils.GenerateFQDNServiceName(ctx, *cluster, cluster.Namespace)
604604
podTemplateSpec = DefaultWorkerPodTemplate(ctx, *cluster, worker, podName, fqdnRayIP, "6379")
605-
pod = BuildPod(ctx, podTemplateSpec, rayv1.WorkerNode, worker.RayStartParams, "6379", nil, utils.GetCRDType(""), fqdnRayIP)
605+
pod = BuildPod(ctx, podTemplateSpec, rayv1.WorkerNode, cluster.Spec.GcsFaultToleranceOptions, worker.RayStartParams, "6379", nil, utils.GetCRDType(""), fqdnRayIP)
606606

607607
// Check the default value of "RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S"
608608
rayContainer = pod.Spec.Containers[utils.RayContainerIndex]
@@ -619,11 +619,52 @@ func TestBuildPod_WithGcsFtEnabled(t *testing.T) {
619619
corev1.EnvVar{Name: utils.RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S, Value: "120"})
620620
worker = cluster.Spec.WorkerGroupSpecs[0]
621621
podTemplateSpec = DefaultWorkerPodTemplate(ctx, *cluster, worker, podName, fqdnRayIP, "6379")
622-
pod = BuildPod(ctx, podTemplateSpec, rayv1.WorkerNode, worker.RayStartParams, "6379", nil, utils.GetCRDType(""), fqdnRayIP)
622+
pod = BuildPod(ctx, podTemplateSpec, rayv1.WorkerNode, cluster.Spec.GcsFaultToleranceOptions, worker.RayStartParams, "6379", nil, utils.GetCRDType(""), fqdnRayIP)
623623

624624
// Check that the user-provided value of "RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S" is preserved
625625
rayContainer = pod.Spec.Containers[utils.RayContainerIndex]
626626
checkContainerEnv(t, rayContainer, utils.RAY_GCS_RPC_SERVER_RECONNECT_TIMEOUT_S, "120")
627+
628+
// Test 5 with a minimal GcsFaultToleranceOptions
629+
cluster = instance.DeepCopy()
630+
cluster.UID = "mycluster"
631+
cluster.Annotations = map[string]string{}
632+
cluster.Spec.GcsFaultToleranceOptions = &rayv1.GcsFaultToleranceOptions{
633+
RedisAddress: "redis://127.0.0.1:6379",
634+
}
635+
podTemplateSpec = DefaultHeadPodTemplate(ctx, *cluster, cluster.Spec.HeadGroupSpec, podName, "6379")
636+
pod = BuildPod(ctx, podTemplateSpec, rayv1.HeadNode, cluster.Spec.GcsFaultToleranceOptions, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", nil, utils.GetCRDType(""), "")
637+
rayContainer = pod.Spec.Containers[utils.RayContainerIndex]
638+
639+
if pod.Annotations[utils.RayFTEnabledAnnotationKey] != "true" {
640+
t.Fatalf("Ray pod has unexpected %s annotation: %v", utils.RayFTEnabledAnnotationKey, pod.Annotations[utils.RayFTEnabledAnnotationKey])
641+
}
642+
if pod.Annotations[utils.RayExternalStorageNSAnnotationKey] != string(cluster.UID) {
643+
t.Fatalf("Ray pod has unexpected %s annotation: %v", utils.RayExternalStorageNSAnnotationKey, pod.Annotations[utils.RayExternalStorageNSAnnotationKey])
644+
}
645+
checkContainerEnv(t, rayContainer, utils.RAY_REDIS_ADDRESS, "redis://127.0.0.1:6379")
646+
checkContainerEnv(t, rayContainer, utils.RAY_EXTERNAL_STORAGE_NS, string(cluster.UID))
647+
648+
// Test 6 with a full GcsFaultToleranceOptions
649+
cluster = instance.DeepCopy()
650+
cluster.UID = "mycluster"
651+
cluster.Annotations = map[string]string{}
652+
cluster.Spec.GcsFaultToleranceOptions = &rayv1.GcsFaultToleranceOptions{
653+
ExternalStorageNamespace: "myns",
654+
RedisAddress: "redis://127.0.0.1:6379",
655+
}
656+
podTemplateSpec = DefaultHeadPodTemplate(ctx, *cluster, cluster.Spec.HeadGroupSpec, podName, "6379")
657+
pod = BuildPod(ctx, podTemplateSpec, rayv1.HeadNode, cluster.Spec.GcsFaultToleranceOptions, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", nil, utils.GetCRDType(""), "")
658+
rayContainer = pod.Spec.Containers[utils.RayContainerIndex]
659+
660+
if pod.Annotations[utils.RayFTEnabledAnnotationKey] != "true" {
661+
t.Fatalf("Ray pod has unexpected %s annotation: %v", utils.RayFTEnabledAnnotationKey, pod.Annotations[utils.RayFTEnabledAnnotationKey])
662+
}
663+
if pod.Annotations[utils.RayExternalStorageNSAnnotationKey] != "myns" {
664+
t.Fatalf("Ray pod has unexpected %s annotation: %v", utils.RayExternalStorageNSAnnotationKey, pod.Annotations[utils.RayExternalStorageNSAnnotationKey])
665+
}
666+
checkContainerEnv(t, rayContainer, utils.RAY_EXTERNAL_STORAGE_NS, "myns")
667+
checkContainerEnv(t, rayContainer, utils.RAY_REDIS_ADDRESS, "redis://127.0.0.1:6379")
627668
}
628669

629670
// Check that autoscaler container overrides work as expected.
@@ -690,7 +731,7 @@ func TestBuildPodWithAutoscalerOptions(t *testing.T) {
690731
SecurityContext: &customSecurityContext,
691732
}
692733
podTemplateSpec := DefaultHeadPodTemplate(ctx, *cluster, cluster.Spec.HeadGroupSpec, podName, "6379")
693-
pod := BuildPod(ctx, podTemplateSpec, rayv1.HeadNode, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", &trueFlag, utils.GetCRDType(""), "")
734+
pod := BuildPod(ctx, podTemplateSpec, rayv1.HeadNode, cluster.Spec.GcsFaultToleranceOptions, cluster.Spec.HeadGroupSpec.RayStartParams, "6379", &trueFlag, utils.GetCRDType(""), "")
694735
expectedContainer := *autoscalerContainer.DeepCopy()
695736
expectedContainer.Image = customAutoscalerImage
696737
expectedContainer.ImagePullPolicy = customPullPolicy

ray-operator/controllers/ray/raycluster_controller.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1107,7 +1107,7 @@ func (r *RayClusterReconciler) buildHeadPod(ctx context.Context, instance rayv1.
11071107
}
11081108
logger.Info("head pod labels", "labels", podConf.Labels)
11091109
creatorCRDType := getCreatorCRDType(instance)
1110-
pod := common.BuildPod(ctx, podConf, rayv1.HeadNode, instance.Spec.HeadGroupSpec.RayStartParams, headPort, autoscalingEnabled, creatorCRDType, fqdnRayIP)
1110+
pod := common.BuildPod(ctx, podConf, rayv1.HeadNode, instance.Spec.GcsFaultToleranceOptions, instance.Spec.HeadGroupSpec.RayStartParams, headPort, autoscalingEnabled, creatorCRDType, fqdnRayIP)
11111111
// Set raycluster instance as the owner and controller
11121112
if err := controllerutil.SetControllerReference(&instance, &pod, r.Scheme); err != nil {
11131113
logger.Error(err, "Failed to set controller reference for raycluster pod")
@@ -1134,7 +1134,7 @@ func (r *RayClusterReconciler) buildWorkerPod(ctx context.Context, instance rayv
11341134
podTemplateSpec.Spec.Containers = append(podTemplateSpec.Spec.Containers, r.workerSidecarContainers...)
11351135
}
11361136
creatorCRDType := getCreatorCRDType(instance)
1137-
pod := common.BuildPod(ctx, podTemplateSpec, rayv1.WorkerNode, worker.RayStartParams, headPort, autoscalingEnabled, creatorCRDType, fqdnRayIP)
1137+
pod := common.BuildPod(ctx, podTemplateSpec, rayv1.WorkerNode, instance.Spec.GcsFaultToleranceOptions, worker.RayStartParams, headPort, autoscalingEnabled, creatorCRDType, fqdnRayIP)
11381138
// Set raycluster instance as the owner and controller
11391139
if err := controllerutil.SetControllerReference(&instance, &pod, r.Scheme); err != nil {
11401140
logger.Error(err, "Failed to set controller reference for raycluster pod")

0 commit comments

Comments
 (0)