diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index d7e07846e0c4..4c27494fa68b 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -274,7 +274,7 @@ func (c *outputController) createQueueIfNeeded(outGrp outputs.Group) { // Queue metrics are reported under the pipeline namespace var pipelineMetrics *monitoring.Registry if c.monitors.Metrics != nil { - pipelineMetrics := c.monitors.Metrics.GetRegistry("pipeline") + pipelineMetrics = c.monitors.Metrics.GetRegistry("pipeline") if pipelineMetrics == nil { pipelineMetrics = c.monitors.Metrics.NewRegistry("pipeline") } diff --git a/libbeat/publisher/pipeline/controller_test.go b/libbeat/publisher/pipeline/controller_test.go index 2e4f0df990f6..706c159e3d4a 100644 --- a/libbeat/publisher/pipeline/controller_test.go +++ b/libbeat/publisher/pipeline/controller_test.go @@ -33,6 +33,7 @@ import ( "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/monitoring" //"github.com/elastic/beats/v7/libbeat/tests/resources" @@ -243,3 +244,26 @@ func TestQueueProducerBlocksUntilOutputIsSet(t *testing.T) { }) assert.True(t, allFinished, "All queueProducer requests should be unblocked once an output is set") } + +func TestQueueMetrics(t *testing.T) { + // More thorough testing of queue metrics are in the queue package, + // here we just want to make sure that they appear under the right + // monitoring namespace. + reg := monitoring.NewRegistry() + controller := outputController{ + queueFactory: memqueue.FactoryForSettings(memqueue.Settings{Events: 1000}), + consumer: &eventConsumer{ + targetChan: make(chan consumerTarget, 4), + retryObserver: nilObserver, + }, + monitors: Monitors{Metrics: reg}, + } + controller.Set(outputs.Group{ + Clients: []outputs.Client{newMockClient(nil)}, + }) + entry := reg.Get("pipeline.queue.max_events") + require.NotNil(t, entry, "pipeline.queue.max_events must exist") + value, ok := entry.(*monitoring.Uint) + require.True(t, ok, "pipeline.queue.max_events must be a *monitoring.Uint") + assert.Equal(t, uint64(1000), value.Get(), "pipeline.queue.max_events should match the events configuration key") +}