Skip to content

Commit 0af5975

Browse files
committed
[WIP] Update input status when pipeline fails to connect
This commit updates the status of V2 Inputs managed by the input-cursor.managedInput when the pipeline fails to connect. One of the instances where the pipeline fails to connect is when invalid processors are defined in the input configuration.
1 parent 56a35a9 commit 0af5975

File tree

3 files changed

+192
-1
lines changed

3 files changed

+192
-1
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Kind can be one of:
2+
# - breaking-change: a change to previously-documented behavior
3+
# - deprecation: functionality that is being removed in a later release
4+
# - bug-fix: fixes a problem in a previous version
5+
# - enhancement: extends functionality but does not break or fix existing behavior
6+
# - feature: new functionality
7+
# - known-issue: problems that we are aware of in a given version
8+
# - security: impacts on the security of a product or a user’s deployment.
9+
# - upgrade: important information for someone upgrading from a prior version
10+
# - other: does not fit into any of the other categories
11+
kind: bug-fix
12+
13+
# Change summary; a 80ish characters long description of the change.
14+
summary: Update input status when pipeline fails to connect
15+
16+
# Long description; in case the summary is not enough to describe the change
17+
# this field accommodate a description without length limits.
18+
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
19+
#description:
20+
21+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
22+
component: filebeat
23+
24+
# PR URL; optional; the PR number that added the changeset.
25+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
26+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
27+
# Please provide it if you are adding a fragment for a different PR.
28+
#pr: https://github.com/owner/repo/1234
29+
30+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
31+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
32+
issue: https://github.com/elastic/beats/issues/45649

filebeat/input/v2/input-cursor/input.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
input "github.com/elastic/beats/v7/filebeat/input/v2"
3131
"github.com/elastic/beats/v7/libbeat/beat"
3232
"github.com/elastic/beats/v7/libbeat/common/acker"
33+
"github.com/elastic/beats/v7/libbeat/management/status"
3334
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
3435
"github.com/elastic/elastic-agent-libs/logp"
3536
"github.com/elastic/elastic-agent-libs/monitoring"
@@ -174,10 +175,14 @@ func (inp *managedInput) runSource(
174175
}
175176
}()
176177

