Skip to content

Commit 86235b7

Browse files
[exporter/signalfx] Prioritize token from context in exporter (open-telemetry#37276)
#### Description In preparation for deprecating access_token_passthrough from the signalfx receiver and exporter, update the signalfx exporter to taken accessToken from the context Testing Updated unit tests tested end to end with the following setup telemetrygen sending metrics in otlp over grpc with X-Sf-Token header and token otlp receiver with include_metadata set to true batch processor using the X-Sf-Token key to group access_token_passthrough set to true
1 parent be26a2a commit 86235b7

File tree

5 files changed

+275
-5
lines changed

5 files changed

+275
-5
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: 'enhancement'
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: 'signalfxexporter'
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: 'Prioritize retrieving token from context when accesstokenpassthrough is enabled'
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [37102]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

exporter/signalfxexporter/dpclient.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"sync"
1616

1717
sfxpb "github.com/signalfx/com_signalfx_metrics_protobuf/model"
18+
"go.opentelemetry.io/collector/client"
1819
"go.opentelemetry.io/collector/consumer/consumererror"
1920
"go.opentelemetry.io/collector/pdata/pmetric"
2021
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
@@ -88,7 +89,7 @@ func (s *sfxDPClient) pushMetricsData(
8889
}
8990

9091
// All metrics in the pmetric.Metrics will have the same access token because of the BatchPerResourceMetrics.
91-
metricToken := s.retrieveAccessToken(rms.At(0))
92+
metricToken := s.retrieveAccessToken(ctx, rms.At(0))
9293

9394
// export SFx format
9495
sfxDataPoints := s.converter.MetricsToSignalFxV2(md)
@@ -194,12 +195,18 @@ func (s *sfxDPClient) encodeBody(dps []*sfxpb.DataPoint) (bodyReader io.Reader,
194195
return s.getReader(body)
195196
}
196197

197-
func (s *sfxDPClient) retrieveAccessToken(md pmetric.ResourceMetrics) string {
198+
func (s *sfxDPClient) retrieveAccessToken(ctx context.Context, md pmetric.ResourceMetrics) string {
198199
if !s.accessTokenPassthrough {
199200
// Nothing to do if token is pass through not configured or resource is nil.
200201
return ""
201202
}
202203

204+
cl := client.FromContext(ctx)
205+
ss := cl.Metadata.Get(splunk.SFxAccessTokenHeader)
206+
if len(ss) > 0 {
207+
return ss[0]
208+
}
209+
203210
attrs := md.Resource().Attributes()
204211
if accessToken, ok := attrs.Get(splunk.SFxAccessTokenLabel); ok {
205212
return accessToken.Str()

exporter/signalfxexporter/eventclient.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"strings"
1212

1313
sfxpb "github.com/signalfx/com_signalfx_metrics_protobuf/model"
14+
"go.opentelemetry.io/collector/client"
1415
"go.opentelemetry.io/collector/consumer/consumererror"
1516
"go.opentelemetry.io/collector/pdata/pcommon"
1617
"go.opentelemetry.io/collector/pdata/plog"
@@ -33,7 +34,7 @@ func (s *sfxEventClient) pushLogsData(ctx context.Context, ld plog.Logs) (int, e
3334
return 0, nil
3435
}
3536

36-
accessToken := s.retrieveAccessToken(rls.At(0))
37+
accessToken := s.retrieveAccessToken(ctx, rls.At(0))
3738

3839
var sfxEvents []*sfxpb.Event
3940
numDroppedLogRecords := 0
@@ -104,12 +105,18 @@ func (s *sfxEventClient) encodeBody(events []*sfxpb.Event) (bodyReader io.Reader
104105
return s.getReader(body)
105106
}
106107

107-
func (s *sfxEventClient) retrieveAccessToken(rl plog.ResourceLogs) string {
108+
func (s *sfxEventClient) retrieveAccessToken(ctx context.Context, rl plog.ResourceLogs) string {
108109
if !s.accessTokenPassthrough {
109110
// Nothing to do if token is pass through not configured or resource is nil.
110111
return ""
111112
}
112113

114+
cl := client.FromContext(ctx)
115+
ss := cl.Metadata.Get(splunk.SFxAccessTokenHeader)
116+
if len(ss) > 0 {
117+
return ss[0]
118+
}
119+
113120
attrs := rl.Resource().Attributes()
114121
if accessToken, ok := attrs.Get(splunk.SFxAccessTokenLabel); ok && accessToken.Type() == pcommon.ValueTypeStr {
115122
return accessToken.Str()

exporter/signalfxexporter/exporter_test.go

Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
sfxpb "github.com/signalfx/com_signalfx_metrics_protobuf/model"
2424
"github.com/stretchr/testify/assert"
2525
"github.com/stretchr/testify/require"
26+
"go.opentelemetry.io/collector/client"
2627
"go.opentelemetry.io/collector/component/componenttest"
2728
"go.opentelemetry.io/collector/config/confighttp"
2829
"go.opentelemetry.io/collector/config/configopaque"
@@ -567,6 +568,234 @@ func TestConsumeMetricsWithAccessTokenPassthrough(t *testing.T) {
567568
}
568569
}
569570

571+
func TestConsumeMetricsAccessTokenPassthroughPriorityToContext(t *testing.T) {
572+
fromHeaders := "AccessTokenFromClientHeaders"
573+
fromLabels := []string{"AccessTokenFromLabel0", "AccessTokenFromLabel1"}
574+
fromContext := "AccessTokenFromContext"
575+
576+
validMetricsWithToken := func(includeToken bool, token string, histogram bool) pmetric.Metrics {
577+
out := pmetric.NewMetrics()
578+
rm := out.ResourceMetrics().AppendEmpty()
579+
580+
if includeToken {
581+
rm.Resource().Attributes().PutStr("com.splunk.signalfx.access_token", token)
582+
}
583+
584+
ilm := rm.ScopeMetrics().AppendEmpty()
585+
m := ilm.Metrics().AppendEmpty()
586+
587+
if histogram {
588+
buildHistogram(m, "test_histogram", pcommon.Timestamp(100000000), 1)
589+
} else {
590+
m.SetName("test_gauge")
591+
592+
dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
593+
dp.Attributes().PutStr("k0", "v0")
594+
dp.Attributes().PutStr("k1", "v1")
595+
dp.SetDoubleValue(123)
596+
}
597+
598+
return out
599+
}
600+
601+
tests := []struct {
602+
name string
603+
accessTokenPassthrough bool
604+
metrics pmetric.Metrics
605+
additionalHeaders map[string]string
606+
pushedTokens []string
607+
sendOTLPHistograms bool
608+
inContext bool
609+
}{
610+
{
611+
name: "passthrough access token and included in md",
612+
accessTokenPassthrough: true,
613+
inContext: true,
614+
metrics: validMetricsWithToken(true, fromLabels[0], false),
615+
pushedTokens: []string{fromContext},
616+
},
617+
{
618+
name: "passthrough access token and not included in md",
619+
accessTokenPassthrough: true,
620+
inContext: true,
621+
metrics: validMetricsWithToken(false, fromLabels[0], false),
622+
pushedTokens: []string{fromContext},
623+
sendOTLPHistograms: false,
624+
},
625+
{
626+
name: "passthrough access token and included in md",
627+
accessTokenPassthrough: true,
628+
inContext: false,
629+
metrics: validMetricsWithToken(true, fromLabels[0], false),
630+
pushedTokens: []string{fromLabels[0]},
631+
},
632+
{
633+
name: "passthrough access token and not included in md",
634+
accessTokenPassthrough: true,
635+
inContext: false,
636+
metrics: validMetricsWithToken(false, fromLabels[0], false),
637+
pushedTokens: []string{fromHeaders},
638+
sendOTLPHistograms: false,
639+
},
640+
}
641+
for _, tt := range tests {
642+
receivedTokens := struct {
643+
sync.Mutex
644+
tokens []string
645+
}{}
646+
receivedTokens.tokens = []string{}
647+
t.Run(tt.name, func(t *testing.T) {
648+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
649+
assert.Equal(t, tt.name, r.Header.Get("test_header_"))
650+
receivedTokens.Lock()
651+
652+
token := r.Header.Get("x-sf-token")
653+
receivedTokens.tokens = append(receivedTokens.tokens, token)
654+
655+
receivedTokens.Unlock()
656+
w.WriteHeader(http.StatusAccepted)
657+
}))
658+
defer server.Close()
659+
660+
factory := NewFactory()
661+
cfg := factory.CreateDefaultConfig().(*Config)
662+
cfg.IngestURL = server.URL
663+
cfg.APIURL = server.URL
664+
cfg.ClientConfig.Headers = make(map[string]configopaque.String)
665+
for k, v := range tt.additionalHeaders {
666+
cfg.ClientConfig.Headers[k] = configopaque.String(v)
667+
}
668+
cfg.ClientConfig.Headers["test_header_"] = configopaque.String(tt.name)
669+
cfg.AccessToken = configopaque.String(fromHeaders)
670+
cfg.AccessTokenPassthrough = tt.accessTokenPassthrough
671+
cfg.SendOTLPHistograms = tt.sendOTLPHistograms
672+
sfxExp, err := NewFactory().CreateMetrics(context.Background(), exportertest.NewNopSettings(), cfg)
673+
require.NoError(t, err)
674+
ctx := context.Background()
675+
if tt.inContext {
676+
ctx = client.NewContext(
677+
ctx,
678+
client.Info{Metadata: client.NewMetadata(
679+
map[string][]string{splunk.SFxAccessTokenHeader: {fromContext}},
680+
)},
681+
)
682+
}
683+
require.NoError(t, sfxExp.Start(ctx, componenttest.NewNopHost()))
684+
defer func() {
685+
require.NoError(t, sfxExp.Shutdown(context.Background()))
686+
}()
687+
688+
err = sfxExp.ConsumeMetrics(ctx, tt.metrics)
689+
690+
assert.NoError(t, err)
691+
require.Eventually(t, func() bool {
692+
receivedTokens.Lock()
693+
defer receivedTokens.Unlock()
694+
return len(tt.pushedTokens) == len(receivedTokens.tokens)
695+
}, 1*time.Second, 10*time.Millisecond)
696+
sort.Strings(tt.pushedTokens)
697+
sort.Strings(receivedTokens.tokens)
698+
assert.Equal(t, tt.pushedTokens, receivedTokens.tokens)
699+
})
700+
}
701+
}
702+
703+
func TestConsumeLogsAccessTokenPassthrough(t *testing.T) {
704+
fromHeaders := "AccessTokenFromClientHeaders"
705+
fromLabels := "AccessTokenFromLabel"
706+
fromContext := "AccessTokenFromContext"
707+
708+
newLogData := func(includeToken bool) plog.Logs {
709+
out := makeSampleResourceLogs()
710+
makeSampleResourceLogs().ResourceLogs().At(0).CopyTo(out.ResourceLogs().AppendEmpty())
711+
712+
if includeToken {
713+
out.ResourceLogs().At(0).Resource().Attributes().PutStr("com.splunk.signalfx.access_token", fromLabels)
714+
out.ResourceLogs().At(1).Resource().Attributes().PutStr("com.splunk.signalfx.access_token", fromLabels)
715+
}
716+
return out
717+
}
718+
719+
tests := []struct {
720+
name string
721+
accessTokenPassthrough bool
722+
includedInLogData bool
723+
inContext bool
724+
expectedToken string
725+
}{
726+
{
727+
name: "passthrough access token and not included in request context",
728+
inContext: true,
729+
accessTokenPassthrough: true,
730+
includedInLogData: true,
731+
expectedToken: fromContext,
732+
},
733+
{
734+
name: "passthrough access token and included in logs",
735+
inContext: false,
736+
accessTokenPassthrough: true,
737+
includedInLogData: true,
738+
expectedToken: fromLabels,
739+
},
740+
{
741+
name: "passthrough access token and not included in logs",
742+
inContext: false,
743+
accessTokenPassthrough: false,
744+
includedInLogData: false,
745+
expectedToken: fromHeaders,
746+
},
747+
}
748+
for _, tt := range tests {
749+
t.Run(tt.name, func(t *testing.T) {
750+
receivedTokens := struct {
751+
sync.Mutex
752+
tokens []string
753+
}{}
754+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
755+
assert.Equal(t, tt.name, r.Header.Get("test_header_"))
756+
receivedTokens.Lock()
757+
receivedTokens.tokens = append(receivedTokens.tokens, r.Header.Get("x-sf-token"))
758+
receivedTokens.Unlock()
759+
w.WriteHeader(http.StatusAccepted)
760+
}))
761+
defer server.Close()
762+
763+
factory := NewFactory()
764+
cfg := factory.CreateDefaultConfig().(*Config)
765+
cfg.IngestURL = server.URL
766+
cfg.APIURL = server.URL
767+
cfg.Headers = make(map[string]configopaque.String)
768+
cfg.Headers["test_header_"] = configopaque.String(tt.name)
769+
cfg.AccessToken = configopaque.String(fromHeaders)
770+
cfg.AccessTokenPassthrough = tt.accessTokenPassthrough
771+
sfxExp, err := NewFactory().CreateLogs(context.Background(), exportertest.NewNopSettings(), cfg)
772+
require.NoError(t, err)
773+
require.NoError(t, sfxExp.Start(context.Background(), componenttest.NewNopHost()))
774+
defer func() {
775+
require.NoError(t, sfxExp.Shutdown(context.Background()))
776+
}()
777+
778+
ctx := context.Background()
779+
if tt.inContext {
780+
ctx = client.NewContext(
781+
ctx,
782+
client.Info{Metadata: client.NewMetadata(
783+
map[string][]string{splunk.SFxAccessTokenHeader: {"AccessTokenFromContext"}},
784+
)},
785+
)
786+
}
787+
assert.NoError(t, sfxExp.ConsumeLogs(ctx, newLogData(tt.includedInLogData)))
788+
789+
require.Eventually(t, func() bool {
790+
receivedTokens.Lock()
791+
defer receivedTokens.Unlock()
792+
return len(receivedTokens.tokens) == 1
793+
}, 1*time.Second, 10*time.Millisecond)
794+
assert.Equal(t, tt.expectedToken, receivedTokens.tokens[0])
795+
})
796+
}
797+
}
798+
570799
func TestNewEventExporter(t *testing.T) {
571800
got, err := newEventExporter(nil, exportertest.NewNopSettings())
572801
assert.EqualError(t, err, "nil config")

exporter/signalfxexporter/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ require (
1717
github.com/shirou/gopsutil/v4 v4.24.12
1818
github.com/signalfx/com_signalfx_metrics_protobuf v0.0.3
1919
github.com/stretchr/testify v1.10.0
20+
go.opentelemetry.io/collector/client v1.23.1-0.20250114172347-71aae791d7f8
2021
go.opentelemetry.io/collector/component v0.117.1-0.20250114172347-71aae791d7f8
2122
go.opentelemetry.io/collector/component/componenttest v0.117.1-0.20250114172347-71aae791d7f8
2223
go.opentelemetry.io/collector/config/confighttp v0.117.1-0.20250114172347-71aae791d7f8
@@ -71,7 +72,6 @@ require (
7172
github.com/tklauser/go-sysconf v0.3.12 // indirect
7273
github.com/tklauser/numcpus v0.6.1 // indirect
7374
github.com/yusufpapurcu/wmi v1.2.4 // indirect
74-
go.opentelemetry.io/collector/client v1.23.1-0.20250114172347-71aae791d7f8 // indirect
7575
go.opentelemetry.io/collector/config/configauth v0.117.1-0.20250114172347-71aae791d7f8 // indirect
7676
go.opentelemetry.io/collector/config/configcompression v1.23.1-0.20250114172347-71aae791d7f8 // indirect
7777
go.opentelemetry.io/collector/config/configtelemetry v0.117.1-0.20250114172347-71aae791d7f8 // indirect

0 commit comments

Comments
 (0)