Skip to content

Commit

Permalink
Add RemoteKubernetesCluster protection controller
Browse files Browse the repository at this point in the history
Controller reconciles finalizer on RemoteKubernetesCluster, preventing from premature deletion.
It waits until all ScyllaDBClusters using particular RemoteKubernetesCluster are deleted.
  • Loading branch information
zimnx committed Dec 16, 2024
1 parent d87bd02 commit c422af5
Show file tree
Hide file tree
Showing 10 changed files with 586 additions and 2 deletions.
1 change: 1 addition & 0 deletions pkg/cmd/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ func (o *OperatorOptions) run(ctx context.Context, streams genericclioptions.IOS
o.kubeClient,
o.scyllaClient.ScyllaV1alpha1(),
scyllaInformers.Scylla().V1alpha1().RemoteKubernetesClusters(),
scyllaInformers.Scylla().V1alpha1().ScyllaDBClusters(),
kubeInformers.Core().V1().Secrets(),
[]remoteclient.DynamicClusterInterface{
&o.clusterKubeClient,
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller/remotekubernetescluster/conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,7 @@ const (
clientHealthcheckControllerAvailableCondition = "ClientHealthcheckControllerAvailable"
clientHealthcheckControllerProgressingCondition = "ClientHealthcheckControllerProgressing"
clientHealthcheckControllerDegradedCondition = "ClientHealthcheckControllerDegraded"

remoteKubernetesClusterFinalizerProgressingCondition = "RemoteKubernetesClusterFinalizerProgressing"
remoteKubernetesClusterFinalizerDegradedCondition = "RemoteKubernetesClusterFinalizerDegraded"
)
46 changes: 46 additions & 0 deletions pkg/controller/remotekubernetescluster/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Controller struct {
scyllaClient scyllav1alpha1client.ScyllaV1alpha1Interface

remoteKubernetesClusterLister scyllav1alpha1listers.RemoteKubernetesClusterLister
scyllaDBClusterLister scyllav1alpha1listers.ScyllaDBClusterLister
secretLister corev1listers.SecretLister

clusterKubeClient remoteclient.ClusterClientInterface[kubernetes.Interface]
Expand All @@ -65,6 +66,7 @@ func NewController(
kubeClient kubernetes.Interface,
scyllaClient scyllav1alpha1client.ScyllaV1alpha1Interface,
remoteKubernetesClusterInformer scyllav1alpha1informers.RemoteKubernetesClusterInformer,
scyllaDBClusterInformer scyllav1alpha1informers.ScyllaDBClusterInformer,
secretInformer corev1informers.SecretInformer,
dynamicClusterHandlers []remoteclient.DynamicClusterInterface,
clusterKubeClient remoteclient.ClusterClientInterface[kubernetes.Interface],
Expand All @@ -79,6 +81,7 @@ func NewController(
scyllaClient: scyllaClient,

remoteKubernetesClusterLister: remoteKubernetesClusterInformer.Lister(),
scyllaDBClusterLister: scyllaDBClusterInformer.Lister(),
secretLister: secretInformer.Lister(),

dynamicClusterHandlers: dynamicClusterHandlers,
Expand All @@ -88,6 +91,7 @@ func NewController(

cachesToSync: []cache.InformerSynced{
remoteKubernetesClusterInformer.Informer().HasSynced,
scyllaDBClusterInformer.Informer().HasSynced,
secretInformer.Informer().HasSynced,
},

Expand Down Expand Up @@ -123,6 +127,12 @@ func NewController(
DeleteFunc: rkcc.deleteRemoteKubernetesCluster,
})

scyllaDBClusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: rkcc.addScyllaDBCluster,
UpdateFunc: rkcc.updateScyllaDBCluster,
DeleteFunc: rkcc.deleteScyllaDBCluster,
})

secretInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: rkcc.addSecret,
UpdateFunc: rkcc.updateSecret,
Expand Down Expand Up @@ -263,3 +273,39 @@ func (rkcc *Controller) enqueueRemoteKubernetesClusterUsingSecret(secret *corev1
return rkc.Spec.KubeconfigSecretRef.Namespace == secret.Namespace && rkc.Spec.KubeconfigSecretRef.Name == secret.Name
}))
}

