Skip to content

Commit df36a6d

Browse files
[8.19](backport #47693) [beatreceiver] Do not require specifiying otelconsumer output (#47716)
* [beatreceiver] Do not require specifiying otelconsumer output (#47693) * do not require explicit config (cherry picked from commit b80d1e9) # Conflicts: # x-pack/filebeat/fbreceiver/receiver_test.go # x-pack/filebeat/tests/integration/otel_lsexporter_test.go # x-pack/libbeat/cmd/instance/beat.go * fix conflicts * fix conflict * remove new file * make update --------- Co-authored-by: Khushi Jain <[email protected]>
1 parent 932bd62 commit df36a6d

File tree

11 files changed

+47
-111
lines changed

11 files changed

+47
-111
lines changed

x-pack/filebeat/fbreceiver/receiver_leak_test.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,6 @@ func TestLeak(t *testing.T) {
5050
},
5151
},
5252
},
53-
"output": map[string]any{
54-
"otelconsumer": map[string]any{},
55-
},
5653
"logging": map[string]any{
5754
"level": "debug",
5855
"selectors": []string{

x-pack/filebeat/fbreceiver/receiver_test.go

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,6 @@ func TestNewReceiver(t *testing.T) {
6060
},
6161
},
6262
},
63-
"output": map[string]any{
64-
"otelconsumer": map[string]any{},
65-
},
6663
"logging": map[string]any{
6764
"level": "debug",
6865
"selectors": []string{
@@ -135,9 +132,6 @@ func benchmarkFactoryWithLogLevel(b *testing.B, level zapcore.Level) {
135132
},
136133
},
137134
},
138-
"output": map[string]any{
139-
"otelconsumer": map[string]any{},
140-
},
141135
"logging": map[string]any{
142136
"level": level.String(),
143137
"selectors": []string{
@@ -199,9 +193,6 @@ func TestMultipleReceivers(t *testing.T) {
199193
},
200194
},
201195
},
202-
"output": map[string]any{
203-
"otelconsumer": map[string]any{},
204-
},
205196
"logging": map[string]any{
206197
"level": "info",
207198
"selectors": []string{
@@ -314,9 +305,6 @@ func TestReceiverDegraded(t *testing.T) {
314305
},
315306
},
316307
},
317-
"output": map[string]any{
318-
"otelconsumer": map[string]any{},
319-
},
320308
"logging": map[string]any{
321309
"level": "debug",
322310
"selectors": []string{
@@ -442,9 +430,6 @@ func TestReceiverHook(t *testing.T) {
442430
},
443431
},
444432
},
445-
"output": map[string]any{
446-
"otelconsumer": map[string]any{},
447-
},
448433
"management.otel.enabled": true,
449434
"path.home": t.TempDir(),
450435
},

