diff --git a/pkg/controller/elasticsearch/driver/nodes.go b/pkg/controller/elasticsearch/driver/nodes.go index 65199fc48a8..545c6033484 100644 --- a/pkg/controller/elasticsearch/driver/nodes.go +++ b/pkg/controller/elasticsearch/driver/nodes.go @@ -125,6 +125,9 @@ func (d *defaultDriver) reconcileNodeSpecs( if reconcileState.HasPendingNewNodes() { results.WithReconciliationState(defaultRequeue.WithReason("Upscale in progress")) } + if reconcileState.HasPendingNonMasterSTSUpgrades() { + results.WithReconciliationState(defaultRequeue.WithReason("Non-master StatefulSets are still upgrading")) + } actualStatefulSets = upscaleResults.ActualStatefulSets // Once all the StatefulSets have been updated we can ensure that the former version of the transport certificates Secret is deleted. diff --git a/pkg/controller/elasticsearch/driver/upgrade.go b/pkg/controller/elasticsearch/driver/upgrade.go index 5790f61fd23..6141419a209 100644 --- a/pkg/controller/elasticsearch/driver/upgrade.go +++ b/pkg/controller/elasticsearch/driver/upgrade.go @@ -207,6 +207,12 @@ func isVersionUpgrade(es esv1.Elasticsearch) (bool, error) { if err != nil { return false, err } + + // If status version is empty, this is a new cluster, not an upgrade + if es.Status.Version == "" { + return false, nil + } + statusVersion, err := version.Parse(es.Status.Version) if err != nil { return false, err diff --git a/pkg/controller/elasticsearch/driver/upscale.go b/pkg/controller/elasticsearch/driver/upscale.go index 763f3a5ac6b..3bb66cf9295 100644 --- a/pkg/controller/elasticsearch/driver/upscale.go +++ b/pkg/controller/elasticsearch/driver/upscale.go @@ -7,14 +7,19 @@ package driver import ( "context" "fmt" + "slices" appsv1 "k8s.io/api/apps/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/utils/ptr" esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/expectations" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/metadata" sset "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/statefulset" + "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/version" + "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/label" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/nodespec" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/reconcile" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/settings" @@ -22,6 +27,7 @@ import ( "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/version/zen1" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/version/zen2" "github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s" + ulog "github.com/elastic/cloud-on-k8s/v3/pkg/utils/log" ) type upscaleCtx struct { @@ -66,35 +72,112 @@ func HandleUpscaleAndSpecChanges( if err != nil { return results, fmt.Errorf("adjust resources: %w", err) } - // reconcile all resources - for _, res := range adjusted { - res := res - if err := settings.ReconcileConfig(ctx.parentCtx, ctx.k8sClient, ctx.es, res.StatefulSet.Name, res.Config, ctx.meta); err != nil { - return results, fmt.Errorf("reconcile config: %w", err) + + // Check if this is a version upgrade + isVersionUpgrade, err := isVersionUpgrade(ctx.es) + if err != nil { + return results, fmt.Errorf("while checking for version upgrade: %w", err) + } + + // If this is not a version upgrade, process all resources normally and return + if !isVersionUpgrade { + results, err = 
reconcileResources(ctx, actualStatefulSets, adjusted)
+		if err != nil {
+			return results, fmt.Errorf("while reconciling resources: %w", err)
 		}
-		if _, err := common.ReconcileService(ctx.parentCtx, ctx.k8sClient, &res.HeadlessService, &ctx.es); err != nil {
-			return results, fmt.Errorf("reconcile service: %w", err)
+		return results, nil
+	}
+
+	// Version upgrade: separate master and non-master StatefulSets.
+	var masterResources, nonMasterResources []nodespec.Resources
+	for _, res := range adjusted {
+		if label.IsMasterNodeSet(res.StatefulSet) {
+			masterResources = append(masterResources, res)
+		} else {
+			nonMasterResources = append(nonMasterResources, res)
 		}
-		if actualSset, exists := actualStatefulSets.GetByName(res.StatefulSet.Name); exists {
-			recreateSset, err := handleVolumeExpansion(ctx.parentCtx, ctx.k8sClient, ctx.es, res.StatefulSet, actualSset, ctx.validateStorageClass)
-			if err != nil {
-				return results, fmt.Errorf("handle volume expansion: %w", err)
-			}
-			if recreateSset {
-				// The StatefulSet is scheduled for recreation: let's requeue before attempting any further spec change.
-				results.Requeue = true
+	}
+
+	// Before all non-master StatefulSets have been reconciled, the only change we make to master
+	// StatefulSets is a potential upscale of their replicas. Since the replicas were already
+	// adjusted earlier, masters are still added one at a time.
+	if err = maybeUpscaleMasterResources(ctx, masterResources); err != nil {
+		return results, fmt.Errorf("while scaling up master resources: %w", err)
+	}
+
+	// Reconcile all non-master resources first.
+	results, err = reconcileResources(ctx, actualStatefulSets, nonMasterResources)
+	if err != nil {
+		return results, fmt.Errorf("while reconciling non-master resources: %w", err)
+	}
+	results.ActualStatefulSets = actualStatefulSets
+
+	if results.Requeue {
+		return results, nil
+	}
+
+	targetVersion, err := version.Parse(ctx.es.Spec.Version)
+	if err != nil {
+		return results, fmt.Errorf("while parsing Elasticsearch upgrade target version: %w", err)
+	}
+
+	// Ensure all non-master StatefulSets have completed their upgrades before proceeding with the master StatefulSets.
+	pendingNonMasterSTS, err := findPendingNonMasterStatefulSetUpgrades(
+		ctx.k8sClient,
+		actualStatefulSets,
+		expectedResources.StatefulSets(),
+		targetVersion,
+		ctx.expectations,
+	)
+	if err != nil {
+		return results, fmt.Errorf("while checking non-master upgrade status: %w", err)
+	}
+
+	ctx.upscaleReporter.RecordPendingNonMasterSTSUpgrades(pendingNonMasterSTS)
+
+	if len(pendingNonMasterSTS) > 0 {
+		// Non-master StatefulSets are still upgrading: skip the master StatefulSets for now.
+		// This causes a requeue in the caller, and the master StatefulSets are processed in a subsequent reconciliation.
+		return results, nil
+	}
+
+	// All non-master StatefulSets are upgraded: process the master StatefulSets now.
+	results, err = reconcileResources(ctx, actualStatefulSets, masterResources)
+	if err != nil {
+		return results, fmt.Errorf("while reconciling master resources: %w", err)
+	}
+
+	results.ActualStatefulSets = actualStatefulSets
+	return results, nil
+}
+
+func maybeUpscaleMasterResources(ctx upscaleCtx, masterResources []nodespec.Resources) error {
+	// Scale up, never down: only the replica count of each master StatefulSet is adjusted here;
+	// the rest of the spec is reconciled later, once the non-master StatefulSets are done.
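+	// Applying the replica change ahead of the non-master rollout lets a master upscale requested
+	// together with a version upgrade proceed in parallel with the non-master upgrades, rather than
+	// being blocked until every non-master StatefulSet has finished.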
+	for _, res := range masterResources {
+		stsName := res.StatefulSet.Name
+
+		// Read the current StatefulSet from k8s to get the latest state.
+		var actualSset appsv1.StatefulSet
+		if err := ctx.k8sClient.Get(ctx.parentCtx, k8s.ExtractNamespacedName(&res.StatefulSet), &actualSset); err != nil {
+			if apierrors.IsNotFound(err) {
 				continue
 			}
+			return fmt.Errorf("while getting master StatefulSet %s: %w", stsName, err)
 		}
-		reconciled, err := es_sset.ReconcileStatefulSet(ctx.parentCtx, ctx.k8sClient, ctx.es, res.StatefulSet, ctx.expectations)
-		if err != nil {
-			return results, fmt.Errorf("reconcile StatefulSet: %w", err)
+
+		actualReplicas := sset.GetReplicas(actualSset)
+		targetReplicas := sset.GetReplicas(res.StatefulSet)
+
+		if actualReplicas < targetReplicas {
+			actualSset.Spec.Replicas = ptr.To(targetReplicas)
+			if err := ctx.k8sClient.Update(ctx.parentCtx, &actualSset); err != nil {
+				return fmt.Errorf("while upscaling master sts replicas: %w", err)
+			}
+			ctx.expectations.ExpectGeneration(actualSset)
 		}
-		// update actual with the reconciled ones for next steps to work with up-to-date information
-		actualStatefulSets = actualStatefulSets.WithStatefulSet(reconciled)
 	}
-	results.ActualStatefulSets = actualStatefulSets
-	return results, nil
+	return nil
 }

 func podsToCreate(
@@ -166,3 +249,122 @@ func adjustStatefulSetReplicas(

 	return expected, nil
 }
+
+// reconcileResources reconciles the configuration, headless Service and StatefulSet of each given resource.
+// It returns an UpscaleResults carrying the updated StatefulSets and whether a requeue is needed,
+// along with any error encountered.
+func reconcileResources(
+	ctx upscaleCtx,
+	actualStatefulSets es_sset.StatefulSetList,
+	resources []nodespec.Resources,
+) (UpscaleResults, error) {
+	results := UpscaleResults{
+		ActualStatefulSets: actualStatefulSets,
+	}
+	ulog.FromContext(ctx.parentCtx).Info("Reconciling resources", "resource_size", len(resources))
+	for _, res := range resources {
+		res := res
+		if err := settings.ReconcileConfig(ctx.parentCtx, ctx.k8sClient, ctx.es, res.StatefulSet.Name, res.Config, ctx.meta); err != nil {
+			return results, fmt.Errorf("reconcile config: %w", err)
+		}
+		if _, err := common.ReconcileService(ctx.parentCtx, ctx.k8sClient, &res.HeadlessService, &ctx.es); err != nil {
+			return results, fmt.Errorf("reconcile service: %w", err)
+		}
+		if actualSset, exists := actualStatefulSets.GetByName(res.StatefulSet.Name); exists {
+			recreateSset, err := handleVolumeExpansion(ctx.parentCtx, ctx.k8sClient, ctx.es, res.StatefulSet, actualSset, ctx.validateStorageClass)
+			if err != nil {
+				return results, fmt.Errorf("handle volume expansion: %w", err)
+			}
+			if recreateSset {
+				ulog.FromContext(ctx.parentCtx).Info("StatefulSet is scheduled for recreation, requeuing", "name", res.StatefulSet.Name)
+				// The StatefulSet is scheduled for recreation: let's requeue before attempting any further spec change.
+ results.Requeue = true + continue + } + } else { + ulog.FromContext(ctx.parentCtx).Info("StatefulSet does not exist", "name", res.StatefulSet.Name) + } + ulog.FromContext(ctx.parentCtx).Info("Reconciling StatefulSet", "name", res.StatefulSet.Name) + reconciled, err := es_sset.ReconcileStatefulSet(ctx.parentCtx, ctx.k8sClient, ctx.es, res.StatefulSet, ctx.expectations) + if err != nil { + return results, fmt.Errorf("reconcile StatefulSet: %w", err) + } + // update actual with the reconciled ones for next steps to work with up-to-date information + results.ActualStatefulSets = results.ActualStatefulSets.WithStatefulSet(reconciled) + } + ulog.FromContext(ctx.parentCtx).Info("Resources reconciled", "actualStatefulSets_size", len(results.ActualStatefulSets), "requeue", results.Requeue) + return results, nil +} + +// findPendingNonMasterStatefulSetUpgrades finds all non-master StatefulSets that have not completed their upgrades +func findPendingNonMasterStatefulSetUpgrades( + client k8s.Client, + actualStatefulSets es_sset.StatefulSetList, + expectedStatefulSets es_sset.StatefulSetList, + targetVersion version.Version, + expectations *expectations.Expectations, +) ([]appsv1.StatefulSet, error) { + pendingStatefulSets, err := expectations.ExpectedStatefulSetUpdates.PendingGenerations() + if err != nil { + return nil, err + } + + pendingNonMasterSTS := make([]appsv1.StatefulSet, 0) + for _, actualStatefulSet := range actualStatefulSets { + expectedSset, _ := expectedStatefulSets.GetByName(actualStatefulSet.Name) + + // Skip master StatefulSets. We check both here because the master role may have been added + // to a non-master StatefulSet during the upgrade spec change. + if label.IsMasterNodeSet(actualStatefulSet) || label.IsMasterNodeSet(expectedSset) { + continue + } + + // If the expectations show this as a pending StatefulSet, add it to the list. + if slices.Contains(pendingStatefulSets, actualStatefulSet.Name) { + pendingNonMasterSTS = append(pendingNonMasterSTS, actualStatefulSet) + continue + } + + // If the StatefulSet is not at the target version, it is not upgraded + // so don't even bother looking at the state/status of the StatefulSet. + actualVersion, err := es_sset.GetESVersion(actualStatefulSet) + if err != nil { + return pendingNonMasterSTS, err + } + if actualVersion.LT(targetVersion) { + pendingNonMasterSTS = append(pendingNonMasterSTS, actualStatefulSet) + continue + } + + if actualStatefulSet.Status.ObservedGeneration < actualStatefulSet.Generation { + // The StatefulSet controller has not yet observed the latest generation. 
+ pendingNonMasterSTS = append(pendingNonMasterSTS, actualStatefulSet) + continue + } + + // Check if this StatefulSet has pending updates + if actualStatefulSet.Status.UpdatedReplicas != actualStatefulSet.Status.Replicas { + pendingNonMasterSTS = append(pendingNonMasterSTS, actualStatefulSet) + continue + } + + // Check if there are any pods that need to be upgraded + pods, err := es_sset.GetActualPodsForStatefulSet(client, k8s.ExtractNamespacedName(&actualStatefulSet)) + if err != nil { + return pendingNonMasterSTS, err + } + + for _, pod := range pods { + // Check if pod revision matches StatefulSet update revision + if actualStatefulSet.Status.UpdateRevision != "" && sset.PodRevision(pod) != actualStatefulSet.Status.UpdateRevision { + // This pod still needs to be upgraded + pendingNonMasterSTS = append(pendingNonMasterSTS, actualStatefulSet) + break + } + } + } + + return pendingNonMasterSTS, nil +} diff --git a/pkg/controller/elasticsearch/driver/upscale_test.go b/pkg/controller/elasticsearch/driver/upscale_test.go index 7120fea7ea4..9322d6fdd56 100644 --- a/pkg/controller/elasticsearch/driver/upscale_test.go +++ b/pkg/controller/elasticsearch/driver/upscale_test.go @@ -6,6 +6,7 @@ package driver import ( "context" + "fmt" "reflect" "sort" "sync" @@ -197,7 +198,6 @@ func TestHandleUpscaleAndSpecChanges(t *testing.T) { comparison.RequireEqual(t, &res.ActualStatefulSets[1], &sset2) // expectations should have been set require.NotEmpty(t, ctx.expectations.GetGenerations()) - // apply a spec change actualStatefulSets = es_sset.StatefulSetList{sset1, sset2} expectedResources[1].StatefulSet.Spec.Template.Labels = map[string]string{"a": "b"} @@ -266,7 +266,8 @@ func TestHandleUpscaleAndSpecChanges_PVCResize(t *testing.T) { Spec: appsv1.StatefulSetSpec{ Replicas: ptr.To[int32](4), VolumeClaimTemplates: []corev1.PersistentVolumeClaim{ - {ObjectMeta: metav1.ObjectMeta{Name: "elasticsearch-data"}, + { + ObjectMeta: metav1.ObjectMeta{Name: "elasticsearch-data"}, Spec: corev1.PersistentVolumeClaimSpec{ Resources: corev1.VolumeResourceRequirements{ Requests: corev1.ResourceList{ @@ -569,3 +570,366 @@ func Test_adjustResources(t *testing.T) { }) } } + +func TestHandleUpscaleAndSpecChanges_VersionUpgradeDataFirstFlow(t *testing.T) { + // Test the complete upgrade flow: data nodes upgrade first, then master nodes + // starting at 8.16.2 and upgrading to 8.17.1 + es := esv1.Elasticsearch{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "es", + Annotations: map[string]string{ + "elasticsearch.k8s.elastic.co/initial-master-nodes": "node-1,node-2,node-3", + bootstrap.ClusterUUIDAnnotationName: "uuid", + }, + }, + Spec: esv1.ElasticsearchSpec{Version: "8.16.2"}, + Status: esv1.ElasticsearchStatus{Version: "8.16.2"}, + } + k8sClient := k8s.NewFakeClient(&es) + ctx := upscaleCtx{ + k8sClient: k8sClient, + es: es, + esState: nil, + expectations: expectations.NewExpectations(k8sClient), + parentCtx: context.Background(), + } + + // Create expected resources with both master and data StatefulSets at 8.16.2 + expectedResources := nodespec.ResourcesList{ + { + StatefulSet: appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "master-sset", + Labels: map[string]string{ + "elasticsearch.k8s.elastic.co/node-master": "true", + "elasticsearch.k8s.elastic.co/version": "8.16.2", + }, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: ptr.To[int32](3), + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + 
"elasticsearch.k8s.elastic.co/node-master": "true", + "elasticsearch.k8s.elastic.co/version": "8.16.2", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "elasticsearch", + Image: "docker.elastic.co/elasticsearch/elasticsearch:8.16.2", + }, + }, + }, + }, + }, + }, + HeadlessService: corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "master-sset", + }, + }, + Config: settings.NewCanonicalConfig(), + }, + { + StatefulSet: appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "data-sset", + Labels: map[string]string{ + "elasticsearch.k8s.elastic.co/node-data": "true", + "elasticsearch.k8s.elastic.co/version": "8.16.2", + }, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: ptr.To[int32](2), + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "elasticsearch.k8s.elastic.co/node-data": "true", + "elasticsearch.k8s.elastic.co/version": "8.16.2", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "elasticsearch", + Image: "docker.elastic.co/elasticsearch/elasticsearch:8.16.2", + }, + }, + }, + }, + }, + }, + HeadlessService: corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "data-sset", + }, + }, + Config: settings.NewCanonicalConfig(), + }, + } + + // Set template hash labels + expectedResources[0].StatefulSet.Labels = hash.SetTemplateHashLabel(expectedResources[0].StatefulSet.Labels, expectedResources[0].StatefulSet.Spec) + expectedResources[1].StatefulSet.Labels = hash.SetTemplateHashLabel(expectedResources[1].StatefulSet.Labels, expectedResources[1].StatefulSet.Spec) + + // Call HandleUpscaleAndSpecChanges and check things are created properly + actualStatefulSets := es_sset.StatefulSetList{} + res, err := HandleUpscaleAndSpecChanges(ctx, actualStatefulSets, expectedResources) + require.NoError(t, err) + require.Len(t, res.ActualStatefulSets, 2) + + // Verify both StatefulSets were created at 8.16.2 + var masterSset appsv1.StatefulSet + require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "master-sset"}, &masterSset)) + require.NotNil(t, masterSset.Spec.Replicas) + // Master nodes/pods are limited to 1 creation at a time regardless of the replicas setting. 
+ require.Equal(t, int32(1), *masterSset.Spec.Replicas) + require.Equal(t, "docker.elastic.co/elasticsearch/elasticsearch:8.16.2", masterSset.Spec.Template.Spec.Containers[0].Image) + + // Set master StatefulSet status to show it's fully deployed at 8.16.2 + // Also update the replicas to 3 to simulate full rollout at 8.16.2 + masterSset.Spec.Replicas = ptr.To[int32](3) + masterSset.Status.Replicas = 3 + masterSset.Status.UpdatedReplicas = 3 + masterSset.Status.CurrentRevision = "master-sset-old" + masterSset.Status.UpdateRevision = "master-sset-old" + require.NoError(t, k8sClient.Update(context.Background(), &masterSset)) + require.NoError(t, k8sClient.Status().Update(context.Background(), &masterSset)) + + var dataSset appsv1.StatefulSet + require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "data-sset"}, &dataSset)) + require.NotNil(t, dataSset.Spec.Replicas) + require.Equal(t, int32(2), *dataSset.Spec.Replicas) + require.Equal(t, "docker.elastic.co/elasticsearch/elasticsearch:8.16.2", dataSset.Spec.Template.Spec.Containers[0].Image) + + // Set data StatefulSet status to show it's fully deployed at 8.16.2 + dataSset.Status.Replicas = 2 + dataSset.Status.UpdatedReplicas = 2 + dataSset.Status.CurrentRevision = "data-sset-old" + dataSset.Status.UpdateRevision = "data-sset-old" + require.NoError(t, k8sClient.Status().Update(context.Background(), &dataSset)) + + // Create pods for both StatefulSets with the old revision + masterPods := []corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "master-sset-0", + Namespace: "ns", + Labels: map[string]string{ + "elasticsearch.k8s.elastic.co/node-master": "true", + "controller-revision-hash": "master-sset-old", + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "master-sset-1", + Namespace: "ns", + Labels: map[string]string{ + "elasticsearch.k8s.elastic.co/node-master": "true", + "controller-revision-hash": "master-sset-old", + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "master-sset-2", + Namespace: "ns", + Labels: map[string]string{ + "elasticsearch.k8s.elastic.co/node-master": "true", + "controller-revision-hash": "master-sset-old", + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + } + for _, pod := range masterPods { + require.NoError(t, k8sClient.Create(context.Background(), &pod)) + } + + dataPods := []corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "data-sset-0", + Namespace: "ns", + Labels: map[string]string{ + "elasticsearch.k8s.elastic.co/node-data": "true", + "controller-revision-hash": "data-sset-old", + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "data-sset-1", + Namespace: "ns", + Labels: map[string]string{ + "elasticsearch.k8s.elastic.co/node-data": "true", + "controller-revision-hash": "data-sset-old", + }, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + } + for _, pod := range dataPods { + require.NoError(t, k8sClient.Create(context.Background(), &pod)) + } + + // Update the ES object to 8.17.1 in k8s + es.Spec.Version = "8.17.1" + require.NoError(t, k8sClient.Update(context.Background(), &es)) + ctx.es = es + + // Update actualStatefulSets to reflect the current state with status + require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "master-sset"}, 
&masterSset)) + require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "data-sset"}, &dataSset)) + actualStatefulSets = es_sset.StatefulSetList{masterSset, dataSset} + + // Update expected resources to 8.17.1 for the upgrade + expectedResourcesUpgrade := nodespec.ResourcesList{ + { + StatefulSet: appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "master-sset", + Labels: map[string]string{ + "elasticsearch.k8s.elastic.co/node-master": "true", + "elasticsearch.k8s.elastic.co/version": "8.17.1", + }, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: ptr.To[int32](4), // also upscale the master replicas to ensure that an upscale during an upgrade can happen in parallel with non-masters. + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "elasticsearch.k8s.elastic.co/node-master": "true", + "elasticsearch.k8s.elastic.co/version": "8.17.1", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "elasticsearch", + Image: "docker.elastic.co/elasticsearch/elasticsearch:8.17.1", + }, + }, + }, + }, + }, + }, + HeadlessService: corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "master-sset", + }, + }, + Config: settings.NewCanonicalConfig(), + }, + { + StatefulSet: appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "data-sset", + Labels: map[string]string{ + "elasticsearch.k8s.elastic.co/node-data": "true", + "elasticsearch.k8s.elastic.co/version": "8.17.1", + }, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: ptr.To[int32](2), + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "elasticsearch.k8s.elastic.co/node-data": "true", + "elasticsearch.k8s.elastic.co/version": "8.17.1", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "elasticsearch", + Image: "docker.elastic.co/elasticsearch/elasticsearch:8.17.1", + }, + }, + }, + }, + }, + }, + HeadlessService: corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "data-sset", + }, + }, + Config: settings.NewCanonicalConfig(), + }, + } + + // Set template hash labels for upgrade resources + expectedResourcesUpgrade[0].StatefulSet.Labels = hash.SetTemplateHashLabel(expectedResourcesUpgrade[0].StatefulSet.Labels, expectedResourcesUpgrade[0].StatefulSet.Spec) + expectedResourcesUpgrade[1].StatefulSet.Labels = hash.SetTemplateHashLabel(expectedResourcesUpgrade[1].StatefulSet.Labels, expectedResourcesUpgrade[1].StatefulSet.Spec) + + // Manually set the data StatefulSet status to show it's NOT fully upgraded + // This simulates the state after the StatefulSet controller has updated the spec but before the pods are updated + require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "data-sset"}, &dataSset)) + dataSset.Status.UpdatedReplicas = 0 // No replicas updated yet + dataSset.Status.Replicas = 2 // Total replicas + dataSset.Status.UpdateRevision = "data-sset-12345" // New revision (different from old) + require.NoError(t, k8sClient.Status().Update(context.Background(), &dataSset)) + + // Call HandleUpscaleAndSpecChanges and verify that both data upgrade has begun and master STS is not updated + _, err = HandleUpscaleAndSpecChanges(ctx, actualStatefulSets, expectedResourcesUpgrade) + require.NoError(t, err) + + // Verify data StatefulSet is updated to 8.17.1 + require.NoError(t, k8sClient.Get(context.Background(), 
types.NamespacedName{Namespace: "ns", Name: "data-sset"}, &dataSset)) + require.Equal(t, "docker.elastic.co/elasticsearch/elasticsearch:8.17.1", dataSset.Spec.Template.Spec.Containers[0].Image) + + // Verify master StatefulSet version hasn't changed yet (should still be 8.16.2) + require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "master-sset"}, &masterSset)) + require.Equal(t, "docker.elastic.co/elasticsearch/elasticsearch:8.16.2", masterSset.Spec.Template.Spec.Containers[0].Image) + // Verify master StatefulSet replicas have been scaled up to 4 + require.Equal(t, int32(4), *masterSset.Spec.Replicas) + + // Update data STS and associated pods to show they are completely upgraded + require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "data-sset"}, &dataSset)) + dataSset.Status.UpdatedReplicas = 2 // All replicas updated + dataSset.Status.Replicas = 2 // Total replicas + dataSset.Status.UpdateRevision = "data-sset-12345" // Set update revision + require.NoError(t, k8sClient.Status().Update(context.Background(), &dataSset)) + + // Update the existing data pods to have the new revision + for i := 0; i < int(dataSset.Status.Replicas); i++ { + var pod corev1.Pod + require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: fmt.Sprintf("data-sset-%d", i)}, &pod)) + pod.Labels["controller-revision-hash"] = "data-sset-12345" + require.NoError(t, k8sClient.Update(context.Background(), &pod)) + } + // Call HandleUpscaleAndSpecChanges and verify that master STS is now set to be upgraded + actualStatefulSets = res.ActualStatefulSets + _, err = HandleUpscaleAndSpecChanges(ctx, actualStatefulSets, expectedResourcesUpgrade) + require.NoError(t, err) + + // Verify master StatefulSet is now updated to 8.17.1 + require.NoError(t, k8sClient.Get(context.Background(), types.NamespacedName{Namespace: "ns", Name: "master-sset"}, &masterSset)) + require.Equal(t, "docker.elastic.co/elasticsearch/elasticsearch:8.17.1", masterSset.Spec.Template.Spec.Containers[0].Image) +} diff --git a/pkg/controller/elasticsearch/reconcile/status.go b/pkg/controller/elasticsearch/reconcile/status.go index 77e5e224382..d7416b5c3f4 100644 --- a/pkg/controller/elasticsearch/reconcile/status.go +++ b/pkg/controller/elasticsearch/reconcile/status.go @@ -8,6 +8,7 @@ import ( "reflect" "sort" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/utils/ptr" @@ -60,6 +61,8 @@ func (s *StatusReporter) ReportCondition( type UpscaleReporter struct { // Expected nodes to be upscaled nodes map[string]esv1.NewNode + // Number of non-master StatefulSets that are still upgrading + nonMasterSTSUpgrades int } // RecordNewNodes records pending node creations. @@ -103,6 +106,19 @@ func (u *UpscaleReporter) HasPendingNewNodes() bool { return len(u.nodes) > 0 } +// HasPendingNonMasterSTSUpgrades returns true if at least one non-master StatefulSet is still upgrading. +func (u *UpscaleReporter) HasPendingNonMasterSTSUpgrades() bool { + return u.nonMasterSTSUpgrades > 0 +} + +// RecordPendingNonMasterSTSUpgrades records the number of non-master StatefulSets that have upgrades pending. 
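+// Only the count is kept: HasPendingNonMasterSTSUpgrades merely needs to know whether any remain.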
+func (u *UpscaleReporter) RecordPendingNonMasterSTSUpgrades(pendingNonMasterSTS []appsv1.StatefulSet) {
+	if u == nil {
+		return
+	}
+	u.nonMasterSTSUpgrades = len(pendingNonMasterSTS)
+}
+
 // Merge creates a new upscale status using the reported upscale status and an existing upscale status.
 func (u *UpscaleReporter) Merge(other esv1.UpscaleOperation) esv1.UpscaleOperation {
 	upscaleOperation := other.DeepCopy()
diff --git a/test/e2e/es/non_master_first_upgrade_test.go b/test/e2e/es/non_master_first_upgrade_test.go
new file mode 100644
index 00000000000..a3979ca6ad9
--- /dev/null
+++ b/test/e2e/es/non_master_first_upgrade_test.go
@@ -0,0 +1,103 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License 2.0;
+// you may not use this file except in compliance with the Elastic License 2.0.
+
+//go:build es || e2e
+
+package es
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1"
+	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/label"
+	essset "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/sset"
+	"github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s"
+	"github.com/elastic/cloud-on-k8s/v3/test/e2e/test"
+	"github.com/elastic/cloud-on-k8s/v3/test/e2e/test/elasticsearch"
+)
+
+// newNonMasterFirstUpgradeWatcher creates a watcher that monitors the StatefulSet upgrade order
+// and ensures non-master StatefulSets upgrade before master StatefulSets.
+func newNonMasterFirstUpgradeWatcher(es esv1.Elasticsearch) test.Watcher {
+	var violations []string
+
+	return test.NewWatcher(
+		"watch StatefulSet upgrade order: non-master StatefulSets should upgrade before master StatefulSets",
+		2*time.Second,
+		func(k *test.K8sClient, t *testing.T) {
+			statefulSets, err := essset.RetrieveActualStatefulSets(k.Client, k8s.ExtractNamespacedName(&es))
+			if err != nil {
+				t.Logf("failed to get StatefulSets: %s", err.Error())
+				return
+			}
+
+			// A master StatefulSet carrying a version higher than any non-master StatefulSet indicates
+			// that the masters started upgrading before the non-master StatefulSets were done.
+			for _, sset := range statefulSets {
+				if !label.IsMasterNodeSet(sset) {
+					continue
+				}
+				masterSTSVersion, err := essset.GetESVersion(sset)
+				if err != nil {
+					t.Logf("failed to get StatefulSet version: %s", err.Error())
+					continue
+				}
+				// Ensure that the master StatefulSet never has a version higher than any non-master StatefulSet.
+				for _, otherSset := range statefulSets {
+					// don't compare master against master.
+ if label.IsMasterNodeSet(otherSset) { + continue + } + otherSsetVersion, err := essset.GetESVersion(otherSset) + if err != nil { + t.Logf("failed to get StatefulSet version: %s", err.Error()) + continue + } + if masterSTSVersion.GT(otherSsetVersion) { + violations = append(violations, fmt.Sprintf("master StatefulSet %s has a version higher than non-master StatefulSet %s", sset.Name, otherSset.Name)) + } + } + } + }, + func(k *test.K8sClient, t *testing.T) { + if len(violations) > 0 { + t.Errorf("%d non-master first upgrade order violations detected", len(violations)) + } + }) +} + +// runNonMasterFirstUpgradeTest runs the complete test for non-master first upgrade behavior +func runNonMasterFirstUpgradeTest(t *testing.T, initial, mutated elasticsearch.Builder) { + watcher := newNonMasterFirstUpgradeWatcher(initial.Elasticsearch) + + test.RunMutationsWhileWatching( + t, + []test.Builder{initial}, + []test.Builder{mutated}, + []test.Watcher{watcher}, + ) +} + +// TestNonMasterFirstUpgradeComplexTopology tests the non-master first upgrade behavior with a complex topology +func TestNonMasterFirstUpgradeComplexTopology(t *testing.T) { + srcVersion, dstVersion := test.GetUpgradePathTo8x(test.Ctx().ElasticStackVersion) + + test.SkipInvalidUpgrade(t, srcVersion, dstVersion) + + initial := elasticsearch.NewBuilder("test-non-master-first-complex"). + WithVersion(srcVersion). + WithESMasterNodes(3, elasticsearch.DefaultResources). + WithESDataNodes(2, elasticsearch.DefaultResources). + WithESCoordinatingNodes(1, elasticsearch.DefaultResources) + + mutated := initial.WithNoESTopology(). + WithVersion(dstVersion). + WithESMasterNodes(3, elasticsearch.DefaultResources). + WithESDataNodes(2, elasticsearch.DefaultResources). + WithESCoordinatingNodes(1, elasticsearch.DefaultResources) + + runNonMasterFirstUpgradeTest(t, initial, mutated) +}