Skip to content

Commit 46a8ce4

Browse files
committed
feature: httpclient supporting retry net.Client.Retry()
feature: retry() filter Signed-off-by: Sandor Szücs <[email protected]>
1 parent d6c49d7 commit 46a8ce4

10 files changed

+687
-8
lines changed

filters/builtin/builtin.go

+2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/zalando/skipper/filters/fadein"
1717
"github.com/zalando/skipper/filters/flowid"
1818
logfilter "github.com/zalando/skipper/filters/log"
19+
"github.com/zalando/skipper/filters/retry"
1920
"github.com/zalando/skipper/filters/rfc"
2021
"github.com/zalando/skipper/filters/scheduler"
2122
"github.com/zalando/skipper/filters/sed"
@@ -231,6 +232,7 @@ func Filters() []filters.Spec {
231232
fadein.NewEndpointCreated(),
232233
consistenthash.NewConsistentHashKey(),
233234
consistenthash.NewConsistentHashBalanceFactor(),
235+
retry.NewRetry(),
234236
tls.New(),
235237
}
236238
}

filters/filters.go

+1
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,7 @@ const (
343343
FifoWithBodyName = "fifoWithBody"
344344
LifoName = "lifo"
345345
LifoGroupName = "lifoGroup"
346+
RetryName = "retry"
346347
RfcPathName = "rfcPath"
347348
RfcHostName = "rfcHost"
348349
BearerInjectorName = "bearerinjector"

filters/retry/retry.go

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package retry
2+
3+
import (
4+
"github.com/zalando/skipper/filters"
5+
)
6+
7+
type retry struct{}
8+
9+
// NewRetry creates a filter specification for the retry() filter
10+
func NewRetry() filters.Spec { return retry{} }
11+
12+
func (retry) Name() string { return filters.RetryName }
13+
func (retry) CreateFilter([]interface{}) (filters.Filter, error) { return retry{}, nil }
14+
func (retry) Response(filters.FilterContext) {}
15+
16+
func (retry) Request(ctx filters.FilterContext) {
17+
ctx.StateBag()[filters.RetryName] = struct{}{}
18+
}

filters/retry/retry_test.go

+95
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package retry
2+
3+
import (
4+
"bytes"
5+
"io"
6+
"net/http"
7+
"net/http/httptest"
8+
"testing"
9+
10+
"github.com/AlexanderYastrebov/noleak"
11+
"github.com/zalando/skipper/eskip"
12+
"github.com/zalando/skipper/filters"
13+
"github.com/zalando/skipper/proxy/proxytest"
14+
)
15+
16+
func TestRetry(t *testing.T) {
17+
for _, tt := range []struct {
18+
name string
19+
method string
20+
body string
21+
}{
22+
{
23+
name: "test GET",
24+
method: "GET",
25+
},
26+
{
27+
name: "test POST",
28+
method: "POST",
29+
body: "hello POST",
30+
},
31+
{
32+
name: "test PATCH",
33+
method: "PATCH",
34+
body: "hello PATCH",
35+
},
36+
{
37+
name: "test PUT",
38+
method: "PUT",
39+
body: "hello PUT",
40+
}} {
41+
t.Run(tt.name, func(t *testing.T) {
42+
i := 0
43+
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
44+
if i == 0 {
45+
i++
46+
w.WriteHeader(http.StatusBadGateway)
47+
return
48+
}
49+
50+
got, err := io.ReadAll(r.Body)
51+
if err != nil {
52+
t.Fatalf("got no data")
53+
}
54+
s := string(got)
55+
if tt.body != s {
56+
t.Fatalf("Failed to get the right data want: %q, got: %q", tt.body, s)
57+
}
58+
59+
w.WriteHeader(http.StatusOK)
60+
}))
61+
defer backend.Close()
62+
63+
noleak.Check(t)
64+
65+
fr := make(filters.Registry)
66+
retry := NewRetry()
67+
fr.Register(retry)
68+
r := &eskip.Route{
69+
Filters: []*eskip.Filter{
70+
{Name: retry.Name()},
71+
},
72+
Backend: backend.URL,
73+
}
74+
75+
proxy := proxytest.New(fr, r)
76+
defer proxy.Close()
77+
78+
buf := bytes.NewBufferString(tt.body)
79+
req, err := http.NewRequest(tt.method, proxy.URL, buf)
80+
if err != nil {
81+
t.Fatal(err)
82+
}
83+
84+
rsp, err := http.DefaultClient.Do(req)
85+
if err != nil {
86+
t.Fatalf("Failed to execute retry: %v", err)
87+
}
88+
89+
if rsp.StatusCode != http.StatusOK {
90+
t.Fatalf("unexpected status code: %s", rsp.Status)
91+
}
92+
rsp.Body.Close()
93+
})
94+
}
95+
}

