diff --git a/changelog/fragments/1764798615-Update-input-status-when-pipeline-fails-to-connect.yaml b/changelog/fragments/1764798615-Update-input-status-when-pipeline-fails-to-connect.yaml
new file mode 100644
index 000000000000..d16640b5c15f
--- /dev/null
+++ b/changelog/fragments/1764798615-Update-input-status-when-pipeline-fails-to-connect.yaml
@@ -0,0 +1,32 @@
+# Kind can be one of:
+# - breaking-change: a change to previously-documented behavior
+# - deprecation: functionality that is being removed in a later release
+# - bug-fix: fixes a problem in a previous version
+# - enhancement: extends functionality but does not break or fix existing behavior
+# - feature: new functionality
+# - known-issue: problems that we are aware of in a given version
+# - security: impacts on the security of a product or a user’s deployment.
+# - upgrade: important information for someone upgrading from a prior version
+# - other: does not fit into any of the other categories
+kind: bug-fix
+
+# Change summary; a 80ish characters long description of the change.
+summary: Update input status when pipeline fails to connect
+
+# Long description; in case the summary is not enough to describe the change
+# this field accommodate a description without length limits.
+# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
+#description:
+
+# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
+component: filebeat
+
+# PR URL; optional; the PR number that added the changeset.
+# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
+# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
+# Please provide it if you are adding a fragment for a different PR.
+#pr: https://github.com/owner/repo/1234
+
+# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
+# If not present is automatically filled by the tooling with the issue linked to the PR number.
+issue: https://github.com/elastic/beats/issues/45649
diff --git a/filebeat/input/filestream/internal/input-logfile/harvester.go b/filebeat/input/filestream/internal/input-logfile/harvester.go
index a020360dacd7..2e4a2a2ccb29 100644
--- a/filebeat/input/filestream/internal/input-logfile/harvester.go
+++ b/filebeat/input/filestream/internal/input-logfile/harvester.go
@@ -28,6 +28,7 @@ import (
 	"github.com/elastic/beats/v7/filebeat/input/filestream/internal/task"
 	inputv2 "github.com/elastic/beats/v7/filebeat/input/v2"
 	"github.com/elastic/beats/v7/libbeat/beat"
+	"github.com/elastic/beats/v7/libbeat/management/status"
 	"github.com/elastic/elastic-agent-libs/logp"
 	"github.com/elastic/go-concert/ctxtool"
 )
@@ -204,6 +205,13 @@ func startHarvester(
 			ctx.Logger.Errorf("Harvester crashed with: %+v", err)
 			hg.readers.remove(srcID)
 		}
+
+		if err != nil {
+			ctx.UpdateStatus(
+				status.Failed,
+				fmt.Sprintf("harvester failed: %s", err),
+			)
+		}
 	}()
 
 	if restart {
diff --git a/filebeat/input/net/manager.go b/filebeat/input/net/manager.go
index 2e5ac1750dde..e8d841ebe4f8 100644
--- a/filebeat/input/net/manager.go
+++ b/filebeat/input/net/manager.go
@@ -165,6 +165,10 @@ func (w wrapper) Run(ctx v2.Context, pipeline beat.PipelineConnector) (err error
 	m := w.inp.InitMetrics(ctx.ID, ctx.MetricsRegistry, ctx.Logger)
 	if err := w.initWorkers(ctx, pipeline, m); err != nil {
 		logger.Errorf("cannot initialise pipeline workers: %s", err)
+		ctx.UpdateStatus(
+			status.Failed,
+			fmt.Sprintf("cannot initialise pipeline workers: %s", err),
+		)
 		return fmt.Errorf("cannot initialise pipeline workers: %w", err)
 	}
 
diff --git a/filebeat/input/v2/input-cursor/input.go b/filebeat/input/v2/input-cursor/input.go
index 57f0a7d40f06..2318e7627562 100644
--- a/filebeat/input/v2/input-cursor/input.go
+++ b/filebeat/input/v2/input-cursor/input.go
@@ -30,6 +30,7 @@ import (
 	input "github.com/elastic/beats/v7/filebeat/input/v2"
 	"github.com/elastic/beats/v7/libbeat/beat"
 	"github.com/elastic/beats/v7/libbeat/common/acker"
+	"github.com/elastic/beats/v7/libbeat/management/status"
 	"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
 	"github.com/elastic/elastic-agent-libs/logp"
 	"github.com/elastic/elastic-agent-libs/monitoring"
@@ -174,10 +175,14 @@ func (inp *managedInput) runSource(
 		}
 	}()
 
 	client, err := pipeline.ConnectWith(beat.ClientConfig{
 		EventListener: newInputACKHandler(ctx.Logger),
 	})
 	if err != nil {
+		ctx.UpdateStatus(
+			status.Failed,
+			fmt.Sprintf("cannot connect to publishing pipeline: %s", err),
+		)
 		return err
 	}
 	defer client.Close()
@@ -224,6 +229,7 @@ func newInputACKHandler(log *logp.Logger) beat.EventListener {
 		if n == 0 {
 			return
 		}
+		//nolint:errcheck // Private is always set to an *updateOp by this input, the assertion cannot fail.
 		private[last].(*updateOp).Execute(n)
 	})
 }
diff --git a/x-pack/filebeat/tests/integration/managerV2_test.go b/x-pack/filebeat/tests/integration/managerV2_test.go
index b15eb62a1fb6..7ae8b4f8e91f 100644
--- a/x-pack/filebeat/tests/integration/managerV2_test.go
+++ b/x-pack/filebeat/tests/integration/managerV2_test.go
@@ -1054,3 +1054,202 @@ func TestReloadErrorHandling(t *testing.T) {
 
 	t.Cleanup(server.Stop)
 }
+
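+// TestInputsReportFailedOnPipelineConnectError runs Filebeat under a mock
+// Elastic Agent and sends it input units whose processor configuration is
+// invalid, so connecting to the publishing pipeline fails. It asserts that
+// each broken input unit reports the FAILED state back to the agent while
+// the output unit stays healthy.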
+func TestInputsReportFailedOnPipelineConnectError(t *testing.T) {
+	filebeat := NewFilebeat(t)
+
+	integration.WriteLogFile(t, "/tmp/flog.log", 100, false)
+
+	outputUnit := &proto.UnitExpected{
+		Id:             "output-unit",
+		Type:           proto.UnitType_OUTPUT,
+		ConfigStateIdx: 1,
+		State:          proto.State_HEALTHY,
+		LogLevel:       proto.UnitLogLevel_DEBUG,
+		Config: &proto.UnitExpectedConfig{
+			Id:   "default",
+			Type: "discard",
+			Name: "discard",
+			Source: integration.RequireNewStruct(t,
+				map[string]any{
+					"type":  "discard",
+					"hosts": []any{"http://localhost:9200"},
+				}),
+		},
+	}
+
+	brokenFilestream := &proto.UnitExpected{
+		Id:             "broken-filestream",
+		Type:           proto.UnitType_INPUT,
+		ConfigStateIdx: 1,
+		State:          proto.State_HEALTHY,
+		LogLevel:       proto.UnitLogLevel_DEBUG,
+		Config: &proto.UnitExpectedConfig{
+			Id:   "filestream-input",
+			Type: "filestream",
+			Name: "filestream",
+			Streams: []*proto.Stream{
+				{
+					Id: "filestream-input",
+					Source: integration.RequireNewStruct(t, map[string]any{
+						"enabled": true,
+						"type":    "filestream",
+						"paths":   []any{"/tmp/flog.log"},
+						"processors": []any{
+							map[string]any{
+								"add_fields": map[string]any{
+									"fields_under_root": true, // invalid add_fields option, breaks processor construction
+									"fields": map[string]any{
+										"labels": map[string]any{
+											"foo": "bar",
+										},
+									},
+								},
+							},
+						},
+					}),
+				},
+			},
+		},
+	}
+
+	brokenCEL := &proto.UnitExpected{
+		Id:             "broken-cel",
+		Type:           proto.UnitType_INPUT,
+		ConfigStateIdx: 1,
+		State:          proto.State_HEALTHY,
+		LogLevel:       proto.UnitLogLevel_DEBUG,
+		Config: &proto.UnitExpectedConfig{
+			Id:   "cel-input",
+			Type: "cel",
+			Name: "cel",
+			Streams: []*proto.Stream{
+				{
+					Id: "cel-input",
+					Source: integration.RequireNewStruct(t, map[string]any{
+						"enabled":      true,
+						"type":         "cel",
+						"interval":     "1m",
+						"resource.url": "https://api.ipify.org/?format=text",
+						"program":      `{"events": [{"ip": string(get(state.url).Body)}]}`,
+						"processors": []any{
+							map[string]any{
+								"add_fields": map[string]any{
+									"fields_under_root": true, // invalid add_fields option, breaks processor construction
+									"fields": map[string]any{
+										"labels": map[string]any{
+											"foo": "bar",
+										},
+									},
+								},
+							},
+						},
+					}),
+				},
+			},
+		},
+	}
+
+	testCases := map[string]*proto.UnitExpected{
+		"cel":        brokenCEL,
+		"filestream": brokenFilestream,
+		"net inputs": {
+			Id:             "broken-tcp",
+			Type:           proto.UnitType_INPUT,
+			ConfigStateIdx: 1,
+			State:          proto.State_HEALTHY,
+			LogLevel:       proto.UnitLogLevel_DEBUG,
+			Config: &proto.UnitExpectedConfig{
+				Id:   "tcp-input",
+				Type: "tcp",
+				Name: "tcp",
+				Streams: []*proto.Stream{
+					{
+						Id: "tcp-input",
+						Source: integration.RequireNewStruct(t, map[string]any{
+							"enabled": true,
+							"type":    "tcp",
+							"host":    "localhost:9042", // arbitrary port
+							"processors": []any{
+								map[string]any{
+									"add_fields": map[string]any{
+										"fields_under_root": true, // invalid add_fields option, breaks processor construction
+										"fields": map[string]any{
+											"labels": map[string]any{
+												"foo": "bar",
+											},
+										},
+									},
+								},
+							},
+						}),
+					},
+				},
+			},
+		},
+	}
+
+	for name, inputUnit := range testCases {
+		t.Run(name, func(t *testing.T) {
+			finalStateReached := atomic.Bool{}
+			units := []*proto.UnitExpected{
+				outputUnit,
+				inputUnit,
+			}
+
+			server := &mock.StubServerV2{
+				CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected {
+					for _, unit := range observed.Units {
+						t.Logf("ID: %s, State: %s", unit.GetId(), unit.GetState().String())
+					}
+
+					// The broken input unit is expected to report FAILED while
+					// the output unit stays healthy.
+					inputUnit.State = proto.State_FAILED
+					expectedState := []*proto.UnitExpected{
+						outputUnit,
+						inputUnit,
+					}
+					if management.DoesStateMatch(observed, expectedState, 0) {
+						finalStateReached.Store(true)
+					}
+
+					inputUnit.State = proto.State_HEALTHY
+					return &proto.CheckinExpected{
+						Units: units,
+					}
+				},
+				ActionImpl: func(response *proto.ActionResponse) error { return nil },
+			}
+
+			require.NoError(t, server.Start())
+			t.Cleanup(server.Stop)
+
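+			// Start Filebeat pointing at the mock agent's local insecure gRPC
+			// endpoint so it receives the units defined above.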
"Connection string:\n", + "-E", fmt.Sprintf(`management.insecure_grpc_url_for_testing="localhost:%d"`, server.Port), + "-E", "management.enabled=true", + ) + + // c := make(chan any) + // <-c + + if true { //TODO: remove me + filebeat.Start( + "-E", fmt.Sprintf(`management.insecure_grpc_url_for_testing="localhost:%d"`, server.Port), + "-E", "management.enabled=true", + ) + + require.Eventually(t, func() bool { + return finalStateReached.Load() + }, 30*time.Second, 100*time.Millisecond, "Output unit did not report unhealthy") + + t.Cleanup(server.Stop) + } + }) + } +}