diff --git a/ray-operator/controllers/ray/common/association.go b/ray-operator/controllers/ray/common/association.go index 5566421a29c..63eefa94bc4 100644 --- a/ray-operator/controllers/ray/common/association.go +++ b/ray-operator/controllers/ray/common/association.go @@ -100,6 +100,16 @@ func RayClusterWorkerPodsAssociationOptions(instance *rayv1.RayCluster) Associat } } +func RayClusterRedisCleanupJobAssociationOptions(instance *rayv1.RayCluster) AssociationOptions { + return AssociationOptions{ + client.InNamespace(instance.Namespace), + client.MatchingLabels{ + utils.RayClusterLabelKey: instance.Name, + utils.RayNodeTypeLabelKey: string(rayv1.RedisCleanupNode), + }, + } +} + func RayClusterGroupPodsAssociationOptions(instance *rayv1.RayCluster, group string) AssociationOptions { return AssociationOptions{ client.InNamespace(instance.Namespace), @@ -184,3 +194,12 @@ func GetRayClusterHeadPod(ctx context.Context, reader client.Reader, instance *r } return &runtimePods.Items[0], nil } + +func RayClusterNetworkResourcesOptions(instance *rayv1.RayCluster) AssociationOptions { + return AssociationOptions{ + client.InNamespace(instance.Namespace), + client.MatchingLabels{ + utils.RayClusterLabelKey: instance.Name, + }, + } +} diff --git a/ray-operator/controllers/ray/common/association_test.go b/ray-operator/controllers/ray/common/association_test.go index af79b2e397a..506107a72d6 100644 --- a/ray-operator/controllers/ray/common/association_test.go +++ b/ray-operator/controllers/ray/common/association_test.go @@ -5,6 +5,7 @@ import ( "reflect" "testing" + routev1 "github.com/openshift/api/route/v1" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -292,3 +293,74 @@ func TestGetRayClusterHeadPod(t *testing.T) { assert.Nil(t, err) assert.Equal(t, ret, headPod) } + +func TestRayClusterRedisCleanupJobAssociationOptions(t *testing.T) { + // Create a new scheme + newScheme := runtime.NewScheme() + _ = rayv1.AddToScheme(newScheme) + _ = corev1.AddToScheme(newScheme) + + instance := &rayv1.RayCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "raycluster-example", + Namespace: "default", + }, + } + + _ = &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "redis-cleanup", + Namespace: instance.ObjectMeta.Namespace, + Labels: map[string]string{ + utils.RayClusterLabelKey: instance.Name, + utils.RayNodeTypeLabelKey: string(rayv1.RedisCleanupNode), + }, + }, + } + + expected := []client.ListOption{ + client.InNamespace(instance.ObjectMeta.Namespace), + client.MatchingLabels(map[string]string{ + utils.RayClusterLabelKey: instance.Name, + utils.RayNodeTypeLabelKey: string(rayv1.RedisCleanupNode), + }), + } + result := RayClusterRedisCleanupJobAssociationOptions(instance).ToListOptions() + + assert.Equal(t, expected, result) +} + +func TestRayClusterNetworkResourcesOptions(t *testing.T) { + newScheme := runtime.NewScheme() + _ = rayv1.AddToScheme(newScheme) + _ = corev1.AddToScheme(newScheme) + _ = routev1.AddToScheme(newScheme) + instance := &rayv1.RayCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "raycluster-example", + Namespace: "default", + Annotations: map[string]string{ + IngressClassAnnotationKey: "nginx", + }, + }, + } + _ = &routev1.Route{ + ObjectMeta: metav1.ObjectMeta{ + Name: utils.GenerateRouteName(instance.Name), + Namespace: instance.Namespace, + Labels: map[string]string{ + utils.RayClusterLabelKey: instance.Name, + }, + }, + } + expected := []client.ListOption{ + client.InNamespace(instance.ObjectMeta.Namespace), + client.MatchingLabels(map[string]string{ + utils.RayClusterLabelKey: instance.Name, + }), + } + + result := RayClusterNetworkResourcesOptions(instance).ToListOptions() + + assert.Equal(t, expected, result) +} diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go index 794177651e0..3485d33c8fe 100644 --- a/ray-operator/controllers/ray/raycluster_controller.go +++ b/ray-operator/controllers/ray/raycluster_controller.go @@ -346,10 +346,9 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, instance return ctrl.Result{RequeueAfter: 10 * time.Second}, nil } - // We can start the Redis cleanup process now because the head Pod has been terminated. - filterLabels := client.MatchingLabels{utils.RayClusterLabelKey: instance.Name, utils.RayNodeTypeLabelKey: string(rayv1.RedisCleanupNode)} + filterLabels := common.RayClusterRedisCleanupJobAssociationOptions(instance).ToListOptions() redisCleanupJobs := batchv1.JobList{} - if err := r.List(ctx, &redisCleanupJobs, client.InNamespace(instance.Namespace), filterLabels); err != nil { + if err := r.List(ctx, &redisCleanupJobs, filterLabels...); err != nil { return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err } @@ -542,8 +541,8 @@ func (r *RayClusterReconciler) reconcileIngress(ctx context.Context, instance *r func (r *RayClusterReconciler) reconcileRouteOpenShift(ctx context.Context, instance *rayv1.RayCluster) error { logger := ctrl.LoggerFrom(ctx) headRoutes := routev1.RouteList{} - filterLabels := client.MatchingLabels{utils.RayClusterLabelKey: instance.Name} - if err := r.List(ctx, &headRoutes, client.InNamespace(instance.Namespace), filterLabels); err != nil { + filterLabels := common.RayClusterNetworkResourcesOptions(instance).ToListOptions() + if err := r.List(ctx, &headRoutes, filterLabels...); err != nil { return err } @@ -573,8 +572,8 @@ func (r *RayClusterReconciler) reconcileRouteOpenShift(ctx context.Context, inst func (r *RayClusterReconciler) reconcileIngressKubernetes(ctx context.Context, instance *rayv1.RayCluster) error { logger := ctrl.LoggerFrom(ctx) headIngresses := networkingv1.IngressList{} - filterLabels := client.MatchingLabels{utils.RayClusterLabelKey: instance.Name} - if err := r.List(ctx, &headIngresses, client.InNamespace(instance.Namespace), filterLabels); err != nil { + filterLabels := common.RayClusterNetworkResourcesOptions(instance).ToListOptions() + if err := r.List(ctx, &headIngresses, filterLabels...); err != nil { return err } @@ -605,9 +604,9 @@ func (r *RayClusterReconciler) reconcileIngressKubernetes(ctx context.Context, i func (r *RayClusterReconciler) reconcileHeadService(ctx context.Context, instance *rayv1.RayCluster) error { logger := ctrl.LoggerFrom(ctx) services := corev1.ServiceList{} - filterLabels := client.MatchingLabels{utils.RayClusterLabelKey: instance.Name, utils.RayNodeTypeLabelKey: string(rayv1.HeadNode)} + filterLabels := common.RayClusterHeadServiceListOptions(instance) - if err := r.List(ctx, &services, client.InNamespace(instance.Namespace), filterLabels); err != nil { + if err := r.List(ctx, &services, filterLabels...); err != nil { return err } @@ -1510,11 +1509,8 @@ func (r *RayClusterReconciler) updateEndpoints(ctx context.Context, instance *ra // We assume we can find the right one by filtering Services with appropriate label selectors // and picking the first one. We may need to select by name in the future if the Service naming is stable. rayHeadSvc := corev1.ServiceList{} - filterLabels := client.MatchingLabels{ - utils.RayClusterLabelKey: instance.Name, - utils.RayNodeTypeLabelKey: "head", - } - if err := r.List(ctx, &rayHeadSvc, client.InNamespace(instance.Namespace), filterLabels); err != nil { + filterLabels := common.RayClusterHeadServiceListOptions(instance) + if err := r.List(ctx, &rayHeadSvc, filterLabels...); err != nil { return err } diff --git a/ray-operator/controllers/ray/raycluster_controller_unit_test.go b/ray-operator/controllers/ray/raycluster_controller_unit_test.go index 6beb74f9804..8b476bb2b23 100644 --- a/ray-operator/controllers/ray/raycluster_controller_unit_test.go +++ b/ray-operator/controllers/ray/raycluster_controller_unit_test.go @@ -1013,6 +1013,7 @@ func TestReconcileHeadService(t *testing.T) { Labels: map[string]string{ utils.RayClusterLabelKey: cluster.Name, utils.RayNodeTypeLabelKey: string(rayv1.HeadNode), + utils.RayIDLabelKey: utils.CheckLabel(utils.GenerateIdentifier(cluster.Name, rayv1.HeadNode)), }, }, } @@ -1026,6 +1027,7 @@ func TestReconcileHeadService(t *testing.T) { headServiceSelector := labels.SelectorFromSet(map[string]string{ utils.RayClusterLabelKey: cluster.Name, utils.RayNodeTypeLabelKey: string(rayv1.HeadNode), + utils.RayIDLabelKey: utils.CheckLabel(utils.GenerateIdentifier(cluster.Name, rayv1.HeadNode)), }) // Initialize RayCluster reconciler.