Skip to content
3 changes: 3 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ packages:
github.com/elastic/elastic-agent/internal/pkg/agent/application/info:
interfaces:
Agent: {}
github.com/elastic/elastic-agent/internal/pkg/agent/application/gateway/fleet:
interfaces:
rollbacksSource: {}
github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade:
interfaces:
WatcherHelper: {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ func (h *Upgrade) Handle(ctx context.Context, a fleetapi.Action, ack acker.Acker
return nil
}

var uOpts []coordinator.UpgradeOpt
uOpts := []coordinator.UpgradeOpt{
coordinator.WithRollback(action.Data.Rollback),
}
if h.tamperProtectionFn() {
// Find inputs that want to receive UPGRADE action
// Endpoint needs to receive a signed UPGRADE action in order to be able to uncontain itself
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,11 +451,19 @@ func TestEndpointPreUpgradeCallback(t *testing.T) {
}

upgradeCalledChan := make(chan struct{})
mockCoordinator.EXPECT().Upgrade(mock.Anything, tc.upgradeAction.Data.Version, tc.upgradeAction.Data.SourceURI, mock.Anything, mock.Anything).
RunAndReturn(func(ctx context.Context, s string, s2 string, actionUpgrade *fleetapi.ActionUpgrade, opt ...coordinator.UpgradeOpt) error {
upgradeCalledChan <- struct{}{}
return tc.coordUpgradeErr
})
if tc.shouldProxyToEndpoint {
mockCoordinator.EXPECT().Upgrade(mock.Anything, tc.upgradeAction.Data.Version, tc.upgradeAction.Data.SourceURI, mock.Anything, mock.AnythingOfType("coordinator.UpgradeOpt"), mock.AnythingOfType("coordinator.UpgradeOpt")).
RunAndReturn(func(ctx context.Context, s string, s2 string, actionUpgrade *fleetapi.ActionUpgrade, opt ...coordinator.UpgradeOpt) error {
upgradeCalledChan <- struct{}{}
return tc.coordUpgradeErr
})
} else {
mockCoordinator.EXPECT().Upgrade(mock.Anything, tc.upgradeAction.Data.Version, tc.upgradeAction.Data.SourceURI, mock.Anything, mock.AnythingOfType("coordinator.UpgradeOpt")).
RunAndReturn(func(ctx context.Context, s string, s2 string, actionUpgrade *fleetapi.ActionUpgrade, opt ...coordinator.UpgradeOpt) error {
upgradeCalledChan <- struct{}{}
return tc.coordUpgradeErr
})
}

log, _ := logger.New("", false)
u := NewUpgrade(log, mockCoordinator)
Expand Down
66 changes: 11 additions & 55 deletions internal/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,13 @@ package application
import (
"context"
"fmt"
"os"
"path/filepath"
"time"

"go.elastic.co/apm/v2"

componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"
"github.com/elastic/elastic-agent/internal/pkg/agent/install"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/ttl"
"github.com/elastic/go-ucfg"

"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -52,8 +50,8 @@ import (
)

type rollbacksSource interface {
Set(map[string]upgrade.TTLMarker) error
Get() (map[string]upgrade.TTLMarker, error)
Set(map[string]ttl.TTLMarker) error
Get() (map[string]ttl.TTLMarker, error)
}

// CfgOverrider allows for application driven overrides of configuration read from disk.
Expand Down Expand Up @@ -135,7 +133,7 @@ func New(
// monitoring is not supported in bootstrap mode https://github.com/elastic/elastic-agent/issues/1761
isMonitoringSupported := !disableMonitoring && cfg.Settings.V1MonitoringEnabled

availableRollbacksSource := upgrade.NewTTLMarkerRegistry(log, paths.Top())
availableRollbacksSource := ttl.NewTTLMarkerRegistry(log, paths.Top())
if upgrade.IsUpgradeable() {
// If we are not running in a container, check and normalize the install descriptor before we start the agent
normalizeAgentInstalls(log, paths.Top(), time.Now(), initialUpdateMarker, availableRollbacksSource)
Expand Down Expand Up @@ -255,7 +253,7 @@ func New(
}

// TODO: stop using global state
managed, err = newManagedConfigManager(ctx, log, agentInfo, cfg, store, runtime, fleetInitTimeout, paths.Top(), client, fleetAcker, actionAcker, retrier, stateStorage, actionQueue, upgrader)
managed, err = newManagedConfigManager(ctx, log, agentInfo, cfg, store, runtime, fleetInitTimeout, paths.Top(), client, fleetAcker, actionAcker, retrier, stateStorage, actionQueue, availableRollbacksSource, upgrader)
if err != nil {
return nil, nil, nil, err
}
Expand Down Expand Up @@ -331,57 +329,15 @@ func normalizeAgentInstalls(log *logger.Logger, topDir string, now time.Time, in
}
}

// check if we need to cleanup old agent installs
rollbacks, err := rollbackSource.Get()
absHomePath := paths.Home()
relHomePath, err := filepath.Rel(topDir, absHomePath)
if err != nil {
log.Warnf("Error getting available rollbacks during startup check: %s", err)
log.Warnf("Error calculating home path %q relative to top path %q: %s", absHomePath, topDir, err)
return
}

var versionedHomesToCleanup []string
for versionedHome, ttlMarker := range rollbacks {

versionedHomeAbsPath := filepath.Join(topDir, versionedHome)

if versionedHomeAbsPath == paths.HomeFrom(topDir) {
// skip the current install
log.Warnf("Found a TTL marker for the currently running agent at %s. Skipping cleanup...", versionedHome)
continue
}

_, err = os.Stat(versionedHomeAbsPath)
if errors.Is(err, os.ErrNotExist) {
log.Warnf("Versioned home %s corresponding to agent TTL marker %+v is not found on disk", versionedHomeAbsPath, ttlMarker)
versionedHomesToCleanup = append(versionedHomesToCleanup, versionedHome)
continue
}

if err != nil {
log.Warnf("error checking versioned home %s for agent install: %s", versionedHomeAbsPath, err.Error())
continue
}

if now.After(ttlMarker.ValidUntil) {
// the install directory exists but it's expired. Remove the files.
log.Infof("agent TTL marker %+v marks %q as expired, removing directory", ttlMarker, versionedHomeAbsPath)
if cleanupErr := install.RemoveBut(versionedHomeAbsPath, true); cleanupErr != nil {
log.Warnf("Error removing directory %q: %s", versionedHomeAbsPath, cleanupErr)
} else {
log.Infof("Directory %q was removed", versionedHomeAbsPath)
versionedHomesToCleanup = append(versionedHomesToCleanup, versionedHome)
}
}
}

if len(versionedHomesToCleanup) > 0 {
log.Infof("removing install descriptor(s) for %v", versionedHomesToCleanup)
for _, versionedHomeToCleanup := range versionedHomesToCleanup {
delete(rollbacks, versionedHomeToCleanup)
}
err = rollbackSource.Set(rollbacks)
if err != nil {
log.Warnf("Error removing install descriptor(s): %s", err)
}
_, err = upgrade.CleanAvailableRollbacks(log, rollbackSource, topDir, relHomePath, upgrade.PreserveActiveUpgradeVersions(initialUpdateMarker, upgrade.CleanupExpiredRollbacks))
if err != nil {
log.Warnf("Error cleaning available rollbacks: %s", err)
}
}

Expand Down
13 changes: 7 additions & 6 deletions internal/pkg/agent/application/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/ttl"
"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/internal/pkg/testutils"
"github.com/elastic/elastic-agent/pkg/core/logger/loggertest"
Expand Down Expand Up @@ -337,7 +338,7 @@ func Test_normalizeInstallDescriptorAtStartup(t *testing.T) {
oldAgentInstallPath := createFakeAgentInstall(t, topDir, "1.2.3", "oldversionhash", true)

mockRollbackSource := newMockRollbacksSource(t)
mockRollbackSource.EXPECT().Get().Return(map[string]upgrade.TTLMarker{
mockRollbackSource.EXPECT().Get().Return(map[string]ttl.TTLMarker{
oldAgentInstallPath: {
Version: "1.2.3",
Hash: "oldversionhash",
Expand Down Expand Up @@ -366,7 +367,7 @@ func Test_normalizeInstallDescriptorAtStartup(t *testing.T) {
}

// expect code to clear the rollback
mockRollbackSource.EXPECT().Set(map[string]upgrade.TTLMarker{}).Return(nil)
mockRollbackSource.EXPECT().Set(map[string]ttl.TTLMarker{}).Return(nil)
return updateMarker, mockRollbackSource
},
postNormalizeAssertions: nil,
Expand All @@ -379,7 +380,7 @@ func Test_normalizeInstallDescriptorAtStartup(t *testing.T) {

mockRollbackSource := newMockRollbacksSource(t)
nonExistingVersionedHome := filepath.Join("data", "thisdirectorydoesnotexist")
mockRollbackSource.EXPECT().Get().Return(map[string]upgrade.TTLMarker{
mockRollbackSource.EXPECT().Get().Return(map[string]ttl.TTLMarker{
oldAgentInstallPath: {
Version: "1.2.3",
Hash: "oldversionhash",
Expand All @@ -392,7 +393,7 @@ func Test_normalizeInstallDescriptorAtStartup(t *testing.T) {
},
}, nil)

mockRollbackSource.EXPECT().Set(map[string]upgrade.TTLMarker{
mockRollbackSource.EXPECT().Set(map[string]ttl.TTLMarker{
oldAgentInstallPath: {
Version: "1.2.3",
Hash: "oldversionhash",
Expand All @@ -415,7 +416,7 @@ func Test_normalizeInstallDescriptorAtStartup(t *testing.T) {

mockRollbackSource := newMockRollbacksSource(t)
mockRollbackSource.EXPECT().Get().Return(
map[string]upgrade.TTLMarker{
map[string]ttl.TTLMarker{
oldAgentInstallPath: {
Version: "1.2.3",
Hash: "oldver",
Expand All @@ -425,7 +426,7 @@ func Test_normalizeInstallDescriptorAtStartup(t *testing.T) {
nil,
)
// expect removal of the existing ttlmarker
mockRollbackSource.EXPECT().Set(map[string]upgrade.TTLMarker{}).Return(nil)
mockRollbackSource.EXPECT().Set(map[string]ttl.TTLMarker{}).Return(nil)
return nil, mockRollbackSource
},
postNormalizeAssertions: func(t *testing.T, topDir string, _ *upgrade.UpdateMarker) {
Expand Down
24 changes: 24 additions & 0 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/elastic/elastic-agent/internal/pkg/core/backoff"
"github.com/elastic/elastic-agent/internal/pkg/otel/translate"
"github.com/elastic/elastic-agent/internal/pkg/release"

"go.opentelemetry.io/collector/component/componentstatus"

Expand Down Expand Up @@ -850,6 +851,22 @@ func (c *Coordinator) Upgrade(ctx context.Context, version string, sourceURI str
return c.upgradeMgr.AckAction(ctx, c.fleetAcker, action)
}

if errors.Is(err, upgrade.ErrNoRollbacksAvailable) && action != nil && release.VersionWithSnapshot() == action.Data.Version {
// when manually rolling back the action store is not copied back, so it's likely that the rolled back agent
// will receive (again) the rollback action because it's using an ackToken from before the rollback action
// was received by the "upgraded" elastic-agent.
// This block here is to avoid setting an error state because the rollback requested no longer exist after
// having performed the rollback once.
// A better test would be to compare actionIDs but there's no way to persist the actionID of the rollback action
// from the upgraded agent to the rolled back agent (upgrade details is reset when the upgrade marker is deleted)
c.logger.Infow(
"Received a rollback action with the same version as current and no rollbacks available, ignoring the likely replayed action",
"action_id", action.ID())
c.ClearOverrideState()
det.SetState(details.StateRollback)
return c.upgradeMgr.AckAction(ctx, c.fleetAcker, action)
}

c.logger.Errorw("upgrade failed", "error", logp.Error(err))
// If ErrInsufficientDiskSpace is in the error chain, we want to set the
// the error to ErrInsufficientDiskSpace so that the error message is
Expand All @@ -861,10 +878,17 @@ func (c *Coordinator) Upgrade(ctx context.Context, version string, sourceURI str
det.Fail(err)
return err
}

if cb != nil {
det.SetState(details.StateRestarting)
c.ReExec(cb)
}

if uOpts.rollback {
// Ack the rollback action, since there's no restart callback returned, this is still run
return c.upgradeMgr.AckAction(ctx, c.fleetAcker, action)
}

return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"time"

"github.com/elastic/elastic-agent-client/v7/pkg/proto"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/ttl"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker"
"github.com/elastic/elastic-agent/internal/pkg/testutils"

Expand Down Expand Up @@ -468,7 +469,7 @@ func TestCoordinatorReportsInvalidPolicy(t *testing.T) {
}()

tmpDir := t.TempDir()
upgradeMgr, err := upgrade.NewUpgrader(log, &artifact.Config{}, nil, &info.AgentInfo{}, new(upgrade.AgentWatcherHelper), upgrade.NewTTLMarkerRegistry(nil, tmpDir))
upgradeMgr, err := upgrade.NewUpgrader(log, &artifact.Config{}, nil, &info.AgentInfo{}, new(upgrade.AgentWatcherHelper), ttl.NewTTLMarkerRegistry(nil, tmpDir))
require.NoError(t, err, "errored when creating a new upgrader")

// Channels have buffer length 1, so we don't have to run on multiple
Expand Down
Loading