Skip to content

Move retry backoff behavior into separate middleware #313

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/@unreleased/pr-313.v2.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
type: improvement
improvement:
description: Move retry backoff behavior into separate middleware
links:
- https://github.com/palantir/conjure-go-runtime/pull/313
16 changes: 14 additions & 2 deletions conjure-go-client/httpclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ func (c *clientImpl) Do(ctx context.Context, params ...RequestParam) (*http.Resp
var err error
var resp *http.Response

retrier := internal.NewRequestRetrier(uris, c.backoffOptions.CurrentRetryParams().Start(ctx), attempts)
retrier := internal.NewRequestRetrier(uris, attempts)
backoffMiddleware := c.getBackoffMiddleware(ctx)
for {
uri, isRelocated := retrier.GetNextURI(resp, err)
if uri == "" {
Expand All @@ -106,7 +107,7 @@ func (c *clientImpl) Do(ctx context.Context, params ...RequestParam) (*http.Resp
if err != nil {
svc1log.FromContext(ctx).Debug("Retrying request", svc1log.Stacktrace(err))
}
resp, err = c.doOnce(ctx, uri, isRelocated, params...)
resp, err = c.doOnce(ctx, uri, isRelocated, backoffMiddleware, params...)
}
if err != nil {
return nil, err
Expand All @@ -118,6 +119,7 @@ func (c *clientImpl) doOnce(
ctx context.Context,
baseURI string,
useBaseURIOnly bool,
backoffMiddleware Middleware,
params ...RequestParam,
) (*http.Response, error) {

Expand Down Expand Up @@ -165,6 +167,7 @@ func (c *clientImpl) doOnce(

// must precede the error decoders to read the status code of the raw response.
transport = wrapTransport(transport, c.uriScorer.CurrentURIScoringMiddleware())
transport = wrapTransport(transport, backoffMiddleware)
// request decoder must precede the client decoder
// must precede the body middleware to read the response body
transport = wrapTransport(transport, b.errorDecoderMiddleware, c.errorDecoderMiddleware)
Expand All @@ -191,6 +194,15 @@ func (c *clientImpl) doOnce(
return resp, unwrapURLError(ctx, respErr)
}

// getBackoffMiddleware builds the backoff middleware used for one logical
// request (i.e. across all of its attempts). It starts a retrier from the
// client's current retry parameters, bound to ctx, and hands the middleware a
// callback that advances that retrier each time a backoff is requested.
//
// NOTE(review): the result of Next() is discarded both here and in the
// callback; presumably it reports whether retrying may continue (e.g. ctx not
// done) — confirm that ignoring it is intended.
func (c *clientImpl) getBackoffMiddleware(ctx context.Context) Middleware {
	r := c.backoffOptions.CurrentRetryParams().Start(ctx)

	// Consume the first step up front so that the first repeated URI has backoff.
	r.Next()

	backoff := func() {
		r.Next()
	}
	return internal.NewBackoffMiddleware(backoff)
}

// unwrapURLError converts a *url.Error to a werror. We need this because all
// errors from the stdlib's client.Do are wrapped in *url.Error, and if we
// were to blindly return that we would lose any werror params stored on the
Expand Down
49 changes: 49 additions & 0 deletions conjure-go-client/httpclient/internal/backoff_middleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (c) 2022 Palantir Technologies. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
"net/http"
)

// BackoffMiddleware is a round-trip middleware that decides when to invoke a
// caller-supplied backoff function before forwarding a request. It keeps
// mutable per-instance state and performs no locking of its own.
type BackoffMiddleware struct {
	// backoff is invoked before a request is forwarded when a backoff is due.
	backoff func()

	// seenUris is the set of base URIs that have already passed through
	// RoundTrip on this instance.
	seenUris map[string]struct{}

	// throttled records whether the most recent round trip was classified
	// as a throttle response.
	throttled bool
}

// NewBackoffMiddleware constructs a BackoffMiddleware with an empty set of
// seen URIs and the throttled flag cleared. The middleware calls backoff
// before forwarding a request whose base URI it has already seen, or when the
// previous response was a throttle (429) response. backoff is expected to
// block for the desired backoff duration.
func NewBackoffMiddleware(backoff func()) *BackoffMiddleware {
	m := new(BackoffMiddleware)
	m.backoff = backoff
	m.seenUris = map[string]struct{}{}
	m.throttled = false
	return m
}

// RoundTrip forwards req to next, first invoking the configured backoff
// function when either the request's base URI has already been sent through
// this middleware or the previous round trip was classified as throttled.
// The response and error from next are returned unchanged.
func (b *BackoffMiddleware) RoundTrip(req *http.Request, next http.RoundTripper) (*http.Response, error) {
	uri := getBaseURI(req.URL)

	// Back off on a repeated URI or after a throttle response.
	if _, repeated := b.seenUris[uri]; repeated || b.throttled {
		b.backoff()
	}
	// Record the URI only after the check so its first use is never delayed
	// on account of the URI itself.
	b.seenUris[uri] = struct{}{}

	resp, err := next.RoundTrip(req)

	// Remember whether this attempt was throttled so the next attempt backs
	// off regardless of which URI it targets.
	codeFromErr, _ := StatusCodeFromError(err)
	b.throttled, _ = isThrottleResponse(resp, codeFromErr)

	return resp, err
}
107 changes: 38 additions & 69 deletions conjure-go-client/httpclient/internal/request_retrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import (
"net/http"
"net/url"
"strings"

"github.com/palantir/pkg/retry"
)

const (
Expand All @@ -32,26 +30,22 @@ const (
// first call to GetNextURI
type RequestRetrier struct {
currentURI string
retrier retry.Retrier
uris []string
offset int
relocatedURIs map[string]struct{}
failedURIs map[string]struct{}
maxAttempts int
attemptCount int
}

// NewRequestRetrier creates a new request retrier.
// Regardless of maxAttempts, mesh URIs will never be retried.
func NewRequestRetrier(uris []string, retrier retry.Retrier, maxAttempts int) *RequestRetrier {
func NewRequestRetrier(uris []string, maxAttempts int) *RequestRetrier {
offset := 0
return &RequestRetrier{
currentURI: uris[offset],
retrier: retrier,
uris: uris,
offset: offset,
relocatedURIs: map[string]struct{}{},
failedURIs: map[string]struct{}{},
maxAttempts: maxAttempts,
attemptCount: 0,
}
Expand All @@ -72,10 +66,7 @@ func (r *RequestRetrier) GetNextURI(resp *http.Response, respErr error) (uri str
r.attemptCount++
}()
if r.attemptCount == 0 {
// First attempt is always successful. Trigger the first retry so later calls have backoff
// but ignore the returned value to ensure that the client can instrument the request even
// if the context is done.
r.retrier.Next()
// First attempt is always successful
return r.removeMeshSchemeIfPresent(r.currentURI), false
}
if !r.attemptsRemaining() {
Expand All @@ -86,80 +77,58 @@ func (r *RequestRetrier) GetNextURI(resp *http.Response, respErr error) (uri str
// Mesh uris don't get retried
return "", false
}
retryFn := r.getRetryFn(resp, respErr)
if retryFn == nil {
nextURI := r.getNextURI(resp, respErr)
if nextURI == "" {
// The previous response was not retryable
return "", false
}
// Updates currentURI
if !retryFn() {
return "", false
}
return r.currentURI, r.isRelocatedURI(r.currentURI)
return nextURI, r.isRelocatedURI(nextURI)
}

func (r *RequestRetrier) getRetryFn(resp *http.Response, respErr error) func() bool {
func (r *RequestRetrier) getNextURI(resp *http.Response, respErr error) string {
errCode, _ := StatusCodeFromError(respErr)
if retryOther, _ := isThrottleResponse(resp, errCode); retryOther {
// 429: throttle
// Immediately backoff and select the next URI.
// TODO(whickman): use the retry-after header once #81 is resolved
return r.nextURIAndBackoff
} else if isUnavailableResponse(resp, errCode) {
// 503: go to next node
return r.nextURIOrBackoff
} else if shouldTryOther, otherURI := isRetryOtherResponse(resp, respErr, errCode); shouldTryOther {
// 307 or 308: go to next node, or particular node if provided.
if otherURI != nil {
return func() bool {
r.setURIAndResetBackoff(otherURI)
return true
}
}
return r.nextURIOrBackoff
} else if errCode >= http.StatusBadRequest && errCode < http.StatusInternalServerError {
return nil
} else if resp == nil {
// if we get a nil response, we can assume there is a problem with host and can move on to the next.
return r.nextURIOrBackoff
// 2XX and 4XX responses (except 429) are not retryable
if isSuccess(resp) || isNonRetryableClientError(resp, errCode) {
return ""
}
return nil
// 307 or 308: go to particular node if provided
if shouldTryOther, otherURI := isRetryOtherResponse(resp, respErr, errCode); shouldTryOther && otherURI != nil {
r.setRelocatedURI(otherURI)
} else {
r.nextURI()
}
return r.currentURI
}

func (r *RequestRetrier) setURIAndResetBackoff(otherURI *url.URL) {
// If the URI returned by relocation header is a relative path
// We will resolve it with the current URI
if !otherURI.IsAbs() {
if currentURI := parseLocationURL(r.currentURI); currentURI != nil {
otherURI = currentURI.ResolveReference(otherURI)
}
}
nextURI := otherURI.String()
r.relocatedURIs[otherURI.String()] = struct{}{}
r.retrier.Reset()
r.currentURI = nextURI
// isSuccess reports whether resp is non-nil and carries a 2XX status code.
func isSuccess(resp *http.Response) bool {
	if resp == nil {
		return false
	}
	code := resp.StatusCode
	return code >= http.StatusOK && code < http.StatusMultipleChoices
}

// If lastURI was already marked failed, we perform a backoff as determined by the retrier before returning the next URI and its offset.
// Otherwise, we add lastURI to failedURIs and return the next URI and its offset immediately.
func (r *RequestRetrier) nextURIOrBackoff() bool {
_, performBackoff := r.failedURIs[r.currentURI]
r.markFailedAndMoveToNextURI()
// If the URI has failed before, perform a backoff
if performBackoff || len(r.uris) == 1 {
return r.retrier.Next()
func isNonRetryableClientError(resp *http.Response, errCode int) bool {
// Check for a 4XX status parsed from the error or in the response
if isClientError(errCode) || (resp != nil && isClientError(resp.StatusCode)) {
// 429 is retryable
if isThrottle, _ := isThrottleResponse(resp, errCode); !isThrottle {
return true
}
}
return true
return false
}

// Marks the current URI as failed, gets the next URI, and performs a backoff as determined by the retrier.
func (r *RequestRetrier) nextURIAndBackoff() bool {
r.markFailedAndMoveToNextURI()
return r.retrier.Next()
// setRelocatedURI makes uri the retrier's current URI and records it in the
// set of relocated URIs. A non-absolute uri is first resolved against the
// current URI when that URI can be parsed; otherwise it is used as given.
func (r *RequestRetrier) setRelocatedURI(uri *url.URL) {
	target := uri
	if !target.IsAbs() {
		// Relative location: resolve it relative to where we currently point.
		base := parseLocationURL(r.currentURI)
		if base != nil {
			target = base.ResolveReference(target)
		}
	}

	resolved := target.String()
	r.relocatedURIs[resolved] = struct{}{}
	r.currentURI = resolved
}

func (r *RequestRetrier) markFailedAndMoveToNextURI() {
r.failedURIs[r.currentURI] = struct{}{}
func (r *RequestRetrier) nextURI() {
nextURIOffset := (r.offset + 1) % len(r.uris)
nextURI := r.uris[nextURIOffset]
r.currentURI = nextURI
Expand Down
Loading