io/copy_stream.go

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package io
2+
3+
import (
4+
"io"
5+
)
6+
7+
type ReadWriterLen interface {
8+
io.ReadWriter
9+
Len() int
10+
}
11+
12+
type CopyBodyStream struct {
13+
left int
14+
buf ReadWriterLen
15+
input io.ReadCloser
16+
}
17+
18+
func NewCopyBodyStream(left int, buf ReadWriterLen, rc io.ReadCloser) *CopyBodyStream {
19+
return &CopyBodyStream{
20+
left: left,
21+
buf: buf,
22+
input: rc,
23+
}
24+
}
25+
26+
func (cb *CopyBodyStream) Len() int {
27+
return cb.buf.Len()
28+
}
29+
30+
func (cb *CopyBodyStream) Read(p []byte) (n int, err error) {
31+
n, err = cb.input.Read(p)
32+
if cb.left > 0 && n > 0 {
33+
m := min(n, cb.left)
34+
written, err := cb.buf.Write(p[:m])
35+
if err != nil {
36+
return 0, err
37+
}
38+
cb.left -= written
39+
}
40+
return n, err
41+
}
42+
43+
func (cb *CopyBodyStream) Close() error {
44+
return cb.input.Close()
45+
}
46+
47+
func (cb *CopyBodyStream) GetBody() io.ReadCloser {
48+
return io.NopCloser(cb.buf)
49+
}

io/copy_stream_test.go

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package io
2+
3+
import (
4+
"bytes"
5+
"io"
6+
"testing"
7+
)
8+
9+
func TestCopyBodyStream(t *testing.T) {
10+
s := "content"
11+
bbuf := io.NopCloser(bytes.NewBufferString(s))
12+
cbs := NewCopyBodyStream(len(s), &bytes.Buffer{}, bbuf)
13+
14+
buf := make([]byte, len(s))
15+
_, err := cbs.Read(buf)
16+
if err != nil {
17+
t.Fatal(err)
18+
}
19+
20+
if cbs.Len() != len(buf) {
21+
t.Fatalf("Failed to have the same buf buffer size want: %d, got: %d", cbs.Len(), len(buf))
22+
}
23+
24+
got, err := io.ReadAll(cbs.GetBody())
25+
if err != nil {
26+
t.Fatalf("Failed to read: %v", err)
27+
}
28+
if gotStr := string(got); s != gotStr {
29+
t.Fatalf("Failed to get the right content: %s != %s", s, gotStr)
30+
}
31+
32+
if err = cbs.Close(); err != nil {
33+
t.Fatal(err)
34+
}
35+
}
36+
37+
func TestCopyBodyStreamFailedRead(t *testing.T) {
38+
s := "content"
39+
bbuf := io.NopCloser(bytes.NewBufferString(s))
40+
41+
failingBuf := &failingWriter{buf: &bytes.Buffer{}}
42+
43+
cbs := NewCopyBodyStream(len(s), failingBuf, bbuf)
44+
45+
buf := make([]byte, len(s))
46+
_, err := cbs.Read(buf)
47+
if err == nil {
48+
t.Fatal("Want to have failing buffer write inside Read()")
49+
}
50+
}

