storageclusterpeer: implement logic for storageclusterpeer
Signed-off-by: Rewant Soni <[email protected]>
rewantsoni committed Nov 11, 2024
1 parent c079ab8 commit 7430e29
Showing 7 changed files with 189 additions and 25 deletions.
6 changes: 4 additions & 2 deletions api/v1/storageclusterpeer_types.go
@@ -26,8 +26,10 @@ import (
type StorageClusterPeerState string

const (
StorageClusterPeerStateInitializing StorageClusterPeerState = "Initializing"
StorageClusterPeerStatePending StorageClusterPeerState = "Pending"
StorageClusterPeerStatePeered StorageClusterPeerState = "Peered"
StorageClusterPeerStateFailed StorageClusterPeerState = "Failed"
)

type PeerInfo struct {
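The diff truncates the PeerInfo struct and the status type here. For orientation, here is a minimal sketch of the assumed shape, inferred only from the fields the controller in this commit reads and writes (Status.State and Status.PeerInfo.StorageClusterUid); the actual definitions in storageclusterpeer_types.go may differ:

// Assumed shape for illustration only; inferred from the controller code
// in this commit, not copied from the API definitions.
type PeerInfo struct {
	// StorageClusterUid is the UID of the remote StorageCluster,
	// read out of the onboarding ticket payload.
	StorageClusterUid string `json:"storageClusterUid,omitempty"`
}

type StorageClusterPeerStatus struct {
	State    StorageClusterPeerState `json:"state,omitempty"`
	PeerInfo *PeerInfo               `json:"peerInfo,omitempty"`
}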
18 changes: 18 additions & 0 deletions controllers/storagecluster/reconcile.go
@@ -4,6 +4,7 @@ import (
"context"
error1 "errors"
"fmt"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"strings"
"time"

@@ -403,6 +404,23 @@ func (r *StorageClusterReconciler) reconcilePhases(
return reconcile.Result{}, nil
}

storageClusterPeerList := &ocsv1.StorageClusterPeerList{}
err := r.Client.List(r.ctx, storageClusterPeerList, client.InNamespace(instance.Namespace))
if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to list storageClusterPeer: %w", err)
}
for i := range storageClusterPeerList.Items {
scp := &storageClusterPeerList.Items[i]
err := controllerutil.SetControllerReference(instance, scp, r.Scheme)
if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to set controller reference on StorageClusterPeer %v: %w", scp.Name, err)
}
err = r.Client.Update(r.ctx, scp)
if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to update StorageClusterPeer %v: %w", scp.Name, err)
}
}

// in-memory conditions should start off empty. It will only ever hold
// negative conditions (!Available, Degraded, Progressing)
r.conditions = nil
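The adoption loop above relies on controllerutil.SetControllerReference, which writes a controller owner reference and refuses to overwrite one held by a different controller, so re-running the loop is safe. A standalone sketch of that behavior, using core types as stand-ins for StorageCluster and StorageClusterPeer (the names here are illustrative, not from this repository):

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/scheme"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

func main() {
	// Cluster-scoped stand-in for the owning StorageCluster.
	owner := &corev1.Namespace{}
	owner.Name = "ocs-storagecluster"
	owner.UID = "1b125ee3-0000-0000-0000-000000000000"

	// Namespaced stand-in for the adopted StorageClusterPeer.
	owned := &corev1.ConfigMap{}
	owned.Name = "peer"
	owned.Namespace = "openshift-storage"

	// Sets a controller owner reference; returns an AlreadyOwnedError if a
	// different controller already owns the object.
	if err := controllerutil.SetControllerReference(owner, owned, scheme.Scheme); err != nil {
		panic(err)
	}
	fmt.Println(metav1.IsControlledBy(owned, owner)) // true
}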
3 changes: 2 additions & 1 deletion controllers/storagecluster/storagecluster_controller.go
@@ -245,7 +245,8 @@ func (r *StorageClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
Watches(&storagev1.StorageClass{}, enqueueStorageClusterRequest).
Watches(&volumesnapshotv1.VolumeSnapshotClass{}, enqueueStorageClusterRequest).
Watches(&ocsv1.StorageProfile{}, enqueueStorageClusterRequest).
Watches(&ocsv1alpha1.StorageConsumer{}, enqueueStorageClusterRequest, builder.WithPredicates(storageConsumerStatusPredicate)).
Watches(&ocsv1.StorageClusterPeer{}, enqueueStorageClusterRequest, builder.WithPredicates(predicate.GenerationChangedPredicate{}))

if os.Getenv("SKIP_NOOBAA_CRD_WATCH") != "true" {
build.Owns(&nbv1.NooBaa{}, builder.WithPredicates(noobaaIgnoreTimeUpdatePredicate))
165 changes: 147 additions & 18 deletions controllers/storageclusterpeer/storageclusterpeer_controller.go
@@ -18,46 +18,175 @@ package storageclusterpeer

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"strings"
"time"

ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1"
providerClient "github.com/red-hat-storage/ocs-operator/services/provider/api/v4/client"
"github.com/red-hat-storage/ocs-operator/v4/controllers/util"
"github.com/red-hat-storage/ocs-operator/v4/services"

"github.com/go-logr/logr"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// StorageClusterPeerReconciler reconciles a StorageClusterPeer object
// nolint:revive
type StorageClusterPeerReconciler struct {
client.Client
Scheme *runtime.Scheme

log logr.Logger
ctx context.Context
}

// SetupWithManager sets up the controller with the Manager.
func (r *StorageClusterPeerReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&ocsv1.StorageClusterPeer{}).
Watches(&ocsv1.StorageCluster{}, &handler.EnqueueRequestForObject{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Complete(r)
}

//+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusterpeers,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusterpeers/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusterpeers/finalizers,verbs=update
//+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusters,verbs=get;list;watch

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *StorageClusterPeerReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) {
var err error
r.ctx = ctx
r.log = log.FromContext(ctx, "StorageClusterPeer", request)
r.log.Info("Reconciling StorageClusterPeer.")

// Fetch the StorageClusterPeer instance
storageClusterPeer := &ocsv1.StorageClusterPeer{}
storageClusterPeer.Name = request.Name
storageClusterPeer.Namespace = request.Namespace

if err = r.get(storageClusterPeer); err != nil {
if k8serrors.IsNotFound(err) {
r.log.Info("StorageClusterPeer resource not found. Ignoring since object must have been deleted.")
return ctrl.Result{}, nil
}
r.log.Error(err, "Failed to get StorageClusterPeer.")
return ctrl.Result{}, err
}

if storageClusterPeer.Status.State == ocsv1.StorageClusterPeerStatePeered {
return ctrl.Result{}, nil
}

result, reconcileError := r.reconcileStates(storageClusterPeer)

// Apply status changes
statusError := r.Client.Status().Update(r.ctx, storageClusterPeer)
if statusError != nil {
r.log.Info("Could not update StorageClusterPeer status.")
}

// Reconcile errors have higher priority than status update errors
if reconcileError != nil {
return result, reconcileError
} else if statusError != nil {
return result, statusError
}

return result, nil
}

func (r *StorageClusterPeerReconciler) reconcileStates(storageClusterPeer *ocsv1.StorageClusterPeer) (ctrl.Result, error) {
var err error
storageCluster := &ocsv1.StorageCluster{}
storageCluster.Namespace = storageClusterPeer.Namespace

// Fetch StorageCluster
storageCluster.Name, err = util.FindOwnerRefByKind(storageClusterPeer, "StorageCluster")
if err != nil {
return ctrl.Result{}, err
}

if err := r.get(storageCluster); client.IgnoreNotFound(err) != nil {
r.log.Error(err, "Failed to fetch the StorageCluster for the StorageClusterPeer in the same namespace.")
storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStateFailed
return ctrl.Result{}, err
} else if k8serrors.IsNotFound(err) {
r.log.Error(err, "No StorageCluster found for the StorageClusterPeer in the same namespace.")
storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStateFailed
return ctrl.Result{}, nil
}

storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStateInitializing

// Read StorageClusterUID from ticket
ticketArr := strings.Split(string(storageClusterPeer.Spec.OnboardingToken), ".")
if len(ticketArr) != 2 {
storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStateFailed
r.log.Error(fmt.Errorf("invalid ticket"), "Invalid onboarding ticket: does not conform to the expected ticket structure.")
return ctrl.Result{}, nil
}
message, err := base64.StdEncoding.DecodeString(ticketArr[0])
if err != nil {
storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStateFailed
r.log.Error(err, "Failed to decode the onboarding ticket.")
return ctrl.Result{}, nil
}
var ticketData services.OnboardingTicket
err = json.Unmarshal(message, &ticketData)
if err != nil {
storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStateFailed
r.log.Error(err, "Onboarding ticket message is not valid JSON.")
return ctrl.Result{}, nil
}

if storageClusterPeer.Status.PeerInfo == nil {
storageClusterPeer.Status.PeerInfo = &ocsv1.PeerInfo{StorageClusterUid: string(ticketData.StorageCluster)}
}

ocsClient, err := providerClient.NewProviderClient(r.ctx, storageClusterPeer.Spec.ApiEndpoint, time.Second*10)
if err != nil {
storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStateFailed
return ctrl.Result{}, fmt.Errorf("failed to create a new provider client: %w", err)
}
defer ocsClient.Close()

storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStatePending

_, err = ocsClient.PeerStorageCluster(r.ctx, storageClusterPeer.Spec.OnboardingToken, string(storageCluster.UID))
if err != nil {
r.log.Error(err, "Failed to peer the StorageCluster.")
st, ok := status.FromError(err)
if !ok {
storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStateFailed
r.log.Info("Invalid status code returned from the rpc call.")
return ctrl.Result{}, fmt.Errorf("invalid status code returned from the rpc call")
}
if st.Code() == codes.InvalidArgument {
storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStateFailed
return ctrl.Result{}, nil
}
storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStateFailed
} else {
storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStatePeered
}
return ctrl.Result{}, nil
}

func (r *StorageClusterPeerReconciler) get(obj client.Object) error {
key := client.ObjectKeyFromObject(obj)
return r.Client.Get(r.ctx, key, obj)
}
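For context, the onboarding token parsed in reconcileStates is expected to be two dot-separated base64 segments: a JSON payload and a signature. Below is a self-contained sketch of the same decoding steps; the local onboardingTicket struct and its JSON tag are assumptions standing in for services.OnboardingTicket, and the signature segment is not verified here (the provider server does that):

package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
	"strings"
)

// onboardingTicket mirrors only the field the controller reads; the real
// services.OnboardingTicket type carries more fields.
type onboardingTicket struct {
	StorageCluster string `json:"storageCluster"`
}

// decodeTicket mimics the parsing in reconcileStates: split the token on
// ".", base64-decode the first segment, and unmarshal it as JSON.
func decodeTicket(token string) (*onboardingTicket, error) {
	parts := strings.Split(token, ".")
	if len(parts) != 2 {
		return nil, fmt.Errorf("invalid ticket: expected 2 segments, got %d", len(parts))
	}
	payload, err := base64.StdEncoding.DecodeString(parts[0])
	if err != nil {
		return nil, fmt.Errorf("failed to decode ticket payload: %w", err)
	}
	ticket := &onboardingTicket{}
	if err := json.Unmarshal(payload, ticket); err != nil {
		return nil, fmt.Errorf("ticket payload is not valid JSON: %w", err)
	}
	return ticket, nil
}

func main() {
	payload := base64.StdEncoding.EncodeToString([]byte(`{"storageCluster":"1b125ee3"}`))
	token := payload + "." + base64.StdEncoding.EncodeToString([]byte("signature"))
	ticket, err := decodeTicket(token)
	if err != nil {
		panic(err)
	}
	fmt.Println(ticket.StorageCluster) // 1b125ee3
}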
10 changes: 10 additions & 0 deletions controllers/util/k8sutil.go
@@ -202,3 +202,13 @@ func NewK8sClient(scheme *runtime.Scheme) (client.Client, error) {

return k8sClient, nil
}

func FindOwnerRefByKind(obj client.Object, kind string) (string, error) {
owners := obj.GetOwnerReferences()
for i := range owners {
if owners[i].Kind == kind {
return owners[i].Name, nil
}
}
return "", fmt.Errorf("unable to find owner reference by kind %s", kind)
}
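A quick usage sketch for the new helper, pairing it with the owner reference set in reconcilePhases; core types stand in for the OCS kinds, and findOwnerRefByKind is a local copy so the snippet runs on its own:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// Local copy of util.FindOwnerRefByKind for a self-contained example.
func findOwnerRefByKind(obj client.Object, kind string) (string, error) {
	for _, ref := range obj.GetOwnerReferences() {
		if ref.Kind == kind {
			return ref.Name, nil
		}
	}
	return "", fmt.Errorf("unable to find owner reference by kind %s", kind)
}

func main() {
	// Stand-in for a StorageClusterPeer that reconcilePhases has adopted.
	peer := &corev1.ConfigMap{}
	peer.SetOwnerReferences([]metav1.OwnerReference{{
		APIVersion: "ocs.openshift.io/v1",
		Kind:       "StorageCluster",
		Name:       "ocs-storagecluster",
	}})

	name, err := findOwnerRefByKind(peer, "StorageCluster")
	fmt.Println(name, err) // ocs-storagecluster <nil>
}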

Some generated files are not rendered by default.