Skip to content

Commit 291c7dc

Browse files
committed
move netreporter and fix race?
1 parent c5abed5 commit 291c7dc

File tree

3 files changed

+21
-15
lines changed

3 files changed

+21
-15
lines changed

exporter/otelarrowexporter/metadata.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"google.golang.org/grpc/metadata"
2626

2727
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/compression/zstd"
28+
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/otelarrow/netstats"
2829
)
2930

3031
var (
@@ -40,6 +41,7 @@ type metadataExporter struct {
4041

4142
metadataKeys []string
4243
exporters sync.Map
44+
netReporter *netstats.NetworkReporter
4345

4446
userAgent string
4547

@@ -53,6 +55,10 @@ var _ exp = (*metadataExporter)(nil)
5355

5456
func newMetadataExporter(cfg component.Config, set exporter.Settings, streamClientFactory streamClientFactory) (exp, error) {
5557
oCfg := cfg.(*Config)
58+
netReporter, err := netstats.NewExporterNetworkReporter(set)
59+
if err != nil {
60+
return nil, err
61+
}
5662
userAgent := fmt.Sprintf("%s/%s (%s/%s)",
5763
set.BuildInfo.Description, set.BuildInfo.Version, runtime.GOOS, runtime.GOARCH)
5864

@@ -69,13 +75,15 @@ func newMetadataExporter(cfg component.Config, set exporter.Settings, streamClie
6975
}
7076
sort.Strings(mks)
7177
if len(mks) == 0 {
72-
return newExporter(cfg, set, streamClientFactory, userAgent)
78+
return newExporter(cfg, set, streamClientFactory, userAgent, netReporter)
7379
}
7480
return &metadataExporter{
7581
config: oCfg,
7682
settings: set,
7783
scf: streamClientFactory,
7884
metadataKeys: mks,
85+
userAgent: userAgent,
86+
netReporter: netReporter,
7987
}, nil
8088
}
8189

@@ -135,19 +143,19 @@ func (e *metadataExporter) pushLogs(ctx context.Context, ld plog.Logs) error {
135143
}
136144

137145
func (e *metadataExporter) getOrCreateExporter(ctx context.Context, s attribute.Set, md metadata.MD) (exp, error) {
138-
v, ok := e.exporters.Load(s)
139-
if ok {
140-
return v.(exp), nil
141-
}
142-
143146
e.lock.Lock()
144147
defer e.lock.Unlock()
145148

146149
if e.config.MetadataCardinalityLimit != 0 && e.size >= int(e.config.MetadataCardinalityLimit) {
147150
return nil, errTooManyExporters
148151
}
149152

150-
newExp, err := newExporter(e.config, e.settings, e.scf, e.userAgent)
153+
v, ok := e.exporters.Load(s)
154+
if ok {
155+
return v.(exp), nil
156+
}
157+
158+
newExp, err := newExporter(e.config, e.settings, e.scf, e.userAgent, e.netReporter)
151159
if err != nil {
152160
return nil, fmt.Errorf("failed to create exporter: %w", err)
153161
}
@@ -164,6 +172,7 @@ func (e *metadataExporter) getOrCreateExporter(ctx context.Context, s attribute.
164172
e.exporters.Delete(s)
165173
return nil, fmt.Errorf("failed to start exporter: %w", err)
166174
}
175+
167176
e.size++
168177
}
169178

exporter/otelarrowexporter/metadata_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ func TestSendTracesWithMetadata(t *testing.T) {
4848
cfg.MetadataCardinalityLimit = 10
4949
cfg.MetadataKeys = []string{"key1", "key2"}
5050
set := exportertest.NewNopSettings()
51+
set.BuildInfo.Description = "Collector"
52+
set.BuildInfo.Version = "1.2.3test"
5153
bg := context.Background()
5254
exp, err := factory.CreateTracesExporter(bg, set, cfg)
5355
require.NoError(t, err)
@@ -57,8 +59,8 @@ func TestSendTracesWithMetadata(t *testing.T) {
5759
}()
5860

5961
host := componenttest.NewNopHost()
62+
6063
assert.NoError(t, exp.Start(context.Background(), host))
61-
time.Sleep(1 * time.Second)
6264

6365
// Ensure that initially there is no data in the receiver.
6466
assert.EqualValues(t, 0, rcv.requestCount.Load())
@@ -164,8 +166,8 @@ func TestMetadataExporterCardinalityLimit(t *testing.T) {
164166
}()
165167

166168
host := componenttest.NewNopHost()
169+
167170
assert.NoError(t, exp.Start(context.Background(), host))
168-
time.Sleep(1 * time.Second)
169171

170172
// Ensure that initially there is no data in the receiver.
171173
assert.EqualValues(t, 0, rcv.requestCount.Load())

exporter/otelarrowexporter/otelarrow.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,18 +75,13 @@ type streamClientFactory func(conn *grpc.ClientConn) arrow.StreamClientFunc
7575

7676
// Crete new exporter and start it. The exporter will begin connecting but
7777
// this function may return before the connection is established.
78-
func newExporter(cfg component.Config, set exporter.Settings, streamClientFactory streamClientFactory, userAgent string) (exp, error) {
78+
func newExporter(cfg component.Config, set exporter.Settings, streamClientFactory streamClientFactory, userAgent string, netReporter *netstats.NetworkReporter) (exp, error) {
7979
oCfg := cfg.(*Config)
8080

8181
if oCfg.Endpoint == "" {
8282
return nil, errors.New("OTLP exporter config requires an Endpoint")
8383
}
8484

85-
netReporter, err := netstats.NewExporterNetworkReporter(set)
86-
if err != nil {
87-
return nil, err
88-
}
89-
9085
return &baseExporter{
9186
config: oCfg,
9287
settings: set,

0 commit comments

Comments
 (0)