func (rkcc *Controller) addScyllaDBCluster(obj interface{}) {
rkcc.handlers.HandleAdd(
obj.(*scyllav1alpha1.ScyllaDBCluster),
rkcc.enqueueScyllaDBCluster,
)
}

func (rkcc *Controller) updateScyllaDBCluster(old, cur interface{}) {
rkcc.handlers.HandleUpdate(
old.(*scyllav1alpha1.ScyllaDBCluster),
cur.(*scyllav1alpha1.ScyllaDBCluster),
rkcc.enqueueScyllaDBCluster,
rkcc.deleteScyllaDBCluster,
)
}

func (rkcc *Controller) deleteScyllaDBCluster(obj interface{}) {
rkcc.handlers.HandleDelete(
obj,
rkcc.enqueueScyllaDBCluster,
)
}

func (rkcc *Controller) enqueueScyllaDBCluster(depth int, obj kubeinterfaces.ObjectInterface, op controllerhelpers.HandlerOperationType) {
sc := obj.(*scyllav1alpha1.ScyllaDBCluster)

for _, dc := range sc.Spec.Datacenters {
rkc, err := rkcc.remoteKubernetesClusterLister.Get(dc.RemoteKubernetesClusterName)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't find RemoteKubernetesCluster with name %q", dc.RemoteKubernetesClusterName))
return
}
rkcc.handlers.Enqueue(depth+1, rkc, op)
}
}
23 changes: 23 additions & 0 deletions pkg/controller/remotekubernetescluster/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"time"

