Skip to content

Commit 0138e2e

Browse files
authored
Merge pull request #1733 from kane8n/add-splunk-provider
Add Splunk as a metrics provider
2 parents 30f4b25 + d4bd0f2 commit 0138e2e

File tree

9 files changed

+448
-0
lines changed

9 files changed

+448
-0
lines changed

artifacts/flagger/crd.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -1304,6 +1304,7 @@ spec:
13041304
- graphite
13051305
- dynatrace
13061306
- keptn
1307+
- splunk
13071308
address:
13081309
description: API address of this provider
13091310
type: string

charts/flagger/crds/crd.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -1304,6 +1304,7 @@ spec:
13041304
- graphite
13051305
- dynatrace
13061306
- keptn
1307+
- splunk
13071308
address:
13081309
description: API address of this provider
13091310
type: string

docs/gitbook/usage/metrics.md

+51
Original file line numberDiff line numberDiff line change
@@ -730,3 +730,54 @@ Only relevant if the `type` is set to `analysis`.
730730

731731
For the type `analysis`, the value returned by the provider is either `0`
732732
(if the analysis failed), or `1` (analysis passed).
733+
734+
## Splunk
735+
736+
You can create custom metric checks using the Splunk provider.
737+
738+
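Create a secret that contains your access token, which you can find in the Splunk Observability Cloud (o11y) UI. Note that values under `data` must be base64-encoded; use `stringData` instead if you want to supply the token as plain text.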
739+
740+
```yaml
741+
apiVersion: v1
742+
kind: Secret
743+
metadata:
744+
name: splunk
745+
namespace: istio-system
746+
data:
747+
sf_token_key: your-access-token
748+
```
749+
750+
Splunk template example:
751+
752+
```yaml
753+
apiVersion: flagger.app/v1beta1
754+
kind: MetricTemplate
755+
metadata:
756+
name: success-rate
757+
namespace: istio-system
758+
spec:
759+
provider:
760+
type: splunk
761+
address: https://api.<REALM>.signalfx.com
762+
secretRef:
763+
name: splunk
764+
query: |
765+
total = data('traces.count', filter=filter('sf_service', '{{target}}')).sum().publish(enable=False)
766+
success = data('traces.count', filter=filter('sf_service', '{{target}}') and filter('sf_error', 'false')).sum().publish(enable=False)
767+
((success/total) * 100).publish()
768+
```
769+
The query format documentation can be found [here](https://dev.splunk.com/observability/docs/signalflow).
770+
771+
Reference the template in the canary analysis:
772+
773+
```yaml
774+
analysis:
775+
metrics:
776+
- name: "success rate"
777+
templateRef:
778+
name: success-rate
779+
namespace: istio-system
780+
thresholdRange:
781+
min: 99
782+
interval: 1m
783+
```

go.mod

+3
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ require (
1414
github.com/hashicorp/go-retryablehttp v0.7.7
1515
github.com/influxdata/influxdb-client-go/v2 v2.13.0
1616
github.com/prometheus/client_golang v1.20.5
17+
github.com/signalfx/signalflow-client-go v0.1.0
18+
github.com/signalfx/signalfx-go v1.34.0
1719
github.com/stretchr/testify v1.9.0
1820
go.uber.org/zap v1.27.0
1921
golang.org/x/sync v0.9.0
@@ -50,6 +52,7 @@ require (
5052
github.com/google/gofuzz v1.2.0 // indirect
5153
github.com/google/s2a-go v0.1.8 // indirect
5254
github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect
55+
github.com/gorilla/websocket v1.5.1 // indirect
5356
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 // indirect
5457
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
5558
github.com/imdario/mergo v0.3.15 // indirect

go.sum

+6
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@ github.com/googleapis/enterprise-certificate-proxy v0.3.4 h1:XYIDZApgAnrN1c855gT
9999
github.com/googleapis/enterprise-certificate-proxy v0.3.4/go.mod h1:YKe7cfqYXjKGpGvmSg28/fFvhNzinZQm8DGnaburhGA=
100100
github.com/googleapis/gax-go/v2 v2.14.0 h1:f+jMrjBPl+DL9nI4IQzLUxMq7XrAqFYB7hBPqMNIe8o=
101101
github.com/googleapis/gax-go/v2 v2.14.0/go.mod h1:lhBCnjdLrWRaPvLWhmc8IS24m9mr07qSYnHncrgo+zk=
102+
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
103+
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
102104
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 h1:2VTzZjLZBgl62/EtslCrtky5vbi9dd7HrQPQIx6wqiw=
103105
github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542/go.mod h1:Ow0tF8D4Kplbc8s8sSb3V2oUCygFHVp8gC3Dn6U4MNI=
104106
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
@@ -172,6 +174,10 @@ github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0leargg
172174
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
173175
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
174176
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
177+
github.com/signalfx/signalflow-client-go v0.1.0 h1:aqyt+st3/y8x8JtuwYRL9pOkOTJb+KeCoRWi0SuY5vw=
178+
github.com/signalfx/signalflow-client-go v0.1.0/go.mod h1:mY4DTAZuLHyMNGBjSrNdCg5kUU0hSkYjukAnjsVbsQs=
179+
github.com/signalfx/signalfx-go v1.34.0 h1:OQ6tyMY4efWB57EPIQqrpWrAfcSdyfa+bLtmAe7GLfE=
180+
github.com/signalfx/signalfx-go v1.34.0/go.mod h1:IpGZLPvCKNFyspAXoS480jB02mocTpo0KYd8jbl6/T8=
175181
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
176182
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
177183
github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0=

kustomize/base/flagger/crd.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -1304,6 +1304,7 @@ spec:
13041304
- graphite
13051305
- dynatrace
13061306
- keptn
1307+
- splunk
13071308
address:
13081309
description: API address of this provider
13091310
type: string

pkg/metrics/providers/factory.go

+2
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ func (factory Factory) Provider(metricInterval string, provider flaggerv1.Metric
4343
return NewDynatraceProvider(metricInterval, provider, credentials)
4444
case "keptn":
4545
return NewKeptnProvider(config)
46+
case "splunk":
47+
return NewSplunkProvider(metricInterval, provider, credentials)
4648
default:
4749
return NewPrometheusProvider(provider, credentials)
4850
}

pkg/metrics/providers/splunk.go

+195
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
/*
2+
Copyright 2024 The Flux authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package providers
18+
19+
import (
20+
"cmp"
21+
"context"
22+
"fmt"
23+
"io"
24+
"net/http"
25+
"slices"
26+
"strings"
27+
"time"
28+
29+
"github.com/signalfx/signalflow-client-go/signalflow"
30+
"github.com/signalfx/signalflow-client-go/signalflow/messages"
31+
32+
flaggerv1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1"
33+
)
34+
35+
// Settings for talking to the Splunk Observability Cloud (SignalFx) API.
// API reference: https://dev.splunk.com/observability/reference
const (
	// signalFxSignalFlowApiPath is appended to the provider address to form
	// the SignalFlow query endpoint.
	signalFxSignalFlowApiPath = "/v2/signalflow"
	// signalFxValidationPath is appended to the provider address to form the
	// endpoint requested by IsOnline.
	signalFxValidationPath = "/v2/metric?limit=1"

	// signalFxTokenSecretKey is the key looked up in the credentials map to
	// obtain the access token.
	signalFxTokenSecretKey = "sf_token_key"
	// signalFxTokenHeaderKey is the HTTP header that carries the access token.
	signalFxTokenHeaderKey = "X-SF-Token"
)

// signalFxFromDeltaMultiplierOnMetricInterval is multiplied by the metric
// interval to obtain how far back from "now" a query window starts.
const signalFxFromDeltaMultiplierOnMetricInterval = 10
46+
47+
// SplunkProvider runs SignalFlow queries against Splunk Observability Cloud
// (SignalFx) and can check that the API is reachable.
type SplunkProvider struct {
	// metricsQueryEndpoint is the stream URL handed to the SignalFlow client.
	metricsQueryEndpoint string
	// apiValidationEndpoint is the URL requested by IsOnline.
	apiValidationEndpoint string

	// timeout is the context timeout applied to query execution and to the
	// IsOnline request.
	timeout time.Duration
	// token is the access token sent with every request.
	token string
	// fromDelta is the length of the query window in milliseconds; queries
	// cover the span from now-fromDelta to now.
	fromDelta int64
}
56+
57+
// splunkResponse is an empty placeholder type. Nothing in the visible
// provider code refers to it.
type splunkResponse struct{}
59+
60+
// NewSplunkProvider takes a canary spec, a provider spec and the credentials map, and
61+
// returns a Splunk client ready to execute queries against the API
62+
func NewSplunkProvider(metricInterval string,
63+
provider flaggerv1.MetricTemplateProvider,
64+
credentials map[string][]byte) (*SplunkProvider, error) {
65+
66+
address := provider.Address
67+
if address == "" {
68+
return nil, fmt.Errorf("splunk endpoint is not set")
69+
}
70+
71+
sp := SplunkProvider{
72+
timeout: 5 * time.Second,
73+
// Convert the configured address to match the protocol of the respective API
74+
// ex.
75+
// https://api.<REALM>.signalfx.com -> wss://stream.<REALM>.signalfx.com
76+
// wss://stream.<REALM>.signalfx.com -> wss://stream.<REALM>.signalfx.com
77+
metricsQueryEndpoint: strings.Replace(strings.Replace(address+signalFxSignalFlowApiPath, "http", "ws", 1), "api", "stream", 1),
78+
// ex.
79+
// https://api.<REALM>.signalfx.com -> https://api.<REALM>.signalfx.com
80+
// wss://stream.<REALM>.signalfx.com -> https://api.<REALM>.signalfx.com
81+
apiValidationEndpoint: strings.Replace(strings.Replace(address+signalFxValidationPath, "ws", "http", 1), "stream", "api", 1),
82+
}
83+
84+
if b, ok := credentials[signalFxTokenSecretKey]; ok {
85+
sp.token = string(b)
86+
} else {
87+
return nil, fmt.Errorf("splunk credentials does not contain sf_token_key")
88+
}
89+
90+
md, err := time.ParseDuration(metricInterval)
91+
if err != nil {
92+
return nil, fmt.Errorf("error parsing metric interval: %w", err)
93+
}
94+
95+
sp.fromDelta = int64(signalFxFromDeltaMultiplierOnMetricInterval * md.Milliseconds())
96+
return &sp, nil
97+
}
98+
99+
// RunQuery executes the query and converts the first result to float64
100+
func (p *SplunkProvider) RunQuery(query string) (float64, error) {
101+
c, err := signalflow.NewClient(signalflow.StreamURL(p.metricsQueryEndpoint), signalflow.AccessToken(p.token))
102+
if err != nil {
103+
return 0, fmt.Errorf("error creating signalflow client: %w", err)
104+
}
105+
106+
ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
107+
defer cancel()
108+
109+
now := time.Now().UnixMilli()
110+
comp, err := c.Execute(ctx, &signalflow.ExecuteRequest{
111+
Program: query,
112+
Start: time.UnixMilli(now - p.fromDelta),
113+
Stop: time.UnixMilli(now),
114+
Immediate: true,
115+
})
116+
if err != nil {
117+
return 0, fmt.Errorf("error executing query: %w", err)
118+
}
119+
120+
payloads := p.receivePaylods(comp)
121+
122+
if comp.Err() != nil {
123+
return 0, fmt.Errorf("error executing query: %w", comp.Err())
124+
}
125+
126+
payloads = slices.DeleteFunc(payloads, func(msg messages.DataPayload) bool {
127+
return msg.Value() == nil
128+
})
129+
if len(payloads) < 1 {
130+
return 0, fmt.Errorf("invalid response: %w", ErrNoValuesFound)
131+
}
132+
133+
// Error when a SignalFlow query returns two or more results.
134+
// Since a different TSID is set for each metrics to be retrieved, eliminate duplicate TSIDs and determine if two or more TSIDs exist.
135+
_payloads := slices.Clone(payloads)
136+
slices.SortFunc(_payloads, func(i, j messages.DataPayload) int {
137+
return cmp.Compare(i.TSID, j.TSID)
138+
})
139+
if len(slices.CompactFunc(_payloads, func(i, j messages.DataPayload) bool { return i.TSID == j.TSID })) > 1 {
140+
return 0, fmt.Errorf("invalid response: %w", ErrMultipleValuesReturned)
141+
}
142+
143+
payload := payloads[len(payloads)-1]
144+
switch payload.Type {
145+
case messages.ValTypeLong:
146+
return float64(payload.Int64()), nil
147+
case messages.ValTypeDouble:
148+
return payload.Float64(), nil
149+
case messages.ValTypeInt:
150+
return float64(payload.Int32()), nil
151+
default:
152+
return 0, fmt.Errorf("invalid response: UnsupportedValueType")
153+
}
154+
}
155+
156+
func (p *SplunkProvider) receivePaylods(comp *signalflow.Computation) []messages.DataPayload {
157+
payloads := []messages.DataPayload{}
158+
for dataMsg := range comp.Data() {
159+
if dataMsg == nil {
160+
continue
161+
}
162+
payloads = append(payloads, dataMsg.Payloads...)
163+
}
164+
return payloads
165+
}
166+
167+
// IsOnline calls the provider endpoint and returns an error if the API is unreachable
168+
func (p *SplunkProvider) IsOnline() (bool, error) {
169+
req, err := http.NewRequest("GET", p.apiValidationEndpoint, nil)
170+
if err != nil {
171+
return false, fmt.Errorf("error http.NewRequest: %w", err)
172+
}
173+
174+
req.Header.Add(signalFxTokenHeaderKey, p.token)
175+
176+
ctx, cancel := context.WithTimeout(req.Context(), p.timeout)
177+
defer cancel()
178+
r, err := http.DefaultClient.Do(req.WithContext(ctx))
179+
if err != nil {
180+
return false, fmt.Errorf("request failed: %w", err)
181+
}
182+
183+
defer r.Body.Close()
184+
185+
b, err := io.ReadAll(r.Body)
186+
if err != nil {
187+
return false, fmt.Errorf("error reading body: %w", err)
188+
}
189+
190+
if r.StatusCode != http.StatusOK {
191+
return false, fmt.Errorf("error response: %s", string(b))
192+
}
193+
194+
return true, nil
195+
}

0 commit comments

Comments
 (0)