From 0af597560f71016b89c8bc9672d0b7543e17e803 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Wed, 3 Dec 2025 16:46:30 -0500 Subject: [PATCH 1/5] [WIP] Update input status when pipeline fails to connect This commit updates the status of V2 Inputs managed by the input-cursor.managedInput when the pipeline fails to connect. One of the instances where the pipeline fails to connect is when invalid processors are defined in the input configuration. --- ...status-when-pipeline-fails-to-connect.yaml | 32 ++++ filebeat/input/v2/input-cursor/input.go | 7 +- .../tests/integration/managerV2_test.go | 154 ++++++++++++++++++ 3 files changed, 192 insertions(+), 1 deletion(-) create mode 100644 changelog/fragments/1764798615-Update-input-status-when-pipeline-fails-to-connect.yaml diff --git a/changelog/fragments/1764798615-Update-input-status-when-pipeline-fails-to-connect.yaml b/changelog/fragments/1764798615-Update-input-status-when-pipeline-fails-to-connect.yaml new file mode 100644 index 000000000000..d16640b5c15f --- /dev/null +++ b/changelog/fragments/1764798615-Update-input-status-when-pipeline-fails-to-connect.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# Change summary; a 80ish characters long description of the change. 
+summary: Update input status when pipeline fails to connect + +# Long description; in case the summary is not enough to describe the change, +# this field accommodates a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: filebeat + +# PR URL; optional; the PR number that added the changeset. +# If not present, it is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +#pr: https://github.com/owner/repo/1234 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present, it is automatically filled by the tooling with the issue linked to the PR number. 
+issue: https://github.com/elastic/beats/issues/45649 diff --git a/filebeat/input/v2/input-cursor/input.go b/filebeat/input/v2/input-cursor/input.go index 57f0a7d40f06..758fa0a3104d 100644 --- a/filebeat/input/v2/input-cursor/input.go +++ b/filebeat/input/v2/input-cursor/input.go @@ -30,6 +30,7 @@ import ( input "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/acker" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/monitoring" @@ -174,10 +175,14 @@ func (inp *managedInput) runSource( } }() - client, err := pipeline.ConnectWith(beat.ClientConfig{ + client, err := pipeline.ConnectWith(beat.ClientConfig{ // HERE, error connecting to the pipeline EventListener: newInputACKHandler(ctx.Logger), }) if err != nil { + ctx.StatusReporter.UpdateStatus( + status.Failed, + fmt.Sprintf("cannot connect to publishing pipeline: %s", err), + ) return err } defer client.Close() diff --git a/x-pack/filebeat/tests/integration/managerV2_test.go b/x-pack/filebeat/tests/integration/managerV2_test.go index b15eb62a1fb6..544795969eaa 100644 --- a/x-pack/filebeat/tests/integration/managerV2_test.go +++ b/x-pack/filebeat/tests/integration/managerV2_test.go @@ -1054,3 +1054,157 @@ func TestReloadErrorHandling(t *testing.T) { t.Cleanup(server.Stop) } + +func TestFoo(t *testing.T) { + filebeat := NewFilebeat(t) + + integration.WriteLogFile(t, "/tmp/flog.log", 100, false) + + outputUnit := proto.UnitExpected{ + Id: "output-unit", + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_DEBUG, + Config: &proto.UnitExpectedConfig{ + Id: "default", + Type: "discard", + Name: "discard", + Source: integration.RequireNewStruct(t, + map[string]any{ + "type": "discard", + "hosts": []any{"http://localhost:9200"}, + 
}), + }, + } + + brokenFilestream := proto.UnitExpected{ + Id: "broken-Filestream", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_DEBUG, + Config: &proto.UnitExpectedConfig{ + Id: "filestream-input", + Type: "filestream", + Name: "filestream", + Streams: []*proto.Stream{ + { + Id: "filestream-input", + Source: integration.RequireNewStruct(t, map[string]any{ + "enabled": true, + "type": "filestream", + "paths": "/tmp/flog.log", + "processors": []any{ + map[string]any{ + "add_fields": map[string]any{ + "fields_under_root": true, // invalid + "fields": map[string]any{ + "labels": map[string]any{ + "foo": "bar", + }, + }, + }, + }, + }, + }), + }, + }, + }, + } + + brokenCEL := proto.UnitExpected{ + Id: "broken-cel", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_DEBUG, + Config: &proto.UnitExpectedConfig{ + Id: "cel-input", + Type: "cel", + Name: "cel", + Streams: []*proto.Stream{ + { + Id: "cel-input", + Source: integration.RequireNewStruct(t, map[string]any{ + "enabled": true, + "type": "cel", + "interval": "1m", + "resource.url": "https://api.ipify.org/?format=text", + "program": `{"events": [{"ip": string(get(state.url).Body)}]}`, + "processors": []any{ + map[string]any{ + "add_fields": map[string]any{ + "fields_under_root": true, // invalid + "fields": map[string]any{ + "labels": map[string]any{ + "foo": "bar", + }, + }, + }, + }, + }, + }), + }, + }, + }, + } + + fmt.Fprint(io.Discard, &brokenFilestream, &brokenCEL) + + finalStateReached := atomic.Bool{} + var units = []*proto.UnitExpected{ + &outputUnit, + // &brokenFilestream, + &brokenCEL, + } + + server := &mock.StubServerV2{ + CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected { + for _, unit := range observed.Units { + t.Logf("ID: %s, State: %s", unit.GetId(), unit.GetState().String()) + } + + brokenCEL.State = proto.State_FAILED + expectedState := 
[]*proto.UnitExpected{ + &outputUnit, + &brokenCEL, + } + if management.DoesStateMatch(observed, expectedState, 0) { + finalStateReached.Store(true) + } + + brokenCEL.State = proto.State_HEALTHY + return &proto.CheckinExpected{ + Units: units, + } + }, + ActionImpl: func(response *proto.ActionResponse) error { return nil }, + } + + server.Port = 3000 + require.NoError(t, server.Start()) + t.Cleanup(server.Stop) + + fmt.Println( + "Connection string:\n", + "-E", fmt.Sprintf(`management.insecure_grpc_url_for_testing="localhost:%d"`, server.Port), + "-E", "management.enabled=true", + ) + + // c := make(chan any) + // <-c + + if true { + filebeat.Start( + "-E", fmt.Sprintf(`management.insecure_grpc_url_for_testing="localhost:%d"`, server.Port), + "-E", "management.enabled=true", + ) + + require.Eventually(t, func() bool { + return finalStateReached.Load() + }, 30*time.Second, 100*time.Millisecond, "Output unit did not report unhealthy") + + t.Cleanup(server.Stop) + } +} From bb3828da06b499ce156c4e3406ded31cb42cdedd Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Wed, 3 Dec 2025 16:57:28 -0500 Subject: [PATCH 2/5] make the test a table test --- .../tests/integration/managerV2_test.go | 100 ++++++++++-------- 1 file changed, 55 insertions(+), 45 deletions(-) diff --git a/x-pack/filebeat/tests/integration/managerV2_test.go b/x-pack/filebeat/tests/integration/managerV2_test.go index 544795969eaa..bf4d094d451e 100644 --- a/x-pack/filebeat/tests/integration/managerV2_test.go +++ b/x-pack/filebeat/tests/integration/managerV2_test.go @@ -1060,7 +1060,7 @@ func TestFoo(t *testing.T) { integration.WriteLogFile(t, "/tmp/flog.log", 100, false) - outputUnit := proto.UnitExpected{ + outputUnit := &proto.UnitExpected{ Id: "output-unit", Type: proto.UnitType_OUTPUT, ConfigStateIdx: 1, @@ -1078,7 +1078,7 @@ func TestFoo(t *testing.T) { }, } - brokenFilestream := proto.UnitExpected{ + brokenFilestream := &proto.UnitExpected{ Id: "broken-Filestream", Type: proto.UnitType_INPUT, 
ConfigStateIdx: 1, @@ -1113,7 +1113,7 @@ func TestFoo(t *testing.T) { }, } - brokenCEL := proto.UnitExpected{ + brokenCEL := &proto.UnitExpected{ Id: "broken-cel", Type: proto.UnitType_INPUT, ConfigStateIdx: 1, @@ -1150,61 +1150,71 @@ func TestFoo(t *testing.T) { }, } + // TODO: Remove me fmt.Fprint(io.Discard, &brokenFilestream, &brokenCEL) - finalStateReached := atomic.Bool{} - var units = []*proto.UnitExpected{ - &outputUnit, - // &brokenFilestream, - &brokenCEL, + testCases := map[string]*proto.UnitExpected{ + "cel": brokenCEL, + "filestream": brokenFilestream, } - server := &mock.StubServerV2{ - CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected { - for _, unit := range observed.Units { - t.Logf("ID: %s, State: %s", unit.GetId(), unit.GetState().String()) + for name, inputUnit := range testCases { + t.Run(name, func(t *testing.T) { + finalStateReached := atomic.Bool{} + var units = []*proto.UnitExpected{ + outputUnit, + // brokenFilestream, + inputUnit, } - brokenCEL.State = proto.State_FAILED - expectedState := []*proto.UnitExpected{ - &outputUnit, - &brokenCEL, - } - if management.DoesStateMatch(observed, expectedState, 0) { - finalStateReached.Store(true) - } + server := &mock.StubServerV2{ + CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected { + for _, unit := range observed.Units { + t.Logf("ID: %s, State: %s", unit.GetId(), unit.GetState().String()) + } - brokenCEL.State = proto.State_HEALTHY - return &proto.CheckinExpected{ - Units: units, + inputUnit.State = proto.State_FAILED + expectedState := []*proto.UnitExpected{ + outputUnit, + inputUnit, + } + if management.DoesStateMatch(observed, expectedState, 0) { + finalStateReached.Store(true) + } + + inputUnit.State = proto.State_HEALTHY + return &proto.CheckinExpected{ + Units: units, + } + }, + ActionImpl: func(response *proto.ActionResponse) error { return nil }, } - }, - ActionImpl: func(response *proto.ActionResponse) error { return nil }, - } - 
server.Port = 3000 - require.NoError(t, server.Start()) - t.Cleanup(server.Stop) + server.Port = 3000 + require.NoError(t, server.Start()) + t.Cleanup(server.Stop) - fmt.Println( - "Connection string:\n", - "-E", fmt.Sprintf(`management.insecure_grpc_url_for_testing="localhost:%d"`, server.Port), - "-E", "management.enabled=true", - ) + fmt.Println( + "Connection string:\n", + "-E", fmt.Sprintf(`management.insecure_grpc_url_for_testing="localhost:%d"`, server.Port), + "-E", "management.enabled=true", + ) - // c := make(chan any) - // <-c + // c := make(chan any) + // <-c - if true { - filebeat.Start( - "-E", fmt.Sprintf(`management.insecure_grpc_url_for_testing="localhost:%d"`, server.Port), - "-E", "management.enabled=true", - ) + if true { //TODO: remove me + filebeat.Start( + "-E", fmt.Sprintf(`management.insecure_grpc_url_for_testing="localhost:%d"`, server.Port), + "-E", "management.enabled=true", + ) - require.Eventually(t, func() bool { - return finalStateReached.Load() - }, 30*time.Second, 100*time.Millisecond, "Output unit did not report unhealthy") + require.Eventually(t, func() bool { + return finalStateReached.Load() + }, 30*time.Second, 100*time.Millisecond, "Output unit did not report unhealthy") - t.Cleanup(server.Stop) + t.Cleanup(server.Stop) + } + }) } } From 9e0f49947b94425752c11e85768c30fa441aee3c Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Wed, 3 Dec 2025 17:04:11 -0500 Subject: [PATCH 3/5] [WIP] Filestream: set input status to failed when harvester fails --- .../input/filestream/internal/input-logfile/harvester.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/filebeat/input/filestream/internal/input-logfile/harvester.go b/filebeat/input/filestream/internal/input-logfile/harvester.go index a020360dacd7..2e4a2a2ccb29 100644 --- a/filebeat/input/filestream/internal/input-logfile/harvester.go +++ b/filebeat/input/filestream/internal/input-logfile/harvester.go @@ -28,6 +28,7 @@ import ( 
"github.com/elastic/beats/v7/filebeat/input/filestream/internal/task" inputv2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/go-concert/ctxtool" ) @@ -204,6 +205,13 @@ func startHarvester( ctx.Logger.Errorf("Harvester crashed with: %+v", err) hg.readers.remove(srcID) } + + if err != nil { + ctx.StatusReporter.UpdateStatus( + status.Failed, + fmt.Sprintf("harvester failed: %s", err), + ) + } }() if restart { From 3f8e13a38550f296d70eae85aa35cf84fc7e4d74 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Wed, 3 Dec 2025 17:07:07 -0500 Subject: [PATCH 4/5] Fix lint warnings --- filebeat/input/v2/input-cursor/input.go | 1 + x-pack/filebeat/tests/integration/managerV2_test.go | 1 + 2 files changed, 2 insertions(+) diff --git a/filebeat/input/v2/input-cursor/input.go b/filebeat/input/v2/input-cursor/input.go index 758fa0a3104d..2318e7627562 100644 --- a/filebeat/input/v2/input-cursor/input.go +++ b/filebeat/input/v2/input-cursor/input.go @@ -229,6 +229,7 @@ func newInputACKHandler(log *logp.Logger) beat.EventListener { if n == 0 { return } + //nolint:errcheck // We know it will always work private[last].(*updateOp).Execute(n) }) } diff --git a/x-pack/filebeat/tests/integration/managerV2_test.go b/x-pack/filebeat/tests/integration/managerV2_test.go index bf4d094d451e..9597f8c7fff4 100644 --- a/x-pack/filebeat/tests/integration/managerV2_test.go +++ b/x-pack/filebeat/tests/integration/managerV2_test.go @@ -1194,6 +1194,7 @@ func TestFoo(t *testing.T) { require.NoError(t, server.Start()) t.Cleanup(server.Stop) + //nolint:forbidigo // I'll remove it later fmt.Println( "Connection string:\n", "-E", fmt.Sprintf(`management.insecure_grpc_url_for_testing="localhost:%d"`, server.Port), From acd7a2a34c9e5514ec4f75279c939d5a51039380 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Wed, 3 Dec 2025 17:18:36 -0500 
Subject: [PATCH 5/5] Fix net/input/manager manager/wrapper --- filebeat/input/net/manager.go | 4 +++ .../tests/integration/managerV2_test.go | 34 +++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/filebeat/input/net/manager.go b/filebeat/input/net/manager.go index 2e5ac1750dde..e8d841ebe4f8 100644 --- a/filebeat/input/net/manager.go +++ b/filebeat/input/net/manager.go @@ -165,6 +165,10 @@ func (w wrapper) Run(ctx v2.Context, pipeline beat.PipelineConnector) (err error m := w.inp.InitMetrics(ctx.ID, ctx.MetricsRegistry, ctx.Logger) if err := w.initWorkers(ctx, pipeline, m); err != nil { logger.Errorf("cannot initialise pipeline workers: %s", err) + ctx.UpdateStatus( + status.Failed, + fmt.Sprintf("cannot initialise workers: %s", err), + ) return fmt.Errorf("cannot initialise pipeline workers: %w", err) } diff --git a/x-pack/filebeat/tests/integration/managerV2_test.go b/x-pack/filebeat/tests/integration/managerV2_test.go index 9597f8c7fff4..7ae8b4f8e91f 100644 --- a/x-pack/filebeat/tests/integration/managerV2_test.go +++ b/x-pack/filebeat/tests/integration/managerV2_test.go @@ -1156,6 +1156,40 @@ func TestFoo(t *testing.T) { testCases := map[string]*proto.UnitExpected{ "cel": brokenCEL, "filestream": brokenFilestream, + "net inputs": { + Id: "broken-tcp", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_DEBUG, + Config: &proto.UnitExpectedConfig{ + Id: "tcp-input", + Type: "tcp", + Name: "tcp", + Streams: []*proto.Stream{ + { + Id: "tcp-input", + Source: integration.RequireNewStruct(t, map[string]any{ + "enabled": true, + "type": "tcp", + "host": "localhost:9042", // random port + "processors": []any{ + map[string]any{ + "add_fields": map[string]any{ + "fields_under_root": true, // invalid + "fields": map[string]any{ + "labels": map[string]any{ + "foo": "bar", + }, + }, + }, + }, + }, + }), + }, + }, + }, + }, } for name, inputUnit := range testCases {