Skip to content

Commit f2d4182

Browse files
klauspostharshavardhana
authored andcommitted
Add OpenTelemetry types and handlers (#348)
1 parent b797652 commit f2d4182

6 files changed

+760
-4
lines changed

opentelemetry.go

+136
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
// Copyright (c) 2015-2025 MinIO, Inc.
2+
//
3+
// This file is part of MinIO Object Storage stack
4+
//
5+
// This program is free software: you can redistribute it and/or modify
6+
// it under the terms of the GNU Affero General Public License as
7+
// published by the Free Software Foundation, either version 3 of the
8+
// License, or (at your option) any later version.
9+
//
10+
// This program is distributed in the hope that it will be useful,
11+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
// GNU Affero General Public License for more details.
14+
//
15+
// You should have received a copy of the GNU Affero General Public License
16+
// along with this program. If not, see <http://www.gnu.org/licenses/>.
17+
18+
package madmin
19+
20+
import (
21+
"context"
22+
"crypto/rsa"
23+
"encoding/json"
24+
"io"
25+
"net/http"
26+
27+
"github.com/minio/madmin-go/v4/estream"
28+
)
29+
30+
//go:generate msgp $GOFILE
31+
32+
//msgp:replace TraceType with:uint64
33+
34+
// ServiceTelemetryOpts is a request to add following types to tracing.
35+
type ServiceTelemetryOpts struct {
36+
// Types to add to tracing.
37+
Types TraceType `json:"types"`
38+
39+
// Public cert to encrypt stream.
40+
PubCert []byte
41+
42+
// Sample rate to set for this filter.
43+
// If <=0 or >=1 no sampling will be performed
44+
// and all hits will be traced.
45+
SampleRate float64 `json:"sampleRate"`
46+
47+
// Disable sampling and only do tracing when a trace id is set on incoming request.
48+
ParentOnly bool `json:"parentOnly"`
49+
50+
// Tag adds a `custom.tag` field to all traces triggered by this.
51+
TagKV map[string]string `json:"tags"`
52+
53+
// On incoming HTTP types, only trigger if substring is in request.
54+
HTTPFilter struct {
55+
Func string `json:"funcFilter"`
56+
UserAgent string `json:"userAgent"`
57+
Header map[string]string `json:"header"`
58+
} `json:"httpFilter"`
59+
}
60+
61+
//msgp:ignore ServiceTelemetry
62+
63+
// ServiceTelemetry holds http telemetry spans, serialized and compressed.
64+
type ServiceTelemetry struct {
65+
SpanMZ []byte // Serialized and Compressed spans.
66+
Err error // Any error that occurred
67+
}
68+
69+
// ServiceTelemetryStream - gets raw stream for service telemetry.
70+
func (adm AdminClient) ServiceTelemetryStream(ctx context.Context, opts ServiceTelemetryOpts) (io.ReadCloser, error) {
71+
bopts, err := json.Marshal(opts)
72+
if err != nil {
73+
return nil, err
74+
}
75+
reqData := requestData{
76+
relPath: adminAPIPrefixV4 + "/telemetry",
77+
content: bopts,
78+
}
79+
// Execute GET to call trace handler
80+
resp, err := adm.executeMethod(ctx, http.MethodPost, reqData)
81+
if err != nil {
82+
return nil, err
83+
}
84+
if resp.StatusCode != http.StatusOK {
85+
closeResponse(resp)
86+
return nil, httpRespToErrorResponse(resp)
87+
}
88+
89+
return resp.Body, nil
90+
}
91+
92+
// ServiceTelemetry - perform trace request and return individual packages.
93+
// If options contains a public key the private key must be provided.
94+
// If context is canceled the function will return.
95+
func (adm AdminClient) ServiceTelemetry(ctx context.Context, opts ServiceTelemetryOpts, dst chan<- ServiceTelemetry, pk *rsa.PrivateKey) {
96+
defer close(dst)
97+
resp, err := adm.ServiceTelemetryStream(ctx, opts)
98+
if err != nil {
99+
dst <- ServiceTelemetry{Err: err}
100+
return
101+
}
102+
dec, err := estream.NewReader(resp)
103+
if err != nil {
104+
dst <- ServiceTelemetry{Err: err}
105+
return
106+
}
107+
if pk != nil {
108+
dec.SetPrivateKey(pk)
109+
}
110+
for {
111+
st, err := dec.NextStream()
112+
if err != nil {
113+
dst <- ServiceTelemetry{Err: err}
114+
return
115+
}
116+
if ctx.Err() != nil {
117+
return
118+
}
119+
block, err := io.ReadAll(st)
120+
if err == nil && len(block) == 0 {
121+
// Ignore 0 sized blocks.
122+
continue
123+
}
124+
if ctx.Err() != nil {
125+
return
126+
}
127+
select {
128+
case <-ctx.Done():
129+
return
130+
case dst <- ServiceTelemetry{SpanMZ: block, Err: err}:
131+
if err != nil {
132+
return
133+
}
134+
}
135+
}
136+
}

0 commit comments

Comments
 (0)