Skip to content

Commit 245ab96

Browse files
committed
review feedback
1 parent 046c6b7 commit 245ab96

File tree

3 files changed

+41
-47
lines changed

3 files changed

+41
-47
lines changed

exporter/otelarrowexporter/factory.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -72,14 +72,15 @@ func createDefaultConfig() component.Config {
7272
}
7373
}
7474

75-
func (e *baseExporter) helperOptions() []exporterhelper.Option {
75+
func helperOptions(e exp) []exporterhelper.Option {
76+
cfg := e.getConfig().(*Config)
7677
return []exporterhelper.Option{
7778
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
78-
exporterhelper.WithTimeout(e.config.TimeoutSettings),
79-
exporterhelper.WithRetry(e.config.RetryConfig),
80-
exporterhelper.WithQueue(e.config.QueueSettings),
79+
exporterhelper.WithTimeout(cfg.TimeoutSettings),
80+
exporterhelper.WithRetry(cfg.RetryConfig),
81+
exporterhelper.WithQueue(cfg.QueueSettings),
8182
exporterhelper.WithStart(e.start),
82-
exporterhelper.WithBatcher(e.config.BatcherConfig),
83+
exporterhelper.WithBatcher(cfg.BatcherConfig),
8384
exporterhelper.WithShutdown(e.shutdown),
8485
}
8586
}
@@ -109,7 +110,7 @@ func createTracesExporter(
109110
}
110111
return exporterhelper.NewTracesExporter(ctx, exp.getSettings(), exp.getConfig(),
111112
exp.pushTraces,
112-
exp.helperOptions()...,
113+
helperOptions(exp)...,
113114
)
114115
}
115116

@@ -128,7 +129,7 @@ func createMetricsExporter(
128129
}
129130
return exporterhelper.NewMetricsExporter(ctx, exp.getSettings(), exp.getConfig(),
130131
exp.pushMetrics,
131-
exp.helperOptions()...,
132+
helperOptions(exp)...,
132133
)
133134
}
134135

@@ -147,6 +148,6 @@ func createLogsExporter(
147148
}
148149
return exporterhelper.NewLogsExporter(ctx, exp.getSettings(), exp.getConfig(),
149150
exp.pushLogs,
150-
exp.helperOptions()...,
151+
helperOptions(exp)...,
151152
)
152153
}

exporter/otelarrowexporter/metadata.go

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,22 +7,25 @@ import (
77
"context"
88
"errors"
99
"fmt"
10+
"runtime"
1011
"sort"
1112
"strings"
1213
"sync"
1314

15+
arrowPkg "github.com/apache/arrow/go/v16/arrow"
1416
"go.opentelemetry.io/collector/client"
1517
"go.opentelemetry.io/collector/component"
16-
"go.opentelemetry.io/collector/consumer"
1718
"go.opentelemetry.io/collector/consumer/consumererror"
1819
"go.opentelemetry.io/collector/exporter"
19-
"go.opentelemetry.io/collector/exporter/exporterhelper"
2020
"go.opentelemetry.io/collector/pdata/plog"
2121
"go.opentelemetry.io/collector/pdata/pmetric"
2222
"go.opentelemetry.io/collector/pdata/ptrace"
2323
"go.opentelemetry.io/otel/attribute"
2424
"go.uber.org/multierr"
2525
"google.golang.org/grpc/metadata"
26+
27+
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/compression/zstd"
28+
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats"
2629
)
2730

