Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# Change summary; a 80ish characters long description of the change.
summary: Update input status when pipeline fails to connect

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: filebeat

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
#pr: https://github.com/owner/repo/1234

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/beats/issues/45649
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/elastic/beats/v7/filebeat/input/filestream/internal/task"
inputv2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/go-concert/ctxtool"
)
Expand Down Expand Up @@ -204,6 +205,13 @@ func startHarvester(
ctx.Logger.Errorf("Harvester crashed with: %+v", err)
hg.readers.remove(srcID)
}

if err != nil {
ctx.StatusReporter.UpdateStatus(
status.Failed,
fmt.Sprintf("harvester failed: %s", err),
)
}
}()

if restart {
Expand Down
4 changes: 4 additions & 0 deletions filebeat/input/net/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
)

// Input is the interface for net inputs
// go:generate moq -out inputmock_test.go . Input

Check failure on line 40 in filebeat/input/net/manager.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

SA9009: ineffectual compiler directive due to extraneous space: "// go:generate moq -out inputmock_test.go . Input" (staticcheck)
type Input interface {
// Returns the input name
Name() string
Expand Down Expand Up @@ -165,6 +165,10 @@
m := w.inp.InitMetrics(ctx.ID, ctx.MetricsRegistry, ctx.Logger)
if err := w.initWorkers(ctx, pipeline, m); err != nil {
logger.Errorf("cannot initialise pipeline workers: %s", err)
ctx.UpdateStatus(
status.Failed,
fmt.Sprintf("cannot initialise workers: %s", err),
)
return fmt.Errorf("cannot initialise pipeline workers: %w", err)
}

Expand Down
8 changes: 7 additions & 1 deletion filebeat/input/v2/input-cursor/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
Expand Down Expand Up @@ -174,10 +175,14 @@ func (inp *managedInput) runSource(
}
}()

client, err := pipeline.ConnectWith(beat.ClientConfig{
client, err := pipeline.ConnectWith(beat.ClientConfig{ // HERE, error connecting to the pipeline
EventListener: newInputACKHandler(ctx.Logger),
})
if err != nil {
ctx.StatusReporter.UpdateStatus(
status.Failed,
fmt.Sprintf("cannot connect to publishing pipeline: %s", err),
)
return err
}
defer client.Close()
Expand Down Expand Up @@ -224,6 +229,7 @@ func newInputACKHandler(log *logp.Logger) beat.EventListener {
if n == 0 {
return
}
//nolint:errcheck // We know it will always work
private[last].(*updateOp).Execute(n)
})
}
199 changes: 199 additions & 0 deletions x-pack/filebeat/tests/integration/managerV2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1054,3 +1054,202 @@ func TestReloadErrorHandling(t *testing.T) {

t.Cleanup(server.Stop)
}

