Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 114 additions & 11 deletions api/v2/cassandracluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
)

const (
Expand Down Expand Up @@ -49,12 +50,22 @@
Name string
}

type InitializingSubPhase string

func (p InitializingSubPhase) IsEmpty() bool {
return len(p) == 0
}

var (
//Cluster phases
ClusterPhaseInitial = ClusterStateInfo{1, "Initializing"}
ClusterPhaseRunning = ClusterStateInfo{2, "Running"}
ClusterPhasePending = ClusterStateInfo{3, "Pending"}

//Cluster Initializing sub-phases
ClusterPhaseInitialSubPhaseFirstPodPerRack InitializingSubPhase = "FirstPodPerRack"
ClusterPhaseInitialSubPhaseNextPodPerRack InitializingSubPhase = "NextPodPerRack"

//Available actions
ActionUpdateConfigMap = ClusterStateInfo{1, "UpdateConfigMap"}
ActionUpdateDockerImage = ClusterStateInfo{2, "UpdateDockerImage"}
Expand Down Expand Up @@ -169,16 +180,23 @@
ccs.NodesPerRacks = 1
changed = true
}
if len(cc.Status.Phase) == 0 {
cc.Status.Phase = ClusterPhaseInitial.Name
if cc.Status.CassandraPhase.IsEmpty() {
cc.Status.CassandraPhase = CassandraPhase{
Phase: ClusterPhaseInitial.Name,
InitializingSubPhase: ptr.To(ClusterPhaseInitialSubPhaseFirstPodPerRack),
}
if cc.InitCassandraRackList() < 1 {
logrus.Errorf("[%s]: We should have at list One Rack, Please correct the Error", cc.Name)

Check failure on line 189 in api/v2/cassandracluster_types.go

View workflow job for this annotation

GitHub Actions / lint

cc.Name undefined (type *CassandraCluster has no field or method Name) (typecheck)
}
if cc.Status.SeedList == nil {
cc.Status.SeedList = cc.InitSeedList()
}
changed = true
}
if cc.Status.CassandraPhase.IsInitialButNoSubPhase() {
cc.Status.CassandraPhase.InitializingSubPhase = ptr.To(ClusterPhaseInitialSubPhaseFirstPodPerRack)
changed = true
}
if ccs.MaxPodUnavailable == 0 {
ccs.MaxPodUnavailable = defaultMaxPodUnavailable
changed = true
Expand Down Expand Up @@ -306,7 +324,10 @@
func (cc *CassandraCluster) InitCassandraRackStatus(status *CassandraClusterStatus, dcName string, rackName string) {
dcRackName := cc.GetDCRackName(dcName, rackName)
rackStatus := CassandraRackStatus{
Phase: ClusterPhaseInitial.Name,
CassandraPhase: CassandraPhase{
Phase: ClusterPhaseInitial.Name,
InitializingSubPhase: ptr.To(ClusterPhaseInitialSubPhaseFirstPodPerRack),
},
CassandraLastAction: CassandraLastAction{
Name: ClusterPhaseInitial.Name,
Status: StatusOngoing,
Expand Down Expand Up @@ -931,26 +952,85 @@
VolumeMounts []v1.VolumeMount `json:"volumeMount,omitempty"`
}

// CassandraRackStatus defines states of Cassandra for 1 rack (1 statefulset)
type CassandraRackStatus struct {
// Phase indicates the state this Cassandra cluster jumps in.
// Phase goes as one way as below:
type CassandraPhase struct {
// phase indicates the state this Cassandra cluster jumps in.
// phase goes as one way as below:
// Initial -> Running <-> updating
Phase string `json:"phase,omitempty"`

// initializingSubPhase adds detail to Initial phase
// initializingSubPhase goes one way as below:
// FirstPodPerRack -> NextPodPerRack
InitializingSubPhase *InitializingSubPhase `json:"initializingSubPhase,omitempty"`
}

func (p CassandraPhase) String() string {
if p.InitializingSubPhase != nil {
return fmt.Sprintf("%s (%s)", p.Phase, *p.InitializingSubPhase)
}
return p.Phase
}

func (p *CassandraPhase) IsEmpty() bool {
return p.Phase == ""
}

func (p *CassandraPhase) SubPhaseMigrationNeeded() bool {
// currently only one condition triggers subphase migration
return p.IsInitialButNoSubPhase()
}

func (p *CassandraPhase) IsInitialButNoSubPhase() bool {
return p.IsInInitialPhase() && (p.InitializingSubPhase == nil || p.InitializingSubPhase.IsEmpty())
}

func (p *CassandraPhase) IsInInitialPhase() bool {
return p.Phase == ClusterPhaseInitial.Name
}

func (p *CassandraPhase) IsInFirstPodPerRackInitPhase() bool {
return p.IsInInitialPhase() && p.InitializingSubPhase != nil && *p.InitializingSubPhase == ClusterPhaseInitialSubPhaseFirstPodPerRack
}

func (p *CassandraPhase) IsInRunningPhase() bool {
return p.Phase == ClusterPhaseRunning.Name
}

// CassandraRackStatus defines states of Cassandra for 1 rack (1 statefulset)
type CassandraRackStatus struct {
CassandraPhase `json:",inline"`

// CassandraLastAction is the set of Cassandra State & Actions: Active, Standby..
CassandraLastAction CassandraLastAction `json:"cassandraLastAction,omitempty"`

// PodLastOperation manage status for Pod Operation (nodetool cleanup, upgradesstables..)
PodLastOperation PodLastOperation `json:"podLastOperation,omitempty"`
}

func (in *CassandraRackStatus) SetNextPodPerRackInitPhase() {
in.CassandraPhase = CassandraPhase{
Phase: ClusterPhaseInitial.Name,
InitializingSubPhase: ptr.To(ClusterPhaseInitialSubPhaseNextPodPerRack),
}
}

func (in *CassandraRackStatus) SetPendingPhase() {
in.CassandraPhase = CassandraPhase{
Phase: ClusterPhasePending.Name,
InitializingSubPhase: nil,
}
}

func (in *CassandraRackStatus) SetRunningPhase() {
in.CassandraPhase = CassandraPhase{
Phase: ClusterPhaseRunning.Name,
InitializingSubPhase: nil,
}
}

// CassandraClusterStatus defines Global state of CassandraCluster
type CassandraClusterStatus struct {
// Phase indicates the state this Cassandra cluster jumps in.
// Phase goes as one way as below:
// Initial -> Running <-> updating
Phase string `json:"phase,omitempty"`
CassandraPhase `json:",inline"`

// Store last action at cluster level
LastClusterAction string `json:"lastClusterAction,omitempty"`
Expand All @@ -969,6 +1049,29 @@
CassandraRackStatus map[string]*CassandraRackStatus `json:"cassandraRackStatus,omitempty"`
}

func (in *CassandraClusterStatus) SetClusterPhaseFromRackPhase(rackStatus *CassandraRackStatus) {
in.CassandraPhase = *rackStatus.CassandraPhase.DeepCopy()
}

func (in *CassandraClusterStatus) SetNextPodPerRackInitPhase() {
in.CassandraPhase = CassandraPhase{
Phase: ClusterPhaseInitial.Name,
InitializingSubPhase: ptr.To(ClusterPhaseInitialSubPhaseNextPodPerRack),
}
}

func (in *CassandraClusterStatus) SetRunningPhase() {
in.CassandraPhase = CassandraPhase{
Phase: ClusterPhaseRunning.Name,
InitializingSubPhase: nil,
}
}

func (in *CassandraClusterStatus) HasRackPhaseChanged(dcRackName string, cassandraPhase CassandraPhase) bool {
rackStatus, ok := in.CassandraRackStatus[dcRackName]
return !ok || rackStatus.CassandraPhase != cassandraPhase
}

// CassandraLastAction defines status of the CassandraStatefulset
type CassandraLastAction struct {
// Action is the specific actions that can be done on a Cassandra Cluster
Expand Down
46 changes: 43 additions & 3 deletions api/v2/cassandracluster_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/utils/ptr"

"github.com/ghodss/yaml"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -205,15 +206,25 @@ func TestInitCassandraRackinStatus(t *testing.T) {

cc.InitCassandraRackList()

initialCassandraPhase := CassandraPhase{
Phase: ClusterPhaseInitial.Name,
InitializingSubPhase: ptr.To(ClusterPhaseInitialSubPhaseFirstPodPerRack),
}

assert.Equal(ClusterPhaseInitial.Name, cc.Status.CassandraRackStatus["online-rack1"].CassandraLastAction.Name)
assert.Equal(ClusterPhaseInitial.Name, cc.Status.CassandraRackStatus["online-rack2"].CassandraLastAction.Name)
assert.Equal(ClusterPhaseInitial.Name, cc.Status.CassandraRackStatus["stats-rack1"].CassandraLastAction.Name)
assert.Equal(ClusterPhaseInitial.Name, cc.Status.CassandraRackStatus["stats-rack2"].CassandraLastAction.Name)
assert.Equal(initialCassandraPhase, cc.Status.CassandraRackStatus["online-rack1"].CassandraPhase)
assert.Equal(initialCassandraPhase, cc.Status.CassandraRackStatus["online-rack2"].CassandraPhase)
assert.Equal(initialCassandraPhase, cc.Status.CassandraRackStatus["stats-rack1"].CassandraPhase)
assert.Equal(initialCassandraPhase, cc.Status.CassandraRackStatus["stats-rack2"].CassandraPhase)
assert.Equal(4, len(cc.Status.CassandraRackStatus))
//Add new DC from existing RackStatus
cc.InitCassandraRackStatus(&cc.Status, "foo", "bar")

assert.Equal(ClusterPhaseInitial.Name, cc.Status.CassandraRackStatus["foo-bar"].CassandraLastAction.Name)
assert.Equal(initialCassandraPhase, cc.Status.CassandraRackStatus["foo-bar"].CassandraPhase)
assert.Equal(5, len(cc.Status.CassandraRackStatus))
}

Expand Down Expand Up @@ -250,7 +261,7 @@ func TestGetStatusDCRackSize(t *testing.T) {
assert.Equal(int(2), nb)
}

//Test that a reinit keep history of changes in the status
// Test that a reinit keep history of changes in the status
func TestGetStatusDCRackSize_KeepChanges(t *testing.T) {
assert := assert.New(t)

Expand Down Expand Up @@ -424,7 +435,7 @@ func TestIsPodInSeedList(t *testing.T) {

}

//Test that a reinit keep history of changes in the status
// Test that a reinit keep history of changes in the status
func TestComputeLastAppliedConfiguration(t *testing.T) {
assert := assert.New(t)

Expand Down Expand Up @@ -472,7 +483,36 @@ func TestSetDefaults(t *testing.T) {
assert.Equal(resource.MustParse("500m"), *cluster.Spec.Resources.Limits.Cpu())
assert.Equal(resource.MustParse("1Gi"), *cluster.Spec.Resources.Limits.Memory())

assert.Equal(ClusterPhaseInitial.Name, cluster.Status.Phase)
initialCassandraPhase := CassandraPhase{
Phase: ClusterPhaseInitial.Name,
InitializingSubPhase: ptr.To(ClusterPhaseInitialSubPhaseFirstPodPerRack),
}
assert.Equal(initialCassandraPhase, cluster.Status.CassandraPhase)
assert.Equal(int32(defaultMaxPodUnavailable), cluster.Spec.MaxPodUnavailable)
assert.Equal([]string{"defaults-test-dc1-rack1-0.defaults-test.default"}, cluster.Status.SeedList)
}

func TestAddSubPhase(t *testing.T) {
assert := assert.New(t)

cluster := CassandraCluster{
Status: CassandraClusterStatus{
CassandraPhase: CassandraPhase{
Phase: ClusterPhaseInitial.Name,
InitializingSubPhase: nil,
},
SeedList: nil,
CassandraNodesStatus: nil,
CassandraRackStatus: nil,
},
}

assert.True(cluster.SetDefaults())
cluster.CheckDefaults()

initialCassandraPhase := CassandraPhase{
Phase: ClusterPhaseInitial.Name,
InitializingSubPhase: ptr.To(ClusterPhaseInitialSubPhaseFirstPodPerRack),
}
assert.Equal(initialCassandraPhase, cluster.Status.CassandraPhase)
}
22 changes: 22 additions & 0 deletions api/v2/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 16 additions & 4 deletions charts/casskop/crds/db.orange.com_cassandraclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2270,10 +2270,16 @@ spec:
type: array
items:
type: string
initializingSubPhase:
description: |-
initializingSubPhase adds detail to Initial phase
initializingSubPhase goes one way as below:
FirstPodPerRack -> NextPodPerRack
type: string
phase:
description: |-
Phase indicates the state this Cassandra cluster jumps in.
Phase goes as one way as below:
phase indicates the state this Cassandra cluster jumps in.
phase goes as one way as below:
Initial -> Running <-> updating
type: string
podLastOperation:
Expand Down Expand Up @@ -2308,15 +2314,21 @@ spec:
format: date-time
status:
type: string
initializingSubPhase:
description: |-
initializingSubPhase adds detail to Initial phase
initializingSubPhase goes one way as below:
FirstPodPerRack -> NextPodPerRack
type: string
lastClusterAction:
description: Store last action at cluster level
type: string
lastClusterActionStatus:
type: string
phase:
description: |-
Phase indicates the state this Cassandra cluster jumps in.
Phase goes as one way as below:
phase indicates the state this Cassandra cluster jumps in.
phase goes as one way as below:
Initial -> Running <-> updating
type: string
seedlist:
Expand Down
Loading
Loading