Skip to content

Commit

Permalink
storageclusterpeer: implement logic for storageclusterpeer
Browse files Browse the repository at this point in the history
Signed-off-by: Rewant Soni <[email protected]>
  • Loading branch information
rewantsoni committed Nov 11, 2024
1 parent 737f5bf commit d1b15b2
Show file tree
Hide file tree
Showing 8 changed files with 212 additions and 25 deletions.
6 changes: 4 additions & 2 deletions api/v1/storageclusterpeer_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ import (
type StorageClusterPeerState string

const (
StorageClusterPeerStatePending StorageClusterPeerState = "Pending"
StorageClusterPeerStatePeered StorageClusterPeerState = "Peered"
StorageClusterPeerStateInitializing StorageClusterPeerState = "Initializing"
StorageClusterPeerStatePending StorageClusterPeerState = "Pending"
StorageClusterPeerStatePeered StorageClusterPeerState = "Peered"
StorageClusterPeerStateFailed StorageClusterPeerState = "Failed"
)

type PeerInfo struct {
Expand Down
28 changes: 28 additions & 0 deletions controllers/storagecluster/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
error1 "errors"
"fmt"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"strings"
"time"

Expand Down Expand Up @@ -403,6 +404,10 @@ func (r *StorageClusterReconciler) reconcilePhases(
return reconcile.Result{}, nil
}

if res, err := r.ownStorageClusterPeersInNamespace(instance); err != nil || !res.IsZero() {
return reconcile.Result{}, err
}

// in-memory conditions should start off empty. It will only ever hold
// negative conditions (!Available, Degraded, Progressing)
r.conditions = nil
Expand Down Expand Up @@ -788,6 +793,29 @@ func (r *StorageClusterReconciler) isStorageClusterNotIgnored(
return true
}

func (r *StorageClusterReconciler) ownStorageClusterPeersInNamespace(instance *ocsv1.StorageCluster) (reconcile.Result, error) {
storageClusterPeerList := &ocsv1.StorageClusterPeerList{}
err := r.Client.List(r.ctx, storageClusterPeerList, client.InNamespace(instance.Namespace))
if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to list storageClusterPeer: %w", err)
}
for i := range storageClusterPeerList.Items {
scp := &storageClusterPeerList.Items[i]
lenOwners := len(scp.OwnerReferences)
err := controllerutil.SetOwnerReference(instance, scp, r.Scheme)
if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to set owner reference on StorageClusterPeer %v: %w", scp.Name, err)
}
if lenOwners != len(scp.OwnerReferences) {
err = r.Client.Update(r.ctx, scp)
if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to persist StorageCluster owner ref on StorageClusterPeer %v: %w", scp.Name, err)
}
}
}
return reconcile.Result{}, nil
}

