-
Notifications
You must be signed in to change notification settings - Fork 11
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
CDPCP-13131 - Implement an exponential backoff in the Cloudera Terraf…
…orm provider
- Loading branch information
Showing
11 changed files
with
646 additions
and
24 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -100,7 +100,7 @@ jobs: | |
- name: Go Coverage | ||
uses: gwatts/[email protected] | ||
with: | ||
coverage-threshold: 31.8 | ||
coverage-threshold: 32.0 | ||
cover-pkg: ./... | ||
ignore-pattern: | | ||
/cdp-sdk-go/ | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
// Copyright 2024 Cloudera. All Rights Reserved. | ||
// | ||
// This file is licensed under the Apache License Version 2.0 (the "License"). | ||
// You may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0. | ||
// | ||
// This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS | ||
// OF ANY KIND, either express or implied. Refer to the License for the specific | ||
// permissions and limitations governing your use of the file. | ||
|
||
package cdp | ||
|
||
import ( | ||
"log" | ||
"math" | ||
"math/rand" | ||
"os" | ||
"time" | ||
) | ||
|
||
const ( | ||
expDeltaMin = 0.75 | ||
expDeltaMax = 1.0 | ||
) | ||
|
||
func backoff(retries int) time.Duration { | ||
switch os.Getenv("CDP_TF_BACKOFF_STRATEGY") { | ||
case "linear": | ||
{ | ||
step := intFromEnvOrDefault("CDP_TF_BACKOFF_STEP", defaultLinearBackoffStep) | ||
log.Default().Println("Using linear backoff strategy with step: ", step) | ||
return linearBackoff(retries, step) | ||
} | ||
default: | ||
{ | ||
log.Default().Println("Using exponential backoff strategy") | ||
return exponentialBackoff(retries) | ||
} | ||
} | ||
} | ||
|
||
func exponentialBackoff(retries int) time.Duration { | ||
rndSrc := rand.NewSource(time.Now().UnixNano()) | ||
delta := expDeltaMax - expDeltaMin | ||
jitter := expDeltaMin + rand.New(rndSrc).Float64()*(delta) | ||
return time.Duration((math.Pow(2, float64(retries))*jitter)*float64(time.Millisecond)) * 1000 | ||
} | ||
|
||
func linearBackoff(retries int, step int) time.Duration { | ||
return time.Duration((retries+1)*step) * time.Second | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
// Copyright 2024 Cloudera. All Rights Reserved. | ||
// | ||
// This file is licensed under the Apache License Version 2.0 (the "License"). | ||
// You may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0. | ||
// | ||
// This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS | ||
// OF ANY KIND, either express or implied. Refer to the License for the specific | ||
// permissions and limitations governing your use of the file. | ||
|
||
package cdp | ||
|
||
import ( | ||
"os" | ||
"testing" | ||
"time" | ||
) | ||
|
||
func TestLinearBackoffWithPositiveRetries(t *testing.T) { | ||
retries := 3 | ||
step := 2 | ||
expected := 8 * time.Second | ||
|
||
result := linearBackoff(retries, step) | ||
if result != expected { | ||
t.Fatalf("Expected %v, got %v", expected, result) | ||
} | ||
} | ||
|
||
func TestLinearBackoffWithZeroRetries(t *testing.T) { | ||
retries := 0 | ||
step := 2 | ||
expected := 2 * time.Second | ||
|
||
result := linearBackoff(retries, step) | ||
if result != expected { | ||
t.Fatalf("Expected %v, got %v", expected, result) | ||
} | ||
} | ||
|
||
func TestLinearBackoffWithNegativeRetries(t *testing.T) { | ||
retries := -1 | ||
step := 2 | ||
expected := 0 * time.Second | ||
|
||
result := linearBackoff(retries, step) | ||
if result != expected { | ||
t.Fatalf("Expected %v, got %v", expected, result) | ||
} | ||
} | ||
|
||
func TestLinearBackoffWithLargeStep(t *testing.T) { | ||
retries := 2 | ||
step := 1000 | ||
expected := 3000 * time.Second | ||
|
||
result := linearBackoff(retries, step) | ||
if result != expected { | ||
t.Fatalf("Expected %v, got %v", expected, result) | ||
} | ||
} | ||
|
||
func TestExponentialBackoffWithPositiveRetries(t *testing.T) { | ||
retries := 3 | ||
expectedMin := 6 * time.Second | ||
expectedMax := 8 * time.Second | ||
|
||
result := exponentialBackoff(retries) | ||
if result < expectedMin || result > expectedMax { | ||
t.Fatalf("Expected between %v and %v, got %v", expectedMin, expectedMax, result) | ||
} | ||
} | ||
|
||
func TestExponentialBackoffWithZeroRetries(t *testing.T) { | ||
retries := 0 | ||
expectedMin := 0.75 * float64(time.Second) | ||
expectedMax := 1 * time.Second | ||
|
||
result := exponentialBackoff(retries) | ||
if result < time.Duration(int(expectedMin)) || result > expectedMax { | ||
t.Fatalf("Expected between %v and %v, got %v", time.Duration(int(expectedMin)), expectedMax, result) | ||
} | ||
} | ||
|
||
func TestExponentialBackoffWithHighRetries(t *testing.T) { | ||
retries := 10 | ||
expectedMin := 768 * time.Second | ||
expectedMax := 1024 * time.Second | ||
|
||
result := exponentialBackoff(retries) | ||
if result < expectedMin || result > expectedMax { | ||
t.Fatalf("Expected between %v and %v, got %v", expectedMin, expectedMax, result) | ||
} | ||
} | ||
|
||
func TestLinearBackoffStrategyWithDefaultStep(t *testing.T) { | ||
_ = os.Setenv("CDP_TF_BACKOFF_STRATEGY", "linear") | ||
defer func() { | ||
_ = os.Unsetenv("CDP_TF_BACKOFF_STRATEGY") | ||
}() | ||
|
||
result := backoff(2) | ||
expected := defaultLinearBackoffStep * 3 * time.Second | ||
if result != expected { | ||
t.Fatalf("Expected %v, got %v", expected, result) | ||
} | ||
} | ||
|
||
func TestExponentialBackoffStrategyWithDefault(t *testing.T) { | ||
_ = os.Unsetenv("CDP_TF_BACKOFF_STRATEGY") | ||
|
||
result := backoff(3) | ||
expectedMin := 6 * time.Second | ||
expectedMax := 8 * time.Second | ||
if result < expectedMin || result > expectedMax { | ||
t.Fatalf("Expected between %v and %v, got %v", expectedMin, expectedMax, result) | ||
} | ||
} | ||
|
||
func TestLinearBackoffStrategyWithCustomStep(t *testing.T) { | ||
_ = os.Setenv("CDP_TF_BACKOFF_STRATEGY", "linear") | ||
_ = os.Setenv("CDP_TF_BACKOFF_STEP", "5") | ||
defer func() { | ||
_ = os.Unsetenv("CDP_TF_BACKOFF_STRATEGY") | ||
_ = os.Unsetenv("CDP_TF_BACKOFF_STEP") | ||
}() | ||
|
||
result := backoff(1) | ||
expected := 10 * time.Second | ||
if result != expected { | ||
t.Fatalf("Expected %v, got %v", expected, result) | ||
} | ||
} | ||
|
||
func TestExponentialBackoffStrategyWithHighRetries(t *testing.T) { | ||
_ = os.Unsetenv("CDP_TF_BACKOFF_STRATEGY") | ||
|
||
result := backoff(10) | ||
expectedMin := 768 * time.Second | ||
expectedMax := 1024 * time.Second | ||
if result < expectedMin || result > expectedMax { | ||
t.Fatalf("Expected between %v and %v, got %v", expectedMin, expectedMax, result) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
// Copyright 2024 Cloudera. All Rights Reserved. | ||
// | ||
// This file is licensed under the Apache License Version 2.0 (the "License"). | ||
// You may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0. | ||
// | ||
// This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS | ||
// OF ANY KIND, either express or implied. Refer to the License for the specific | ||
// permissions and limitations governing your use of the file. | ||
|
||
package cdp | ||
|
||
import ( | ||
"bytes" | ||
"fmt" | ||
"io" | ||
"log" | ||
"net/http" | ||
"time" | ||
) | ||
|
||
var ( | ||
retryableStatusCodes = []int{ | ||
http.StatusServiceUnavailable, | ||
http.StatusTooManyRequests, | ||
http.StatusGatewayTimeout, | ||
http.StatusBadGateway, | ||
http.StatusPreconditionFailed, | ||
} | ||
) | ||
|
||
type RetryableTransport struct { | ||
transport http.RoundTripper | ||
} | ||
|
||
func shouldRetry(err error, resp *http.Response) bool { | ||
if err != nil { | ||
return true | ||
} else if resp == nil { | ||
return false | ||
} | ||
return sliceContains(retryableStatusCodes, resp.StatusCode) | ||
} | ||
|
||
func drainBody(resp *http.Response) { | ||
if resp != nil && resp.Body != nil { | ||
_, err := io.Copy(io.Discard, resp.Body) | ||
if err != nil { | ||
log.Default().Println("Error while draining body: ", err) | ||
} | ||
defer func(Body io.ReadCloser) { | ||
_ = Body.Close() | ||
}(resp.Body) | ||
} | ||
} | ||
|
||
func (t *RetryableTransport) RoundTrip(req *http.Request) (*http.Response, error) { | ||
var bodyBytes []byte | ||
if req.Body != nil { | ||
bodyBytes, _ = io.ReadAll(req.Body) | ||
req.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) | ||
} | ||
resp, err := t.transport.RoundTrip(req) | ||
retries := 0 | ||
retryCount := intFromEnvOrDefault("CDP_TF_CALL_RETRY_COUNT", 10) | ||
for shouldRetry(err, resp) && retries < retryCount { | ||
log.Default().Printf("Retrying request (caused by: %+v;%+v)\n", err, resp) | ||
time.Sleep(backoff(retries)) | ||
drainBody(resp) | ||
if req.Body != nil { | ||
req.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) | ||
} | ||
resp, err = t.transport.RoundTrip(req) | ||
fmt.Printf("%v retry out of %v\n", retries+1, retryCount) | ||
retries++ | ||
} | ||
return resp, err | ||
} |
Oops, something went wrong.