Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storageClusterPeer: add logic for storageclusterpeer controller #2678

Merged
merged 1 commit into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions api/v1/storageclusterpeer_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ import (
type StorageClusterPeerState string

const (
StorageClusterPeerStatePending StorageClusterPeerState = "Pending"
StorageClusterPeerStatePeered StorageClusterPeerState = "Peered"
StorageClusterPeerStateInitializing StorageClusterPeerState = "Initializing"
StorageClusterPeerStatePending StorageClusterPeerState = "Pending"
StorageClusterPeerStatePeered StorageClusterPeerState = "Peered"
StorageClusterPeerStateFailed StorageClusterPeerState = "Failed"
)

type PeerInfo struct {
Expand Down
28 changes: 28 additions & 0 deletions controllers/storagecluster/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
error1 "errors"
"fmt"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"strings"
"time"

Expand Down Expand Up @@ -403,6 +404,10 @@ func (r *StorageClusterReconciler) reconcilePhases(
return reconcile.Result{}, nil
}

if res, err := r.ownStorageClusterPeersInNamespace(instance); err != nil || !res.IsZero() {
return reconcile.Result{}, err
}

// in-memory conditions should start off empty. It will only ever hold
// negative conditions (!Available, Degraded, Progressing)
r.conditions = nil
Expand Down Expand Up @@ -788,6 +793,29 @@ func (r *StorageClusterReconciler) isStorageClusterNotIgnored(
return true
}

func (r *StorageClusterReconciler) ownStorageClusterPeersInNamespace(instance *ocsv1.StorageCluster) (reconcile.Result, error) {
storageClusterPeerList := &ocsv1.StorageClusterPeerList{}
err := r.Client.List(r.ctx, storageClusterPeerList, client.InNamespace(instance.Namespace))
if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to list storageClusterPeer: %w", err)
}
for i := range storageClusterPeerList.Items {
scp := &storageClusterPeerList.Items[i]
lenOwners := len(scp.OwnerReferences)
err := controllerutil.SetOwnerReference(instance, scp, r.Scheme)
if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to set owner reference on StorageClusterPeer %v: %w", scp.Name, err)
}
if lenOwners != len(scp.OwnerReferences) {
err = r.Client.Update(r.ctx, scp)
if err != nil {
return reconcile.Result{}, fmt.Errorf("failed to persist StorageCluster owner ref on StorageClusterPeer %v: %w", scp.Name, err)
}
}
}
return reconcile.Result{}, nil
}