"github.com/scylladb/scylla-operator/pkg/controllerhelpers"
"github.com/scylladb/scylla-operator/pkg/helpers/slices"
"github.com/scylladb/scylla-operator/pkg/naming"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/errors"
Expand Down Expand Up @@ -42,9 +44,30 @@ func (rkcc *Controller) sync(ctx context.Context, key string) error {

status := rkcc.calculateStatus(rkc)
if rkc.DeletionTimestamp != nil {
err = controllerhelpers.RunSync(
&status.Conditions,
remoteKubernetesClusterFinalizerProgressingCondition,
remoteKubernetesClusterFinalizerDegradedCondition,
rkc.Generation,
func() ([]metav1.Condition, error) {
return rkcc.syncFinalizer(ctx, rkc)
},
)
if err != nil {
return fmt.Errorf("can't finalize: %w", err)
}

return rkcc.updateStatus(ctx, rkc, status)
}

if !slices.ContainsItem(rkc.GetFinalizers(), naming.RemoteKubernetesClusterFinalizer) {
err = rkcc.addFinalizer(ctx, rkc)
if err != nil {
return fmt.Errorf("can't add finalizer: %w", err)
}
return nil
}

var errs []error

err = controllerhelpers.RunSync(
Expand Down
135 changes: 135 additions & 0 deletions pkg/controller/remotekubernetescluster/sync_finalizer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright (c) 2024 ScyllaDB.

package remotekubernetescluster

import (
"context"
"fmt"
"strings"

scyllav1alpha1 "github.com/scylladb/scylla-operator/pkg/api/scylla/v1alpha1"
"github.com/scylladb/scylla-operator/pkg/controllerhelpers"
"github.com/scylladb/scylla-operator/pkg/helpers/slices"
"github.com/scylladb/scylla-operator/pkg/naming"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
)

func (rkcc *Controller) syncFinalizer(ctx context.Context, rkc *scyllav1alpha1.RemoteKubernetesCluster) ([]metav1.Condition, error) {
var progressingConditions []metav1.Condition
var err error

if !slices.ContainsItem(rkc.GetFinalizers(), naming.RemoteKubernetesClusterFinalizer) {
klog.V(4).InfoS("Object is already finalized", "RemoteKubernetesCluster", klog.KObj(rkc), "UID", rkc.UID)
return progressingConditions, nil
}

klog.V(4).InfoS("Finalizing object", "RemoteKubernetesCluster", klog.KObj(rkc), "UID", rkc.UID)

isUsed, users, err := rkcc.isBeingUsed(ctx, rkc)
if err != nil {
return progressingConditions, fmt.Errorf("can't check if RemoteKubernetesCluster %q is being used: %w", naming.ObjRef(rkc), err)
}

if isUsed {
klog.V(2).InfoS("Keeping RemoteKubernetesCluster because it's being used", "RemoteKubernetesCluster", klog.KObj(rkc))

progressingConditions = append(progressingConditions, metav1.Condition{
Type: remoteKubernetesClusterFinalizerProgressingCondition,
Status: metav1.ConditionTrue,
Reason: "IsBeingUsed",
Message: fmt.Sprintf("Object is being used by following ScyllaDBCluster(s): %s", strings.Join(users, ",")),
ObservedGeneration: rkc.Generation,
})

return progressingConditions, nil
}

err = rkcc.removeFinalizer(ctx, rkc)
if err != nil {
return progressingConditions, fmt.Errorf("can't remove finalizer from RemoteKubernetesCluster %q: %w", naming.ObjRef(rkc), err)
}
return progressingConditions, nil
}

func (rkcc *Controller) addFinalizer(ctx context.Context, rkc *scyllav1alpha1.RemoteKubernetesCluster) error {
patch, err := controllerhelpers.AddFinalizerPatch(rkc, naming.RemoteKubernetesClusterFinalizer)
if err != nil {
return fmt.Errorf("can't create add finalizer patch: %w", err)
}

_, err = rkcc.scyllaClient.RemoteKubernetesClusters().Patch(ctx, rkc.Name, types.MergePatchType, patch, metav1.PatchOptions{})
if err != nil {
return fmt.Errorf("can't patch RemoteKubernetesCluster %q: %w", naming.ObjRef(rkc), err)
}

klog.V(2).InfoS("Added finalizer to RemoteKubernetesCluster", "RemoteKubernetesCluster", klog.KObj(rkc))
return nil
}

func (rkcc *Controller) removeFinalizer(ctx context.Context, rkc *scyllav1alpha1.RemoteKubernetesCluster) error {
patch, err := controllerhelpers.RemoveFinalizerPatch(rkc, naming.RemoteKubernetesClusterFinalizer)
if err != nil {
return fmt.Errorf("can't create remove finalizer patch: %w", err)
}

_, err = rkcc.scyllaClient.RemoteKubernetesClusters().Patch(ctx, rkc.Name, types.MergePatchType, patch, metav1.PatchOptions{})
if err != nil {
return fmt.Errorf("can't patch RemoteKubernetesCluster %q: %w", naming.ObjRef(rkc), err)
}

klog.V(2).InfoS("Removed finalizer from RemoteKubernetesCluster", "RemoteKubernetesCluster", klog.KObj(rkc))
return nil
}

func (rkcc *Controller) isBeingUsed(ctx context.Context, rkc *scyllav1alpha1.RemoteKubernetesCluster) (bool, []string, error) {
scs, err := rkcc.scyllaDBClusterLister.List(labels.Everything())
if err != nil {
return false, nil, fmt.Errorf("can't list all ScyllaClusters using lister: %w", err)
}

var scyllaDBClusterReferents []string
for _, sc := range scs {
for _, dc := range sc.Spec.Datacenters {
if dc.RemoteKubernetesClusterName == rkc.Name {
scyllaDBClusterReferents = append(scyllaDBClusterReferents, naming.ObjRef(sc))
}
}
}

if len(scyllaDBClusterReferents) != 0 {
klog.V(4).InfoS("Listed ScyllaClusters using Informer and found ScyllaDBCluster's referencing it", "RemoteKubernetesCluster", klog.KObj(rkc), "ScyllaDBClusters", scyllaDBClusterReferents)
return true, scyllaDBClusterReferents, nil
}

klog.V(4).InfoS("No ScyllaClusters referencing RemoteKubernetesCluster found in the Informer cache", "RemoteKubernetesCluster", klog.KObj(rkc))

// Live list ScyllaClusters to be 100% sure before we delete. Informer cache might not be updated yet.
scList, err := rkcc.scyllaClient.ScyllaDBClusters(corev1.NamespaceAll).List(ctx, metav1.ListOptions{
LabelSelector: labels.Everything().String(),
})
if err != nil {
return false, nil, fmt.Errorf("list all ScyllaClusters using lister: %w", err)
}

scyllaDBClusterReferents = scyllaDBClusterReferents[:0]
for _, sc := range scList.Items {
for _, dc := range sc.Spec.Datacenters {
if dc.RemoteKubernetesClusterName == rkc.Name {
scyllaDBClusterReferents = append(scyllaDBClusterReferents, naming.ObjRef(&sc))
}
}
}

if len(scyllaDBClusterReferents) != 0 {
klog.V(4).InfoS("Listed ScyllaClusters using live call and found ScyllaCluster's referencing it", "RemoteKubernetesCluster", klog.KObj(rkc), "ScyllaClusters", scyllaDBClusterReferents)
return true, scyllaDBClusterReferents, nil
}

klog.V(2).InfoS("RemoteKubernetesCluster doesn't have any referents", "RemoteKubernetesCluster", klog.KObj(rkc))

return false, nil, nil
}
82 changes: 82 additions & 0 deletions pkg/controllerhelpers/finalizers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright (c) 2024 ScyllaDB.

package controllerhelpers

import (
"encoding/json"
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type objectForFinalizersPatch struct {
objectMetaForFinalizersPatch `json:"metadata"`
}

// objectMetaForFinalizersPatch defines object meta struct for finalizers patch operation.
type objectMetaForFinalizersPatch struct {
ResourceVersion string `json:"resourceVersion"`
Finalizers []string `json:"finalizers"`
}

func RemoveFinalizerPatch(obj metav1.Object, finalizer string) ([]byte, error) {
if !HasFinalizer(obj, finalizer) {
return nil, nil
}

finalizers := obj.GetFinalizers()
var newFinalizers []string

for _, f := range finalizers {
if f == finalizer {
continue
}
newFinalizers = append(newFinalizers, f)
}

patch, err := json.Marshal(&objectForFinalizersPatch{
objectMetaForFinalizersPatch: objectMetaForFinalizersPatch{
ResourceVersion: obj.GetResourceVersion(),
Finalizers: newFinalizers,
},
})
if err != nil {
return nil, fmt.Errorf("can't marshal finalizer remove patch: %w", err)
}

return patch, nil
}

func AddFinalizerPatch(obj metav1.Object, finalizer string) ([]byte, error) {
if HasFinalizer(obj, finalizer) {
return nil, nil
}
newFinalizers := make([]string, 0, len(obj.GetFinalizers())+1)
for _, f := range obj.GetFinalizers() {
newFinalizers = append(newFinalizers, f)
}
newFinalizers = append(newFinalizers, finalizer)

patch, err := json.Marshal(&objectForFinalizersPatch{
objectMetaForFinalizersPatch: objectMetaForFinalizersPatch{
ResourceVersion: obj.GetResourceVersion(),
Finalizers: newFinalizers,
},
})
if err != nil {
return nil, fmt.Errorf("can't marshal finalizer add patch: %w", err)
}

return patch, nil
}

func HasFinalizer(obj metav1.Object, finalizer string) bool {
found := false
for _, f := range obj.GetFinalizers() {
if f == finalizer {
found = true
break
}
}
return found
}
Loading

0 comments on commit c422af5

Please sign in to comment.