diff --git a/changelog/fragments/1764175228-report-crashing-otel-process-cleanly-with-proper-status-reporting.yaml b/changelog/fragments/1764175228-report-crashing-otel-process-cleanly-with-proper-status-reporting.yaml new file mode 100644 index 00000000000..9fe5dba2c84 --- /dev/null +++ b/changelog/fragments/1764175228-report-crashing-otel-process-cleanly-with-proper-status-reporting.yaml @@ -0,0 +1,45 @@ +# REQUIRED +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# REQUIRED for all kinds +# Change summary; a 80ish characters long description of the change. +summary: report crashing otel process cleanly with proper status reporting + +# REQUIRED for breaking-change, deprecation, known-issue +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# description: + +# REQUIRED for breaking-change, deprecation, known-issue +# impact: + +# REQUIRED for breaking-change, deprecation, known-issue +# action: + +# REQUIRED for all kinds +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: + +# AUTOMATED +# OPTIONAL to manually add other PR URLs +# PR URL: A link the PR that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +# pr: https://github.com/owner/repo/1234 + +# AUTOMATED +# OPTIONAL to manually add other issue URLs +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +# issue: https://github.com/owner/repo/1234 diff --git a/internal/pkg/otel/manager/common.go b/internal/pkg/otel/manager/common.go index e4c9542a0cf..bb32df65774 100644 --- a/internal/pkg/otel/manager/common.go +++ b/internal/pkg/otel/manager/common.go @@ -9,8 +9,14 @@ import ( "errors" "fmt" "net" + "strings" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentstatus" + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/otelcol" + "go.opentelemetry.io/collector/pipeline" ) // for testing purposes @@ -80,3 +86,124 @@ func findRandomTCPPorts(count int) (ports []int, err error) { return ports, err } + +// otelConfigToStatus converts the `cfg` to `status.AggregateStatus` using the reported error. 
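+// If err can be attributed to a specific component instance, only that instance is marked
+// fatal and the remaining instances are reported as starting; otherwise every instance is marked fatal.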
+// +// The flow of this function comes from https://github.com/open-telemetry/opentelemetry-collector/blob/main/service/internal/graph/graph.go +// It's a much simpler version, but follows the same for loop ordering and building of connectors of the internal +// graph system that OTEL uses to build its component graph. +func otelConfigToStatus(cfg *confmap.Conf, err error) (*status.AggregateStatus, error) { + // marshall into config + var c otelcol.Config + if unmarshalErr := cfg.Unmarshal(&c); unmarshalErr != nil { + return nil, fmt.Errorf("could not unmarshal config: %w", unmarshalErr) + } + + // should at least define a single pipeline + if len(c.Service.Pipelines) == 0 { + return nil, fmt.Errorf("no pipelines defined") + } + + // aggregators are used to create the overall status structure + // aggGeneric is used to for a generic aggregator status where all instances get the same error + // aggSpecific is used to provide status to the specific instance that caused the error + // aggSpecific is only used if matchOccurred is true + aggGeneric := status.NewAggregator(status.PriorityPermanent) + aggSpecific := status.NewAggregator(status.PriorityPermanent) + matchOccurred := false + + // extensions + for _, id := range c.Service.Extensions { + instanceID := componentstatus.NewInstanceID(id, component.KindExtension) + aggGeneric.RecordStatus(instanceID, componentstatus.NewFatalErrorEvent(err)) + if recordSpecificErr(aggSpecific, instanceID, err) { + matchOccurred = true + } + } + + // track connectors + connectors := make(map[component.ID]struct{}) + connectorsAsReceiver := make(map[component.ID][]pipeline.ID) + connectorsAsExporter := make(map[component.ID][]pipeline.ID) + + // pipelines + for pipelineID, pipelineCfg := range c.Service.Pipelines { + for _, recvID := range pipelineCfg.Receivers { + instanceID := componentstatus.NewInstanceID(recvID, component.KindReceiver, pipelineID) + _, isConnector := c.Connectors[recvID] + if isConnector { + connectors[recvID] = struct{}{} + connectorsAsReceiver[recvID] = append(connectorsAsReceiver[recvID], pipelineID) + } + aggGeneric.RecordStatus(instanceID, componentstatus.NewFatalErrorEvent(err)) + if recordSpecificErr(aggSpecific, instanceID, err) { + matchOccurred = true + } + } + for _, procID := range pipelineCfg.Processors { + instanceID := componentstatus.NewInstanceID(procID, component.KindProcessor, pipelineID) + aggGeneric.RecordStatus(instanceID, componentstatus.NewFatalErrorEvent(err)) + if recordSpecificErr(aggSpecific, instanceID, err) { + matchOccurred = true + } + } + for _, exporterID := range pipelineCfg.Exporters { + instanceID := componentstatus.NewInstanceID(exporterID, component.KindExporter, pipelineID) + _, isConnector := c.Connectors[exporterID] + if isConnector { + connectors[exporterID] = struct{}{} + connectorsAsExporter[exporterID] = append(connectorsAsExporter[exporterID], pipelineID) + } + aggGeneric.RecordStatus(instanceID, componentstatus.NewFatalErrorEvent(err)) + if recordSpecificErr(aggSpecific, instanceID, err) { + matchOccurred = true + } + } + } + + // connectors + for connID := range connectors { + extraMatchStr := fmt.Sprintf("connector %q used as", connID) + for _, eID := range connectorsAsExporter[connID] { + for _, rID := range connectorsAsReceiver[connID] { + instanceID := componentstatus.NewInstanceID( + connID, component.KindConnector, eID, rID, + ) + aggGeneric.RecordStatus(instanceID, componentstatus.NewFatalErrorEvent(err)) + if recordSpecificErr(aggSpecific, instanceID, err, extraMatchStr) { + 
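+					// err was attributed to this connector instance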
matchOccurred = true + } + } + } + } + + if matchOccurred { + // specific for the matched error + aggStatus, _ := aggSpecific.AggregateStatus(status.ScopeAll, status.Verbose) + return aggStatus, nil + } + // no match found so generic failed on all instances + aggStatus, _ := aggGeneric.AggregateStatus(status.ScopeAll, status.Verbose) + return aggStatus, nil +} + +func recordSpecificErr(agg *status.Aggregator, instanceID *componentstatus.InstanceID, err error, extraMatchStrs ...string) bool { + forIDStr := fmt.Sprintf("for id: %q", instanceID.ComponentID().String()) + failedMatchStr := fmt.Sprintf("failed to start %q %s:", instanceID.ComponentID().String(), strings.ToLower(instanceID.Kind().String())) + if strings.Contains(err.Error(), forIDStr) || strings.Contains(err.Error(), failedMatchStr) { + // specific so this instance gets the reported error + agg.RecordStatus(instanceID, componentstatus.NewFatalErrorEvent(err)) + return true + } + // extra matchers + for _, matchStr := range extraMatchStrs { + if strings.Contains(err.Error(), matchStr) { + // specific so this instance gets the reported error + agg.RecordStatus(instanceID, componentstatus.NewFatalErrorEvent(err)) + return true + } + } + // not specific to this instance, so we record this one as starting + agg.RecordStatus(instanceID, componentstatus.NewEvent(componentstatus.StatusStarting)) + return false +} diff --git a/internal/pkg/otel/manager/execution_subprocess.go b/internal/pkg/otel/manager/execution_subprocess.go index 798dc62e095..08efb61189d 100644 --- a/internal/pkg/otel/manager/execution_subprocess.go +++ b/internal/pkg/otel/manager/execution_subprocess.go @@ -11,6 +11,7 @@ import ( "fmt" "os" "os/exec" + "sync" "time" "github.com/gofrs/uuid/v5" @@ -110,9 +111,11 @@ func (r *subprocessExecution) startCollector(ctx context.Context, baseLogger *lo return nil, fmt.Errorf("failed to marshal config to yaml: %w", err) } - stdOut := runtimeLogger.NewLogWriterWithDefaults(baseLogger.Core(), zapcore.Level(r.logLevel)) + stdOutLast := newZapLast(baseLogger.Core()) + stdOut := runtimeLogger.NewLogWriterWithDefaults(stdOutLast, zapcore.Level(r.logLevel)) // info level for stdErr because by default collector writes to stderr - stdErr := runtimeLogger.NewLogWriterWithDefaults(baseLogger.Core(), zapcore.Level(r.logLevel)) + stdErrLast := newZapLast(baseLogger.Core()) + stdErr := runtimeLogger.NewLogWriterWithDefaults(stdErrLast, zapcore.Level(r.logLevel)) procCtx, procCtxCancel := context.WithCancel(ctx) env := os.Environ() @@ -220,7 +223,18 @@ func (r *subprocessExecution) startCollector(ctx context.Context, baseLogger *lo // report nil error so that the caller can be notified that the process has exited without error r.reportErrFn(ctx, processErrCh, nil) } else { - r.reportErrFn(ctx, processErrCh, fmt.Errorf("supervised collector (pid: %d) exited with error: %s", procState.Pid(), procState.String())) + var procReportErr error + if stdErrLast.Last().Message != "" { + // use stderr message as the error + procReportErr = errors.New(stdErrLast.Last().Message) + } else if stdOutLast.Last().Message != "" { + // use last stdout message as the error + procReportErr = errors.New(stdOutLast.Last().Message) + } else { + // neither case use standard process error + procReportErr = fmt.Errorf("supervised collector (pid: %d) exited with error: %s", procState.Pid(), procState.String()) + } + r.reportErrFn(ctx, processErrCh, procReportErr) } return } @@ -338,3 +352,33 @@ func (s *procHandle) Stop(waitTime time.Duration) { case <-s.processDoneCh: } } 
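+
+// zapLast (below) wraps a zapcore entry sink and remembers the most recent entry written;
+// startCollector uses it so the collector's final stdout/stderr line can be surfaced as the exit error.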
+ +type zapWriter interface { + Write(zapcore.Entry, []zapcore.Field) error +} +type zapLast struct { + wrapped zapWriter + last zapcore.Entry + mx sync.Mutex +} + +func newZapLast(w zapWriter) *zapLast { + return &zapLast{ + wrapped: w, + } +} + +// Write stores the most recent log entry. +func (z *zapLast) Write(entry zapcore.Entry, fields []zapcore.Field) error { + z.mx.Lock() + z.last = entry + z.mx.Unlock() + return z.wrapped.Write(entry, fields) +} + +// Last returns the last log entry. +func (z *zapLast) Last() zapcore.Entry { + z.mx.Lock() + defer z.mx.Unlock() + return z.last +} diff --git a/internal/pkg/otel/manager/manager.go b/internal/pkg/otel/manager/manager.go index af41ba0a8cd..8ae59136599 100644 --- a/internal/pkg/otel/manager/manager.go +++ b/internal/pkg/otel/manager/manager.go @@ -69,7 +69,11 @@ type OTelManager struct { // baseLogger is the base logger for the otel collector, and doesn't include any agent-specific fields. baseLogger *logger.Logger logger *logger.Logger - errCh chan error + + // errCh should only be used to send critical errors that will mark the entire elastic-agent as failed + // if it's an issue with starting or running the collector those should not be critical errors, instead + // they should be reported as failed components to the elastic-agent + errCh chan error // Agent info and monitoring config getter for otel config generation agentInfo info.Agent @@ -220,18 +224,20 @@ func (m *OTelManager) Run(ctx context.Context) error { continue } + // at this point no critical errors are occurring + // any issues starting the collector are reporting in the status + reportErr(ctx, m.errCh, nil) + newRetries := m.recoveryRetries.Add(1) m.logger.Infof("collector recovery restarting, total retries: %d", newRetries) m.proc, err = m.execution.startCollector(ctx, m.baseLogger, m.logger, m.mergedCollectorCfg, m.collectorRunErr, collectorStatusCh, forceFetchStatusCh) if err != nil { - reportErr(ctx, m.errCh, err) + // report a startup error (this gets reported as status) + m.reportStartupErr(ctx, err) // reset the restart timer to the next backoff recoveryDelay := m.recoveryTimer.ResetNext() m.logger.Errorf("collector exited with error (will try to recover in %s): %v", recoveryDelay.String(), err) - } else { - reportErr(ctx, m.errCh, nil) } - case err = <-m.collectorRunErr: m.recoveryTimer.Stop() if err == nil { @@ -241,12 +247,12 @@ func (m *OTelManager) Run(ctx context.Context) error { m.proc = nil } + // no critical error from this point forward + reportErr(ctx, m.errCh, nil) + if m.mergedCollectorCfg == nil { // no configuration then the collector should not be // running. - // ensure that the coordinator knows that there is no error - // as the collector is not running anymore - reportErr(ctx, m.errCh, nil) continue } @@ -256,17 +262,11 @@ func (m *OTelManager) Run(ctx context.Context) error { // provided and the collector stopped with a clean exit m.proc, err = m.execution.startCollector(ctx, m.baseLogger, m.logger, m.mergedCollectorCfg, m.collectorRunErr, collectorStatusCh, forceFetchStatusCh) if err != nil { - // failed to create the collector (this is different then - // it's failing to run). we do not retry creation on failure - // as it will always fail. 
A new configuration is required for - // it not to fail (a new configuration will result in the retry) - reportErr(ctx, m.errCh, err) + // report a startup error (this gets reported as status) + m.reportStartupErr(ctx, err) // reset the restart timer to the next backoff recoveryDelay := m.recoveryTimer.ResetNext() m.logger.Errorf("collector exited with error (will try to recover in %s): %v", recoveryDelay.String(), err) - } else { - // all good at the moment (possible that it will fail) - reportErr(ctx, m.errCh, nil) } } else { // error occurred while running the collector, this occurs in the @@ -281,7 +281,8 @@ func (m *OTelManager) Run(ctx context.Context) error { } // pass the error to the errCh so the coordinator, unless it's a cancel error if !errors.Is(err, context.Canceled) { - reportErr(ctx, m.errCh, err) + // report a startup error (this gets reported as status) + m.reportStartupErr(ctx, err) // reset the restart timer to the next backoff recoveryDelay := m.recoveryTimer.ResetNext() m.logger.Errorf("collector exited with error (will try to recover in %s): %v", recoveryDelay.String(), err) @@ -295,6 +296,7 @@ func (m *OTelManager) Run(ctx context.Context) error { m.recoveryRetries.Store(0) mergedCfg, err := buildMergedConfig(cfgUpdate, m.agentInfo, m.beatMonitoringConfigGetter, m.baseLogger) if err != nil { + // critical error, merging the configuration should always work reportErr(ctx, m.errCh, err) continue } @@ -339,7 +341,12 @@ func (m *OTelManager) Run(ctx context.Context) error { case otelStatus := <-collectorStatusCh: err = m.reportOtelStatusUpdate(ctx, otelStatus) if err != nil { + // critical error and not handling the status update correctly + // can't properly report status if this fails, so we report it as critical reportErr(ctx, m.errCh, err) + } else { + // status update was able to be reporting clear any critical error + reportErr(ctx, m.errCh, nil) } } } @@ -555,6 +562,25 @@ func (m *OTelManager) handleOtelStatusUpdate(otelStatus *status.AggregateStatus) return m.processComponentStates(componentStates), nil } +// reportStartupErr maps this error to the *status.AggregateStatus. +// this is done by parsing the `m.mergedCollectorCfg` and converting it into the best effort *status.AggregateStatus. +func (m *OTelManager) reportStartupErr(ctx context.Context, err error) { + criticalErr := func(err error) error { + otelStatus, err := otelConfigToStatus(m.mergedCollectorCfg, err) + if err != nil { + return err + } + return m.reportOtelStatusUpdate(ctx, otelStatus) + }(err) + if criticalErr != nil { + // critical error occurred + reportErr(ctx, m.errCh, fmt.Errorf("failed to report statup error: %w", criticalErr)) + } else { + // no error reporting (clear critical) + reportErr(ctx, m.errCh, nil) + } +} + // reportOtelStatusUpdate processes status updates from the underlying otel collector and reports separate collector // and component state updates to the external watch channels. func (m *OTelManager) reportOtelStatusUpdate(ctx context.Context, otelStatus *status.AggregateStatus) error { diff --git a/internal/pkg/otel/manager/manager_test.go b/internal/pkg/otel/manager/manager_test.go index b38be8dccf6..fe997296f48 100644 --- a/internal/pkg/otel/manager/manager_test.go +++ b/internal/pkg/otel/manager/manager_test.go @@ -230,6 +230,30 @@ func (e *EventListener) EnsureHealthy(t *testing.T, u time.Time) { }, 60*time.Second, 1*time.Second, "otel collector never got healthy") } +// EnsureFatal ensures that the OTelManager is fatal by checking the latest error and status. 
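+// Optional extraT callbacks receive the latest reported error and aggregate status for additional assertions.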
+func (e *EventListener) EnsureFatal(t *testing.T, u time.Time, extraT ...func(collectT *assert.CollectT, latestErr *EventTime[error], latestStatus *EventTime[*status.AggregateStatus])) { + assert.EventuallyWithT(t, func(collect *assert.CollectT) { + e.mtx.Lock() + latestErr := e.err + latestStatus := e.collectorStatus + e.mtx.Unlock() + + // we expect to have a reported error which is nil and a reported status which is StatusOK + require.NotNil(collect, latestErr) + assert.Nil(collect, latestErr.Value()) + assert.False(collect, latestErr.Before(u)) + require.NotNil(collect, latestStatus) + require.NotNil(collect, latestStatus.Value()) + assert.False(collect, latestStatus.Before(u)) + require.Equal(collect, componentstatus.StatusFatalError, latestStatus.Value().Status()) + + // extra checks + for _, et := range extraT { + et(collect, latestErr, latestStatus) + } + }, 60*time.Second, 1*time.Second, "otel collector never fatal") +} + // EnsureOffWithoutError ensures that the OTelManager is off without an error by checking the latest error and status. func (e *EventListener) EnsureOffWithoutError(t *testing.T, u time.Time) { require.EventuallyWithT(t, func(collect *assert.CollectT) { @@ -423,7 +447,7 @@ func TestOTelManager_Run(t *testing.T) { }, }, { - name: "subprocess collector panics", + name: "subprocess collector panics restarts", execModeFn: func(collectorRunErr chan error) (collectorExecution, error) { return newSubprocessExecution(logp.DebugLevel, testBinary, 0, 0) }, @@ -435,7 +459,6 @@ func TestOTelManager_Run(t *testing.T) { _ = os.Unsetenv("TEST_SUPERVISED_COLLECTOR_PANIC") }) - // ensure that it got healthy cfg := confmap.NewFromStringMap(testConfig) m.Update(cfg, nil) @@ -458,6 +481,47 @@ func TestOTelManager_Run(t *testing.T) { assert.GreaterOrEqual(t, uint32(3), seenRecoveredTimes, "recovery retries should be 3") }, }, + { + name: "subprocess collector panics reports fatal", + execModeFn: func(collectorRunErr chan error) (collectorExecution, error) { + return newSubprocessExecution(logp.DebugLevel, testBinary, 0, 0) + }, + restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { + // panic instantly always + err := os.Setenv("TEST_SUPERVISED_COLLECTOR_PANIC", "0s") + require.NoError(t, err, "failed to set TEST_SUPERVISED_COLLECTOR_PANIC env var") + t.Cleanup(func() { + _ = os.Unsetenv("TEST_SUPERVISED_COLLECTOR_PANIC") + }) + + cfg := confmap.NewFromStringMap(testConfig) + m.Update(cfg, nil) + + // ensure that it reports a generic fatal error for all components, a panic cannot be assigned to + // a specific component in the collector + e.EnsureFatal(t, time.Now().Add(time.Second), func(collectT *assert.CollectT, _ *EventTime[error], latestStatus *EventTime[*status.AggregateStatus]) { + status := latestStatus.Value() + + // healthcheck auto added + extensions, ok := status.ComponentStatusMap["extensions"] + require.True(collectT, ok, "extensions should be present") + assert.Equal(collectT, extensions.Status(), componentstatus.StatusFatalError) + + metrics, ok := status.ComponentStatusMap["pipeline:metrics"] + require.True(collectT, ok, "pipeline metrics should be present") + assert.Equal(collectT, metrics.Status(), componentstatus.StatusFatalError) + + logs, ok := status.ComponentStatusMap["pipeline:logs"] + require.True(collectT, ok, "pipeline logs should be present") + assert.Equal(collectT, 
logs.Status(), componentstatus.StatusFatalError) + + traces, ok := status.ComponentStatusMap["pipeline:traces"] + require.True(collectT, ok, "pipeline traces should be present") + assert.Equal(collectT, traces.Status(), componentstatus.StatusFatalError) + }) + }, + }, { name: "subprocess collector killed if delayed and manager is stopped", execModeFn: func(collectorRunErr chan error) (collectorExecution, error) { @@ -600,7 +664,7 @@ func TestOTelManager_Run(t *testing.T) { }, }, { - name: "subprocess collector invalid config", + name: "subprocess collector empty config", execModeFn: func(collectorRunErr chan error) (collectorExecution, error) { return newSubprocessExecution(logp.DebugLevel, testBinary, 0, 0) }, @@ -610,7 +674,13 @@ func TestOTelManager_Run(t *testing.T) { // Errors channel is non-blocking, should be able to send an Update that causes an error multiple // times without it blocking on sending over the errCh. for range 3 { - cfg := confmap.New() // invalid config + // empty config + // + // this is really validating a flow that is not possible with the elastic-agent + // if the OTEL configuration is determined to be empty then it will not be ran + // + // this does give a good test of a truly invalid configuration + cfg := confmap.New() // empty config m.Update(cfg, nil) // delay between updates to ensure the collector will have to fail @@ -640,6 +710,235 @@ func TestOTelManager_Run(t *testing.T) { assert.Error(t, err, "otel manager should have returned an error") }, }, + { + name: "subprocess collector invalid receivers/exporters", + execModeFn: func(collectorRunErr chan error) (collectorExecution, error) { + return newSubprocessExecution(logp.DebugLevel, testBinary, 0, 0) + }, + restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { + // not valid receivers/exporters + // + // this needs to be reported as status errors + cfg := confmap.NewFromStringMap(map[string]interface{}{ + "receivers": map[string]interface{}{ + "invalid_receiver": map[string]interface{}{}, + }, + "exporters": map[string]interface{}{ + "invalid_exporter": map[string]interface{}{}, + }, + "service": map[string]interface{}{ + "pipelines": map[string]interface{}{ + "traces": map[string]interface{}{ + "receivers": []string{"invalid_receiver"}, + "exporters": []string{"invalid_exporter"}, + }, + }, + }, + }) + m.Update(cfg, nil) + e.EnsureFatal(t, time.Now().Add(time.Second), func(collectT *assert.CollectT, _ *EventTime[error], latestStatus *EventTime[*status.AggregateStatus]) { + status := latestStatus.Value() + + // healthcheck auto added + _, ok := status.ComponentStatusMap["extensions"] + require.True(collectT, ok, "extensions should be present") + + traces, ok := status.ComponentStatusMap["pipeline:traces"] + require.True(collectT, ok, "pipeline traces should be present") + assert.Equal(collectT, traces.Status(), componentstatus.StatusFatalError) + + exporter, ok := traces.ComponentStatusMap["exporter:invalid_exporter"] + require.True(collectT, ok, "exporter should be present") + receiver, ok := traces.ComponentStatusMap["receiver:invalid_receiver"] + require.True(collectT, ok, "receiver should be present") + + // don't want this test to rely on OTEL to report in a specific order, so it checks + // both ways either invalid_receiver or invalid_exporter is reported as failed + if exporter.Status() == 
componentstatus.StatusFatalError { + assert.Equal(collectT, exporter.Status(), componentstatus.StatusFatalError) // already true, just makes it clear + assert.Equal(collectT, receiver.Status(), componentstatus.StatusStarting) + } else { + assert.Equal(collectT, receiver.Status(), componentstatus.StatusFatalError) + assert.Equal(collectT, exporter.Status(), componentstatus.StatusStarting) + } + }) + }, + }, + { + name: "subprocess collector failing to start receiver", + execModeFn: func(collectorRunErr chan error) (collectorExecution, error) { + return newSubprocessExecution(logp.DebugLevel, testBinary, 0, 0) + }, + restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { + // unable to start receiver + // + // this needs to be reported as status errors + cfg := confmap.NewFromStringMap(map[string]interface{}{ + "receivers": map[string]interface{}{ + "otlp": map[string]interface{}{ + "protocols": map[string]interface{}{ + "grpc": map[string]interface{}{ + "endpoint": "nonexistent.invalid:4317", // not valid domain + }, + }, + }, + }, + "exporters": map[string]interface{}{ + "debug": map[string]interface{}{ + "verbosity": "detailed", + }, + }, + "service": map[string]interface{}{ + "pipelines": map[string]interface{}{ + "traces": map[string]interface{}{ + "receivers": []string{"otlp"}, + "exporters": []string{"debug"}, + }, + }, + }, + }) + m.Update(cfg, nil) + e.EnsureFatal(t, time.Now().Add(time.Second), func(collectT *assert.CollectT, _ *EventTime[error], latestStatus *EventTime[*status.AggregateStatus]) { + status := latestStatus.Value() + + // healthcheck auto added + _, ok := status.ComponentStatusMap["extensions"] + require.True(collectT, ok, "extensions should be present") + + traces, ok := status.ComponentStatusMap["pipeline:traces"] + require.True(collectT, ok, "pipeline traces should be present") + assert.Equal(collectT, traces.Status(), componentstatus.StatusFatalError) + + exporter, ok := traces.ComponentStatusMap["exporter:debug"] + require.True(collectT, ok, "debug exporter should be present") + receiver, ok := traces.ComponentStatusMap["receiver:otlp"] + require.True(collectT, ok, "otlp receiver should be present") + + assert.Equal(collectT, receiver.Status(), componentstatus.StatusFatalError) + assert.Equal(collectT, exporter.Status(), componentstatus.StatusStarting) + }) + }, + }, + { + name: "subprocess collector failing complex", + execModeFn: func(collectorRunErr chan error) (collectorExecution, error) { + return newSubprocessExecution(logp.DebugLevel, testBinary, 0, 0) + }, + restarter: newRecoveryBackoff(100*time.Nanosecond, 10*time.Second, time.Minute), + testFn: func(t *testing.T, m *OTelManager, e *EventListener, exec *testExecution, managerCtxCancel context.CancelFunc, collectorRunErr chan error) { + // complex configuration that validates that otelConfigToStatus is able to compute a valid + // aggregated status model of the failure + // + // this needs to be reported as status errors + cfg := confmap.NewFromStringMap(map[string]interface{}{ + "receivers": map[string]interface{}{ + "otlp/1": map[string]interface{}{ + "protocols": map[string]interface{}{ + "grpc": map[string]interface{}{ + "endpoint": "localhost:17777", // same as below (results in failure) + }, + }, + }, + "otlp/2": map[string]interface{}{ + "protocols": map[string]interface{}{ + "grpc": map[string]interface{}{ + "endpoint": 
"localhost:17777", // same as above (results in failure) + }, + }, + }, + }, + "processors": map[string]interface{}{ + "batch/1": map[string]interface{}{}, + "batch/2": map[string]interface{}{}, + "batch/3": map[string]interface{}{}, + }, + "connectors": map[string]interface{}{ + "forward": map[string]interface{}{}, + }, + "exporters": map[string]interface{}{ + "debug/1": map[string]interface{}{ + "verbosity": "detailed", + }, + "debug/2": map[string]interface{}{ + "verbosity": "detailed", + }, + "debug/3": map[string]interface{}{ + "verbosity": "detailed", + }, + }, + "service": map[string]interface{}{ + "pipelines": map[string]interface{}{ + "traces/1": map[string]interface{}{ + "receivers": []string{"otlp/1"}, + "processors": []string{"batch/1"}, + "exporters": []string{"forward", "debug/1"}, + }, + "traces/2": map[string]interface{}{ + "receivers": []string{"otlp/2"}, + "processors": []string{"batch/2"}, + "exporters": []string{"forward", "debug/2"}, + }, + "traces/3": map[string]interface{}{ + "receivers": []string{"forward"}, + "processors": []string{"batch/3"}, + "exporters": []string{"debug/3"}, + }, + }, + }, + }) + m.Update(cfg, nil) + e.EnsureFatal(t, time.Now().Add(time.Second), func(collectT *assert.CollectT, _ *EventTime[error], latestStatus *EventTime[*status.AggregateStatus]) { + status := latestStatus.Value() + + // healthcheck auto added + _, ok := status.ComponentStatusMap["extensions"] + require.True(collectT, ok, "extensions should be present") + + // all traces present + traces1, ok := status.ComponentStatusMap["pipeline:traces/1"] + require.True(collectT, ok, "pipeline traces/1 should be present") + traces2, ok := status.ComponentStatusMap["pipeline:traces/2"] + require.True(collectT, ok, "pipeline traces/2 should be present") + traces3, ok := status.ComponentStatusMap["pipeline:traces/3"] + require.True(collectT, ok, "pipeline traces/3 should be present") + + // traces/3 should always be marked starting, it doesn't have a receiver that should fail + assert.Equal(collectT, traces3.Status(), componentstatus.StatusStarting) + + // connector should be present + connector1, ok := traces1.ComponentStatusMap["connector:forward"] + require.True(collectT, ok, "connector:forward should be present in traces/1") + assert.Equal(collectT, connector1.Status(), componentstatus.StatusStarting) + connector2, ok := traces1.ComponentStatusMap["connector:forward"] + require.True(collectT, ok, "connector:forward should be present in traces/2") + assert.Equal(collectT, connector2.Status(), componentstatus.StatusStarting) + connector3, ok := traces1.ComponentStatusMap["connector:forward"] + require.True(collectT, ok, "connector:forward should be present in traces/3") + assert.Equal(collectT, connector3.Status(), componentstatus.StatusStarting) + + // don't assume the order in which OTEL starts the components, + // the checks below ensure no specific order + + if traces1.Status() == componentstatus.StatusStarting { + // traces2 should be failing then + assert.Equal(collectT, traces2.Status(), componentstatus.StatusFatalError) + + receiver, ok := traces2.ComponentStatusMap["receiver:otlp/2"] + require.True(collectT, ok, "otlp/2 receiver should be present") + assert.Equal(collectT, receiver.Status(), componentstatus.StatusFatalError) + } else { + // traces1 should be failing then + assert.Equal(collectT, traces1.Status(), componentstatus.StatusFatalError) + + receiver, ok := traces2.ComponentStatusMap["receiver:otlp/1"] + require.True(collectT, ok, "otlp/1 receiver should be present") + 
assert.Equal(collectT, receiver.Status(), componentstatus.StatusFatalError) + } + }) + }, + }, } { t.Run(tc.name, func(t *testing.T) { ctx, cancel := context.WithCancel(t.Context()) @@ -1530,22 +1829,46 @@ func TestOTelManagerEndToEnd(t *testing.T) { assert.Len(t, collectorStatus.ComponentStatusMap, 0) }) - t.Run("collector error is passed up to the component manager", func(t *testing.T) { + t.Run("collector execution error is passed as status not error", func(t *testing.T) { collectorErr := errors.New("collector error") + var err error + var aggStatus *status.AggregateStatus + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case aggStatus = <-mgr.WatchCollector(): + case <-mgr.WatchComponents(): + // don't block (ignored for test) + case e := <-mgr.Errors(): + err = e + if err != nil { + // only return if real error (nil is just clearing the error state) + return + } + case <-time.After(time.Second): + // didn't get an error (good!) + return + } + } + }() + select { case <-ctx.Done(): t.Fatal("timeout waiting for collector status update") case execution.errCh <- collectorErr: } + wg.Wait() - // we should get an error - select { - case <-ctx.Done(): - t.Fatal("timeout waiting for collector status update") - case err := <-mgr.Errors(): - assert.Equal(t, collectorErr, err) - } + // should not come in as an error + require.Nil(t, err, "got unexpected error from the collector execution") + + // should have a fatal error in status + require.NotNil(t, aggStatus) + assert.Equal(t, aggStatus.Status(), componentstatus.StatusFatalError) }) } diff --git a/internal/pkg/otel/manager/testing/testing.go b/internal/pkg/otel/manager/testing/testing.go index 5e25f60a6a8..b7faadec708 100644 --- a/internal/pkg/otel/manager/testing/testing.go +++ b/internal/pkg/otel/manager/testing/testing.go @@ -7,6 +7,7 @@ package main import ( "context" "errors" + "fmt" "os" "time" @@ -50,6 +51,7 @@ func main() { exitCode := 0 err = cmd.RunCollector(ctx, nil, true, "debug", monitoringURL) if err != nil && !errors.Is(err, context.Canceled) { + fmt.Fprintln(os.Stderr, err) exitCode = 1 }