// Checks whether a string is contained within a slice
func contains(slice []string, s string) bool {
for _, item := range slice {
Expand Down
3 changes: 2 additions & 1 deletion controllers/storagecluster/storagecluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,8 @@ func (r *StorageClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
Watches(&storagev1.StorageClass{}, enqueueStorageClusterRequest).
Watches(&volumesnapshotv1.VolumeSnapshotClass{}, enqueueStorageClusterRequest).
Watches(&ocsv1.StorageProfile{}, enqueueStorageClusterRequest).
Watches(&ocsv1alpha1.StorageConsumer{}, enqueueStorageClusterRequest, builder.WithPredicates(storageConsumerStatusPredicate))
Watches(&ocsv1alpha1.StorageConsumer{}, enqueueStorageClusterRequest, builder.WithPredicates(storageConsumerStatusPredicate)).
Watches(&ocsv1.StorageClusterPeer{}, enqueueStorageClusterRequest, builder.WithPredicates(predicate.GenerationChangedPredicate{}))

if os.Getenv("SKIP_NOOBAA_CRD_WATCH") != "true" {
build.Owns(&nbv1.NooBaa{}, builder.WithPredicates(noobaaIgnoreTimeUpdatePredicate))
Expand Down
168 changes: 150 additions & 18 deletions controllers/storageclusterpeer/storageclusterpeer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,46 +18,178 @@ package storageclusterpeer

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"google.golang.org/grpc/codes"
"strings"
"time"

ocsv1 "github.com/red-hat-storage/ocs-operator/api/v4/v1"
providerClient "github.com/red-hat-storage/ocs-operator/services/provider/api/v4/client"
"github.com/red-hat-storage/ocs-operator/v4/controllers/util"
"github.com/red-hat-storage/ocs-operator/v4/services"

"github.com/go-logr/logr"
"google.golang.org/grpc/status"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"

v1 "github.com/red-hat-storage/ocs-operator/api/v4/v1"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// StorageClusterPeerReconciler reconciles a StorageClusterPeer object
// nolint:revive
nb-ohad marked this conversation as resolved.
Show resolved Hide resolved
type StorageClusterPeerReconciler struct {
client.Client
Scheme *runtime.Scheme

log logr.Logger
ctx context.Context
}

// SetupWithManager sets up the controller with the Manager.
func (r *StorageClusterPeerReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&ocsv1.StorageClusterPeer{}).
Watches(&ocsv1.StorageCluster{}, &handler.EnqueueRequestForObject{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
Complete(r)
}

//+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusterpeers,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusterpeers/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusterpeers/finalizers,verbs=update
//+kubebuilder:rbac:groups=ocs.openshift.io,resources=storageclusters,verbs=get;list;watch

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the StorageClusterPeer object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *StorageClusterPeerReconciler) Reconcile(ctx context.Context, _ ctrl.Request) (ctrl.Result, error) {
_ = log.FromContext(ctx)

// TODO(user): your logic here
func (r *StorageClusterPeerReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) {
var err error
r.ctx = ctx
r.log = log.FromContext(ctx, "StorageClient", request)
r.log.Info("Reconciling StorageClusterPeer.")

// Fetch the StorageClusterPeer instance
storageClusterPeer := &ocsv1.StorageClusterPeer{}
storageClusterPeer.Name = request.Name
storageClusterPeer.Namespace = request.Namespace

if err = r.get(storageClusterPeer); err != nil {
if k8serrors.IsNotFound(err) {
r.log.Info("StorageClusterPeer resource not found. Ignoring since object must have been deleted.")
return ctrl.Result{}, nil
}
r.log.Error(err, "Failed to get StorageClusterPeer.")
return ctrl.Result{}, err
}

if storageClusterPeer.Status.State == ocsv1.StorageClusterPeerStatePeered {
return ctrl.Result{}, nil
}

result, reconcileError := r.reconcileStates(storageClusterPeer)

// Apply status changes
statusError := r.Client.Status().Update(r.ctx, storageClusterPeer)
if statusError != nil {
r.log.Info("Could not update StorageClusterPeer status.")
}

// Reconcile errors have higher priority than status update errors
if reconcileError != nil {
return result, reconcileError
} else if statusError != nil {
return result, statusError
}

return result, nil
}

func (r *StorageClusterPeerReconciler) reconcileStates(storageClusterPeer *ocsv1.StorageClusterPeer) (ctrl.Result, error) {
storageCluster := &ocsv1.StorageCluster{}
storageCluster.Namespace = storageClusterPeer.Namespace

// Fetch StorageCluster
owner := util.FindOwnerRefByKind(storageClusterPeer, "StorageCluster")

if owner == nil {
storageClusterPeer.Status.State = ocsv1.ReconcileFailed
return ctrl.Result{}, fmt.Errorf("failed to find StorgeCluster owning the StorageClusterPeer")
}

storageCluster.Name = owner.Name
nb-ohad marked this conversation as resolved.
Show resolved Hide resolved

if err := r.get(storageCluster); client.IgnoreNotFound(err) != nil {
r.log.Error(err, "Error fetching StorageCluster for StorageClusterPeer found in the same namespace.")
storageClusterPeer.Status.State = ocsv1.ReconcileFailed
return ctrl.Result{}, err
nb-ohad marked this conversation as resolved.
Show resolved Hide resolved
} else if k8serrors.IsNotFound(err) {
r.log.Error(err, "Cannot find a StorageCluster for StorageClusterPeer in the same namespace.")
storageClusterPeer.Status.State = ocsv1.ReconcileFailed
return ctrl.Result{}, nil
}

storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStateInitializing

// Read StorageClusterUID from ticket
ticketArr := strings.Split(string(storageClusterPeer.Spec.OnboardingToken), ".")
if len(ticketArr) != 2 {
storageClusterPeer.Status.State = ocsv1.ReconcileFailed
r.log.Error(fmt.Errorf("invalid ticket"), "Invalid onboarding ticket. Does not conform to expected ticket structure")
return ctrl.Result{}, nil
}
message, err := base64.StdEncoding.DecodeString(ticketArr[0])
if err != nil {
storageClusterPeer.Status.State = ocsv1.ReconcileFailed
r.log.Error(err, "failed to decode onboarding ticket")
return ctrl.Result{}, nil
}
var ticketData services.OnboardingTicket
err = json.Unmarshal(message, &ticketData)
if err != nil {
storageClusterPeer.Status.State = ocsv1.ReconcileFailed
r.log.Error(err, "onboarding ticket message is not a valid JSON.")
return ctrl.Result{}, nil
}

if storageClusterPeer.Status.PeerInfo == nil {
storageClusterPeer.Status.PeerInfo = &ocsv1.PeerInfo{StorageClusterUid: string(ticketData.StorageCluster)}
}

ocsClient, err := providerClient.NewProviderClient(r.ctx, storageClusterPeer.Spec.ApiEndpoint, time.Second*10)
if err != nil {
storageClusterPeer.Status.State = ocsv1.ReconcileFailed
return ctrl.Result{}, fmt.Errorf("failed to create a new provider client: %v", err)
}
defer ocsClient.Close()

rewantsoni marked this conversation as resolved.
Show resolved Hide resolved
storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStatePending

_, err = ocsClient.PeerStorageCluster(r.ctx, storageClusterPeer.Spec.OnboardingToken, string(storageCluster.UID))
if err != nil {
r.log.Error(err, fmt.Sprintf("failed to Peer Storage Cluster, reason: %v.", err))
st, ok := status.FromError(err)
if !ok {
storageClusterPeer.Status.State = ocsv1.ReconcileFailed
r.log.Error(fmt.Errorf("invalid code"), "failed to extract an HTTP status code from error")
return ctrl.Result{}, fmt.Errorf("failed to extract an HTTP status code from error")
}
if st.Code() == codes.InvalidArgument {
storageClusterPeer.Status.State = ocsv1.ReconcileFailed
return ctrl.Result{}, nil
}
storageClusterPeer.Status.State = ocsv1.ReconcileFailed
} else {
storageClusterPeer.Status.State = ocsv1.StorageClusterPeerStatePeered
}
return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *StorageClusterPeerReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&v1.StorageClusterPeer{}).
Complete(r)
func (r *StorageClusterPeerReconciler) get(obj client.Object) error {
key := client.ObjectKeyFromObject(obj)
return r.Client.Get(r.ctx, key, obj)
}
11 changes: 11 additions & 0 deletions controllers/util/k8sutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package util
import (
"context"
"fmt"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"os"
"strings"

Expand Down Expand Up @@ -202,3 +203,13 @@ func NewK8sClient(scheme *runtime.Scheme) (client.Client, error) {

return k8sClient, nil
}

func FindOwnerRefByKind(obj client.Object, kind string) *v1.OwnerReference {
owners := obj.GetOwnerReferences()
for i := range owners {
if owners[i].Kind == kind {
return &owners[i]
}
}
return nil
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading