Skip to content

Commit

Permalink
storageclusterpeer: implement logic for storageclusterpeer
Browse files Browse the repository at this point in the history
Signed-off-by: Rewant Soni <[email protected]>
  • Loading branch information
rewantsoni committed Nov 7, 2024
1 parent c079ab8 commit 44f8993
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 24 deletions.
6 changes: 4 additions & 2 deletions api/v1/storageclusterpeer_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ import (
type StorageClusterPeerState string

const (
StorageClusterPeerStatePending StorageClusterPeerState = "Pending"
StorageClusterPeerStatePeered StorageClusterPeerState = "Peered"
StorageClusterPeerStateInitializing StorageClusterPeerState = "Initializing"
StorageClusterPeerStatePending StorageClusterPeerState = "Pending"
StorageClusterPeerStatePeered StorageClusterPeerState = "Peered"
StorageClusterPeerStateFailed StorageClusterPeerState = "Failed"
)

type PeerInfo struct {
Expand Down
166 changes: 148 additions & 18 deletions controllers/storageclusterpeer/storageclusterpeer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,46 +18,176 @@ package storageclusterpeer

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"google.golang.org/grpc/codes"
"k8s.io/klog/v2"
"strings"
"time"

ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1"
providerClient "github.com/red-hat-storage/ocs-operator/services/provider/api/v4/client"
"github.com/red-hat-storage/ocs-operator/v4/services"

"github.com/go-logr/logr"
"google.golang.org/grpc/status"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"

v1 "github.com/red-hat-storage/ocs-operator/api/v4/v1"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// StorageClusterPeerReconciler reconciles a StorageClusterPeer object
// nolint:revive
type StorageClusterPeerReconciler struct {
client.Client
Scheme *runtime.Scheme

log logr.Logger
ctx context.Context
}

// SetupWithManager sets up the controller with the Manager.
func (r *StorageClusterPeerReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&ocsv1.StorageClusterPeer{}).
Watches(&ocsv1.StorageCluster{}, &handler.EnqueueRequestForObject{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Complete(r)
}

//+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusterpeers,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusterpeers/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusterpeers/finalizers,verbs=update
//+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusters,verbs=get;list;watch

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the StorageClusterPeer object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *StorageClusterPeerReconciler) Reconcile(ctx context.Context, _ ctrl.Request) (ctrl.Result, error) {
_ = log.FromContext(ctx)

// TODO(user): your logic here
func (r *StorageClusterPeerReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) {
var err error
r.ctx = ctx
r.log = log.FromContext(ctx, "StorageClient", request)
r.log.Info("Reconciling StorageClusterPeer.")

// Fetch the StorageClusterPeer instance
storageClusterPeer := &ocsv1.StorageClusterPeer{}
storageClusterPeer.Name = request.Name
storageClusterPeer.Namespace = request.Namespace

if err = r.get(storageClusterPeer); err != nil {
if k8serrors.IsNotFound(err) {
r.log.Info("StorageClusterPeer resource not found. Ignoring since object must have been deleted.")
return ctrl.Result{}, nil
}
r.log.Error(err, "Failed to get StorageClusterPeer.")
return ctrl.Result{}, err
}

if storageClusterPeer.Status.State == ocsv1.StorageClusterPeerStatePeered {
return ctrl.Result{}, nil
}

result, reconcileError := r.reconcileStates(storageClusterPeer)

// Apply status changes
statusError := r.Client.Status().Update(r.ctx, storageClusterPeer)
if statusError != nil {
r.log.Info("Could not update StorageClusterPeer status.", "StorageClusterPeer", klog.KRef(storageClusterPeer.Namespace, storageClusterPeer.Name))
}

// Reconcile errors have higher priority than status update errors
if reconcileError != nil {
return result, reconcileError
} else if statusError != nil {
return result, statusError
}

return result, nil
}

func (r *StorageClusterPeerReconciler) reconcileStates(storageClusterPeer *ocsv1.StorageClusterPeer) (ctrl.Result, error) {
storageCluster := &ocsv1.StorageCluster{}
storageCluster.Namespace = storageClusterPeer.Namespace

// Fetch StorageCluster
for i := range storageClusterPeer.OwnerReferences {
if storageClusterPeer.OwnerReferences[i].Kind == "StorageCluster" {
storageCluster.Name = storageClusterPeer.OwnerReferences[i].Name
break
}
}

if err := r.get(storageCluster); err != nil {
r.log.Error(err, "failed to fetch StorageCluster for StorageClusterPeer found in the same namespace.")
if k8serrors.IsNotFound(err) {
storageClusterPeer.Status.State = ocsv1.ReconcileFailed
return ctrl.Result{Requeue: false}, nil
}
storageClusterPeer.Status.State = ocsv1.ReconcileFailed
return ctrl.Result{}, err
}

storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStateInitializing

// Read StorageClusterUID from ticket
ticketArr := strings.Split(string(storageClusterPeer.Spec.OnboardingToken), ".")
if len(ticketArr) != 2 {
storageClusterPeer.Status.State = ocsv1.ReconcileFailed
r.log.Error(fmt.Errorf("invalid ticket"), "onboarding ticket has been tampered")
return ctrl.Result{Requeue: false}, nil
}
message, err := base64.StdEncoding.DecodeString(ticketArr[0])
if err != nil {
storageClusterPeer.Status.State = ocsv1.ReconcileFailed
r.log.Error(err, "failed to decode onboarding ticket")
return ctrl.Result{Requeue: false}, nil
}
var ticketData services.OnboardingTicket
err = json.Unmarshal(message, &ticketData)
if err != nil {
storageClusterPeer.Status.State = ocsv1.ReconcileFailed
r.log.Error(err, "onboarding ticket message is not a valid JSON.")
return ctrl.Result{Requeue: false}, nil
}

if storageClusterPeer.Status.PeerInfo == nil {
storageClusterPeer.Status.PeerInfo = &ocsv1.PeerInfo{StorageClusterUid: string(ticketData.StorageCluster)}
}

ocsClient, err := providerClient.NewProviderClient(r.ctx, storageClusterPeer.Spec.ApiEndpoint, time.Second*10)
if err != nil {
storageClusterPeer.Status.State = ocsv1.ReconcileFailed
return ctrl.Result{}, fmt.Errorf("failed to create a new provider client: %v", err)
}
defer ocsClient.Close()

storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStatePending

_, err = ocsClient.PeerStorageCluster(r.ctx, storageClusterPeer.Spec.OnboardingToken, string(storageCluster.UID))
if err != nil {
r.log.Error(err, fmt.Sprintf("failed to Peer Storage Cluster, reason: %v.", err))
st, ok := status.FromError(err)
if !ok {
storageClusterPeer.Status.State = ocsv1.ReconcileFailed
r.log.Info("invalid code return from the rpc call")
return ctrl.Result{Requeue: false}, nil
}
if st.Code() == codes.InvalidArgument {
storageClusterPeer.Status.State = ocsv1.ReconcileFailed
return ctrl.Result{Requeue: false}, nil
}
storageClusterPeer.Status.State = ocsv1.ReconcileFailed
} else {
storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStatePeered
}
return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *StorageClusterPeerReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&v1.StorageClusterPeer{}).
Complete(r)
func (r *StorageClusterPeerReconciler) get(obj client.Object) error {
key := client.ObjectKeyFromObject(obj)
return r.Client.Get(r.ctx, key, obj)
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 44f8993

Please sign in to comment.