diff --git a/exporter/otlpexporter/factory.go b/exporter/otlpexporter/factory.go index 81166979506..49a932041db 100644 --- a/exporter/otlpexporter/factory.go +++ b/exporter/otlpexporter/factory.go @@ -59,7 +59,7 @@ func createTraces( oce := newExporter(cfg, set) oCfg := cfg.(*Config) return exporterhelper.NewTraces(ctx, set, cfg, - oce.pushTraces, + oce.pushTracesWithStatus, exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), exporterhelper.WithTimeout(oCfg.TimeoutConfig), exporterhelper.WithRetry(oCfg.RetryConfig), @@ -78,7 +78,7 @@ func createMetrics( oce := newExporter(cfg, set) oCfg := cfg.(*Config) return exporterhelper.NewMetrics(ctx, set, cfg, - oce.pushMetrics, + oce.pushMetricsWithStatus, exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), exporterhelper.WithTimeout(oCfg.TimeoutConfig), exporterhelper.WithRetry(oCfg.RetryConfig), @@ -97,7 +97,7 @@ func createLogs( oce := newExporter(cfg, set) oCfg := cfg.(*Config) return exporterhelper.NewLogs(ctx, set, cfg, - oce.pushLogs, + oce.pushLogsWithStatus, exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), exporterhelper.WithTimeout(oCfg.TimeoutConfig), exporterhelper.WithRetry(oCfg.RetryConfig), @@ -116,7 +116,7 @@ func createProfilesExporter( oce := newExporter(cfg, set) oCfg := cfg.(*Config) return exporterhelperprofiles.NewProfilesExporter(ctx, set, cfg, - oce.pushProfiles, + oce.pushProfilesWithStatus, exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), exporterhelper.WithTimeout(oCfg.TimeoutConfig), exporterhelper.WithRetry(oCfg.RetryConfig), diff --git a/exporter/otlpexporter/otlp.go b/exporter/otlpexporter/otlp.go index 2ca4deadd93..f61d53b97d9 100644 --- a/exporter/otlpexporter/otlp.go +++ b/exporter/otlpexporter/otlp.go @@ -92,14 +92,11 @@ func (e *baseExporter) shutdown(context.Context) error { return nil } -func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) (err error) { - defer func() { - e.reportStatusFromError(err) - }() +func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error { req := ptraceotlp.NewExportRequestFromTraces(td) resp, respErr := e.traceExporter.Export(e.enhanceContext(ctx), req, e.callOptions...) - if err = processError(respErr); err != nil { - return + if err := processError(respErr); err != nil { + return err } partialSuccess := resp.PartialSuccess() if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedSpans() == 0) { @@ -108,17 +105,23 @@ func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) (err er zap.Int64("dropped_spans", resp.PartialSuccess().RejectedSpans()), ) } - return + return nil +} + +func (e *baseExporter) pushTracesWithStatus(ctx context.Context, td ptrace.Traces) error { + if err := e.pushTraces(ctx, td); err != nil { + componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err)) + return err + } + componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK)) + return nil } -func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) (err error) { - defer func() { - e.reportStatusFromError(err) - }() +func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error { req := pmetricotlp.NewExportRequestFromMetrics(md) resp, respErr := e.metricExporter.Export(e.enhanceContext(ctx), req, e.callOptions...) - if err = processError(respErr); err != nil { - return + if err := processError(respErr); err != nil { + return err } partialSuccess := resp.PartialSuccess() if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedDataPoints() == 0) { @@ -127,17 +130,23 @@ func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) (err zap.Int64("dropped_data_points", resp.PartialSuccess().RejectedDataPoints()), ) } - return + return nil } -func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) (err error) { - defer func() { - e.reportStatusFromError(err) - }() +func (e *baseExporter) pushMetricsWithStatus(ctx context.Context, md pmetric.Metrics) error { + if err := e.pushMetrics(ctx, md); err != nil { + componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err)) + return err + } + componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK)) + return nil +} + +func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error { req := plogotlp.NewExportRequestFromLogs(ld) resp, respErr := e.logExporter.Export(e.enhanceContext(ctx), req, e.callOptions...) - if err = processError(respErr); err != nil { - return + if err := processError(respErr); err != nil { + return err } partialSuccess := resp.PartialSuccess() if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedLogRecords() == 0) { @@ -146,7 +155,16 @@ func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) (err error) { zap.Int64("dropped_log_records", resp.PartialSuccess().RejectedLogRecords()), ) } - return + return nil +} + +func (e *baseExporter) pushLogsWithStatus(ctx context.Context, ld plog.Logs) error { + if err := e.pushLogs(ctx, ld); err != nil { + componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err)) + return err + } + componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK)) + return nil } func (e *baseExporter) pushProfiles(ctx context.Context, td pprofile.Profiles) error { @@ -165,6 +183,15 @@ func (e *baseExporter) pushProfiles(ctx context.Context, td pprofile.Profiles) e return nil } +func (e *baseExporter) pushProfilesWithStatus(ctx context.Context, td pprofile.Profiles) error { + if err := e.pushProfiles(ctx, td); err != nil { + componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err)) + return err + } + componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK)) + return nil +} + func (e *baseExporter) enhanceContext(ctx context.Context) context.Context { if e.metadata.Len() > 0 { return metadata.NewOutgoingContext(ctx, e.metadata) @@ -203,14 +230,6 @@ func processError(err error) error { return err } -func (e *baseExporter) reportStatusFromError(err error) { - if err != nil { - componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err)) - return - } - componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK)) -} - func shouldRetry(code codes.Code, retryInfo *errdetails.RetryInfo) bool { switch code { case codes.Canceled, diff --git a/exporter/otlphttpexporter/factory.go b/exporter/otlphttpexporter/factory.go index c3a91a44bb9..2844d3b3348 100644 --- a/exporter/otlphttpexporter/factory.go +++ b/exporter/otlphttpexporter/factory.go @@ -90,7 +90,7 @@ func createTraces( } return exporterhelper.NewTraces(ctx, set, cfg, - oce.pushTraces, + oce.pushTracesWithStatus, exporterhelper.WithStart(oce.start), exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), // explicitly disable since we rely on http.Client timeout logic. @@ -116,7 +116,7 @@ func createMetrics( } return exporterhelper.NewMetrics(ctx, set, cfg, - oce.pushMetrics, + oce.pushMetricsWithStatus, exporterhelper.WithStart(oce.start), exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), // explicitly disable since we rely on http.Client timeout logic. @@ -141,7 +141,7 @@ func createLogs( } return exporterhelper.NewLogs(ctx, set, cfg, - oce.pushLogs, + oce.pushLogsWithStatus, exporterhelper.WithStart(oce.start), exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), // explicitly disable since we rely on http.Client timeout logic. @@ -167,7 +167,7 @@ func createProfiles( } return exporterhelperprofiles.NewProfilesExporter(ctx, set, cfg, - oce.pushProfiles, + oce.pushProfilesWithStatus, exporterhelper.WithStart(oce.start), exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), // explicitly disable since we rely on http.Client timeout logic. diff --git a/exporter/otlphttpexporter/otlp.go b/exporter/otlphttpexporter/otlp.go index ad066cb343e..0eeb756bae1 100644 --- a/exporter/otlphttpexporter/otlp.go +++ b/exporter/otlphttpexporter/otlp.go @@ -114,6 +114,15 @@ func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error { return e.export(ctx, e.tracesURL, request, e.tracesPartialSuccessHandler) } +func (e *baseExporter) pushTracesWithStatus(ctx context.Context, td ptrace.Traces) error { + if err := e.pushTraces(ctx, td); err != nil { + componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err)) + return err + } + componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK)) + return nil +} + func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error { tr := pmetricotlp.NewExportRequestFromMetrics(md) @@ -134,6 +143,15 @@ func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) erro return e.export(ctx, e.metricsURL, request, e.metricsPartialSuccessHandler) } +func (e *baseExporter) pushMetricsWithStatus(ctx context.Context, md pmetric.Metrics) error { + if err := e.pushMetrics(ctx, md); err != nil { + componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err)) + return err + } + componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK)) + return nil +} + func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error { tr := plogotlp.NewExportRequestFromLogs(ld) @@ -155,6 +173,15 @@ func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error { return e.export(ctx, e.logsURL, request, e.logsPartialSuccessHandler) } +func (e *baseExporter) pushLogsWithStatus(ctx context.Context, ld plog.Logs) error { + if err := e.pushLogs(ctx, ld); err != nil { + componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err)) + return err + } + componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK)) + return nil +} + func (e *baseExporter) pushProfiles(ctx context.Context, td pprofile.Profiles) error { tr := pprofileotlp.NewExportRequestFromProfiles(td) @@ -176,10 +203,16 @@ func (e *baseExporter) pushProfiles(ctx context.Context, td pprofile.Profiles) e return e.export(ctx, e.profilesURL, request, e.profilesPartialSuccessHandler) } -func (e *baseExporter) export(ctx context.Context, url string, request []byte, partialSuccessHandler partialSuccessHandler) (err error) { - defer func() { - e.reportStatusFromError(err) - }() +func (e *baseExporter) pushProfilesWithStatus(ctx context.Context, td pprofile.Profiles) error { + if err := e.pushProfiles(ctx, td); err != nil { + componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err)) + return err + } + componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK)) + return nil +} + +func (e *baseExporter) export(ctx context.Context, url string, request []byte, partialSuccessHandler partialSuccessHandler) error { e.logger.Debug("Preparing to make HTTP request", zap.String("url", url)) req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(request)) if err != nil { @@ -248,14 +281,6 @@ func (e *baseExporter) export(ctx context.Context, url string, request []byte, p return consumererror.NewPermanent(formattedErr) } -func (e *baseExporter) reportStatusFromError(err error) { - if err != nil { - componentstatus.ReportStatus(e.host, componentstatus.NewRecoverableErrorEvent(err)) - return - } - componentstatus.ReportStatus(e.host, componentstatus.NewEvent(componentstatus.StatusOK)) -} - // Determine if the status code is retryable according to the specification. // For more, see https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/protocol/otlp.md#failures-1 func isRetryableStatusCode(code int) bool {