Skip to content
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
9db32d0
Upgrade masters last when upgrading ES clusters
naemono Oct 15, 2025
39b2702
Fix lint issue
naemono Oct 23, 2025
50b3954
Add e2e test for upgrade order.
naemono Oct 28, 2025
00555c2
unexport things in e2e tests
naemono Oct 29, 2025
88cb347
Also look at the current/target version while determining whether sts is
naemono Oct 29, 2025
790d3f1
Fix tests
naemono Oct 29, 2025
4b944d1
Merge branch 'fix-sts-upgrade-issue-recreation' of github.com:naemono…
naemono Oct 29, 2025
d9885ba
Fix the unit tests for master last upgrades
naemono Oct 29, 2025
efa8643
fix linter
naemono Oct 29, 2025
6914708
move closer to use.
naemono Oct 29, 2025
2dc664b
Ensure requeue
naemono Oct 29, 2025
46c726c
adjust comments
naemono Oct 29, 2025
fccf6c3
Adjust logging in e2e test
naemono Oct 29, 2025
8feef24
Don't compare masters against other masters or themselves.
naemono Oct 30, 2025
0f5a31a
Fix spelling
naemono Oct 30, 2025
030fe16
Also check the generation/observedGeneration.
naemono Nov 4, 2025
6c9e2c5
Merge branch 'fix-sts-upgrade-issue-recreation' of github.com:naemono…
naemono Nov 4, 2025
0db51d8
Debugging
naemono Nov 5, 2025
57c71a9
Remove useless if check.
naemono Nov 5, 2025
c89e872
More targeted debugging
naemono Nov 5, 2025
068fa54
More debugging
naemono Nov 5, 2025
0af6b85
Debugging pod upgrade logic.
naemono Nov 5, 2025
54f9775
Attempt fix for blocked sts upgrades
naemono Nov 6, 2025
5dfdd05
Bug fix adding new master role to existing non-master sts.
naemono Nov 6, 2025
edf3faf
More debugging
naemono Nov 7, 2025
a6d8edc
Adjust to logical or
naemono Nov 7, 2025
8cfa06c
Simplify logic in HandleUpscaleAndSpecChanges
naemono Nov 17, 2025
c2e1161
Remove debugging
naemono Nov 10, 2025
1dcef6c
comments
naemono Nov 10, 2025
eb963e5
Use expectations.
naemono Nov 17, 2025
518d69d
Remove duplicative check now that using expectations.
naemono Nov 17, 2025
16fd9ec
Attempt allow replicas update master (wip)
naemono Nov 17, 2025
8273152
Fix the code allowing replica updates during upgrades.
naemono Nov 18, 2025
11cc0e6
Add comment to test
naemono Nov 18, 2025
645e088
Apply review comments
naemono Dec 1, 2025
3e95b2d
More review comments
naemono Dec 1, 2025
7f00388
Ensure the StatefulSet controller has observed the latest generation.
naemono Dec 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/controller/elasticsearch/driver/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,12 @@ func isVersionUpgrade(es esv1.Elasticsearch) (bool, error) {
if err != nil {
return false, err
}

// If status version is empty, this is a new cluster, not an upgrade
if es.Status.Version == "" {
return false, nil
}

statusVersion, err := version.Parse(es.Status.Version)
if err != nil {
return false, err
Expand Down
166 changes: 146 additions & 20 deletions pkg/controller/elasticsearch/driver/upscale.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/expectations"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/metadata"
sset "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/statefulset"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/version"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/label"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/nodespec"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/reconcile"
"github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/settings"
Expand Down Expand Up @@ -66,33 +68,72 @@ func HandleUpscaleAndSpecChanges(
if err != nil {
return results, fmt.Errorf("adjust resources: %w", err)
}
// reconcile all resources

// Check if this is a version upgrade
isVersionUpgrade, err := isVersionUpgrade(ctx.es)
if err != nil {
return results, fmt.Errorf("while checking for version upgrade: %w", err)
}

// If this is not a version upgrade, process all resources normally and return
if !isVersionUpgrade {
actualStatefulSets, requeue, err := reconcileResources(ctx, actualStatefulSets, adjusted)
if err != nil {
return results, fmt.Errorf("while reconciling resources: %w", err)
}
results.Requeue = requeue
results.ActualStatefulSets = actualStatefulSets
return results, nil
}

// Version upgrade: separate master and non-master StatefulSets
var masterResources, nonMasterResources []nodespec.Resources
for _, res := range adjusted {
res := res
if err := settings.ReconcileConfig(ctx.parentCtx, ctx.k8sClient, ctx.es, res.StatefulSet.Name, res.Config, ctx.meta); err != nil {
return results, fmt.Errorf("reconcile config: %w", err)
if label.IsMasterNodeSet(res.StatefulSet) {
masterResources = append(masterResources, res)
} else {
nonMasterResources = append(nonMasterResources, res)
}
if _, err := common.ReconcileService(ctx.parentCtx, ctx.k8sClient, &res.HeadlessService, &ctx.es); err != nil {
return results, fmt.Errorf("reconcile service: %w", err)
}

// First, reconcile all non-master resources
actualStatefulSets, requeue, err := reconcileResources(ctx, actualStatefulSets, nonMasterResources)
if err != nil {
return results, fmt.Errorf("while reconciling non-master resources: %w", err)
}
if requeue {
results.Requeue = true
results.ActualStatefulSets = actualStatefulSets
return results, nil
}

targetVersion, err := version.Parse(ctx.es.Spec.Version)
if err != nil {
return results, fmt.Errorf("while parsing Elasticsearch upgrade target version: %w", err)
}

// Check if all non-master StatefulSets have completed their upgrades before proceeding with master StatefulSets
if len(masterResources) > 0 {
allNonMastersUpgraded, err := areAllNonMasterStatefulSetsUpgraded(ctx.k8sClient, actualStatefulSets, targetVersion)
if err != nil {
return results, fmt.Errorf("while checking non-master upgrade status: %w", err)
}
if actualSset, exists := actualStatefulSets.GetByName(res.StatefulSet.Name); exists {
recreateSset, err := handleVolumeExpansion(ctx.parentCtx, ctx.k8sClient, ctx.es, res.StatefulSet, actualSset, ctx.validateStorageClass)
if err != nil {
return results, fmt.Errorf("handle volume expansion: %w", err)
}
if recreateSset {
// The StatefulSet is scheduled for recreation: let's requeue before attempting any further spec change.
results.Requeue = true
continue
}

if !allNonMastersUpgraded {
// Non-master StatefulSets are still upgrading, skipping master StatefulSets temporarily.
// This will cause a requeue, and master StatefulSets will attempt to be processed in the next reconciliation
results.ActualStatefulSets = actualStatefulSets
results.Requeue = true
return results, nil
}
reconciled, err := es_sset.ReconcileStatefulSet(ctx.parentCtx, ctx.k8sClient, ctx.es, res.StatefulSet, ctx.expectations)

// All non-master StatefulSets are upgraded, now process master StatefulSets
actualStatefulSets, results.Requeue, err = reconcileResources(ctx, actualStatefulSets, masterResources)
if err != nil {
return results, fmt.Errorf("reconcile StatefulSet: %w", err)
return results, fmt.Errorf("while reconciling master resources: %w", err)
}
// update actual with the reconciled ones for next steps to work with up-to-date information
actualStatefulSets = actualStatefulSets.WithStatefulSet(reconciled)
}

results.ActualStatefulSets = actualStatefulSets
return results, nil
}
Expand Down Expand Up @@ -166,3 +207,88 @@ func adjustStatefulSetReplicas(

return expected, nil
}

// reconcileResources applies the per-StatefulSet reconciliation steps to each
// of the given resources, in order: configuration, headless service, volume
// expansion (only for StatefulSets already present in actualStatefulSets) and
// finally the StatefulSet itself.
//
// It returns:
//   - actualStatefulSets updated with every StatefulSet reconciled by this call
//   - true if at least one StatefulSet was flagged for recreation by the volume
//     expansion step, meaning the caller should requeue
//   - the first error encountered; in that case the requeue flag is always false
func reconcileResources(
	ctx upscaleCtx,
	actualStatefulSets es_sset.StatefulSetList,
	resources []nodespec.Resources,
) (es_sset.StatefulSetList, bool, error) {
	var needsRequeue bool
	for i := range resources {
		// Work on a per-iteration copy so that the pointer taken below never aliases another element.
		expected := resources[i]
		ssetName := expected.StatefulSet.Name

		err := settings.ReconcileConfig(ctx.parentCtx, ctx.k8sClient, ctx.es, ssetName, expected.Config, ctx.meta)
		if err != nil {
			return actualStatefulSets, false, fmt.Errorf("reconcile config: %w", err)
		}

		_, err = common.ReconcileService(ctx.parentCtx, ctx.k8sClient, &expected.HeadlessService, &ctx.es)
		if err != nil {
			return actualStatefulSets, false, fmt.Errorf("reconcile service: %w", err)
		}

		// Volume expansion only applies to StatefulSets that already exist.
		existing, found := actualStatefulSets.GetByName(ssetName)
		if found {
			recreate, err := handleVolumeExpansion(ctx.parentCtx, ctx.k8sClient, ctx.es, expected.StatefulSet, existing, ctx.validateStorageClass)
			if err != nil {
				return actualStatefulSets, false, fmt.Errorf("handle volume expansion: %w", err)
			}
			if recreate {
				// This StatefulSet is scheduled for recreation: skip its spec
				// reconciliation for now and ask the caller to requeue.
				needsRequeue = true
				continue
			}
		}

		updated, err := es_sset.ReconcileStatefulSet(ctx.parentCtx, ctx.k8sClient, ctx.es, expected.StatefulSet, ctx.expectations)
		if err != nil {
			return actualStatefulSets, false, fmt.Errorf("reconcile StatefulSet: %w", err)
		}
		// Keep the in-memory list in sync so later steps work with up-to-date information.
		actualStatefulSets = actualStatefulSets.WithStatefulSet(updated)
	}
	return actualStatefulSets, needsRequeue, nil
}

// areAllNonMasterStatefulSetsUpgraded checks if all non-master StatefulSets have completed their upgrades
func areAllNonMasterStatefulSetsUpgraded(
client k8s.Client,
actualStatefulSets es_sset.StatefulSetList,
targetVersion version.Version,
) (bool, error) {
for _, statefulSet := range actualStatefulSets {
// Skip master StatefulSets
if label.IsMasterNodeSet(statefulSet) {
continue
}

// If the StatefulSet is not at the target version, it is not upgraded
// so don't even bother looking at the state/status of the StatefulSet.
actualVersion, err := es_sset.GetESVersion(statefulSet)
if err != nil {
return false, err
}
if actualVersion.LT(targetVersion) {
return false, nil
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not rely on the status until the sts controller has observed the new generation.

Suggested change
if actualStatefulSet.Status.ObservedGeneration < actualStatefulSet.Generation {
// The StatefulSet controller has not yet observed the latest generation.
pendingNonMasterSTS = append(pendingNonMasterSTS, actualStatefulSet)
continue
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or:

  • use actualStatefulSets.PendingReconciliation() before that loop
  • create a common function to reuse the logic in
    func (d *defaultDriver) expectationsSatisfied(ctx context.Context) (bool, string, error) {
    log := ulog.FromContext(ctx)
    // make sure the cache is up-to-date
    expectationsOK, reason, err := d.Expectations.Satisfied()
    if err != nil {
    return false, "", err
    }
    if !expectationsOK {
    log.V(1).Info("Cache expectations are not satisfied yet, re-queueing", "namespace", d.ES.Namespace, "es_name", d.ES.Name, "reason", reason)
    return false, reason, nil
    }
    actualStatefulSets, err := sset.RetrieveActualStatefulSets(d.Client, k8s.ExtractNamespacedName(&d.ES))
    if err != nil {
    return false, "", err
    }
    // make sure StatefulSet statuses have been reconciled by the StatefulSet controller
    pendingStatefulSetReconciliation := actualStatefulSets.PendingReconciliation()
    if len(pendingStatefulSetReconciliation) > 0 {
    log.V(1).Info("StatefulSets observedGeneration is not reconciled yet, re-queueing", "namespace", d.ES.Namespace, "es_name", d.ES.Name)
    return false, fmt.Sprintf("observedGeneration is not reconciled yet for StatefulSets %s", strings.Join(pendingStatefulSetReconciliation.Names().AsSlice(), ",")), nil
    }
    // make sure pods have been reconciled by the StatefulSet controller
    return actualStatefulSets.PodReconciliationDone(ctx, d.Client)
    }

// Check if this StatefulSet has pending updates
if statefulSet.Status.UpdatedReplicas != statefulSet.Status.Replicas {
return false, nil
}

// Check if there are any pods that need to be upgraded
pods, err := es_sset.GetActualPodsForStatefulSet(client, k8s.ExtractNamespacedName(&statefulSet))
if err != nil {
return false, err
}

for _, pod := range pods {
// Check if pod revision matches StatefulSet update revision
if statefulSet.Status.UpdateRevision != "" && sset.PodRevision(pod) != statefulSet.Status.UpdateRevision {
// This pod still needs to be upgraded
return false, nil
}
}
}
Comment on lines 347 to 367
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this whole logic assumes that the StatefulSet controller has already updated the status, which is not necessarily true based on my experiments. I hit several cases where the StatefulSet spec had been updated but its status had not. A non-master StatefulSet is then detected as reconciled even though it is not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe I've also addressed this case with a generation vs. observedGeneration check. Let me know if you feel I've missed something.


return true, nil
}
Loading
Loading