func TestFoo(t *testing.T) {
filebeat := NewFilebeat(t)

integration.WriteLogFile(t, "/tmp/flog.log", 100, false)

outputUnit := &proto.UnitExpected{
Id: "output-unit",
Type: proto.UnitType_OUTPUT,
ConfigStateIdx: 1,
State: proto.State_HEALTHY,
LogLevel: proto.UnitLogLevel_DEBUG,
Config: &proto.UnitExpectedConfig{
Id: "default",
Type: "discard",
Name: "discard",
Source: integration.RequireNewStruct(t,
map[string]any{
"type": "discard",
"hosts": []any{"http://localhost:9200"},
}),
},
}

brokenFilestream := &proto.UnitExpected{
Id: "broken-Filestream",
Type: proto.UnitType_INPUT,
ConfigStateIdx: 1,
State: proto.State_HEALTHY,
LogLevel: proto.UnitLogLevel_DEBUG,
Config: &proto.UnitExpectedConfig{
Id: "filestream-input",
Type: "filestream",
Name: "filestream",
Streams: []*proto.Stream{
{
Id: "filestream-input",
Source: integration.RequireNewStruct(t, map[string]any{
"enabled": true,
"type": "filestream",
"paths": "/tmp/flog.log",
"processors": []any{
map[string]any{
"add_fields": map[string]any{
"fields_under_root": true, // invalid
"fields": map[string]any{
"labels": map[string]any{
"foo": "bar",
},
},
},
},
},
}),
},
},
},
}

brokenCEL := &proto.UnitExpected{
Id: "broken-cel",
Type: proto.UnitType_INPUT,
ConfigStateIdx: 1,
State: proto.State_HEALTHY,
LogLevel: proto.UnitLogLevel_DEBUG,
Config: &proto.UnitExpectedConfig{
Id: "cel-input",
Type: "cel",
Name: "cel",
Streams: []*proto.Stream{
{
Id: "cel-input",
Source: integration.RequireNewStruct(t, map[string]any{
"enabled": true,
"type": "cel",
"interval": "1m",
"resource.url": "https://api.ipify.org/?format=text",
"program": `{"events": [{"ip": string(get(state.url).Body)}]}`,
"processors": []any{
map[string]any{
"add_fields": map[string]any{
"fields_under_root": true, // invalid
"fields": map[string]any{
"labels": map[string]any{
"foo": "bar",
},
},
},
},
},
}),
},
},
},
}

// TODO: Remove me
fmt.Fprint(io.Discard, &brokenFilestream, &brokenCEL)

testCases := map[string]*proto.UnitExpected{
"cel": brokenCEL,
"filestream": brokenFilestream,
"net inputs": {
Id: "broken-tcp",
Type: proto.UnitType_INPUT,
ConfigStateIdx: 1,
State: proto.State_HEALTHY,
LogLevel: proto.UnitLogLevel_DEBUG,
Config: &proto.UnitExpectedConfig{
Id: "tcp-input",
Type: "tcp",
Name: "tcp",
Streams: []*proto.Stream{
{
Id: "tcp-input",
Source: integration.RequireNewStruct(t, map[string]any{
"enabled": true,
"type": "tcp",
"host": "localhost:9042", // random port
"processors": []any{
map[string]any{
"add_fields": map[string]any{
"fields_under_root": true, // invalid
"fields": map[string]any{
"labels": map[string]any{
"foo": "bar",
},
},
},
},
},
}),
},
},
},
},
}

for name, inputUnit := range testCases {
t.Run(name, func(t *testing.T) {
finalStateReached := atomic.Bool{}
var units = []*proto.UnitExpected{
outputUnit,
// brokenFilestream,
inputUnit,
}

server := &mock.StubServerV2{
CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected {
for _, unit := range observed.Units {
t.Logf("ID: %s, State: %s", unit.GetId(), unit.GetState().String())
}

inputUnit.State = proto.State_FAILED
expectedState := []*proto.UnitExpected{
outputUnit,
inputUnit,
}
if management.DoesStateMatch(observed, expectedState, 0) {
finalStateReached.Store(true)
}

inputUnit.State = proto.State_HEALTHY
return &proto.CheckinExpected{
Units: units,
}
},
ActionImpl: func(response *proto.ActionResponse) error { return nil },
}

server.Port = 3000
require.NoError(t, server.Start())
t.Cleanup(server.Stop)

//nolint:forbidigo // I'll remove it later
fmt.Println(
"Connection string:\n",
"-E", fmt.Sprintf(`management.insecure_grpc_url_for_testing="localhost:%d"`, server.Port),
"-E", "management.enabled=true",
)

// c := make(chan any)
// <-c

if true { //TODO: remove me
filebeat.Start(
"-E", fmt.Sprintf(`management.insecure_grpc_url_for_testing="localhost:%d"`, server.Port),
"-E", "management.enabled=true",
)

require.Eventually(t, func() bool {
return finalStateReached.Load()
}, 30*time.Second, 100*time.Millisecond, "Output unit did not report unhealthy")

t.Cleanup(server.Stop)
}
})
}
}
Loading