Skip to content

Commit 22f1f7a

Browse files
Add agent_policy_id and policy_revision_idx to checkin request (#5501)
Allow the agents to add their currently running policy_id and revision_idx attributes to the checkin request bodies. These attributes, if included and different from the agent doc will be used when updating the agent doc in the pre-poll checkin. If the agent's policy id does not match the expected policy id from the server a reassign is detected and a new policy change action will be sent. If the revision differs a policy change action will also be sent. If an agent checks in with a different policy/revision the api keys may be managed. Add a feature flag to disable this behaviour and only use Acks + the fleet-agents doc as the source of truth.
1 parent ca23acd commit 22f1f7a

20 files changed

+952
-159
lines changed
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# Kind can be one of:
2+
# - breaking-change: a change to previously-documented behavior
3+
# - deprecation: functionality that is being removed in a later release
4+
# - bug-fix: fixes a problem in a previous version
5+
# - enhancement: extends functionality but does not break or fix existing behavior
6+
# - feature: new functionality
7+
# - known-issue: problems that we are aware of in a given version
8+
# - security: impacts on the security of a product or a user’s deployment.
9+
# - upgrade: important information for someone upgrading from a prior version
10+
# - other: does not fit into any of the other categories
11+
kind: feature
12+
13+
# Change summary; a 80ish characters long description of the change.
14+
summary: Add agent_policy_id and policy_revision_idx to checkin requests
15+
16+
# Long description; in case the summary is not enough to describe the change
17+
# this field accommodate a description without length limits.
18+
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
19+
description: |
20+
Add the agent_policy_id and policy_revision_idx attributes to checkin
21+
request bodies so an agent is able to inform fleet-server of its exact
22+
policy. These details will replace the need for an ack on
23+
policy_change actions, and will be used to determine when to send a
24+
policy change when there is a new revision available, or when the
25+
agent is reassigned to a different policy. Add a server setting under
26+
feature_flags.ignore_checkin_policy_id that disables this behavour and
27+
restores the previous approach.
28+
29+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
30+
component: fleet-server
31+
32+
# PR URL; optional; the PR number that added the changeset.
33+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
34+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
35+
# Please provide it if you are adding a fragment for a different PR.
36+
pr: https://github.com/elastic/fleet-server/pull/5501
37+
38+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
39+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
40+
issue: https://github.com/elastic/elastic-agent/issues/6446

fleet-server.reference.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,13 @@ fleet:
270270
# upstream_url: "https://artifacts.elastic.co/GPG-KEY-elastic-agent"
271271
# # By default dir is the directory containing the fleet-server executable (following symlinks) joined with elastic-agent-upgrade-keys
272272
# dir: ./elastic-agent-upgrade-keys
273+
#
274+
# # Toggles to enable new behaviour or restore old behaviour.
275+
# feature_flags:
276+
# // ignore agent_policy_id and policy_revision_idx attributes that may be present in the checkin request bodies.
277+
# // POLICY_CHANGE actions need an explicit ack if this is set.
278+
# ignore_checkin_policy_id: false
279+
#
273280
# # monitor options are advanced configuration and should not be adjusted is most cases
274281
# monitor:
275282
# fetch_size: 1000 # The number of documents that each monitor may fetch at once

internal/pkg/api/handleAck.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -442,19 +442,28 @@ func (ack *AckT) updateAPIKey(ctx context.Context,
442442
agentID string,
443443
apiKeyID, permissionHash string,
444444
toRetireAPIKeyIDs []model.ToRetireAPIKeyIdsItems, outputName string) error {
445-
bulk := ack.bulk
445+
return updateAPIKey(ctx, zlog, ack.bulk, agentID, apiKeyID, permissionHash, toRetireAPIKeyIDs, outputName)
446+
}
447+
448+
func updateAPIKey(ctx context.Context,
449+
zlog zerolog.Logger,
450+
bulk bulk.Bulk,
451+
agentID string,
452+
apiKeyID, permissionHash string,
453+
toRetireAPIKeyIDs []model.ToRetireAPIKeyIdsItems, outputName string) error {
446454
// use output bulker if exists
455+
outBulk := bulk
447456
if outputName != "" {
448-
outputBulk := ack.bulk.GetBulker(outputName)
457+
outputBulk := bulk.GetBulker(outputName)
449458
if outputBulk != nil {
450459
zlog.Debug().Str(ecs.PolicyOutputName, outputName).Msg("Using output bulker in updateAPIKey")
451-
bulk = outputBulk
460+
outBulk = outputBulk
452461
}
453462
}
454463
if apiKeyID != "" {
455-
res, err := bulk.APIKeyRead(ctx, apiKeyID, true)
464+
res, err := outBulk.APIKeyRead(ctx, apiKeyID, true)
456465
if err != nil {
457-
if isAgentActive(ctx, zlog, ack.bulk, agentID) {
466+
if isAgentActive(ctx, zlog, outBulk, agentID) {
458467
zlog.Warn().
459468
Err(err).
460469
Str(LogAPIKeyID, apiKeyID).
@@ -480,7 +489,7 @@ func (ack *AckT) updateAPIKey(ctx context.Context,
480489
Str(LogAPIKeyID, apiKeyID).
481490
Msg("Failed to cleanup roles")
482491
} else if removedRolesCount > 0 {
483-
if err := bulk.APIKeyUpdate(ctx, apiKeyID, permissionHash, clean); err != nil {
492+
if err := outBulk.APIKeyUpdate(ctx, apiKeyID, permissionHash, clean); err != nil {
484493
zlog.Error().Err(err).RawJSON("roles", clean).Str(LogAPIKeyID, apiKeyID).Str(ecs.PolicyOutputName, outputName).Msg("Failed to update API Key")
485494
} else {
486495
zlog.Debug().
@@ -493,7 +502,7 @@ func (ack *AckT) updateAPIKey(ctx context.Context,
493502
}
494503
}
495504
}
496-
ack.invalidateAPIKeys(ctx, zlog, toRetireAPIKeyIDs, apiKeyID)
505+
invalidateAPIKeys(ctx, zlog, bulk, toRetireAPIKeyIDs, apiKeyID)
497506
}
498507

499508
return nil

internal/pkg/api/handleCheckin.go

Lines changed: 74 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -281,16 +281,34 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
281281
return fmt.Errorf("failed to update upgrade_details: %w", err)
282282
}
283283

284+
initialOpts := []checkin.Option{
285+
checkin.WithStatus(string(req.Status)),
286+
checkin.WithMessage(req.Message),
287+
checkin.WithMeta(rawMeta),
288+
checkin.WithComponents(rawComponents),
289+
checkin.WithSeqNo(seqno),
290+
checkin.WithVer(ver),
291+
checkin.WithUnhealthyReason(unhealthyReason),
292+
checkin.WithDeleteAudit(agent.AuditUnenrolledReason != "" || agent.UnenrolledAt != ""),
293+
}
294+
295+
revID, opts, err := ct.processPolicyDetails(r.Context(), zlog, agent, req)
296+
if err != nil {
297+
return fmt.Errorf("failed to update policy details: %w", err)
298+
}
299+
if len(opts) > 0 {
300+
initialOpts = append(initialOpts, opts...)
301+
}
302+
284303
// Subscribe to actions dispatcher
285304
aSub := ct.ad.Subscribe(zlog, agent.Id, seqno)
286305
defer ct.ad.Unsubscribe(zlog, aSub)
287306
actCh := aSub.Ch()
288307

289-
// use revision_idx=0 if the agent has a single output where no API key is defined
290-
// This will force the policy monitor to emit a new policy to regerate API keys
291-
revID := agent.PolicyRevisionIdx
292308
for _, output := range agent.Outputs {
293309
if output.APIKey == "" {
310+
// use revision_idx=0 if the agent has a single output where no API key is defined
311+
// This will force the policy monitor to emit a new policy to regerate API keys
294312
revID = 0
295313
break
296314
}
@@ -330,7 +348,7 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
330348
// Initial update on checkin, and any user fields that might have changed
331349
// Run a script to remove audit_unenrolled_* and unenrolled_at attributes if one is set on checkin.
332350
// 8.16.x releases would incorrectly set unenrolled_at
333-
err = ct.bc.CheckIn(agent.Id, checkin.WithStatus(string(req.Status)), checkin.WithMessage(req.Message), checkin.WithMeta(rawMeta), checkin.WithComponents(rawComponents), checkin.WithSeqNo(seqno), checkin.WithVer(ver), checkin.WithUnhealthyReason(unhealthyReason), checkin.WithDeleteAudit(agent.AuditUnenrolledReason != "" || agent.UnenrolledAt != ""))
351+
err = ct.bc.CheckIn(agent.Id, initialOpts...)
334352
if err != nil {
335353
zlog.Error().Err(err).Str(ecs.AgentID, agent.Id).Msg("checkin failed")
336354
}
@@ -1174,3 +1192,55 @@ func calcPollDuration(zlog zerolog.Logger, pollDuration, setupDuration, jitterDu
11741192

11751193
return pollDuration, jitter
11761194
}
1195+
1196+
// processPolicyDetails handles the agent_policy_id and revision_idx included in the checkin request.
1197+
// The API keys will be managed if the agent reports a new policy id from its last checkin, or if the revision is different than what the last checkin reported.
1198+
// It returns the revision idx that should be used when subscribing for new POLICY_CHANGE actons and optional args to use when doing the non-tick checkin.
1199+
func (ct *CheckinT) processPolicyDetails(ctx context.Context, zlog zerolog.Logger, agent *model.Agent, req *CheckinRequest) (int64, []checkin.Option, error) {
1200+
// no details specified or attributes are ignored by config
1201+
if ct.cfg.Features.IgnoreCheckinPolicyID || req == nil || req.PolicyRevisionIdx == nil || req.AgentPolicyId == nil {
1202+
return agent.PolicyRevisionIdx, nil, nil
1203+
}
1204+
policyID := *req.AgentPolicyId
1205+
revisionIDX := *req.PolicyRevisionIdx
1206+
1207+
span, ctx := apm.StartSpan(ctx, "Process policy details", "process")
1208+
span.Context.SetLabel("agent_id", agent.Agent.ID)
1209+
span.Context.SetLabel(dl.FieldAgentPolicyID, policyID)
1210+
span.Context.SetLabel(dl.FieldPolicyRevisionIdx, revisionIDX)
1211+
defer span.End()
1212+
1213+
// update agent doc if policy id or revision idx does not match
1214+
var opts []checkin.Option
1215+
if policyID != agent.AgentPolicyID || revisionIDX != agent.PolicyRevisionIdx {
1216+
opts = []checkin.Option{
1217+
checkin.WithAgentPolicyID(policyID),
1218+
checkin.WithPolicyRevisionIDX(revisionIDX),
1219+
}
1220+
}
1221+
// Policy reassign, subscribe to policy with revision 0
1222+
if policyID != agent.PolicyID {
1223+
zlog.Debug().Str(dl.FieldAgentPolicyID, policyID).Str("new_policy_id", agent.PolicyID).Msg("Policy ID mismatch detected, reassigning agent.")
1224+
return 0, opts, nil
1225+
}
1226+
1227+
// Check if the checkin revision_idx is greater than the latest available
1228+
latestRev := ct.pm.LatestRev(ctx, agent.PolicyID)
1229+
if latestRev != 0 && revisionIDX > latestRev {
1230+
revisionIDX = 0 // set return val to 0 so the agent gets latest available revision.
1231+
}
1232+
1233+
// Update API keys if the policy has changed, or if the revision differs.
1234+
if policyID != agent.AgentPolicyID || revisionIDX != agent.PolicyRevisionIdx {
1235+
for outputName, output := range agent.Outputs {
1236+
if output.Type != policy.OutputTypeElasticsearch {
1237+
continue
1238+
}
1239+
if err := updateAPIKey(ctx, zlog, ct.bulker, agent.Id, output.APIKeyID, output.PermissionsHash, output.ToRetireAPIKeyIds, outputName); err != nil {
1240+
// Only returns ErrUpdatingInactiveAgent
1241+
return 0, nil, err
1242+
}
1243+
}
1244+
}
1245+
return revisionIDX, opts, nil
1246+
}

0 commit comments

Comments
 (0)