Skip to content

Commit 4449e60

Browse files
mergify[bot]efd6
andauthored
x-pack/filebeat/input/cel: log unpublished event count and exit publish loop on context cancellation (#47730) (#47909)
(cherry picked from commit ab3d2c3) Co-authored-by: Dan Kortschak <[email protected]>
1 parent 6ef0e27 commit 4449e60

File tree

2 files changed

+65
-0
lines changed

2 files changed

+65
-0
lines changed
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# REQUIRED
2+
# Kind can be one of:
3+
# - breaking-change: a change to previously-documented behavior
4+
# - deprecation: functionality that is being removed in a later release
5+
# - bug-fix: fixes a problem in a previous version
6+
# - enhancement: extends functionality but does not break or fix existing behavior
7+
# - feature: new functionality
8+
# - known-issue: problems that we are aware of in a given version
9+
# - security: impacts on the security of a product or a user’s deployment.
10+
# - upgrade: important information for someone upgrading from a prior version
11+
# - other: does not fit into any of the other categories
12+
kind: feature
13+
14+
# REQUIRED for all kinds
15+
# Change summary; a 80ish characters long description of the change.
16+
summary: Log unpublished event count and exit publish loop on input context cancellation.
17+
18+
# REQUIRED for breaking-change, deprecation, known-issue
19+
# Long description; in case the summary is not enough to describe the change
20+
# this field accommodate a description without length limits.
21+
# description:
22+
23+
# REQUIRED for breaking-change, deprecation, known-issue
24+
# impact:
25+
26+
# REQUIRED for breaking-change, deprecation, known-issue
27+
# action:
28+
29+
# REQUIRED for all kinds
30+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
31+
component: filebeat
32+
33+
# AUTOMATED
34+
# OPTIONAL to manually add other PR URLs
35+
# PR URL: A link the PR that added the changeset.
36+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
37+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
38+
# Please provide it if you are adding a fragment for a different PR.
39+
# pr: https://github.com/owner/repo/1234
40+
41+
# AUTOMATED
42+
# OPTIONAL to manually add other issue URLs
43+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
44+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
45+
# issue: https://github.com/owner/repo/1234

x-pack/filebeat/input/cel/input.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,7 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
512512

513513
start = time.Now()
514514
var hadPublicationError bool
515+
loop:
515516
for i, e := range events {
516517
event, ok := e.(map[string]interface{})
517518
if !ok {
@@ -539,6 +540,25 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
539540
pubCursor = cursor
540541
}
541542
}
543+
// This is checked prior to the publish attempt since the
544+
// cursor.Publisher interface does not document the behaviour
545+
// related to context cancellation and the context is not
546+
// explicitly passed in, so favour this explicit clarity.
547+
switch err := ctx.Err(); {
548+
case err == nil:
549+
case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
550+
log.Infow("context cancelled with unpublished events", "unpublished", len(events)-i)
551+
// Don't update status, since we are about to pass
552+
// through the Running state and then fall through
553+
// to the input exit with a change to Stopped.
554+
break loop
555+
default:
556+
// This should never happen.
557+
log.Warnw("failed with unpublished events", "error", err, "unpublished", len(events)-i)
558+
health.UpdateStatus(status.Degraded, "error publishing events: "+err.Error())
559+
isDegraded = true
560+
break loop
561+
}
542562
err = pub.Publish(beat.Event{
543563
Timestamp: time.Now(),
544564
Fields: event,

0 commit comments

Comments
 (0)