Skip to content

Commit 37db599

Browse files
authored
Clean up / document metrics monitor fields (#39413)
Document the semantics of many metrics monitoring variables, and rename some metrics APIs to more clearly indicate their function. As a side effect, fix several metrics reporting / publishing bugs in the Elasticsearch output, including #39146. Add the `output.events.dead_letter` metric to distinguish events that were ingested to the dead letter index after a fatal error (previously these events were just reported as "acked"). This could have been a shorter fix, but it was hard to properly test since the metrics were computed by two separate functions with a lot of special cases. I ended up reorganizing the Elasticsearch `Publish` helpers to make the logic more clear. The new layout makes it much easier to test the error handling and metrics reporting. The bugs fixed by this refactor are: - When a batch was split, the events in it were not reported to the observer via `RetryableErrors`. This caused `activeEvents` to increase permanently even after the events were handled. - When a previously-failed event was ingested to the dead letter index as a raw string, it was reported to the observer as `Acked` (success). The new logic creates a new `dead_letter` metric specifically for this case. - When a previously-failed event encountered a fatal error ingesting to the dead letter index: * It was reported to the observer as both a permanent error and a retryable error, giving incorrect event counts. It's now reported as just a permanent error. * It was added to the retry list anyway, which would block all ingestion if the error really was fatal since the queue could never advance past that event. It's now dropped, the same as with a permanent error in the main index. - If the Elasticsearch bulk index response was invalid, the associated events were dropped and incorrectly reported as acknowledged.
1 parent 727777e commit 37db599

File tree

22 files changed

+677
-431
lines changed

22 files changed

+677
-431
lines changed

filebeat/input/filestream/internal/task/group_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,8 +195,8 @@ func TestGroup_Go(t *testing.T) {
195195
})
196196

197197
t.Run("without limit, all goroutines run", func(t *testing.T) {
198-
// 100 <= limit <= 100000
199-
limit := rand.Int63n(100000-100) + 100
198+
// 100 <= limit <= 10000
199+
limit := rand.Int63n(10000-100) + 100
200200
t.Logf("running %d goroutines", limit)
201201
g := NewGroup(uint64(limit), time.Second, noopLogger{}, "")
202202

libbeat/esleg/eslegclient/bulkapi.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"encoding/json"
2424
"errors"
2525
"io"
26-
"io/ioutil"
2726
"net/http"
2827
"strings"
2928

@@ -60,16 +59,16 @@ type bulkRequest struct {
6059
requ *http.Request
6160
}
6261

63-
// BulkResult contains the result of a bulk API request.
64-
type BulkResult json.RawMessage
62+
// BulkResponse contains the result of a bulk API request.
63+
type BulkResponse json.RawMessage
6564

6665
// Bulk performs many index/delete operations in a single API call.
6766
// Implements: http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
6867
func (conn *Connection) Bulk(
6968
ctx context.Context,
7069
index, docType string,
7170
params map[string]string, body []interface{},
72-
) (int, BulkResult, error) {
71+
) (int, BulkResponse, error) {
7372
if len(body) == 0 {
7473
return 0, nil, nil
7574
}
@@ -142,7 +141,7 @@ func (r *bulkRequest) reset(body BodyEncoder) {
142141

143142
rc, ok := bdy.(io.ReadCloser)
144143
if !ok && body != nil {
145-
rc = ioutil.NopCloser(bdy)
144+
rc = io.NopCloser(bdy)
146145
}
147146

148147
switch v := bdy.(type) {
@@ -160,9 +159,9 @@ func (r *bulkRequest) reset(body BodyEncoder) {
160159
body.AddHeader(&r.requ.Header)
161160
}
162161

163-
func (conn *Connection) sendBulkRequest(requ *bulkRequest) (int, BulkResult, error) {
162+
func (conn *Connection) sendBulkRequest(requ *bulkRequest) (int, BulkResponse, error) {
164163
status, resp, err := conn.execHTTPRequest(requ.requ)
165-
return status, BulkResult(resp), err
164+
return status, BulkResponse(resp), err
166165
}
167166

168167
func bulkEncode(log *logp.Logger, out BulkWriter, body []interface{}) error {

libbeat/monitoring/report/elasticsearch/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ func getMonitoringIndexName() string {
224224
return fmt.Sprintf(".monitoring-beats-%v-%s", version, date)
225225
}
226226

227-
func logBulkFailures(log *logp.Logger, result eslegclient.BulkResult, events []report.Event) {
227+
func logBulkFailures(log *logp.Logger, result eslegclient.BulkResponse, events []report.Event) {
228228
var response struct {
229229
Items []map[string]map[string]interface{} `json:"items"`
230230
}

libbeat/outputs/console/console.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,8 @@ func (c *console) Publish(_ context.Context, batch publisher.Batch) error {
111111
c.writer.Flush()
112112
batch.ACK()
113113

114-
st.Dropped(dropped)
115-
st.Acked(len(events) - dropped)
114+
st.PermanentErrors(dropped)
115+
st.AckedEvents(len(events) - dropped)
116116

117117
return nil
118118
}

libbeat/outputs/discard/discard.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func (out *discardOutput) Publish(_ context.Context, batch publisher.Batch) error
7070
st := out.observer
7171
events := batch.Events()
7272
st.NewBatch(len(events))
73-
st.Acked(len(events))
73+
st.AckedEvents(len(events))
7474
return nil
7575
}
7676

0 commit comments

Comments (0)