177-
client, err := pipeline.ConnectWith(beat.ClientConfig{
178+
client, err := pipeline.ConnectWith(beat.ClientConfig{ // HERE, error connecting to the pipeline
178179
EventListener: newInputACKHandler(ctx.Logger),
179180
})
180181
if err != nil {
182+
ctx.StatusReporter.UpdateStatus(
183+
status.Failed,
184+
fmt.Sprintf("cannot connect to publishing pipeline: %s", err),
185+
)
181186
return err
182187
}
183188
defer client.Close()

x-pack/filebeat/tests/integration/managerV2_test.go

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1054,3 +1054,157 @@ func TestReloadErrorHandling(t *testing.T) {
10541054

10551055
t.Cleanup(server.Stop)
10561056
}
1057+
1058+
func TestFoo(t *testing.T) {
1059+
filebeat := NewFilebeat(t)
1060+
1061+
integration.WriteLogFile(t, "/tmp/flog.log", 100, false)
1062+
1063+
outputUnit := proto.UnitExpected{
1064+
Id: "output-unit",
1065+
Type: proto.UnitType_OUTPUT,
1066+
ConfigStateIdx: 1,
1067+
State: proto.State_HEALTHY,
1068+
LogLevel: proto.UnitLogLevel_DEBUG,
1069+
Config: &proto.UnitExpectedConfig{
1070+
Id: "default",
1071+
Type: "discard",
1072+
Name: "discard",
1073+
Source: integration.RequireNewStruct(t,
1074+
map[string]any{
1075+
"type": "discard",
1076+
"hosts": []any{"http://localhost:9200"},
1077+
}),
1078+
},
1079+
}
1080+
1081+
brokenFilestream := proto.UnitExpected{
1082+
Id: "broken-Filestream",
1083+
Type: proto.UnitType_INPUT,
1084+
ConfigStateIdx: 1,
1085+
State: proto.State_HEALTHY,
1086+
LogLevel: proto.UnitLogLevel_DEBUG,
1087+
Config: &proto.UnitExpectedConfig{
1088+
Id: "filestream-input",
1089+
Type: "filestream",
1090+
Name: "filestream",
1091+
Streams: []*proto.Stream{
1092+
{
1093+
Id: "filestream-input",
1094+
Source: integration.RequireNewStruct(t, map[string]any{
1095+
"enabled": true,
1096+
"type": "filestream",
1097+
"paths": "/tmp/flog.log",
1098+
"processors": []any{
1099+
map[string]any{
1100+
"add_fields": map[string]any{
1101+
"fields_under_root": true, // invalid
1102+
"fields": map[string]any{
1103+
"labels": map[string]any{
1104+
"foo": "bar",
1105+
},
1106+
},
1107+
},
1108+
},
1109+
},
1110+
}),
1111+
},
1112+
},
1113+
},
1114+
}
1115+
1116+
brokenCEL := proto.UnitExpected{
1117+
Id: "broken-cel",
1118+
Type: proto.UnitType_INPUT,
1119+
ConfigStateIdx: 1,
1120+
State: proto.State_HEALTHY,
1121+
LogLevel: proto.UnitLogLevel_DEBUG,
1122+
Config: &proto.UnitExpectedConfig{
1123+
Id: "cel-input",
1124+
Type: "cel",
1125+
Name: "cel",
1126+
Streams: []*proto.Stream{
1127+
{
1128+
Id: "cel-input",
1129+
Source: integration.RequireNewStruct(t, map[string]any{
1130+
"enabled": true,
1131+
"type": "cel",
1132+
"interval": "1m",
1133+
"resource.url": "https://api.ipify.org/?format=text",
1134+
"program": `{"events": [{"ip": string(get(state.url).Body)}]}`,
1135+
"processors": []any{
1136+
map[string]any{
1137+
"add_fields": map[string]any{
1138+
"fields_under_root": true, // invalid
1139+
"fields": map[string]any{
1140+
"labels": map[string]any{
1141+
"foo": "bar",
1142+
},
1143+
},
1144+
},
1145+
},
1146+
},
1147+
}),
1148+
},
1149+
},
1150+
},
1151+
}
1152+
1153+
fmt.Fprint(io.Discard, &brokenFilestream, &brokenCEL)
1154+
1155+
finalStateReached := atomic.Bool{}
1156+
var units = []*proto.UnitExpected{
1157+
&outputUnit,
1158+
// &brokenFilestream,
1159+
&brokenCEL,
1160+
}
1161+
1162+
server := &mock.StubServerV2{
1163+
CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected {
1164+
for _, unit := range observed.Units {
1165+
t.Logf("ID: %s, State: %s", unit.GetId(), unit.GetState().String())
1166+
}
1167+
1168+
brokenCEL.State = proto.State_FAILED
1169+
expectedState := []*proto.UnitExpected{
1170+
&outputUnit,
1171+
&brokenCEL,
1172+
}
1173+
if management.DoesStateMatch(observed, expectedState, 0) {
1174+
finalStateReached.Store(true)
1175+
}
1176+
1177+
brokenCEL.State = proto.State_HEALTHY
1178+
return &proto.CheckinExpected{
1179+
Units: units,
1180+
}
1181+
},
1182+
ActionImpl: func(response *proto.ActionResponse) error { return nil },
1183+
}
1184+
1185+
server.Port = 3000
1186+
require.NoError(t, server.Start())
1187+
t.Cleanup(server.Stop)
1188+
1189+
fmt.Println(
1190+
"Connection string:\n",
1191+
"-E", fmt.Sprintf(`management.insecure_grpc_url_for_testing="localhost:%d"`, server.Port),
1192+
"-E", "management.enabled=true",
1193+
)
1194+
1195+
// c := make(chan any)
1196+
// <-c
1197+
1198+
if true {
1199+
filebeat.Start(
1200+
"-E", fmt.Sprintf(`management.insecure_grpc_url_for_testing="localhost:%d"`, server.Port),
1201+
"-E", "management.enabled=true",
1202+
)
1203+
1204+
require.Eventually(t, func() bool {
1205+
return finalStateReached.Load()
1206+
}, 30*time.Second, 100*time.Millisecond, "Output unit did not report unhealthy")
1207+
1208+
t.Cleanup(server.Stop)
1209+
}
1210+
}

0 commit comments

Comments
 (0)