x-pack/filebeat/tests/integration/otel_test.go

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,6 @@ func TestFilebeatOTelE2E(t *testing.T) {
6565
- %s
6666
prospector.scanner.fingerprint.enabled: false
6767
file_identity.native: ~
68-
output:
69-
otelconsumer:
7068
processors:
7169
- add_host_metadata: ~
7270
- add_cloud_metadata: ~
@@ -250,8 +248,6 @@ processors:
250248
- type: httpjson
251249
id: httpjson-e2e-otel
252250
request.url: http://localhost:8090/test
253-
output:
254-
otelconsumer:
255251
processors:
256252
- add_host_metadata: ~
257253
- add_cloud_metadata: ~
@@ -450,8 +446,6 @@ func TestFilebeatOTelReceiverE2E(t *testing.T) {
450446
- {{.InputFile}}
451447
prospector.scanner.fingerprint.enabled: false
452448
file_identity.native: ~
453-
output:
454-
otelconsumer:
455449
logging:
456450
level: info
457451
selectors:
@@ -641,8 +635,6 @@ func TestFilebeatOTelMultipleReceiversE2E(t *testing.T) {
641635
- {{$receiver.InputFile}}
642636
prospector.scanner.fingerprint.enabled: false
643637
file_identity.native: ~
644-
output:
645-
otelconsumer:
646638
logging:
647639
level: info
648640
selectors:
@@ -880,8 +872,6 @@ func TestFilebeatOTelDocumentLevelRetries(t *testing.T) {
880872
- {{.InputFile}}
881873
prospector.scanner.fingerprint.enabled: false
882874
file_identity.native: ~
883-
output:
884-
otelconsumer:
885875
logging:
886876
level: debug
887877
queue.mem.flush.timeout: 0s
@@ -1063,8 +1053,6 @@ func TestFileBeatKerberos(t *testing.T) {
10631053
- {{.InputFile}}
10641054
prospector.scanner.fingerprint.enabled: false
10651055
file_identity.native: ~
1066-
output:
1067-
otelconsumer:
10681056
queue.mem.flush.timeout: 0s
10691057
management.otel.enabled: true
10701058
path.home: {{.PathHome}}

x-pack/libbeat/cmd/instance/beat.go

Lines changed: 46 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,24 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
6565
ucfg.VarExp,
6666
}
6767

68+
err = setLogger(b, receiverConfig, core)
69+
if err != nil {
70+
return nil, fmt.Errorf("error configuring beats logger: %w", err)
71+
}
72+
73+
// extracting it here for ease of use
74+
logger := b.Info.Logger
75+
76+
// if output is set and if output is not otelconsumer, inform users
77+
if receiverConfig["output"] != nil && receiverConfig["output"].(map[string]any)["otelconsumer"] == nil { //nolint: errcheck // output will always be of map type
78+
logger.Debugf("configured output does not work with beatreceiver, please use appropriate exporter instead")
79+
}
80+
81+
// all beatreceivers will use otelconsumer output by default
82+
receiverConfig["output"] = map[string]any{
83+
"otelconsumer": map[string]any{},
84+
}
85+
6886
tmp, err := ucfg.NewFrom(receiverConfig, cfOpts...)
6987
if err != nil {
7088
return nil, fmt.Errorf("error converting receiver config to ucfg: %w", err)
@@ -113,37 +131,12 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
113131
return nil, fmt.Errorf("error unpacking config data: %w", err)
114132
}
115133

116-
logpConfig := logp.Config{}
117-
logpConfig.AddCaller = true
118-
logpConfig.Beat = b.Info.Beat
119-
logpConfig.Files.MaxSize = 1
120-
121-
if b.Config.Logging == nil {
122-
b.Config.Logging = config.NewConfig()
123-
}
124-
125-
if err := b.Config.Logging.Unpack(&logpConfig); err != nil {
126-
return nil, fmt.Errorf("error unpacking beats logging config: %w\n%v", err, b.Config.Logging)
127-
}
128-
129-
b.Info.Logger, err = logp.ConfigureWithCoreLocal(logpConfig, core)
130-
if err != nil {
131-
return nil, fmt.Errorf("error configuring beats logp: %w", err)
132-
}
133-
134-
// extracting it here for ease of use
135-
logger := b.Info.Logger
136-
137134
instrumentation, err := instrumentation.New(cfg, b.Info.Beat, b.Info.Version, logger)
138135
if err != nil {
139136
return nil, fmt.Errorf("error setting up instrumentation: %w", err)
140137
}
141138
b.Instrumentation = instrumentation
142139

143-
if err := instance.PromoteOutputQueueSettings(b); err != nil {
144-
return nil, fmt.Errorf("could not promote output queue settings: %w", err)
145-
}
146-
147140
if err := features.UpdateFromConfig(b.RawConfig); err != nil {
148141
return nil, fmt.Errorf("could not parse features: %w", err)
149142
}
@@ -246,17 +239,6 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
246239
}
247240
b.SetProcessors(processors)
248241

249-
// This should be replaced with static config for otel consumer
250-
// but need to figure out if we want the Queue settings from here.
251-
outputEnabled := b.Config.Output.IsSet() && b.Config.Output.Config().Enabled()
252-
if !outputEnabled {
253-
if b.Manager.Enabled() {
254-
logger.Info("Output is configured through Central Management")
255-
} else {
256-
return nil, fmt.Errorf("no outputs are defined, please define one under the output section")
257-
}
258-
}
259-
260242
reg := b.Monitoring.StatsRegistry().GetOrCreateRegistry("libbeat")
261243

262244
monitors := pipeline.Monitors{
@@ -283,3 +265,31 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
283265

284266
return b, nil
285267
}
268+
269+
// setLogger configures a logp logger and sets it on b.Info.Logger
270+
func setLogger(b *instance.Beat, receiverConfig map[string]any, core zapcore.Core) error {
271+
272+
var err error
273+
logpConfig := logp.Config{}
274+
logpConfig.AddCaller = true
275+
logpConfig.Beat = b.Info.Beat
276+
logpConfig.Files.MaxSize = 1
277+
278+
var logCfg *config.C
279+
if _, ok := receiverConfig["logging"]; !ok {
280+
logCfg = config.NewConfig()
281+
} else {
282+
logCfg = config.MustNewConfigFrom(receiverConfig["logging"])
283+
}
284+
285+
if err := logCfg.Unpack(&logpConfig); err != nil {
286+
return fmt.Errorf("error unpacking beats logging config: %w\n%v", err, b.Config.Logging)
287+
}
288+
289+
b.Info.Logger, err = logp.ConfigureWithCoreLocal(logpConfig, core)
290+
if err != nil {
291+
return fmt.Errorf("error configuring beats logp: %w", err)
292+
}
293+
294+
return nil
295+
}

x-pack/libbeat/cmd/instance/beat_test.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,6 @@ func TestManager(t *testing.T) {
3030
},
3131
},
3232
},
33-
"output": map[string]any{
34-
"otelconsumer": map[string]any{},
35-
},
3633
"path.home": tmpDir,
3734
}
3835
t.Run("otel management disabled - key missing", func(t *testing.T) {

x-pack/libbeat/common/otelbeat/oteltestcol/collector_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@ func TestNew(t *testing.T) {
2121
message: "test message"
2222
count: 1
2323
processors: ~
24-
output:
25-
otelconsumer:
2624
logging:
2725
level: debug
2826
queue.mem.flush.timeout: 0s

x-pack/metricbeat/mbreceiver/receiver_leak_test.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,6 @@ func TestLeak(t *testing.T) {
4545
},
4646
},
4747
},
48-
"output": map[string]any{
49-
"otelconsumer": map[string]any{},
50-
},
5148
"logging": map[string]any{
5249
"level": "debug",
5350
"selectors": []string{

x-pack/metricbeat/mbreceiver/receiver_test.go

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,6 @@ func TestNewReceiver(t *testing.T) {
5656
},
5757
},
5858
},
59-
"output": map[string]any{
60-
"otelconsumer": map[string]any{},
61-
},
6259
"logging": map[string]any{
6360
"level": "debug",
6461
"selectors": []string{
@@ -139,9 +136,6 @@ func TestMultipleReceivers(t *testing.T) {
139136
},
140137
},
141138
},
142-
"output": map[string]any{
143-
"otelconsumer": map[string]any{},
144-
},
145139
"logging": map[string]any{
146140
"level": "debug",
147141
"selectors": []string{
@@ -167,9 +161,6 @@ func TestMultipleReceivers(t *testing.T) {
167161
},
168162
},
169163
},
170-
"output": map[string]any{
171-
"otelconsumer": map[string]any{},
172-
},
173164
"logging": map[string]any{
174165
"level": "debug",
175166
"selectors": []string{
@@ -336,9 +327,6 @@ func BenchmarkFactory(b *testing.B) {
336327
},
337328
},
338329
},
339-
"output": map[string]any{
340-
"otelconsumer": map[string]any{},
341-
},
342330
"logging": map[string]any{
343331
"level": "info",
344332
"selectors": []string{
@@ -407,9 +395,6 @@ func TestReceiverDegraded(t *testing.T) {
407395
},
408396
},
409397
},
410-
"output": map[string]any{
411-
"otelconsumer": map[string]any{},
412-
},
413398
"path.home": t.TempDir(),
414399
},
415400
}
@@ -444,10 +429,7 @@ func TestReceiverHook(t *testing.T) {
444429
},
445430
},
446431
"management.otel.enabled": true,
447-
"output": map[string]any{
448-
"otelconsumer": map[string]any{},
449-
},
450-
"path.home": t.TempDir(),
432+
"path.home": t.TempDir(),
451433
},
452434
}
453435
receiverSettings := receiver.Settings{

x-pack/metricbeat/tests/integration/otel_test.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,6 @@ func TestMetricbeatOTelE2E(t *testing.T) {
6969
- '.*'
7070
metricsets:
7171
- cpu
72-
output:
73-
otelconsumer:
7472
processors:
7573
- add_host_metadata: ~
7674
- add_cloud_metadata: ~
@@ -266,8 +264,6 @@ func TestMetricbeatOTelReceiverE2E(t *testing.T) {
266264
- '.*'
267265
metricsets:
268266
- cpu
269-
output:
270-
otelconsumer:
271267
processors:
272268
- add_host_metadata: ~
273269
- add_cloud_metadata: ~
@@ -463,8 +459,6 @@ func TestMetricbeatOTelMultipleReceiversE2E(t *testing.T) {
463459
target: ''
464460
fields:
465461
receiverid: "{{$i}}"
466-
output:
467-
otelconsumer:
468462
logging:
469463
level: info
470464
selectors:

x-pack/otel/processor/beatprocessor/README.md

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,6 @@ receivers:
4141
id: host-logs
4242
paths:
4343
- /var/log/*.log
44-
output:
45-
otelconsumer:
4644
```
4745
4846
The above Filebeat receiver configuration does not explicitly specify the `processors` option.
@@ -58,8 +56,6 @@ receivers:
5856
paths:
5957
- /var/log/*.log
6058
processors: []
61-
output:
62-
otelconsumer:
6359
```
6460

6561
The above Filebeat receiver configuration specifies an empty list of processors.
@@ -82,8 +78,6 @@ receivers:
8278
- add_host_metadata:
8379
netinfo:
8480
enabled: false
85-
output:
86-
otelconsumer:
8781
```
8882

8983
is functionally equivalent to this one, using the Beat processor:
@@ -98,8 +92,6 @@ receivers:
9892
paths:
9993
- /var/log/*.log
10094
processors: []
101-
output:
102-
otelconsumer:
10395
10496
processors:
10597
beat:

0 commit comments

Comments
 (0)