Skip to content

Commit

Permalink
storageclusterpeer: implement logic for storageclusterpeer
Browse files Browse the repository at this point in the history
Signed-off-by: Rewant Soni <[email protected]>
  • Loading branch information
rewantsoni committed Nov 5, 2024
1 parent 7d685ba commit cfbc4bf
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 17 deletions.
8 changes: 8 additions & 0 deletions controllers/storagecluster/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,14 @@ func (r *StorageClusterReconciler) reconcilePhases(
// The object is marked for deletion
instance.Status.Phase = statusutil.PhaseDeleting

storageClusterPeerList := ocsv1.StorageClusterPeerList{}
if err := r.List(ctx, &storageClusterPeerList, client.InNamespace(instance.Namespace)); err != nil {
return reconcile.Result{}, err
}
if len(storageClusterPeerList.Items) != 0 {
return reconcile.Result{}, fmt.Errorf("waiting for %v StorageClusterPeer attached to StorageCluster to be deleted before proceeding", len(storageClusterPeerList.Items))
}

if contains(instance.GetFinalizers(), storageClusterFinalizer) {
if res, err := r.deleteResources(instance); err != nil {
r.Log.Info("Uninstall in progress.", "Status", err)
Expand Down
152 changes: 135 additions & 17 deletions controllers/storageclusterpeer/storageclusterpeer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,46 +18,164 @@ package storageclusterpeer

import (
"context"
"fmt"
"github.com/red-hat-storage/ocs-operator/v4/controllers/util"
"time"

ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1"
providerClient "github.com/red-hat-storage/ocs-operator/services/provider/api/v4/client"

"github.com/go-logr/logr"
"google.golang.org/grpc/status"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

v1 "github.com/red-hat-storage/ocs-operator/api/v4/v1"
const (
storageClusterPeerFinalizer = "storageclusterpeer.ocs.openshift.io"
)

// StorageClusterPeerReconciler reconciles a StorageClusterPeer object
// nolint:revive
type StorageClusterPeerReconciler struct {
client.Client
Scheme *runtime.Scheme

log logr.Logger
ctx context.Context
storageClusterPeer *ocsv1.StorageClusterPeer
storageCluster *ocsv1.StorageCluster
}

// SetupWithManager sets up the controller with the Manager.
func (r *StorageClusterPeerReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&ocsv1.StorageClusterPeer{}).
Complete(r)
}

//+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusterpeers,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusterpeers/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusterpeers/finalizers,verbs=update
//+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusters,verbs=get;list;watch

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the StorageClusterPeer object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *StorageClusterPeerReconciler) Reconcile(ctx context.Context, _ ctrl.Request) (ctrl.Result, error) {
_ = log.FromContext(ctx)

// TODO(user): your logic here
func (r *StorageClusterPeerReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) {
var err error
r.ctx = ctx
r.log = log.FromContext(ctx, "StorageClient", request)
r.log.Info("Reconciling StorageClusterPeer.")

// Fetch StorageCluster(s)
r.storageCluster, err = util.GetStorageClusterInNamespace(r.ctx, r.Client, request.Namespace)
if err != nil && k8serrors.IsNotFound(err) {
r.log.Error(err, "StorageCluster for StorageClusterPeer found in the same namespace.")
return ctrl.Result{}, err
}

// Fetch the StorageClusterPeer instance
r.storageClusterPeer = &ocsv1.StorageClusterPeer{}
r.storageClusterPeer.Name = request.Name
r.storageClusterPeer.Namespace = request.Namespace

if err = r.get(r.storageClusterPeer); err != nil {
if k8serrors.IsNotFound(err) {
r.log.Info("StorageClusterPeer resource not found. Ignoring since object must be deleted.")
return reconcile.Result{}, nil
}
r.log.Error(err, "Failed to get StorageClusterPeer.")
return reconcile.Result{}, err
}

result, reconcileErr := r.reconcilePhases()

// Apply status changes to the StorageClient
statusErr := r.Client.Status().Update(ctx, r.storageClusterPeer)
if statusErr != nil {
r.log.Error(statusErr, "Failed to update StorageClusterPeer status.")
}

if reconcileErr != nil {
err = reconcileErr
} else if statusErr != nil {
err = statusErr
}

return result, err
}

func (r *StorageClusterPeerReconciler) reconcilePhases() (ctrl.Result, error) {
ocsClient, err := r.newExternalClusterClient()
if err != nil {
return reconcile.Result{}, err
}
defer ocsClient.Close()

// marked for deletion
if !r.storageClusterPeer.GetDeletionTimestamp().IsZero() {
//TODO: Removing PeerOCS Call

if controllerutil.RemoveFinalizer(r.storageClusterPeer, storageClusterPeerFinalizer) {
r.log.Info("removing finalizer from StorageClusterPeer.", "StorageClusterPeer", r.storageClusterPeer.Name)
if err := r.update(r.storageClusterPeer); err != nil {
r.log.Info("Failed to remove finalizer from StorageClusterPeer", "StorageClusterPeer", r.storageClusterPeer.Name)
return reconcile.Result{}, fmt.Errorf("failed to remove finalizer from StorageClient: %v", err)
}
}
}

if controllerutil.AddFinalizer(r.storageClusterPeer, storageClusterPeerFinalizer) {
r.log.Info("Finalizer not found for StorageClusterPeer. Adding finalizer.", "StorageClusterPeer", r.storageClusterPeer.Name)
if err := r.update(r.storageClusterPeer); err != nil {
return reconcile.Result{}, fmt.Errorf("failed to update StorageClusterPeer: %v", err)
}
}

if r.storageClusterPeer.Status.State != ocsv1.StorageClusterPeerRemoteStatePeered {
return r.peerOCS(ocsClient)
}

return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *StorageClusterPeerReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&v1.StorageClusterPeer{}).
Complete(r)
func (r *StorageClusterPeerReconciler) newExternalClusterClient() (*providerClient.OCSProviderClient, error) {

ocsProviderClient, err := providerClient.NewProviderClient(
r.ctx, r.storageClusterPeer.Spec.ApiEndpoint, time.Second*10)
if err != nil {
return nil, fmt.Errorf("failed to create a new provider client: %v", err)
}

return ocsProviderClient, nil
}

func (r *StorageClusterPeerReconciler) peerOCS(ocsClient *providerClient.OCSProviderClient) (ctrl.Result, error) {
r.storageClusterPeer.Status.State = ocsv1.StorageClusterPeerRemoteStatePeering
response, err := ocsClient.PeerStorageCluster(r.ctx, r.storageClusterPeer.Spec.OnboardingToken, string(r.storageCluster.UID))
if err != nil {
r.log.Error(err, fmt.Sprintf("failed to Peer Storage Cluster, code: %v.", status.Code(err)))
return ctrl.Result{}, err
}
if r.storageClusterPeer.Status.RemoteStorageClusterUID != response.StorageClusterUID {
err := fmt.Errorf("failed to validate remote Storage Cluster UID against PeerOCS Response")
r.log.Error(err, "failed to Peer Storage Cluster")
return ctrl.Result{}, err
}
r.storageClusterPeer.Status.State = ocsv1.StorageClusterPeerRemoteStatePeered
return ctrl.Result{}, nil
}

func (r *StorageClusterPeerReconciler) get(obj client.Object) error {
key := client.ObjectKeyFromObject(obj)
return r.Client.Get(r.ctx, key, obj)
}

func (r *StorageClusterPeerReconciler) update(obj client.Object, opts ...client.UpdateOption) error {
return r.Client.Update(r.ctx, obj, opts...)
}
26 changes: 26 additions & 0 deletions controllers/util/k8sutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,29 @@ func GenerateNameForNonResilientCephBlockPoolSC(initData *ocsv1.StorageCluster)
}
return fmt.Sprintf("%s-ceph-non-resilient-rbd", initData.Name)
}

func GetStorageClusterInNamespace(ctx context.Context, cl client.Client, namespace string) (*ocsv1.StorageCluster, error) {
storageClusterList := &ocsv1.StorageClusterList{}
err := cl.List(ctx, storageClusterList, client.InNamespace(namespace))
if err != nil {
return nil, fmt.Errorf("unable to list storageCluster(s) in namespace %s: %v", namespace, err)
}

var foundSc *ocsv1.StorageCluster
for i := range storageClusterList.Items {
sc := &storageClusterList.Items[i]
if sc.Status.Phase == PhaseIgnored {
continue // Skip Ignored storage cluster
}
if foundSc != nil {
// This means we have already found one storage cluster, so this is a second one
return nil, fmt.Errorf("multiple storageClusters found in namespace %s, expected: 1 actual: %v", namespace, len(storageClusterList.Items))
}
foundSc = sc
}

if foundSc == nil {
return nil, fmt.Errorf("no storageCluster found in namespace %s, expected: 1", namespace)
}
return foundSc, nil
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit cfbc4bf

Please sign in to comment.