Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func TestDiagnosticLocalConfig(t *testing.T) {
ServerCA: "/path/to/server/ca",
},
},
MetricsPeriod: monitoringCfg.DefaultMetricsCollectionInterval,
},
},
}
Expand All @@ -105,11 +106,12 @@ agent:
http: null
logs: false
metrics: false
metrics_period: ""
metrics_period: "1m0s"
namespace: ""
pprof: null
failure_threshold: null
traces: true
use_output: ""
apm:
hosts:
- host1
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
agent:
monitoring:
http:
enabled: false
metrics: true
inputs:
- _runtime_experimental: otel
id: filestream-monitoring-agent
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
agent:
monitoring:
http:
enabled: false
metrics: true
inputs:
- _runtime_experimental: process
id: filestream-monitoring-agent
Expand Down
121 changes: 18 additions & 103 deletions internal/pkg/agent/application/monitoring/component/v1_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
_ "embed"
"fmt"
"maps"
"math"
"net"
"net/url"
"os"
Expand All @@ -18,7 +17,6 @@ import (
"slices"
"strconv"
"strings"
"time"
"unicode"

koanfmaps "github.com/knadh/koanf/maps"
Expand Down Expand Up @@ -59,15 +57,11 @@ const (
fileSchemePrefix = "file"
unixSchemePrefix = "unix"

defaultOutputName = "default"
outputsKey = "outputs"
inputsKey = "inputs"
idKey = "id"
agentKey = "agent"
monitoringKey = "monitoring"
serviceKey = "service"
useOutputKey = "use_output"
monitoringMetricsPeriodKey = "metrics_period"
failureThresholdKey = "failure_threshold"
monitoringOutput = "monitoring"
defaultMonitoringNamespace = "default"
Expand All @@ -81,14 +75,6 @@ const (
prometheusMonitoringComponentId = "prometheus/" + monitoringMetricsUnitID

windowsOS = "windows"

// metricset execution period used for the monitoring metrics inputs
// we set this to 60s to reduce the load/data volume on the monitoring cluster
defaultMetricsCollectionInterval = 60 * time.Second

// metricset stream failure threshold before the stream is marked as DEGRADED
// to avoid marking the agent degraded for transient errors, we set the default threshold to 5
defaultMetricsStreamFailureThreshold = uint(5)
)

var (
Expand Down Expand Up @@ -182,67 +168,12 @@ func (b *BeatsMonitor) MonitoringConfig(

cfg := make(map[string]interface{})

monitoringOutputName := defaultOutputName
metricsCollectionIntervalString := b.config.C.MetricsPeriod
failureThreshold := b.config.C.FailureThreshold
if agentCfg, found := policy[agentKey]; found {
// The agent section is required for feature flags
cfg[agentKey] = agentCfg

agentCfgMap, ok := agentCfg.(map[string]interface{})
if ok {
if monitoringCfg, found := agentCfgMap[monitoringKey]; found {
monitoringMap, ok := monitoringCfg.(map[string]interface{})
if ok {
if use, found := monitoringMap[useOutputKey]; found {
if useStr, ok := use.(string); ok {
monitoringOutputName = useStr
}
}

if metricsPeriod, found := monitoringMap[monitoringMetricsPeriodKey]; found {
if metricsPeriodStr, ok := metricsPeriod.(string); ok {
metricsCollectionIntervalString = metricsPeriodStr
}
}

if policyFailureThresholdRaw, found := monitoringMap[failureThresholdKey]; found {
switch policyValue := policyFailureThresholdRaw.(type) {
case uint:
failureThreshold = &policyValue
case int:
if policyValue < 0 {
return nil, fmt.Errorf("converting policy failure threshold int to uint, value must be non-negative: %v", policyValue)
}
unsignedValue := uint(policyValue)
failureThreshold = &unsignedValue
case float64:
if policyValue < 0 || policyValue > math.MaxUint {
return nil, fmt.Errorf("converting policy failure threshold float64 to uint, value out of range: %v", policyValue)
}
truncatedUnsignedValue := uint(policyValue)
failureThreshold = &truncatedUnsignedValue
case string:
parsedPolicyValue, err := strconv.ParseUint(policyValue, 10, 64)
if err != nil {
return nil, fmt.Errorf("converting policy failure threshold string to uint: %w", err)
}
if parsedPolicyValue > math.MaxUint {
// this is to catch possible overflow in 32-bit envs, should not happen that often
return nil, fmt.Errorf("converting policy failure threshold from string to uint, value out of range: %v", policyValue)
}
uintPolicyValue := uint(parsedPolicyValue)
failureThreshold = &uintPolicyValue
default:
return nil, fmt.Errorf("unsupported type for policy failure threshold: %T", policyFailureThresholdRaw)
}
}
}
}
}
}

outputCfg, err := b.injectMonitoringOutput(policy, cfg, monitoringOutputName)
outputCfg, err := b.injectMonitoringOutput(policy, cfg)
if err != nil && !errors.Is(err, errNoOutputPresent) {
return nil, errors.New(err, "failed to inject monitoring output")
} else if errors.Is(err, errNoOutputPresent) {
Expand All @@ -269,8 +200,7 @@ func (b *BeatsMonitor) MonitoringConfig(
}

if b.config.C.MonitorMetrics {
if err := b.injectMetricsInput(
cfg, componentInfos, metricsCollectionIntervalString, failureThreshold, monitoringRuntime); err != nil {
if err := b.injectMetricsInput(cfg, componentInfos, monitoringRuntime); err != nil {
return nil, errors.New(err, "failed to inject monitoring output")
}
}
Expand Down Expand Up @@ -418,7 +348,8 @@ func (b *BeatsMonitor) initInputs(cfg map[string]interface{}) {

// injectMonitoringOutput injects the monitoring output into the configuration. It takes the existing output whose
// name is configured in b.config.C.UseOutput and makes a copy of it named `monitoring`. It returns the output configuration.
func (b *BeatsMonitor) injectMonitoringOutput(source, dest map[string]interface{}, monitoringOutputName string) (map[string]any, error) {
func (b *BeatsMonitor) injectMonitoringOutput(source, dest map[string]interface{}) (map[string]any, error) {
monitoringOutputName := b.config.C.UseOutput
outputsNode, found := source[outputsKey]
if !found {
return nil, errNoOutputPresent
Expand Down Expand Up @@ -567,22 +498,12 @@ func (b *BeatsMonitor) getCollectorTelemetryEndpoint() string {
func (b *BeatsMonitor) injectMetricsInput(
cfg map[string]interface{},
componentInfos []componentInfo,
metricsCollectionIntervalString string,
failureThreshold *uint,
monitoringRuntime component.RuntimeManager,
) error {
if metricsCollectionIntervalString == "" {
metricsCollectionIntervalString = defaultMetricsCollectionInterval.String()
}

if failureThreshold == nil {
defaultValue := defaultMetricsStreamFailureThreshold
failureThreshold = &defaultValue
}
monitoringNamespace := b.monitoringNamespace()

beatsStreams := b.getBeatsStreams(componentInfos, failureThreshold, metricsCollectionIntervalString)
httpStreams := b.getHttpStreams(componentInfos, failureThreshold, metricsCollectionIntervalString)
beatsStreams := b.getBeatsStreams(componentInfos)
httpStreams := b.getHttpStreams(componentInfos)

inputs := []interface{}{
map[string]interface{}{
Expand Down Expand Up @@ -615,7 +536,7 @@ func (b *BeatsMonitor) injectMetricsInput(
if usingOtelRuntime(componentInfos) && slices.ContainsFunc(componentInfos, func(ci componentInfo) bool {
return ci.ID == prometheusMonitoringComponentId
}) {
prometheusStream := b.getPrometheusStream(failureThreshold, metricsCollectionIntervalString)
prometheusStream := b.getPrometheusStream()
inputs = append(inputs, map[string]interface{}{
idKey: fmt.Sprintf("%s-collector", monitoringMetricsUnitID),
"name": fmt.Sprintf("%s-collector", monitoringMetricsUnitID),
Expand All @@ -633,7 +554,7 @@ func (b *BeatsMonitor) injectMetricsInput(

// add system/process metrics for services that can't be monitored via json/beats metrics
inputs = append(inputs, b.getServiceComponentProcessMetricInputs(
componentInfos, metricsCollectionIntervalString)...)
componentInfos)...)

inputsNode, found := cfg[inputsKey]
if !found {
Expand Down Expand Up @@ -733,13 +654,13 @@ func (b *BeatsMonitor) getServiceComponentFilestreamStreams(componentInfos []com
// Note: The return type must be []any due to protobuf serialization quirks.
func (b *BeatsMonitor) getHttpStreams(
componentInfos []componentInfo,
failureThreshold *uint,
metricsCollectionIntervalString string,
) []any {
monitoringNamespace := b.monitoringNamespace()
sanitizedAgentName := sanitizeName(agentName)
indexName := fmt.Sprintf("metrics-elastic_agent.%s-%s", sanitizedAgentName, monitoringNamespace)
dataset := fmt.Sprintf("elastic_agent.%s", sanitizedAgentName)
metricsCollectionIntervalString := b.config.C.MetricsPeriod.String()
failureThreshold := b.config.C.FailureThreshold
httpStreams := make([]any, 0, len(componentInfos))

agentStream := map[string]any{
Expand Down Expand Up @@ -848,10 +769,7 @@ func (b *BeatsMonitor) getHttpStreams(

// getPrometheusStream returns the stream definition for prometheus/metrics input.
// Note: The return type must be any (a single stream, not a []any slice) due to protobuf serialization quirks.
func (b *BeatsMonitor) getPrometheusStream(
failureThreshold *uint,
metricsCollectionIntervalString string,
) any {
func (b *BeatsMonitor) getPrometheusStream() any {
monitoringNamespace := b.monitoringNamespace()

// Send these metrics through the metricbeat monitoring datastream, since
Expand All @@ -875,12 +793,12 @@ func (b *BeatsMonitor) getPrometheusStream(
"metrics_path": "/metrics",
"hosts": []interface{}{prometheusHost},
"namespace": monitoringNamespace,
"period": metricsCollectionIntervalString,
"period": b.config.C.MetricsPeriod.String(),
"index": indexName,
"processors": processorsForCollectorPrometheusStream(monitoringNamespace, dataset, b.agentInfo),
}
if failureThreshold != nil {
otelStream[failureThresholdKey] = *failureThreshold
if b.config.C.FailureThreshold != nil {
otelStream[failureThresholdKey] = *b.config.C.FailureThreshold
}
return otelStream
}
Expand All @@ -889,8 +807,6 @@ func (b *BeatsMonitor) getPrometheusStream(
// Note: The return type must be []any due to protobuf serialization quirks.
func (b *BeatsMonitor) getBeatsStreams(
componentInfos []componentInfo,
failureThreshold *uint,
metricsCollectionIntervalString string,
) []any {
monitoringNamespace := b.monitoringNamespace()
beatsStreams := make([]any, 0, len(componentInfos))
Expand All @@ -915,13 +831,13 @@ func (b *BeatsMonitor) getBeatsStreams(
},
"metricsets": []interface{}{"stats"},
"hosts": endpoints,
"period": metricsCollectionIntervalString,
"period": b.config.C.MetricsPeriod.String(),
"index": indexName,
"processors": processorsForBeatsStream(binaryName, compInfo.ID, monitoringNamespace, dataset, b.agentInfo, compInfo.RuntimeManager),
}

if failureThreshold != nil {
beatsStream[failureThresholdKey] = *failureThreshold
if b.config.C.FailureThreshold != nil {
beatsStream[failureThresholdKey] = *b.config.C.FailureThreshold
}

beatsStreams = append(beatsStreams, beatsStream)
Expand All @@ -935,7 +851,6 @@ func (b *BeatsMonitor) getBeatsStreams(
// Note: The return type must be []any due to protobuf serialization quirks.
func (b *BeatsMonitor) getServiceComponentProcessMetricInputs(
componentInfos []componentInfo,
metricsCollectionIntervalString string,
) []any {
monitoringNamespace := b.monitoringNamespace()
inputs := []any{}
Expand Down Expand Up @@ -963,7 +878,7 @@ func (b *BeatsMonitor) getServiceComponentProcessMetricInputs(
"namespace": monitoringNamespace,
},
"metricsets": []interface{}{"process"},
"period": metricsCollectionIntervalString,
"period": b.config.C.MetricsPeriod.String(),
"index": fmt.Sprintf("metrics-elastic_agent.%s-%s", name, monitoringNamespace),
"process.pid": compInfo.Pid,
"process.cgroups.enabled": false,
Expand Down
Loading