// Checks whether a string is contained within a slice
func contains(slice []string, s string) bool {
for _, item := range slice {
Expand Down
3 changes: 2 additions & 1 deletion controllers/storagecluster/storagecluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,8 @@ func (r *StorageClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
Watches(&storagev1.StorageClass{}, enqueueStorageClusterRequest).
Watches(&volumesnapshotv1.VolumeSnapshotClass{}, enqueueStorageClusterRequest).
Watches(&ocsv1.StorageProfile{}, enqueueStorageClusterRequest).
Watches(&ocsv1alpha1.StorageConsumer{}, enqueueStorageClusterRequest, builder.WithPredicates(storageConsumerStatusPredicate))
Watches(&ocsv1alpha1.StorageConsumer{}, enqueueStorageClusterRequest, builder.WithPredicates(storageConsumerStatusPredicate)).
Watches(&ocsv1.StorageClusterPeer{}, enqueueStorageClusterRequest, builder.WithPredicates(predicate.GenerationChangedPredicate{}))

if os.Getenv("SKIP_NOOBAA_CRD_WATCH") != "true" {
build.Owns(&nbv1.NooBaa{}, builder.WithPredicates(noobaaIgnoreTimeUpdatePredicate))
Expand Down
166 changes: 148 additions & 18 deletions controllers/storageclusterpeer/storageclusterpeer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,46 +18,176 @@ package storageclusterpeer

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"google.golang.org/grpc/codes"
"strings"
"time"

ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1"
providerClient "github.com/red-hat-storage/ocs-operator/services/provider/api/v4/client"
"github.com/red-hat-storage/ocs-operator/v4/controllers/util"
"github.com/red-hat-storage/ocs-operator/v4/services"

"github.com/go-logr/logr"
"google.golang.org/grpc/status"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"

v1 "github.com/red-hat-storage/ocs-operator/api/v4/v1"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// StorageClusterPeerReconciler reconciles a StorageClusterPeer object
// nolint:revive
type StorageClusterPeerReconciler struct {
client.Client
Scheme *runtime.Scheme

log logr.Logger
ctx context.Context
}

// SetupWithManager sets up the controller with the Manager.
func (r *StorageClusterPeerReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&ocsv1.StorageClusterPeer{}).
Watches(&ocsv1.StorageCluster{}, &handler.EnqueueRequestForObject{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Complete(r)
}

//+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusterpeers,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusterpeers/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusterpeers/finalizers,verbs=update
//+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusters,verbs=get;list;watch

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the StorageClusterPeer object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *StorageClusterPeerReconciler) Reconcile(ctx context.Context, _ ctrl.Request) (ctrl.Result, error) {
_ = log.FromContext(ctx)

// TODO(user): your logic here
func (r *StorageClusterPeerReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) {
var err error
r.ctx = ctx
r.log = log.FromContext(ctx, "StorageClient", request)
r.log.Info("Reconciling StorageClusterPeer.")

// Fetch the StorageClusterPeer instance
storageClusterPeer := &ocsv1.StorageClusterPeer{}
storageClusterPeer.Name = request.Name
storageClusterPeer.Namespace = request.Namespace

if err = r.get(storageClusterPeer); err != nil {
if k8serrors.IsNotFound(err) {
r.log.Info("StorageClusterPeer resource not found. Ignoring since object must have been deleted.")
return ctrl.Result{}, nil
}
r.log.Error(err, "Failed to get StorageClusterPeer.")
return ctrl.Result{}, err
}

if storageClusterPeer.Status.State == ocsv1.StorageClusterPeerStatePeered {
return ctrl.Result{}, nil
}

result, reconcileError := r.reconcileStates(storageClusterPeer)

// Apply status changes
statusError := r.Client.Status().Update(r.ctx, storageClusterPeer)
if statusError != nil {
r.log.Info("Could not update StorageClusterPeer status.")
}

// Reconcile errors have higher priority than status update errors
if reconcileError != nil {
return result, reconcileError
} else if statusError != nil {
return result, statusError
}

return result, nil
}

func (r *StorageClusterPeerReconciler) reconcileStates(storageClusterPeer *ocsv1.StorageClusterPeer) (ctrl.Result, error) {
storageCluster := &ocsv1.StorageCluster{}
storageCluster.Namespace = storageClusterPeer.Namespace

// Fetch StorageCluster
owner, err := util.FindOwnerRefByKind(storageClusterPeer, "StorageCluster")
if err != nil {
return ctrl.Result{}, err
}

storageCluster.Name = owner.Name

if err := r.get(storageCluster); client.IgnoreNotFound(err) != nil {
r.log.Error(err, "Error fetching StorageCluster for StorageClusterPeer found in the same namespace.")
storageClusterPeer.Status.State = ocsv1.ReconcileFailed
return ctrl.Result{}, err
} else if k8serrors.IsNotFound(err) {
r.log.Error(err, "Cannot find a StorageCluster for StorageClusterPeer in the same namespace.")
storageClusterPeer.Status.State = ocsv1.ReconcileFailed
return ctrl.Result{}, nil
}

storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStateInitializing

// Read StorageClusterUID from ticket
ticketArr := strings.Split(string(storageClusterPeer.Spec.OnboardingToken), ".")
if len(ticketArr) != 2 {
storageClusterPeer.Status.State = ocsv1.ReconcileFailed
r.log.Error(fmt.Errorf("invalid ticket"), "Invalid onboarding ticket. Does not conform to expected ticket structure")
return ctrl.Result{}, nil
}
message, err := base64.StdEncoding.DecodeString(ticketArr[0])
if err != nil {
storageClusterPeer.Status.State = ocsv1.ReconcileFailed
r.log.Error(err, "failed to decode onboarding ticket")
return ctrl.Result{}, nil
}
var ticketData services.OnboardingTicket
err = json.Unmarshal(message, &ticketData)
if err != nil {
storageClusterPeer.Status.State = ocsv1.ReconcileFailed
r.log.Error(err, "onboarding ticket message is not a valid JSON.")
return ctrl.Result{}, nil
}

if storageClusterPeer.Status.PeerInfo == nil {
storageClusterPeer.Status.PeerInfo = &ocsv1.PeerInfo{StorageClusterUid: string(ticketData.StorageCluster)}
}

ocsClient, err := providerClient.NewProviderClient(r.ctx, storageClusterPeer.Spec.ApiEndpoint, time.Second*10)
if err != nil {
storageClusterPeer.Status.State = ocsv1.ReconcileFailed
return ctrl.Result{}, fmt.Errorf("failed to create a new provider client: %v", err)
}
defer ocsClient.Close()

storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStatePending

_, err = ocsClient.PeerStorageCluster(r.ctx, storageClusterPeer.Spec.OnboardingToken, string(storageCluster.UID))
if err != nil {
r.log.Error(err, fmt.Sprintf("failed to Peer Storage Cluster, reason: %v.", err))
st, ok := status.FromError(err)
if !ok {
storageClusterPeer.Status.State = ocsv1.ReconcileFailed
r.log.Error(fmt.Errorf("invalid code"), "failed to extract an HTTP status code from error")
return ctrl.Result{}, fmt.Errorf("failed to extract an HTTP status code from error")
}
if st.Code() == codes.InvalidArgument {
storageClusterPeer.Status.State = ocsv1.ReconcileFailed
return ctrl.Result{}, nil
}
storageClusterPeer.Status.State = ocsv1.ReconcileFailed
} else {
storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStatePeered
}
return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *StorageClusterPeerReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&v1.StorageClusterPeer{}).
Complete(r)
func (r *StorageClusterPeerReconciler) get(obj client.Object) error {
key := client.ObjectKeyFromObject(obj)
return r.Client.Get(r.ctx, key, obj)
}
11 changes: 11 additions & 0 deletions controllers/util/k8sutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package util
import (
"context"
"fmt"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"os"
"strings"

Expand Down Expand Up @@ -202,3 +203,13 @@ func NewK8sClient(scheme *runtime.Scheme) (client.Client, error) {

return k8sClient, nil
}

func FindOwnerRefByKind(obj client.Object, kind string) (*v1.OwnerReference, error) {
owners := obj.GetOwnerReferences()
for i := range owners {
if owners[i].Kind == kind {
return &owners[i], nil
}
}
return nil, fmt.Errorf("unable to find owner reference by kind %s", kind)
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit d1b15b2

Please sign in to comment.