From d1b15b2b4d0763969849106b81981f88aca19ea5 Mon Sep 17 00:00:00 2001 From: Rewant Soni Date: Thu, 7 Nov 2024 22:11:47 +0530 Subject: [PATCH] storageclusterpeer: implement logic for storageclusterpeer Signed-off-by: Rewant Soni --- api/v1/storageclusterpeer_types.go | 6 +- controllers/storagecluster/reconcile.go | 28 +++ .../storagecluster_controller.go | 3 +- .../storageclusterpeer_controller.go | 166 ++++++++++++++++-- controllers/util/k8sutil.go | 11 ++ .../api/v4/v1/storageclusterpeer_types.go | 6 +- .../v4/controllers/util/k8sutil.go | 11 ++ .../api/v4/v1/storageclusterpeer_types.go | 6 +- 8 files changed, 212 insertions(+), 25 deletions(-) diff --git a/api/v1/storageclusterpeer_types.go b/api/v1/storageclusterpeer_types.go index 07301efbc9..68c3b3ea65 100644 --- a/api/v1/storageclusterpeer_types.go +++ b/api/v1/storageclusterpeer_types.go @@ -26,8 +26,10 @@ import ( type StorageClusterPeerState string const ( - StorageClusterPeerStatePending StorageClusterPeerState = "Pending" - StorageClusterPeerStatePeered StorageClusterPeerState = "Peered" + StorageClusterPeerStateInitializing StorageClusterPeerState = "Initializing" + StorageClusterPeerStatePending StorageClusterPeerState = "Pending" + StorageClusterPeerStatePeered StorageClusterPeerState = "Peered" + StorageClusterPeerStateFailed StorageClusterPeerState = "Failed" ) type PeerInfo struct { diff --git a/controllers/storagecluster/reconcile.go b/controllers/storagecluster/reconcile.go index 87e34b88da..8d89f44655 100644 --- a/controllers/storagecluster/reconcile.go +++ b/controllers/storagecluster/reconcile.go @@ -4,6 +4,7 @@ import ( "context" error1 "errors" "fmt" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "strings" "time" @@ -403,6 +404,10 @@ func (r *StorageClusterReconciler) reconcilePhases( return reconcile.Result{}, nil } + if res, err := r.ownStorageClusterPeersInNamespace(instance); err != nil || !res.IsZero() { + return reconcile.Result{}, err + } + // in-memory conditions 
should start off empty. It will only ever hold // negative conditions (!Available, Degraded, Progressing) r.conditions = nil @@ -788,6 +793,29 @@ func (r *StorageClusterReconciler) isStorageClusterNotIgnored( return true } +func (r *StorageClusterReconciler) ownStorageClusterPeersInNamespace(instance *ocsv1.StorageCluster) (reconcile.Result, error) { + storageClusterPeerList := &ocsv1.StorageClusterPeerList{} + err := r.Client.List(r.ctx, storageClusterPeerList, client.InNamespace(instance.Namespace)) + if err != nil { + return reconcile.Result{}, fmt.Errorf("failed to list storageClusterPeer: %w", err) + } + for i := range storageClusterPeerList.Items { + scp := &storageClusterPeerList.Items[i] + lenOwners := len(scp.OwnerReferences) + err := controllerutil.SetOwnerReference(instance, scp, r.Scheme) + if err != nil { + return reconcile.Result{}, fmt.Errorf("failed to set owner reference on StorageClusterPeer %v: %w", scp.Name, err) + } + if lenOwners != len(scp.OwnerReferences) { + err = r.Client.Update(r.ctx, scp) + if err != nil { + return reconcile.Result{}, fmt.Errorf("failed to persist StorageCluster owner ref on StorageClusterPeer %v: %w", scp.Name, err) + } + } + } + return reconcile.Result{}, nil +} + // Checks whether a string is contained within a slice func contains(slice []string, s string) bool { for _, item := range slice { diff --git a/controllers/storagecluster/storagecluster_controller.go b/controllers/storagecluster/storagecluster_controller.go index 9e7589fe7d..b63e00c092 100644 --- a/controllers/storagecluster/storagecluster_controller.go +++ b/controllers/storagecluster/storagecluster_controller.go @@ -245,7 +245,8 @@ func (r *StorageClusterReconciler) SetupWithManager(mgr ctrl.Manager) error { Watches(&storagev1.StorageClass{}, enqueueStorageClusterRequest). Watches(&volumesnapshotv1.VolumeSnapshotClass{}, enqueueStorageClusterRequest). Watches(&ocsv1.StorageProfile{}, enqueueStorageClusterRequest). 
- Watches(&ocsv1alpha1.StorageConsumer{}, enqueueStorageClusterRequest, builder.WithPredicates(storageConsumerStatusPredicate)) + Watches(&ocsv1alpha1.StorageConsumer{}, enqueueStorageClusterRequest, builder.WithPredicates(storageConsumerStatusPredicate)). + Watches(&ocsv1.StorageClusterPeer{}, enqueueStorageClusterRequest, builder.WithPredicates(predicate.GenerationChangedPredicate{})) if os.Getenv("SKIP_NOOBAA_CRD_WATCH") != "true" { build.Owns(&nbv1.NooBaa{}, builder.WithPredicates(noobaaIgnoreTimeUpdatePredicate)) diff --git a/controllers/storageclusterpeer/storageclusterpeer_controller.go b/controllers/storageclusterpeer/storageclusterpeer_controller.go index 7fdec8f3ea..aa49478f37 100644 --- a/controllers/storageclusterpeer/storageclusterpeer_controller.go +++ b/controllers/storageclusterpeer/storageclusterpeer_controller.go @@ -18,13 +18,28 @@ package storageclusterpeer import ( "context" + "encoding/base64" + "encoding/json" + "fmt" + "google.golang.org/grpc/codes" + "strings" + "time" + ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1" + providerClient "github.com/red-hat-storage/ocs-operator/services/provider/api/v4/client" + "github.com/red-hat-storage/ocs-operator/v4/controllers/util" + "github.com/red-hat-storage/ocs-operator/v4/services" + + "github.com/go-logr/logr" + "google.golang.org/grpc/status" + k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" - - v1 "github.com/red-hat-storage/ocs-operator/api/v4/v1" + "sigs.k8s.io/controller-runtime/pkg/predicate" ) // StorageClusterPeerReconciler reconciles a StorageClusterPeer object @@ -32,32 +47,147 @@ import ( type StorageClusterPeerReconciler struct { client.Client Scheme *runtime.Scheme + + log logr.Logger + ctx context.Context +} + +// 
SetupWithManager sets up the controller with the Manager. +func (r *StorageClusterPeerReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&ocsv1.StorageClusterPeer{}). + Watches(&ocsv1.StorageCluster{}, &handler.EnqueueRequestForObject{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). + Complete(r) } //+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusterpeers,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusterpeers/status,verbs=get;update;patch //+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusterpeers/finalizers,verbs=update +//+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusters,verbs=get;list;watch // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. -// TODO(user): Modify the Reconcile function to compare the state specified by -// the StorageClusterPeer object against the actual cluster state, and then -// perform operations to make the cluster state reflect the state specified by -// the user. 
-// -// For more details, check Reconcile and its Result here: -// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.13.0/pkg/reconcile -func (r *StorageClusterPeerReconciler) Reconcile(ctx context.Context, _ ctrl.Request) (ctrl.Result, error) { - _ = log.FromContext(ctx) - - // TODO(user): your logic here +func (r *StorageClusterPeerReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) { + var err error + r.ctx = ctx + r.log = log.FromContext(ctx, "StorageClusterPeer", request) + r.log.Info("Reconciling StorageClusterPeer.") + + // Fetch the StorageClusterPeer instance + storageClusterPeer := &ocsv1.StorageClusterPeer{} + storageClusterPeer.Name = request.Name + storageClusterPeer.Namespace = request.Namespace + + if err = r.get(storageClusterPeer); err != nil { + if k8serrors.IsNotFound(err) { + r.log.Info("StorageClusterPeer resource not found. Ignoring since object must have been deleted.") + return ctrl.Result{}, nil + } + r.log.Error(err, "Failed to get StorageClusterPeer.") + return ctrl.Result{}, err + } + + if storageClusterPeer.Status.State == ocsv1.StorageClusterPeerStatePeered { + return ctrl.Result{}, nil + } + + result, reconcileError := r.reconcileStates(storageClusterPeer) + + // Apply status changes + statusError := r.Client.Status().Update(r.ctx, storageClusterPeer) + if statusError != nil { + r.log.Info("Could not update StorageClusterPeer status.") + } + + // Reconcile errors have higher priority than status update errors + if reconcileError != nil { + return result, reconcileError + } else if statusError != nil { + return result, statusError + } + return result, nil +} + +func (r *StorageClusterPeerReconciler) reconcileStates(storageClusterPeer *ocsv1.StorageClusterPeer) (ctrl.Result, error) { + storageCluster := &ocsv1.StorageCluster{} + storageCluster.Namespace = storageClusterPeer.Namespace + + // Fetch StorageCluster + owner, err := util.FindOwnerRefByKind(storageClusterPeer, "StorageCluster") + if err != nil 
{ + return ctrl.Result{}, err + } + + storageCluster.Name = owner.Name + + if err := r.get(storageCluster); client.IgnoreNotFound(err) != nil { + r.log.Error(err, "Error fetching StorageCluster for StorageClusterPeer found in the same namespace.") + storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStateFailed + return ctrl.Result{}, err + } else if k8serrors.IsNotFound(err) { + r.log.Error(err, "Cannot find a StorageCluster for StorageClusterPeer in the same namespace.") + storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStateFailed + return ctrl.Result{}, nil + } + + storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStateInitializing + + // Read StorageClusterUID from ticket + ticketArr := strings.Split(string(storageClusterPeer.Spec.OnboardingToken), ".") + if len(ticketArr) != 2 { + storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStateFailed + r.log.Error(fmt.Errorf("invalid ticket"), "Invalid onboarding ticket. Does not conform to expected ticket structure") + return ctrl.Result{}, nil + } + message, err := base64.StdEncoding.DecodeString(ticketArr[0]) + if err != nil { + storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStateFailed + r.log.Error(err, "failed to decode onboarding ticket") + return ctrl.Result{}, nil + } + var ticketData services.OnboardingTicket + err = json.Unmarshal(message, &ticketData) + if err != nil { + storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStateFailed + r.log.Error(err, "onboarding ticket message is not a valid JSON.") + return ctrl.Result{}, nil + } + + if storageClusterPeer.Status.PeerInfo == nil { + storageClusterPeer.Status.PeerInfo = &ocsv1.PeerInfo{StorageClusterUid: string(ticketData.StorageCluster)} + } + + ocsClient, err := providerClient.NewProviderClient(r.ctx, storageClusterPeer.Spec.ApiEndpoint, time.Second*10) + if err != nil { + storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStateFailed + return ctrl.Result{}, fmt.Errorf("failed to create a new provider client: %v", err) + } + defer ocsClient.Close() + + 
storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStatePending + + _, err = ocsClient.PeerStorageCluster(r.ctx, storageClusterPeer.Spec.OnboardingToken, string(storageCluster.UID)) + if err != nil { + r.log.Error(err, fmt.Sprintf("failed to Peer Storage Cluster, reason: %v.", err)) + st, ok := status.FromError(err) + if !ok { + storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStateFailed + r.log.Error(fmt.Errorf("invalid code"), "failed to extract a gRPC status code from error") + return ctrl.Result{}, fmt.Errorf("failed to extract a gRPC status code from error") + } + if st.Code() == codes.InvalidArgument { + storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStateFailed + return ctrl.Result{}, nil + } + storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStateFailed + } else { + storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStatePeered + } return ctrl.Result{}, nil } -// SetupWithManager sets up the controller with the Manager. -func (r *StorageClusterPeerReconciler) SetupWithManager(mgr ctrl.Manager) error { - return ctrl.NewControllerManagedBy(mgr). - For(&v1.StorageClusterPeer{}). 
- Complete(r) +func (r *StorageClusterPeerReconciler) get(obj client.Object) error { + key := client.ObjectKeyFromObject(obj) + return r.Client.Get(r.ctx, key, obj) } diff --git a/controllers/util/k8sutil.go b/controllers/util/k8sutil.go index 95771759a9..8bba33260c 100644 --- a/controllers/util/k8sutil.go +++ b/controllers/util/k8sutil.go @@ -3,6 +3,7 @@ package util import ( "context" "fmt" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "os" "strings" @@ -202,3 +203,13 @@ func NewK8sClient(scheme *runtime.Scheme) (client.Client, error) { return k8sClient, nil } + +func FindOwnerRefByKind(obj client.Object, kind string) (*v1.OwnerReference, error) { + owners := obj.GetOwnerReferences() + for i := range owners { + if owners[i].Kind == kind { + return &owners[i], nil + } + } + return nil, fmt.Errorf("unable to find owner reference by kind %s", kind) +} diff --git a/metrics/vendor/github.com/red-hat-storage/ocs-operator/api/v4/v1/storageclusterpeer_types.go b/metrics/vendor/github.com/red-hat-storage/ocs-operator/api/v4/v1/storageclusterpeer_types.go index 07301efbc9..68c3b3ea65 100644 --- a/metrics/vendor/github.com/red-hat-storage/ocs-operator/api/v4/v1/storageclusterpeer_types.go +++ b/metrics/vendor/github.com/red-hat-storage/ocs-operator/api/v4/v1/storageclusterpeer_types.go @@ -26,8 +26,10 @@ import ( type StorageClusterPeerState string const ( - StorageClusterPeerStatePending StorageClusterPeerState = "Pending" - StorageClusterPeerStatePeered StorageClusterPeerState = "Peered" + StorageClusterPeerStateInitializing StorageClusterPeerState = "Initializing" + StorageClusterPeerStatePending StorageClusterPeerState = "Pending" + StorageClusterPeerStatePeered StorageClusterPeerState = "Peered" + StorageClusterPeerStateFailed StorageClusterPeerState = "Failed" ) type PeerInfo struct { diff --git a/metrics/vendor/github.com/red-hat-storage/ocs-operator/v4/controllers/util/k8sutil.go b/metrics/vendor/github.com/red-hat-storage/ocs-operator/v4/controllers/util/k8sutil.go 
index 95771759a9..8bba33260c 100644 --- a/metrics/vendor/github.com/red-hat-storage/ocs-operator/v4/controllers/util/k8sutil.go +++ b/metrics/vendor/github.com/red-hat-storage/ocs-operator/v4/controllers/util/k8sutil.go @@ -3,6 +3,7 @@ package util import ( "context" "fmt" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "os" "strings" @@ -202,3 +203,13 @@ func NewK8sClient(scheme *runtime.Scheme) (client.Client, error) { return k8sClient, nil } + +func FindOwnerRefByKind(obj client.Object, kind string) (*v1.OwnerReference, error) { + owners := obj.GetOwnerReferences() + for i := range owners { + if owners[i].Kind == kind { + return &owners[i], nil + } + } + return nil, fmt.Errorf("unable to find owner reference by kind %s", kind) +} diff --git a/vendor/github.com/red-hat-storage/ocs-operator/api/v4/v1/storageclusterpeer_types.go b/vendor/github.com/red-hat-storage/ocs-operator/api/v4/v1/storageclusterpeer_types.go index 07301efbc9..68c3b3ea65 100644 --- a/vendor/github.com/red-hat-storage/ocs-operator/api/v4/v1/storageclusterpeer_types.go +++ b/vendor/github.com/red-hat-storage/ocs-operator/api/v4/v1/storageclusterpeer_types.go @@ -26,8 +26,10 @@ import ( type StorageClusterPeerState string const ( - StorageClusterPeerStatePending StorageClusterPeerState = "Pending" - StorageClusterPeerStatePeered StorageClusterPeerState = "Peered" + StorageClusterPeerStateInitializing StorageClusterPeerState = "Initializing" + StorageClusterPeerStatePending StorageClusterPeerState = "Pending" + StorageClusterPeerStatePeered StorageClusterPeerState = "Peered" + StorageClusterPeerStateFailed StorageClusterPeerState = "Failed" ) type PeerInfo struct {