Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions controllers/cassandracluster/cassandra_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func needToWaitDelayBeforeCheck(cc *api.CassandraCluster, dcRackName string, sto
t := *lastAction.StartTime
now := metav1.Now()

if t.Add(api.DefaultDelayWait * time.Second).After(now.Time) {
if t.Add(delayWait()).After(now.Time) {
logrus.WithFields(logrus.Fields{"cluster": cc.Name,
"rack": dcRackName}).Info(
fmt.Sprintf("The Operator Waits %s seconds for the action to start correctly",
Expand All @@ -179,6 +179,13 @@ func needToWaitDelayBeforeCheck(cc *api.CassandraCluster, dcRackName string, sto
return false
}

// visible for tests
var delayWait = defaultDelayWait

func defaultDelayWait() time.Duration {
return api.DefaultDelayWait * time.Second
}

// UpdateStatusIfconfigMapHasChanged updates CassandraCluster Action Status if it detect a changes :
// - a new configmapName in the CRD
// - or the add or remoove of the configmap in the CRD
Expand Down Expand Up @@ -365,14 +372,28 @@ func (rcc *CassandraClusterReconciler) UpdateStatusIfActionEnded(ctx context.Con
//Does the Scaling ended ?
if nodesPerRacks == storedStatefulSet.Status.Replicas {

podsList, err := rcc.ListPods(ctx, cc.Namespace, k8s.LabelsForCassandraDCRack(cc, dcName, rackName))
podsList, err := rcc.ListPodsOrderByNameAscending(ctx, cc.Namespace, k8s.LabelsForCassandraDCRack(cc, dcName, rackName))
nb := len(podsList.Items)
if err != nil || nb < 1 {
return false
}
if nb < int(nodesPerRacks) {
logrus.WithFields(logrus.Fields{"cluster": cc.Name, "rack": dcRackName}).Warn(fmt.Sprintf(
"Although statefulSet has %d replicas, only %d matching pods found", nodesPerRacks, nb))
return false
}
pod := podsList.Items[nodesPerRacks-1]

//We need lastPod to be running to consider ScaleUp ended
if cassandraPodIsReady(&pod) {
if hasJoiningNodes, err := rcc.hasJoiningNodes(ctx, cc); err != nil {
return false
} else if hasJoiningNodes {
logrus.WithFields(logrus.Fields{"cluster": cc.Name, "dc": dcName, "rack": rackName,
"err": err}).Info("Cluster has joining nodes, ScaleUp not yet completed")
return false
}

logrus.WithFields(logrus.Fields{"cluster": cc.Name, "rack": dcRackName}).Info("ScaleUp is Done")
rackLastAction.Status = api.StatusDone
rackLastAction.EndTime = &now
Expand Down Expand Up @@ -453,7 +474,7 @@ func (rcc *CassandraClusterReconciler) UpdateCassandraRackStatusPhase(ctx contex
return
}
//If yes, just check that lastPod is running
podsList, err := rcc.ListPods(ctx, cc.Namespace, k8s.LabelsForCassandraDCRack(cc, dcName, rackName))
podsList, err := rcc.ListPodsOrderByNameAscending(ctx, cc.Namespace, k8s.LabelsForCassandraDCRack(cc, dcName, rackName))
if err != nil || len(podsList.Items) < 1 {
return
}
Expand Down
104 changes: 80 additions & 24 deletions controllers/cassandracluster/cassandra_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ package cassandracluster
import (
"context"
"fmt"
"time"

"github.com/cscetbon/casskop/controllers/common"
"github.com/jarcoal/httpmock"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
Expand Down Expand Up @@ -95,6 +97,8 @@ func HelperInitCluster(t *testing.T, name string) (*CassandraClusterReconciler,
var cc api.CassandraCluster
yaml.Unmarshal(common.HelperLoadBytes(t, name), &cc)

cc.UID = "123456789" //We need to set a UID so PatchMaker does not fail when comparing owner references

ccList := api.CassandraClusterList{}
//Create Fake client
//Objects to track in the Fake client
Expand Down Expand Up @@ -201,33 +205,11 @@ func helperCreateCassandraCluster(ctx context.Context, t *testing.T, cassandraCl
rcc.Client.Status().Update(ctx, sts)

//Create Statefulsets associated fake Pods
podTemplate := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "template",
Namespace: namespace,
Labels: map[string]string{
"cluster": cc.Labels["cluster"],
"dc-rack": dcRackName,
"cassandraclusters.db.orange.com.dc": dc.Name,
"cassandraclusters.db.orange.com.rack": rack.Name,
"app": "cassandracluster",
"cassandracluster": cc.Name,
},
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
ContainerStatuses: []v1.ContainerStatus{
{
Name: "cassandra",
Ready: true,
},
},
},
}
podTemplate := fakePodTemplate(cc, dc.Name, rack.Name)

for i := 0; i < int(sts.Status.Replicas); i++ {
pod := podTemplate.DeepCopy()
pod.Name = sts.Name + strconv.Itoa(i)
pod.Name = sts.Name + "-" + strconv.Itoa(i)
pod.Spec.Hostname = pod.Name
pod.Spec.Subdomain = cc.Name
if err = rcc.CreatePod(ctx, pod); err != nil {
Expand Down Expand Up @@ -263,7 +245,37 @@ func helperCreateCassandraCluster(ctx context.Context, t *testing.T, cassandraCl
return rcc, &req
}

func fakePodTemplate(cc *api.CassandraCluster, dcName, rackName string) v1.Pod {
dcRackName := cc.GetDCRackName(dcName, rackName)
return v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "template",
Namespace: namespace,
Labels: map[string]string{
"cluster": cc.Labels["cluster"],
"dc-rack": dcRackName,
"cassandraclusters.db.orange.com.dc": dcName,
"cassandraclusters.db.orange.com.rack": rackName,
"app": "cassandracluster",
"cassandracluster": cc.Name,
},
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
ContainerStatuses: []v1.ContainerStatus{
{
Name: "cassandra",
Ready: true,
},
},
},
}
}

func TestCassandraClusterReconciler(t *testing.T) {
// tests speed-up
httpmock.Activate()
defer httpmock.DeactivateAndReset()

// Mock request to simulate Reconcile() being called on an event for a
// watched resource .
Expand All @@ -284,6 +296,10 @@ func TestCassandraClusterReconciler(t *testing.T) {

// test that we detect an addition of a configmap
func TestUpdateStatusIfconfigMapHasChangedWithNoConfigMap(t *testing.T) {
// tests speed-up
httpmock.Activate()
defer httpmock.DeactivateAndReset()

// Mock request to simulate Reconcile() being called on an event for a
// watched resource .
rcc, req := helperCreateCassandraCluster(context.TODO(), t, "cassandracluster-2DC.yaml")
Expand Down Expand Up @@ -337,6 +353,10 @@ func TestUpdateStatusIfconfigMapHasChangedWithNoConfigMap(t *testing.T) {

// test that we detect a change in a configmap
func TestUpdateStatusIfconfigMapHasChangedWithConfigMap(t *testing.T) {
// tests speed-up
httpmock.Activate()
defer httpmock.DeactivateAndReset()

// Mock request to simulate Reconcile() being called on an event for a
// watched resource .
rcc, req := helperCreateCassandraCluster(context.TODO(), t, "cassandracluster-2DC-configmap.yaml")
Expand Down Expand Up @@ -408,6 +428,10 @@ func TestUpdateStatusIfconfigMapHasChangedWithConfigMap(t *testing.T) {

// test that we detect a change in a the docker image
func TestUpdateStatusIfDockerImageHasChanged(t *testing.T) {
// tests speed-up
httpmock.Activate()
defer httpmock.DeactivateAndReset()

// Mock request to simulate Reconcile() being called on an event for a
// watched resource .
rcc, req := helperCreateCassandraCluster(context.TODO(), t, "cassandracluster-2DC-configmap.yaml")
Expand Down Expand Up @@ -458,3 +482,35 @@ func TestUpdateStatusIfDockerImageHasChanged(t *testing.T) {
}

}

func assertRackStatusPhase(assert *assert.Assertions, rcc *CassandraClusterReconciler, dcRackName string, expectedPhase api.ClusterStateInfo) {
assert.Equal(expectedPhase.Name, rcc.cc.Status.CassandraRackStatus[dcRackName].Phase, dcRackName+" phase")
}

func assertClusterStatusPhase(assert *assert.Assertions, rcc *CassandraClusterReconciler, expectedPhase api.ClusterStateInfo) {
assert.Equal(expectedPhase.Name, rcc.cc.Status.Phase, "cluster phase")
}

func assertRackStatusLastAction(assert *assert.Assertions, rcc *CassandraClusterReconciler, dcRackName string, expectedActionType api.ClusterStateInfo, expectedActionStatus string) {
assert.Equal(expectedActionType.Name, rcc.cc.Status.CassandraRackStatus[dcRackName].CassandraLastAction.Name, "dc1-rack1 last action type")
assert.Equal(expectedActionStatus, rcc.cc.Status.CassandraRackStatus[dcRackName].CassandraLastAction.Status, "dc1-rack1 last action status")
}

func assertClusterStatusLastAction(assert *assert.Assertions, rcc *CassandraClusterReconciler, expectedActionType api.ClusterStateInfo, expectedActionStatus string) {
assert.Equal(expectedActionType.Name, rcc.cc.Status.LastClusterAction, "cluster last action type")
assert.Equal(expectedActionStatus, rcc.cc.Status.LastClusterActionStatus, "cluster last action status")
}

func overrideDelayWaitWithNoDelay() {
delayWait = func() time.Duration {
return 0
}
retryInterval = func() time.Duration {
return time.Millisecond
}
}

func restoreDefaultDelayWait() {
delayWait = defaultDelayWait
retryInterval = defaultRetryInterval
}
9 changes: 6 additions & 3 deletions controllers/cassandracluster/decommission_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type podName struct {
}

func podHost(stfsName string, id int8, rcc *CassandraClusterReconciler) podName {
name := stfsName + strconv.Itoa(int(id))
name := stfsName + "-" + strconv.Itoa(int(id))
return podName{name, name + "." + rcc.cc.Name}
}

Expand All @@ -81,13 +81,16 @@ func deletePodNotDeletedByFakeClient(rcc *CassandraClusterReconciler, host podNa
}

func TestOneDecommission(t *testing.T) {
ctx := context.TODO()
rcc, req := createCassandraClusterWithNoDisruption(t, "cassandracluster-1DC.yaml")
overrideDelayWaitWithNoDelay()
defer restoreDefaultDelayWait()

httpmock.Activate()
defer httpmock.DeactivateAndReset()
assert := assert.New(t)

ctx := context.TODO()
rcc, req := createCassandraClusterWithNoDisruption(t, "cassandracluster-1DC.yaml")

assert.Equal(int32(3), rcc.cc.Spec.NodesPerRacks)

cassandraCluster := rcc.cc.DeepCopy()
Expand Down
25 changes: 18 additions & 7 deletions controllers/cassandracluster/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,7 @@ func GetLastOrFirstPodItem(podsList []v1.Pod, last bool) (*v1.Pod, error) {

items := podsList[:]

// Sort pod list using ending number in field ObjectMeta.Name
sort.Slice(items, func(i, j int) bool {
id1, _ := strconv.Atoi(reEndingNumber.FindString(items[i].ObjectMeta.Name))
id2, _ := strconv.Atoi(reEndingNumber.FindString(items[j].ObjectMeta.Name))
return id1 < id2
})
sortPodsList(items)

idx := 0
if last {
Expand All @@ -105,6 +100,15 @@ func GetLastOrFirstPodItem(podsList []v1.Pod, last bool) (*v1.Pod, error) {
return &pod, nil
}

// sortPodsList sorts pod list using ending number in field ObjectMeta.Name
func sortPodsList(items []v1.Pod) {
sort.Slice(items, func(i, j int) bool {
id1, _ := strconv.Atoi(reEndingNumber.FindString(items[i].ObjectMeta.Name))
id2, _ := strconv.Atoi(reEndingNumber.FindString(items[j].ObjectMeta.Name))
return id1 < id2
})
}

// GetFirstPod returns the first pod satisfying the selector and being in the namespace
func (rcc *CassandraClusterReconciler) GetFirstPod(ctx context.Context, namespace string, selector map[string]string) (*v1.Pod, error) {
podsList, err := rcc.ListPods(ctx, namespace, selector)
Expand Down Expand Up @@ -182,7 +186,6 @@ func (rcc *CassandraClusterReconciler) hasUnschedulablePod(ctx context.Context,
}

func (rcc *CassandraClusterReconciler) ListPods(ctx context.Context, namespace string, selector map[string]string) (*v1.PodList, error) {

clientOpt := &client.ListOptions{
Namespace: namespace,
LabelSelector: labels.SelectorFromSet(selector),
Expand All @@ -196,6 +199,14 @@ func (rcc *CassandraClusterReconciler) ListPods(ctx context.Context, namespace s
return pl, rcc.Client.List(ctx, pl, opt...)
}

func (rcc *CassandraClusterReconciler) ListPodsOrderByNameAscending(ctx context.Context, namespace string, selector map[string]string) (*v1.PodList, error) {
pods, err := rcc.ListPods(ctx, namespace, selector)
if pods != nil {
sortPodsList(pods.Items)
}
return pods, err
}

func (rcc *CassandraClusterReconciler) CreatePod(ctx context.Context, pod *v1.Pod) error {
err := rcc.Client.Create(ctx, pod)
if err != nil {
Expand Down
38 changes: 22 additions & 16 deletions controllers/cassandracluster/pod_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,21 +112,9 @@ func (rcc *CassandraClusterReconciler) handlePodOperation(ctx context.Context, c
return breakResyncLoopSwitch, err
}

podsList, err := rcc.ListCassandraClusterPods(ctx, cc)
if err != nil {
return true, err
}
firstPod, err := GetLastOrFirstPodReady(podsList, false)
if err != nil {
return true, err
}

hostName := k8s.PodHostname(*firstPod)
jolokiaClient, _ := NewJolokiaClient(ctx, hostName, JolokiaPort, rcc, cc.Spec.ImageJolokiaSecret, cc.Namespace)

hasJoiningNodes, err := jolokiaClient.hasJoiningNodes()
hasJoiningNodes, err := rcc.hasJoiningNodes(ctx, cc)
if err != nil {
return true, err
return breakResyncLoop, err
}
if hasJoiningNodes {
logrus.WithFields(logrus.Fields{"cluster": cc.Name, "dc": dcName, "rack": rackName,
Expand All @@ -151,6 +139,24 @@ func (rcc *CassandraClusterReconciler) handlePodOperation(ctx context.Context, c
return breakResyncLoopSwitch, err
}

func (rcc *CassandraClusterReconciler) hasJoiningNodes(ctx context.Context, cc *api.CassandraCluster) (bool, error) {
podsList, err := rcc.ListCassandraClusterPods(ctx, cc)
if err != nil {
return false, err
}
firstPod, err := GetLastOrFirstPodReady(podsList, false)
if err != nil {
return false, err
}

hostName := k8s.PodHostname(*firstPod)
jolokiaClient, err := NewJolokiaClient(ctx, hostName, JolokiaPort, rcc, cc.Spec.ImageJolokiaSecret, cc.Namespace)
if err != nil {
return false, err
}
return jolokiaClient.hasJoiningNodes()
}

// addPodOperationLabels will add Pod Labels labels on all Pod in the Current dcRackName
func (rcc *CassandraClusterReconciler) addPodOperationLabels(ctx context.Context, cc *api.CassandraCluster, dcName string,
rackName string, labels map[string]string) {
Expand Down Expand Up @@ -675,7 +681,7 @@ func (rcc *CassandraClusterReconciler) finalizeOperation(ctx context.Context, er
}
logrus.WithFields(logrus.Fields{"cluster": cc.Name, "rack": dcRackName, "pod": pod.Name,
"status": status}).Debug("Can't get new version of Cassandra Cluster. Will try again")
time.Sleep(retryInterval)
time.Sleep(retryInterval())
}
}

Expand Down Expand Up @@ -830,7 +836,7 @@ func (rcc *CassandraClusterReconciler) runRemove(ctx context.Context, hostName s
}

func (rcc *CassandraClusterReconciler) waitUntilPvcIsDeleted(ctx context.Context, namespace, pvcName string) error {
err := wait.Poll(retryInterval, deletedPvcTimeout, func() (done bool, err error) {
err := wait.Poll(retryInterval(), deletedPvcTimeout, func() (done bool, err error) {
_, err = rcc.GetPVC(ctx, namespace, pvcName)
if err != nil && apierrors.IsNotFound(err) {
logrus.WithFields(logrus.Fields{"namespace": namespace,
Expand Down
Loading
Loading