2831
var (
@@ -38,6 +41,10 @@ type metadataExporter struct {
3841

3942
metadataKeys []string
4043
exporters sync.Map
44+
netReporter *netstats.NetworkReporter
45+
46+
// Default user-agent header.
47+
userAgent string
4148

4249
// Guards the size and the storing logic to ensure no more than limit items are stored.
4350
// If we are willing to allow "some" extra items than the limit this can be removed and size can be made atomic.
@@ -49,20 +56,40 @@ var _ exp = (*metadataExporter)(nil)
4956

5057
func newMetadataExporter(cfg component.Config, set exporter.Settings, streamClientFactory streamClientFactory) (exp, error) {
5158
oCfg := cfg.(*Config)
59+
if oCfg.Endpoint == "" {
60+
return nil, errors.New("OTLP exporter config requires an Endpoint")
61+
}
62+
63+
netReporter, err := netstats.NewExporterNetworkReporter(set)
64+
if err != nil {
65+
return nil, err
66+
}
67+
userAgent := fmt.Sprintf("%s/%s (%s/%s)",
68+
set.BuildInfo.Description, set.BuildInfo.Version, runtime.GOOS, runtime.GOARCH)
69+
70+
if !oCfg.Arrow.Disabled {
71+
// Ignoring an error because Validate() was called.
72+
_ = zstd.SetEncoderConfig(oCfg.Arrow.Zstd)
73+
74+
userAgent += fmt.Sprintf(" ApacheArrow/%s (NumStreams/%d)", arrowPkg.PkgVersion, oCfg.Arrow.NumStreams)
75+
}
76+
5277
// use lower-case, to be consistent with http/2 headers.
5378
mks := make([]string, len(oCfg.MetadataKeys))
5479
for i, k := range oCfg.MetadataKeys {
5580
mks[i] = strings.ToLower(k)
5681
}
5782
sort.Strings(mks)
5883
if len(mks) == 0 {
59-
return newExporter(cfg, set, streamClientFactory)
84+
return newExporter(cfg, set, streamClientFactory, userAgent, netReporter)
6085
}
6186
return &metadataExporter{
6287
config: oCfg,
6388
settings: set,
6489
scf: streamClientFactory,
6590
metadataKeys: mks,
91+
userAgent: userAgent,
92+
netReporter: netReporter,
6693
}, nil
6794
}
6895

@@ -74,17 +101,6 @@ func (e *metadataExporter) getConfig() component.Config {
74101
return e.config
75102
}
76103

77-
func (e *metadataExporter) helperOptions() []exporterhelper.Option {
78-
return []exporterhelper.Option{
79-
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
80-
exporterhelper.WithTimeout(e.config.TimeoutSettings),
81-
exporterhelper.WithRetry(e.config.RetryConfig),
82-
exporterhelper.WithQueue(e.config.QueueSettings),
83-
exporterhelper.WithStart(e.start),
84-
exporterhelper.WithShutdown(e.shutdown),
85-
}
86-
}
87-
88104
func (e *metadataExporter) start(_ context.Context, host component.Host) (err error) {
89105
e.host = host
90106
return nil
@@ -145,7 +161,7 @@ func (e *metadataExporter) getOrCreateExporter(ctx context.Context, s attribute.
145161
return nil, errTooManyExporters
146162
}
147163

148-
newExp, err := newExporter(e.config, e.settings, e.scf)
164+
newExp, err := newExporter(e.config, e.settings, e.scf, e.userAgent, e.netReporter)
149165
if err != nil {
150166
return nil, fmt.Errorf("failed to create exporter: %w", err)
151167
}

exporter/otelarrowexporter/otelarrow.go

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,8 @@ package otelarrowexporter // import "github.com/open-telemetry/opentelemetry-col
55

66
import (
77
"context"
8-
"errors"
9-
"fmt"
10-
"runtime"
118
"time"
129

13-
arrowPkg "github.com/apache/arrow/go/v16/arrow"
1410
arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record"
1511
"go.opentelemetry.io/collector/component"
1612
"go.opentelemetry.io/collector/config/configcompression"
@@ -38,7 +34,6 @@ import (
3834
)
3935

4036
type exp interface {
41-
helperOptions() []exporterhelper.Option
4237
getSettings() exporter.Settings
4338
getConfig() component.Config
4439

@@ -79,27 +74,9 @@ type streamClientFactory func(conn *grpc.ClientConn) arrow.StreamClientFunc
7974

8075
// Crete new exporter and start it. The exporter will begin connecting but
8176
// this function may return before the connection is established.
82-
func newExporter(cfg component.Config, set exporter.Settings, streamClientFactory streamClientFactory) (exp, error) {
77+
func newExporter(cfg component.Config, set exporter.Settings, streamClientFactory streamClientFactory, userAgent string, netReporter *netstats.NetworkReporter) (exp, error) {
8378
oCfg := cfg.(*Config)
8479

85-
if oCfg.Endpoint == "" {
86-
return nil, errors.New("OTLP exporter config requires an Endpoint")
87-
}
88-
89-
netReporter, err := netstats.NewExporterNetworkReporter(set)
90-
if err != nil {
91-
return nil, err
92-
}
93-
userAgent := fmt.Sprintf("%s/%s (%s/%s)",
94-
set.BuildInfo.Description, set.BuildInfo.Version, runtime.GOOS, runtime.GOARCH)
95-
96-
if !oCfg.Arrow.Disabled {
97-
// Ignoring an error because Validate() was called.
98-
_ = zstd.SetEncoderConfig(oCfg.Arrow.Zstd)
99-
100-
userAgent += fmt.Sprintf(" ApacheArrow/%s (NumStreams/%d)", arrowPkg.PkgVersion, oCfg.Arrow.NumStreams)
101-
}
102-
10380
return &baseExporter{
10481
config: oCfg,
10582
settings: set,

0 commit comments

Comments
 (0)