From 8c3fc6230949a97583d3c18db29521a3f515517e Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Mon, 6 May 2024 07:53:12 -0400 Subject: [PATCH 01/14] Clean up / document metrics registry fields --- libbeat/outputs/console/console.go | 4 +- libbeat/outputs/discard/discard.go | 2 +- libbeat/outputs/elasticsearch/client.go | 84 ++++++++++-------- libbeat/outputs/fileout/file.go | 4 +- libbeat/outputs/kafka/client.go | 12 +-- libbeat/outputs/logstash/async.go | 6 +- libbeat/outputs/logstash/sync.go | 4 +- libbeat/outputs/metrics.go | 106 ++++++++++++++--------- libbeat/outputs/observer.go | 40 +++++---- libbeat/outputs/redis/client.go | 12 +-- libbeat/outputs/shipper/shipper.go | 12 +-- libbeat/outputs/shipper/shipper_test.go | 12 +-- libbeat/publisher/pipeline/monitoring.go | 76 ++++++++++++---- libbeat/publisher/pipeline/stress/out.go | 4 +- 14 files changed, 226 insertions(+), 152 deletions(-) diff --git a/libbeat/outputs/console/console.go b/libbeat/outputs/console/console.go index f723bf818c91..867316fe3a97 100644 --- a/libbeat/outputs/console/console.go +++ b/libbeat/outputs/console/console.go @@ -111,8 +111,8 @@ func (c *console) Publish(_ context.Context, batch publisher.Batch) error { c.writer.Flush() batch.ACK() - st.Dropped(dropped) - st.Acked(len(events) - dropped) + st.PermanentErrors(dropped) + st.AckedEvents(len(events) - dropped) return nil } diff --git a/libbeat/outputs/discard/discard.go b/libbeat/outputs/discard/discard.go index c9a51b0f33df..bfd1a1c1add0 100644 --- a/libbeat/outputs/discard/discard.go +++ b/libbeat/outputs/discard/discard.go @@ -70,7 +70,7 @@ func (out *discardOutput) Publish(_ context.Context, batch publisher.Batch) erro st := out.observer events := batch.Events() st.NewBatch(len(events)) - st.Acked(len(events)) + st.AckedEvents(len(events)) return nil } diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 504aac710af3..d513eba8f7ae 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -76,8 +76,9 @@ type clientSettings struct { type bulkResultStats struct { acked int // number of events ACKed by Elasticsearch duplicates int // number of events failed with `create` due to ID already being indexed - fails int // number of failed events (can be retried) - nonIndexable int // number of failed events (not indexable) + fails int // number of events with retryable failures. + nonIndexable int // number of events with permanent failures. + deadLetter int // number of failed events ingested to the dead letter index. tooMany int // number of events receiving HTTP 429 Too Many Requests } @@ -181,16 +182,16 @@ func (client *Client) Publish(ctx context.Context, batch publisher.Batch) error events := batch.Events() rest, err := client.publishEvents(ctx, events) - switch { - case errors.Is(err, errPayloadTooLarge): + if errors.Is(err, errPayloadTooLarge) { if batch.SplitRetry() { // Report that we split a batch - client.observer.Split() + client.observer.BatchSplit() + client.observer.RetryableErrors(len(events)) } else { // If the batch could not be split, there is no option left but // to drop it and log the error state. 
batch.Drop() - client.observer.Dropped(len(events)) + client.observer.PermanentErrors(len(events)) err := apm.CaptureError(ctx, fmt.Errorf("failed to perform bulk index operation: %w", err)) err.Send() client.log.Error(err) @@ -199,10 +200,13 @@ func (client *Client) Publish(ctx context.Context, batch publisher.Batch) error // so don't pass this error through since it doesn't indicate anything // wrong with the connection. return nil - case len(rest) == 0: - batch.ACK() - default: + } + if len(rest) > 0 { + // At least some events failed, retry them batch.RetryEvents(rest) + } else { + // All events were sent successfully + batch.ACK() } return err } @@ -232,7 +236,7 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event) newCount := len(data) span.Context.SetLabel("events_encoded", newCount) if st != nil && origCount > newCount { - st.Dropped(origCount - newCount) + st.PermanentErrors(origCount - newCount) } if newCount == 0 { return nil, nil @@ -278,10 +282,10 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event) duplicates := stats.duplicates acked := len(data) - failed - dropped - duplicates - st.Acked(acked) - st.Failed(failed) - st.Dropped(dropped) - st.Duplicate(duplicates) + st.AckedEvents(acked) + st.RetryableErrors(failed) + st.PermanentErrors(dropped) + st.DuplicateEvents(duplicates) st.ErrTooMany(stats.tooMany) st.ReportLatency(timeSinceSend) @@ -391,6 +395,7 @@ func (client *Client) bulkCollectPublishFails(result eslegclient.BulkResult, dat failed := data[:0] stats := bulkResultStats{} for i := 0; i < count; i++ { + encodedEvent := data[i].EncodedEvent.(*encodedEvent) status, msg, err := bulkReadItemStatus(client.log, reader) if err != nil { client.log.Error(err) @@ -398,7 +403,12 @@ func (client *Client) bulkCollectPublishFails(result eslegclient.BulkResult, dat } if status < 300 { - stats.acked++ + if encodedEvent.deadLetter { + // This was ingested into the dead letter index, not the original target + stats.deadLetter++ + } else { + stats.acked++ + } continue // ok value } @@ -409,28 +419,30 @@ func (client *Client) bulkCollectPublishFails(result eslegclient.BulkResult, dat continue // ok } - if status < 500 { - if status == http.StatusTooManyRequests { - stats.tooMany++ - } else { - // hard failure, apply policy action - encodedEvent := data[i].EncodedEvent.(*encodedEvent) - if encodedEvent.deadLetter { - stats.nonIndexable++ - client.log.Errorf("Can't deliver to dead letter index event (status=%v). Enable debug logs to view the event and cause.", status) - client.log.Debugf("Can't deliver to dead letter index event %#v (status=%v): %s", data[i], status, msg) - // poison pill - this will clog the pipeline if the underlying failure is non transient. - } else if client.deadLetterIndex != "" { - client.log.Warnf("Cannot index event (status=%v), trying dead letter index. Enable debug logs to view the event and cause.", status) - client.log.Debugf("Cannot index event %#v (status=%v): %s, trying dead letter index", data[i], status, msg) - client.setDeadLetter(encodedEvent, status, string(msg)) - } else { // drop - stats.nonIndexable++ - client.log.Warnf("Cannot index event (status=%v): dropping event! 
Enable debug logs to view the event and cause.", status) - client.log.Debugf("Cannot index event %#v (status=%v): %s, dropping event!", data[i], status, msg) - continue - } + if status == http.StatusTooManyRequests { + stats.tooMany++ + } else if status < 500 { + // hard failure, apply policy action + if encodedEvent.deadLetter { + stats.nonIndexable++ + client.log.Errorf("Can't deliver to dead letter index event (status=%v). Enable debug logs to view the event and cause.", status) + client.log.Debugf("Can't deliver to dead letter index event %#v (status=%v): %s", data[i], status, msg) + // poison pill - this will clog the pipeline if the underlying failure is non transient. + } else if client.deadLetterIndex != "" { + // Send this failure to the dead letter index. + // We count this as a "retryable failure", then if the dead letter + // ingestion succeeds it is counted in the "deadLetter" counter + // rather than the "acked" counter. + stats.fails++ + client.log.Warnf("Cannot index event (status=%v), trying dead letter index. Enable debug logs to view the event and cause.", status) + client.log.Debugf("Cannot index event %#v (status=%v): %s, trying dead letter index", data[i], status, msg) + client.setDeadLetter(encodedEvent, status, string(msg)) + } else { // drop + stats.nonIndexable++ + client.log.Warnf("Cannot index event (status=%v): dropping event! Enable debug logs to view the event and cause.", status) + client.log.Debugf("Cannot index event %#v (status=%v): %s, dropping event!", data[i], status, msg) } + continue } client.log.Debugf("Bulk item insert failed (i=%v, status=%v): %s", i, status, msg) diff --git a/libbeat/outputs/fileout/file.go b/libbeat/outputs/fileout/file.go index d14bd99d69ad..558278fe8d3d 100644 --- a/libbeat/outputs/fileout/file.go +++ b/libbeat/outputs/fileout/file.go @@ -157,9 +157,9 @@ func (out *fileOutput) Publish(_ context.Context, batch publisher.Batch) error { st.ReportLatency(took) } - st.Dropped(dropped) + st.PermanentErrors(dropped) - st.Acked(len(events) - dropped) + st.AckedEvents(len(events) - dropped) return nil } diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go index 24bbc61145d4..fdcc93a293dc 100644 --- a/libbeat/outputs/kafka/client.go +++ b/libbeat/outputs/kafka/client.go @@ -171,7 +171,7 @@ func (c *client) Publish(_ context.Context, batch publisher.Batch) error { if err != nil { c.log.Errorf("Dropping event: %+v", err) ref.done() - c.observer.Dropped(1) + c.observer.PermanentErrors(1) continue } @@ -359,13 +359,13 @@ func (r *msgRef) fail(msg *message, err error) { switch err { case sarama.ErrInvalidMessage: r.client.log.Errorf("Kafka (topic=%v): dropping invalid message", msg.topic) - r.client.observer.Dropped(1) + r.client.observer.PermanentErrors(1) case sarama.ErrMessageSizeTooLarge, sarama.ErrInvalidMessageSize: r.client.log.Errorf("Kafka (topic=%v): dropping too large message of size %v.", msg.topic, len(msg.key)+len(msg.value)) - r.client.observer.Dropped(1) + r.client.observer.PermanentErrors(1) case breaker.ErrBreakerOpen: // Add this message to the failed list, but don't overwrite r.err since @@ -398,15 +398,15 @@ func (r *msgRef) dec() { success := r.total - failed r.batch.RetryEvents(r.failed) - stats.Failed(failed) + stats.RetryableErrors(failed) if success > 0 { - stats.Acked(success) + stats.AckedEvents(success) } r.client.log.Debugf("Kafka publish failed with: %+v", err) } else { r.batch.ACK() - stats.Acked(r.total) + stats.AckedEvents(r.total) } } diff --git a/libbeat/outputs/logstash/async.go 
b/libbeat/outputs/logstash/async.go index 1458ee9d3827..b1e20a0e7749 100644 --- a/libbeat/outputs/logstash/async.go +++ b/libbeat/outputs/logstash/async.go @@ -238,7 +238,7 @@ func (r *msgRef) callback(seq uint32, err error) { } func (r *msgRef) done(n uint32) { - r.client.observer.Acked(int(n)) + r.client.observer.AckedEvents(int(n)) r.slice = r.slice[n:] if r.win != nil { r.win.tryGrowWindow(r.batchSize) @@ -255,7 +255,7 @@ func (r *msgRef) fail(n uint32, err error) { r.win.shrinkWindow() } - r.client.observer.Acked(int(n)) + r.client.observer.AckedEvents(int(n)) r.dec() } @@ -267,7 +267,7 @@ func (r *msgRef) dec() { } if L := len(r.slice); L > 0 { - r.client.observer.Failed(L) + r.client.observer.RetryableErrors(L) } err := r.err diff --git a/libbeat/outputs/logstash/sync.go b/libbeat/outputs/logstash/sync.go index 2a49324c46f9..d24ab1ebb97d 100644 --- a/libbeat/outputs/logstash/sync.go +++ b/libbeat/outputs/logstash/sync.go @@ -149,7 +149,7 @@ func (c *syncClient) Publish(_ context.Context, batch publisher.Batch) error { n, len(events), c.Host()) events = events[n:] - st.Acked(n) + st.AckedEvents(n) if err != nil { // return batch to pipeline before reporting/counting error batch.RetryEvents(events) @@ -162,7 +162,7 @@ func (c *syncClient) Publish(_ context.Context, batch publisher.Batch) error { c.log.Errorf("Failed to publish events caused by: %+v", err) rest := len(events) - st.Failed(rest) + st.RetryableErrors(rest) return err } diff --git a/libbeat/outputs/metrics.go b/libbeat/outputs/metrics.go index 5502c4e4ae06..8c911d2ebd24 100644 --- a/libbeat/outputs/metrics.go +++ b/libbeat/outputs/metrics.go @@ -32,18 +32,38 @@ type Stats struct { // // Output event stats // - batches *monitoring.Uint // total number of batches processed by output - events *monitoring.Uint // total number of events processed by output - acked *monitoring.Uint // total number of events ACKed by output - failed *monitoring.Uint // total number of events failed in output - active *monitoring.Uint // events sent and waiting for ACK/fail from output - duplicates *monitoring.Uint // events sent and waiting for ACK/fail from output - dropped *monitoring.Uint // total number of invalid events dropped by the output - tooMany *monitoring.Uint // total number of too many requests replies from output + // Number of calls to the output's Publish function + eventsBatches *monitoring.Uint + + // Number of events sent to the output's Publish function. + eventsTotal *monitoring.Uint + + // Number of events accepted by the output's receiver. + eventsACKed *monitoring.Uint + + // Number of events that reported a retryable error from the output's + // receiver. + eventsFailed *monitoring.Uint + + // Number of events that were dropped due to a non-retryable error. + eventsDropped *monitoring.Uint + + // Number of events rejected by the output's receiver for being duplicates. + eventsDuplicates *monitoring.Uint + + // (Gauge) Number of events that have been sent to the output's Publish + // call but have not yet been ACKed, + eventsActive *monitoring.Uint // (gauge) events sent and waiting for ACK/fail from output + + // Number of events that failed due to a "429 too many requests" error. + // These events are also included in eventsFailed. 
+ eventsTooMany *monitoring.Uint // Output batch stats - split *monitoring.Uint // total number of batches split for being too large + + // Number of times a batch was split for being too large + batchesSplit *monitoring.Uint // // Output network connection stats @@ -62,16 +82,16 @@ type Stats struct { // The registry must not be null. func NewStats(reg *monitoring.Registry) *Stats { obj := &Stats{ - batches: monitoring.NewUint(reg, "events.batches"), - events: monitoring.NewUint(reg, "events.total"), - acked: monitoring.NewUint(reg, "events.acked"), - failed: monitoring.NewUint(reg, "events.failed"), - dropped: monitoring.NewUint(reg, "events.dropped"), - duplicates: monitoring.NewUint(reg, "events.duplicates"), - active: monitoring.NewUint(reg, "events.active"), - tooMany: monitoring.NewUint(reg, "events.toomany"), + eventsBatches: monitoring.NewUint(reg, "events.batches"), + eventsTotal: monitoring.NewUint(reg, "events.total"), + eventsACKed: monitoring.NewUint(reg, "events.acked"), + eventsFailed: monitoring.NewUint(reg, "events.failed"), + eventsDropped: monitoring.NewUint(reg, "events.dropped"), + eventsDuplicates: monitoring.NewUint(reg, "events.duplicates"), + eventsActive: monitoring.NewUint(reg, "events.active"), + eventsTooMany: monitoring.NewUint(reg, "events.toomany"), - split: monitoring.NewUint(reg, "batches.split"), + batchesSplit: monitoring.NewUint(reg, "batches.split"), writeBytes: monitoring.NewUint(reg, "write.bytes"), writeErrors: monitoring.NewUint(reg, "write.errors"), @@ -88,9 +108,9 @@ func NewStats(reg *monitoring.Registry) *Stats { // NewBatch updates active batch and event metrics. func (s *Stats) NewBatch(n int) { if s != nil { - s.batches.Inc() - s.events.Add(uint64(n)) - s.active.Add(uint64(n)) + s.eventsBatches.Inc() + s.eventsTotal.Add(uint64(n)) + s.eventsActive.Add(uint64(n)) } } @@ -98,59 +118,59 @@ func (s *Stats) ReportLatency(time time.Duration) { s.sendLatencyMillis.Update(time.Milliseconds()) } -// Acked updates active and acked event metrics. -func (s *Stats) Acked(n int) { +// AckedEvents updates active and acked event metrics. +func (s *Stats) AckedEvents(n int) { if s != nil { - s.acked.Add(uint64(n)) - s.active.Sub(uint64(n)) + s.eventsACKed.Add(uint64(n)) + s.eventsActive.Sub(uint64(n)) } } -// Failed updates active and failed event metrics. -func (s *Stats) Failed(n int) { +// RetryableErrors updates active and failed event metrics. +func (s *Stats) RetryableErrors(n int) { if s != nil { - s.failed.Add(uint64(n)) - s.active.Sub(uint64(n)) + s.eventsFailed.Add(uint64(n)) + s.eventsActive.Sub(uint64(n)) } } -// Duplicate updates the active and duplicate event metrics. -func (s *Stats) Duplicate(n int) { +// DuplicateEvents updates the active and duplicate event metrics. +func (s *Stats) DuplicateEvents(n int) { if s != nil { - s.duplicates.Add(uint64(n)) - s.active.Sub(uint64(n)) + s.eventsDuplicates.Add(uint64(n)) + s.eventsActive.Sub(uint64(n)) } } -// Dropped updates total number of event drops as reported by the output. +// PermanentErrors updates total number of event drops as reported by the output. // Outputs will only report dropped events on fatal errors which lead to the // event not being publishable. For example encoding errors or total event size // being bigger then maximum supported event size. -func (s *Stats) Dropped(n int) { +func (s *Stats) PermanentErrors(n int) { // number of dropped events (e.g. 
encoding failures) if s != nil { - s.active.Sub(uint64(n)) - s.dropped.Add(uint64(n)) + s.eventsActive.Sub(uint64(n)) + s.eventsDropped.Add(uint64(n)) } } -// Cancelled updates the active event metrics. -func (s *Stats) Cancelled(n int) { +// CancelledEvents updates the active event metrics. +func (s *Stats) CancelledEvents(n int) { if s != nil { - s.active.Sub(uint64(n)) + s.eventsActive.Sub(uint64(n)) } } -func (s *Stats) Split() { +func (s *Stats) BatchSplit() { if s != nil { - s.split.Inc() + s.batchesSplit.Inc() } } // ErrTooMany updates the number of Too Many Requests responses reported by the output. func (s *Stats) ErrTooMany(n int) { if s != nil { - s.tooMany.Add(uint64(n)) + s.eventsTooMany.Add(uint64(n)) } } diff --git a/libbeat/outputs/observer.go b/libbeat/outputs/observer.go index 3a330e4a43ac..7a3d904ba6fa 100644 --- a/libbeat/outputs/observer.go +++ b/libbeat/outputs/observer.go @@ -22,19 +22,23 @@ import "time" // Observer provides an interface used by outputs to report common events on // documents/events being published and I/O workload. type Observer interface { - NewBatch(int) // report new batch being processed with number of events + NewBatch(int) // report new batch being processed with number of events + + CancelledEvents(int) // report number of events whose Publish call was cancelled for reasons unrelated to ingestion error. + RetryableErrors(int) // report number of events with retryable errors + PermanentErrors(int) // report number of events dropped due to permanent errors + DuplicateEvents(int) // report number of events detected as duplicates (e.g. on resends) + AckedEvents(int) // report number of acked events + ErrTooMany(int) // report too many requests response + + BatchSplit() // report a batch was split for being too large to ingest + + WriteError(error) // report an I/O error on write + WriteBytes(int) // report number of bytes being written + ReadError(error) // report an I/O error on read + ReadBytes(int) // report number of bytes being read + ReportLatency(time.Duration) // report the duration a send to the output takes - Acked(int) // report number of acked events - Failed(int) // report number of failed events - Dropped(int) // report number of dropped events - Duplicate(int) // report number of events detected as duplicates (e.g. 
on resends) - Cancelled(int) // report number of cancelled events - Split() // report a batch was split for being too large to ingest - WriteError(error) // report an I/O error on write - WriteBytes(int) // report number of bytes being written - ReadError(error) // report an I/O error on read - ReadBytes(int) // report number of bytes being read - ErrTooMany(int) // report too many requests response } type emptyObserver struct{} @@ -48,12 +52,12 @@ func NewNilObserver() Observer { func (*emptyObserver) NewBatch(int) {} func (*emptyObserver) ReportLatency(_ time.Duration) {} -func (*emptyObserver) Acked(int) {} -func (*emptyObserver) Duplicate(int) {} -func (*emptyObserver) Failed(int) {} -func (*emptyObserver) Dropped(int) {} -func (*emptyObserver) Cancelled(int) {} -func (*emptyObserver) Split() {} +func (*emptyObserver) AckedEvents(int) {} +func (*emptyObserver) DuplicateEvents(int) {} +func (*emptyObserver) RetryableErrors(int) {} +func (*emptyObserver) PermanentErrors(int) {} +func (*emptyObserver) CancelledEvents(int) {} +func (*emptyObserver) BatchSplit() {} func (*emptyObserver) WriteError(error) {} func (*emptyObserver) WriteBytes(int) {} func (*emptyObserver) ReadError(error) {} diff --git a/libbeat/outputs/redis/client.go b/libbeat/outputs/redis/client.go index 5a299749aac8..fea29c880bb6 100644 --- a/libbeat/outputs/redis/client.go +++ b/libbeat/outputs/redis/client.go @@ -147,7 +147,7 @@ func (c *client) Publish(_ context.Context, batch publisher.Batch) error { c.observer.NewBatch(len(events)) rest, err := c.publish(c.key, events) if rest != nil { - c.observer.Failed(len(rest)) + c.observer.RetryableErrors(len(rest)) batch.RetryEvents(rest) return err } @@ -228,7 +228,7 @@ func (c *client) publishEventsBulk(conn redis.Conn, command string) publishFn { args[0] = dest okEvents, args := serializeEvents(c.log, args, 1, data, c.index, c.codec) - c.observer.Dropped(len(data) - len(okEvents)) + c.observer.PermanentErrors(len(data) - len(okEvents)) if (len(args) - 1) == 0 { return nil, nil } @@ -244,7 +244,7 @@ func (c *client) publishEventsBulk(conn redis.Conn, command string) publishFn { } - c.observer.Acked(len(okEvents)) + c.observer.AckedEvents(len(okEvents)) return nil, nil } } @@ -254,7 +254,7 @@ func (c *client) publishEventsPipeline(conn redis.Conn, command string) publishF var okEvents []publisher.Event serialized := make([]interface{}, 0, len(data)) okEvents, serialized = serializeEvents(c.log, serialized, 0, data, c.index, c.codec) - c.observer.Dropped(len(data) - len(okEvents)) + c.observer.PermanentErrors(len(data) - len(okEvents)) if len(serialized) == 0 { return nil, nil } @@ -275,7 +275,7 @@ func (c *client) publishEventsPipeline(conn redis.Conn, command string) publishF return okEvents, err } } - c.observer.Dropped(dropped) + c.observer.PermanentErrors(dropped) if err := conn.Flush(); err != nil { return data, err @@ -301,7 +301,7 @@ func (c *client) publishEventsPipeline(conn redis.Conn, command string) publishF } } - c.observer.Acked(len(okEvents) - len(failed)) + c.observer.AckedEvents(len(okEvents) - len(failed)) return failed, lastErr } } diff --git a/libbeat/outputs/shipper/shipper.go b/libbeat/outputs/shipper/shipper.go index 83955a80f4ce..91c9e0994916 100644 --- a/libbeat/outputs/shipper/shipper.go +++ b/libbeat/outputs/shipper/shipper.go @@ -196,7 +196,7 @@ func (s *shipper) publish(ctx context.Context, batch publisher.Batch) error { convertedCount := len(toSend) - s.observer.Dropped(droppedCount) + s.observer.PermanentErrors(droppedCount) s.log.Debugf("%d 
events converted to protobuf, %d dropped", convertedCount, droppedCount) var lastAcceptedIndex uint64 @@ -218,10 +218,10 @@ func (s *shipper) publish(ctx context.Context, batch publisher.Batch) error { // need to drop it. if batch.SplitRetry() { // Report that we split a batch - s.observer.Split() + s.observer.BatchSplit() } else { batch.Drop() - s.observer.Dropped(len(events)) + s.observer.PermanentErrors(len(events)) s.log.Errorf("dropping %d events because of RPC failure: %v", len(events), err) } return nil @@ -233,8 +233,8 @@ func (s *shipper) publish(ctx context.Context, batch publisher.Batch) error { // request, then cancelling here (instead of dropping or retrying) // will cause an infinite retry loop, wedging the pipeline. - batch.Cancelled() // does not decrease the TTL - s.observer.Cancelled(len(events)) // we cancel the whole batch not just non-dropped events + batch.Cancelled() // does not decrease the TTL + s.observer.CancelledEvents(len(events)) // we cancel the whole batch not just non-dropped events return fmt.Errorf("failed to publish the batch to the shipper, none of the %d events were accepted: %w", len(toSend), err) } @@ -387,7 +387,7 @@ func (s *shipper) ackWorker(ctx context.Context) { p.batch.ACK() ackedCount := p.eventCount - p.droppedCount - s.observer.Acked(ackedCount) + s.observer.AckedEvents(ackedCount) s.log.Debugf("%d events have been acknowledged, %d dropped", ackedCount, p.droppedCount) lastProcessed++ } diff --git a/libbeat/outputs/shipper/shipper_test.go b/libbeat/outputs/shipper/shipper_test.go index e26d44635aff..d8abe7995bf5 100644 --- a/libbeat/outputs/shipper/shipper_test.go +++ b/libbeat/outputs/shipper/shipper_test.go @@ -638,13 +638,13 @@ type TestObserver struct { } func (to *TestObserver) NewBatch(batch int) { to.batch += batch } -func (to *TestObserver) Acked(acked int) { to.acked += acked } +func (to *TestObserver) AckedEvents(acked int) { to.acked += acked } func (to *TestObserver) ReportLatency(_ time.Duration) {} -func (to *TestObserver) Duplicate(duplicate int) { to.duplicate += duplicate } -func (to *TestObserver) Failed(failed int) { to.failed += failed } -func (to *TestObserver) Dropped(dropped int) { to.dropped += dropped } -func (to *TestObserver) Cancelled(cancelled int) { to.cancelled += cancelled } -func (to *TestObserver) Split() { to.split++ } +func (to *TestObserver) DuplicateEvents(duplicate int) { to.duplicate += duplicate } +func (to *TestObserver) RetryableErrors(failed int) { to.failed += failed } +func (to *TestObserver) PermanentErrors(dropped int) { to.dropped += dropped } +func (to *TestObserver) CancelledEvents(cancelled int) { to.cancelled += cancelled } +func (to *TestObserver) BatchSplit() { to.split++ } func (to *TestObserver) WriteError(we error) { to.writeError = we } func (to *TestObserver) WriteBytes(wb int) { to.writeBytes += wb } func (to *TestObserver) ReadError(re error) { to.readError = re } diff --git a/libbeat/publisher/pipeline/monitoring.go b/libbeat/publisher/pipeline/monitoring.go index cda329e0963a..0bc63a739f90 100644 --- a/libbeat/publisher/pipeline/monitoring.go +++ b/libbeat/publisher/pipeline/monitoring.go @@ -32,21 +32,32 @@ type observer interface { } type pipelineObserver interface { + // A new client connected to the pipeline via (*Pipeline).ConnectWith. clientConnected() + // An open pipeline client received a Close() call. 
clientClosed() } type clientObserver interface { + // The client received a Publish call newEvent() + // An event was filtered by processors before being published filteredEvent() + // An event was published to the queue publishedEvent() + // An event was rejected by the queue failedPublishEvent() } type outputObserver interface { + // Events encountered too many errors and were permanently dropped. eventsDropped(int) + // Events were sent back to an output worker after an earlier failure. eventsRetry(int) + // The queue received acknowledgment for events from the output workers. + // (This may include events already reported via eventsDropped.) queueACKed(n int) + // Report the maximum event count supported by the queue. queueMaxEvents(n int) } @@ -65,10 +76,10 @@ type metricsObserverVars struct { // clients metrics clients *monitoring.Uint - // events publish/dropped stats - events, filtered, published, failed *monitoring.Uint - dropped, retry *monitoring.Uint // (retryer) drop/retry counters - activeEvents *monitoring.Uint + // eventsTotal publish/dropped stats + eventsTotal, eventsFiltered, eventsPublished, eventsFailed *monitoring.Uint + eventsDropped, eventsRetry *monitoring.Uint // (retryer) drop/retry counters + activeEvents *monitoring.Uint // queue metrics queueACKed *monitoring.Uint @@ -85,19 +96,46 @@ func newMetricsObserver(metrics *monitoring.Registry) *metricsObserver { return &metricsObserver{ metrics: metrics, vars: metricsObserverVars{ - clients: monitoring.NewUint(reg, "clients"), // Gauge + // (Gauge) clients measures the number of open pipeline clients. + clients: monitoring.NewUint(reg, "clients"), - events: monitoring.NewUint(reg, "events.total"), - filtered: monitoring.NewUint(reg, "events.filtered"), - published: monitoring.NewUint(reg, "events.published"), - failed: monitoring.NewUint(reg, "events.failed"), - dropped: monitoring.NewUint(reg, "events.dropped"), - retry: monitoring.NewUint(reg, "events.retry"), + // events.total counts all created events. + eventsTotal: monitoring.NewUint(reg, "events.total"), - queueACKed: monitoring.NewUint(reg, "queue.acked"), + // (Gauge) events.active measures events that have been created, but have + // not yet been failed, filtered, or acked/dropped. + activeEvents: monitoring.NewUint(reg, "events.active"), + + // events.filtered counts events that were filtered by processors before + // being sent to the queue. + eventsFiltered: monitoring.NewUint(reg, "events.filtered"), + + // events.failed counts events that were rejected by the queue, or that + // were sent via an already-closed pipeline client. + eventsFailed: monitoring.NewUint(reg, "events.failed"), + + // events.published counts events that were accepted by the queue. + eventsPublished: monitoring.NewUint(reg, "events.published"), + + // events.retry counts events that an output worker sent back to be + // retried. + eventsRetry: monitoring.NewUint(reg, "events.retry"), + + // events.dropped counts events that were dropped because errors from + // the output workers exceeded the configured maximum retry count. + eventsDropped: monitoring.NewUint(reg, "events.dropped"), + + // (Gauge) queue.max_events measures the maximum number of events the + // queue will accept, or 0 if there is none. queueMaxEvents: monitoring.NewUint(reg, "queue.max_events"), - activeEvents: monitoring.NewUint(reg, "events.active"), // Gauge + // queue.acked counts events that have been acknowledged by the output + // workers. 
This includes events that were dropped for fatal errors, + // which are also reported in events.dropped. + queueACKed: monitoring.NewUint(reg, "queue.acked"), + + // (Gauge) queue.filled.pct.events measures the fraction (from 0 to 1) + // of the queue's event capacity that is currently filled. percentQueueFull: monitoring.NewFloat(reg, "queue.filled.pct.events"), }, } @@ -125,7 +163,7 @@ func (o *metricsObserver) clientClosed() { o.vars.clients.Dec() } // (client) client is trying to publish a new event func (o *metricsObserver) newEvent() { - o.vars.events.Inc() + o.vars.eventsTotal.Inc() o.vars.activeEvents.Inc() o.setPercentageFull() } @@ -142,19 +180,19 @@ func (o *metricsObserver) setPercentageFull() { // (client) event is filtered out (on purpose or failed) func (o *metricsObserver) filteredEvent() { - o.vars.filtered.Inc() + o.vars.eventsFiltered.Inc() o.vars.activeEvents.Dec() o.setPercentageFull() } // (client) managed to push an event into the publisher pipeline func (o *metricsObserver) publishedEvent() { - o.vars.published.Inc() + o.vars.eventsPublished.Inc() } // (client) client closing down or DropIfFull is set func (o *metricsObserver) failedPublishEvent() { - o.vars.failed.Inc() + o.vars.eventsFailed.Inc() o.vars.activeEvents.Dec() o.setPercentageFull() } @@ -182,12 +220,12 @@ func (o *metricsObserver) queueMaxEvents(n int) { // (retryer) number of events dropped by retryer func (o *metricsObserver) eventsDropped(n int) { - o.vars.dropped.Add(uint64(n)) + o.vars.eventsDropped.Add(uint64(n)) } // (retryer) number of events pushed to the output worker queue func (o *metricsObserver) eventsRetry(n int) { - o.vars.retry.Add(uint64(n)) + o.vars.eventsRetry.Add(uint64(n)) } type emptyObserver struct{} diff --git a/libbeat/publisher/pipeline/stress/out.go b/libbeat/publisher/pipeline/stress/out.go index 03ea06d3be86..b0e4c3d4e399 100644 --- a/libbeat/publisher/pipeline/stress/out.go +++ b/libbeat/publisher/pipeline/stress/out.go @@ -93,7 +93,7 @@ func (t *testOutput) Publish(_ context.Context, batch publisher.Batch) error { if config.Fail.EveryBatch == t.batchCount { t.batchCount = 0 - t.observer.Failed(n) + t.observer.RetryableErrors(n) batch.Retry() return nil } @@ -104,7 +104,7 @@ func (t *testOutput) Publish(_ context.Context, batch publisher.Batch) error { // ack complete batch batch.ACK() - t.observer.Acked(n) + t.observer.AckedEvents(n) return nil } From 3583ff2a9606c82d0a0fb89649ba461176acb2b1 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 16 May 2024 17:32:38 -0400 Subject: [PATCH 02/14] fix tests --- libbeat/outputs/elasticsearch/client.go | 28 +++++++++++--------- libbeat/outputs/elasticsearch/client_test.go | 13 ++++----- 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index d513eba8f7ae..4134f8210773 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -424,25 +424,27 @@ func (client *Client) bulkCollectPublishFails(result eslegclient.BulkResult, dat } else if status < 500 { // hard failure, apply policy action if encodedEvent.deadLetter { - stats.nonIndexable++ + // Fatal error while sending an already-failed event to the dead letter + // index, drop. client.log.Errorf("Can't deliver to dead letter index event (status=%v). 
Enable debug logs to view the event and cause.", status) client.log.Debugf("Can't deliver to dead letter index event %#v (status=%v): %s", data[i], status, msg) - // poison pill - this will clog the pipeline if the underlying failure is non transient. - } else if client.deadLetterIndex != "" { - // Send this failure to the dead letter index. - // We count this as a "retryable failure", then if the dead letter - // ingestion succeeds it is counted in the "deadLetter" counter - // rather than the "acked" counter. - stats.fails++ - client.log.Warnf("Cannot index event (status=%v), trying dead letter index. Enable debug logs to view the event and cause.", status) - client.log.Debugf("Cannot index event %#v (status=%v): %s, trying dead letter index", data[i], status, msg) - client.setDeadLetter(encodedEvent, status, string(msg)) - } else { // drop stats.nonIndexable++ + continue + } + if client.deadLetterIndex == "" { + // Fatal error and no dead letter index, drop. client.log.Warnf("Cannot index event (status=%v): dropping event! Enable debug logs to view the event and cause.", status) client.log.Debugf("Cannot index event %#v (status=%v): %s, dropping event!", data[i], status, msg) + stats.nonIndexable++ + continue } - continue + // Send this failure to the dead letter index and "retry". + // We count this as a "retryable failure", then if the dead letter + // ingestion succeeds it is counted in the "deadLetter" counter + // rather than the "acked" counter. + client.log.Warnf("Cannot index event (status=%v), trying dead letter index. Enable debug logs to view the event and cause.", status) + client.log.Debugf("Cannot index event %#v (status=%v): %s, trying dead letter index", data[i], status, msg) + client.setDeadLetter(encodedEvent, status, string(msg)) } client.log.Debugf("Bulk item insert failed (i=%v, status=%v): %s", i, status, msg) diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go index 28033ff3cb24..760f49158eef 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -263,7 +263,7 @@ func TestCollectPublishFailsNone(t *testing.T) { events[i] = publisher.Event{Content: beat.Event{Fields: event}} } - res, _ := client.bulkCollectPublishFails(response, events) + res, _ := client.bulkCollectPublishFails(response, encodeEvents(client, events)) assert.Equal(t, 0, len(res)) } @@ -284,9 +284,10 @@ func TestCollectPublishFailMiddle(t *testing.T) { ]} `) - event := publisher.Event{Content: beat.Event{Fields: mapstr.M{"field": 1}}} - eventFail := publisher.Event{Content: beat.Event{Fields: mapstr.M{"field": 2}}} - events := []publisher.Event{event, eventFail, event} + event1 := encodeEvent(client, publisher.Event{Content: beat.Event{Fields: mapstr.M{"field": 1}}}) + event2 := encodeEvent(client, publisher.Event{Content: beat.Event{Fields: mapstr.M{"field": 2}}}) + eventFail := encodeEvent(client, publisher.Event{Content: beat.Event{Fields: mapstr.M{"field": 3}}}) + events := []publisher.Event{event1, eventFail, event2} res, stats := client.bulkCollectPublishFails(response, events) assert.Equal(t, 1, len(res)) @@ -332,10 +333,10 @@ func TestCollectPublishFailDeadLetterQueue(t *testing.T) { ]} `) - event := publisher.Event{Content: beat.Event{Fields: mapstr.M{"bar": 1}}} + event1 := publisher.Event{Content: beat.Event{Fields: mapstr.M{"bar": 1}}} event2 := publisher.Event{Content: beat.Event{Fields: mapstr.M{"bar": 2}}} eventFail := publisher.Event{Content: beat.Event{Fields: 
mapstr.M{"bar": "bar1"}}} - events := encodeEvents(client, []publisher.Event{event, eventFail, event2}) + events := encodeEvents(client, []publisher.Event{event1, eventFail, event2}) res, stats := client.bulkCollectPublishFails(response, events) assert.Equal(t, 1, len(res)) From 5842ac25f489214f150de02db1b793598826fa1e Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Fri, 17 May 2024 09:09:04 -0400 Subject: [PATCH 03/14] Add dead letter metric --- libbeat/outputs/elasticsearch/client.go | 37 ++++++++++++-------- libbeat/outputs/elasticsearch/client_test.go | 14 +++++++- libbeat/outputs/metrics.go | 18 ++++++---- libbeat/outputs/observer.go | 14 ++++---- libbeat/outputs/shipper/shipper.go | 3 +- libbeat/outputs/shipper/shipper_test.go | 17 ++++----- 6 files changed, 63 insertions(+), 40 deletions(-) diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 4134f8210773..f82697768bc6 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -180,7 +180,7 @@ func (client *Client) Clone() *Client { func (client *Client) Publish(ctx context.Context, batch publisher.Batch) error { events := batch.Events() - rest, err := client.publishEvents(ctx, events) + eventsToRetry, err := client.publishEvents(ctx, events) if errors.Is(err, errPayloadTooLarge) { if batch.SplitRetry() { @@ -201,9 +201,9 @@ func (client *Client) Publish(ctx context.Context, batch publisher.Batch) error // wrong with the connection. return nil } - if len(rest) > 0 { + if len(eventsToRetry) > 0 { // At least some events failed, retry them - batch.RetryEvents(rest) + batch.RetryEvents(eventsToRetry) } else { // All events were sent successfully batch.ACK() @@ -214,26 +214,32 @@ func (client *Client) Publish(ctx context.Context, batch publisher.Batch) error // PublishEvents sends all events to elasticsearch. On error a slice with all // events not published or confirmed to be processed by elasticsearch will be // returned. The input slice backing memory will be reused by return the value. 
-func (client *Client) publishEvents(ctx context.Context, data []publisher.Event) ([]publisher.Event, error) { +// Each input event will be reported to the metrics observer via NewBatch, and +// exactly one of: +// - RetryableErrors +// - PermanentErrors +// - DuplicateEvents +// - AckedEvents +func (client *Client) publishEvents(ctx context.Context, events []publisher.Event) ([]publisher.Event, error) { span, ctx := apm.StartSpan(ctx, "publishEvents", "output") defer span.End() st := client.observer if st != nil { - st.NewBatch(len(data)) + st.NewBatch(len(events)) } - if len(data) == 0 { + if len(events) == 0 { return nil, nil } // encode events into bulk request buffer, dropping failed elements from // events slice - origCount := len(data) + origCount := len(events) span.Context.SetLabel("events_original", origCount) - data, bulkItems := client.bulkEncodePublishRequest(client.conn.GetVersion(), data) - newCount := len(data) + events, bulkItems := client.bulkEncodePublishRequest(client.conn.GetVersion(), events) + newCount := len(events) span.Context.SetLabel("events_encoded", newCount) if st != nil && origCount > newCount { st.PermanentErrors(origCount - newCount) @@ -251,14 +257,14 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event) if status == http.StatusRequestEntityTooLarge { // This error must be handled by splitting the batch, propagate it // back to Publish instead of reporting it directly - return data, errPayloadTooLarge + return events, errPayloadTooLarge } err := apm.CaptureError(ctx, fmt.Errorf("failed to perform any bulk index operations: %w", sendErr)) err.Send() client.log.Error(err) - return data, sendErr + return events, sendErr } - pubCount := len(data) + pubCount := len(events) span.Context.SetLabel("events_published", pubCount) client.log.Debugf("PublishEvents: %d events have been published to elasticsearch in %v.", @@ -269,10 +275,10 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event) var failedEvents []publisher.Event var stats bulkResultStats if status != 200 { - failedEvents = data + failedEvents = events stats.fails = len(failedEvents) } else { - failedEvents, stats = client.bulkCollectPublishFails(result, data) + failedEvents, stats = client.bulkCollectPublishFails(result, events) } failed := len(failedEvents) @@ -280,12 +286,13 @@ func (client *Client) publishEvents(ctx context.Context, data []publisher.Event) if st := client.observer; st != nil { dropped := stats.nonIndexable duplicates := stats.duplicates - acked := len(data) - failed - dropped - duplicates + acked := len(events) - failed - dropped - duplicates st.AckedEvents(acked) st.RetryableErrors(failed) st.PermanentErrors(dropped) st.DuplicateEvents(duplicates) + st.DeadLetterEvents(stats.deadLetter) st.ErrTooMany(stats.tooMany) st.ReportLatency(timeSinceSend) diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go index 760f49158eef..36afa4d23b84 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -297,7 +297,15 @@ func TestCollectPublishFailMiddle(t *testing.T) { assert.Equal(t, bulkResultStats{acked: 2, fails: 1, tooMany: 1}, stats) } -func TestCollectPublishFailDeadLetterQueue(t *testing.T) { +func TestCollectPublishFailDeadLetterSuccess(t *testing.T) { + +} + +func TestColllectPublishFailFatalErrorNotRetried(t *testing.T) { + +} + +func TestCollectPublishFailDeadLetterIndex(t *testing.T) { client, err := NewClient( 
clientSettings{ observer: outputs.NewNilObserver(), @@ -917,6 +925,10 @@ func TestPublishEventsWithBulkFiltering(t *testing.T) { }) } +func TestPublishPayloadTooLargeReportsMetrics(t *testing.T) { + +} + func TestSetDeadLetter(t *testing.T) { dead_letter_index := "dead_index" client := &Client{ diff --git a/libbeat/outputs/metrics.go b/libbeat/outputs/metrics.go index 8c911d2ebd24..7e47c6e7ab9a 100644 --- a/libbeat/outputs/metrics.go +++ b/libbeat/outputs/metrics.go @@ -42,6 +42,9 @@ type Stats struct { // Number of events accepted by the output's receiver. eventsACKed *monitoring.Uint + // Number of failed events ingested to the dead letter index + eventsDeadLetter *monitoring.Uint + // Number of events that reported a retryable error from the output's // receiver. eventsFailed *monitoring.Uint @@ -85,6 +88,7 @@ func NewStats(reg *monitoring.Registry) *Stats { eventsBatches: monitoring.NewUint(reg, "events.batches"), eventsTotal: monitoring.NewUint(reg, "events.total"), eventsACKed: monitoring.NewUint(reg, "events.acked"), + eventsDeadLetter: monitoring.NewUint(reg, "events.dead_letter"), eventsFailed: monitoring.NewUint(reg, "events.failed"), eventsDropped: monitoring.NewUint(reg, "events.dropped"), eventsDuplicates: monitoring.NewUint(reg, "events.duplicates"), @@ -126,6 +130,13 @@ func (s *Stats) AckedEvents(n int) { } } +func (s *Stats) DeadLetterEvents(n int) { + if s != nil { + s.eventsDeadLetter.Add(uint64(n)) + s.eventsActive.Sub(uint64(n)) + } +} + // RetryableErrors updates active and failed event metrics. func (s *Stats) RetryableErrors(n int) { if s != nil { @@ -154,13 +165,6 @@ func (s *Stats) PermanentErrors(n int) { } } -// CancelledEvents updates the active event metrics. -func (s *Stats) CancelledEvents(n int) { - if s != nil { - s.eventsActive.Sub(uint64(n)) - } -} - func (s *Stats) BatchSplit() { if s != nil { s.batchesSplit.Inc() diff --git a/libbeat/outputs/observer.go b/libbeat/outputs/observer.go index 7a3d904ba6fa..28d40f90dbd2 100644 --- a/libbeat/outputs/observer.go +++ b/libbeat/outputs/observer.go @@ -24,12 +24,12 @@ import "time" type Observer interface { NewBatch(int) // report new batch being processed with number of events - CancelledEvents(int) // report number of events whose Publish call was cancelled for reasons unrelated to ingestion error. - RetryableErrors(int) // report number of events with retryable errors - PermanentErrors(int) // report number of events dropped due to permanent errors - DuplicateEvents(int) // report number of events detected as duplicates (e.g. on resends) - AckedEvents(int) // report number of acked events - ErrTooMany(int) // report too many requests response + RetryableErrors(int) // report number of events with retryable errors + PermanentErrors(int) // report number of events dropped due to permanent errors + DuplicateEvents(int) // report number of events detected as duplicates (e.g. 
on resends) + DeadLetterEvents(int) // report number of failed events ingested to dead letter index + AckedEvents(int) // report number of acked events + ErrTooMany(int) // report too many requests response BatchSplit() // report a batch was split for being too large to ingest @@ -53,10 +53,10 @@ func NewNilObserver() Observer { func (*emptyObserver) NewBatch(int) {} func (*emptyObserver) ReportLatency(_ time.Duration) {} func (*emptyObserver) AckedEvents(int) {} +func (*emptyObserver) DeadLetterEvents(int) {} func (*emptyObserver) DuplicateEvents(int) {} func (*emptyObserver) RetryableErrors(int) {} func (*emptyObserver) PermanentErrors(int) {} -func (*emptyObserver) CancelledEvents(int) {} func (*emptyObserver) BatchSplit() {} func (*emptyObserver) WriteError(error) {} func (*emptyObserver) WriteBytes(int) {} diff --git a/libbeat/outputs/shipper/shipper.go b/libbeat/outputs/shipper/shipper.go index 91c9e0994916..8ed5668edb05 100644 --- a/libbeat/outputs/shipper/shipper.go +++ b/libbeat/outputs/shipper/shipper.go @@ -233,8 +233,7 @@ func (s *shipper) publish(ctx context.Context, batch publisher.Batch) error { // request, then cancelling here (instead of dropping or retrying) // will cause an infinite retry loop, wedging the pipeline. - batch.Cancelled() // does not decrease the TTL - s.observer.CancelledEvents(len(events)) // we cancel the whole batch not just non-dropped events + batch.Cancelled() // does not decrease the TTL return fmt.Errorf("failed to publish the batch to the shipper, none of the %d events were accepted: %w", len(toSend), err) } diff --git a/libbeat/outputs/shipper/shipper_test.go b/libbeat/outputs/shipper/shipper_test.go index d8abe7995bf5..acb276388ca3 100644 --- a/libbeat/outputs/shipper/shipper_test.go +++ b/libbeat/outputs/shipper/shipper_test.go @@ -620,13 +620,14 @@ func failMarshal(e publisher.Event) (*messages.Event, error) { // mock test observer for tracking events type TestObserver struct { - acked int - dropped int - cancelled int - batch int - duplicate int - failed int - split int + acked int + deadLetter int + dropped int + cancelled int + batch int + duplicate int + failed int + split int writeError error readError error @@ -639,11 +640,11 @@ type TestObserver struct { func (to *TestObserver) NewBatch(batch int) { to.batch += batch } func (to *TestObserver) AckedEvents(acked int) { to.acked += acked } +func (to *TestObserver) DeadLetterEvents(count int) { to.deadLetter += count } func (to *TestObserver) ReportLatency(_ time.Duration) {} func (to *TestObserver) DuplicateEvents(duplicate int) { to.duplicate += duplicate } func (to *TestObserver) RetryableErrors(failed int) { to.failed += failed } func (to *TestObserver) PermanentErrors(dropped int) { to.dropped += dropped } -func (to *TestObserver) CancelledEvents(cancelled int) { to.cancelled += cancelled } func (to *TestObserver) BatchSplit() { to.split++ } func (to *TestObserver) WriteError(we error) { to.writeError = we } func (to *TestObserver) WriteBytes(wb int) { to.writeBytes += wb } From 6d00455023a84d5734037164b424f4620a83babc Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Fri, 17 May 2024 18:00:50 -0400 Subject: [PATCH 04/14] reworking the ES publish helpers --- libbeat/outputs/elasticsearch/client.go | 378 ++++++++++-------- .../outputs/elasticsearch/event_encoder.go | 15 + libbeat/publisher/event.go | 2 + 3 files changed, 231 insertions(+), 164 deletions(-) diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index f82697768bc6..3e9d3ae36e3a 
100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -26,6 +26,7 @@ import ( "time" "go.elastic.co/apm/v2" + "gotest.tools/gotestsum/log" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/beat/events" @@ -66,7 +67,11 @@ type clientSettings struct { connection eslegclient.ConnectionSettings indexSelector outputs.IndexSelector pipelineSelector *outil.Selector - observer outputs.Observer + + // The metrics observer from the clientSettings, or a no-op placeholder if + // none is provided. This variable is always non-nil for a client created + // via NewClient. + observer outputs.Observer // If deadLetterIndex is set, events with bulk-ingest errors will be // forwarded to this index. Otherwise, they will be dropped. @@ -126,11 +131,17 @@ func NewClient( return nil } + // Make sure there's a non-nil observer + observer := s.observer + if observer == nil { + observer = outputs.NewNilObserver() + } + client := &Client{ conn: *conn, indexSelector: s.indexSelector, pipelineSelector: pipeline, - observer: s.observer, + observer: observer, deadLetterIndex: s.deadLetterIndex, log: logp.NewLogger("elasticsearch"), @@ -179,129 +190,149 @@ func (client *Client) Clone() *Client { } func (client *Client) Publish(ctx context.Context, batch publisher.Batch) error { - events := batch.Events() - eventsToRetry, err := client.publishEvents(ctx, events) + client.observer.NewBatch(len(batch.Events())) + + span, ctx := apm.StartSpan(ctx, "publishEvents", "output") + defer span.End() + + // Create and send the bulk request. + bulkResult := client.doBulkRequest(ctx, span, batch) + client.observer.ReportLatency(bulkResult.duration) + + // If there was a connection-level error there is no per-item response, + // handle it and return. + if bulkResult.err != nil { + return client.handleBulkResultError(ctx, batch, bulkResult) + } + + // At this point we have an Elasticsearch response for our request, + // check and report the per-item results. + client.reportBulkResult(ctx, span, batch, bulkResult) + return nil +} + +type bulkResult struct { + err error + + // The array of events sent via bulk request. This excludes any events that + // had encoding errors while assembling the request. + events []publisher.Event + + // The http status returned by the bulk request. + status int + + // The API response from Elasticsearch. + response eslegclient.BulkResult + + // The time it took to send the bulk request and read the response. + duration time.Duration +} + +var bulkRequestParams = map[string]string{ + "filter_path": "errors,items.*.error,items.*.status", +} + +// Encode a batch's events into a bulk publish request, send the request to +// Elasticsearch, and return the resulting metadata. +// The events list in the result will be shorter than the original batch if +// some events couldn't be encoded. In this case, the removed events will +// be reported to the Client's metrics observer via PermanentErrors.
+func (client *Client) doBulkRequest( + ctx context.Context, + span *apm.Span, + batch publisher.Batch, +) bulkResult { + var result bulkResult + + rawEvents := batch.Events() + + // encode events into bulk request buffer, dropping failed elements from + // events slice + span.Context.SetLabel("events_original", len(rawEvents)) + resultEvents, bulkItems := client.bulkEncodePublishRequest(client.conn.GetVersion(), rawEvents) + result.events = resultEvents + span.Context.SetLabel("events_encoded", len(resultEvents)) + client.observer.PermanentErrors(len(rawEvents) - len(resultEvents)) + + // If we encoded any events, send the network request. + if len(result.events) > 0 { + begin := time.Now() + result.status, result.response, result.err = client.conn.Bulk(ctx, "", "", bulkRequestParams, bulkItems) + result.duration = time.Since(begin) + } - if errors.Is(err, errPayloadTooLarge) { + return result +} + +func (client *Client) handleBulkResultError( + ctx context.Context, batch publisher.Batch, bulkResult bulkResult, +) error { + if bulkResult.status == http.StatusRequestEntityTooLarge { if batch.SplitRetry() { // Report that we split a batch client.observer.BatchSplit() - client.observer.RetryableErrors(len(events)) + client.observer.RetryableErrors(len(bulkResult.events)) } else { // If the batch could not be split, there is no option left but // to drop it and log the error state. batch.Drop() - client.observer.PermanentErrors(len(events)) - err := apm.CaptureError(ctx, fmt.Errorf("failed to perform bulk index operation: %w", err)) + client.observer.PermanentErrors(len(bulkResult.events)) + err := apm.CaptureError(ctx, fmt.Errorf("failed to perform bulk index operation: %w", bulkResult.err)) err.Send() client.log.Error(err) } - // Returning an error from Publish forces a client close / reconnect, - // so don't pass this error through since it doesn't indicate anything - // wrong with the connection. + // Don't propagate a too-large error since it doesn't indicate a problem + // with the connection. return nil } - if len(eventsToRetry) > 0 { + err := apm.CaptureError(ctx, fmt.Errorf("failed to perform any bulk index operations: %w", bulkResult.err)) + err.Send() + client.log.Error(err) + + if len(bulkResult.events) > 0 { // At least some events failed, retry them - batch.RetryEvents(eventsToRetry) + batch.RetryEvents(bulkResult.events) } else { // All events were sent successfully batch.ACK() } - return err + return bulkResult.err } -// PublishEvents sends all events to elasticsearch. On error a slice with all -// events not published or confirmed to be processed by elasticsearch will be -// returned. The input slice backing memory will be reused by return the value. 
-// Each input event will be reported to the metrics observer via NewBatch, and -// exactly one of: -// - RetryableErrors -// - PermanentErrors -// - DuplicateEvents -// - AckedEvents -func (client *Client) publishEvents(ctx context.Context, events []publisher.Event) ([]publisher.Event, error) { - span, ctx := apm.StartSpan(ctx, "publishEvents", "output") - defer span.End() - - st := client.observer - - if st != nil { - st.NewBatch(len(events)) - } - - if len(events) == 0 { - return nil, nil - } - - // encode events into bulk request buffer, dropping failed elements from - // events slice - origCount := len(events) - span.Context.SetLabel("events_original", origCount) - events, bulkItems := client.bulkEncodePublishRequest(client.conn.GetVersion(), events) - newCount := len(events) - span.Context.SetLabel("events_encoded", newCount) - if st != nil && origCount > newCount { - st.PermanentErrors(origCount - newCount) - } - if newCount == 0 { - return nil, nil - } +// reportBulkResult processes the result of a bulk request. It reports the +// event states to the metrics observer and acknowledges or retries the +// batch as appropriate. +// The returned error is for connection-level errors. A non-nil value should +// be returned from (*Client).Publish to trigger a reconnection attempt. +func (client *Client) reportBulkResult( + ctx context.Context, + span *apm.Span, + batch publisher.Batch, + bulkResult bulkResult, +) { + span.Context.SetLabel("events_published", len(bulkResult.events)) + client.log.Debugf("PublishEvents: %d events have been sent to elasticsearch in %v.", + len(bulkResult.events), bulkResult.duration) - begin := time.Now() - params := map[string]string{"filter_path": "errors,items.*.error,items.*.status"} - status, result, sendErr := client.conn.Bulk(ctx, "", "", params, bulkItems) - timeSinceSend := time.Since(begin) + eventsToRetry, stats := client.bulkCollectPublishFails(bulkResult) + stats.reportToObserver(client.observer) - if sendErr != nil { - if status == http.StatusRequestEntityTooLarge { - // This error must be handled by splitting the batch, propagate it - // back to Publish instead of reporting it directly - return events, errPayloadTooLarge - } - err := apm.CaptureError(ctx, fmt.Errorf("failed to perform any bulk index operations: %w", sendErr)) - err.Send() - client.log.Error(err) - return events, sendErr - } - pubCount := len(events) - span.Context.SetLabel("events_published", pubCount) - - client.log.Debugf("PublishEvents: %d events have been published to elasticsearch in %v.", - pubCount, - timeSinceSend) - - // check response for transient errors - var failedEvents []publisher.Event - var stats bulkResultStats - if status != 200 { - failedEvents = events - stats.fails = len(failedEvents) + if len(eventsToRetry) > 0 { + span.Context.SetLabel("events_failed", len(eventsToRetry)) + batch.RetryEvents(eventsToRetry) } else { - failedEvents, stats = client.bulkCollectPublishFails(result, events) + batch.ACK() } +} - failed := len(failedEvents) - span.Context.SetLabel("events_failed", failed) - if st := client.observer; st != nil { - dropped := stats.nonIndexable - duplicates := stats.duplicates - acked := len(events) - failed - dropped - duplicates - - st.AckedEvents(acked) - st.RetryableErrors(failed) - st.PermanentErrors(dropped) - st.DuplicateEvents(duplicates) - st.DeadLetterEvents(stats.deadLetter) - st.ErrTooMany(stats.tooMany) - st.ReportLatency(timeSinceSend) +func (stats bulkResultStats) reportToObserver(ob outputs.Observer) { + ob.AckedEvents(stats.acked) 
+ ob.RetryableErrors(stats.fails) + ob.PermanentErrors(stats.nonIndexable) + ob.DuplicateEvents(stats.duplicates) + ob.DeadLetterEvents(stats.deadLetter) - } - - if failed > 0 { - return failedEvents, eslegclient.ErrTempBulkFailure - } - return nil, nil + ob.ErrTooMany(stats.tooMany) } // bulkEncodePublishRequest encodes all bulk requests and returns slice of events @@ -391,89 +422,108 @@ func getPipeline(event *beat.Event, defaultSelector *outil.Selector) (string, er // to be tried again due to error code returned for that items. If indexing an // event failed due to some error in the event itself (e.g. does not respect mapping), // the event will be dropped. -func (client *Client) bulkCollectPublishFails(result eslegclient.BulkResult, data []publisher.Event) ([]publisher.Event, bulkResultStats) { - reader := newJSONReader(result) +// Each of the events will be reported in the returned stats as exactly one of +// acked, duplicates, fails, nonIndexable, or deadLetter. +func (client *Client) bulkCollectPublishFails(bulkResult bulkResult) ([]publisher.Event, bulkResultStats) { + events := bulkResult.events + + if len(bulkResult.events) == 0 { + // No events to process + return nil, bulkResultStats{} + } + if bulkResult.status != 200 { + return events, bulkResultStats{fails: len(events)} + } + reader := newJSONReader(bulkResult.response) if err := bulkReadToItems(reader); err != nil { client.log.Errorf("failed to parse bulk response: %v", err.Error()) - return nil, bulkResultStats{} + return events, bulkResultStats{fails: len(events)} } - count := len(data) - failed := data[:0] + count := len(events) + eventsToRetry := events[:0] stats := bulkResultStats{} for i := 0; i < count; i++ { - encodedEvent := data[i].EncodedEvent.(*encodedEvent) - status, msg, err := bulkReadItemStatus(client.log, reader) + itemStatus, itemMessage, err := bulkReadItemStatus(client.log, reader) if err != nil { - client.log.Error(err) - return nil, bulkResultStats{} + // The response json is invalid, mark the remaining events for retry. + stats.fails += count - i + eventsToRetry = append(eventsToRetry, events[i:]...) + break } - if status < 300 { - if encodedEvent.deadLetter { - // This was ingested into the dead letter index, not the original target - stats.deadLetter++ - } else { - stats.acked++ - } - continue // ok value + if client.applyItemStatus(events[i], itemStatus, itemMessage, &stats) { + eventsToRetry = append(eventsToRetry, events[i]) + log.Debugf("Bulk item insert failed (i=%v, status=%v): %s", i, itemStatus, itemMessage) } + } - if status == 409 { - // 409 is used to indicate an event with same ID already exists if - // `create` op_type is used. - stats.duplicates++ - continue // ok - } + return eventsToRetry, stats +} - if status == http.StatusTooManyRequests { - stats.tooMany++ - } else if status < 500 { - // hard failure, apply policy action - if encodedEvent.deadLetter { - // Fatal error while sending an already-failed event to the dead letter - // index, drop. - client.log.Errorf("Can't deliver to dead letter index event (status=%v). Enable debug logs to view the event and cause.", status) - client.log.Debugf("Can't deliver to dead letter index event %#v (status=%v): %s", data[i], status, msg) - stats.nonIndexable++ - continue - } - if client.deadLetterIndex == "" { - // Fatal error and no dead letter index, drop. - client.log.Warnf("Cannot index event (status=%v): dropping event! 
Enable debug logs to view the event and cause.", status) - client.log.Debugf("Cannot index event %#v (status=%v): %s, dropping event!", data[i], status, msg) - stats.nonIndexable++ - continue - } - // Send this failure to the dead letter index and "retry". - // We count this as a "retryable failure", then if the dead letter - // ingestion succeeds it is counted in the "deadLetter" counter - // rather than the "acked" counter. - client.log.Warnf("Cannot index event (status=%v), trying dead letter index. Enable debug logs to view the event and cause.", status) - client.log.Debugf("Cannot index event %#v (status=%v): %s, trying dead letter index", data[i], status, msg) - client.setDeadLetter(encodedEvent, status, string(msg)) +// applyItemStatus processes the ingestion status of one event from a bulk request. +// Returns true if the item should be retried. +// In the provided bulkResultStats, applyItemStatus increments exactly one of: +// acked, duplicates, deadLetter, fails, nonIndexable. +func (client *Client) applyItemStatus( + event publisher.Event, + itemStatus int, + itemMessage []byte, + stats *bulkResultStats, +) bool { + encodedEvent := event.EncodedEvent.(*encodedEvent) + if itemStatus < 300 { + if encodedEvent.deadLetter { + // This was ingested into the dead letter index, not the original target + stats.deadLetter++ + } else { + stats.acked++ } + return false // ok value + } - client.log.Debugf("Bulk item insert failed (i=%v, status=%v): %s", i, status, msg) - stats.fails++ - failed = append(failed, data[i]) + if itemStatus == 409 { + // 409 is used to indicate an event with same ID already exists if + // `create` op_type is used. + stats.duplicates++ + return false // ok } - return failed, stats -} + if itemStatus == http.StatusTooManyRequests { + stats.fails++ + stats.tooMany++ + return true + } -func (client *Client) setDeadLetter( - encodedEvent *encodedEvent, errType int, errMsg string, -) { - encodedEvent.deadLetter = true - encodedEvent.index = client.deadLetterIndex - deadLetterReencoding := mapstr.M{ - "@timestamp": encodedEvent.timestamp, - "message": string(encodedEvent.encoding), - "error.type": errType, - "error.message": errMsg, + if itemStatus < 500 { + // hard failure, apply policy action + if encodedEvent.deadLetter { + // Fatal error while sending an already-failed event to the dead letter + // index, drop. + log.Errorf("Can't deliver to dead letter index event (status=%v). Enable debug logs to view the event and cause.", itemStatus) + log.Debugf("Can't deliver to dead letter index event %#v (status=%v): %s", event, itemStatus, itemMessage) + stats.nonIndexable++ + return false + } + if client.deadLetterIndex == "" { + // Fatal error and no dead letter index, drop. + log.Warnf("Cannot index event (status=%v): dropping event! Enable debug logs to view the event and cause.", itemStatus) + log.Debugf("Cannot index event %#v (status=%v): %s, dropping event!", event, itemStatus, itemMessage) + stats.nonIndexable++ + return false + } + // Send this failure to the dead letter index and "retry". + // We count this as a "retryable failure", and then if the dead letter + // ingestion succeeds it is counted in the "deadLetter" counter + // rather than the "acked" counter. + log.Warnf("Cannot index event (status=%v), trying dead letter index. 
Enable debug logs to view the event and cause.", itemStatus) + log.Debugf("Cannot index event %#v (status=%v): %s, trying dead letter index", event, itemStatus, itemMessage) + encodedEvent.setDeadLetter(client.deadLetterIndex, itemStatus, string(itemMessage)) } - encodedEvent.encoding = []byte(deadLetterReencoding.String()) + + // Everything else gets retried. + stats.fails++ + return true } func (client *Client) Connect() error { diff --git a/libbeat/outputs/elasticsearch/event_encoder.go b/libbeat/outputs/elasticsearch/event_encoder.go index 0441695d53c7..7d345e2bc5c5 100644 --- a/libbeat/outputs/elasticsearch/event_encoder.go +++ b/libbeat/outputs/elasticsearch/event_encoder.go @@ -29,6 +29,7 @@ import ( "github.com/elastic/beats/v7/libbeat/outputs/outil" "github.com/elastic/beats/v7/libbeat/publisher" "github.com/elastic/beats/v7/libbeat/publisher/queue" + "github.com/elastic/elastic-agent-libs/mapstr" ) type eventEncoder struct { @@ -136,3 +137,17 @@ func (pe *eventEncoder) encodeRawEvent(e *beat.Event) *encodedEvent { encoding: bytes, } } + +func (e *encodedEvent) setDeadLetter( + deadLetterIndex string, errType int, errMsg string, +) { + e.deadLetter = true + e.index = deadLetterIndex + deadLetterReencoding := mapstr.M{ + "@timestamp": e.timestamp, + "message": string(e.encoding), + "error.type": errType, + "error.message": errMsg, + } + e.encoding = []byte(deadLetterReencoding.String()) +} diff --git a/libbeat/publisher/event.go b/libbeat/publisher/event.go index 77ab6716f99f..5a80dd491a90 100644 --- a/libbeat/publisher/event.go +++ b/libbeat/publisher/event.go @@ -27,6 +27,8 @@ import ( // errors), one of the signal methods must be called. In normal operation // every batch will eventually receive an ACK() or a Drop(). type Batch interface { + // The output that receives a batch owns the entries in its Events array, + // and changes to them will persist between retries. Events() []Event // All events have been acknowledged by the output. From 0ae1785ec7a1cb70dc52926946cae1aa5aa49de0 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Mon, 20 May 2024 19:01:04 -0400 Subject: [PATCH 05/14] updating tests --- libbeat/esleg/eslegclient/bulkapi.go | 10 +- .../monitoring/report/elasticsearch/client.go | 2 +- libbeat/outputs/elasticsearch/client.go | 136 ++++++------ libbeat/outputs/elasticsearch/client_test.go | 198 ++++++++---------- 4 files changed, 154 insertions(+), 192 deletions(-) diff --git a/libbeat/esleg/eslegclient/bulkapi.go b/libbeat/esleg/eslegclient/bulkapi.go index 56a36ccf1452..f60be7a45a94 100644 --- a/libbeat/esleg/eslegclient/bulkapi.go +++ b/libbeat/esleg/eslegclient/bulkapi.go @@ -60,8 +60,8 @@ type bulkRequest struct { requ *http.Request } -// BulkResult contains the result of a bulk API request. -type BulkResult json.RawMessage +// BulkResponse contains the result of a bulk API request. +type BulkResponse json.RawMessage // Bulk performs many index/delete operations in a single API call. 
// Implements: http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html @@ -69,7 +69,7 @@ func (conn *Connection) Bulk( ctx context.Context, index, docType string, params map[string]string, body []interface{}, -) (int, BulkResult, error) { +) (int, BulkResponse, error) { if len(body) == 0 { return 0, nil, nil } @@ -160,9 +160,9 @@ func (r *bulkRequest) reset(body BodyEncoder) { body.AddHeader(&r.requ.Header) } -func (conn *Connection) sendBulkRequest(requ *bulkRequest) (int, BulkResult, error) { +func (conn *Connection) sendBulkRequest(requ *bulkRequest) (int, BulkResponse, error) { status, resp, err := conn.execHTTPRequest(requ.requ) - return status, BulkResult(resp), err + return status, BulkResponse(resp), err } func bulkEncode(log *logp.Logger, out BulkWriter, body []interface{}) error { diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index fbc4fcef772a..56f56ac8e1e3 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -224,7 +224,7 @@ func getMonitoringIndexName() string { return fmt.Sprintf(".monitoring-beats-%v-%s", version, date) } -func logBulkFailures(log *logp.Logger, result eslegclient.BulkResult, events []report.Event) { +func logBulkFailures(log *logp.Logger, result eslegclient.BulkResponse, events []report.Event) { var response struct { Items []map[string]map[string]interface{} `json:"items"` } diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 3e9d3ae36e3a..f56bc1eea3ba 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -87,10 +87,32 @@ type bulkResultStats struct { tooMany int // number of events receiving HTTP 429 Too Many Requests } +type bulkResult struct { + // A connection-level error if the request couldn't be sent or the response + // couldn't be read. This error is returned from (*Client).Publish to signal + // to the pipeline that this output worker needs to be reconnected before the + // next Publish call. + connErr error + + // The array of events sent via bulk request. This excludes any events that + // had encoding errors while assembling the request. + events []publisher.Event + + // The http status returned by the bulk request. + status int + + // The API response from Elasticsearch. + response eslegclient.BulkResponse +} + const ( defaultEventType = "doc" ) +var bulkRequestParams = map[string]string{ + "filter_path": "errors,items.*.error,items.*.status", +} + // NewClient instantiates a new client. func NewClient( s clientSettings, @@ -190,56 +212,43 @@ func (client *Client) Clone() *Client { } func (client *Client) Publish(ctx context.Context, batch publisher.Batch) error { - client.observer.NewBatch(len(batch.Events())) - span, ctx := apm.StartSpan(ctx, "publishEvents", "output") defer span.End() + span.Context.SetLabel("events_original", len(batch.Events())) + client.observer.NewBatch(len(batch.Events())) // Create and send the bulk request. - bulkResult := client.doBulkRequest(ctx, span, batch) - client.observer.ReportLatency(bulkResult.duration) - - // If there was a connection-level error there is no per-item response, - // handle it and return. 
- if bulkResult.err != nil { + bulkResult := client.doBulkRequest(ctx, batch) + span.Context.SetLabel("events_encoded", len(bulkResult.events)) + if bulkResult.connErr != nil { + // If there was a connection-level error there is no per-item response, + // handle it and return. return client.handleBulkResultError(ctx, batch, bulkResult) } + span.Context.SetLabel("events_published", len(bulkResult.events)) // At this point we have an Elasticsearch response for our request, // check and report the per-item results. - client.reportBulkResult(ctx, span, batch, bulkResult) - return nil -} - -type bulkResult struct { - err error - - // The array of events sent via bulk request. This excludes any events that - // had encoding errors while assembling the request. - events []publisher.Event - - // The http status returned by the bulk request. - status int - - // The API response from Elasticsearch. - response eslegclient.BulkResult - - // The time it took to send the bulk request and read the response. - duration time.Duration -} + eventsToRetry, stats := client.bulkCollectPublishFails(bulkResult) + stats.reportToObserver(client.observer) -var bulkRequestParams = map[string]string{ - "filter_path": "errors,items.*.error,items.*.status", + if len(eventsToRetry) > 0 { + span.Context.SetLabel("events_failed", len(eventsToRetry)) + batch.RetryEvents(eventsToRetry) + } else { + batch.ACK() + } + return nil } // Encode a batch's events into a bulk publish request, send the request to // Elasticsearch, and return the resulting metadata. +// Reports the network request latency to the client's metrics observer. // The events list in the result will be shorter than the original batch if // some events couldn't be encoded. In this case, the removed events will // be reported to the Client's metrics observer via PermanentErrors. func (client *Client) doBulkRequest( ctx context.Context, - span *apm.Span, batch publisher.Batch, ) bulkResult { var result bulkResult @@ -248,17 +257,22 @@ func (client *Client) doBulkRequest( // encode events into bulk request buffer, dropping failed elements from // events slice - span.Context.SetLabel("events_original", len(rawEvents)) resultEvents, bulkItems := client.bulkEncodePublishRequest(client.conn.GetVersion(), rawEvents) result.events = resultEvents - span.Context.SetLabel("events_encoded", len(resultEvents)) client.observer.PermanentErrors(len(rawEvents) - len(resultEvents)) // If we encoded any events, send the network request. if len(result.events) > 0 { begin := time.Now() - result.status, result.response, result.err = client.conn.Bulk(ctx, "", "", bulkRequestParams, bulkItems) - result.duration = time.Since(begin) + result.status, result.response, result.connErr = + client.conn.Bulk(ctx, "", "", bulkRequestParams, bulkItems) + if result.connErr == nil { + duration := time.Since(begin) + client.observer.ReportLatency(duration) + client.log.Debugf( + "doBulkRequest: %d events have been sent to elasticsearch in %v.", + len(result.events), duration) + } } return result @@ -277,7 +291,7 @@ func (client *Client) handleBulkResultError( // to drop it and log the error state. batch.Drop() client.observer.PermanentErrors(len(bulkResult.events)) - err := apm.CaptureError(ctx, fmt.Errorf("failed to perform bulk index operation: %w", bulkResult.err)) + err := apm.CaptureError(ctx, fmt.Errorf("failed to perform bulk index operation: %w", bulkResult.connErr)) err.Send() client.log.Error(err) } @@ -285,7 +299,7 @@ func (client *Client) handleBulkResultError( // with the connection. 
return nil } - err := apm.CaptureError(ctx, fmt.Errorf("failed to perform any bulk index operations: %w", bulkResult.err)) + err := apm.CaptureError(ctx, fmt.Errorf("failed to perform any bulk index operations: %w", bulkResult.connErr)) err.Send() client.log.Error(err) @@ -296,43 +310,7 @@ func (client *Client) handleBulkResultError( // All events were sent successfully batch.ACK() } - return bulkResult.err -} - -// reportBulkResult processes the result of a bulk request. It reports the -// event states to the metrics observer and acknowledges or retries the -// batch as appropriate. -// The returned error is for connection-level errors. A non-nil value should -// be returned from (*Client).Publish to trigger a reconnection attempt. -func (client *Client) reportBulkResult( - ctx context.Context, - span *apm.Span, - batch publisher.Batch, - bulkResult bulkResult, -) { - span.Context.SetLabel("events_published", len(bulkResult.events)) - client.log.Debugf("PublishEvents: %d events have been sent to elasticsearch in %v.", - len(bulkResult.events), bulkResult.duration) - - eventsToRetry, stats := client.bulkCollectPublishFails(bulkResult) - stats.reportToObserver(client.observer) - - if len(eventsToRetry) > 0 { - span.Context.SetLabel("events_failed", len(eventsToRetry)) - batch.RetryEvents(eventsToRetry) - } else { - batch.ACK() - } -} - -func (stats bulkResultStats) reportToObserver(ob outputs.Observer) { - ob.AckedEvents(stats.acked) - ob.RetryableErrors(stats.fails) - ob.PermanentErrors(stats.nonIndexable) - ob.DuplicateEvents(stats.duplicates) - ob.DeadLetterEvents(stats.deadLetter) - - ob.ErrTooMany(stats.tooMany) + return bulkResult.connErr } // bulkEncodePublishRequest encodes all bulk requests and returns slice of events @@ -541,3 +519,13 @@ func (client *Client) String() string { func (client *Client) Test(d testing.Driver) { client.conn.Test(d) } + +func (stats bulkResultStats) reportToObserver(ob outputs.Observer) { + ob.AckedEvents(stats.acked) + ob.RetryableErrors(stats.fails) + ob.PermanentErrors(stats.nonIndexable) + ob.DuplicateEvents(stats.duplicates) + ob.DeadLetterEvents(stats.deadLetter) + + ob.ErrTooMany(stats.tooMany) +} diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go index 34850259ba3c..1b47064bc255 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -262,7 +262,11 @@ func TestCollectPublishFailsNone(t *testing.T) { events[i] = publisher.Event{Content: beat.Event{Fields: event}} } - res, _ := client.bulkCollectPublishFails(response, encodeEvents(client, events)) + res, _ := client.bulkCollectPublishFails(bulkResult{ + events: encodeEvents(client, events), + status: 200, + response: response, + }) assert.Equal(t, 0, len(res)) } @@ -288,7 +292,11 @@ func TestCollectPublishFailMiddle(t *testing.T) { eventFail := encodeEvent(client, publisher.Event{Content: beat.Event{Fields: mapstr.M{"field": 3}}}) events := []publisher.Event{event1, eventFail, event2} - res, stats := client.bulkCollectPublishFails(response, events) + res, stats := client.bulkCollectPublishFails(bulkResult{ + events: events, + status: 200, + response: response, + }) assert.Equal(t, 1, len(res)) if len(res) == 1 { assert.Equal(t, eventFail, res[0]) @@ -305,57 +313,51 @@ func TestColllectPublishFailFatalErrorNotRetried(t *testing.T) { } func TestCollectPublishFailDeadLetterIndex(t *testing.T) { + const deadLetterIndex = "test_index" client, err := NewClient( clientSettings{ observer: 
outputs.NewNilObserver(), - deadLetterIndex: "test_index", + deadLetterIndex: deadLetterIndex, }, nil, ) assert.NoError(t, err) - parseError := `{ - "root_cause" : [ - { - "type" : "mapper_parsing_exception", - "reason" : "failed to parse field [bar] of type [long] in document with id '1'. Preview of field's value: 'bar1'" - } - ], - "type" : "mapper_parsing_exception", - "reason" : "failed to parse field [bar] of type [long] in document with id '1'. Preview of field's value: 'bar1'", - "caused_by" : { - "type" : "illegal_argument_exception", - "reason" : "For input string: \"bar1\"" - } - }` + const errorMessage = "test error message" response := []byte(` - { "items": [ - {"create": {"status": 200}}, - {"create": { - "error" : ` + parseError + `, - "status" : 400 - } - }, - {"create": {"status": 200}} - ]} - `) +{ + "items": [ + {"create": {"status": 200}}, + { + "create": { + "error" : "` + errorMessage + `", + "status" : 400 + } + }, + {"create": {"status": 200}} + ] +}`) - event1 := publisher.Event{Content: beat.Event{Fields: mapstr.M{"bar": 1}}} - event2 := publisher.Event{Content: beat.Event{Fields: mapstr.M{"bar": 2}}} - eventFail := publisher.Event{Content: beat.Event{Fields: mapstr.M{"bar": "bar1"}}} - events := encodeEvents(client, []publisher.Event{event1, eventFail, event2}) + event1 := encodeEvent(client, publisher.Event{Content: beat.Event{Fields: mapstr.M{"bar": 1}}}) + event2 := encodeEvent(client, publisher.Event{Content: beat.Event{Fields: mapstr.M{"bar": 2}}}) + eventFail := encodeEvent(client, publisher.Event{Content: beat.Event{Fields: mapstr.M{"bar": "bar1"}}}) + events := []publisher.Event{event1, eventFail, event2} - res, stats := client.bulkCollectPublishFails(response, events) + res, stats := client.bulkCollectPublishFails(bulkResult{ + events: events, + status: 200, + response: response, + }) + assert.Equal(t, bulkResultStats{acked: 2, fails: 1, nonIndexable: 0}, stats) assert.Equal(t, 1, len(res)) if len(res) == 1 { - expected := encodeEvent(client, eventFail) - encodedEvent := expected.EncodedEvent.(*encodedEvent) - // Mark the encoded event with the expected error - client.setDeadLetter(encodedEvent, 400, parseError) - - assert.Equal(t, expected, res[0]) + assert.Equalf(t, eventFail, res[0], "bulkCollectPublishFails should return failed event") + encodedEvent, ok := res[0].EncodedEvent.(*encodedEvent) + require.True(t, ok, "event must be encoded as *encodedEvent") + assert.True(t, encodedEvent.deadLetter, "faild event's dead letter flag should be set") + assert.Equalf(t, deadLetterIndex, encodedEvent.index, "failed event's index should match dead letter index") + assert.Contains(t, string(encodedEvent.encoding), errorMessage, "dead letter event should include associated error message") } - assert.Equal(t, bulkResultStats{acked: 2, fails: 1, nonIndexable: 0}, stats) } func TestCollectPublishFailDrop(t *testing.T) { @@ -397,7 +399,11 @@ func TestCollectPublishFailDrop(t *testing.T) { eventFail := publisher.Event{Content: beat.Event{Fields: mapstr.M{"bar": "bar1"}}} events := encodeEvents(client, []publisher.Event{event, eventFail, event}) - res, stats := client.bulkCollectPublishFails(response, events) + res, stats := client.bulkCollectPublishFails(bulkResult{ + events: events, + status: 200, + response: response, + }) assert.Equal(t, 0, len(res)) assert.Equal(t, bulkResultStats{acked: 2, fails: 0, nonIndexable: 1}, stats) } @@ -422,7 +428,11 @@ func TestCollectPublishFailAll(t *testing.T) { event := publisher.Event{Content: beat.Event{Fields: mapstr.M{"field": 
2}}} events := encodeEvents(client, []publisher.Event{event, event, event}) - res, stats := client.bulkCollectPublishFails(response, events) + res, stats := client.bulkCollectPublishFails(bulkResult{ + events: events, + status: 200, + response: response, + }) assert.Equal(t, 3, len(res)) assert.Equal(t, events, res) assert.Equal(t, stats, bulkResultStats{fails: 3, tooMany: 3}) @@ -471,7 +481,11 @@ func TestCollectPipelinePublishFail(t *testing.T) { event := publisher.Event{Content: beat.Event{Fields: mapstr.M{"field": 2}}} events := encodeEvents(client, []publisher.Event{event}) - res, _ := client.bulkCollectPublishFails(response, events) + res, _ := client.bulkCollectPublishFails(bulkResult{ + events: events, + status: 200, + response: response, + }) assert.Equal(t, 1, len(res)) assert.Equal(t, events, res) } @@ -498,7 +512,11 @@ func BenchmarkCollectPublishFailsNone(b *testing.B) { events := encodeEvents(client, []publisher.Event{event, event, event}) for i := 0; i < b.N; i++ { - res, _ := client.bulkCollectPublishFails(response, events) + res, _ := client.bulkCollectPublishFails(bulkResult{ + events: events, + status: 200, + response: response, + }) if len(res) != 0 { b.Fail() } @@ -527,7 +545,11 @@ func BenchmarkCollectPublishFailMiddle(b *testing.B) { events := encodeEvents(client, []publisher.Event{event, eventFail, event}) for i := 0; i < b.N; i++ { - res, _ := client.bulkCollectPublishFails(response, events) + res, _ := client.bulkCollectPublishFails(bulkResult{ + events: events, + status: 200, + response: response, + }) if len(res) != 1 { b.Fail() } @@ -555,7 +577,11 @@ func BenchmarkCollectPublishFailAll(b *testing.B) { events := encodeEvents(client, []publisher.Event{event, event, event}) for i := 0; i < b.N; i++ { - res, _ := client.bulkCollectPublishFails(response, events) + res, _ := client.bulkCollectPublishFails(bulkResult{ + events: events, + status: 200, + response: response, + }) if len(res) != 3 { b.Fail() } @@ -800,7 +826,7 @@ func TestClientWithAPIKey(t *testing.T) { assert.Equal(t, "ApiKey aHlva0hHNEJmV2s1dmlLWjE3Mlg6bzQ1SlVreXVTLS15aVNBdXV4bDhVdw==", headers.Get("Authorization")) } -func TestPublishEventsWithBulkFiltering(t *testing.T) { +func TestBulkRequestHasFilterPath(t *testing.T) { makePublishTestClient := func(t *testing.T, url string, configParams map[string]string) *Client { client, err := NewClient( clientSettings{ @@ -822,16 +848,14 @@ func TestPublishEventsWithBulkFiltering(t *testing.T) { event1 := publisher.Event{Content: beat.Event{Fields: mapstr.M{"field": 1}}} + const filterPathKey = "filter_path" + const filterPathValue = "errors,items.*.error,items.*.status" t.Run("Single event with response filtering", func(t *testing.T) { - var expectedFilteringParams = map[string]string{ - "filter_path": "errors,items.*.error,items.*.status", - } - var recParams url.Values - + var reqParams url.Values esMock := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) if strings.ContainsAny("_bulk", r.URL.Path) { - recParams = r.URL.Query() + reqParams = r.URL.Query() response := []byte(`{"took":85,"errors":false,"items":[{"index":{"status":200}}]}`) _, _ = w.Write(response) } @@ -843,26 +867,23 @@ func TestPublishEventsWithBulkFiltering(t *testing.T) { defer esMock.Close() client := makePublishTestClient(t, esMock.URL, nil) - // Try publishing a batch that can be split - events := encodeEvents(client, []publisher.Event{event1}) - evt, err := client.publishEvents(ctx, events) - require.NoError(t, err) - 
require.Equal(t, len(recParams), len(expectedFilteringParams)) - require.Nil(t, evt) + batch := encodeBatch(client, &batchMock{events: []publisher.Event{event1}}) + result := client.doBulkRequest(ctx, batch) + require.NoError(t, result.connErr) + // Only param should be the standard filter path + require.Equal(t, len(reqParams), 1, "Only bulk request param should be standard filter path") + require.Equal(t, reqParams[filterPathKey], filterPathValue, "Bulk request should include standard filter path") }) t.Run("Single event with response filtering and preconfigured client params", func(t *testing.T) { var configParams = map[string]string{ "hardcoded": "yes", } - var expectedFilteringParams = map[string]string{ - "filter_path": "errors,items.*.error,items.*.status", - } - var recParams url.Values + var reqParams url.Values esMock := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) if strings.ContainsAny("_bulk", r.URL.Path) { - recParams = r.URL.Query() + reqParams = r.URL.Query() response := []byte(`{"took":85,"errors":false,"items":[{"index":{"status":200}}]}`) _, _ = w.Write(response) } @@ -874,53 +895,11 @@ func TestPublishEventsWithBulkFiltering(t *testing.T) { defer esMock.Close() client := makePublishTestClient(t, esMock.URL, configParams) - // Try publishing a batch that can be split events := encodeEvents(client, []publisher.Event{event1}) evt, err := client.publishEvents(ctx, events) require.NoError(t, err) - require.Equal(t, len(recParams), len(expectedFilteringParams)+len(configParams)) - require.Nil(t, evt) - }) - t.Run("Single event without response filtering", func(t *testing.T) { - var recParams url.Values - - esMock := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if strings.ContainsAny("_bulk", r.URL.Path) { - recParams = r.URL.Query() - response := []byte(`{ - "took":85, - "errors":false, - "items":[ - { - "index":{ - "_index":"test", - "_id":"1", - "_version":1, - "result":"created", - "_shards":{"total":2,"successful":1,"failed":0}, - "_seq_no":0, - "_primary_term":1, - "status":201 - } - } - ]}`) - _, _ = w.Write(response) - } - if strings.Contains("/", r.URL.Path) { - response := []byte(`{}`) - _, _ = w.Write(response) - } - w.WriteHeader(http.StatusOK) - - })) - defer esMock.Close() - client := makePublishTestClient(t, esMock.URL, nil) + require.Equal(t, len(reqParams), 2, "Bulk request should include configured parameter and standard filter path") - // Try publishing a batch that can be split - events := encodeEvents(client, []publisher.Event{event1}) - _, err := client.publishEvents(ctx, events) - require.NoError(t, err) - require.Equal(t, len(recParams), 1) }) } @@ -930,17 +909,12 @@ func TestPublishPayloadTooLargeReportsMetrics(t *testing.T) { func TestSetDeadLetter(t *testing.T) { dead_letter_index := "dead_index" - client := &Client{ - deadLetterIndex: dead_letter_index, - indexSelector: testIndexSelector{}, - } - e := &encodedEvent{ index: "original_index", } errType := 123 errStr := "test error string" - client.setDeadLetter(e, errType, errStr) + e.setDeadLetter(dead_letter_index, errType, errStr) assert.True(t, e.deadLetter, "setDeadLetter should set the event's deadLetter flag") assert.Equal(t, dead_letter_index, e.index, "setDeadLetter should overwrite the event's original index") From e53a90ee900ccee9bb366ade67488cd889bedca7 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 23 May 2024 10:50:27 -0400 Subject: [PATCH 06/14] working on tests --- 
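[Editorial note, not part of the patch] The reworked bulkCollectPublishFails documents that every event is reported in exactly one of the stats buckets (acked, duplicates, fails, nonIndexable, deadLetter). A sketch of a helper that the test stubs added in this commit could use to assert that invariant; the name statsTotal is illustrative and is not something this series adds:

    // statsTotal sums the per-event buckets of bulkResultStats.
    // tooMany is excluded on purpose: 429 responses are also counted in fails,
    // so including it would double-count those events.
    func statsTotal(s bulkResultStats) int {
    	return s.acked + s.duplicates + s.fails + s.nonIndexable + s.deadLetter
    }

    // Example assertion, reusing the bulkResult fixtures from this test file:
    //   res, stats := client.bulkCollectPublishFails(bulkResult{events: events, status: 200, response: response})
    //   require.Equal(t, len(events), statsTotal(stats))
    //   require.Equal(t, len(res), stats.fails) // only retryable failures come back for retry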
libbeat/outputs/elasticsearch/client_test.go | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go index 1b47064bc255..5f01835f741f 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -305,11 +305,15 @@ func TestCollectPublishFailMiddle(t *testing.T) { } func TestCollectPublishFailDeadLetterSuccess(t *testing.T) { - + t.FailNow() } -func TestColllectPublishFailFatalErrorNotRetried(t *testing.T) { +func TestCollectPublishFailFatalErrorNotRetried(t *testing.T) { + t.FailNow() +} +func TestInvalidBulkIndexResponse(t *testing.T) { + t.FailNow() } func TestCollectPublishFailDeadLetterIndex(t *testing.T) { @@ -895,16 +899,16 @@ func TestBulkRequestHasFilterPath(t *testing.T) { defer esMock.Close() client := makePublishTestClient(t, esMock.URL, configParams) - events := encodeEvents(client, []publisher.Event{event1}) - evt, err := client.publishEvents(ctx, events) - require.NoError(t, err) + batch := encodeBatch(client, &batchMock{events: []publisher.Event{event1}}) + result := client.doBulkRequest(ctx, batch) + require.NoError(t, result.connErr) require.Equal(t, len(reqParams), 2, "Bulk request should include configured parameter and standard filter path") - + require.Equal(t, reqParams[filterPathKey], filterPathValue, "Bulk request should include standard filter path") }) } -func TestPublishPayloadTooLargeReportsMetrics(t *testing.T) { - +func TestPublishPayloadTooLargeReportsFailed(t *testing.T) { + t.FailNow() } func TestSetDeadLetter(t *testing.T) { From 3223bcab429be8ed0383419b3b2d6f2db2ee8dbf Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Fri, 24 May 2024 13:15:37 -0400 Subject: [PATCH 07/14] troubleshooting tests --- libbeat/outputs/elasticsearch/client.go | 11 +++++++++++ libbeat/outputs/elasticsearch/client_test.go | 18 ++++++++++++------ libbeat/publisher/pipeline/ttl_batch.go | 2 ++ 3 files changed, 25 insertions(+), 6 deletions(-) diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index f56bc1eea3ba..8f15f50306eb 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -212,6 +212,7 @@ func (client *Client) Clone() *Client { } func (client *Client) Publish(ctx context.Context, batch publisher.Batch) error { + fmt.Printf("hi fae, Publish with %v events\n", len(batch.Events())) span, ctx := apm.StartSpan(ctx, "publishEvents", "output") defer span.End() span.Context.SetLabel("events_original", len(batch.Events())) @@ -223,6 +224,8 @@ func (client *Client) Publish(ctx context.Context, batch publisher.Batch) error if bulkResult.connErr != nil { // If there was a connection-level error there is no per-item response, // handle it and return. 
+ fmt.Printf("hi fae, calling handleBulkResultError\n") + defer fmt.Printf("hi fae, returning from Publish\n") return client.handleBulkResultError(ctx, batch, bulkResult) } span.Context.SetLabel("events_published", len(bulkResult.events)) @@ -255,6 +258,7 @@ func (client *Client) doBulkRequest( rawEvents := batch.Events() + fmt.Printf("hi fae, doBulkRequest\n") // encode events into bulk request buffer, dropping failed elements from // events slice resultEvents, bulkItems := client.bulkEncodePublishRequest(client.conn.GetVersion(), rawEvents) @@ -266,6 +270,7 @@ func (client *Client) doBulkRequest( begin := time.Now() result.status, result.response, result.connErr = client.conn.Bulk(ctx, "", "", bulkRequestParams, bulkItems) + fmt.Printf("hi fae, bulk status %v response %v\n", result.status, string(result.response)) if result.connErr == nil { duration := time.Since(begin) client.observer.ReportLatency(duration) @@ -273,6 +278,8 @@ func (client *Client) doBulkRequest( "doBulkRequest: %d events have been sent to elasticsearch in %v.", len(result.events), duration) } + } else { + fmt.Printf("hi fae, doBulkRequest had no events left after encoding\n") } return result @@ -282,11 +289,14 @@ func (client *Client) handleBulkResultError( ctx context.Context, batch publisher.Batch, bulkResult bulkResult, ) error { if bulkResult.status == http.StatusRequestEntityTooLarge { + fmt.Printf("hi fae, got statusrequestentitytoolarge\n") if batch.SplitRetry() { + fmt.Printf("hi fae, split a batch\n") // Report that we split a batch client.observer.BatchSplit() client.observer.RetryableErrors(len(bulkResult.events)) } else { + fmt.Printf("hi fae, batch split failed\n") // If the batch could not be split, there is no option left but // to drop it and log the error state. batch.Drop() @@ -403,6 +413,7 @@ func getPipeline(event *beat.Event, defaultSelector *outil.Selector) (string, er // Each of the events will be reported in the returned stats as exactly one of // acked, duplicates, fails, nonIndexable, or deadLetter. func (client *Client) bulkCollectPublishFails(bulkResult bulkResult) ([]publisher.Event, bulkResultStats) { + fmt.Printf("hi fae, bulkCollectPublishFails\n") events := bulkResult.events if len(bulkResult.events) == 0 { diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go index 5f01835f741f..5a62980d3756 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -217,21 +217,27 @@ func TestPublish(t *testing.T) { // test results directly without atomics/mutexes. done := false retryCount := 0 + var retryBatches []publisher.Batch batch := encodeBatch(client, pipeline.NewBatchForTesting( []publisher.Event{event1, event2, event3}, func(b publisher.Batch) { // The retry function sends the batch back through Publish. // In a live pipeline it would instead be sent to eventConsumer - // first and then back to Publish when an output worker was - // available. + // and then back to Publish when an output worker was available. 
retryCount++ - err := client.Publish(ctx, b) - assert.NoError(t, err, "Publish should return without error") + retryBatches = append(retryBatches, b) + //err := client.Publish(ctx, b) + //assert.NoError(t, err, "Publish should return without error") }, func() { done = true }, )) - err := client.Publish(ctx, batch) - assert.NoError(t, err, "Publish should return without error") + retryBatches = []publisher.Batch{batch} + for len(retryBatches) > 0 { + batch := retryBatches[0] + retryBatches = retryBatches[1:] + err := client.Publish(ctx, batch) + assert.NoError(t, err, "Publish should return without error") + } // There should be two retries: {[event1], [event2, event3]}. // The first split batch should fail and be dropped since it contains diff --git a/libbeat/publisher/pipeline/ttl_batch.go b/libbeat/publisher/pipeline/ttl_batch.go index dab77fa56597..8810bf31a815 100644 --- a/libbeat/publisher/pipeline/ttl_batch.go +++ b/libbeat/publisher/pipeline/ttl_batch.go @@ -18,6 +18,7 @@ package pipeline import ( + "fmt" "sync/atomic" "github.com/elastic/beats/v7/libbeat/publisher" @@ -161,6 +162,7 @@ func (b *ttlBatch) Cancelled() { } func (b *ttlBatch) RetryEvents(events []publisher.Event) { + fmt.Printf("hi fae, ttlBatch.RetryEvents\n") b.events = events b.Retry() } From 3ccb4f16c436412e6c0fe03a64ace394d933d9ed Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Fri, 24 May 2024 20:02:30 -0400 Subject: [PATCH 08/14] update / add tests --- libbeat/outputs/elasticsearch/client.go | 16 +- libbeat/outputs/elasticsearch/client_test.go | 173 +++++++++++++++---- libbeat/publisher/pipeline/ttl_batch.go | 2 - 3 files changed, 144 insertions(+), 47 deletions(-) diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 8f15f50306eb..e3a7aee4839b 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -212,7 +212,6 @@ func (client *Client) Clone() *Client { } func (client *Client) Publish(ctx context.Context, batch publisher.Batch) error { - fmt.Printf("hi fae, Publish with %v events\n", len(batch.Events())) span, ctx := apm.StartSpan(ctx, "publishEvents", "output") defer span.End() span.Context.SetLabel("events_original", len(batch.Events())) @@ -224,8 +223,6 @@ func (client *Client) Publish(ctx context.Context, batch publisher.Batch) error if bulkResult.connErr != nil { // If there was a connection-level error there is no per-item response, // handle it and return. 
- fmt.Printf("hi fae, calling handleBulkResultError\n") - defer fmt.Printf("hi fae, returning from Publish\n") return client.handleBulkResultError(ctx, batch, bulkResult) } span.Context.SetLabel("events_published", len(bulkResult.events)) @@ -258,7 +255,6 @@ func (client *Client) doBulkRequest( rawEvents := batch.Events() - fmt.Printf("hi fae, doBulkRequest\n") // encode events into bulk request buffer, dropping failed elements from // events slice resultEvents, bulkItems := client.bulkEncodePublishRequest(client.conn.GetVersion(), rawEvents) @@ -270,7 +266,6 @@ func (client *Client) doBulkRequest( begin := time.Now() result.status, result.response, result.connErr = client.conn.Bulk(ctx, "", "", bulkRequestParams, bulkItems) - fmt.Printf("hi fae, bulk status %v response %v\n", result.status, string(result.response)) if result.connErr == nil { duration := time.Since(begin) client.observer.ReportLatency(duration) @@ -278,8 +273,6 @@ func (client *Client) doBulkRequest( "doBulkRequest: %d events have been sent to elasticsearch in %v.", len(result.events), duration) } - } else { - fmt.Printf("hi fae, doBulkRequest had no events left after encoding\n") } return result @@ -289,21 +282,16 @@ func (client *Client) handleBulkResultError( ctx context.Context, batch publisher.Batch, bulkResult bulkResult, ) error { if bulkResult.status == http.StatusRequestEntityTooLarge { - fmt.Printf("hi fae, got statusrequestentitytoolarge\n") if batch.SplitRetry() { - fmt.Printf("hi fae, split a batch\n") // Report that we split a batch client.observer.BatchSplit() client.observer.RetryableErrors(len(bulkResult.events)) } else { - fmt.Printf("hi fae, batch split failed\n") // If the batch could not be split, there is no option left but // to drop it and log the error state. batch.Drop() client.observer.PermanentErrors(len(bulkResult.events)) - err := apm.CaptureError(ctx, fmt.Errorf("failed to perform bulk index operation: %w", bulkResult.connErr)) - err.Send() - client.log.Error(err) + client.log.Error(errPayloadTooLarge) } // Don't propagate a too-large error since it doesn't indicate a problem // with the connection. @@ -320,6 +308,7 @@ func (client *Client) handleBulkResultError( // All events were sent successfully batch.ACK() } + client.observer.RetryableErrors(len(bulkResult.events)) return bulkResult.connErr } @@ -413,7 +402,6 @@ func getPipeline(event *beat.Event, defaultSelector *outil.Selector) (string, er // Each of the events will be reported in the returned stats as exactly one of // acked, duplicates, fails, nonIndexable, or deadLetter. 
func (client *Client) bulkCollectPublishFails(bulkResult bulkResult) ([]publisher.Event, bulkResultStats) { - fmt.Printf("hi fae, bulkCollectPublishFails\n") events := bulkResult.events if len(bulkResult.events) == 0 { diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go index 5a62980d3756..5124c0defe9d 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -49,6 +49,7 @@ import ( c "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/monitoring" libversion "github.com/elastic/elastic-agent-libs/version" ) @@ -83,17 +84,19 @@ func (bm *batchMock) RetryEvents(events []publisher.Event) { } func TestPublish(t *testing.T) { - makePublishTestClient := func(t *testing.T, url string) *Client { + + makePublishTestClient := func(t *testing.T, url string) (*Client, *monitoring.Registry) { + reg := monitoring.NewRegistry() client, err := NewClient( clientSettings{ - observer: outputs.NewNilObserver(), + observer: outputs.NewStats(reg), connection: eslegclient.ConnectionSettings{URL: url}, indexSelector: testIndexSelector{}, }, nil, ) require.NoError(t, err) - return client + return client, reg } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -109,7 +112,7 @@ func TestPublish(t *testing.T) { _, _ = w.Write([]byte("Request failed to get to the server (status code: 413)")) // actual response from ES })) defer esMock.Close() - client := makePublishTestClient(t, esMock.URL) + client, reg := makePublishTestClient(t, esMock.URL) // Try publishing a batch that can be split batch := encodeBatch(client, &batchMock{ @@ -120,6 +123,8 @@ func TestPublish(t *testing.T) { assert.NoError(t, err, "Publish should split the batch without error") assert.True(t, batch.didSplit, "batch should be split") + assertRegistryUint(t, reg, "events.failed", 1, "Splitting a batch should report the event as failed/retried") + assertRegistryUint(t, reg, "events.dropped", 0, "Splitting a batch should not report any dropped events") // Try publishing a batch that cannot be split batch = encodeBatch(client, &batchMock{ @@ -131,6 +136,9 @@ func TestPublish(t *testing.T) { assert.NoError(t, err, "Publish should drop the batch without error") assert.False(t, batch.didSplit, "batch should not be split") assert.True(t, batch.drop, "unsplittable batch should be dropped") + assertRegistryUint(t, reg, "events.failed", 1, "Failed batch split should not report any more retryable failures") + assertRegistryUint(t, reg, "events.dropped", 1, "Failed batch split should report a dropped event") + }) t.Run("retries the batch if bad HTTP status", func(t *testing.T) { @@ -138,7 +146,7 @@ func TestPublish(t *testing.T) { w.WriteHeader(http.StatusInternalServerError) })) defer esMock.Close() - client := makePublishTestClient(t, esMock.URL) + client, reg := makePublishTestClient(t, esMock.URL) batch := encodeBatch(client, &batchMock{ events: []publisher.Event{event1, event2}, @@ -149,40 +157,44 @@ func TestPublish(t *testing.T) { assert.Error(t, err) assert.False(t, batch.ack, "should not be acknowledged") assert.Len(t, batch.retryEvents, 2, "all events should be retried") + assertRegistryUint(t, reg, "events.failed", 2, "HTTP failure should report failed events") }) t.Run("live batches, still too big after split", func(t *testing.T) { - // Test a live (non-mocked) batch where both events by 
themselves are - // rejected by the server as too large after the initial split. + // Test a live (non-mocked) batch where all three events by themselves are + // rejected by the server as too large after the initial batch splits. esMock := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusRequestEntityTooLarge) _, _ = w.Write([]byte("Request failed to get to the server (status code: 413)")) // actual response from ES })) defer esMock.Close() - client := makePublishTestClient(t, esMock.URL) + client, reg := makePublishTestClient(t, esMock.URL) // Because our tests don't use a live eventConsumer routine, // everything will happen synchronously and it's safe to track // test results directly without atomics/mutexes. done := false retryCount := 0 + var retryBatches []publisher.Batch batch := encodeBatch(client, pipeline.NewBatchForTesting( []publisher.Event{event1, event2, event3}, func(b publisher.Batch) { // The retry function sends the batch back through Publish. // In a live pipeline it would instead be sent to eventConsumer - // first and then back to Publish when an output worker was - // available. + // and then back to Publish when an output worker was available. retryCount++ - // We shouldn't need to re-encode the events since that was done - // before the initial Publish call - err := client.Publish(ctx, b) - assert.NoError(t, err, "Publish should return without error") + retryBatches = append(retryBatches, b) }, func() { done = true }, )) - err := client.Publish(ctx, batch) - assert.NoError(t, err, "Publish should return without error") + retryBatches = []publisher.Batch{batch} + // Loop until all pending retries are complete, the same as a pipeline caller would. + for len(retryBatches) > 0 { + batch := retryBatches[0] + retryBatches = retryBatches[1:] + err := client.Publish(ctx, batch) + assert.NoError(t, err, "Publish should return without error") + } // For three events there should be four retries in total: // {[event1], [event2, event3]}, then {[event2], [event3]}. @@ -190,6 +202,15 @@ func TestPublish(t *testing.T) { // events, all 3 will fail and be dropped. 
assert.Equal(t, 4, retryCount, "3-event batch should produce 4 total retries") assert.True(t, done, "batch should be marked as done") + // Metrics should report: + // 8 total events (3 + 1 + 2 + 1 + 1 from the batches described above) + // 3 dropped events (each event is dropped once) + // 5 failed events (8 - 3, for each event's attempted publish calls before being dropped) + // 0 active events (because Publish is complete) + assertRegistryUint(t, reg, "events.total", 8, "Publish is called on 8 events total") + assertRegistryUint(t, reg, "events.dropped", 3, "All 3 events should be dropped") + assertRegistryUint(t, reg, "events.failed", 5, "Split batches should retry 5 events before dropping them") + assertRegistryUint(t, reg, "events.active", 0, "Active events should be zero when Publish returns") }) t.Run("live batches, one event too big after split", func(t *testing.T) { @@ -206,11 +227,11 @@ func TestPublish(t *testing.T) { } else { // Report success with no events dropped w.WriteHeader(200) - _, _ = io.WriteString(w, "{\"items\": []}") + _, _ = io.WriteString(w, "{\"items\": [{\"index\":{\"status\":200}},{\"index\":{\"status\":200}},{\"index\":{\"status\":200}}]}") } })) defer esMock.Close() - client := makePublishTestClient(t, esMock.URL) + client, reg := makePublishTestClient(t, esMock.URL) // Because our tests don't use a live eventConsumer routine, // everything will happen synchronously and it's safe to track @@ -226,8 +247,6 @@ func TestPublish(t *testing.T) { // and then back to Publish when an output worker was available. retryCount++ retryBatches = append(retryBatches, b) - //err := client.Publish(ctx, b) - //assert.NoError(t, err, "Publish should return without error") }, func() { done = true }, )) @@ -246,9 +265,26 @@ func TestPublish(t *testing.T) { // (one with failure, one with success). 
assert.Equal(t, 2, retryCount, "splitting with one large event should produce two retries") assert.True(t, done, "batch should be marked as done") + // The metrics should show: + // 6 total events (3 + 1 + 2) + // 1 dropped event (because only one event is uningestable) + // 2 acked events (because the other two ultimately succeed) + // 3 failed events (because all events fail and are retried on the first call) + // 0 active events (because Publish is finished) + assertRegistryUint(t, reg, "events.total", 6, "Publish is called on 6 events total") + assertRegistryUint(t, reg, "events.dropped", 1, "One event should be dropped") + assertRegistryUint(t, reg, "events.failed", 3, "Split batches should retry 3 events before dropping them") + assertRegistryUint(t, reg, "events.active", 0, "Active events should be zero when Publish returns") }) } +func assertRegistryUint(t *testing.T, reg *monitoring.Registry, key string, expected uint64, message string) { + t.Helper() + value := reg.Get(key).(*monitoring.Uint) + assert.NotNilf(t, value, "expected registry entry for key '%v'", key) + assert.Equal(t, expected, value.Get(), message) +} + func TestCollectPublishFailsNone(t *testing.T) { client, err := NewClient( clientSettings{ @@ -311,15 +347,94 @@ func TestCollectPublishFailMiddle(t *testing.T) { } func TestCollectPublishFailDeadLetterSuccess(t *testing.T) { - t.FailNow() + const deadLetterIndex = "test_index" + client, err := NewClient( + clientSettings{ + observer: outputs.NewNilObserver(), + deadLetterIndex: deadLetterIndex, + }, + nil, + ) + assert.NoError(t, err) + + const errorMessage = "test error message" + // Return a successful response + response := []byte(`{"items": [{"create": {"status": 200}}]}`) + + event1 := encodeEvent(client, publisher.Event{Content: beat.Event{Fields: mapstr.M{"bar": 1}}}) + event1.EncodedEvent.(*encodedEvent).setDeadLetter(deadLetterIndex, 123, errorMessage) + events := []publisher.Event{event1} + + // The event should be successful after being set to dead letter, so it + // should be reported in the metrics as deadLetter + res, stats := client.bulkCollectPublishFails(bulkResult{ + events: events, + status: 200, + response: response, + }) + assert.Equal(t, bulkResultStats{acked: 0, deadLetter: 1}, stats) + assert.Equal(t, 0, len(res)) } func TestCollectPublishFailFatalErrorNotRetried(t *testing.T) { - t.FailNow() + // Test that a fatal error sending to the dead letter index is reported as + // a dropped event, and is not retried forever + const deadLetterIndex = "test_index" + client, err := NewClient( + clientSettings{ + observer: outputs.NewNilObserver(), + deadLetterIndex: deadLetterIndex, + }, + nil, + ) + assert.NoError(t, err) + + const errorMessage = "test error message" + // Return a fatal error + response := []byte(`{"items": [{"create": {"status": 499}}]}`) + + event1 := encodeEvent(client, publisher.Event{Content: beat.Event{Fields: mapstr.M{"bar": 1}}}) + event1.EncodedEvent.(*encodedEvent).setDeadLetter(deadLetterIndex, 123, errorMessage) + events := []publisher.Event{event1} + + // The event should fail permanently while being sent to the dead letter + // index, so it should be dropped instead of retrying. 
+ res, stats := client.bulkCollectPublishFails(bulkResult{ + events: events, + status: 200, + response: response, + }) + assert.Equal(t, bulkResultStats{acked: 0, nonIndexable: 1}, stats) + assert.Equal(t, 0, len(res)) } -func TestInvalidBulkIndexResponse(t *testing.T) { - t.FailNow() +func TestCollectPublishFailInvalidBulkIndexResponse(t *testing.T) { + client, err := NewClient( + clientSettings{observer: outputs.NewNilObserver()}, + nil, + ) + assert.NoError(t, err) + + // Return a truncated response without valid item data + response := []byte(`{"items": [...`) + + event1 := encodeEvent(client, publisher.Event{Content: beat.Event{Fields: mapstr.M{"bar": 1}}}) + events := []publisher.Event{event1} + + // The event should be successful after being set to dead letter, so it + // should be reported in the metrics as deadLetter + res, stats := client.bulkCollectPublishFails(bulkResult{ + events: events, + status: 200, + response: response, + }) + // The event should be returned for retry, and should appear in aggregated + // stats as failed (retryable error) + assert.Equal(t, bulkResultStats{acked: 0, fails: 1}, stats) + assert.Equal(t, 1, len(res)) + if len(res) > 0 { + assert.Equal(t, event1, res[0]) + } } func TestCollectPublishFailDeadLetterIndex(t *testing.T) { @@ -364,7 +479,7 @@ func TestCollectPublishFailDeadLetterIndex(t *testing.T) { assert.Equalf(t, eventFail, res[0], "bulkCollectPublishFails should return failed event") encodedEvent, ok := res[0].EncodedEvent.(*encodedEvent) require.True(t, ok, "event must be encoded as *encodedEvent") - assert.True(t, encodedEvent.deadLetter, "faild event's dead letter flag should be set") + assert.True(t, encodedEvent.deadLetter, "failed event's dead letter flag should be set") assert.Equalf(t, deadLetterIndex, encodedEvent.index, "failed event's index should match dead letter index") assert.Contains(t, string(encodedEvent.encoding), errorMessage, "dead letter event should include associated error message") } @@ -882,7 +997,7 @@ func TestBulkRequestHasFilterPath(t *testing.T) { require.NoError(t, result.connErr) // Only param should be the standard filter path require.Equal(t, len(reqParams), 1, "Only bulk request param should be standard filter path") - require.Equal(t, reqParams[filterPathKey], filterPathValue, "Bulk request should include standard filter path") + require.Equal(t, filterPathValue, reqParams.Get(filterPathKey), "Bulk request should include standard filter path") }) t.Run("Single event with response filtering and preconfigured client params", func(t *testing.T) { var configParams = map[string]string{ @@ -909,14 +1024,10 @@ func TestBulkRequestHasFilterPath(t *testing.T) { result := client.doBulkRequest(ctx, batch) require.NoError(t, result.connErr) require.Equal(t, len(reqParams), 2, "Bulk request should include configured parameter and standard filter path") - require.Equal(t, reqParams[filterPathKey], filterPathValue, "Bulk request should include standard filter path") + require.Equal(t, filterPathValue, reqParams.Get(filterPathKey), "Bulk request should include standard filter path") }) } -func TestPublishPayloadTooLargeReportsFailed(t *testing.T) { - t.FailNow() -} - func TestSetDeadLetter(t *testing.T) { dead_letter_index := "dead_index" e := &encodedEvent{ diff --git a/libbeat/publisher/pipeline/ttl_batch.go b/libbeat/publisher/pipeline/ttl_batch.go index 8810bf31a815..dab77fa56597 100644 --- a/libbeat/publisher/pipeline/ttl_batch.go +++ b/libbeat/publisher/pipeline/ttl_batch.go @@ -18,7 +18,6 @@ package pipeline import ( 
- "fmt" "sync/atomic" "github.com/elastic/beats/v7/libbeat/publisher" @@ -162,7 +161,6 @@ func (b *ttlBatch) Cancelled() { } func (b *ttlBatch) RetryEvents(events []publisher.Event) { - fmt.Printf("hi fae, ttlBatch.RetryEvents\n") b.events = events b.Retry() } From c7b084be79c6c36d1a18a8b43022a1354648ea7e Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 29 May 2024 16:06:15 -0400 Subject: [PATCH 09/14] review comments --- libbeat/outputs/elasticsearch/client.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index b764503ed1f1..e05c4e0b261f 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -109,6 +109,8 @@ const ( defaultEventType = "doc" ) +// Flags passed with the Bulk API request: we filter the response to include +// only the fields we need for checking request/item state. var bulkRequestParams = map[string]string{ "filter_path": "errors,items.*.error,items.*.status", } @@ -456,14 +458,14 @@ func (client *Client) applyItemStatus( } else { stats.acked++ } - return false // ok value + return false // no retry needed } if itemStatus == 409 { - // 409 is used to indicate an event with same ID already exists if - // `create` op_type is used. + // 409 is used to indicate there is already an event with the same ID, or + // with identical Time Series Data Stream dimensions when TSDS is active. stats.duplicates++ - return false // ok + return false // no retry needed } if itemStatus == http.StatusTooManyRequests { From 6a11ae000448c57f94a23dad25905a425cda46ce Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 29 May 2024 16:25:43 -0400 Subject: [PATCH 10/14] lint --- libbeat/esleg/eslegclient/bulkapi.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/libbeat/esleg/eslegclient/bulkapi.go b/libbeat/esleg/eslegclient/bulkapi.go index f60be7a45a94..8512426ed9ed 100644 --- a/libbeat/esleg/eslegclient/bulkapi.go +++ b/libbeat/esleg/eslegclient/bulkapi.go @@ -23,7 +23,6 @@ import ( "encoding/json" "errors" "io" - "io/ioutil" "net/http" "strings" @@ -142,7 +141,7 @@ func (r *bulkRequest) reset(body BodyEncoder) { rc, ok := bdy.(io.ReadCloser) if !ok && body != nil { - rc = ioutil.NopCloser(bdy) + rc = io.NopCloser(bdy) } switch v := bdy.(type) { From e95092f4ef0465ce71471ceee4657916d67fc279 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 5 Jun 2024 17:44:59 -0400 Subject: [PATCH 11/14] event errors shouldn't break the connection --- libbeat/outputs/elasticsearch/client_integration_test.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/libbeat/outputs/elasticsearch/client_integration_test.go b/libbeat/outputs/elasticsearch/client_integration_test.go index 56567931ee4e..765fd3eec5aa 100644 --- a/libbeat/outputs/elasticsearch/client_integration_test.go +++ b/libbeat/outputs/elasticsearch/client_integration_test.go @@ -238,10 +238,7 @@ func TestClientBulkPublishEventsWithDeadletterIndex(t *testing.T) { "testfield": "foo0", }, })) - err = output.Publish(context.Background(), batch) - if err == nil { - t.Fatal("Expecting mapping conflict") - } + _ = output.Publish(context.Background(), batch) _, _, err = client.conn.Refresh(deadletterIndex) if err == nil { t.Fatal("expecting index to not exist yet") From d7848fe039797977827c08957f095ddc2a6dc318 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 5 Jun 2024 21:01:45 -0400 Subject: [PATCH 12/14] fix tests with undocumented dependencies on 
the text content of debug logs :( --- libbeat/tests/integration/ca_pinning_test.go | 2 +- libbeat/tests/integration/template_test.go | 4 ++-- libbeat/tests/system/test_ilm.py | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/libbeat/tests/integration/ca_pinning_test.go b/libbeat/tests/integration/ca_pinning_test.go index 51e098885eae..7a334e42c08c 100644 --- a/libbeat/tests/integration/ca_pinning_test.go +++ b/libbeat/tests/integration/ca_pinning_test.go @@ -54,7 +54,7 @@ output.elasticsearch: mockbeat.WriteConfigFile(fmt.Sprintf(cfg, esURL.String(), caPath)) mockbeat.Start() mockbeat.WaitForLogs("mockbeat start running.", 60*time.Second) - mockbeat.WaitForLogs("PublishEvents: 1 events have been published", 60*time.Second) + mockbeat.WaitForLogs("doBulkRequest: 1 events have been sent"", 60*time.Second) } func TestCAPinningBadSHA(t *testing.T) { diff --git a/libbeat/tests/integration/template_test.go b/libbeat/tests/integration/template_test.go index aec46e448b99..3dc30cbf4301 100644 --- a/libbeat/tests/integration/template_test.go +++ b/libbeat/tests/integration/template_test.go @@ -224,7 +224,7 @@ logging: mockbeat.WaitForLogs("mockbeat start running.", 60*time.Second) mockbeat.WaitForLogs("Template with name \\\"mockbeat-9.9.9\\\" loaded.", 20*time.Second) require.Eventually(t, func() bool { - return mockbeat.LogMatch("PublishEvents: [[:digit:]]+ events have been published") + return mockbeat.LogMatch("doBulkRequest: [[:digit:]]+ events have been sent") }, 20*time.Second, 100*time.Millisecond, "looking for PublishEvents") status, body, err := HttpDo(t, http.MethodGet, indexURL) @@ -296,7 +296,7 @@ logging: mockbeat.Start() mockbeat.WaitForLogs("mockbeat start running.", 60*time.Second) require.Eventually(t, func() bool { - return mockbeat.LogMatch("PublishEvents: [[:digit:]]+ events have been published") + return mockbeat.LogMatch("doBulkRequest: [[:digit:]]+ events have been sent") }, 20*time.Second, 100*time.Millisecond, "looking for PublishEvents") u := fmt.Sprintf("%s/_index_template/%s", esUrl.String(), datastream) diff --git a/libbeat/tests/system/test_ilm.py b/libbeat/tests/system/test_ilm.py index 630ab5515932..0313097d938b 100644 --- a/libbeat/tests/system/test_ilm.py +++ b/libbeat/tests/system/test_ilm.py @@ -52,7 +52,7 @@ def test_ilm_default(self): proc = self.start_beat() self.wait_until(lambda: self.log_contains("mockbeat start running.")) self.wait_until(lambda: self.log_contains(MSG_ILM_POLICY_LOADED)) - self.wait_until(lambda: self.log_contains("PublishEvents: 1 events have been published")) + self.wait_until(lambda: self.log_contains("doBulkRequest: 1 events have been sent")) proc.check_kill_and_wait() self.idxmgmt.assert_data_stream_created(self.data_stream) @@ -69,7 +69,7 @@ def test_ilm_disabled(self): self.render_config(ilm={"enabled": False}) proc = self.start_beat() self.wait_until(lambda: self.log_contains("mockbeat start running.")) - self.wait_until(lambda: self.log_contains("PublishEvents: 1 events have been published")) + self.wait_until(lambda: self.log_contains("doBulkRequest: 1 events have been sent")) proc.check_kill_and_wait() self.idxmgmt.assert_index_template_loaded(self.data_stream) @@ -89,7 +89,7 @@ def test_policy_name(self): proc = self.start_beat() self.wait_until(lambda: self.log_contains("mockbeat start running.")) self.wait_until(lambda: self.log_contains(MSG_ILM_POLICY_LOADED)) - self.wait_until(lambda: self.log_contains("PublishEvents: 1 events have been published")) + self.wait_until(lambda: 
self.log_contains("doBulkRequest: 1 events have been sent")) proc.check_kill_and_wait() self.idxmgmt.assert_index_template_loaded(self.data_stream) From bb4b14d47e574017ad2d6be06dba6beaa39f41e1 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 5 Jun 2024 21:19:09 -0400 Subject: [PATCH 13/14] fix typo --- libbeat/tests/integration/ca_pinning_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/tests/integration/ca_pinning_test.go b/libbeat/tests/integration/ca_pinning_test.go index 7a334e42c08c..98c3db6729dd 100644 --- a/libbeat/tests/integration/ca_pinning_test.go +++ b/libbeat/tests/integration/ca_pinning_test.go @@ -54,7 +54,7 @@ output.elasticsearch: mockbeat.WriteConfigFile(fmt.Sprintf(cfg, esURL.String(), caPath)) mockbeat.Start() mockbeat.WaitForLogs("mockbeat start running.", 60*time.Second) - mockbeat.WaitForLogs("doBulkRequest: 1 events have been sent"", 60*time.Second) + mockbeat.WaitForLogs("doBulkRequest: 1 events have been sent", 60*time.Second) } func TestCAPinningBadSHA(t *testing.T) { From ee9e6fd35f4a4c1afe8efc418a62c0d540a38e4f Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 6 Jun 2024 10:18:54 -0400 Subject: [PATCH 14/14] adjust flaky test --- filebeat/input/filestream/internal/task/group_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/filebeat/input/filestream/internal/task/group_test.go b/filebeat/input/filestream/internal/task/group_test.go index 5ce15d455e3e..30b9858a1de3 100644 --- a/filebeat/input/filestream/internal/task/group_test.go +++ b/filebeat/input/filestream/internal/task/group_test.go @@ -195,8 +195,8 @@ func TestGroup_Go(t *testing.T) { }) t.Run("without limit, all goroutines run", func(t *testing.T) { - // 100 <= limit <= 100000 - limit := rand.Int63n(100000-100) + 100 + // 100 <= limit <= 10000 + limit := rand.Int63n(10000-100) + 100 t.Logf("running %d goroutines", limit) g := NewGroup(uint64(limit), time.Second, noopLogger{}, "")