diff --git a/filebeat/input/filestream/internal/task/group_test.go b/filebeat/input/filestream/internal/task/group_test.go index 5ce15d455e3e..30b9858a1de3 100644 --- a/filebeat/input/filestream/internal/task/group_test.go +++ b/filebeat/input/filestream/internal/task/group_test.go @@ -195,8 +195,8 @@ func TestGroup_Go(t *testing.T) { }) t.Run("without limit, all goroutines run", func(t *testing.T) { - // 100 <= limit <= 100000 - limit := rand.Int63n(100000-100) + 100 + // 100 <= limit <= 10000 + limit := rand.Int63n(10000-100) + 100 t.Logf("running %d goroutines", limit) g := NewGroup(uint64(limit), time.Second, noopLogger{}, "") diff --git a/libbeat/esleg/eslegclient/bulkapi.go b/libbeat/esleg/eslegclient/bulkapi.go index 56a36ccf1452..8512426ed9ed 100644 --- a/libbeat/esleg/eslegclient/bulkapi.go +++ b/libbeat/esleg/eslegclient/bulkapi.go @@ -23,7 +23,6 @@ import ( "encoding/json" "errors" "io" - "io/ioutil" "net/http" "strings" @@ -60,8 +59,8 @@ type bulkRequest struct { requ *http.Request } -// BulkResult contains the result of a bulk API request. -type BulkResult json.RawMessage +// BulkResponse contains the result of a bulk API request. +type BulkResponse json.RawMessage // Bulk performs many index/delete operations in a single API call. // Implements: http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html @@ -69,7 +68,7 @@ func (conn *Connection) Bulk( ctx context.Context, index, docType string, params map[string]string, body []interface{}, -) (int, BulkResult, error) { +) (int, BulkResponse, error) { if len(body) == 0 { return 0, nil, nil } @@ -142,7 +141,7 @@ func (r *bulkRequest) reset(body BodyEncoder) { rc, ok := bdy.(io.ReadCloser) if !ok && body != nil { - rc = ioutil.NopCloser(bdy) + rc = io.NopCloser(bdy) } switch v := bdy.(type) { @@ -160,9 +159,9 @@ func (r *bulkRequest) reset(body BodyEncoder) { body.AddHeader(&r.requ.Header) } -func (conn *Connection) sendBulkRequest(requ *bulkRequest) (int, BulkResult, error) { +func (conn *Connection) sendBulkRequest(requ *bulkRequest) (int, BulkResponse, error) { status, resp, err := conn.execHTTPRequest(requ.requ) - return status, BulkResult(resp), err + return status, BulkResponse(resp), err } func bulkEncode(log *logp.Logger, out BulkWriter, body []interface{}) error { diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index fbc4fcef772a..56f56ac8e1e3 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -224,7 +224,7 @@ func getMonitoringIndexName() string { return fmt.Sprintf(".monitoring-beats-%v-%s", version, date) } -func logBulkFailures(log *logp.Logger, result eslegclient.BulkResult, events []report.Event) { +func logBulkFailures(log *logp.Logger, result eslegclient.BulkResponse, events []report.Event) { var response struct { Items []map[string]map[string]interface{} `json:"items"` } diff --git a/libbeat/outputs/console/console.go b/libbeat/outputs/console/console.go index f723bf818c91..867316fe3a97 100644 --- a/libbeat/outputs/console/console.go +++ b/libbeat/outputs/console/console.go @@ -111,8 +111,8 @@ func (c *console) Publish(_ context.Context, batch publisher.Batch) error { c.writer.Flush() batch.ACK() - st.Dropped(dropped) - st.Acked(len(events) - dropped) + st.PermanentErrors(dropped) + st.AckedEvents(len(events) - dropped) return nil } diff --git a/libbeat/outputs/discard/discard.go b/libbeat/outputs/discard/discard.go index c9a51b0f33df..bfd1a1c1add0 100644 
--- a/libbeat/outputs/discard/discard.go +++ b/libbeat/outputs/discard/discard.go @@ -70,7 +70,7 @@ func (out *discardOutput) Publish(_ context.Context, batch publisher.Batch) erro st := out.observer events := batch.Events() st.NewBatch(len(events)) - st.Acked(len(events)) + st.AckedEvents(len(events)) return nil } diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 0892ce401731..e05c4e0b261f 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -66,7 +67,11 @@ type clientSettings struct { connection eslegclient.ConnectionSettings indexSelector outputs.IndexSelector pipelineSelector *outil.Selector - observer outputs.Observer + + // The metrics observer from the clientSettings, or a no-op placeholder if + // none is provided. This variable is always non-nil for a client created + // via NewClient. + observer outputs.Observer // If deadLetterIndex is set, events with bulk-ingest errors will be // forwarded to this index. Otherwise, they will be dropped. @@ -76,15 +81,40 @@ type clientSettings struct { type bulkResultStats struct { acked int // number of events ACKed by Elasticsearch duplicates int // number of events failed with `create` due to ID already being indexed - fails int // number of failed events (can be retried) - nonIndexable int // number of failed events (not indexable) + fails int // number of events with retryable failures. + nonIndexable int // number of events with permanent failures. + deadLetter int // number of failed events ingested to the dead letter index. tooMany int // number of events receiving HTTP 429 Too Many Requests } +type bulkResult struct { + // A connection-level error if the request couldn't be sent or the response + // couldn't be read. This error is returned from (*Client).Publish to signal + // to the pipeline that this output worker needs to be reconnected before the + // next Publish call. + connErr error + + // The array of events sent via bulk request. This excludes any events that + // had encoding errors while assembling the request. + events []publisher.Event + + // The http status returned by the bulk request. + status int + + // The API response from Elasticsearch. + response eslegclient.BulkResponse +} + const ( defaultEventType = "doc" ) +// Flags passed with the Bulk API request: we filter the response to include +// only the fields we need for checking request/item state. +var bulkRequestParams = map[string]string{ + "filter_path": "errors,items.*.error,items.*.status", +} + // NewClient instantiates a new client.
func NewClient( s clientSettings, @@ -125,11 +155,17 @@ func NewClient( return nil } + // Make sure there's a non-nil observer. + observer := s.observer + if observer == nil { + observer = outputs.NewNilObserver() + } + client := &Client{ conn: *conn, indexSelector: s.indexSelector, pipelineSelector: pipeline, - observer: s.observer, + observer: observer, deadLetterIndex: s.deadLetterIndex, log: logp.NewLogger("elasticsearch"), @@ -178,119 +214,104 @@ func (client *Client) Clone() *Client { } func (client *Client) Publish(ctx context.Context, batch publisher.Batch) error { - events := batch.Events() - rest, err := client.publishEvents(ctx, events) - - switch { - case errors.Is(err, errPayloadTooLarge): - if batch.SplitRetry() { - // Report that we split a batch - client.observer.Split() - } else { - // If the batch could not be split, there is no option left but - // to drop it and log the error state. - batch.Drop() - client.observer.Dropped(len(events)) - err := apm.CaptureError(ctx, fmt.Errorf("failed to perform bulk index operation: %w", err)) - err.Send() - client.log.Error(err) - } - // Returning an error from Publish forces a client close / reconnect, - // so don't pass this error through since it doesn't indicate anything - // wrong with the connection. - return nil - case len(rest) == 0: - batch.ACK() - default: - batch.RetryEvents(rest) - } - return err -} - -// PublishEvents sends all events to elasticsearch. On error a slice with all -// events not published or confirmed to be processed by elasticsearch will be -// returned. The input slice backing memory will be reused by return the value. -func (client *Client) publishEvents(ctx context.Context, data []publisher.Event) ([]publisher.Event, error) { span, ctx := apm.StartSpan(ctx, "publishEvents", "output") defer span.End() + span.Context.SetLabel("events_original", len(batch.Events())) + client.observer.NewBatch(len(batch.Events())) + + // Create and send the bulk request. + bulkResult := client.doBulkRequest(ctx, batch) + span.Context.SetLabel("events_encoded", len(bulkResult.events)) + if bulkResult.connErr != nil { + // If there was a connection-level error there is no per-item response, + // handle it and return. + return client.handleBulkResultError(ctx, batch, bulkResult) + } + span.Context.SetLabel("events_published", len(bulkResult.events)) - st := client.observer + // At this point we have an Elasticsearch response for our request, + // check and report the per-item results. + eventsToRetry, stats := client.bulkCollectPublishFails(bulkResult) + stats.reportToObserver(client.observer) - if st != nil { - st.NewBatch(len(data)) + if len(eventsToRetry) > 0 { + span.Context.SetLabel("events_failed", len(eventsToRetry)) + batch.RetryEvents(eventsToRetry) + } else { + batch.ACK() } + return nil +} - if len(data) == 0 { - return nil, nil - } +// Encode a batch's events into a bulk publish request, send the request to +// Elasticsearch, and return the resulting metadata. +// Reports the network request latency to the client's metrics observer. +// The events list in the result will be shorter than the original batch if +// some events couldn't be encoded. In this case, the removed events will +// be reported to the Client's metrics observer via PermanentErrors.
+func (client *Client) doBulkRequest( + ctx context.Context, + batch publisher.Batch, +) bulkResult { + var result bulkResult + + rawEvents := batch.Events() // encode events into bulk request buffer, dropping failed elements from // events slice - origCount := len(data) - span.Context.SetLabel("events_original", origCount) - data, bulkItems := client.bulkEncodePublishRequest(client.conn.GetVersion(), data) - newCount := len(data) - span.Context.SetLabel("events_encoded", newCount) - if st != nil && origCount > newCount { - st.Dropped(origCount - newCount) - } - if newCount == 0 { - return nil, nil - } - - begin := time.Now() - params := map[string]string{"filter_path": "errors,items.*.error,items.*.status"} - status, result, sendErr := client.conn.Bulk(ctx, "", "", params, bulkItems) - timeSinceSend := time.Since(begin) - - if sendErr != nil { - if status == http.StatusRequestEntityTooLarge { - // This error must be handled by splitting the batch, propagate it - // back to Publish instead of reporting it directly - return data, errPayloadTooLarge + resultEvents, bulkItems := client.bulkEncodePublishRequest(client.conn.GetVersion(), rawEvents) + result.events = resultEvents + client.observer.PermanentErrors(len(rawEvents) - len(resultEvents)) + + // If we encoded any events, send the network request. + if len(result.events) > 0 { + begin := time.Now() + result.status, result.response, result.connErr = + client.conn.Bulk(ctx, "", "", bulkRequestParams, bulkItems) + if result.connErr == nil { + duration := time.Since(begin) + client.observer.ReportLatency(duration) + client.log.Debugf( + "doBulkRequest: %d events have been sent to elasticsearch in %v.", + len(result.events), duration) } - err := apm.CaptureError(ctx, fmt.Errorf("failed to perform any bulk index operations: %w", sendErr)) - err.Send() - client.log.Error(err) - return data, sendErr - } - pubCount := len(data) - span.Context.SetLabel("events_published", pubCount) - - client.log.Debugf("PublishEvents: %d events have been published to elasticsearch in %v.", - pubCount, - timeSinceSend) - - // check response for transient errors - var failedEvents []publisher.Event - var stats bulkResultStats - if status != 200 { - failedEvents = data - stats.fails = len(failedEvents) - } else { - failedEvents, stats = client.bulkCollectPublishFails(result, data) } - failed := len(failedEvents) - span.Context.SetLabel("events_failed", failed) - if st := client.observer; st != nil { - dropped := stats.nonIndexable - duplicates := stats.duplicates - acked := len(data) - failed - dropped - duplicates - - st.Acked(acked) - st.Failed(failed) - st.Dropped(dropped) - st.Duplicate(duplicates) - st.ErrTooMany(stats.tooMany) - st.ReportLatency(timeSinceSend) + return result +} +func (client *Client) handleBulkResultError( + ctx context.Context, batch publisher.Batch, bulkResult bulkResult, +) error { + if bulkResult.status == http.StatusRequestEntityTooLarge { + if batch.SplitRetry() { + // Report that we split a batch + client.observer.BatchSplit() + client.observer.RetryableErrors(len(bulkResult.events)) + } else { + // If the batch could not be split, there is no option left but + // to drop it and log the error state. + batch.Drop() + client.observer.PermanentErrors(len(bulkResult.events)) + client.log.Error(errPayloadTooLarge) + } + // Don't propagate a too-large error since it doesn't indicate a problem + // with the connection. 
+ return nil } + err := apm.CaptureError(ctx, fmt.Errorf("failed to perform any bulk index operations: %w", bulkResult.connErr)) + err.Send() + client.log.Error(err) - if failed > 0 { - return failedEvents, eslegclient.ErrTempBulkFailure + if len(bulkResult.events) > 0 { + // At least some events failed, retry them + batch.RetryEvents(bulkResult.events) + } else { + // All events were sent successfully + batch.ACK() } - return nil, nil + client.observer.RetryableErrors(len(bulkResult.events)) + return bulkResult.connErr } // bulkEncodePublishRequest encodes all bulk requests and returns slice of events @@ -380,80 +401,108 @@ func getPipeline(event *beat.Event, defaultSelector *outil.Selector) (string, er // to be tried again due to error code returned for that items. If indexing an // event failed due to some error in the event itself (e.g. does not respect mapping), // the event will be dropped. -func (client *Client) bulkCollectPublishFails(result eslegclient.BulkResult, data []publisher.Event) ([]publisher.Event, bulkResultStats) { - reader := newJSONReader(result) +// Each of the events will be reported in the returned stats as exactly one of +// acked, duplicates, fails, nonIndexable, or deadLetter. +func (client *Client) bulkCollectPublishFails(bulkResult bulkResult) ([]publisher.Event, bulkResultStats) { + events := bulkResult.events + + if len(bulkResult.events) == 0 { + // No events to process + return nil, bulkResultStats{} + } + if bulkResult.status != 200 { + return events, bulkResultStats{fails: len(events)} + } + reader := newJSONReader(bulkResult.response) if err := bulkReadToItems(reader); err != nil { client.log.Errorf("failed to parse bulk response: %v", err.Error()) - return nil, bulkResultStats{} + return events, bulkResultStats{fails: len(events)} } - count := len(data) - failed := data[:0] + count := len(events) + eventsToRetry := events[:0] stats := bulkResultStats{} for i := 0; i < count; i++ { - status, msg, err := bulkReadItemStatus(client.log, reader) + itemStatus, itemMessage, err := bulkReadItemStatus(client.log, reader) if err != nil { - client.log.Error(err) - return nil, bulkResultStats{} + // The response json is invalid, mark the remaining events for retry. + stats.fails += count - i + eventsToRetry = append(eventsToRetry, events[i:]...) + break } - if status < 300 { - stats.acked++ - continue // ok value + if client.applyItemStatus(events[i], itemStatus, itemMessage, &stats) { + eventsToRetry = append(eventsToRetry, events[i]) + client.log.Debugf("Bulk item insert failed (i=%v, status=%v): %s", i, itemStatus, itemMessage) } + } - if status == 409 { - // 409 is used to indicate an event with same ID already exists if - // `create` op_type is used. - stats.duplicates++ - continue // ok - } + return eventsToRetry, stats +} - if status < 500 { - if status == http.StatusTooManyRequests { - stats.tooMany++ - } else { - // hard failure, apply policy action - encodedEvent := data[i].EncodedEvent.(*encodedEvent) - if encodedEvent.deadLetter { - stats.nonIndexable++ - client.log.Errorf("Can't deliver to dead letter index event (status=%v). Look at the event log to view the event and cause.", status) - client.log.Errorw(fmt.Sprintf("Can't deliver to dead letter index event %#v (status=%v): %s", data[i], status, msg), logp.TypeKey, logp.EventType) - // poison pill - this will clog the pipeline if the underlying failure is non transient. - } else if client.deadLetterIndex != "" { - client.log.Warnf("Cannot index event (status=%v), trying dead letter index.
Look at the event log to view the event and cause.", status) - client.log.Warnw(fmt.Sprintf("Cannot index event %#v (status=%v): %s, trying dead letter index", data[i], status, msg), logp.TypeKey, logp.EventType) - client.setDeadLetter(encodedEvent, status, string(msg)) - - } else { // drop - stats.nonIndexable++ - client.log.Warnf("Cannot index event (status=%v): dropping event! Look at the event log to view the event and cause.", status) - client.log.Warnw(fmt.Sprintf("Cannot index event %#v (status=%v): %s, dropping event!", data[i], status, msg), logp.TypeKey, logp.EventType) - continue - } - } +// applyItemStatus processes the ingestion status of one event from a bulk request. +// Returns true if the item should be retried. +// In the provided bulkResultStats, applyItemStatus increments exactly one of: +// acked, duplicates, deadLetter, fails, nonIndexable. +func (client *Client) applyItemStatus( + event publisher.Event, + itemStatus int, + itemMessage []byte, + stats *bulkResultStats, +) bool { + encodedEvent := event.EncodedEvent.(*encodedEvent) + if itemStatus < 300 { + if encodedEvent.deadLetter { + // This was ingested into the dead letter index, not the original target + stats.deadLetter++ + } else { + stats.acked++ } + return false // no retry needed + } - client.log.Debugf("Bulk item insert failed (i=%v, status=%v): %s", i, status, msg) - stats.fails++ - failed = append(failed, data[i]) + if itemStatus == 409 { + // 409 is used to indicate there is already an event with the same ID, or + // with identical Time Series Data Stream dimensions when TSDS is active. + stats.duplicates++ + return false // no retry needed } - return failed, stats -} + if itemStatus == http.StatusTooManyRequests { + stats.fails++ + stats.tooMany++ + return true + } -func (client *Client) setDeadLetter( - encodedEvent *encodedEvent, errType int, errMsg string, -) { - encodedEvent.deadLetter = true - encodedEvent.index = client.deadLetterIndex - deadLetterReencoding := mapstr.M{ - "@timestamp": encodedEvent.timestamp, - "message": string(encodedEvent.encoding), - "error.type": errType, - "error.message": errMsg, + if itemStatus < 500 { + // hard failure, apply policy action + if encodedEvent.deadLetter { + // Fatal error while sending an already-failed event to the dead letter + // index, drop. + client.log.Errorf("Can't deliver to dead letter index event (status=%v). Look at the event log to view the event and cause.", itemStatus) + client.log.Errorw(fmt.Sprintf("Can't deliver to dead letter index event %#v (status=%v): %s", event, itemStatus, itemMessage), logp.TypeKey, logp.EventType) + stats.nonIndexable++ + return false + } + if client.deadLetterIndex == "" { + // Fatal error and no dead letter index, drop. + client.log.Warnf("Cannot index event (status=%v): dropping event! Look at the event log to view the event and cause.", itemStatus) + client.log.Warnw(fmt.Sprintf("Cannot index event %#v (status=%v): %s, dropping event!", event, itemStatus, itemMessage), logp.TypeKey, logp.EventType) + stats.nonIndexable++ + return false + } + // Send this failure to the dead letter index and "retry". + // We count this as a "retryable failure", and then if the dead letter + // ingestion succeeds it is counted in the "deadLetter" counter + // rather than the "acked" counter. + client.log.Warnf("Cannot index event (status=%v), trying dead letter index. 
Look at the event log to view the event and cause.", itemStatus) + client.log.Warnw(fmt.Sprintf("Cannot index event %#v (status=%v): %s, trying dead letter index", event, itemStatus, itemMessage), logp.TypeKey, logp.EventType) + encodedEvent.setDeadLetter(client.deadLetterIndex, itemStatus, string(itemMessage)) } - encodedEvent.encoding = []byte(deadLetterReencoding.String()) + + // Everything else gets retried. + stats.fails++ + return true } func (client *Client) Connect() error { @@ -471,3 +520,13 @@ func (client *Client) String() string { func (client *Client) Test(d testing.Driver) { client.conn.Test(d) } + +func (stats bulkResultStats) reportToObserver(ob outputs.Observer) { + ob.AckedEvents(stats.acked) + ob.RetryableErrors(stats.fails) + ob.PermanentErrors(stats.nonIndexable) + ob.DuplicateEvents(stats.duplicates) + ob.DeadLetterEvents(stats.deadLetter) + + ob.ErrTooMany(stats.tooMany) +} diff --git a/libbeat/outputs/elasticsearch/client_integration_test.go b/libbeat/outputs/elasticsearch/client_integration_test.go index 56567931ee4e..765fd3eec5aa 100644 --- a/libbeat/outputs/elasticsearch/client_integration_test.go +++ b/libbeat/outputs/elasticsearch/client_integration_test.go @@ -238,10 +238,7 @@ func TestClientBulkPublishEventsWithDeadletterIndex(t *testing.T) { "testfield": "foo0", }, })) - err = output.Publish(context.Background(), batch) - if err == nil { - t.Fatal("Expecting mapping conflict") - } + _ = output.Publish(context.Background(), batch) _, _, err = client.conn.Refresh(deadletterIndex) if err == nil { t.Fatal("expecting index to not exist yet") diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go index 429b600d11aa..5124c0defe9d 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -49,6 +49,7 @@ import ( c "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/monitoring" libversion "github.com/elastic/elastic-agent-libs/version" ) @@ -83,17 +84,19 @@ func (bm *batchMock) RetryEvents(events []publisher.Event) { } func TestPublish(t *testing.T) { - makePublishTestClient := func(t *testing.T, url string) *Client { + + makePublishTestClient := func(t *testing.T, url string) (*Client, *monitoring.Registry) { + reg := monitoring.NewRegistry() client, err := NewClient( clientSettings{ - observer: outputs.NewNilObserver(), + observer: outputs.NewStats(reg), connection: eslegclient.ConnectionSettings{URL: url}, indexSelector: testIndexSelector{}, }, nil, ) require.NoError(t, err) - return client + return client, reg } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -109,7 +112,7 @@ func TestPublish(t *testing.T) { _, _ = w.Write([]byte("Request failed to get to the server (status code: 413)")) // actual response from ES })) defer esMock.Close() - client := makePublishTestClient(t, esMock.URL) + client, reg := makePublishTestClient(t, esMock.URL) // Try publishing a batch that can be split batch := encodeBatch(client, &batchMock{ @@ -120,6 +123,8 @@ func TestPublish(t *testing.T) { assert.NoError(t, err, "Publish should split the batch without error") assert.True(t, batch.didSplit, "batch should be split") + assertRegistryUint(t, reg, "events.failed", 1, "Splitting a batch should report the event as failed/retried") + assertRegistryUint(t, reg, "events.dropped", 0, "Splitting a batch should not report any dropped 
events") // Try publishing a batch that cannot be split batch = encodeBatch(client, &batchMock{ @@ -131,6 +136,9 @@ func TestPublish(t *testing.T) { assert.NoError(t, err, "Publish should drop the batch without error") assert.False(t, batch.didSplit, "batch should not be split") assert.True(t, batch.drop, "unsplittable batch should be dropped") + assertRegistryUint(t, reg, "events.failed", 1, "Failed batch split should not report any more retryable failures") + assertRegistryUint(t, reg, "events.dropped", 1, "Failed batch split should report a dropped event") + }) t.Run("retries the batch if bad HTTP status", func(t *testing.T) { @@ -138,7 +146,7 @@ func TestPublish(t *testing.T) { w.WriteHeader(http.StatusInternalServerError) })) defer esMock.Close() - client := makePublishTestClient(t, esMock.URL) + client, reg := makePublishTestClient(t, esMock.URL) batch := encodeBatch(client, &batchMock{ events: []publisher.Event{event1, event2}, @@ -149,40 +157,44 @@ func TestPublish(t *testing.T) { assert.Error(t, err) assert.False(t, batch.ack, "should not be acknowledged") assert.Len(t, batch.retryEvents, 2, "all events should be retried") + assertRegistryUint(t, reg, "events.failed", 2, "HTTP failure should report failed events") }) t.Run("live batches, still too big after split", func(t *testing.T) { - // Test a live (non-mocked) batch where both events by themselves are - // rejected by the server as too large after the initial split. + // Test a live (non-mocked) batch where all three events by themselves are + // rejected by the server as too large after the initial batch splits. esMock := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusRequestEntityTooLarge) _, _ = w.Write([]byte("Request failed to get to the server (status code: 413)")) // actual response from ES })) defer esMock.Close() - client := makePublishTestClient(t, esMock.URL) + client, reg := makePublishTestClient(t, esMock.URL) // Because our tests don't use a live eventConsumer routine, // everything will happen synchronously and it's safe to track // test results directly without atomics/mutexes. done := false retryCount := 0 + var retryBatches []publisher.Batch batch := encodeBatch(client, pipeline.NewBatchForTesting( []publisher.Event{event1, event2, event3}, func(b publisher.Batch) { // The retry function sends the batch back through Publish. // In a live pipeline it would instead be sent to eventConsumer - // first and then back to Publish when an output worker was - // available. + // and then back to Publish when an output worker was available. retryCount++ - // We shouldn't need to re-encode the events since that was done - // before the initial Publish call - err := client.Publish(ctx, b) - assert.NoError(t, err, "Publish should return without error") + retryBatches = append(retryBatches, b) }, func() { done = true }, )) - err := client.Publish(ctx, batch) - assert.NoError(t, err, "Publish should return without error") + retryBatches = []publisher.Batch{batch} + // Loop until all pending retries are complete, the same as a pipeline caller would. + for len(retryBatches) > 0 { + batch := retryBatches[0] + retryBatches = retryBatches[1:] + err := client.Publish(ctx, batch) + assert.NoError(t, err, "Publish should return without error") + } // For three events there should be four retries in total: // {[event1], [event2, event3]}, then {[event2], [event3]}. @@ -190,6 +202,15 @@ func TestPublish(t *testing.T) { // events, all 3 will fail and be dropped. 
assert.Equal(t, 4, retryCount, "3-event batch should produce 4 total retries") assert.True(t, done, "batch should be marked as done") + // Metrics should report: + // 8 total events (3 + 1 + 2 + 1 + 1 from the batches described above) + // 3 dropped events (each event is dropped once) + // 5 failed events (8 - 3, for each event's attempted publish calls before being dropped) + // 0 active events (because Publish is complete) + assertRegistryUint(t, reg, "events.total", 8, "Publish is called on 8 events total") + assertRegistryUint(t, reg, "events.dropped", 3, "All 3 events should be dropped") + assertRegistryUint(t, reg, "events.failed", 5, "Split batches should retry 5 events before dropping them") + assertRegistryUint(t, reg, "events.active", 0, "Active events should be zero when Publish returns") }) t.Run("live batches, one event too big after split", func(t *testing.T) { @@ -206,32 +227,36 @@ func TestPublish(t *testing.T) { } else { // Report success with no events dropped w.WriteHeader(200) - _, _ = io.WriteString(w, "{\"items\": []}") + _, _ = io.WriteString(w, "{\"items\": [{\"index\":{\"status\":200}},{\"index\":{\"status\":200}},{\"index\":{\"status\":200}}]}") } })) defer esMock.Close() - client := makePublishTestClient(t, esMock.URL) + client, reg := makePublishTestClient(t, esMock.URL) // Because our tests don't use a live eventConsumer routine, // everything will happen synchronously and it's safe to track // test results directly without atomics/mutexes. done := false retryCount := 0 + var retryBatches []publisher.Batch batch := encodeBatch(client, pipeline.NewBatchForTesting( []publisher.Event{event1, event2, event3}, func(b publisher.Batch) { // The retry function sends the batch back through Publish. // In a live pipeline it would instead be sent to eventConsumer - // first and then back to Publish when an output worker was - // available. + // and then back to Publish when an output worker was available. retryCount++ - err := client.Publish(ctx, b) - assert.NoError(t, err, "Publish should return without error") + retryBatches = append(retryBatches, b) }, func() { done = true }, )) - err := client.Publish(ctx, batch) - assert.NoError(t, err, "Publish should return without error") + retryBatches = []publisher.Batch{batch} + for len(retryBatches) > 0 { + batch := retryBatches[0] + retryBatches = retryBatches[1:] + err := client.Publish(ctx, batch) + assert.NoError(t, err, "Publish should return without error") + } // There should be two retries: {[event1], [event2, event3]}. // The first split batch should fail and be dropped since it contains @@ -240,9 +265,26 @@ func TestPublish(t *testing.T) { // (one with failure, one with success). 
assert.Equal(t, 2, retryCount, "splitting with one large event should produce two retries") assert.True(t, done, "batch should be marked as done") + // The metrics should show: + // 6 total events (3 + 1 + 2) + // 1 dropped event (because only one event is uningestable) + // 2 acked events (because the other two ultimately succeed) + // 3 failed events (because all events fail and are retried on the first call) + // 0 active events (because Publish is finished) + assertRegistryUint(t, reg, "events.total", 6, "Publish is called on 6 events total") + assertRegistryUint(t, reg, "events.dropped", 1, "One event should be dropped") + assertRegistryUint(t, reg, "events.failed", 3, "Split batches should retry 3 events before dropping them") + assertRegistryUint(t, reg, "events.active", 0, "Active events should be zero when Publish returns") }) } +func assertRegistryUint(t *testing.T, reg *monitoring.Registry, key string, expected uint64, message string) { + t.Helper() + value := reg.Get(key).(*monitoring.Uint) + assert.NotNilf(t, value, "expected registry entry for key '%v'", key) + assert.Equal(t, expected, value.Get(), message) +} + func TestCollectPublishFailsNone(t *testing.T) { client, err := NewClient( clientSettings{ @@ -262,7 +304,11 @@ func TestCollectPublishFailsNone(t *testing.T) { events[i] = publisher.Event{Content: beat.Event{Fields: event}} } - res, _ := client.bulkCollectPublishFails(response, events) + res, _ := client.bulkCollectPublishFails(bulkResult{ + events: encodeEvents(client, events), + status: 200, + response: response, + }) assert.Equal(t, 0, len(res)) } @@ -283,11 +329,16 @@ func TestCollectPublishFailMiddle(t *testing.T) { ]} `) - event := publisher.Event{Content: beat.Event{Fields: mapstr.M{"field": 1}}} - eventFail := publisher.Event{Content: beat.Event{Fields: mapstr.M{"field": 2}}} - events := []publisher.Event{event, eventFail, event} + event1 := encodeEvent(client, publisher.Event{Content: beat.Event{Fields: mapstr.M{"field": 1}}}) + event2 := encodeEvent(client, publisher.Event{Content: beat.Event{Fields: mapstr.M{"field": 2}}}) + eventFail := encodeEvent(client, publisher.Event{Content: beat.Event{Fields: mapstr.M{"field": 3}}}) + events := []publisher.Event{event1, eventFail, event2} - res, stats := client.bulkCollectPublishFails(response, events) + res, stats := client.bulkCollectPublishFails(bulkResult{ + events: events, + status: 200, + response: response, + }) assert.Equal(t, 1, len(res)) if len(res) == 1 { assert.Equal(t, eventFail, res[0]) @@ -295,58 +346,143 @@ func TestCollectPublishFailMiddle(t *testing.T) { assert.Equal(t, bulkResultStats{acked: 2, fails: 1, tooMany: 1}, stats) } -func TestCollectPublishFailDeadLetterQueue(t *testing.T) { +func TestCollectPublishFailDeadLetterSuccess(t *testing.T) { + const deadLetterIndex = "test_index" client, err := NewClient( clientSettings{ observer: outputs.NewNilObserver(), - deadLetterIndex: "test_index", + deadLetterIndex: deadLetterIndex, }, nil, ) assert.NoError(t, err) - parseError := `{ - "root_cause" : [ - { - "type" : "mapper_parsing_exception", - "reason" : "failed to parse field [bar] of type [long] in document with id '1'. Preview of field's value: 'bar1'" - } - ], - "type" : "mapper_parsing_exception", - "reason" : "failed to parse field [bar] of type [long] in document with id '1'. 
Preview of field's value: 'bar1'", - "caused_by" : { - "type" : "illegal_argument_exception", - "reason" : "For input string: \"bar1\"" - } - }` - response := []byte(` - { "items": [ - {"create": {"status": 200}}, - {"create": { - "error" : ` + parseError + `, - "status" : 400 - } - }, - {"create": {"status": 200}} - ]} - `) + const errorMessage = "test error message" + // Return a successful response + response := []byte(`{"items": [{"create": {"status": 200}}]}`) - event := publisher.Event{Content: beat.Event{Fields: mapstr.M{"bar": 1}}} - event2 := publisher.Event{Content: beat.Event{Fields: mapstr.M{"bar": 2}}} - eventFail := publisher.Event{Content: beat.Event{Fields: mapstr.M{"bar": "bar1"}}} - events := encodeEvents(client, []publisher.Event{event, eventFail, event2}) + event1 := encodeEvent(client, publisher.Event{Content: beat.Event{Fields: mapstr.M{"bar": 1}}}) + event1.EncodedEvent.(*encodedEvent).setDeadLetter(deadLetterIndex, 123, errorMessage) + events := []publisher.Event{event1} - res, stats := client.bulkCollectPublishFails(response, events) - assert.Equal(t, 1, len(res)) - if len(res) == 1 { - expected := encodeEvent(client, eventFail) - encodedEvent := expected.EncodedEvent.(*encodedEvent) - // Mark the encoded event with the expected error - client.setDeadLetter(encodedEvent, 400, parseError) + // The event should be successful after being set to dead letter, so it + // should be reported in the metrics as deadLetter + res, stats := client.bulkCollectPublishFails(bulkResult{ + events: events, + status: 200, + response: response, + }) + assert.Equal(t, bulkResultStats{acked: 0, deadLetter: 1}, stats) + assert.Equal(t, 0, len(res)) +} + +func TestCollectPublishFailFatalErrorNotRetried(t *testing.T) { + // Test that a fatal error sending to the dead letter index is reported as + // a dropped event, and is not retried forever + const deadLetterIndex = "test_index" + client, err := NewClient( + clientSettings{ + observer: outputs.NewNilObserver(), + deadLetterIndex: deadLetterIndex, + }, + nil, + ) + assert.NoError(t, err) + + const errorMessage = "test error message" + // Return a fatal error + response := []byte(`{"items": [{"create": {"status": 499}}]}`) + + event1 := encodeEvent(client, publisher.Event{Content: beat.Event{Fields: mapstr.M{"bar": 1}}}) + event1.EncodedEvent.(*encodedEvent).setDeadLetter(deadLetterIndex, 123, errorMessage) + events := []publisher.Event{event1} + + // The event should fail permanently while being sent to the dead letter + // index, so it should be dropped instead of retrying. 
+ res, stats := client.bulkCollectPublishFails(bulkResult{ + events: events, + status: 200, + response: response, + }) + assert.Equal(t, bulkResultStats{acked: 0, nonIndexable: 1}, stats) + assert.Equal(t, 0, len(res)) +} + +func TestCollectPublishFailInvalidBulkIndexResponse(t *testing.T) { + client, err := NewClient( + clientSettings{observer: outputs.NewNilObserver()}, + nil, + ) + assert.NoError(t, err) + + // Return a truncated response without valid item data + response := []byte(`{"items": [...`) - assert.Equal(t, expected, res[0]) + event1 := encodeEvent(client, publisher.Event{Content: beat.Event{Fields: mapstr.M{"bar": 1}}}) + events := []publisher.Event{event1} + + // The event should be successful after being set to dead letter, so it + // should be reported in the metrics as deadLetter + res, stats := client.bulkCollectPublishFails(bulkResult{ + events: events, + status: 200, + response: response, + }) + // The event should be returned for retry, and should appear in aggregated + // stats as failed (retryable error) + assert.Equal(t, bulkResultStats{acked: 0, fails: 1}, stats) + assert.Equal(t, 1, len(res)) + if len(res) > 0 { + assert.Equal(t, event1, res[0]) } +} + +func TestCollectPublishFailDeadLetterIndex(t *testing.T) { + const deadLetterIndex = "test_index" + client, err := NewClient( + clientSettings{ + observer: outputs.NewNilObserver(), + deadLetterIndex: deadLetterIndex, + }, + nil, + ) + assert.NoError(t, err) + + const errorMessage = "test error message" + response := []byte(` +{ + "items": [ + {"create": {"status": 200}}, + { + "create": { + "error" : "` + errorMessage + `", + "status" : 400 + } + }, + {"create": {"status": 200}} + ] +}`) + + event1 := encodeEvent(client, publisher.Event{Content: beat.Event{Fields: mapstr.M{"bar": 1}}}) + event2 := encodeEvent(client, publisher.Event{Content: beat.Event{Fields: mapstr.M{"bar": 2}}}) + eventFail := encodeEvent(client, publisher.Event{Content: beat.Event{Fields: mapstr.M{"bar": "bar1"}}}) + events := []publisher.Event{event1, eventFail, event2} + + res, stats := client.bulkCollectPublishFails(bulkResult{ + events: events, + status: 200, + response: response, + }) assert.Equal(t, bulkResultStats{acked: 2, fails: 1, nonIndexable: 0}, stats) + assert.Equal(t, 1, len(res)) + if len(res) == 1 { + assert.Equalf(t, eventFail, res[0], "bulkCollectPublishFails should return failed event") + encodedEvent, ok := res[0].EncodedEvent.(*encodedEvent) + require.True(t, ok, "event must be encoded as *encodedEvent") + assert.True(t, encodedEvent.deadLetter, "failed event's dead letter flag should be set") + assert.Equalf(t, deadLetterIndex, encodedEvent.index, "failed event's index should match dead letter index") + assert.Contains(t, string(encodedEvent.encoding), errorMessage, "dead letter event should include associated error message") + } } func TestCollectPublishFailDrop(t *testing.T) { @@ -388,7 +524,11 @@ func TestCollectPublishFailDrop(t *testing.T) { eventFail := publisher.Event{Content: beat.Event{Fields: mapstr.M{"bar": "bar1"}}} events := encodeEvents(client, []publisher.Event{event, eventFail, event}) - res, stats := client.bulkCollectPublishFails(response, events) + res, stats := client.bulkCollectPublishFails(bulkResult{ + events: events, + status: 200, + response: response, + }) assert.Equal(t, 0, len(res)) assert.Equal(t, bulkResultStats{acked: 2, fails: 0, nonIndexable: 1}, stats) } @@ -413,7 +553,11 @@ func TestCollectPublishFailAll(t *testing.T) { event := publisher.Event{Content: beat.Event{Fields: 
mapstr.M{"field": 2}}} events := encodeEvents(client, []publisher.Event{event, event, event}) - res, stats := client.bulkCollectPublishFails(response, events) + res, stats := client.bulkCollectPublishFails(bulkResult{ + events: events, + status: 200, + response: response, + }) assert.Equal(t, 3, len(res)) assert.Equal(t, events, res) assert.Equal(t, stats, bulkResultStats{fails: 3, tooMany: 3}) @@ -462,7 +606,11 @@ func TestCollectPipelinePublishFail(t *testing.T) { event := publisher.Event{Content: beat.Event{Fields: mapstr.M{"field": 2}}} events := encodeEvents(client, []publisher.Event{event}) - res, _ := client.bulkCollectPublishFails(response, events) + res, _ := client.bulkCollectPublishFails(bulkResult{ + events: events, + status: 200, + response: response, + }) assert.Equal(t, 1, len(res)) assert.Equal(t, events, res) } @@ -489,7 +637,11 @@ func BenchmarkCollectPublishFailsNone(b *testing.B) { events := encodeEvents(client, []publisher.Event{event, event, event}) for i := 0; i < b.N; i++ { - res, _ := client.bulkCollectPublishFails(response, events) + res, _ := client.bulkCollectPublishFails(bulkResult{ + events: events, + status: 200, + response: response, + }) if len(res) != 0 { b.Fail() } @@ -518,7 +670,11 @@ func BenchmarkCollectPublishFailMiddle(b *testing.B) { events := encodeEvents(client, []publisher.Event{event, eventFail, event}) for i := 0; i < b.N; i++ { - res, _ := client.bulkCollectPublishFails(response, events) + res, _ := client.bulkCollectPublishFails(bulkResult{ + events: events, + status: 200, + response: response, + }) if len(res) != 1 { b.Fail() } @@ -546,7 +702,11 @@ func BenchmarkCollectPublishFailAll(b *testing.B) { events := encodeEvents(client, []publisher.Event{event, event, event}) for i := 0; i < b.N; i++ { - res, _ := client.bulkCollectPublishFails(response, events) + res, _ := client.bulkCollectPublishFails(bulkResult{ + events: events, + status: 200, + response: response, + }) if len(res) != 3 { b.Fail() } @@ -791,7 +951,7 @@ func TestClientWithAPIKey(t *testing.T) { assert.Equal(t, "ApiKey aHlva0hHNEJmV2s1dmlLWjE3Mlg6bzQ1SlVreXVTLS15aVNBdXV4bDhVdw==", headers.Get("Authorization")) } -func TestPublishEventsWithBulkFiltering(t *testing.T) { +func TestBulkRequestHasFilterPath(t *testing.T) { makePublishTestClient := func(t *testing.T, url string, configParams map[string]string) *Client { client, err := NewClient( clientSettings{ @@ -813,16 +973,14 @@ func TestPublishEventsWithBulkFiltering(t *testing.T) { event1 := publisher.Event{Content: beat.Event{Fields: mapstr.M{"field": 1}}} + const filterPathKey = "filter_path" + const filterPathValue = "errors,items.*.error,items.*.status" t.Run("Single event with response filtering", func(t *testing.T) { - var expectedFilteringParams = map[string]string{ - "filter_path": "errors,items.*.error,items.*.status", - } - var recParams url.Values - + var reqParams url.Values esMock := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) if strings.ContainsAny("_bulk", r.URL.Path) { - recParams = r.URL.Query() + reqParams = r.URL.Query() response := []byte(`{"took":85,"errors":false,"items":[{"index":{"status":200}}]}`) _, _ = w.Write(response) } @@ -834,26 +992,23 @@ func TestPublishEventsWithBulkFiltering(t *testing.T) { defer esMock.Close() client := makePublishTestClient(t, esMock.URL, nil) - // Try publishing a batch that can be split - events := encodeEvents(client, []publisher.Event{event1}) - evt, err := client.publishEvents(ctx, events) - 
require.NoError(t, err) - require.Equal(t, len(recParams), len(expectedFilteringParams)) - require.Nil(t, evt) + batch := encodeBatch(client, &batchMock{events: []publisher.Event{event1}}) + result := client.doBulkRequest(ctx, batch) + require.NoError(t, result.connErr) + // Only param should be the standard filter path + require.Equal(t, len(reqParams), 1, "Only bulk request param should be standard filter path") + require.Equal(t, filterPathValue, reqParams.Get(filterPathKey), "Bulk request should include standard filter path") }) t.Run("Single event with response filtering and preconfigured client params", func(t *testing.T) { var configParams = map[string]string{ "hardcoded": "yes", } - var expectedFilteringParams = map[string]string{ - "filter_path": "errors,items.*.error,items.*.status", - } - var recParams url.Values + var reqParams url.Values esMock := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) if strings.ContainsAny("_bulk", r.URL.Path) { - recParams = r.URL.Query() + reqParams = r.URL.Query() response := []byte(`{"took":85,"errors":false,"items":[{"index":{"status":200}}]}`) _, _ = w.Write(response) } @@ -865,69 +1020,22 @@ func TestPublishEventsWithBulkFiltering(t *testing.T) { defer esMock.Close() client := makePublishTestClient(t, esMock.URL, configParams) - // Try publishing a batch that can be split - events := encodeEvents(client, []publisher.Event{event1}) - evt, err := client.publishEvents(ctx, events) - require.NoError(t, err) - require.Equal(t, len(recParams), len(expectedFilteringParams)+len(configParams)) - require.Nil(t, evt) - }) - t.Run("Single event without response filtering", func(t *testing.T) { - var recParams url.Values - - esMock := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if strings.ContainsAny("_bulk", r.URL.Path) { - recParams = r.URL.Query() - response := []byte(`{ - "took":85, - "errors":false, - "items":[ - { - "index":{ - "_index":"test", - "_id":"1", - "_version":1, - "result":"created", - "_shards":{"total":2,"successful":1,"failed":0}, - "_seq_no":0, - "_primary_term":1, - "status":201 - } - } - ]}`) - _, _ = w.Write(response) - } - if strings.Contains("/", r.URL.Path) { - response := []byte(`{}`) - _, _ = w.Write(response) - } - w.WriteHeader(http.StatusOK) - - })) - defer esMock.Close() - client := makePublishTestClient(t, esMock.URL, nil) - - // Try publishing a batch that can be split - events := encodeEvents(client, []publisher.Event{event1}) - _, err := client.publishEvents(ctx, events) - require.NoError(t, err) - require.Equal(t, len(recParams), 1) + batch := encodeBatch(client, &batchMock{events: []publisher.Event{event1}}) + result := client.doBulkRequest(ctx, batch) + require.NoError(t, result.connErr) + require.Equal(t, len(reqParams), 2, "Bulk request should include configured parameter and standard filter path") + require.Equal(t, filterPathValue, reqParams.Get(filterPathKey), "Bulk request should include standard filter path") }) } func TestSetDeadLetter(t *testing.T) { dead_letter_index := "dead_index" - client := &Client{ - deadLetterIndex: dead_letter_index, - indexSelector: testIndexSelector{}, - } - e := &encodedEvent{ index: "original_index", } errType := 123 errStr := "test error string" - client.setDeadLetter(e, errType, errStr) + e.setDeadLetter(dead_letter_index, errType, errStr) assert.True(t, e.deadLetter, "setDeadLetter should set the event's deadLetter flag") assert.Equal(t, dead_letter_index, e.index, 
"setDeadLetter should overwrite the event's original index") diff --git a/libbeat/outputs/elasticsearch/event_encoder.go b/libbeat/outputs/elasticsearch/event_encoder.go index 0441695d53c7..7d345e2bc5c5 100644 --- a/libbeat/outputs/elasticsearch/event_encoder.go +++ b/libbeat/outputs/elasticsearch/event_encoder.go @@ -29,6 +29,7 @@ import ( "github.com/elastic/beats/v7/libbeat/outputs/outil" "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/queue" + "github.com/elastic/elastic-agent-libs/mapstr" ) type eventEncoder struct { @@ -136,3 +137,17 @@ func (pe *eventEncoder) encodeRawEvent(e *beat.Event) *encodedEvent { encoding: bytes, } } + +func (e *encodedEvent) setDeadLetter( + deadLetterIndex string, errType int, errMsg string, +) { + e.deadLetter = true + e.index = deadLetterIndex + deadLetterReencoding := mapstr.M{ + "@timestamp": e.timestamp, + "message": string(e.encoding), + "error.type": errType, + "error.message": errMsg, + } + e.encoding = []byte(deadLetterReencoding.String()) +} diff --git a/libbeat/outputs/fileout/file.go b/libbeat/outputs/fileout/file.go index 87b50f62c1af..f650ff3f9649 100644 --- a/libbeat/outputs/fileout/file.go +++ b/libbeat/outputs/fileout/file.go @@ -159,9 +159,9 @@ func (out *fileOutput) Publish(_ context.Context, batch publisher.Batch) error { st.ReportLatency(took) } - st.Dropped(dropped) + st.PermanentErrors(dropped) - st.Acked(len(events) - dropped) + st.AckedEvents(len(events) - dropped) return nil } diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go index afeb02a5534d..1780f1392b31 100644 --- a/libbeat/outputs/kafka/client.go +++ b/libbeat/outputs/kafka/client.go @@ -171,7 +171,7 @@ func (c *client) Publish(_ context.Context, batch publisher.Batch) error { if err != nil { c.log.Errorf("Dropping event: %+v", err) ref.done() - c.observer.Dropped(1) + c.observer.PermanentErrors(1) continue } @@ -360,13 +360,13 @@ func (r *msgRef) fail(msg *message, err error) { switch { case errors.Is(err, sarama.ErrInvalidMessage): r.client.log.Errorf("Kafka (topic=%v): dropping invalid message", msg.topic) - r.client.observer.Dropped(1) + r.client.observer.PermanentErrors(1) case errors.Is(err, sarama.ErrMessageSizeTooLarge) || errors.Is(err, sarama.ErrInvalidMessageSize): r.client.log.Errorf("Kafka (topic=%v): dropping too large message of size %v.", msg.topic, len(msg.key)+len(msg.value)) - r.client.observer.Dropped(1) + r.client.observer.PermanentErrors(1) case errors.Is(err, breaker.ErrBreakerOpen): // Add this message to the failed list, but don't overwrite r.err since @@ -399,15 +399,15 @@ func (r *msgRef) dec() { success := r.total - failed r.batch.RetryEvents(r.failed) - stats.Failed(failed) + stats.RetryableErrors(failed) if success > 0 { - stats.Acked(success) + stats.AckedEvents(success) } r.client.log.Debugf("Kafka publish failed with: %+v", err) } else { r.batch.ACK() - stats.Acked(r.total) + stats.AckedEvents(r.total) } } diff --git a/libbeat/outputs/logstash/async.go b/libbeat/outputs/logstash/async.go index 1458ee9d3827..b1e20a0e7749 100644 --- a/libbeat/outputs/logstash/async.go +++ b/libbeat/outputs/logstash/async.go @@ -238,7 +238,7 @@ func (r *msgRef) callback(seq uint32, err error) { } func (r *msgRef) done(n uint32) { - r.client.observer.Acked(int(n)) + r.client.observer.AckedEvents(int(n)) r.slice = r.slice[n:] if r.win != nil { r.win.tryGrowWindow(r.batchSize) @@ -255,7 +255,7 @@ func (r *msgRef) fail(n uint32, err error) { r.win.shrinkWindow() } - 
r.client.observer.Acked(int(n)) + r.client.observer.AckedEvents(int(n)) r.dec() } @@ -267,7 +267,7 @@ func (r *msgRef) dec() { } if L := len(r.slice); L > 0 { - r.client.observer.Failed(L) + r.client.observer.RetryableErrors(L) } err := r.err diff --git a/libbeat/outputs/logstash/sync.go b/libbeat/outputs/logstash/sync.go index 2a49324c46f9..d24ab1ebb97d 100644 --- a/libbeat/outputs/logstash/sync.go +++ b/libbeat/outputs/logstash/sync.go @@ -149,7 +149,7 @@ func (c *syncClient) Publish(_ context.Context, batch publisher.Batch) error { n, len(events), c.Host()) events = events[n:] - st.Acked(n) + st.AckedEvents(n) if err != nil { // return batch to pipeline before reporting/counting error batch.RetryEvents(events) @@ -162,7 +162,7 @@ func (c *syncClient) Publish(_ context.Context, batch publisher.Batch) error { c.log.Errorf("Failed to publish events caused by: %+v", err) rest := len(events) - st.Failed(rest) + st.RetryableErrors(rest) return err } diff --git a/libbeat/outputs/metrics.go b/libbeat/outputs/metrics.go index 5502c4e4ae06..7e47c6e7ab9a 100644 --- a/libbeat/outputs/metrics.go +++ b/libbeat/outputs/metrics.go @@ -32,18 +32,41 @@ type Stats struct { // // Output event stats // - batches *monitoring.Uint // total number of batches processed by output - events *monitoring.Uint // total number of events processed by output - acked *monitoring.Uint // total number of events ACKed by output - failed *monitoring.Uint // total number of events failed in output - active *monitoring.Uint // events sent and waiting for ACK/fail from output - duplicates *monitoring.Uint // events sent and waiting for ACK/fail from output - dropped *monitoring.Uint // total number of invalid events dropped by the output - tooMany *monitoring.Uint // total number of too many requests replies from output + // Number of calls to the output's Publish function + eventsBatches *monitoring.Uint + + // Number of events sent to the output's Publish function. + eventsTotal *monitoring.Uint + + // Number of events accepted by the output's receiver. + eventsACKed *monitoring.Uint + + // Number of failed events ingested to the dead letter index + eventsDeadLetter *monitoring.Uint + + // Number of events that reported a retryable error from the output's + // receiver. + eventsFailed *monitoring.Uint + + // Number of events that were dropped due to a non-retryable error. + eventsDropped *monitoring.Uint + + // Number of events rejected by the output's receiver for being duplicates. + eventsDuplicates *monitoring.Uint + + // (Gauge) Number of events that have been sent to the output's Publish + // call but have not yet been ACKed, + eventsActive *monitoring.Uint // (gauge) events sent and waiting for ACK/fail from output + + // Number of events that failed due to a "429 too many requests" error. + // These events are also included in eventsFailed. + eventsTooMany *monitoring.Uint // Output batch stats - split *monitoring.Uint // total number of batches split for being too large + + // Number of times a batch was split for being too large + batchesSplit *monitoring.Uint // // Output network connection stats @@ -62,16 +85,17 @@ type Stats struct { // The registry must not be null. 
func NewStats(reg *monitoring.Registry) *Stats { obj := &Stats{ - batches: monitoring.NewUint(reg, "events.batches"), - events: monitoring.NewUint(reg, "events.total"), - acked: monitoring.NewUint(reg, "events.acked"), - failed: monitoring.NewUint(reg, "events.failed"), - dropped: monitoring.NewUint(reg, "events.dropped"), - duplicates: monitoring.NewUint(reg, "events.duplicates"), - active: monitoring.NewUint(reg, "events.active"), - tooMany: monitoring.NewUint(reg, "events.toomany"), - - split: monitoring.NewUint(reg, "batches.split"), + eventsBatches: monitoring.NewUint(reg, "events.batches"), + eventsTotal: monitoring.NewUint(reg, "events.total"), + eventsACKed: monitoring.NewUint(reg, "events.acked"), + eventsDeadLetter: monitoring.NewUint(reg, "events.dead_letter"), + eventsFailed: monitoring.NewUint(reg, "events.failed"), + eventsDropped: monitoring.NewUint(reg, "events.dropped"), + eventsDuplicates: monitoring.NewUint(reg, "events.duplicates"), + eventsActive: monitoring.NewUint(reg, "events.active"), + eventsTooMany: monitoring.NewUint(reg, "events.toomany"), + + batchesSplit: monitoring.NewUint(reg, "batches.split"), writeBytes: monitoring.NewUint(reg, "write.bytes"), writeErrors: monitoring.NewUint(reg, "write.errors"), @@ -88,9 +112,9 @@ func NewStats(reg *monitoring.Registry) *Stats { // NewBatch updates active batch and event metrics. func (s *Stats) NewBatch(n int) { if s != nil { - s.batches.Inc() - s.events.Add(uint64(n)) - s.active.Add(uint64(n)) + s.eventsBatches.Inc() + s.eventsTotal.Add(uint64(n)) + s.eventsActive.Add(uint64(n)) } } @@ -98,59 +122,59 @@ func (s *Stats) ReportLatency(time time.Duration) { s.sendLatencyMillis.Update(time.Milliseconds()) } -// Acked updates active and acked event metrics. -func (s *Stats) Acked(n int) { +// AckedEvents updates active and acked event metrics. +func (s *Stats) AckedEvents(n int) { if s != nil { - s.acked.Add(uint64(n)) - s.active.Sub(uint64(n)) + s.eventsACKed.Add(uint64(n)) + s.eventsActive.Sub(uint64(n)) } } -// Failed updates active and failed event metrics. -func (s *Stats) Failed(n int) { +func (s *Stats) DeadLetterEvents(n int) { if s != nil { - s.failed.Add(uint64(n)) - s.active.Sub(uint64(n)) + s.eventsDeadLetter.Add(uint64(n)) + s.eventsActive.Sub(uint64(n)) } } -// Duplicate updates the active and duplicate event metrics. -func (s *Stats) Duplicate(n int) { +// RetryableErrors updates active and failed event metrics. +func (s *Stats) RetryableErrors(n int) { if s != nil { - s.duplicates.Add(uint64(n)) - s.active.Sub(uint64(n)) + s.eventsFailed.Add(uint64(n)) + s.eventsActive.Sub(uint64(n)) } } -// Dropped updates total number of event drops as reported by the output. -// Outputs will only report dropped events on fatal errors which lead to the -// event not being publishable. For example encoding errors or total event size -// being bigger then maximum supported event size. -func (s *Stats) Dropped(n int) { - // number of dropped events (e.g. encoding failures) +// DuplicateEvents updates the active and duplicate event metrics. +func (s *Stats) DuplicateEvents(n int) { if s != nil { - s.active.Sub(uint64(n)) - s.dropped.Add(uint64(n)) + s.eventsDuplicates.Add(uint64(n)) + s.eventsActive.Sub(uint64(n)) } } -// Cancelled updates the active event metrics. -func (s *Stats) Cancelled(n int) { +// PermanentErrors updates total number of event drops as reported by the output. +// Outputs will only report dropped events on fatal errors which lead to the +// event not being publishable. 
For example encoding errors or total event size +// being bigger then maximum supported event size. +func (s *Stats) PermanentErrors(n int) { + // number of dropped events (e.g. encoding failures) if s != nil { - s.active.Sub(uint64(n)) + s.eventsActive.Sub(uint64(n)) + s.eventsDropped.Add(uint64(n)) } } -func (s *Stats) Split() { +func (s *Stats) BatchSplit() { if s != nil { - s.split.Inc() + s.batchesSplit.Inc() } } // ErrTooMany updates the number of Too Many Requests responses reported by the output. func (s *Stats) ErrTooMany(n int) { if s != nil { - s.tooMany.Add(uint64(n)) + s.eventsTooMany.Add(uint64(n)) } } diff --git a/libbeat/outputs/observer.go b/libbeat/outputs/observer.go index 3a330e4a43ac..28d40f90dbd2 100644 --- a/libbeat/outputs/observer.go +++ b/libbeat/outputs/observer.go @@ -22,19 +22,23 @@ import "time" // Observer provides an interface used by outputs to report common events on // documents/events being published and I/O workload. type Observer interface { - NewBatch(int) // report new batch being processed with number of events + NewBatch(int) // report new batch being processed with number of events + + RetryableErrors(int) // report number of events with retryable errors + PermanentErrors(int) // report number of events dropped due to permanent errors + DuplicateEvents(int) // report number of events detected as duplicates (e.g. on resends) + DeadLetterEvents(int) // report number of failed events ingested to dead letter index + AckedEvents(int) // report number of acked events + ErrTooMany(int) // report too many requests response + + BatchSplit() // report a batch was split for being too large to ingest + + WriteError(error) // report an I/O error on write + WriteBytes(int) // report number of bytes being written + ReadError(error) // report an I/O error on read + ReadBytes(int) // report number of bytes being read + ReportLatency(time.Duration) // report the duration a send to the output takes - Acked(int) // report number of acked events - Failed(int) // report number of failed events - Dropped(int) // report number of dropped events - Duplicate(int) // report number of events detected as duplicates (e.g. 
on resends) - Cancelled(int) // report number of cancelled events - Split() // report a batch was split for being too large to ingest - WriteError(error) // report an I/O error on write - WriteBytes(int) // report number of bytes being written - ReadError(error) // report an I/O error on read - ReadBytes(int) // report number of bytes being read - ErrTooMany(int) // report too many requests response } type emptyObserver struct{} @@ -48,12 +52,12 @@ func NewNilObserver() Observer { func (*emptyObserver) NewBatch(int) {} func (*emptyObserver) ReportLatency(_ time.Duration) {} -func (*emptyObserver) Acked(int) {} -func (*emptyObserver) Duplicate(int) {} -func (*emptyObserver) Failed(int) {} -func (*emptyObserver) Dropped(int) {} -func (*emptyObserver) Cancelled(int) {} -func (*emptyObserver) Split() {} +func (*emptyObserver) AckedEvents(int) {} +func (*emptyObserver) DeadLetterEvents(int) {} +func (*emptyObserver) DuplicateEvents(int) {} +func (*emptyObserver) RetryableErrors(int) {} +func (*emptyObserver) PermanentErrors(int) {} +func (*emptyObserver) BatchSplit() {} func (*emptyObserver) WriteError(error) {} func (*emptyObserver) WriteBytes(int) {} func (*emptyObserver) ReadError(error) {} diff --git a/libbeat/outputs/redis/client.go b/libbeat/outputs/redis/client.go index 1fcd46e6f647..9f5c9812dd10 100644 --- a/libbeat/outputs/redis/client.go +++ b/libbeat/outputs/redis/client.go @@ -148,7 +148,7 @@ func (c *client) Publish(_ context.Context, batch publisher.Batch) error { c.observer.NewBatch(len(events)) rest, err := c.publish(c.key, events) if rest != nil { - c.observer.Failed(len(rest)) + c.observer.RetryableErrors(len(rest)) batch.RetryEvents(rest) return err } @@ -229,7 +229,7 @@ func (c *client) publishEventsBulk(conn redis.Conn, command string) publishFn { args[0] = dest okEvents, args := serializeEvents(c.log, args, 1, data, c.index, c.codec) - c.observer.Dropped(len(data) - len(okEvents)) + c.observer.PermanentErrors(len(data) - len(okEvents)) if (len(args) - 1) == 0 { return nil, nil } @@ -245,7 +245,7 @@ func (c *client) publishEventsBulk(conn redis.Conn, command string) publishFn { } - c.observer.Acked(len(okEvents)) + c.observer.AckedEvents(len(okEvents)) return nil, nil } } @@ -255,7 +255,7 @@ func (c *client) publishEventsPipeline(conn redis.Conn, command string) publishF var okEvents []publisher.Event serialized := make([]interface{}, 0, len(data)) okEvents, serialized = serializeEvents(c.log, serialized, 0, data, c.index, c.codec) - c.observer.Dropped(len(data) - len(okEvents)) + c.observer.PermanentErrors(len(data) - len(okEvents)) if len(serialized) == 0 { return nil, nil } @@ -276,7 +276,7 @@ func (c *client) publishEventsPipeline(conn redis.Conn, command string) publishF return okEvents, err } } - c.observer.Dropped(dropped) + c.observer.PermanentErrors(dropped) if err := conn.Flush(); err != nil { return data, err @@ -302,7 +302,7 @@ func (c *client) publishEventsPipeline(conn redis.Conn, command string) publishF } } - c.observer.Acked(len(okEvents) - len(failed)) + c.observer.AckedEvents(len(okEvents) - len(failed)) return failed, lastErr } } diff --git a/libbeat/publisher/event.go b/libbeat/publisher/event.go index efd5220740e3..b80b12d6793d 100644 --- a/libbeat/publisher/event.go +++ b/libbeat/publisher/event.go @@ -27,6 +27,8 @@ import ( // errors), one of the signal methods must be called. In normal operation // every batch will eventually receive an ACK() or a Drop(). 
type Batch interface { + // The output that receives a batch owns the entries in its Events array, + // and changes to them will persist between retries. Events() []Event // All events have been acknowledged by the output. diff --git a/libbeat/publisher/pipeline/monitoring.go b/libbeat/publisher/pipeline/monitoring.go index cda329e0963a..0bc63a739f90 100644 --- a/libbeat/publisher/pipeline/monitoring.go +++ b/libbeat/publisher/pipeline/monitoring.go @@ -32,21 +32,32 @@ type observer interface { } type pipelineObserver interface { + // A new client connected to the pipeline via (*Pipeline).ConnectWith. clientConnected() + // An open pipeline client received a Close() call. clientClosed() } type clientObserver interface { + // The client received a Publish call newEvent() + // An event was filtered by processors before being published filteredEvent() + // An event was published to the queue publishedEvent() + // An event was rejected by the queue failedPublishEvent() } type outputObserver interface { + // Events encountered too many errors and were permanently dropped. eventsDropped(int) + // Events were sent back to an output worker after an earlier failure. eventsRetry(int) + // The queue received acknowledgment for events from the output workers. + // (This may include events already reported via eventsDropped.) queueACKed(n int) + // Report the maximum event count supported by the queue. queueMaxEvents(n int) } @@ -65,10 +76,10 @@ type metricsObserverVars struct { // clients metrics clients *monitoring.Uint - // events publish/dropped stats - events, filtered, published, failed *monitoring.Uint - dropped, retry *monitoring.Uint // (retryer) drop/retry counters - activeEvents *monitoring.Uint + // eventsTotal publish/dropped stats + eventsTotal, eventsFiltered, eventsPublished, eventsFailed *monitoring.Uint + eventsDropped, eventsRetry *monitoring.Uint // (retryer) drop/retry counters + activeEvents *monitoring.Uint // queue metrics queueACKed *monitoring.Uint @@ -85,19 +96,46 @@ func newMetricsObserver(metrics *monitoring.Registry) *metricsObserver { return &metricsObserver{ metrics: metrics, vars: metricsObserverVars{ - clients: monitoring.NewUint(reg, "clients"), // Gauge + // (Gauge) clients measures the number of open pipeline clients. + clients: monitoring.NewUint(reg, "clients"), - events: monitoring.NewUint(reg, "events.total"), - filtered: monitoring.NewUint(reg, "events.filtered"), - published: monitoring.NewUint(reg, "events.published"), - failed: monitoring.NewUint(reg, "events.failed"), - dropped: monitoring.NewUint(reg, "events.dropped"), - retry: monitoring.NewUint(reg, "events.retry"), + // events.total counts all created events. + eventsTotal: monitoring.NewUint(reg, "events.total"), - queueACKed: monitoring.NewUint(reg, "queue.acked"), + // (Gauge) events.active measures events that have been created, but have + // not yet been failed, filtered, or acked/dropped. + activeEvents: monitoring.NewUint(reg, "events.active"), + + // events.filtered counts events that were filtered by processors before + // being sent to the queue. + eventsFiltered: monitoring.NewUint(reg, "events.filtered"), + + // events.failed counts events that were rejected by the queue, or that + // were sent via an already-closed pipeline client. + eventsFailed: monitoring.NewUint(reg, "events.failed"), + + // events.published counts events that were accepted by the queue. 
+ eventsPublished: monitoring.NewUint(reg, "events.published"), + + // events.retry counts events that an output worker sent back to be + // retried. + eventsRetry: monitoring.NewUint(reg, "events.retry"), + + // events.dropped counts events that were dropped because errors from + // the output workers exceeded the configured maximum retry count. + eventsDropped: monitoring.NewUint(reg, "events.dropped"), + + // (Gauge) queue.max_events measures the maximum number of events the + // queue will accept, or 0 if there is none. queueMaxEvents: monitoring.NewUint(reg, "queue.max_events"), - activeEvents: monitoring.NewUint(reg, "events.active"), // Gauge + // queue.acked counts events that have been acknowledged by the output + // workers. This includes events that were dropped for fatal errors, + // which are also reported in events.dropped. + queueACKed: monitoring.NewUint(reg, "queue.acked"), + + // (Gauge) queue.filled.pct.events measures the fraction (from 0 to 1) + // of the queue's event capacity that is currently filled. percentQueueFull: monitoring.NewFloat(reg, "queue.filled.pct.events"), }, } @@ -125,7 +163,7 @@ func (o *metricsObserver) clientClosed() { o.vars.clients.Dec() } // (client) client is trying to publish a new event func (o *metricsObserver) newEvent() { - o.vars.events.Inc() + o.vars.eventsTotal.Inc() o.vars.activeEvents.Inc() o.setPercentageFull() } @@ -142,19 +180,19 @@ func (o *metricsObserver) setPercentageFull() { // (client) event is filtered out (on purpose or failed) func (o *metricsObserver) filteredEvent() { - o.vars.filtered.Inc() + o.vars.eventsFiltered.Inc() o.vars.activeEvents.Dec() o.setPercentageFull() } // (client) managed to push an event into the publisher pipeline func (o *metricsObserver) publishedEvent() { - o.vars.published.Inc() + o.vars.eventsPublished.Inc() } // (client) client closing down or DropIfFull is set func (o *metricsObserver) failedPublishEvent() { - o.vars.failed.Inc() + o.vars.eventsFailed.Inc() o.vars.activeEvents.Dec() o.setPercentageFull() } @@ -182,12 +220,12 @@ func (o *metricsObserver) queueMaxEvents(n int) { // (retryer) number of events dropped by retryer func (o *metricsObserver) eventsDropped(n int) { - o.vars.dropped.Add(uint64(n)) + o.vars.eventsDropped.Add(uint64(n)) } // (retryer) number of events pushed to the output worker queue func (o *metricsObserver) eventsRetry(n int) { - o.vars.retry.Add(uint64(n)) + o.vars.eventsRetry.Add(uint64(n)) } type emptyObserver struct{} diff --git a/libbeat/publisher/pipeline/stress/out.go b/libbeat/publisher/pipeline/stress/out.go index 03ea06d3be86..b0e4c3d4e399 100644 --- a/libbeat/publisher/pipeline/stress/out.go +++ b/libbeat/publisher/pipeline/stress/out.go @@ -93,7 +93,7 @@ func (t *testOutput) Publish(_ context.Context, batch publisher.Batch) error { if config.Fail.EveryBatch == t.batchCount { t.batchCount = 0 - t.observer.Failed(n) + t.observer.RetryableErrors(n) batch.Retry() return nil } @@ -104,7 +104,7 @@ func (t *testOutput) Publish(_ context.Context, batch publisher.Batch) error { // ack complete batch batch.ACK() - t.observer.Acked(n) + t.observer.AckedEvents(n) return nil } diff --git a/libbeat/tests/integration/ca_pinning_test.go b/libbeat/tests/integration/ca_pinning_test.go index 51e098885eae..98c3db6729dd 100644 --- a/libbeat/tests/integration/ca_pinning_test.go +++ b/libbeat/tests/integration/ca_pinning_test.go @@ -54,7 +54,7 @@ output.elasticsearch: mockbeat.WriteConfigFile(fmt.Sprintf(cfg, esURL.String(), caPath)) mockbeat.Start() mockbeat.WaitForLogs("mockbeat 
start running.", 60*time.Second) - mockbeat.WaitForLogs("PublishEvents: 1 events have been published", 60*time.Second) + mockbeat.WaitForLogs("doBulkRequest: 1 events have been sent", 60*time.Second) } func TestCAPinningBadSHA(t *testing.T) { diff --git a/libbeat/tests/integration/template_test.go b/libbeat/tests/integration/template_test.go index aec46e448b99..3dc30cbf4301 100644 --- a/libbeat/tests/integration/template_test.go +++ b/libbeat/tests/integration/template_test.go @@ -224,7 +224,7 @@ logging: mockbeat.WaitForLogs("mockbeat start running.", 60*time.Second) mockbeat.WaitForLogs("Template with name \\\"mockbeat-9.9.9\\\" loaded.", 20*time.Second) require.Eventually(t, func() bool { - return mockbeat.LogMatch("PublishEvents: [[:digit:]]+ events have been published") + return mockbeat.LogMatch("doBulkRequest: [[:digit:]]+ events have been sent") }, 20*time.Second, 100*time.Millisecond, "looking for PublishEvents") status, body, err := HttpDo(t, http.MethodGet, indexURL) @@ -296,7 +296,7 @@ logging: mockbeat.Start() mockbeat.WaitForLogs("mockbeat start running.", 60*time.Second) require.Eventually(t, func() bool { - return mockbeat.LogMatch("PublishEvents: [[:digit:]]+ events have been published") + return mockbeat.LogMatch("doBulkRequest: [[:digit:]]+ events have been sent") }, 20*time.Second, 100*time.Millisecond, "looking for PublishEvents") u := fmt.Sprintf("%s/_index_template/%s", esUrl.String(), datastream) diff --git a/libbeat/tests/system/test_ilm.py b/libbeat/tests/system/test_ilm.py index 630ab5515932..0313097d938b 100644 --- a/libbeat/tests/system/test_ilm.py +++ b/libbeat/tests/system/test_ilm.py @@ -52,7 +52,7 @@ def test_ilm_default(self): proc = self.start_beat() self.wait_until(lambda: self.log_contains("mockbeat start running.")) self.wait_until(lambda: self.log_contains(MSG_ILM_POLICY_LOADED)) - self.wait_until(lambda: self.log_contains("PublishEvents: 1 events have been published")) + self.wait_until(lambda: self.log_contains("doBulkRequest: 1 events have been sent")) proc.check_kill_and_wait() self.idxmgmt.assert_data_stream_created(self.data_stream) @@ -69,7 +69,7 @@ def test_ilm_disabled(self): self.render_config(ilm={"enabled": False}) proc = self.start_beat() self.wait_until(lambda: self.log_contains("mockbeat start running.")) - self.wait_until(lambda: self.log_contains("PublishEvents: 1 events have been published")) + self.wait_until(lambda: self.log_contains("doBulkRequest: 1 events have been sent")) proc.check_kill_and_wait() self.idxmgmt.assert_index_template_loaded(self.data_stream) @@ -89,7 +89,7 @@ def test_policy_name(self): proc = self.start_beat() self.wait_until(lambda: self.log_contains("mockbeat start running.")) self.wait_until(lambda: self.log_contains(MSG_ILM_POLICY_LOADED)) - self.wait_until(lambda: self.log_contains("PublishEvents: 1 events have been published")) + self.wait_until(lambda: self.log_contains("doBulkRequest: 1 events have been sent")) proc.check_kill_and_wait() self.idxmgmt.assert_index_template_loaded(self.data_stream)
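For reference, here is a minimal usage sketch (not part of the diff) of how an output's Publish path could report per-batch outcomes through the renamed Observer methods introduced above. The reportCounts struct, the reportBatch helper, and the example numbers are illustrative assumptions; only the Observer method names and the metric mappings noted in the comments come from the changes in this patch.

// Illustrative sketch only; assumes the post-change outputs.Observer interface.
package main

import (
	"fmt"

	"github.com/elastic/beats/v7/libbeat/outputs"
)

// reportCounts is a hypothetical per-batch tally of outcome buckets an output
// might derive from a publish attempt.
type reportCounts struct {
	acked, retryable, permanent, duplicates, deadLetter, tooMany int
}

// reportBatch forwards a batch outcome to the metrics observer using the
// renamed reporting methods.
func reportBatch(st outputs.Observer, total int, c reportCounts) {
	st.NewBatch(total)                // events.batches, events.total, events.active
	st.AckedEvents(c.acked)           // events.acked
	st.RetryableErrors(c.retryable)   // events.failed (eligible for retry)
	st.PermanentErrors(c.permanent)   // events.dropped (not retried)
	st.DuplicateEvents(c.duplicates)  // events.duplicates
	st.DeadLetterEvents(c.deadLetter) // events.dead_letter
	if c.tooMany > 0 {
		st.ErrTooMany(c.tooMany) // events.toomany (HTTP 429 responses)
	}
}

func main() {
	// The no-op observer keeps this example self-contained; a real output
	// would report to the observer it was constructed with.
	st := outputs.NewNilObserver()
	reportBatch(st, 10, reportCounts{acked: 8, retryable: 1, permanent: 1})
	fmt.Println("batch outcome reported")
}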