Merged
40 commits
3fcb875
remove drop on cancel option
faec May 29, 2024
47c07a6
producer.Cancel -> producer.Close
faec May 29, 2024
f9d4c39
remove OnDrop callbacks
faec May 29, 2024
d398b46
remove internal cancellation helpers
faec May 29, 2024
bad2498
remove the queue's shipper metrics hook
faec May 29, 2024
eca723b
remove unused fields and producer cancel tests
faec May 29, 2024
c79cc3e
Merge branch 'remove-producer-cancel' into queue-metrics
faec May 29, 2024
bffd70d
fix merge
faec May 29, 2024
7409be1
moving metric ownership around
faec May 29, 2024
75ac0f4
plumbing for queue metrics
faec May 29, 2024
ca949b8
flesh out queue observer internals
faec May 29, 2024
a279862
update queue filled percent
faec May 29, 2024
d0f1939
Merge branch 'main' into queue-metrics
faec May 30, 2024
517ffe1
clean up shipper metric hooks
faec May 30, 2024
4f6d02c
use the metrics observer from the memqueue
faec May 30, 2024
fd18f4e
configure gauges
faec May 30, 2024
2f6ba9b
report queue metrics from the disk queue
faec May 30, 2024
ce2c287
fix disk queue initialization
faec May 30, 2024
e70c13b
outputObserver -> retryObserver
faec May 30, 2024
e6dbb2d
move queue draining logic into the queue
faec May 30, 2024
afa3793
shadow acked var the simple way
faec May 30, 2024
f674a9f
Merge branch 'main' of github.com:elastic/beats into queue-metrics
faec May 31, 2024
076326a
fix active events, metric paths
faec May 31, 2024
729ff87
add memory queue observer tests
faec Jun 5, 2024
91bfeef
add disk queue tests
faec Jun 5, 2024
1d0b0a8
Merge branch 'main' of github.com:elastic/beats into queue-metrics
faec Jun 6, 2024
73aded1
add nil observer checks during queue creation
faec Jun 6, 2024
983e8af
make check
faec Jun 6, 2024
8c1d66a
adjust flaky test
faec Jun 6, 2024
1278fab
Merge branch 'main' of github.com:elastic/beats into queue-metrics
faec Jun 6, 2024
2981307
fix producer queue shutdown handling
faec Jun 6, 2024
0a114c4
Merge branch 'main' into queue-metrics
pierrehilbert Jun 7, 2024
0f06ff0
fix masked panic in some unit tests
faec Jun 7, 2024
e4a1ee6
fix ack handler initialization
faec Jun 7, 2024
d49def3
Merge branch 'queue-metrics' of github.com:faec/beats into queue-metrics
faec Jun 7, 2024
7621c1f
fix goroutine checker in test
faec Jun 7, 2024
2df11e4
add metrics nil check
faec Jun 7, 2024
714678c
fix tests
faec Jun 10, 2024
b3a728e
document queue metrics
faec Jun 11, 2024
3b5f068
hopefully decrease test flakiness
faec Jun 11, 2024
63 changes: 33 additions & 30 deletions libbeat/monitoring/report/log/log.go
@@ -37,36 +37,39 @@ import (
// TODO: Replace this with a proper solution that uses the metric type from
// where it is defined. See: https://github.com/elastic/beats/issues/5433
var gauges = map[string]bool{
"libbeat.output.events.active": true,
"libbeat.pipeline.events.active": true,
"libbeat.pipeline.clients": true,
"libbeat.pipeline.queue.max_events": true,
"libbeat.pipeline.queue.filled.pct.events": true,
"libbeat.config.module.running": true,
"registrar.states.current": true,
"filebeat.events.active": true,
"filebeat.harvester.running": true,
"filebeat.harvester.open_files": true,
"beat.memstats.memory_total": true,
"beat.memstats.memory_alloc": true,
"beat.memstats.rss": true,
"beat.memstats.gc_next": true,
"beat.info.uptime.ms": true,
"beat.cgroup.memory.mem.usage.bytes": true,
"beat.cpu.user.ticks": true,
"beat.cpu.system.ticks": true,
"beat.cpu.total.value": true,
"beat.cpu.total.ticks": true,
"beat.handles.open": true,
"beat.handles.limit.hard": true,
"beat.handles.limit.soft": true,
"beat.runtime.goroutines": true,
"system.load.1": true,
"system.load.5": true,
"system.load.15": true,
"system.load.norm.1": true,
"system.load.norm.5": true,
"system.load.norm.15": true,
"libbeat.output.events.active": true,
"libbeat.pipeline.events.active": true,
"libbeat.pipeline.clients": true,
"libbeat.pipeline.queue.max_events": true,
"libbeat.pipeline.queue.max_bytes": true,
"libbeat.pipeline.queue.filled.events": true,
"libbeat.pipeline.queue.filled.bytes": true,
"libbeat.pipeline.queue.filled.pct": true,
"libbeat.config.module.running": true,
"registrar.states.current": true,
"filebeat.events.active": true,
"filebeat.harvester.running": true,
"filebeat.harvester.open_files": true,
"beat.memstats.memory_total": true,
"beat.memstats.memory_alloc": true,
"beat.memstats.rss": true,
"beat.memstats.gc_next": true,
"beat.info.uptime.ms": true,
"beat.cgroup.memory.mem.usage.bytes": true,
"beat.cpu.user.ticks": true,
"beat.cpu.system.ticks": true,
"beat.cpu.total.value": true,
"beat.cpu.total.ticks": true,
"beat.handles.open": true,
"beat.handles.limit.hard": true,
"beat.handles.limit.soft": true,
"beat.runtime.goroutines": true,
"system.load.1": true,
"system.load.5": true,
"system.load.15": true,
"system.load.norm.1": true,
"system.load.norm.5": true,
"system.load.norm.15": true,
}

// isGauge returns true when the given metric key name represents a gauge value.
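The hunk above only shows the gauges table itself; the lookup helper described by the doc comment is outside the diff. A minimal sketch of what it plausibly looks like — the body below is an assumption, not the actual implementation:

// isGauge reports whether the given metric key name is registered as a
// gauge. Sketch inferred from the doc comment above; the real
// implementation may also normalize or prefix-match keys.
func isGauge(key string) bool {
	return gauges[key]
}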
16 changes: 5 additions & 11 deletions libbeat/publisher/pipeline/client.go
@@ -37,9 +37,8 @@ type client struct {
mutex sync.Mutex
waiter *clientCloseWaiter

eventFlags publisher.EventFlags
canDrop bool
eventWaitGroup *sync.WaitGroup
eventFlags publisher.EventFlags
canDrop bool

// Open state, signaling, and sync primitives for coordinating client Close.
isOpen atomic.Bool // set to false during shutdown, such that no new events will be accepted anymore.
@@ -132,10 +131,8 @@ func (c *client) publish(e beat.Event) {
}

func (c *client) Close() error {
// first stop ack handling. ACK handler might block on wait (with timeout), waiting
// for pending events to be ACKed.
c.closeOnce.Do(func() {
c.isOpen.Store(false)
if c.isOpen.Swap(false) {
// Only do shutdown handling the first time Close is called
c.onClosing()

c.logger.Debug("client: closing acker")
@@ -158,7 +155,7 @@
}
c.logger.Debug("client: done closing processors")
}
})
}
return nil
}

@@ -180,9 +177,6 @@ func (c *client) onNewEvent() {
}

func (c *client) onPublished() {
if c.eventWaitGroup != nil {
c.eventWaitGroup.Add(1)
}
c.observer.publishedEvent()
if c.clientListener != nil {
c.clientListener.Published()
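The Close change above replaces a sync.Once with an atomic flag: Swap(false) returns the previous value, so exactly one caller observes true and runs the shutdown path, while the flag also marks the client closed for publishers. A self-contained sketch of the pattern, with illustrative names rather than the actual client type:

package main

import (
	"fmt"
	"sync/atomic"
)

type closer struct {
	isOpen atomic.Bool
}

func newCloser() *closer {
	c := &closer{}
	c.isOpen.Store(true)
	return c
}

// Close is idempotent: atomic.Bool.Swap returns the prior value, so only
// the caller that flips true -> false runs the shutdown handling;
// concurrent and repeated calls return immediately.
func (c *closer) Close() error {
	if c.isOpen.Swap(false) {
		fmt.Println("shutdown handling runs exactly once")
	}
	return nil
}

func main() {
	c := newCloser()
	c.Close()
	c.Close() // no-op on the second call
}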
75 changes: 21 additions & 54 deletions libbeat/publisher/pipeline/client_test.go
@@ -60,59 +60,27 @@ func TestClient(t *testing.T) {
// Note: no asserts. If closing fails we have a deadlock, because Publish
// would block forever

cases := map[string]struct {
context bool
close func(client beat.Client, cancel func())
}{
"close unblocks client without context": {
context: false,
close: func(client beat.Client, _ func()) {
client.Close()
},
},
"close unblocks client with context": {
context: true,
close: func(client beat.Client, _ func()) {
client.Close()
},
},
"context cancel unblocks client": {
context: true,
close: func(client beat.Client, cancel func()) {
cancel()
},
},
}

logp.TestingSetup()
routinesChecker := resources.NewGoroutinesChecker()
defer routinesChecker.Check(t)

for name, test := range cases {
t.Run(name, func(t *testing.T) {
routinesChecker := resources.NewGoroutinesChecker()
defer routinesChecker.Check(t)
pipeline := makePipeline(t, Settings{}, makeTestQueue())
defer pipeline.Close()

pipeline := makePipeline(t, Settings{}, makeTestQueue())
defer pipeline.Close()

client, err := pipeline.ConnectWith(beat.ClientConfig{})
if err != nil {
t.Fatal(err)
}
defer client.Close()
client, err := pipeline.ConnectWith(beat.ClientConfig{})
if err != nil {
t.Fatal(err)
}

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
client.Publish(beat.Event{})
}()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
client.Publish(beat.Event{})
}()

test.close(client, func() {
client.Close()
})
wg.Wait()
})
}
client.Close()
wg.Wait()
})

t.Run("no infinite loop when processing fails", func(t *testing.T) {
@@ -216,9 +184,6 @@ }
}

func TestClientWaitClose(t *testing.T) {
routinesChecker := resources.NewGoroutinesChecker()
defer routinesChecker.Check(t)

makePipeline := func(settings Settings, qu queue.Queue) *Pipeline {
p, err := New(beat.Info{},
Monitors{},
@@ -241,6 +206,9 @@ func TestClientWaitClose(t *testing.T) {
defer pipeline.Close()

t.Run("WaitClose blocks", func(t *testing.T) {
routinesChecker := resources.NewGoroutinesChecker()
defer routinesChecker.Check(t)

client, err := pipeline.ConnectWith(beat.ClientConfig{
WaitClose: 500 * time.Millisecond,
})
@@ -272,6 +240,8 @@ })
})

t.Run("ACKing events unblocks WaitClose", func(t *testing.T) {
routinesChecker := resources.NewGoroutinesChecker()
defer routinesChecker.Check(t)
client, err := pipeline.ConnectWith(beat.ClientConfig{
WaitClose: time.Minute,
})
@@ -344,9 +314,6 @@ func TestMonitoring(t *testing.T) {
require.NoError(t, err)
defer pipeline.Close()

metricsSnapshot := monitoring.CollectFlatSnapshot(metrics, monitoring.Full, true)
assert.Equal(t, int64(maxEvents), metricsSnapshot.Ints["pipeline.queue.max_events"])

telemetrySnapshot := monitoring.CollectFlatSnapshot(telemetry, monitoring.Full, true)
assert.Equal(t, "output_name", telemetrySnapshot.Strings["output.name"])
assert.Equal(t, int64(batchSize), telemetrySnapshot.Ints["output.batch_size"])
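A pattern worth noting in the test changes above: the goroutine-leak checker moves from the enclosing test into each subtest, so a leak is attributed to the specific case that created it instead of failing the whole test. A minimal sketch of that scoping, using the same resources helper the diff uses (the test body is illustrative):

package pipeline

import (
	"testing"

	"github.com/elastic/beats/v7/libbeat/tests/resources"
)

func TestGoroutineScopingExample(t *testing.T) {
	t.Run("subtest owns its leak check", func(t *testing.T) {
		// Creating the checker inside the subtest snapshots the goroutine
		// count here; Check fails this subtest if goroutines are still
		// running when it returns.
		routinesChecker := resources.NewGoroutinesChecker()
		defer routinesChecker.Check(t)

		// ... start and fully stop any goroutines under test ...
	})
}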
16 changes: 8 additions & 8 deletions libbeat/publisher/pipeline/consumer.go
@@ -31,8 +31,8 @@ import (
type eventConsumer struct {
logger *logp.Logger

// eventConsumer calls the observer methods eventsRetry and eventsDropped.
observer outputObserver
// eventConsumer calls the retryObserver methods eventsRetry and eventsDropped.
retryObserver retryObserver

// When the output changes, the new target is sent to the worker routine
// on this channel. Clients should call eventConsumer.setTarget().
@@ -73,12 +73,12 @@ type retryRequest struct {

func newEventConsumer(
log *logp.Logger,
observer outputObserver,
observer retryObserver,
) *eventConsumer {
c := &eventConsumer{
logger: log,
observer: observer,
queueReader: makeQueueReader(),
logger: log,
retryObserver: observer,
queueReader: makeQueueReader(),

targetChan: make(chan consumerTarget),
retryChan: make(chan retryRequest),
@@ -163,7 +163,7 @@ outerLoop:
// Successfully sent a batch to the output workers
if len(retryBatches) > 0 {
// This was a retry, report it to the observer
c.observer.eventsRetry(len(active.Events()))
c.retryObserver.eventsRetry(len(active.Events()))
retryBatches = retryBatches[1:]
} else {
// This was directly from the queue, clear the value so we can
@@ -183,7 +183,7 @@
alive := req.batch.reduceTTL()

countDropped := countFailed - len(req.batch.Events())
c.observer.eventsDropped(countDropped)
c.retryObserver.eventsDropped(countDropped)

if !alive {
log.Info("Drop batch")
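From the call sites above, the renamed retryObserver needs only the two retry-path callbacks. A sketch of what the narrowed interface plausibly declares — the actual definition lives elsewhere in the pipeline package, so this is inferred, not copied:

// retryObserver is the subset of pipeline observer callbacks used by
// eventConsumer; inferred from the eventsRetry/eventsDropped calls in
// this diff.
type retryObserver interface {
	// eventsRetry reports how many events were sent back for retry.
	eventsRetry(int)
	// eventsDropped reports how many events were dropped after their
	// retry TTL expired.
	eventsDropped(int)
}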