Commit 17d571e

ws solution B
1 parent a3572de commit 17d571e

File tree

7 files changed: +201 −6 lines changed

api/v2/cassandracluster_types.go
charts/casskop/crds/db.orange.com_cassandraclusters.yaml
config/crd/bases/db.orange.com_cassandraclusters.yaml
controllers/cassandracluster/cassandra_status.go
controllers/cassandracluster/cassandracluster_controller.go
controllers/cassandracluster/deploy_cassandra.go
controllers/cassandracluster/reconcile.go

api/v2/cassandracluster_types.go

Lines changed: 27 additions & 1 deletion
@@ -55,6 +55,11 @@ var (
 	ClusterPhaseRunning = ClusterStateInfo{2, "Running"}
 	ClusterPhasePending = ClusterStateInfo{3, "Pending"}
 
+	// Indicates whether all racks have at least one node Ready
+	ClusterFirstLayerInitial = ClusterStateInfo{1, "Initializing"}
+	ClusterFirstLayerRunning = ClusterStateInfo{2, "Running"}
+	ClusterFirstLayerSkipped = ClusterStateInfo{3, "Skipped"}
+
 	//Available actions
 	ActionUpdateConfigMap   = ClusterStateInfo{1, "UpdateConfigMap"}
 	ActionUpdateDockerImage = ClusterStateInfo{2, "UpdateDockerImage"}
@@ -179,6 +184,10 @@ func (cc *CassandraCluster) SetDefaults() bool {
 		}
 		changed = true
 	}
+	if len(cc.Status.FirstLayerPhase) == 0 {
+		cc.Status.FirstLayerPhase = ClusterFirstLayerInitial.Name
+		changed = true
+	}
 	if ccs.MaxPodUnavailable == 0 {
 		ccs.MaxPodUnavailable = defaultMaxPodUnavailable
 		changed = true
@@ -306,7 +315,8 @@ func (cc *CassandraCluster) initTopology(dcName string, rackName string) {
 func (cc *CassandraCluster) InitCassandraRackStatus(status *CassandraClusterStatus, dcName string, rackName string) {
 	dcRackName := cc.GetDCRackName(dcName, rackName)
 	rackStatus := CassandraRackStatus{
-		Phase: ClusterPhaseInitial.Name,
+		Phase:           ClusterPhaseInitial.Name,
+		FirstLayerPhase: ClusterFirstLayerInitial.Name,
 		CassandraLastAction: CassandraLastAction{
 			Name:   ClusterPhaseInitial.Name,
 			Status: StatusOngoing,
@@ -655,6 +665,10 @@ func (rack *RackSlice) Remove(idx int) {
 	*rack = append((*rack)[:idx], (*rack)[idx+1:]...)
 }
 
+func (in *CassandraClusterStatus) IsFirstLayerDuringInitialization() bool {
+	return in.FirstLayerPhase == ClusterFirstLayerInitial.Name
+}
+
 // CassandraClusterSpec defines the configuration of CassandraCluster
 
 type CassandraClusterSpec struct {
@@ -938,6 +952,12 @@ type CassandraRackStatus struct {
 	// Initial -> Running <-> updating
 	Phase string `json:"phase,omitempty"`
 
+	// FirstLayerPhase indicates whether the rack has at least one node Ready, so that further initial scale-out may be allowed.
+	// Needed to correctly handle `allocate_tokens_for_local_replication_factor`, introduced in Cassandra 4.0.
+	// FirstLayerPhase only moves one way:
+	// Initializing -> Running
+	FirstLayerPhase string `json:"firstLayerPhase,omitempty"`
+
 	// CassandraLastAction is the set of Cassandra State & Actions: Active, Standby..
 	CassandraLastAction CassandraLastAction `json:"cassandraLastAction,omitempty"`
 
@@ -952,6 +972,12 @@ type CassandraClusterStatus struct {
 	// Initial -> Running <-> updating
 	Phase string `json:"phase,omitempty"`
 
+	// FirstLayerPhase indicates whether all racks have at least one node Ready, so that further initial scale-out may be allowed.
+	// Needed to correctly handle `allocate_tokens_for_local_replication_factor`, introduced in Cassandra 4.0.
+	// FirstLayerPhase only moves one way:
+	// Initializing -> Running
+	FirstLayerPhase string `json:"firstLayerPhase,omitempty"`
+
 	// Store last action at cluster level
 	LastClusterAction       string `json:"lastClusterAction,omitempty"`
 	LastClusterActionStatus string `json:"lastClusterActionStatus,omitempty"`
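
The new FirstLayerPhase exists because Cassandra 4.0's `allocate_tokens_for_local_replication_factor` allocates tokens relative to nodes already in the ring, so each rack should contribute one Ready node before the rest of the topology scales out. A minimal standalone sketch of that gating idea (the types and the `effectiveReplicas` helper are illustrative, not the operator's actual code):

package main

import "fmt"

type ClusterStateInfo struct {
	Index int
	Name  string
}

var (
	ClusterFirstLayerInitial = ClusterStateInfo{1, "Initializing"}
	ClusterFirstLayerRunning = ClusterStateInfo{2, "Running"}
)

type CassandraClusterStatus struct {
	FirstLayerPhase string
}

func (s *CassandraClusterStatus) IsFirstLayerDuringInitialization() bool {
	return s.FirstLayerPhase == ClusterFirstLayerInitial.Name
}

// effectiveReplicas caps a rack at one node until the first layer is Running,
// so Cassandra 4.0 token allocation sees one node per rack before scale-out.
func effectiveReplicas(status *CassandraClusterStatus, desired int32) int32 {
	if status.IsFirstLayerDuringInitialization() {
		return 1
	}
	return desired
}

func main() {
	st := &CassandraClusterStatus{FirstLayerPhase: ClusterFirstLayerInitial.Name}
	fmt.Println(effectiveReplicas(st, 3)) // 1 while the first layer initializes
	st.FirstLayerPhase = ClusterFirstLayerRunning.Name
	fmt.Println(effectiveReplicas(st, 3)) // 3 once the first layer is Running
}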

charts/casskop/crds/db.orange.com_cassandraclusters.yaml

Lines changed: 6 additions & 0 deletions
@@ -2270,6 +2270,9 @@ spec:
               type: array
               items:
                 type: string
+            firstLayerPhase:
+              description: 'FirstLayerPhase indicates whether the rack has at least one node Ready, so that further initial scale-out may be allowed. Needed to correctly handle `allocate_tokens_for_local_replication_factor`, introduced in Cassandra 4.0. FirstLayerPhase only moves one way: Initializing -> Running'
+              type: string
             phase:
               description: |-
                 Phase indicates the state this Cassandra cluster jumps in.
@@ -2313,6 +2316,9 @@ spec:
               type: string
             lastClusterActionStatus:
               type: string
+            firstLayerPhase:
+              description: 'FirstLayerPhase indicates whether all racks have at least one node Ready, so that further initial scale-out may be allowed. Needed to correctly handle `allocate_tokens_for_local_replication_factor`, introduced in Cassandra 4.0. FirstLayerPhase only moves one way: Initializing -> Running'
+              type: string
             phase:
               description: |-
                 Phase indicates the state this Cassandra cluster jumps in.

config/crd/bases/db.orange.com_cassandraclusters.yaml

Lines changed: 6 additions & 0 deletions
@@ -2270,6 +2270,9 @@ spec:
               type: array
               items:
                 type: string
+            firstLayerPhase:
+              description: 'FirstLayerPhase indicates whether the rack has at least one node Ready, so that further initial scale-out may be allowed. Needed to correctly handle `allocate_tokens_for_local_replication_factor`, introduced in Cassandra 4.0. FirstLayerPhase only moves one way: Initializing -> Running'
+              type: string
             phase:
               description: |-
                 Phase indicates the state this Cassandra cluster jumps in.
@@ -2308,6 +2311,9 @@ spec:
                 format: date-time
               status:
                 type: string
+            firstLayerPhase:
+              description: 'FirstLayerPhase indicates whether all racks have at least one node Ready, so that further initial scale-out may be allowed. Needed to correctly handle `allocate_tokens_for_local_replication_factor`, introduced in Cassandra 4.0. FirstLayerPhase only moves one way: Initializing -> Running'
+              type: string
             lastClusterAction:
               description: Store last action at cluster level
               type: string

controllers/cassandracluster/cassandra_status.go

Lines changed: 34 additions & 0 deletions
@@ -459,6 +459,7 @@ func (rcc *CassandraClusterReconciler) UpdateCassandraRackStatusPhase(ctx contex
 	}
 	if len(podsList.Items) < int(nodesPerRacks) {
 		logrus.WithFields(logrusFields).Infof("StatefulSet is scaling up")
+		return
 	}
 	pod := podsList.Items[nodesPerRacks-1]
 	if cassandraPodIsReady(&pod) {
@@ -483,6 +484,39 @@ func (rcc *CassandraClusterReconciler) UpdateCassandraRackStatusPhase(ctx contex
 	}
 }
 
+// UpdateCassandraRackStatusFirstLayerPhase computes the cluster FirstLayerPhase from the StatefulSet status.
+// The FirstLayerPhase only moves one way: Initializing -> Running.
+// It is used to set up the needed replicas correctly before scaling the cluster out.
+func (rcc *CassandraClusterReconciler) UpdateCassandraRackStatusFirstLayerPhase(ctx context.Context, cc *api.CassandraCluster, dcName string,
+	rackName string, storedStatefulSet *appsv1.StatefulSet, status *api.CassandraClusterStatus) {
+	dcRackName := cc.GetDCRackName(dcName, rackName)
+
+	logrusFields := logrus.Fields{"cluster": cc.Name, "rack": dcRackName, "firstLayerPhase": status.CassandraRackStatus[dcRackName].FirstLayerPhase,
+		"ReadyReplicas": storedStatefulSet.Status.ReadyReplicas, "RequestedReplicas": *storedStatefulSet.Spec.Replicas}
+
+	ClusterPhaseMetric.set(api.ClusterPhaseInitial, cc.Name)
+
+	if isStatefulSetNotReady(storedStatefulSet) {
+		logrus.WithFields(logrusFields).Infof("Initializing StatefulSet: replicas count is not okay")
+		return
+	}
+	// If it is ready, just check that the last pod is running
+	podsList, err := rcc.ListPods(ctx, cc.Namespace, k8s.LabelsForCassandraDCRack(cc, dcName, rackName))
+	if err != nil || len(podsList.Items) < 1 {
+		return
+	}
+	nodesPerRacks := cc.GetNodesPerRacks(dcRackName)
+	if len(podsList.Items) < int(nodesPerRacks) {
+		logrus.WithFields(logrusFields).Infof("StatefulSet is scaling up")
+		return
+	}
+	pod := podsList.Items[len(podsList.Items)-1]
+	if cassandraPodIsReady(&pod) {
+		status.CassandraRackStatus[dcRackName].FirstLayerPhase = api.ClusterFirstLayerRunning.Name
+		logrus.WithFields(logrusFields).Infof("StatefulSet: replicas count is okay")
+	}
+}
+
 func setDecommissionStatus(status *api.CassandraClusterStatus, dcRackName string) {
 	status.CassandraRackStatus[dcRackName].Phase = api.ClusterPhasePending.Name
 	now := metav1.Now()
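
`cassandraPodIsReady` is an existing helper elsewhere in the operator; this commit only calls it. For reference, a pod-readiness check along these lines typically looks like the following standalone sketch (an assumed shape, not CassKop's actual implementation):

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// podIsReady mirrors the usual shape of such helpers: the pod must be
// Running and report the Ready condition as True.
func podIsReady(pod *corev1.Pod) bool {
	if pod.Status.Phase != corev1.PodRunning {
		return false
	}
	for _, cond := range pod.Status.Conditions {
		if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue {
			return true
		}
	}
	return false
}

func main() {
	pod := &corev1.Pod{}
	pod.Status.Phase = corev1.PodRunning
	pod.Status.Conditions = []corev1.PodCondition{{Type: corev1.PodReady, Status: corev1.ConditionTrue}}
	fmt.Println(podIsReady(pod)) // true
}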

controllers/cassandracluster/cassandracluster_controller.go

Lines changed: 6 additions & 0 deletions
@@ -128,6 +128,12 @@ func (rcc *CassandraClusterReconciler) Reconcile(ctx context.Context, request re
 		logrus.WithFields(logrus.Fields{"cluster": cc.Name}).Errorf("CheckPodsState Error: %v", err)
 	}
 
+	if shouldBreak, err := rcc.ReconcileFirstLayer(ctx, cc, status); err != nil {
+		return requeue5, err
+	} else if shouldBreak == breakResyncLoop {
+		return requeue5, nil
+	}
+
 	//ReconcileRack will also add and initiate new racks, we must not go through racks before this method
 	if err = rcc.ReconcileRack(ctx, cc, status); err != nil {
 		return requeue5, err
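
`breakResyncLoop` and `continueResyncLoop` are the operator's existing named booleans for reconcile control flow; `ReconcileFirstLayer` reuses them so this call site reads declaratively. A self-contained sketch of the pattern (the constant values here are assumed):

package main

import "fmt"

const (
	breakResyncLoop    = true  // stop this reconcile pass; requeue later
	continueResyncLoop = false // fall through to the rest of Reconcile
)

// reconcileFirstLayer is a hypothetical stand-in: it breaks the loop while
// the first layer is still initializing and lets reconcile continue otherwise.
func reconcileFirstLayer(initializing bool) (bool, error) {
	if initializing {
		return breakResyncLoop, nil
	}
	return continueResyncLoop, nil
}

func main() {
	if shouldBreak, err := reconcileFirstLayer(true); err == nil && shouldBreak == breakResyncLoop {
		fmt.Println("first layer still initializing: requeue in 5s")
	}
}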

controllers/cassandracluster/deploy_cassandra.go

Lines changed: 12 additions & 3 deletions
@@ -17,7 +17,9 @@ package cassandracluster
 import (
 	"context"
 	"fmt"
+
 	api "github.com/cscetbon/casskop/api/v2"
+	appsv1 "k8s.io/api/apps/v1"
 	policyv1 "k8s.io/api/policy/v1"
 
 	"github.com/cscetbon/casskop/pkg/k8s"
@@ -86,10 +88,11 @@ func (rcc *CassandraClusterReconciler) podDisruptionBudgetEnvelope(cc *api.Cassa
 }
 
 // ensureCassandraStatefulSet generate and apply the statefulset
-// take dcRackName to accordingly named the statefulset
-// take dc and rack index of dc and rack in conf to retrieve according nodeselectors labels
+// takes dcRackName to name the statefulset accordingly
+// takes dc and rack, the indexes of the DC and rack in the conf, to retrieve the matching nodeSelector labels
 func (rcc *CassandraClusterReconciler) ensureCassandraStatefulSet(ctx context.Context, cc *api.CassandraCluster,
-	status *api.CassandraClusterStatus, dcName string, dcRackName string, dc int, rack int) (bool, error) {
+	status *api.CassandraClusterStatus, dcName string, dcRackName string, dc int, rack int,
+	cassandraStatefulSetOptions ...cassandraStatefulSetOption) (bool, error) {
 
 	labels, nodeSelector := k8s.DCRackLabelsAndNodeSelectorForStatefulSet(cc, dc, rack)
 
@@ -99,10 +102,16 @@ func (rcc *CassandraClusterReconciler) ensureCassandraStatefulSet(ctx context.Co
 	}
 	k8s.AddOwnerRefToObject(ss, k8s.AsOwner(cc))
 
+	for _, option := range cassandraStatefulSetOptions {
+		ss.Spec = option(ss.Spec)
+	}
+
 	breakResyncloop, err := rcc.CreateOrUpdateStatefulSet(ctx, ss, status, dcRackName)
 	if err != nil && !apierrors.IsAlreadyExists(err) {
 		return breakResyncloop, fmt.Errorf("failed to create cassandra statefulset: %v", err)
 	}
 
 	return breakResyncloop, nil
 }
+
+type cassandraStatefulSetOption func(appsv1.StatefulSetSpec) appsv1.StatefulSetSpec
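
The new variadic `cassandraStatefulSetOption` parameter is a functional-options hook: each option is a pure transform over the generated `StatefulSetSpec`, applied in order before `CreateOrUpdateStatefulSet`. A standalone sketch of composing such options (`addInitLabel` is hypothetical, shown only to illustrate composition):

package main

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	"k8s.io/utils/ptr"
)

type statefulSetOption func(appsv1.StatefulSetSpec) appsv1.StatefulSetSpec

// overrideReplicas returns an option that pins the spec to n replicas.
func overrideReplicas(n int32) statefulSetOption {
	return func(spec appsv1.StatefulSetSpec) appsv1.StatefulSetSpec {
		spec.Replicas = ptr.To(n)
		return spec
	}
}

// addInitLabel is a hypothetical second option, not part of this commit.
func addInitLabel(spec appsv1.StatefulSetSpec) appsv1.StatefulSetSpec {
	if spec.Template.ObjectMeta.Labels == nil {
		spec.Template.ObjectMeta.Labels = map[string]string{}
	}
	spec.Template.ObjectMeta.Labels["first-layer"] = "true"
	return spec
}

func main() {
	var spec appsv1.StatefulSetSpec
	for _, option := range []statefulSetOption{overrideReplicas(1), addInitLabel} {
		spec = option(spec)
	}
	fmt.Println(*spec.Replicas, spec.Template.ObjectMeta.Labels) // 1 map[first-layer:true]
}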

controllers/cassandracluster/reconcile.go

Lines changed: 110 additions & 2 deletions
@@ -28,9 +28,11 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/r3labs/diff"
 	"github.com/sirupsen/logrus"
+	appsv1 "k8s.io/api/apps/v1"
 	v1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/utils/ptr"
 )
 
 type gaugeVec struct {
@@ -566,7 +568,8 @@ func (rcc *CassandraClusterReconciler) initiateRackStatusIfNeeded(status *api.Ca
 }
 
 func (rcc *CassandraClusterReconciler) ensureCassandraObjectsDeployed(ctx context.Context,
-	cc *api.CassandraCluster, status *api.CassandraClusterStatus, dc int, rack int) bool {
+	cc *api.CassandraCluster, status *api.CassandraClusterStatus, dc int, rack int,
+	cassandraStatefulSetOptions ...cassandraStatefulSetOption) bool {
 
 	dcName := cc.GetDCName(dc)
 	rackName := cc.GetRackName(dc, rack)
@@ -581,14 +584,119 @@ func (rcc *CassandraClusterReconciler) ensureCassandraObjectsDeployed(ctx contex
 			"dc-rack": dcRackName}).Errorf("ensureCassandraServiceMonitoring Error: %v", err)
 	}
 
-	breakLoop, err := rcc.ensureCassandraStatefulSet(ctx, cc, status, dcName, dcRackName, dc, rack)
+	breakLoop, err := rcc.ensureCassandraStatefulSet(ctx, cc, status, dcName, dcRackName, dc, rack, cassandraStatefulSetOptions...)
 	if err != nil {
 		logrus.WithFields(logrus.Fields{"cluster": cc.Name,
 			"dc-rack": dcRackName}).Errorf("ensureCassandraStatefulSet Error: %v", err)
 	}
 	return breakLoop
 }
 
+func (rcc *CassandraClusterReconciler) ReconcileFirstLayer(ctx context.Context, cc *api.CassandraCluster,
+	status *api.CassandraClusterStatus) (bool, error) {
+
+	featureDisabled := strings.ToLower(cc.Annotations["cassandraclusters.db.orange.com/disable-first-layer-init-flow"]) == "true"
+	if featureDisabled {
+		status.FirstLayerPhase = api.ClusterFirstLayerSkipped.Name
+		return continueResyncLoop, nil
+	}
+
+	if status.IsFirstLayerDuringInitialization() {
+		return breakResyncLoop, rcc.ReconcileRackFirstLayer(ctx, cc, status)
+	}
+	return continueResyncLoop, nil
+}
+
+// ReconcileRackFirstLayer tries to create one node for each DC/Rack pair defined in the topology
+func (rcc *CassandraClusterReconciler) ReconcileRackFirstLayer(ctx context.Context, cc *api.CassandraCluster,
+	status *api.CassandraClusterStatus) (err error) {
+
+	newStatus := false
+	for dc := 0; dc < cc.GetDCSize(); dc++ {
+		dcName := cc.GetDCName(dc)
+		for rack := 0; rack < cc.GetRackSize(dc); rack++ {
+
+			rackName := cc.GetRackName(dc, rack)
+			dcRackName := cc.GetDCRackName(dcName, rackName)
+			if dcRackName == "" {
+				return fmt.Errorf("name used for DC and/or Rack are not good")
+			}
+
+			if rcc.initiateRackStatusIfNeeded(status, dcRackName, cc, dcName, rackName) {
+				newStatus = true
+				continue
+			}
+			dcRackStatus := status.CassandraRackStatus[dcRackName]
+
+			if cc.DeletionTimestamp != nil && cc.Spec.DeletePVC {
+				rcc.DeletePVCs(ctx, cc, dcName, rackName)
+				//Go to next rack
+				continue
+			}
+
+			Name := cc.Name + "-" + dcRackName
+			storedStatefulSet, err := rcc.GetStatefulSet(ctx, cc.Namespace, Name)
+			if err != nil {
+				logrus.WithFields(logrus.Fields{"cluster": cc.Name,
+					"dc-rack": dcRackName}).Infof("failed to get cassandra's statefulset (%s) %v", Name, err)
+			} else {
+				rcc.UpdateCassandraRackStatusFirstLayerPhase(ctx, cc, dcName, rackName, storedStatefulSet, status)
+			}
+
+			_ = rcc.ensureCassandraObjectsDeployed(ctx, cc, status, dc, rack, overrideReplicasCountForFirstLayer)
+
+			// in the first layer (pre-initial) phase, move to the next rack only once the current rack has 1 ready replica
+			if dcRackStatus.FirstLayerPhase != api.ClusterFirstLayerRunning.Name {
+				logrus.WithFields(logrus.Fields{"cluster": cc.Name,
+					"dc-rack": dcRackName}).Infof("Waiting for the rack to be running before continuing; " +
+					"breaking ReconcileRackFirstLayer after the statefulset has been updated")
+				return nil
+			}
+		}
+	}
+
+	newStatus = newStatus || rcc.updateClusterFirstLayerStatus(cc, status)
+
+	if newStatus {
+		return nil
+	}
+
+	//If the cluster is deleted and DeletePVC is set, we can now stop preventing the cluster from being deleted
+	//because the PVCs have been deleted
+	if cc.DeletionTimestamp != nil && cc.Spec.DeletePVC {
+		preventClusterDeletion(cc, false)
+		return rcc.Client.Update(ctx, cc)
+	}
+
+	return nil
+}
+
+func (rcc *CassandraClusterReconciler) updateClusterFirstLayerStatus(cc *api.CassandraCluster, status *api.CassandraClusterStatus) bool {
+	for dc := 0; dc < cc.GetDCSize(); dc++ {
+		dcName := cc.GetDCName(dc)
+		for rack := 0; rack < cc.GetRackSize(dc); rack++ {
+			rackName := cc.GetRackName(dc, rack)
+			dcRackName := cc.GetDCRackName(dcName, rackName)
+			dcRackStatus := status.CassandraRackStatus[dcRackName]
+			if dcRackStatus.FirstLayerPhase != api.ClusterFirstLayerRunning.Name {
+				return false
+			}
+		}
+	}
+
+	// at this point the first layer on all racks is ready
+	oldStatus := status.FirstLayerPhase
+	logrus.WithFields(logrus.Fields{"cluster": cc.Name}).Info("FirstLayer is now Ready on all racks")
+	status.FirstLayerPhase = api.ClusterFirstLayerRunning.Name
+	return oldStatus != status.FirstLayerPhase
+}
+
+func overrideReplicasCountForFirstLayer(stsSpec appsv1.StatefulSetSpec) appsv1.StatefulSetSpec {
+	stsSpec.Replicas = ptr.To(int32(1))
+	return stsSpec
+}
+
 func (rcc *CassandraClusterReconciler) waitForStatefulSetToBeUpdated(ctx context.Context, cc *api.CassandraCluster, dcRackName string,
 	err error) {
 	logrus.WithFields(logrus.Fields{
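
`overrideReplicasCountForFirstLayer` is the only option passed during the first-layer flow: whatever replica count a rack would normally request, the generated spec is pinned to a single node until the rack's first node is Ready. A quick standalone check of that behavior (using the same `k8s.io/utils/ptr` helper the commit imports):

package main

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	"k8s.io/utils/ptr"
)

func overrideReplicasCountForFirstLayer(stsSpec appsv1.StatefulSetSpec) appsv1.StatefulSetSpec {
	stsSpec.Replicas = ptr.To(int32(1))
	return stsSpec
}

func main() {
	// A rack that would normally run 5 nodes is pinned to 1 during the first layer.
	spec := appsv1.StatefulSetSpec{Replicas: ptr.To(int32(5))}
	spec = overrideReplicasCountForFirstLayer(spec)
	fmt.Println(*spec.Replicas) // 1
}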
