diff --git a/changelog/fragments/1763578431-remove-otel.component.id-and-otel.component.kind-from-beat-receiver-events.yaml b/changelog/fragments/1763578431-remove-otel.component.id-and-otel.component.kind-from-beat-receiver-events.yaml new file mode 100644 index 000000000000..bd9e7017aa07 --- /dev/null +++ b/changelog/fragments/1763578431-remove-otel.component.id-and-otel.component.kind-from-beat-receiver-events.yaml @@ -0,0 +1,45 @@ +# REQUIRED +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: breaking-change + +# REQUIRED for all kinds +# Change summary; a 80ish characters long description of the change. +summary: remove otel.component.id and otel.component.kind from beat receiver events + +# REQUIRED for breaking-change, deprecation, known-issue +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# description: + +# REQUIRED for breaking-change, deprecation, known-issue +# impact: + +# REQUIRED for breaking-change, deprecation, known-issue +# action: + +# REQUIRED for all kinds +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: all + +# AUTOMATED +# OPTIONAL to manually add other PR URLs +# PR URL: A link the PR that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: https://github.com/elastic/beats/pull/47729 + +# AUTOMATED +# OPTIONAL to manually add other issue URLs +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: https://github.com/elastic/beats/issues/47600 diff --git a/x-pack/filebeat/fbreceiver/receiver_test.go b/x-pack/filebeat/fbreceiver/receiver_test.go index 22bd248c414c..253550518c01 100644 --- a/x-pack/filebeat/fbreceiver/receiver_test.go +++ b/x-pack/filebeat/fbreceiver/receiver_test.go @@ -93,8 +93,7 @@ func TestNewReceiver(t *testing.T) { AssertFunc: func(c *assert.CollectT, logs map[string][]mapstr.M, zapLogs *observer.ObservedLogs) { _ = zapLogs require.Lenf(c, logs["r1"], 1, "expected 1 log, got %d", len(logs["r1"])) - assert.Equal(c, "filebeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record") - assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record") + assert.Equal(c, "test", logs["r1"][0].Flatten()["message"], "expected message field to contain string 'test'") var lastError strings.Builder assert.Conditionf(c, func() bool { return getFromSocket(t, &lastError, monitorSocket, "stats") @@ -242,10 +241,8 @@ func TestMultipleReceivers(t *testing.T) { require.Greater(c, len(logs["r1"]), 0, "receiver r1 does not have any logs") require.Greater(c, len(logs["r2"]), 0, "receiver r2 does not have any logs") - assert.Equal(c, "filebeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r1 log record") - assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in r1 log record") - assert.Equal(c, "filebeatreceiver/r2", logs["r2"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r2 log record") - assert.Equal(c, "receiver", logs["r2"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in r2 log record") + assert.Equal(c, "test", logs["r1"][0].Flatten()["message"], "expected r1 message field to be 'test'") + assert.Equal(c, "test", logs["r2"][0].Flatten()["message"], "expected r2 message field to be 'test'") // Make sure that each receiver has a separate logger // instance and does not interfere with others. Previously, the diff --git a/x-pack/filebeat/tests/integration/otel_test.go b/x-pack/filebeat/tests/integration/otel_test.go index 1abf2527128a..c9298985e29a 100644 --- a/x-pack/filebeat/tests/integration/otel_test.go +++ b/x-pack/filebeat/tests/integration/otel_test.go @@ -52,7 +52,7 @@ func TestFilebeatOTelE2E(t *testing.T) { otelMonitoringPort := int(libbeattesting.MustAvailableTCP4Port(t)) filebeatMonitoringPort := int(libbeattesting.MustAvailableTCP4Port(t)) - var beatsCfgFile = ` + beatsCfgFile := ` filebeat.inputs: - type: filestream id: filestream-input-id @@ -199,19 +199,14 @@ setup.template.pattern: logs-filebeat-default "agent.id", "log.file.inode", "log.file.path", - // only present in beats receivers - "agent.otelcol.component.id", - "agent.otelcol.component.kind", "log.file.device_id", // changes value between filebeat and otel receiver "container.id", // only present in filebeat } assertMapsEqual(t, filebeatDoc, otelDoc, ignoredFields, "expected documents to be equal") - assert.Equal(t, "filebeatreceiver", otelDoc.Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record") - assert.Equal(t, "receiver", otelDoc.Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record") - assert.NotContains(t, filebeatDoc.Flatten(), "agent.otelcol.component.id", "expected agent.otelcol.component.id field not to be present in filebeat log record") - assert.NotContains(t, filebeatDoc.Flatten(), "agent.otelcol.component.kind", "expected agent.otelcol.component.kind field not to be present in filebeat log record") + assert.Equal(t, "filebeat", otelDoc.Flatten()["agent.type"], "expected agent.type field to be 'filebeat' in otel docs") + assert.Equal(t, "filebeat", filebeatDoc.Flatten()["agent.type"], "expected agent.type field to be 'filebeat' in filebeat docs") assertMonitoring(t, otelMonitoringPort) } @@ -270,6 +265,115 @@ func assertMonitoring(t *testing.T, port int) { require.Equal(t, http.StatusNotFound, r.StatusCode, "incorrect status code") } +func TestFilebeatOTelInspect(t *testing.T) { + filebeatOTel := integration.NewBeat( + t, + "filebeat-otel", + "../../filebeat.test", + "otel", + ) + + beatsCfgFile := ` +filebeat.inputs: + - type: filestream + id: filestream-input-id + enabled: true + file_identity.native: ~ + prospector.scanner.fingerprint.enabled: false + paths: + - /tmp/log.log +output: + elasticsearch: + hosts: + - localhost:9200 + username: admin + password: testing + index: index +queue.mem.flush.timeout: 0s +setup.template.enabled: false +processors: + - add_host_metadata: ~ + - add_cloud_metadata: ~ + - add_docker_metadata: ~ + - add_kubernetes_metadata: ~ +` + expectedExporter := `exporters: + elasticsearch: + auth: + authenticator: beatsauth + compression: gzip + compression_params: + level: 1 + endpoints: + - http://localhost:9200 + logs_index: index + mapping: + mode: bodymap + max_conns_per_host: 1 + password: testing + retry: + enabled: true + initial_interval: 1s + max_interval: 1m0s + max_retries: 3 + sending_queue: + batch: + flush_timeout: 10s + max_size: 1600 + min_size: 0 + sizer: items + block_on_overflow: true + enabled: true + num_consumers: 1 + queue_size: 3200 + wait_for_result: true + user: admin +extensions: + beatsauth: + idle_connection_timeout: 3s + proxy_disable: false + timeout: 1m30s +` + + expectedReceiver := `receivers: + filebeatreceiver: + filebeat: + inputs: + - enabled: true + file_identity: + native: null + id: filestream-input-id + paths: + - /tmp/log.log + prospector: + scanner: + fingerprint: + enabled: false + type: filestream` + expectedService := `service: + extensions: + - beatsauth + pipelines: + logs: + exporters: + - elasticsearch + receivers: + - filebeatreceiver +` + filebeatOTel.WriteConfigFile(beatsCfgFile) + + filebeatOTel.Start("inspect") + defer filebeatOTel.Stop() + + require.EventuallyWithT(t, func(collect *assert.CollectT) { + out, err := filebeatOTel.ReadStdout() + require.NoError(collect, err) + require.Contains(collect, out, expectedExporter) + require.Contains(collect, out, expectedReceiver) + require.Contains(collect, out, expectedService) + }, 10*time.Second, 500*time.Millisecond, "failed to get output of inspect command") +} + func TestFilebeatOTelDocumentLevelRetries(t *testing.T) { tests := []struct { name string diff --git a/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go b/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go index ad776c757e4f..fcc4131c7539 100644 --- a/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go +++ b/x-pack/libbeat/outputs/otelconsumer/otelconsumer.go @@ -34,10 +34,6 @@ const ( esDocumentIDAttribute = "elasticsearch.document_id" beatNameCtxKey = "beat_name" beatVersionCtxtKey = "beat_version" - // otelComponentIDKey is the key used to store the Beat receiver's component id in the beat event. - otelComponentIDKey = "otelcol.component.id" - // otelComponentKindKey is the key used to store the Beat receiver's component kind in the beat event. This is always "receiver". - otelComponentKindKey = "otelcol.component.kind" ) func init() { @@ -149,15 +145,6 @@ func (out *otelConsumer) logsPublish(ctx context.Context, batch publisher.Batch) } logRecord.SetObservedTimestamp(observedTimestamp) - if agent, _ := beatEvent.GetValue("agent"); agent != nil { - switch agent := agent.(type) { - case mapstr.M: - agent[otelComponentIDKey] = out.beatInfo.ComponentID - agent[otelComponentKindKey] = "receiver" - beatEvent["agent"] = agent - } - } - otelmap.ConvertNonPrimitive(beatEvent) // if data_stream field is set on beatEvent. Add it to logrecord.Attributes to support dynamic indexing diff --git a/x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go b/x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go index 420de1e8aca3..beadfac61098 100644 --- a/x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go +++ b/x-pack/libbeat/outputs/otelconsumer/otelconsumer_test.go @@ -268,73 +268,4 @@ func TestPublish(t *testing.T) { assert.Len(t, batch.Signals, 1) assert.Equal(t, outest.BatchACK, batch.Signals[0].Tag) }) - t.Run("sets otel specific-fields", func(t *testing.T) { - testCases := []struct { - name string - componentID string - componentKind string - expectedComponentID string - expectedComponentKind string - }{ - { - name: "sets beat component ID", - componentID: "filebeatreceiver/1", - expectedComponentID: "filebeatreceiver/1", - expectedComponentKind: "receiver", - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - event := beat.Event{ - Fields: mapstr.M{ - "field": 1, - "agent": mapstr.M{}, - }, - Meta: mapstr.M{ - "_id": "abc123", - }, - } - ch := make(chan plog.Logs, 1) - batch := outest.NewBatch(event) - var countLogs int - otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error { - countLogs = countLogs + ld.LogRecordCount() - ch <- ld - return nil - }) - otelConsumer.beatInfo.ComponentID = tc.componentID - err := otelConsumer.Publish(ctx, batch) - assert.NoError(t, err) - assert.Len(t, batch.Signals, 1) - assert.Equal(t, outest.BatchACK, batch.Signals[0].Tag) - assert.Equal(t, len(batch.Events()), countLogs, "all events should be consumed") - log := <-ch - for i := 0; i < log.ResourceLogs().Len(); i++ { - resourceLog := log.ResourceLogs().At(i) - for j := 0; j < resourceLog.ScopeLogs().Len(); j++ { - scopeLog := resourceLog.ScopeLogs().At(j) - for k := 0; k < scopeLog.LogRecords().Len(); k++ { - logRecord := scopeLog.LogRecords().At(k) - body := logRecord.Body().Map() - - // Traverse nested "agent.otelcol.component" structure - agentVal, ok := body.Get("agent") - require.True(t, ok, "expected 'agent' in log body") - - agentMap := agentVal.Map() - idVal, ok := agentMap.Get("otelcol.component.id") - require.True(t, ok, "expected 'agent.otelcol.component.id' in log body") - assert.Equal(t, tc.expectedComponentID, idVal.AsString()) - - kindVal, ok := agentMap.Get("otelcol.component.kind") - require.True(t, ok, "expected 'agent.otelcol.component.kind' in log body") - assert.Equal(t, tc.expectedComponentKind, kindVal.AsString()) - } - } - } - - }) - } - }) } diff --git a/x-pack/metricbeat/mbreceiver/receiver_test.go b/x-pack/metricbeat/mbreceiver/receiver_test.go index 9691c61cde9e..503b0553d548 100644 --- a/x-pack/metricbeat/mbreceiver/receiver_test.go +++ b/x-pack/metricbeat/mbreceiver/receiver_test.go @@ -87,8 +87,8 @@ func TestNewReceiver(t *testing.T) { require.Conditionf(c, func() bool { return len(logs["r1"]) > 0 }, "expected at least one ingest log, got logs: %v", logs["r1"]) - assert.Equal(c, "metricbeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record") - assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record") + assert.Equal(c, "metricbeat", logs["r1"][0].Flatten()["agent.type"], "expected agent.type field in to be 'metricbeat'") + var lastError strings.Builder assert.Conditionf(c, func() bool { return getFromSocket(t, &lastError, monitorSocket, "stats") @@ -204,10 +204,9 @@ func TestMultipleReceivers(t *testing.T) { require.Conditionf(c, func() bool { return len(logs["r1"]) > 0 && len(logs["r2"]) > 0 }, "expected at least one ingest log for each receiver, got logs: %v", logs) - assert.Equal(c, "metricbeatreceiver/r1", logs["r1"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r1 log record") - assert.Equal(c, "receiver", logs["r1"][0].Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in r1 log record") - assert.Equal(c, "metricbeatreceiver/r2", logs["r2"][0].Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in r2 log record") - assert.Equal(c, "receiver", logs["r2"][0].Flatten()["agent.otelcol.component.kind"], "expected otelcol.component.kind field in r2 log record") + assert.Equal(c, "metricbeat", logs["r1"][0].Flatten()["agent.type"], "expected agent.type field to be 'metricbeat' in r1") + assert.Equal(c, "metricbeat", logs["r2"][0].Flatten()["agent.type"], "expected agent.type field to be 'metricbeat' in r2") + var lastError strings.Builder assert.Conditionf(c, func() bool { tests := []string{monitorSocket1, monitorSocket2} diff --git a/x-pack/metricbeat/tests/integration/otel_test.go b/x-pack/metricbeat/tests/integration/otel_test.go index fb47332e8700..3539c77082c9 100644 --- a/x-pack/metricbeat/tests/integration/otel_test.go +++ b/x-pack/metricbeat/tests/integration/otel_test.go @@ -123,7 +123,7 @@ service: oteltestcol.New(t, configBuffer.String()) - var beatsCfgFile = ` + beatsCfgFile := ` metricbeat: modules: - module: system @@ -206,16 +206,9 @@ http.port: {{.MonitoringPort}} var metricbeatDoc, otelDoc mapstr.M otelDoc = otelDocs.Hits.Hits[0].Source metricbeatDoc = metricbeatDocs.Hits.Hits[0].Source - ignoredFields := []string{ - // only present in beats receivers - "agent.otelcol.component.id", - "agent.otelcol.component.kind", - } - assert.Equal(t, "metricbeatreceiver", otelDoc.Flatten()["agent.otelcol.component.id"], "expected agent.otelcol.component.id field in log record") - assert.Equal(t, "receiver", otelDoc.Flatten()["agent.otelcol.component.kind"], "expected agent.otelcol.component.kind field in log record") - assert.NotContains(t, metricbeatDoc.Flatten(), "agent.otelcol.component.id", "expected agent.otelcol.component.id field not to be present in metricbeat log record") - assert.NotContains(t, metricbeatDoc.Flatten(), "agent.otelcol.component.kind", "expected agent.otelcol.component.kind field not to be present in metricbeat log record") - assertMapstrKeysEqual(t, otelDoc, metricbeatDoc, ignoredFields, "expected documents keys to be equal") + assert.Equal(t, "metricbeat", otelDoc.Flatten()["agent.type"], "expected agent.type field to be 'metricbeat' in otel docs") + assert.Equal(t, "metricbeat", metricbeatDoc.Flatten()["agent.type"], "expected agent.type field to be 'metricbeat' in metricbeat docs") + assertMapstrKeysEqual(t, otelDoc, metricbeatDoc, nil, "expected documents keys to be equal") assertMonitoring(t, metricbeatMonitoringPort) } @@ -546,12 +539,7 @@ service: assert.GreaterOrEqualf(ct, r1Docs.Hits.Total.Value, 1, "expected at least 1 log for receiver 1, got %d", r1Docs.Hits.Total.Value) }, 1*time.Minute, 100*time.Millisecond, "expected at least 1 log for each receiver") - ignoredFields := []string{ - // only present in beats receivers - "agent.otelcol.component.id", - "agent.otelcol.component.kind", - } - assertMapstrKeysEqual(t, r0Docs.Hits.Hits[0].Source, r1Docs.Hits.Hits[0].Source, ignoredFields, "expected documents keys to be equal") + assertMapstrKeysEqual(t, r0Docs.Hits.Hits[0].Source, r1Docs.Hits.Hits[0].Source, nil, "expected documents keys to be equal") for _, rec := range otelConfig.Receivers { assertMonitoring(t, rec.MonitoringPort) }