Skip to content

Commit 85c595d

Browse files
feat(externalmetrics): implement ExternalMetricsProvider for querying external metrics
Co-authored-by: Johan Lore <[email protected]> Co-authored-by: Maxime Véroone <[email protected]>
1 parent 3a27fd1 commit 85c595d

File tree

5 files changed

+287
-12
lines changed

5 files changed

+287
-12
lines changed

go.mod

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,12 @@ require (
2424
google.golang.org/grpc v1.76.0
2525
google.golang.org/protobuf v1.36.10
2626
gopkg.in/h2non/gock.v1 v1.1.2
27-
k8s.io/api v0.34.1
28-
k8s.io/apimachinery v0.34.1
29-
k8s.io/client-go v0.34.1
30-
k8s.io/code-generator v0.34.1
27+
k8s.io/api v0.34.2
28+
k8s.io/apimachinery v0.34.2
29+
k8s.io/client-go v0.34.2
30+
k8s.io/code-generator v0.34.2
3131
k8s.io/klog/v2 v2.130.1
32+
k8s.io/metrics v0.34.2
3233
k8s.io/utils v0.0.0-20250604170112-4c0f3b243397
3334
knative.dev/serving v0.46.6
3435
)

go.sum

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -275,20 +275,22 @@ gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
275275
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
276276
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
277277
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
278-
k8s.io/api v0.34.1 h1:jC+153630BMdlFukegoEL8E/yT7aLyQkIVuwhmwDgJM=
279-
k8s.io/api v0.34.1/go.mod h1:SB80FxFtXn5/gwzCoN6QCtPD7Vbu5w2n1S0J5gFfTYk=
280-
k8s.io/apimachinery v0.34.1 h1:dTlxFls/eikpJxmAC7MVE8oOeP1zryV7iRyIjB0gky4=
281-
k8s.io/apimachinery v0.34.1/go.mod h1:/GwIlEcWuTX9zKIg2mbw0LRFIsXwrfoVxn+ef0X13lw=
282-
k8s.io/client-go v0.34.1 h1:ZUPJKgXsnKwVwmKKdPfw4tB58+7/Ik3CrjOEhsiZ7mY=
283-
k8s.io/client-go v0.34.1/go.mod h1:kA8v0FP+tk6sZA0yKLRG67LWjqufAoSHA2xVGKw9Of8=
284-
k8s.io/code-generator v0.34.1 h1:WpphT26E+j7tEgIUfFr5WfbJrktCGzB3JoJH9149xYc=
285-
k8s.io/code-generator v0.34.1/go.mod h1:DeWjekbDnJWRwpw3s0Jat87c+e0TgkxoR4ar608yqvg=
278+
k8s.io/api v0.34.2 h1:fsSUNZhV+bnL6Aqrp6O7lMTy6o5x2C4XLjnh//8SLYY=
279+
k8s.io/api v0.34.2/go.mod h1:MMBPaWlED2a8w4RSeanD76f7opUoypY8TFYkSM+3XHw=
280+
k8s.io/apimachinery v0.34.2 h1:zQ12Uk3eMHPxrsbUJgNF8bTauTVR2WgqJsTmwTE/NW4=
281+
k8s.io/apimachinery v0.34.2/go.mod h1:/GwIlEcWuTX9zKIg2mbw0LRFIsXwrfoVxn+ef0X13lw=
282+
k8s.io/client-go v0.34.2 h1:Co6XiknN+uUZqiddlfAjT68184/37PS4QAzYvQvDR8M=
283+
k8s.io/client-go v0.34.2/go.mod h1:2VYDl1XXJsdcAxw7BenFslRQX28Dxz91U9MWKjX97fE=
284+
k8s.io/code-generator v0.34.2 h1:9bG6jTxmsU3HXE5BNYJTC8AZ1D6hVVfkm8yYSkdkGY0=
285+
k8s.io/code-generator v0.34.2/go.mod h1:dnDDEd6S/z4uZ+PG1aE58ySCi/lR4+qT3a4DddE4/2I=
286286
k8s.io/gengo/v2 v2.0.0-20250604051438-85fd79dbfd9f h1:SLb+kxmzfA87x4E4brQzB33VBbT2+x7Zq9ROIHmGn9Q=
287287
k8s.io/gengo/v2 v2.0.0-20250604051438-85fd79dbfd9f/go.mod h1:EJykeLsmFC60UQbYJezXkEsG2FLrt0GPNkU5iK5GWxU=
288288
k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=
289289
k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
290290
k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b h1:MloQ9/bdJyIu9lb1PzujOPolHyvO06MXG5TUIj2mNAA=
291291
k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b/go.mod h1:UZ2yyWbFTpuhSbFhv24aGNOdoRdJZgsIObGBUaYVsts=
292+
k8s.io/metrics v0.34.2 h1:zao91FNDVPRGIiHLO2vqqe21zZVPien1goyzn0hsz90=
293+
k8s.io/metrics v0.34.2/go.mod h1:Ydulln+8uZZctUM8yrUQX4rfq/Ay6UzsuXf24QJ37Vc=
292294
k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 h1:hwvWFiBzdWw1FhfY1FooPn3kzWuJ8tmbZBHi4zVsl1Y=
293295
k8s.io/utils v0.0.0-20250604170112-4c0f3b243397/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
294296
knative.dev/networking v0.0.0-20250902160145-7dad473f6351 h1:Gv/UqbN0AK+ORoT5e2Kg+3+uMW/y9CCdhpXKxYaVV6k=
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
Copyright 2020 The Flux authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package providers
18+
19+
import (
20+
"context"
21+
"encoding/json"
22+
"fmt"
23+
"io"
24+
"net"
25+
"net/http"
26+
"net/url"
27+
"time"
28+
29+
flaggerv1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1"
30+
"k8s.io/metrics/pkg/apis/external_metrics"
31+
)
32+
33+
const (
34+
metricServiceEndpointPath = "/apis/external.metrics.k8s.io/v1beta1"
35+
namespacesPath = "/namespaces/"
36+
37+
autorisationHeaderKey = "Authorization"
38+
applicationBearerToken = "bearerToken"
39+
)
40+
41+
// ExternalMetricsProvider executes datadog queries
42+
type ExternalMetricsProvider struct {
43+
metricServiceEndpoint string
44+
bearerToken string // Find out if we can get authoritative answer that this is the standard
45+
46+
timeout time.Duration
47+
unsafeSsl bool
48+
}
49+
50+
// NewExternalMetricsProvider takes a canary spec, a provider spec, and
51+
// returns a client ready to execute queries against the Service
52+
func NewExternalMetricsProvider(metricInterval string,
53+
provider flaggerv1.MetricTemplateProvider,
54+
credentials map[string][]byte) (*ExternalMetricsProvider, error) {
55+
56+
if provider.Address == "" {
57+
return nil, fmt.Errorf("the Url of the external metric service must be provided")
58+
}
59+
60+
externalMetrics := ExternalMetricsProvider{
61+
metricServiceEndpoint: fmt.Sprintf("%s%s", provider.Address, metricServiceEndpointPath),
62+
timeout: 5 * time.Second,
63+
unsafeSsl: provider.InsecureSkipVerify,
64+
}
65+
66+
if b, ok := credentials[applicationBearerToken]; ok {
67+
externalMetrics.bearerToken = string(b)
68+
}
69+
70+
return &externalMetrics, nil
71+
}
72+
73+
// RunQuery retrieves the ExternalMetricValue from the ExternalMetricsProvider.metricServiceUrl
74+
// and returns the first result as a float64
75+
func (p *ExternalMetricsProvider) RunQuery(query string) (float64, error) {
76+
77+
metricsQueryUrl := fmt.Sprintf("%s%s%s", p.metricServiceEndpoint, namespacesPath, query)
78+
//TODO add labelSelector as queryString (in the docs of this provider as it's embedded in the query string)
79+
80+
req, err := http.NewRequest("GET", metricsQueryUrl, nil)
81+
if err != nil {
82+
return 0, fmt.Errorf("error http.NewRequest: %w", err)
83+
}
84+
if p.bearerToken != "" {
85+
req.Header.Add(autorisationHeaderKey, fmt.Sprintf("Bearer %s", p.bearerToken))
86+
}
87+
88+
ctx, cancel := context.WithTimeout(req.Context(), p.timeout)
89+
defer cancel()
90+
r, err := http.DefaultClient.Do(req.WithContext(ctx))
91+
if err != nil {
92+
return 0, fmt.Errorf("request failed: %w", err)
93+
}
94+
95+
defer r.Body.Close()
96+
b, err := io.ReadAll(r.Body)
97+
if err != nil {
98+
return 0, fmt.Errorf("error reading body: %w", err)
99+
}
100+
101+
if r.StatusCode != http.StatusOK {
102+
return 0, fmt.Errorf("error response: %s: %w", string(b), err)
103+
}
104+
105+
var res external_metrics.ExternalMetricValueList
106+
if err := json.Unmarshal(b, &res); err != nil {
107+
return 0, fmt.Errorf("error unmarshaling result: %w, '%s'", err, string(b))
108+
}
109+
110+
if len(res.Items) < 1 {
111+
return 0, fmt.Errorf("invalid response: %s: %w", string(b), ErrNoValuesFound)
112+
}
113+
114+
// TODO
115+
vs := res.Items[0].Value.AsApproximateFloat64()
116+
117+
return vs, nil
118+
}
119+
120+
// IsOnline will only check the TCP endpoint reachability,
121+
// given that external metric servers don't have a common health check endpoint defined
122+
func (p *ExternalMetricsProvider) IsOnline() (bool, error) {
123+
var d net.Dialer
124+
125+
ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
126+
defer cancel()
127+
128+
metricServiceUrl, err := url.Parse(p.metricServiceEndpoint)
129+
if err != nil {
130+
return false, fmt.Errorf("error parsing metric service url: %w", err)
131+
}
132+
133+
conn, err := d.DialContext(ctx, "tcp", metricServiceUrl.Host)
134+
defer conn.Close()
135+
if err != nil {
136+
return false, fmt.Errorf("connection failed: %w", err)
137+
}
138+
return true, err
139+
}
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
Copyright 2020 The Flux authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package providers
18+
19+
import (
20+
json2 "encoding/json"
21+
"errors"
22+
"fmt"
23+
"net/http"
24+
"net/http/httptest"
25+
"strconv"
26+
"testing"
27+
"time"
28+
29+
"github.com/stretchr/testify/assert"
30+
"github.com/stretchr/testify/require"
31+
"k8s.io/apimachinery/pkg/api/resource"
32+
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33+
34+
flaggerv1 "github.com/fluxcd/flagger/pkg/apis/flagger/v1beta1"
35+
"k8s.io/metrics/pkg/apis/external_metrics"
36+
)
37+
38+
func TestNewExternalMetricsProvider(t *testing.T) {
39+
bt := "token"
40+
cs := map[string][]byte{
41+
"bearerToken": []byte(bt),
42+
}
43+
44+
provider := flaggerv1.MetricTemplateProvider{
45+
Address: "https://external-metrics.default.svc.cluster.local",
46+
InsecureSkipVerify: false,
47+
}
48+
49+
emp, err := NewExternalMetricsProvider("100s", provider, cs)
50+
require.NoError(t, err)
51+
assert.Equal(t, "https://external-metrics.default.svc.cluster.local/apis/external.metrics.k8s.io/v1beta1", emp.metricServiceEndpoint)
52+
assert.Equal(t, provider.InsecureSkipVerify, emp.unsafeSsl)
53+
assert.Equal(t, 5*time.Second, emp.timeout)
54+
assert.Equal(t, bt, emp.bearerToken)
55+
}
56+
57+
func TestExternalMetrics_RunQuery(t *testing.T) {
58+
token := "mytoken"
59+
t.Run("ok", func(t *testing.T) {
60+
expected := 1.11111
61+
eq := `default/myMetric?label1=value1`
62+
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
63+
aq := fmt.Sprintf("%s?%s", r.URL.EscapedPath(), r.URL.RawQuery)
64+
assert.Equal(t, fmt.Sprintf("%s%s%s", metricServiceEndpointPath, namespacesPath, eq), aq)
65+
assert.Equal(t, fmt.Sprintf("Bearer %s", token), r.Header.Get(autorisationHeaderKey))
66+
67+
q, err := resource.ParseQuantity(strconv.FormatFloat(expected, 'f', -1, 64))
68+
assert.NoError(t, err)
69+
70+
ret := &external_metrics.ExternalMetricValueList{
71+
TypeMeta: v1.TypeMeta{
72+
APIVersion: "external.metrics.k8s.io/v1beta1",
73+
Kind: "ExternalMetricValueList",
74+
},
75+
ListMeta: v1.ListMeta{},
76+
Items: []external_metrics.ExternalMetricValue{
77+
{
78+
MetricName: "myMetric",
79+
MetricLabels: map[string]string{
80+
"label1": "value1",
81+
},
82+
Value: q,
83+
},
84+
},
85+
}
86+
json, err := json2.Marshal(ret)
87+
assert.NoError(t, err)
88+
w.Write(json)
89+
}))
90+
defer ts.Close()
91+
92+
dp, err := NewExternalMetricsProvider("1m",
93+
flaggerv1.MetricTemplateProvider{Address: ts.URL},
94+
map[string][]byte{
95+
applicationBearerToken: []byte(token),
96+
},
97+
)
98+
require.NoError(t, err)
99+
100+
f, err := dp.RunQuery(eq)
101+
require.NoError(t, err)
102+
assert.Equal(t, expected, f)
103+
})
104+
105+
t.Run("no values", func(t *testing.T) {
106+
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
107+
ret := &external_metrics.ExternalMetricValueList{
108+
TypeMeta: v1.TypeMeta{
109+
APIVersion: "external.metrics.k8s.io/v1beta1",
110+
Kind: "ExternalMetricValueList",
111+
},
112+
ListMeta: v1.ListMeta{},
113+
Items: []external_metrics.ExternalMetricValue{},
114+
}
115+
json, err := json2.Marshal(ret)
116+
assert.NoError(t, err)
117+
w.Write(json)
118+
}))
119+
defer ts.Close()
120+
121+
dp, err := NewExternalMetricsProvider("1m",
122+
flaggerv1.MetricTemplateProvider{Address: ts.URL},
123+
map[string][]byte{
124+
applicationBearerToken: []byte(token),
125+
},
126+
)
127+
require.NoError(t, err)
128+
_, err = dp.RunQuery("")
129+
require.True(t, errors.Is(err, ErrNoValuesFound))
130+
})
131+
}

pkg/metrics/providers/factory.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ func (factory Factory) Provider(metricInterval string, provider flaggerv1.Metric
2929
return NewPrometheusProvider(provider, credentials)
3030
case "datadog":
3131
return NewDatadogProvider(metricInterval, provider, credentials)
32+
case "externalmetrics":
33+
return NewExternalMetricsProvider(metricInterval, provider, credentials)
3234
case "cloudwatch":
3335
return NewCloudWatchProvider(metricInterval, provider)
3436
case "newrelic":

0 commit comments

Comments
 (0)