Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ packages:
github.com/elastic/elastic-agent/internal/pkg/agent/application/info:
interfaces:
Agent: {}
github.com/elastic/elastic-agent/internal/pkg/agent/application/gateway/fleet:
interfaces:
rollbacksSource: {}
github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade:
interfaces:
WatcherHelper: {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ func (h *Upgrade) Handle(ctx context.Context, a fleetapi.Action, ack acker.Acker
return nil
}

var uOpts []coordinator.UpgradeOpt
uOpts := []coordinator.UpgradeOpt{
coordinator.WithRollback(action.Data.Rollback),
}
if h.tamperProtectionFn() {
// Find inputs that want to receive UPGRADE action
// Endpoint needs to receive a signed UPGRADE action in order to be able to uncontain itself
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,11 +451,19 @@ func TestEndpointPreUpgradeCallback(t *testing.T) {
}

upgradeCalledChan := make(chan struct{})
mockCoordinator.EXPECT().Upgrade(mock.Anything, tc.upgradeAction.Data.Version, tc.upgradeAction.Data.SourceURI, mock.Anything, mock.Anything).
RunAndReturn(func(ctx context.Context, s string, s2 string, actionUpgrade *fleetapi.ActionUpgrade, opt ...coordinator.UpgradeOpt) error {
upgradeCalledChan <- struct{}{}
return tc.coordUpgradeErr
})
if tc.shouldProxyToEndpoint {
mockCoordinator.EXPECT().Upgrade(mock.Anything, tc.upgradeAction.Data.Version, tc.upgradeAction.Data.SourceURI, mock.Anything, mock.AnythingOfType("coordinator.UpgradeOpt"), mock.AnythingOfType("coordinator.UpgradeOpt")).
RunAndReturn(func(ctx context.Context, s string, s2 string, actionUpgrade *fleetapi.ActionUpgrade, opt ...coordinator.UpgradeOpt) error {
upgradeCalledChan <- struct{}{}
return tc.coordUpgradeErr
})
} else {
mockCoordinator.EXPECT().Upgrade(mock.Anything, tc.upgradeAction.Data.Version, tc.upgradeAction.Data.SourceURI, mock.Anything, mock.AnythingOfType("coordinator.UpgradeOpt")).
RunAndReturn(func(ctx context.Context, s string, s2 string, actionUpgrade *fleetapi.ActionUpgrade, opt ...coordinator.UpgradeOpt) error {
upgradeCalledChan <- struct{}{}
return tc.coordUpgradeErr
})
}

log, _ := logger.New("", false)
u := NewUpgrade(log, mockCoordinator)
Expand Down
9 changes: 5 additions & 4 deletions internal/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.elastic.co/apm/v2"

componentmonitoring "github.com/elastic/elastic-agent/internal/pkg/agent/application/monitoring/component"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/ttl"
"github.com/elastic/elastic-agent/internal/pkg/agent/install"

"github.com/elastic/go-ucfg"
Expand Down Expand Up @@ -52,8 +53,8 @@ import (
)

type rollbacksSource interface {
Set(map[string]upgrade.TTLMarker) error
Get() (map[string]upgrade.TTLMarker, error)
Set(map[string]ttl.TTLMarker) error
Get() (map[string]ttl.TTLMarker, error)
}

// CfgOverrider allows for application driven overrides of configuration read from disk.
Expand Down Expand Up @@ -135,7 +136,7 @@ func New(
// monitoring is not supported in bootstrap mode https://github.com/elastic/elastic-agent/issues/1761
isMonitoringSupported := !disableMonitoring && cfg.Settings.V1MonitoringEnabled

availableRollbacksSource := upgrade.NewTTLMarkerRegistry(log, paths.Top())
availableRollbacksSource := ttl.NewTTLMarkerRegistry(log, paths.Top())
if upgrade.IsUpgradeable() {
// If we are not running in a container, check and normalize the install descriptor before we start the agent
normalizeAgentInstalls(log, paths.Top(), time.Now(), initialUpdateMarker, availableRollbacksSource)
Expand Down Expand Up @@ -255,7 +256,7 @@ func New(
}

// TODO: stop using global state
managed, err = newManagedConfigManager(ctx, log, agentInfo, cfg, store, runtime, fleetInitTimeout, paths.Top(), client, fleetAcker, actionAcker, retrier, stateStorage, actionQueue, upgrader)
managed, err = newManagedConfigManager(ctx, log, agentInfo, cfg, store, runtime, fleetInitTimeout, paths.Top(), client, fleetAcker, actionAcker, retrier, stateStorage, actionQueue, availableRollbacksSource, upgrader)
if err != nil {
return nil, nil, nil, err
}
Expand Down
13 changes: 7 additions & 6 deletions internal/pkg/agent/application/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/ttl"
"github.com/elastic/elastic-agent/internal/pkg/config"
"github.com/elastic/elastic-agent/internal/pkg/testutils"
"github.com/elastic/elastic-agent/pkg/core/logger/loggertest"
Expand Down Expand Up @@ -337,7 +338,7 @@ func Test_normalizeInstallDescriptorAtStartup(t *testing.T) {
oldAgentInstallPath := createFakeAgentInstall(t, topDir, "1.2.3", "oldversionhash", true)

mockRollbackSource := newMockRollbacksSource(t)
mockRollbackSource.EXPECT().Get().Return(map[string]upgrade.TTLMarker{
mockRollbackSource.EXPECT().Get().Return(map[string]ttl.TTLMarker{
oldAgentInstallPath: {
Version: "1.2.3",
Hash: "oldversionhash",
Expand Down Expand Up @@ -366,7 +367,7 @@ func Test_normalizeInstallDescriptorAtStartup(t *testing.T) {
}

// expect code to clear the rollback
mockRollbackSource.EXPECT().Set(map[string]upgrade.TTLMarker{}).Return(nil)
mockRollbackSource.EXPECT().Set(map[string]ttl.TTLMarker{}).Return(nil)
return updateMarker, mockRollbackSource
},
postNormalizeAssertions: nil,
Expand All @@ -379,7 +380,7 @@ func Test_normalizeInstallDescriptorAtStartup(t *testing.T) {

mockRollbackSource := newMockRollbacksSource(t)
nonExistingVersionedHome := filepath.Join("data", "thisdirectorydoesnotexist")
mockRollbackSource.EXPECT().Get().Return(map[string]upgrade.TTLMarker{
mockRollbackSource.EXPECT().Get().Return(map[string]ttl.TTLMarker{
oldAgentInstallPath: {
Version: "1.2.3",
Hash: "oldversionhash",
Expand All @@ -392,7 +393,7 @@ func Test_normalizeInstallDescriptorAtStartup(t *testing.T) {
},
}, nil)

mockRollbackSource.EXPECT().Set(map[string]upgrade.TTLMarker{
mockRollbackSource.EXPECT().Set(map[string]ttl.TTLMarker{
oldAgentInstallPath: {
Version: "1.2.3",
Hash: "oldversionhash",
Expand All @@ -415,7 +416,7 @@ func Test_normalizeInstallDescriptorAtStartup(t *testing.T) {

mockRollbackSource := newMockRollbacksSource(t)
mockRollbackSource.EXPECT().Get().Return(
map[string]upgrade.TTLMarker{
map[string]ttl.TTLMarker{
oldAgentInstallPath: {
Version: "1.2.3",
Hash: "oldver",
Expand All @@ -425,7 +426,7 @@ func Test_normalizeInstallDescriptorAtStartup(t *testing.T) {
nil,
)
// expect removal of the existing ttlmarker
mockRollbackSource.EXPECT().Set(map[string]upgrade.TTLMarker{}).Return(nil)
mockRollbackSource.EXPECT().Set(map[string]ttl.TTLMarker{}).Return(nil)
return nil, mockRollbackSource
},
postNormalizeAssertions: func(t *testing.T, topDir string, _ *upgrade.UpdateMarker) {
Expand Down
24 changes: 24 additions & 0 deletions internal/pkg/agent/application/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/elastic/elastic-agent/internal/pkg/core/backoff"
"github.com/elastic/elastic-agent/internal/pkg/otel/translate"
"github.com/elastic/elastic-agent/internal/pkg/release"

"go.opentelemetry.io/collector/component/componentstatus"

Expand Down Expand Up @@ -850,6 +851,22 @@ func (c *Coordinator) Upgrade(ctx context.Context, version string, sourceURI str
return c.upgradeMgr.AckAction(ctx, c.fleetAcker, action)
}

if errors.Is(err, upgrade.ErrNoRollbacksAvailable) && action != nil && release.VersionWithSnapshot() == action.Data.Version {
// when manually rolling back the action store is not copied back, so it's likely that the rolled back agent
// will receive (again) the rollback action because it's using an ackToken from before the rollback action
// was received by the "upgraded" elastic-agent.
// This block here is to avoid setting an error state because the rollback requested no longer exist after
// having performed the rollback once.
// A better test would be to compare actionIDs but there's no way to persist the actionID of the rollback action
// from the upgraded agent to the rolled back agent (upgrade details is reset when the upgrade marker is deleted)
c.logger.Infow(
"Received a rollback action with the same version as current and no rollbacks available, ignoring the likely replayed action",
"action_id", action.ID())
c.ClearOverrideState()
det.SetState(details.StateRollback)
return c.upgradeMgr.AckAction(ctx, c.fleetAcker, action)
}

c.logger.Errorw("upgrade failed", "error", logp.Error(err))
// If ErrInsufficientDiskSpace is in the error chain, we want to set the
// the error to ErrInsufficientDiskSpace so that the error message is
Expand All @@ -861,10 +878,17 @@ func (c *Coordinator) Upgrade(ctx context.Context, version string, sourceURI str
det.Fail(err)
return err
}

if cb != nil {
det.SetState(details.StateRestarting)
c.ReExec(cb)
}

if uOpts.rollback {
// Ack the rollback action, since there's no restart callback returned, this is still run
return c.upgradeMgr.AckAction(ctx, c.fleetAcker, action)
}

return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"time"

"github.com/elastic/elastic-agent-client/v7/pkg/proto"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/ttl"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker"
"github.com/elastic/elastic-agent/internal/pkg/testutils"

Expand Down Expand Up @@ -468,7 +469,7 @@ func TestCoordinatorReportsInvalidPolicy(t *testing.T) {
}()

tmpDir := t.TempDir()
upgradeMgr, err := upgrade.NewUpgrader(log, &artifact.Config{}, nil, &info.AgentInfo{}, new(upgrade.AgentWatcherHelper), upgrade.NewTTLMarkerRegistry(nil, tmpDir))
upgradeMgr, err := upgrade.NewUpgrader(log, &artifact.Config{}, nil, &info.AgentInfo{}, new(upgrade.AgentWatcherHelper), ttl.NewTTLMarkerRegistry(nil, tmpDir))
require.NoError(t, err, "errored when creating a new upgrader")

// Channels have buffer length 1, so we don't have to run on multiple
Expand Down
71 changes: 53 additions & 18 deletions internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/ttl"
agentclient "github.com/elastic/elastic-agent/pkg/control/v2/client"

eaclient "github.com/elastic/elastic-agent-client/v7/pkg/client"
Expand Down Expand Up @@ -79,6 +80,10 @@ type stateStore interface {
Action() fleetapi.Action
}

type rollbacksSource interface {
Get() (map[string]ttl.TTLMarker, error)
}

type FleetGateway struct {
log *logger.Logger
client client.Sender
Expand All @@ -92,6 +97,7 @@ type FleetGateway struct {
stateFetcher StateFetcher
errCh chan error
actionCh chan []fleetapi.Action
rollbackSource rollbacksSource
}

// New creates a new fleet gateway
Expand All @@ -103,6 +109,7 @@ func New(
stateStore stateStore,
stateFetcher StateFetcher,
cfg *configuration.FleetCheckin,
source rollbacksSource,
) (*FleetGateway, error) {
scheduler := scheduler.NewPeriodicJitter(defaultGatewaySettings.Duration, defaultGatewaySettings.Jitter)
st := defaultGatewaySettings
Expand All @@ -116,6 +123,7 @@ func New(
acker,
stateStore,
stateFetcher,
source,
)
}

Expand All @@ -128,18 +136,20 @@ func newFleetGatewayWithScheduler(
acker acker.Acker,
stateStore stateStore,
stateFetcher StateFetcher,
source rollbacksSource,
) (*FleetGateway, error) {
return &FleetGateway{
log: log,
client: client,
settings: settings,
agentInfo: agentInfo,
scheduler: scheduler,
acker: acker,
stateFetcher: stateFetcher,
stateStore: stateStore,
errCh: make(chan error),
actionCh: make(chan []fleetapi.Action, 1),
log: log,
client: client,
settings: settings,
agentInfo: agentInfo,
scheduler: scheduler,
acker: acker,
stateFetcher: stateFetcher,
stateStore: stateStore,
errCh: make(chan error),
actionCh: make(chan []fleetapi.Action, 1),
rollbackSource: source,
}, nil
}

Expand Down Expand Up @@ -180,6 +190,7 @@ func (f *FleetGateway) Run(ctx context.Context) error {
actions := make([]fleetapi.Action, len(resp.Actions))
copy(actions, resp.Actions)
if len(actions) > 0 {
f.log.Infow("received new actions from Fleet checkin", "actions", actions)
f.actionCh <- actions
}
}
Expand Down Expand Up @@ -393,17 +404,41 @@ func (f *FleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
agentPolicyID := getPolicyID(action)
policyRevisionIDX := getPolicyRevisionIDX(action)

// get available rollbacks
rollbacks, err := f.rollbackSource.Get()
if err != nil {
f.log.Warnf("error getting available rollbacks: %s", err.Error())
// this should already be nil but let's make sure that we don't include rollbacks in checkin body when encountering errors
rollbacks = nil
}

var validRollbacks []fleetapi.CheckinRollback
if len(rollbacks) > 0 {
now := time.Now()
validRollbacks = make([]fleetapi.CheckinRollback, 0, len(rollbacks))
for _, rollback := range rollbacks {
if rollback.ValidUntil.After(now) {
// map the `ttl.Marker` to the `fleetapi.CheckinRollback`
validRollbacks = append(validRollbacks, fleetapi.CheckinRollback{
Version: rollback.Version,
ValidUntil: rollback.ValidUntil,
})
}
}
}

// checkin
cmd := fleetapi.NewCheckinCmd(f.agentInfo, f.client)
req := &fleetapi.CheckinRequest{
AckToken: ackToken,
Metadata: ecsMeta,
Status: agentStateToString(state.State),
Message: state.Message,
Components: components,
UpgradeDetails: state.UpgradeDetails,
AgentPolicyID: agentPolicyID,
PolicyRevisionIDX: policyRevisionIDX,
AckToken: ackToken,
Metadata: ecsMeta,
Status: agentStateToString(state.State),
Message: state.Message,
Components: components,
UpgradeDetails: state.UpgradeDetails,
AgentPolicyID: agentPolicyID,
PolicyRevisionIDX: policyRevisionIDX,
AvailableRollbacks: validRollbacks,
}

resp, took, err := cmd.Execute(stateCtx, req)
Expand Down
Loading
Loading