Skip to content

Commit

Permalink
storageclusterpeer: implement logic for storageclusterpeer
Browse files Browse the repository at this point in the history
Signed-off-by: Rewant Soni <[email protected]>
  • Loading branch information
rewantsoni committed Nov 7, 2024
1 parent dc7d651 commit 99e35e9
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 26 deletions.
6 changes: 4 additions & 2 deletions api/v1/storageclusterpeer_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ import (
type StorageClusterPeerState string

const (
StorageClusterPeerStatePending StorageClusterPeerState = "Pending"
StorageClusterPeerStatePeered StorageClusterPeerState = "Peered"
StorageClusterPeerStateInitializing StorageClusterPeerState = "Initializing"
StorageClusterPeerStatePending StorageClusterPeerState = "Pending"
StorageClusterPeerStatePeered StorageClusterPeerState = "Peered"
StorageClusterPeerStateFailed StorageClusterPeerState = "Failed"
)

type PeerInfo struct {
Expand Down
12 changes: 12 additions & 0 deletions controllers/storagecluster/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
Expand Down Expand Up @@ -200,6 +201,17 @@ func (r *StorageClusterReconciler) Reconcile(ctx context.Context, request reconc
return reconcile.Result{}, err
}

storageClusterPeerList := ocsv1.StorageClusterPeerList{}
if err := r.List(ctx, &storageClusterPeerList, client.InNamespace(sc.Namespace)); err != nil {
return reconcile.Result{}, err
}
for i := range storageClusterPeerList.Items {
storageClusterPeer := &storageClusterPeerList.Items[i]
if err = controllerutil.SetOwnerReference(sc, storageClusterPeer, r.Client.Scheme()); err != nil {
return reconcile.Result{}, fmt.Errorf("failed to set owner reference on storageClusterPeer %s: %w", storageClusterPeer.Name, err)
}
}

// Apply status changes to the storagecluster
statusError := r.Client.Status().Update(ctx, sc)
if statusError != nil {
Expand Down
169 changes: 149 additions & 20 deletions controllers/storageclusterpeer/storageclusterpeer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,46 +18,175 @@ package storageclusterpeer

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"google.golang.org/grpc/codes"
"k8s.io/apimachinery/pkg/types"
"strings"
"time"

ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1"
providerClient "github.com/red-hat-storage/ocs-operator/services/provider/api/v4/client"
"github.com/red-hat-storage/ocs-operator/v4/services"

"github.com/go-logr/logr"
"google.golang.org/grpc/status"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"

v1 "github.com/red-hat-storage/ocs-operator/api/v4/v1"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// StorageClusterPeerReconciler reconciles a StorageClusterPeer object
// nolint:revive
type StorageClusterPeerReconciler struct {
client.Client
Scheme *runtime.Scheme

log logr.Logger
ctx context.Context
}

// SetupWithManager sets up the controller with the Manager.
func (r *StorageClusterPeerReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&ocsv1.StorageClusterPeer{}).
Watches(&ocsv1.StorageCluster{}, &handler.EnqueueRequestForObject{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Complete(r)
}

//+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusterpeers,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusterpeers/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusterpeers/finalizers,verbs=update
//+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusters,verbs=get;list;watch

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the StorageClusterPeer object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *StorageClusterPeerReconciler) Reconcile(ctx context.Context, _ ctrl.Request) (ctrl.Result, error) {
_ = log.FromContext(ctx)

// TODO(user): your logic here

return ctrl.Result{}, nil
func (r *StorageClusterPeerReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) {
var err error
r.ctx = ctx
r.log = log.FromContext(ctx, "StorageClient", request)
r.log.Info("Reconciling StorageClusterPeer.")

// Fetch the StorageClusterPeer instance
storageClusterPeer := &ocsv1.StorageClusterPeer{}
storageClusterPeer.Name = request.Name
storageClusterPeer.Namespace = request.Namespace

if err = r.get(storageClusterPeer); err != nil {
if k8serrors.IsNotFound(err) {
r.log.Info("StorageClusterPeer resource not found. Ignoring since object must have been deleted.")
return ctrl.Result{}, nil
}
r.log.Error(err, "Failed to get StorageClusterPeer.")
return ctrl.Result{}, err
}

storageCluster := &ocsv1.StorageCluster{}
storageCluster.Namespace = request.Namespace

// Fetch StorageCluster
for i := range storageClusterPeer.OwnerReferences {
if storageClusterPeer.OwnerReferences[i].Kind == "StorageCluster" {
storageCluster.Name = storageClusterPeer.OwnerReferences[i].Name
break
}
}

if err = r.get(storageCluster); err != nil {
r.log.Error(err, "failed to fetch StorageCluster for StorageClusterPeer found in the same namespace.")
if k8serrors.IsNotFound(err) {
storageClusterPeer.Status.State = ocsv1.ReconcileFailed
_ = r.updateStatus(storageClusterPeer)
return ctrl.Result{Requeue: false}, nil
}
storageClusterPeer.Status.State = ocsv1.ReconcileFailed
return ctrl.Result{}, err
}

if storageClusterPeer.Status.State == ocsv1.StorageClusterPeerStatePeered {
return ctrl.Result{}, nil
}
storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStateInitializing

peerStorageClusterUID, err := readStorageClusterUIDFromTicket(storageClusterPeer.Spec.OnboardingToken)
if err != nil {
storageClusterPeer.Status.State = ocsv1.ReconcileFailed
r.log.Error(err, "Failed to read peer StorageCluster Info.")
_ = r.updateStatus(storageClusterPeer)
return ctrl.Result{Requeue: false}, nil
}
if storageClusterPeer.Status.PeerInfo == nil {
storageClusterPeer.Status.PeerInfo = &ocsv1.PeerInfo{}
}
storageClusterPeer.Status.PeerInfo.StorageClusterUid = string(peerStorageClusterUID)

ocsClient, err := providerClient.NewProviderClient(r.ctx, storageClusterPeer.Spec.ApiEndpoint, time.Second*10)
if err != nil {
storageClusterPeer.Status.State = ocsv1.ReconcileFailed
return ctrl.Result{}, fmt.Errorf("failed to create a new provider client: %v", err)
}
defer ocsClient.Close()

storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStatePending

_, err = ocsClient.PeerStorageCluster(r.ctx, storageClusterPeer.Spec.OnboardingToken, string(storageCluster.UID))
if err != nil {
r.log.Error(err, fmt.Sprintf("failed to Peer Storage Cluster, code: %v.", err))
st, _ := status.FromError(err)
if st.Code() == codes.InvalidArgument {
storageClusterPeer.Status.State = ocsv1.ReconcileFailed
_ = r.updateStatus(storageClusterPeer)
return ctrl.Result{Requeue: false}, nil
}
storageClusterPeer.Status.State = ocsv1.ReconcileFailed
} else {
storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStatePeered
}

// Apply status changes
statusErr := r.updateStatus(storageClusterPeer)
if statusErr != nil {
r.log.Error(statusErr, "Failed to update StorageClusterPeer status.")
// Reconcile error take precedence over status error
if err == nil {
err = statusErr
}
}

return ctrl.Result{}, err
}

// SetupWithManager sets up the controller with the Manager.
func (r *StorageClusterPeerReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&v1.StorageClusterPeer{}).
Complete(r)
func (r *StorageClusterPeerReconciler) updateStatus(obj client.Object) error {
return r.Client.Status().Update(r.ctx, obj)
}

func (r *StorageClusterPeerReconciler) get(obj client.Object) error {
key := client.ObjectKeyFromObject(obj)
return r.Client.Get(r.ctx, key, obj)
}

func readStorageClusterUIDFromTicket(ticket string) (types.UID, error) {
ticketArr := strings.Split(string(ticket), ".")
if len(ticketArr) != 2 {
return "", fmt.Errorf("invalid ticket")
}

message, err := base64.StdEncoding.DecodeString(ticketArr[0])
if err != nil {
return "", fmt.Errorf("failed to decode onboarding ticket: %v", err)
}

var ticketData services.OnboardingTicket
err = json.Unmarshal(message, &ticketData)
if err != nil {
return "", fmt.Errorf("failed to unmarshal onboarding ticket message. %v", err)
}

return ticketData.StorageCluster, nil
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 99e35e9

Please sign in to comment.