io/failing_write_buffer_test.go

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package io
2+
3+
import (
4+
"bytes"
5+
"fmt"
6+
)
7+
8+
type failingWriter struct {
9+
buf *bytes.Buffer
10+
}
11+
12+
func (*failingWriter) Write([]byte) (int, error) {
13+
return 0, fmt.Errorf("failed to write")
14+
}
15+
16+
func (fw *failingWriter) Read(p []byte) (int, error) {
17+
return fw.buf.Read(p)
18+
}
19+
20+
func (fw *failingWriter) Len() int {
21+
return fw.buf.Len()
22+
}

net/httpclient.go

+39-8
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package net
22

33
import (
4+
"bytes"
45
"crypto/tls"
6+
"errors"
57
"fmt"
68
"io"
79
"net/http"
@@ -14,6 +16,7 @@ import (
1416

1517
"github.com/opentracing/opentracing-go"
1618
"github.com/opentracing/opentracing-go/ext"
19+
skpio "github.com/zalando/skipper/io"
1720
"github.com/zalando/skipper/logging"
1821
"github.com/zalando/skipper/secrets"
1922
)
@@ -23,15 +26,18 @@ const (
2326
defaultRefreshInterval = 5 * time.Minute
2427
)
2528

29+
var errRequestNotFound = errors.New("request not found")
30+
2631
// Client adds additional features like Bearer token injection, and
2732
// opentracing to the wrapped http.Client with the same interface as
2833
// http.Client from the stdlib.
2934
type Client struct {
30-
once sync.Once
31-
client http.Client
32-
tr *Transport
33-
log logging.Logger
34-
sr secrets.SecretsReader
35+
once sync.Once
36+
client http.Client
37+
tr *Transport
38+
log logging.Logger
39+
sr secrets.SecretsReader
40+
retryBuffers *sync.Map
3541
}
3642

3743
// NewClient creates a wrapped http.Client and uses Transport to
@@ -67,9 +73,10 @@ func NewClient(o Options) *Client {
6773
Transport: tr,
6874
CheckRedirect: o.CheckRedirect,
6975
},
70-
tr: tr,
71-
log: o.Log,
72-
sr: sr,
76+
tr: tr,
77+
log: o.Log,
78+
sr: sr,
79+
retryBuffers: &sync.Map{},
7380
}
7481

7582
return c
@@ -125,9 +132,33 @@ func (c *Client) Do(req *http.Request) (*http.Response, error) {
125132
req.Header.Set("Authorization", "Bearer "+string(b))
126133
}
127134
}
135+
if req.Body != nil && req.Body != http.NoBody && req.ContentLength > 0 {
136+
retryBuffer := skpio.NewCopyBodyStream(int(req.ContentLength), &bytes.Buffer{}, req.Body)
137+
c.retryBuffers.Store(req, retryBuffer)
138+
req.Body = retryBuffer
139+
}
128140
return c.client.Do(req)
129141
}
130142

143+
func (c *Client) Retry(req *http.Request) (*http.Response, error) {
144+
if req.Body == nil || req.Body == http.NoBody {
145+
return c.Do(req)
146+
}
147+
148+
buf, ok := c.retryBuffers.LoadAndDelete(req)
149+
if !ok {
150+
return nil, fmt.Errorf("no retry possible, %w: %s %s", errRequestNotFound, req.Method, req.URL)
151+
}
152+
153+
retryBuffer, ok := buf.(*skpio.CopyBodyStream)
154+
if !ok {
155+
return nil, fmt.Errorf("no retry possible, no retry buffer for request: %s %s", req.Method, req.URL)
156+
}
157+
req.Body = retryBuffer.GetBody()
158+
159+
return c.Do(req)
160+
}
161+
131162
// CloseIdleConnections delegates the call to the underlying
132163
// http.Client.
133164
func (c *Client) CloseIdleConnections() {

0 commit comments

Comments
 (0)