From e250adad6de2c16e3a4e3a958d223b9039f69d05 Mon Sep 17 00:00:00 2001
From: Rewant Soni
Date: Mon, 4 Nov 2024 21:11:02 +0530
Subject: [PATCH] add server side implementation

Signed-off-by: Rewant Soni

add generated changes

Signed-off-by: Rewant Soni
---
 controllers/storagecluster/reconcile.go       |  29 ++++
 .../ocs-operator/manifests/provider-role.yaml |   1 +
 rbac/provider-role.yaml                       |   1 +
 services/provider/server/server.go            | 145 +++++++++++++++++
 4 files changed, 176 insertions(+)

diff --git a/controllers/storagecluster/reconcile.go b/controllers/storagecluster/reconcile.go
index 8d89f44655..8fceeacdab 100644
--- a/controllers/storagecluster/reconcile.go
+++ b/controllers/storagecluster/reconcile.go
@@ -408,6 +408,10 @@ func (r *StorageClusterReconciler) reconcilePhases(
 		return reconcile.Result{}, err
 	}
 
+	if res, err := r.ownStorageConsumersInNamespace(instance); err != nil || !res.IsZero() {
+		return res, err
+	}
+
 	// in-memory conditions should start off empty. It will only ever hold
 	// negative conditions (!Available, Degraded, Progressing)
 	r.conditions = nil
@@ -816,6 +820,31 @@ func (r *StorageClusterReconciler) ownStorageClusterPeersInNamespace(instance *o
 	return reconcile.Result{}, nil
 }
 
+// ownStorageConsumersInNamespace ensures that every StorageConsumer in the
+// StorageCluster's namespace has the StorageCluster set as an owner.
+func (r *StorageClusterReconciler) ownStorageConsumersInNamespace(instance *ocsv1.StorageCluster) (reconcile.Result, error) {
+	storageConsumerList := &ocsv1alpha1.StorageConsumerList{}
+	err := r.Client.List(r.ctx, storageConsumerList, client.InNamespace(instance.Namespace))
+	if err != nil {
+		return reconcile.Result{}, fmt.Errorf("failed to list storageConsumers: %w", err)
+	}
+	for i := range storageConsumerList.Items {
+		consumer := &storageConsumerList.Items[i]
+		lenOwners := len(consumer.OwnerReferences)
+		err := controllerutil.SetOwnerReference(instance, consumer, r.Scheme)
+		if err != nil {
+			return reconcile.Result{}, fmt.Errorf("failed to set owner reference on storageConsumer %v: %w", consumer.Name, err)
+		}
+		if lenOwners != len(consumer.OwnerReferences) {
+			err = r.Client.Update(r.ctx, consumer)
+			if err != nil {
+				return reconcile.Result{}, fmt.Errorf("failed to persist StorageCluster owner ref on storageConsumer %v: %w", consumer.Name, err)
+			}
+		}
+	}
+	return reconcile.Result{}, nil
+}
+
 // Checks whether a string is contained within a slice
 func contains(slice []string, s string) bool {
 	for _, item := range slice {
diff --git a/deploy/ocs-operator/manifests/provider-role.yaml b/deploy/ocs-operator/manifests/provider-role.yaml
index 42259a455d..b8980ab515 100644
--- a/deploy/ocs-operator/manifests/provider-role.yaml
+++ b/deploy/ocs-operator/manifests/provider-role.yaml
@@ -16,6 +16,7 @@ rules:
   resources:
   - cephfilesystemsubvolumegroups
   - cephblockpoolradosnamespaces
+  - cephblockpools
   verbs:
   - get
   - list
diff --git a/rbac/provider-role.yaml b/rbac/provider-role.yaml
index 42259a455d..b8980ab515 100644
--- a/rbac/provider-role.yaml
+++ b/rbac/provider-role.yaml
@@ -16,6 +16,7 @@ rules:
   resources:
   - cephfilesystemsubvolumegroups
   - cephblockpoolradosnamespaces
+  - cephblockpools
   verbs:
   - get
   - list
diff --git a/services/provider/server/server.go b/services/provider/server/server.go
index 62349d3b59..b82bf50300 100644
--- a/services/provider/server/server.go
+++ b/services/provider/server/server.go
@@ -61,6 +61,7 @@ const (
 	ramenDRStorageIDKey     = "ramendr.openshift.io/storageID"
 	ramenDRReplicationIDKey = "ramendr.openshift.io/replicationid"
 	ramenDRFlattenModeKey   = "replication.storage.openshift.io/flatten-mode"
+	mirroringTokenKey       = "rbdMirrorBootstrapPeerSecretName"
 )
 
 const (
@@ -1042,3 +1043,147 @@ func (s *OCSProviderServer) PeerStorageCluster(ctx context.Context, req *pb.Peer
 
 	return &pb.PeerStorageClusterResponse{}, nil
 }
+
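+// GetStorageClientsInfo returns the rados namespace for each requested storage
+// client after verifying that the client's StorageConsumer is owned by the
+// StorageCluster identified by the request's StorageClusterUID.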
+func (s *OCSProviderServer) GetStorageClientsInfo(ctx context.Context, req *pb.StorageClientsInfoRequest) (*pb.StorageClientsInfoResponse, error) {
+	var clientsInfo []*pb.ClientInfo
+	var clientError []*pb.StorageClientInfoError
+
+	for i := range req.ClientIDs {
+		consumer, err := s.consumerManager.Get(ctx, req.ClientIDs[i])
+		if err != nil {
+			klog.Errorf("failed to get consumer with id %v: %v", req.ClientIDs[i], err)
+			clientError = append(clientError,
+				&pb.StorageClientInfoError{
+					ClientID: req.ClientIDs[i],
+					Code:     pb.ErrorCode_Internal,
+					Message:  err.Error(),
+				},
+			)
+			continue
+		}
+		owner := util.FindOwnerRefByKind(consumer, "StorageCluster")
+
+		if owner == nil {
+			klog.Errorf("failed to find StorageCluster owning the StorageConsumer %v", req.ClientIDs[i])
+			clientError = append(clientError,
+				&pb.StorageClientInfoError{
+					ClientID: req.ClientIDs[i],
+					Code:     pb.ErrorCode_Internal,
+					Message:  "failed to find StorageCluster owning the StorageConsumer",
+				},
+			)
+			continue
+		}
+
+		if owner.UID != types.UID(req.StorageClusterUID) {
+			klog.Errorf("StorageCluster specified in the request does not own the client %v", req.ClientIDs[i])
+			clientError = append(clientError,
+				&pb.StorageClientInfoError{
+					ClientID: req.ClientIDs[i],
+					Code:     pb.ErrorCode_InvalidArgument,
+					Message:  "StorageCluster specified in the request does not own the client",
+				},
+			)
+			continue
+		}
+
+		rnsList := &rookCephv1.CephBlockPoolRadosNamespaceList{}
+		err = s.client.List(
+			ctx,
+			rnsList,
+			client.InNamespace(s.namespace),
+			client.MatchingLabels{controllers.StorageConsumerNameLabel: consumer.Name},
+			client.Limit(1),
+		)
+		if err != nil {
+			klog.Errorf("failed to list rados namespaces for StorageClient %v: %v", req.ClientIDs[i], err)
+			clientError = append(clientError,
+				&pb.StorageClientInfoError{
+					ClientID: req.ClientIDs[i],
+					Code:     pb.ErrorCode_Internal,
+					Message:  "failed to list rados namespaces for the client",
+				},
+			)
+			continue
+		}
+		if len(rnsList.Items) == 0 {
+			klog.Errorf("no rados namespace found for StorageClient %s", req.ClientIDs[i])
+			clientError = append(clientError,
+				&pb.StorageClientInfoError{
+					ClientID: req.ClientIDs[i],
+					Code:     pb.ErrorCode_Internal,
+					Message:  "rados namespace for the client not found",
+				},
+			)
+			continue
+		}
+		clientsInfo = append(clientsInfo, &pb.ClientInfo{ClientID: req.ClientIDs[i], RadosNamespace: rnsList.Items[0].Name})
+	}
+
+	return &pb.StorageClientsInfoResponse{ClientsInfo: clientsInfo, Error: clientError}, nil
+}
+
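+// GetBlockPoolsInfo returns the pool ID and, when mirroring is enabled, the
+// mirroring bootstrap peer token for each requested CephBlockPool.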
+func (s *OCSProviderServer) GetBlockPoolsInfo(ctx context.Context, req *pb.BlockPoolsInfoRequest) (*pb.BlockPoolsInfoResponse, error) {
+	var blockPoolsInfo []*pb.BlockPoolInfo
+	var blockPoolError []*pb.BlockPoolInfoError
+
+	for i := range req.BlockPoolNames {
+		cephBlockPool := &rookCephv1.CephBlockPool{}
+		cephBlockPool.Name = req.BlockPoolNames[i]
+		cephBlockPool.Namespace = s.namespace
+		err := s.client.Get(ctx, client.ObjectKeyFromObject(cephBlockPool), cephBlockPool)
+		if err != nil {
+			klog.Error(err)
+			blockPoolError = append(blockPoolError,
+				&pb.BlockPoolInfoError{
+					BlockPoolName: cephBlockPool.Name,
+					Code:          pb.ErrorCode_Internal,
+					Message:       "failed to get CephBlockPool",
+				},
+			)
+			continue
+		}
+
+		var mirroringToken string
+
+		if cephBlockPool.Spec.Mirroring.Enabled &&
+			cephBlockPool.Status.Info != nil &&
+			cephBlockPool.Status.Info[mirroringTokenKey] != "" {
+			secret := &corev1.Secret{}
+			secret.Name = cephBlockPool.Status.Info[mirroringTokenKey]
+			secret.Namespace = s.namespace
+			err := s.client.Get(ctx, client.ObjectKeyFromObject(secret), secret)
+			if err != nil {
+				errMsg := fmt.Sprintf(
+					"failed to get bootstrap secret %s for CephBlockPool %s: %v",
+					cephBlockPool.Status.Info[mirroringTokenKey],
+					cephBlockPool.Name,
+					err,
+				)
+				klog.Error(errMsg)
+				blockPoolError = append(blockPoolError,
+					&pb.BlockPoolInfoError{
+						BlockPoolName: cephBlockPool.Name,
+						Code:          pb.ErrorCode_Internal,
+						Message:       "failed to get mirroring token for CephBlockPool",
+					},
+				)
+				continue
+			}
+			mirroringToken = string(secret.Data["token"])
+		}
+
+		blockPoolsInfo = append(blockPoolsInfo, &pb.BlockPoolInfo{
+			BlockPoolName:  cephBlockPool.Name,
+			MirroringToken: mirroringToken,
+			BlockPoolID:    strconv.Itoa(cephBlockPool.Status.PoolID),
+		})
+	}
+
+	return &pb.BlockPoolsInfoResponse{BlockPoolsInfo: blockPoolsInfo, Error: blockPoolError}, nil
+}