@@ -0,0 +1,45 @@
# REQUIRED
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# REQUIRED for all kinds
# Change summary; a description of the change, around 80 characters long.
summary: report crashing otel process cleanly with proper status reporting

# REQUIRED for breaking-change, deprecation, known-issue
# Long description; in case the summary is not enough to describe the change
# this field accommodates a description without length limits.
# description:

# REQUIRED for breaking-change, deprecation, known-issue
# impact:

# REQUIRED for breaking-change, deprecation, known-issue
# action:

# REQUIRED for all kinds
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component:

# AUTOMATED
# OPTIONAL to manually add other PR URLs
# PR URL: A link to the PR that added the changeset.
# If not present, it is automatically filled by the tooling by finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
# pr: https://github.com/owner/repo/1234

# AUTOMATED
# OPTIONAL to manually add other issue URLs
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present, it is automatically filled by the tooling with the issue linked to the PR number.
# issue: https://github.com/owner/repo/1234
127 changes: 127 additions & 0 deletions internal/pkg/otel/manager/common.go
@@ -9,8 +9,14 @@ import (
"errors"
"fmt"
"net"
"strings"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/otelcol"
"go.opentelemetry.io/collector/pipeline"
)

// for testing purposes
@@ -80,3 +86,124 @@ func findRandomTCPPorts(count int) (ports []int, err error) {

return ports, err
}

// otelConfigToStatus converts the `cfg` to `status.AggregateStatus` using the reported error.
//
// The flow of this function comes from https://github.com/open-telemetry/opentelemetry-collector/blob/main/service/internal/graph/graph.go
// It's a much simpler version, but it follows the same for-loop ordering and connector handling as the internal
// graph system that the OTel collector uses to build its component graph.
func otelConfigToStatus(cfg *confmap.Conf, err error) (*status.AggregateStatus, error) {
Review comment (Member):
This is complex enough to have its own tests. The configurations in manager_test.go right now don't cover it all, e.g. there are no configurations with connectors.

To me it seems like tests for this should focus on mapping status for complex configurations, and the tests in manager_test.go can focus purely on ensuring various failure scenarios result in capturing a status regardless of what the configuration is.

// unmarshal into the collector config
var c otelcol.Config
if unmarshalErr := cfg.Unmarshal(&c); unmarshalErr != nil {
return nil, fmt.Errorf("could not unmarshal config: %w", unmarshalErr)
}

// should at least define a single pipeline
if len(c.Service.Pipelines) == 0 {
return nil, fmt.Errorf("no pipelines defined")
}

// aggregators are used to create the overall status structure
// aggGeneric is used for a generic aggregator status where all instances get the same error
// aggSpecific is used to provide status to the specific instance that caused the error
// aggSpecific is only used if matchOccurred is true
aggGeneric := status.NewAggregator(status.PriorityPermanent)
aggSpecific := status.NewAggregator(status.PriorityPermanent)
matchOccurred := false

// extensions
for _, id := range c.Service.Extensions {
instanceID := componentstatus.NewInstanceID(id, component.KindExtension)
aggGeneric.RecordStatus(instanceID, componentstatus.NewFatalErrorEvent(err))
if recordSpecificErr(aggSpecific, instanceID, err) {
matchOccurred = true
}
}

// track connectors
connectors := make(map[component.ID]struct{})
connectorsAsReceiver := make(map[component.ID][]pipeline.ID)
connectorsAsExporter := make(map[component.ID][]pipeline.ID)

// pipelines
for pipelineID, pipelineCfg := range c.Service.Pipelines {
for _, recvID := range pipelineCfg.Receivers {
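// Note: the upstream graph building code creates a single instance ID per receiver covering
// all the pipelines it appears in, and status reporting then copies the status per pipeline,
// so creating a per-pipeline instance ID here is equivalent for status-reporting purposes.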
instanceID := componentstatus.NewInstanceID(recvID, component.KindReceiver, pipelineID)
Review comment (Contributor):
I think it's worth commenting that the upstream graph building code creates a single instance id for each receiver, containing information about all the pipelines it appears in. The status reporting then makes a copy of the status for each pipeline, so it's fine for us to do so here as well, but it's worth calling out imo.

_, isConnector := c.Connectors[recvID]
if isConnector {
connectors[recvID] = struct{}{}
connectorsAsReceiver[recvID] = append(connectorsAsReceiver[recvID], pipelineID)
}
aggGeneric.RecordStatus(instanceID, componentstatus.NewFatalErrorEvent(err))
if recordSpecificErr(aggSpecific, instanceID, err) {
matchOccurred = true
}
}
for _, procID := range pipelineCfg.Processors {
instanceID := componentstatus.NewInstanceID(procID, component.KindProcessor, pipelineID)
aggGeneric.RecordStatus(instanceID, componentstatus.NewFatalErrorEvent(err))
if recordSpecificErr(aggSpecific, instanceID, err) {
matchOccurred = true
}
}
for _, exporterID := range pipelineCfg.Exporters {
instanceID := componentstatus.NewInstanceID(exporterID, component.KindExporter, pipelineID)
_, isConnector := c.Connectors[exporterID]
if isConnector {
connectors[exporterID] = struct{}{}
connectorsAsExporter[exporterID] = append(connectorsAsExporter[exporterID], pipelineID)
}
aggGeneric.RecordStatus(instanceID, componentstatus.NewFatalErrorEvent(err))
if recordSpecificErr(aggSpecific, instanceID, err) {
matchOccurred = true
}
}
}

// connectors
for connID := range connectors {
extraMatchStr := fmt.Sprintf("connector %q used as", connID)
for _, eID := range connectorsAsExporter[connID] {
for _, rID := range connectorsAsReceiver[connID] {
instanceID := componentstatus.NewInstanceID(
connID, component.KindConnector, eID, rID,
)
aggGeneric.RecordStatus(instanceID, componentstatus.NewFatalErrorEvent(err))
if recordSpecificErr(aggSpecific, instanceID, err, extraMatchStr) {
matchOccurred = true
}
}
}
}

if matchOccurred {
// specific for the matched error
aggStatus, _ := aggSpecific.AggregateStatus(status.ScopeAll, status.Verbose)
return aggStatus, nil
}
// no match found, so use the generic aggregator where all instances failed
aggStatus, _ := aggGeneric.AggregateStatus(status.ScopeAll, status.Verbose)
return aggStatus, nil
}

func recordSpecificErr(agg *status.Aggregator, instanceID *componentstatus.InstanceID, err error, extraMatchStrs ...string) bool {
forIDStr := fmt.Sprintf("for id: %q", instanceID.ComponentID().String())
failedMatchStr := fmt.Sprintf("failed to start %q %s:", instanceID.ComponentID().String(), strings.ToLower(instanceID.Kind().String()))
Review comment (Contributor) on lines +191 to +192:
It'd be good to specify what conditions these lines refer to. The one about starting the component is relatively obvious, but forIDStr, not so much. Is this an error about the otel collector not knowing about the component type?

if strings.Contains(err.Error(), forIDStr) || strings.Contains(err.Error(), failedMatchStr) {
// specific so this instance gets the reported error
agg.RecordStatus(instanceID, componentstatus.NewFatalErrorEvent(err))
return true
}
// extra matchers
for _, matchStr := range extraMatchStrs {
if strings.Contains(err.Error(), matchStr) {
// specific so this instance gets the reported error
agg.RecordStatus(instanceID, componentstatus.NewFatalErrorEvent(err))
return true
}
}
// not specific to this instance, so we record this one as starting
agg.RecordStatus(instanceID, componentstatus.NewEvent(componentstatus.StatusStarting))
return false
}
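Below is a minimal sketch of the kind of focused test the review comment on otelConfigToStatus above asks for, exercising a configuration that routes two pipelines through a connector. The component names ("nop", "forward"), the pipeline names, the error string, and the assertions are illustrative assumptions and are not taken from manager_test.go.

```go
package manager

import (
	"errors"
	"testing"

	"go.opentelemetry.io/collector/confmap"
)

// TestOtelConfigToStatusWithConnector is a sketch only: it checks that a startup error is
// mapped onto a per-component status tree for a config that uses a connector as both an
// exporter (logs/in) and a receiver (logs/out).
func TestOtelConfigToStatusWithConnector(t *testing.T) {
	cfg := confmap.NewFromStringMap(map[string]any{
		"receivers":  map[string]any{"nop": map[string]any{}},
		"exporters":  map[string]any{"nop": map[string]any{}},
		"connectors": map[string]any{"forward": map[string]any{}},
		"service": map[string]any{
			"pipelines": map[string]any{
				"logs/in":  map[string]any{"receivers": []any{"nop"}, "exporters": []any{"forward"}},
				"logs/out": map[string]any{"receivers": []any{"forward"}, "exporters": []any{"nop"}},
			},
		},
	})

	// error string shaped to match the "failed to start" matcher in recordSpecificErr
	reportedErr := errors.New(`failed to start "nop" receiver: boom`)

	aggStatus, err := otelConfigToStatus(cfg, reportedErr)
	if err != nil {
		t.Fatalf("otelConfigToStatus returned an error: %v", err)
	}
	if len(aggStatus.ComponentStatusMap) == 0 {
		t.Fatal("expected per-pipeline entries in the aggregate status")
	}
}
```

A fuller test along these lines would walk the returned tree and assert that the matching receiver carries the fatal error while the remaining components are reported as starting, which is the behaviour recordSpecificErr implements.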
50 changes: 47 additions & 3 deletions internal/pkg/otel/manager/execution_subprocess.go
@@ -11,6 +11,7 @@ import (
"fmt"
"os"
"os/exec"
"sync"
"time"

"github.com/gofrs/uuid/v5"
@@ -110,9 +111,11 @@ func (r *subprocessExecution) startCollector(ctx context.Context, baseLogger *lo
return nil, fmt.Errorf("failed to marshal config to yaml: %w", err)
}

stdOut := runtimeLogger.NewLogWriterWithDefaults(baseLogger.Core(), zapcore.Level(r.logLevel))
stdOutLast := newZapLast(baseLogger.Core())
stdOut := runtimeLogger.NewLogWriterWithDefaults(stdOutLast, zapcore.Level(r.logLevel))
// info level for stdErr because by default collector writes to stderr
stdErr := runtimeLogger.NewLogWriterWithDefaults(baseLogger.Core(), zapcore.Level(r.logLevel))
stdErrLast := newZapLast(baseLogger.Core())
stdErr := runtimeLogger.NewLogWriterWithDefaults(stdErrLast, zapcore.Level(r.logLevel))

procCtx, procCtxCancel := context.WithCancel(ctx)
env := os.Environ()
@@ -220,7 +223,18 @@ func (r *subprocessExecution) startCollector(ctx context.Context, baseLogger *lo
// report nil error so that the caller can be notified that the process has exited without error
r.reportErrFn(ctx, processErrCh, nil)
} else {
r.reportErrFn(ctx, processErrCh, fmt.Errorf("supervised collector (pid: %d) exited with error: %s", procState.Pid(), procState.String()))
var procReportErr error
if stdErrLast.Last().Message != "" {
// use the last stderr message as the error
procReportErr = errors.New(stdErrLast.Last().Message)
} else if stdOutLast.Last().Message != "" {
// use last stdout message as the error
procReportErr = errors.New(stdOutLast.Last().Message)
} else {
// neither stream produced a message, use the standard process error
procReportErr = fmt.Errorf("supervised collector (pid: %d) exited with error: %s", procState.Pid(), procState.String())
}
r.reportErrFn(ctx, processErrCh, procReportErr)
}
return
}
@@ -338,3 +352,33 @@ func (s *procHandle) Stop(waitTime time.Duration) {
case <-s.processDoneCh:
}
}

type zapWriter interface {
Write(zapcore.Entry, []zapcore.Field) error
}
type zapLast struct {
wrapped zapWriter
last zapcore.Entry
mx sync.Mutex
}

func newZapLast(w zapWriter) *zapLast {
return &zapLast{
wrapped: w,
}
}

// Write stores the most recent log entry and forwards it to the wrapped writer.
func (z *zapLast) Write(entry zapcore.Entry, fields []zapcore.Field) error {
z.mx.Lock()
z.last = entry
z.mx.Unlock()
return z.wrapped.Write(entry, fields)
}

// Last returns the last log entry.
func (z *zapLast) Last() zapcore.Entry {
z.mx.Lock()
defer z.mx.Unlock()
return z.last
}
60 changes: 43 additions & 17 deletions internal/pkg/otel/manager/manager.go
@@ -69,7 +69,11 @@ type OTelManager struct {
// baseLogger is the base logger for the otel collector, and doesn't include any agent-specific fields.
baseLogger *logger.Logger
logger *logger.Logger
errCh chan error

// errCh should only be used to send critical errors that will mark the entire elastic-agent as failed.
// if it's an issue with starting or running the collector, those should not be critical errors; instead
// they should be reported as failed components to the elastic-agent
errCh chan error

// Agent info and monitoring config getter for otel config generation
agentInfo info.Agent
@@ -220,18 +224,20 @@ func (m *OTelManager) Run(ctx context.Context) error {
continue
}

// at this point no critical errors are occurring
// any issues starting the collector are reported in the status
reportErr(ctx, m.errCh, nil)

newRetries := m.recoveryRetries.Add(1)
m.logger.Infof("collector recovery restarting, total retries: %d", newRetries)
m.proc, err = m.execution.startCollector(ctx, m.baseLogger, m.logger, m.mergedCollectorCfg, m.collectorRunErr, collectorStatusCh, forceFetchStatusCh)
if err != nil {
reportErr(ctx, m.errCh, err)
// report a startup error (this gets reported as status)
m.reportStartupErr(ctx, err)
// reset the restart timer to the next backoff
recoveryDelay := m.recoveryTimer.ResetNext()
m.logger.Errorf("collector exited with error (will try to recover in %s): %v", recoveryDelay.String(), err)
} else {
reportErr(ctx, m.errCh, nil)
}

case err = <-m.collectorRunErr:
m.recoveryTimer.Stop()
if err == nil {
@@ -241,12 +247,12 @@ func (m *OTelManager) Run(ctx context.Context) error {
m.proc = nil
}

// no critical error from this point forward
reportErr(ctx, m.errCh, nil)

if m.mergedCollectorCfg == nil {
// no configuration, so the collector should not be
// running.
// ensure that the coordinator knows that there is no error
// as the collector is not running anymore
reportErr(ctx, m.errCh, nil)
continue
}

@@ -256,17 +262,11 @@
// provided and the collector stopped with a clean exit
m.proc, err = m.execution.startCollector(ctx, m.baseLogger, m.logger, m.mergedCollectorCfg, m.collectorRunErr, collectorStatusCh, forceFetchStatusCh)
if err != nil {
// failed to create the collector (this is different then
// it's failing to run). we do not retry creation on failure
// as it will always fail. A new configuration is required for
// it not to fail (a new configuration will result in the retry)
reportErr(ctx, m.errCh, err)
// report a startup error (this gets reported as status)
m.reportStartupErr(ctx, err)
// reset the restart timer to the next backoff
recoveryDelay := m.recoveryTimer.ResetNext()
m.logger.Errorf("collector exited with error (will try to recover in %s): %v", recoveryDelay.String(), err)
} else {
// all good at the moment (possible that it will fail)
reportErr(ctx, m.errCh, nil)
}
} else {
// error occurred while running the collector, this occurs in the
@@ -281,7 +281,8 @@ func (m *OTelManager) Run(ctx context.Context) error {
}
// pass the error to the errCh so the coordinator is notified, unless it's a cancel error
if !errors.Is(err, context.Canceled) {
reportErr(ctx, m.errCh, err)
// report a startup error (this gets reported as status)
m.reportStartupErr(ctx, err)
// reset the restart timer to the next backoff
recoveryDelay := m.recoveryTimer.ResetNext()
m.logger.Errorf("collector exited with error (will try to recover in %s): %v", recoveryDelay.String(), err)
@@ -295,6 +296,7 @@ func (m *OTelManager) Run(ctx context.Context) error {
m.recoveryRetries.Store(0)
mergedCfg, err := buildMergedConfig(cfgUpdate, m.agentInfo, m.beatMonitoringConfigGetter, m.baseLogger)
if err != nil {
// critical error, merging the configuration should always work
reportErr(ctx, m.errCh, err)
continue
}
@@ -339,7 +341,12 @@ func (m *OTelManager) Run(ctx context.Context) error {
case otelStatus := <-collectorStatusCh:
err = m.reportOtelStatusUpdate(ctx, otelStatus)
if err != nil {
// critical error: the status update could not be handled correctly
// we can't properly report status if this fails, so report it as critical
reportErr(ctx, m.errCh, err)
} else {
// status update was reported successfully, clear any critical error
reportErr(ctx, m.errCh, nil)
}
}
}
@@ -555,6 +562,25 @@ func (m *OTelManager) handleOtelStatusUpdate(otelStatus *status.AggregateStatus)
return m.processComponentStates(componentStates), nil
}

// reportStartupErr maps the given error to a *status.AggregateStatus.
// this is done by parsing `m.mergedCollectorCfg` and converting it into a best-effort *status.AggregateStatus.
func (m *OTelManager) reportStartupErr(ctx context.Context, err error) {
Review comment (Contributor):
One thing I don't like about this function is that it pretends to report an error, but actually reports a status. We have different delivery requirements for these. Reporting errors is non-blocking, as a newer error can always overwrite an older error. The same is not true about statuses - we need to make sure all the component statuses are delivered, or else we get bugs like the one in #10675.

As a result, reportStartupErr needs to be called with care, as it can potentially deadlock the manager. I wonder if we should bite the bullet and just make the update channel buffered with some reasonable size, and emit a fatal error if it can't be written to. What do you think?

criticalErr := func(err error) error {
otelStatus, err := otelConfigToStatus(m.mergedCollectorCfg, err)
if err != nil {
return err
}
return m.reportOtelStatusUpdate(ctx, otelStatus)
}(err)
if criticalErr != nil {
// critical error occurred
reportErr(ctx, m.errCh, fmt.Errorf("failed to report startup error: %w", criticalErr))
} else {
// startup error reported as status, clear any critical error
reportErr(ctx, m.errCh, nil)
}
}
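For reference, here is a rough sketch of the buffered-update-channel idea raised in the review comment above. It is not part of this PR; the type and field names (updateSender, updateCh, trySendStatus) and the buffer size are assumptions made purely for illustration.

```go
package manager

import (
	"context"
	"fmt"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
)

// updateSender is a hypothetical stand-in for the manager's status plumbing.
type updateSender struct {
	updateCh chan *status.AggregateStatus // buffered, e.g. make(chan *status.AggregateStatus, 32)
	errCh    chan error                   // critical-error channel
}

// trySendStatus performs a non-blocking send of a status update. If the buffer is full it
// escalates to an error instead of blocking, so a slow consumer cannot deadlock the manager.
func (s *updateSender) trySendStatus(ctx context.Context, agg *status.AggregateStatus) {
	select {
	case s.updateCh <- agg:
	default:
		select {
		case s.errCh <- fmt.Errorf("status update channel full, dropping update"):
		case <-ctx.Done():
		}
	}
}
```

Whether a full buffer should drop the update or be treated as fatal is exactly the trade-off the reviewer raises.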

// reportOtelStatusUpdate processes status updates from the underlying otel collector and reports separate collector
// and component state updates to the external watch channels.
func (m *OTelManager) reportOtelStatusUpdate(ctx context.Context, otelStatus *status.AggregateStatus) error {