@@ -22,23 +22,26 @@ import (
22
22
"io"
23
23
"log/slog"
24
24
"net/http"
25
+ "net/url"
26
+ "path"
25
27
"strconv"
26
28
"strings"
27
29
"time"
28
30
29
31
"github.com/klauspost/compress/snappy"
30
32
"google.golang.org/protobuf/proto"
31
33
32
- "github.com/prometheus/client_golang/api"
33
- "github.com/prometheus/client_golang/internal/github.com/efficientgo/core/backoff"
34
+ "github.com/prometheus/client_golang/exp/internal/github.com/efficientgo/core/backoff"
34
35
)
35
36
36
37
// API is a client for Prometheus Remote Protocols.
37
38
// NOTE(bwplotka): Only https://prometheus.io/docs/specs/remote_write_spec_2_0/ is currently implemented,
38
39
// read protocols to be implemented if there will be a demand.
39
40
type API struct {
40
- client api.Client
41
- opts apiOpts
41
+ baseURL * url.URL
42
+ client * http.Client
43
+
44
+ opts apiOpts
42
45
43
46
reqBuf , comprBuf []byte
44
47
}
@@ -92,6 +95,14 @@ func WithAPINoRetryOnRateLimit() APIOption {
92
95
}
93
96
}
94
97
98
+ // WithAPIBackoff returns APIOption that allows overriding backoff configuration.
99
+ func WithAPIBackoff (backoff backoff.Config ) APIOption {
100
+ return func (o * apiOpts ) error {
101
+ o .backoff = backoff
102
+ return nil
103
+ }
104
+ }
105
+
95
106
type nopSlogHandler struct {}
96
107
97
108
func (n nopSlogHandler ) Enabled (context.Context , slog.Level ) bool { return false }
@@ -103,7 +114,12 @@ func (n nopSlogHandler) WithGroup(string) slog.Handler { return n }
103
114
//
104
115
// It is not safe to use the returned API from multiple goroutines, create a
105
116
// separate *API for each goroutine.
106
- func NewAPI (c api.Client , opts ... APIOption ) (* API , error ) {
117
+ func NewAPI (client * http.Client , baseURL string , opts ... APIOption ) (* API , error ) {
118
+ parsedURL , err := url .Parse (baseURL )
119
+ if err != nil {
120
+ return nil , fmt .Errorf ("invalid base URL: %w" , err )
121
+ }
122
+
107
123
o := * defaultAPIOpts
108
124
for _ , opt := range opts {
109
125
if err := opt (& o ); err != nil {
@@ -115,9 +131,16 @@ func NewAPI(c api.Client, opts ...APIOption) (*API, error) {
115
131
o .logger = slog .New (nopSlogHandler {})
116
132
}
117
133
134
+ if client == nil {
135
+ client = http .DefaultClient
136
+ }
137
+
138
+ parsedURL .Path = path .Join (parsedURL .Path , o .path )
139
+
118
140
return & API {
119
- client : c ,
120
- opts : o ,
141
+ opts : o ,
142
+ client : client ,
143
+ baseURL : parsedURL ,
121
144
}, nil
122
145
}
123
146
@@ -157,6 +180,9 @@ type v2Request interface {
157
180
// - If neither is supported, it will marshaled using generic google.golang.org/protobuf methods and
158
181
// error out on unknown scheme.
159
182
func (r * API ) Write (ctx context.Context , msg any ) (_ WriteResponseStats , err error ) {
183
+ // Reset the buffer.
184
+ r .reqBuf = r .reqBuf [:0 ]
185
+
160
186
// Detect content-type.
161
187
cType := WriteProtoFullNameV1
162
188
if _ , ok := msg .(v2Request ); ok {
@@ -189,7 +215,6 @@ func (r *API) Write(ctx context.Context, msg any) (_ WriteResponseStats, err err
189
215
}
190
216
case proto.Message :
191
217
// Generic proto.
192
- r .reqBuf = r .reqBuf [:0 ]
193
218
r .reqBuf , err = (proto.MarshalOptions {}).MarshalAppend (r .reqBuf , m )
194
219
if err != nil {
195
220
return WriteResponseStats {}, fmt .Errorf ("encoding request %w" , err )
@@ -266,8 +291,7 @@ func compressPayload(tmpbuf *[]byte, enc Compression, inp []byte) (compressed []
266
291
}
267
292
268
293
func (r * API ) attemptWrite (ctx context.Context , compr Compression , proto WriteProtoFullName , payload []byte , attempt int ) (WriteResponseStats , error ) {
269
- u := r .client .URL (r .opts .path , nil )
270
- req , err := http .NewRequest (http .MethodPost , u .String (), bytes .NewReader (payload ))
294
+ req , err := http .NewRequest (http .MethodPost , r .baseURL .String (), bytes .NewReader (payload ))
271
295
if err != nil {
272
296
// Errors from NewRequest are from unparsable URLs, so are not
273
297
// recoverable.
@@ -287,11 +311,17 @@ func (r *API) attemptWrite(ctx context.Context, compr Compression, proto WritePr
287
311
req .Header .Set ("Retry-Attempt" , strconv .Itoa (attempt ))
288
312
}
289
313
290
- resp , body , err := r .client .Do (ctx , req )
314
+ resp , err := r .client .Do (req . WithContext ( ctx ) )
291
315
if err != nil {
292
316
// Errors from Client.Do are likely network errors, so recoverable.
293
317
return WriteResponseStats {}, retryableError {err , 0 }
294
318
}
319
+ defer resp .Body .Close ()
320
+
321
+ body , err := io .ReadAll (resp .Body )
322
+ if err != nil {
323
+ return WriteResponseStats {}, fmt .Errorf ("reading response body: %w" , err )
324
+ }
295
325
296
326
rs := WriteResponseStats {}
297
327
if proto == WriteProtoFullNameV2 {
@@ -334,19 +364,14 @@ type writeStorage interface {
334
364
Store (ctx context.Context , proto WriteProtoFullName , serializedRequest []byte ) (_ WriteResponseStats , code int , _ error )
335
365
}
336
366
337
- // remoteWriteDecompressor is an interface that allows decompressing the body of the request.
338
- type remoteWriteDecompressor interface {
339
- Decompress (ctx context.Context , body io.ReadCloser ) (decompressed []byte , _ error )
340
- }
341
-
342
367
type handler struct {
343
368
store writeStorage
344
369
opts handlerOpts
345
370
}
346
371
347
372
type handlerOpts struct {
348
- logger * slog.Logger
349
- decompressor remoteWriteDecompressor
373
+ logger * slog.Logger
374
+ middlewares [] func (http. Handler ) http. Handler
350
375
}
351
376
352
377
// HandlerOption represents an option for the handler.
@@ -360,22 +385,71 @@ func WithHandlerLogger(logger *slog.Logger) HandlerOption {
360
385
}
361
386
}
362
387
363
- // WithHandlerDecompressor returns HandlerOption that allows providing remoteWriteDecompressor.
364
- // By default, SimpleSnappyDecompressor is used.
365
- func WithHandlerDecompressor (decompressor remoteWriteDecompressor ) HandlerOption {
388
+ // WithHandlerMiddleware returns HandlerOption that allows providing middlewares.
389
+ // Multiple middlewares can be provided and will be applied in the order they are passed.
390
+ // When using this option, SnappyDecompressorMiddleware is not applied by default so
391
+ // it (or any other decompression middleware) needs to be added explicitly.
392
+ func WithHandlerMiddlewares (middlewares ... func (http.Handler ) http.Handler ) HandlerOption {
366
393
return func (o * handlerOpts ) {
367
- o .decompressor = decompressor
394
+ o .middlewares = middlewares
395
+ }
396
+ }
397
+
398
+ // SnappyDecompressorMiddleware returns a middleware that checks if the request body is snappy-encoded and decompresses it.
399
+ // If the request body is not snappy-encoded, it returns an error.
400
+ // Used by default in NewRemoteWriteHandler.
401
+ func SnappyDecompressorMiddleware (logger * slog.Logger ) func (http.Handler ) http.Handler {
402
+ return func (next http.Handler ) http.Handler {
403
+ return http .HandlerFunc (func (w http.ResponseWriter , r * http.Request ) {
404
+ enc := r .Header .Get ("Content-Encoding" )
405
+ if enc != "" && enc != string (SnappyBlockCompression ) {
406
+ err := fmt .Errorf ("%v encoding (compression) is not accepted by this server; only %v is acceptable" , enc , SnappyBlockCompression )
407
+ logger .Error ("Error decoding remote write request" , "err" , err )
408
+ http .Error (w , err .Error (), http .StatusUnsupportedMediaType )
409
+ return
410
+ }
411
+
412
+ // Read the request body.
413
+ bodyBytes , err := io .ReadAll (r .Body )
414
+ if err != nil {
415
+ logger .Error ("Error reading request body" , "err" , err )
416
+ http .Error (w , err .Error (), http .StatusBadRequest )
417
+ return
418
+ }
419
+
420
+ decompressed , err := snappy .Decode (nil , bodyBytes )
421
+ if err != nil {
422
+ // TODO(bwplotka): Add more context to responded error?
423
+ logger .Error ("Error snappy decoding remote write request" , "err" , err )
424
+ http .Error (w , err .Error (), http .StatusBadRequest )
425
+ return
426
+ }
427
+
428
+ // Replace the body with decompressed data
429
+ r .Body = io .NopCloser (bytes .NewReader (decompressed ))
430
+ next .ServeHTTP (w , r )
431
+ })
368
432
}
369
433
}
370
434
371
435
// NewRemoteWriteHandler returns HTTP handler that receives Remote Write 2.0
372
436
// protocol https://prometheus.io/docs/specs/remote_write_spec_2_0/.
373
437
func NewRemoteWriteHandler (store writeStorage , opts ... HandlerOption ) http.Handler {
374
- o := handlerOpts {logger : slog .New (nopSlogHandler {}), decompressor : & SimpleSnappyDecompressor {}}
438
+ o := handlerOpts {
439
+ logger : slog .New (nopSlogHandler {}),
440
+ middlewares : []func (http.Handler ) http.Handler {SnappyDecompressorMiddleware (slog .New (nopSlogHandler {}))},
441
+ }
375
442
for _ , opt := range opts {
376
443
opt (& o )
377
444
}
378
- return & handler {opts : o , store : store }
445
+ h := & handler {opts : o , store : store }
446
+
447
+ // Apply all middlewares in order
448
+ var handler http.Handler = h
449
+ for i := len (o .middlewares ) - 1 ; i >= 0 ; i -- {
450
+ handler = o.middlewares [i ](handler )
451
+ }
452
+ return handler
379
453
}
380
454
381
455
// ParseProtoMsg parses the content-type header and returns the proto message type.
@@ -412,10 +486,13 @@ func ParseProtoMsg(contentType string) (WriteProtoFullName, error) {
412
486
}
413
487
414
488
func (h * handler ) ServeHTTP (w http.ResponseWriter , r * http.Request ) {
489
+ if r .Method != http .MethodPost {
490
+ http .Error (w , "Method not allowed" , http .StatusMethodNotAllowed )
491
+ return
492
+ }
493
+
415
494
contentType := r .Header .Get ("Content-Type" )
416
495
if contentType == "" {
417
- // Don't break yolo 1.0 clients if not needed.
418
- // We could give http.StatusUnsupportedMediaType, but let's assume 1.0 message by default.
419
496
contentType = appProtoContentType
420
497
}
421
498
@@ -426,26 +503,15 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
426
503
return
427
504
}
428
505
429
- enc := r .Header .Get ("Content-Encoding" )
430
- if enc == "" {
431
- // Don't break yolo 1.0 clients if not needed. This is similar to what we did
432
- // before 2.0: https://github.com/prometheus/prometheus/blob/d78253319daa62c8f28ed47e40bafcad2dd8b586/storage/remote/write_handler.go#L62
433
- // We could give http.StatusUnsupportedMediaType, but let's assume snappy by default.
434
- } else if enc != string (SnappyBlockCompression ) {
435
- err := fmt .Errorf ("%v encoding (compression) is not accepted by this server; only %v is acceptable" , enc , SnappyBlockCompression )
436
- h .opts .logger .Error ("Error decoding remote write request" , "err" , err )
437
- http .Error (w , err .Error (), http .StatusUnsupportedMediaType )
438
- }
439
-
440
- // Decompress the request body.
441
- decompressed , err := h .opts .decompressor .Decompress (r .Context (), r .Body )
506
+ // Read the already decompressed body
507
+ body , err := io .ReadAll (r .Body )
442
508
if err != nil {
443
- h .opts .logger .Error ("Error decompressing remote write request" , "err" , err .Error ())
509
+ h .opts .logger .Error ("Error reading request body " , "err" , err .Error ())
444
510
http .Error (w , err .Error (), http .StatusBadRequest )
445
511
return
446
512
}
447
513
448
- stats , code , storeErr := h .store .Store (r .Context (), msgType , decompressed )
514
+ stats , code , storeErr := h .store .Store (r .Context (), msgType , body )
449
515
450
516
// Set required X-Prometheus-Remote-Write-Written-* response headers, in all cases.
451
517
stats .SetHeaders (w )
@@ -454,32 +520,11 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
454
520
if code == 0 {
455
521
code = http .StatusInternalServerError
456
522
}
457
- if code / 5 == 100 { // 5xx
523
+ if code / 100 == 5 { // 5xx
458
524
h .opts .logger .Error ("Error while storing the remote write request" , "err" , storeErr .Error ())
459
525
}
460
526
http .Error (w , storeErr .Error (), code )
461
527
return
462
528
}
463
529
w .WriteHeader (http .StatusNoContent )
464
530
}
465
-
466
- // SimpleSnappyDecompressor is a simple implementation of the remoteWriteDecompressor interface.
467
- type SimpleSnappyDecompressor struct {}
468
-
469
- func (s * SimpleSnappyDecompressor ) Decompress (ctx context.Context , body io.ReadCloser ) (decompressed []byte , _ error ) {
470
- // Read the request body.
471
- bodyBytes , err := io .ReadAll (body )
472
- if err != nil {
473
- return nil , fmt .Errorf ("error reading request body: %w" , err )
474
- }
475
-
476
- decompressed , err = snappy .Decode (nil , bodyBytes )
477
- if err != nil {
478
- // TODO(bwplotka): Add more context to responded error?
479
- return nil , fmt .Errorf ("error snappy decoding request body: %w" , err )
480
- }
481
-
482
- return decompressed , nil
483
- }
484
-
485
- var _ remoteWriteDecompressor = & SimpleSnappyDecompressor {}
0 commit comments