Commit f8aedce
[libbeat] Add a metrics observer to the queue (#39774)
Add a metrics observer to the queue, reporting the metrics:

- `queue.added.{events, bytes}`, the number of events/bytes added to the queue
- `queue.consumed.{events, bytes}`, the number of events/bytes sent to the outputs
- `queue.removed.{events, bytes}`, the number of events/bytes removed from the queue after acknowledgment (`queue.removed.events` is an alias for the existing `queue.acked`)
- `queue.filled.{events, bytes}`, the current number of events and bytes in the queue (gauges)

It also fixes the behavior of `queue.filled.pct.events`, renaming it `queue.filled.pct`. All byte values reported by the memory queue are 0 if the output doesn't support early encoding.

This required some refactoring of the pipeline, which previously used a single custom callback to track its only queue metric (`queue.acked`) from `outputObserver`, and also used that callback to manage a wait group that drained the queue on pipeline shutdown. The main changes are:

- A new interface type, `queue.Observer`, with an implementation `queueObserver` for standard metrics reporting.
- `queueMaxEvents` and `queueACKed` were removed from `pipeline.outputObserver`, since their logic is now handled by `queue.Observer`.
- A queue factory now takes a `queue.Observer` instead of an ACK callback.
- The queue API now includes a `Done()` channel that signals when all events are acked / shutdown is complete, so shutdown handling now waits on that channel in `outputController.Close` instead of the shared wait group in `Pipeline.Close`.
- `pipeline.outputObserver` was renamed `pipeline.retryObserver`, since its only remaining functions track retries and retry failures. It is now owned by `eventConsumer` (its only caller) instead of `pipeline.outputController`.

The queue previously had a `Metrics()` call that was used in the shipper but didn't integrate with Beats metrics. It had no remaining callers, so I deleted it while adding the new helpers.
1 parent 3c9f4d9 commit f8aedce
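
For orientation, here is a minimal sketch of the shape the new `queue.Observer` interface plausibly takes, reconstructed only from the metric families listed above. The method names and signatures are assumptions for illustration, not the committed code.

[source,go]
----
package queue

// Observer receives callbacks from a queue as events move through it.
// A metrics implementation ("queueObserver" in the commit) would map these
// callbacks onto the queue.added.*, queue.consumed.*, and queue.removed.*
// counters and the queue.filled.* gauges. Names here are illustrative.
type Observer interface {
	MaxEvents(count int)                    // the queue's configured event limit, if any
	AddEvent(byteCount int)                 // an event entered the queue
	ConsumeEvents(count int, byteCount int) // a batch was handed to the output workers
	RemoveEvents(count int, byteCount int)  // acknowledged events left the queue
}
----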

29 files changed: +733, -765 lines

filebeat/tests/system/test_reload_inputs.py

Lines changed: 2 additions & 2 deletions

@@ -91,8 +91,6 @@ def test_start_stop(self):
             inputs=False,
         )
 
-        proc = self.start_beat()
-
         os.mkdir(self.working_dir + "/logs/")
         logfile = self.working_dir + "/logs/test.log"
         os.mkdir(self.working_dir + "/configs/")
@@ -103,6 +101,8 @@ def test_start_stop(self):
         with open(logfile, 'w') as f:
            f.write("Hello world\n")
 
+        proc = self.start_beat()
+
         self.wait_until(lambda: self.output_lines() == 1)
 
         # Remove input by moving the file

libbeat/docs/metrics-in-logs.asciidoc

Lines changed: 44 additions & 5 deletions

@@ -2,11 +2,11 @@
 
 Every 30 seconds (by default), {beatname_uc} collects a _snapshot_ of metrics about itself. From this snapshot, {beatname_uc} computes a _delta snapshot_; this delta snapshot contains any metrics that have _changed_ since the last snapshot. Note that the values of the metrics are the values when the snapshot is taken, _NOT_ the _difference_ in values from the last snapshot.
 
-If this delta snapshot contains _any_ metrics (indicating at least one metric that has changed since the last snapshot), this delta snapshot is serialized as JSON and emitted in {beatname_uc}'s logs at the `INFO` log level. Here is an example of such a log entry:
+If this delta snapshot contains _any_ metrics (indicating at least one metric that has changed since the last snapshot), this delta snapshot is serialized as JSON and emitted in {beatname_uc}'s logs at the `INFO` log level. Most snapshot fields report the change in the metric since the last snapshot, however some fields are _gauges_, which always report the current value. Here is an example of such a log entry:
 
 [source,json]
 ----
-{"log.level":"info","@timestamp":"2023-07-14T12:50:36.811Z","log.logger":"monitoring","log.origin":{"file.name":"log/log.go","file.line":187},"message":"Non-zero metrics in the last 30s","service.name":"filebeat","monitoring":{"metrics":{"beat":{"cgroup":{"memory":{"mem":{"usage":{"bytes":0}}}},"cpu":{"system":{"ticks":692690,"time":{"ms":60}},"total":{"ticks":3167250,"time":{"ms":150},"value":3167250},"user":{"ticks":2474560,"time":{"ms":90}}},"handles":{"limit":{"hard":1048576,"soft":1048576},"open":32},"info":{"ephemeral_id":"2bab8688-34c0-4522-80af-db86948d547d","uptime":{"ms":617670096},"version":"8.6.2"},"memstats":{"gc_next":57189272,"memory_alloc":43589824,"memory_total":275281335792,"rss":183574528},"runtime":{"goroutines":212}},"filebeat":{"events":{"active":5,"added":52,"done":49},"harvester":{"open_files":6,"running":6,"started":1}},"libbeat":{"config":{"module":{"running":15}},"output":{"events":{"acked":48,"active":0,"batches":6,"total":48},"read":{"bytes":210},"write":{"bytes":26923}},"pipeline":{"clients":15,"events":{"active":5,"filtered":1,"published":51,"total":52},"queue":{"acked":48}}},"registrar":{"states":{"current":14,"update":49},"writes":{"success":6,"total":6}},"system":{"load":{"1":0.91,"15":0.37,"5":0.4,"norm":{"1":0.1138,"15":0.0463,"5":0.05}}}},"ecs.version":"1.6.0"}}
+{"log.level":"info","@timestamp":"2023-07-14T12:50:36.811Z","log.logger":"monitoring","log.origin":{"file.name":"log/log.go","file.line":187},"message":"Non-zero metrics in the last 30s","service.name":"filebeat","monitoring":{"metrics":{"beat":{"cgroup":{"memory":{"mem":{"usage":{"bytes":0}}}},"cpu":{"system":{"ticks":692690,"time":{"ms":60}},"total":{"ticks":3167250,"time":{"ms":150},"value":3167250},"user":{"ticks":2474560,"time":{"ms":90}}},"handles":{"limit":{"hard":1048576,"soft":1048576},"open":32},"info":{"ephemeral_id":"2bab8688-34c0-4522-80af-db86948d547d","uptime":{"ms":617670096},"version":"8.6.2"},"memstats":{"gc_next":57189272,"memory_alloc":43589824,"memory_total":275281335792,"rss":183574528},"runtime":{"goroutines":212}},"filebeat":{"events":{"active":5,"added":52,"done":49},"harvester":{"open_files":6,"running":6,"started":1}},"libbeat":{"config":{"module":{"running":15}},"output":{"events":{"acked":48,"active":0,"batches":6,"total":48},"read":{"bytes":210},"write":{"bytes":26923}},"pipeline":{"clients":15,"events":{"active":5,"filtered":1,"published":51,"total":52},"queue":{"max_events":3500,"filled":{"events":5,"bytes":6425,"pct":0.0014},"added":{"events":52,"bytes":65702},"consumed":{"events":52,"bytes":65702},"removed":{"events":48,"bytes":59277},"acked":48}}},"registrar":{"states":{"current":14,"update":49},"writes":{"success":6,"total":6}},"system":{"load":{"1":0.91,"15":0.37,"5":0.4,"norm":{"1":0.1138,"15":0.0463,"5":0.05}}}},"ecs.version":"1.6.0"}}
 ----
 
 [discrete]
@@ -113,6 +113,24 @@ Focussing on the `.monitoring.metrics` field, and formatting the JSON, it's valu
           "total": 52
         },
         "queue": {
+          "max_events": 3500,
+          "filled": {
+            "events": 5,
+            "bytes": 6425,
+            "pct": 0.0014
+          },
+          "added": {
+            "events": 52,
+            "bytes": 65702
+          },
+          "consumed": {
+            "events": 52,
+            "bytes": 65702
+          },
+          "removed": {
+            "events": 48,
+            "bytes": 59277
+          },
           "acked": 48
         }
       }
@@ -130,12 +148,12 @@ Focussing on the `.monitoring.metrics` field, and formatting the JSON, it's valu
     "system": {
       "load": {
         "1": 0.91,
-        "5": 0.4,
         "15": 0.37,
+        "5": 0.4,
         "norm": {
           "1": 0.1138,
-          "5": 0.05,
-          "15": 0.0463
+          "15": 0.0463,
+          "5": 0.05
         }
       }
     }
@@ -170,9 +188,30 @@ endif::[]
 | `.output.events.total` | Integer | Number of events currently being processed by the output. | If this number grows over time, it may indicate that the output destination (e.g. {ls} pipeline or {es} cluster) is not able to accept events at the same or faster rate than what {beatname_uc} is sending to it.
 | `.output.events.acked` | Integer | Number of events acknowledged by the output destination. | Generally, we want this number to be the same as `.output.events.total` as this indicates that the output destination has reliably received all the events sent to it.
 | `.output.events.failed` | Integer | Number of events that {beatname_uc} tried to send to the output destination, but the destination failed to receive them. | Generally, we want this field to be absent or its value to be zero. When the value is greater than zero, it's useful to check {beatname_uc}'s logs right before this log entry's `@timestamp` to see if there are any connectivity issues with the output destination. Note that failed events are not lost or dropped; they will be sent back to the publisher pipeline for retrying later.
+| `.output.events.dropped` | Integer | Number of events that {beatname_uc} gave up sending to the output destination because of a permanent (non-retryable) error.
+| `.output.events.dead_letter` | Integer | Number of events that {beatname_uc} successfully sent to a configured dead letter index after they failed to ingest in the primary index.
 | `.output.write.latency` | Object | Reports statistics on the time to send an event to the connected output, in milliseconds. This can be used to diagnose delays and performance issues caused by I/O or output configuration. This metric is available for the Elasticsearch, file, redis, and logstash outputs.
 |===
 
+[cols="1,1,2,2"]
+|===
+| Field path (relative to `.monitoring.metrics.libbeat.pipeline`) | Type | Meaning | Troubleshooting hints
+
+| `.queue.max_events` | Integer (gauge) | The queue's maximum event count if it has one, otherwise zero.
+| `.queue.max_bytes` | Integer (gauge) | The queue's maximum byte count if it has one, otherwise zero.
+| `.queue.filled.events` | Integer (gauge) | Number of events currently stored by the queue. |
+| `.queue.filled.bytes` | Integer (gauge) | Number of bytes currently stored by the queue. |
+| `.queue.filled.pct` | Float (gauge) | How full the queue is relative to its maximum size, as a fraction from 0 to 1. | Low throughput while `queue.filled.pct` is low means congestion in the input. Low throughput while `queue.filled.pct` is high means congestion in the output.
+| `.queue.added.events` | Integer | Number of events added to the queue by input workers. |
+| `.queue.added.bytes` | Integer | Number of bytes added to the queue by input workers. |
+| `.queue.consumed.events` | Integer | Number of events sent to output workers. |
+| `.queue.consumed.bytes` | Integer | Number of bytes sent to output workers. |
+| `.queue.removed.events` | Integer | Number of events removed from the queue after being processed by output workers. |
+| `.queue.removed.bytes` | Integer | Number of bytes removed from the queue after being processed by output workers. |
+|===
+
+When using the memory queue, byte metrics are only set if the output supports them. Currently only the Elasticsearch output supports byte metrics.
+
 ifeval::["{beatname_lc}"=="filebeat"]
 [cols="1,1,2,2"]
 |===
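
The `queue.filled.pct` value in the sample entry above is consistent with a plain ratio of the two gauges beside it: 5 events in a queue capped at 3500 gives 5/3500 ≈ 0.0014. A hypothetical helper (the function name is invented for this example, not taken from the commit) showing the computation and the edge case for queues without an event limit:

[source,go]
----
// filledPct reports queue fullness as a fraction from 0 to 1.
// Hypothetical illustration; not code from the commit.
func filledPct(filledEvents, maxEvents int) float64 {
	if maxEvents <= 0 {
		return 0 // no event limit, so no meaningful ratio to report
	}
	return float64(filledEvents) / float64(maxEvents) // e.g. 5/3500 ≈ 0.0014
}
----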

libbeat/monitoring/report/log/log.go

Lines changed: 33 additions & 30 deletions

@@ -37,36 +37,39 @@ import (
 // TODO: Replace this with a proper solution that uses the metric type from
 // where it is defined. See: https://github.com/elastic/beats/issues/5433
 var gauges = map[string]bool{
-    "libbeat.output.events.active":             true,
-    "libbeat.pipeline.events.active":           true,
-    "libbeat.pipeline.clients":                 true,
-    "libbeat.pipeline.queue.max_events":        true,
-    "libbeat.pipeline.queue.filled.pct.events": true,
-    "libbeat.config.module.running":            true,
-    "registrar.states.current":                 true,
-    "filebeat.events.active":                   true,
-    "filebeat.harvester.running":               true,
-    "filebeat.harvester.open_files":            true,
-    "beat.memstats.memory_total":               true,
-    "beat.memstats.memory_alloc":               true,
-    "beat.memstats.rss":                        true,
-    "beat.memstats.gc_next":                    true,
-    "beat.info.uptime.ms":                      true,
-    "beat.cgroup.memory.mem.usage.bytes":       true,
-    "beat.cpu.user.ticks":                      true,
-    "beat.cpu.system.ticks":                    true,
-    "beat.cpu.total.value":                     true,
-    "beat.cpu.total.ticks":                     true,
-    "beat.handles.open":                        true,
-    "beat.handles.limit.hard":                  true,
-    "beat.handles.limit.soft":                  true,
-    "beat.runtime.goroutines":                  true,
-    "system.load.1":                            true,
-    "system.load.5":                            true,
-    "system.load.15":                           true,
-    "system.load.norm.1":                       true,
-    "system.load.norm.5":                       true,
-    "system.load.norm.15":                      true,
+    "libbeat.output.events.active":         true,
+    "libbeat.pipeline.events.active":       true,
+    "libbeat.pipeline.clients":             true,
+    "libbeat.pipeline.queue.max_events":    true,
+    "libbeat.pipeline.queue.max_bytes":     true,
+    "libbeat.pipeline.queue.filled.events": true,
+    "libbeat.pipeline.queue.filled.bytes":  true,
+    "libbeat.pipeline.queue.filled.pct":    true,
+    "libbeat.config.module.running":        true,
+    "registrar.states.current":             true,
+    "filebeat.events.active":               true,
+    "filebeat.harvester.running":           true,
+    "filebeat.harvester.open_files":        true,
+    "beat.memstats.memory_total":           true,
+    "beat.memstats.memory_alloc":           true,
+    "beat.memstats.rss":                    true,
+    "beat.memstats.gc_next":                true,
+    "beat.info.uptime.ms":                  true,
+    "beat.cgroup.memory.mem.usage.bytes":   true,
+    "beat.cpu.user.ticks":                  true,
+    "beat.cpu.system.ticks":                true,
+    "beat.cpu.total.value":                 true,
+    "beat.cpu.total.ticks":                 true,
+    "beat.handles.open":                    true,
+    "beat.handles.limit.hard":              true,
+    "beat.handles.limit.soft":              true,
+    "beat.runtime.goroutines":              true,
+    "system.load.1":                        true,
+    "system.load.5":                        true,
+    "system.load.15":                       true,
+    "system.load.norm.1":                   true,
+    "system.load.norm.5":                   true,
+    "system.load.norm.15":                  true,
 }
 
 // isGauge returns true when the given metric key name represents a gauge value.
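
The documentation change above states the rule this map encodes: counter fields are logged as the change since the last snapshot, while gauge fields are logged as-is. A sketch of how a reporter might apply an `isGauge`-style lookup against the `gauges` map above (the helper name and signature are invented for illustration):

[source,go]
----
// reportValue picks what a delta snapshot should contain for one metric,
// using the gauges map defined above. Sketch only.
func reportValue(key string, current, previous int64) int64 {
	if gauges[key] {
		return current // gauge: always the value at snapshot time
	}
	return current - previous // counter: the change since the last snapshot
}
----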

libbeat/publisher/pipeline/client.go

Lines changed: 5 additions & 11 deletions

@@ -37,9 +37,8 @@ type client struct {
     mutex  sync.Mutex
     waiter *clientCloseWaiter
 
-    eventFlags     publisher.EventFlags
-    canDrop        bool
-    eventWaitGroup *sync.WaitGroup
+    eventFlags publisher.EventFlags
+    canDrop    bool
 
     // Open state, signaling, and sync primitives for coordinating client Close.
     isOpen atomic.Bool // set to false during shutdown, such that no new events will be accepted anymore.
@@ -132,10 +131,8 @@ func (c *client) publish(e beat.Event) {
 }
 
 func (c *client) Close() error {
-    // first stop ack handling. ACK handler might block on wait (with timeout), waiting
-    // for pending events to be ACKed.
-    c.closeOnce.Do(func() {
-        c.isOpen.Store(false)
+    if c.isOpen.Swap(false) {
+        // Only do shutdown handling the first time Close is called
         c.onClosing()
 
         c.logger.Debug("client: closing acker")
@@ -158,7 +155,7 @@ func (c *client) Close() error {
         }
         c.logger.Debug("client: done closing processors")
     }
-    })
+    }
     return nil
 }
 
@@ -180,9 +177,6 @@ func (c *client) onNewEvent() {
 }
 
 func (c *client) onPublished() {
-    if c.eventWaitGroup != nil {
-        c.eventWaitGroup.Add(1)
-    }
     c.observer.publishedEvent()
     if c.clientListener != nil {
         c.clientListener.Published()
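
The rewritten `Close` trades the `sync.Once` for an `atomic.Bool`: `Swap(false)` returns the previous value, so exactly one caller observes `true` and runs the shutdown path, and the same flag doubles as the open/closed state that `publish` checks. The pattern in isolation, as a sketch rather than the full client code:

[source,go]
----
package example

import "sync/atomic"

type client struct {
	isOpen atomic.Bool // set to true when the client is opened
}

func (c *client) Close() error {
	if c.isOpen.Swap(false) {
		// Only the first Close call lands here: Swap returned true, so this
		// caller owned the open-to-closed transition and runs the shutdown
		// handling. Every later call sees false and returns immediately.
	}
	return nil
}
----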

libbeat/publisher/pipeline/client_test.go

Lines changed: 21 additions & 54 deletions

@@ -60,59 +60,27 @@ func TestClient(t *testing.T) {
         // Note: no asserts. If closing fails we have a deadlock, because Publish
         // would block forever
 
-        cases := map[string]struct {
-            context bool
-            close   func(client beat.Client, cancel func())
-        }{
-            "close unblocks client without context": {
-                context: false,
-                close: func(client beat.Client, _ func()) {
-                    client.Close()
-                },
-            },
-            "close unblocks client with context": {
-                context: true,
-                close: func(client beat.Client, _ func()) {
-                    client.Close()
-                },
-            },
-            "context cancel unblocks client": {
-                context: true,
-                close: func(client beat.Client, cancel func()) {
-                    cancel()
-                },
-            },
-        }
-
         logp.TestingSetup()
+        routinesChecker := resources.NewGoroutinesChecker()
+        defer routinesChecker.Check(t)
 
-        for name, test := range cases {
-            t.Run(name, func(t *testing.T) {
-                routinesChecker := resources.NewGoroutinesChecker()
-                defer routinesChecker.Check(t)
+        pipeline := makePipeline(t, Settings{}, makeTestQueue())
+        defer pipeline.Close()
 
-                pipeline := makePipeline(t, Settings{}, makeTestQueue())
-                defer pipeline.Close()
-
-                client, err := pipeline.ConnectWith(beat.ClientConfig{})
-                if err != nil {
-                    t.Fatal(err)
-                }
-                defer client.Close()
+        client, err := pipeline.ConnectWith(beat.ClientConfig{})
+        if err != nil {
+            t.Fatal(err)
+        }
 
-                var wg sync.WaitGroup
-                wg.Add(1)
-                go func() {
-                    defer wg.Done()
-                    client.Publish(beat.Event{})
-                }()
+        var wg sync.WaitGroup
+        wg.Add(1)
+        go func() {
+            defer wg.Done()
+            client.Publish(beat.Event{})
+        }()
 
-                test.close(client, func() {
-                    client.Close()
-                })
-                wg.Wait()
-            })
-        }
+        client.Close()
+        wg.Wait()
     })
 
     t.Run("no infinite loop when processing fails", func(t *testing.T) {
@@ -216,9 +184,6 @@ func TestClient(t *testing.T) {
 }
 
 func TestClientWaitClose(t *testing.T) {
-    routinesChecker := resources.NewGoroutinesChecker()
-    defer routinesChecker.Check(t)
-
     makePipeline := func(settings Settings, qu queue.Queue) *Pipeline {
         p, err := New(beat.Info{},
             Monitors{},
@@ -241,6 +206,9 @@ func TestClientWaitClose(t *testing.T) {
     defer pipeline.Close()
 
     t.Run("WaitClose blocks", func(t *testing.T) {
+        routinesChecker := resources.NewGoroutinesChecker()
+        defer routinesChecker.Check(t)
+
         client, err := pipeline.ConnectWith(beat.ClientConfig{
             WaitClose: 500 * time.Millisecond,
         })
@@ -272,6 +240,8 @@ func TestClientWaitClose(t *testing.T) {
     })
 
     t.Run("ACKing events unblocks WaitClose", func(t *testing.T) {
+        routinesChecker := resources.NewGoroutinesChecker()
+        defer routinesChecker.Check(t)
         client, err := pipeline.ConnectWith(beat.ClientConfig{
             WaitClose: time.Minute,
         })
@@ -344,9 +314,6 @@ func TestMonitoring(t *testing.T) {
     require.NoError(t, err)
     defer pipeline.Close()
 
-    metricsSnapshot := monitoring.CollectFlatSnapshot(metrics, monitoring.Full, true)
-    assert.Equal(t, int64(maxEvents), metricsSnapshot.Ints["pipeline.queue.max_events"])
-
    telemetrySnapshot := monitoring.CollectFlatSnapshot(telemetry, monitoring.Full, true)
    assert.Equal(t, "output_name", telemetrySnapshot.Strings["output.name"])
    assert.Equal(t, int64(batchSize), telemetrySnapshot.Ints["output.batch_size"])
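
A plausible reading of why the goroutine-leak checkers moved from the top of `TestClientWaitClose` into the individual subtests (an inference from the diff, not stated in the commit message): the pipeline is created once and only closed after all subtests finish, so a test-wide checker would report the shared pipeline's worker goroutines as leaks. The per-subtest pattern now looks like:

[source,go]
----
t.Run("WaitClose blocks", func(t *testing.T) {
	// Snapshot the goroutine count at subtest start; Check fails the
	// subtest if extra goroutines are still running when it returns.
	routinesChecker := resources.NewGoroutinesChecker()
	defer routinesChecker.Check(t)
	// ... subtest body ...
})
----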

libbeat/publisher/pipeline/consumer.go

Lines changed: 8 additions & 8 deletions

@@ -31,8 +31,8 @@ import (
 type eventConsumer struct {
     logger *logp.Logger
 
-    // eventConsumer calls the observer methods eventsRetry and eventsDropped.
-    observer outputObserver
+    // eventConsumer calls the retryObserver methods eventsRetry and eventsDropped.
+    retryObserver retryObserver
 
     // When the output changes, the new target is sent to the worker routine
     // on this channel. Clients should call eventConsumer.setTarget().
@@ -73,12 +73,12 @@ type retryRequest struct {
 
 func newEventConsumer(
     log *logp.Logger,
-    observer outputObserver,
+    observer retryObserver,
 ) *eventConsumer {
     c := &eventConsumer{
-        logger:      log,
-        observer:    observer,
-        queueReader: makeQueueReader(),
+        logger:        log,
+        retryObserver: observer,
+        queueReader:   makeQueueReader(),
 
         targetChan: make(chan consumerTarget),
         retryChan:  make(chan retryRequest),
@@ -163,7 +163,7 @@ outerLoop:
             // Successfully sent a batch to the output workers
             if len(retryBatches) > 0 {
                 // This was a retry, report it to the observer
-                c.observer.eventsRetry(len(active.Events()))
+                c.retryObserver.eventsRetry(len(active.Events()))
                 retryBatches = retryBatches[1:]
             } else {
                 // This was directly from the queue, clear the value so we can
@@ -183,7 +183,7 @@ outerLoop:
             alive := req.batch.reduceTTL()
 
             countDropped := countFailed - len(req.batch.Events())
-            c.observer.eventsDropped(countDropped)
+            c.retryObserver.eventsDropped(countDropped)
 
             if !alive {
                 log.Info("Drop batch")
