-
Notifications
You must be signed in to change notification settings - Fork 973
Add retry limiter support #6318
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 6 commits
e2eaf8f
1ad33f4
d224ed2
0255f18
805b228
90ceda0
8ca3ae5
e1614d0
ea663f5
1fe7617
f49c3bd
a8095c8
ed307b2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,6 +29,8 @@ | |
| import com.linecorp.armeria.client.ClientRequestContext; | ||
| import com.linecorp.armeria.client.Endpoint; | ||
| import com.linecorp.armeria.client.SimpleDecoratingClient; | ||
| import com.linecorp.armeria.client.retry.limiter.RetryLimiter; | ||
| import com.linecorp.armeria.client.retry.limiter.RetryLimiterExecutor; | ||
| import com.linecorp.armeria.common.HttpHeaderNames; | ||
| import com.linecorp.armeria.common.HttpRequest; | ||
| import com.linecorp.armeria.common.Request; | ||
|
|
@@ -199,8 +201,9 @@ protected final boolean setResponseTimeout(ClientRequestContext ctx) { | |
| * {@code currentAttemptNo} exceeds the {@code maxAttempts} or the {@code nextDelay} is after | ||
| * the moment which timeout happens. | ||
| */ | ||
| protected final long getNextDelay(ClientRequestContext ctx, Backoff backoff) { | ||
| return getNextDelay(ctx, backoff, -1); | ||
| protected final long getNextDelay(ClientRequestContext ctx, @Nullable RetryLimiter limiter, | ||
| Backoff backoff) { | ||
| return getNextDelay(ctx, limiter, backoff, -1); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -214,7 +217,8 @@ protected final long getNextDelay(ClientRequestContext ctx, Backoff backoff) { | |
| * the moment which timeout happens. | ||
| */ | ||
| @SuppressWarnings("MethodMayBeStatic") // Intentionally left non-static for better user experience. | ||
| protected final long getNextDelay(ClientRequestContext ctx, Backoff backoff, long millisAfterFromServer) { | ||
| protected final long getNextDelay(ClientRequestContext ctx, @Nullable RetryLimiter limiter, | ||
| Backoff backoff, long millisAfterFromServer) { | ||
| requireNonNull(ctx, "ctx"); | ||
| requireNonNull(backoff, "backoff"); | ||
| final State state = state(ctx); | ||
|
|
@@ -237,6 +241,9 @@ protected final long getNextDelay(ClientRequestContext ctx, Backoff backoff, lon | |
| return -1; | ||
| } | ||
|
|
||
| if (!RetryLimiterExecutor.shouldRetry(limiter, ctx, currentAttemptNo)) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think in Armeria it is a convention to put the |
||
| return -1; | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be nice to have metrics about this but afaik it does not seem easy to add metrics here, maybe we could add some debug logging? |
||
| } | ||
| return nextDelay; | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,6 +22,7 @@ | |
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import com.linecorp.armeria.client.retry.limiter.RetryLimiter; | ||
| import com.linecorp.armeria.common.HttpResponse; | ||
| import com.linecorp.armeria.common.Response; | ||
| import com.linecorp.armeria.common.RpcResponse; | ||
|
|
@@ -87,25 +88,30 @@ static <T extends Response> RetryConfigBuilder<T> builder0( | |
| private final RetryRule fromRetryRuleWithContent; | ||
| @Nullable | ||
| private RetryRuleWithContent<T> fromRetryRule; | ||
| @Nullable | ||
| private RetryLimiter retryLimiter; | ||
|
|
||
| RetryConfig(RetryRule retryRule, int maxTotalAttempts, long responseTimeoutMillisForEachAttempt) { | ||
| this(requireNonNull(retryRule, "retryRule"), null, | ||
| maxTotalAttempts, responseTimeoutMillisForEachAttempt, 0); | ||
| RetryConfig(RetryRule retryRule, @Nullable RetryLimiter retryLimiter, int maxTotalAttempts, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This and the change below are also breaking |
||
| long responseTimeoutMillisForEachAttempt) { | ||
| this(requireNonNull(retryRule, "retryRule"), null, retryLimiter, | ||
| maxTotalAttempts, responseTimeoutMillisForEachAttempt, 0); | ||
| checkArguments(maxTotalAttempts, responseTimeoutMillisForEachAttempt); | ||
| } | ||
|
|
||
| RetryConfig( | ||
| RetryRuleWithContent<T> retryRuleWithContent, | ||
| @Nullable RetryLimiter retryLimiter, | ||
| int maxContentLength, | ||
| int maxTotalAttempts, | ||
| long responseTimeoutMillisForEachAttempt) { | ||
| this(null, requireNonNull(retryRuleWithContent, "retryRuleWithContent"), | ||
| maxTotalAttempts, responseTimeoutMillisForEachAttempt, maxContentLength); | ||
| this(null, requireNonNull(retryRuleWithContent, "retryRuleWithContent"), retryLimiter, | ||
| maxTotalAttempts, responseTimeoutMillisForEachAttempt, maxContentLength); | ||
| } | ||
|
|
||
| private RetryConfig( | ||
| @Nullable RetryRule retryRule, | ||
| @Nullable RetryRuleWithContent<T> retryRuleWithContent, | ||
| @Nullable RetryLimiter retryLimiter, | ||
| int maxTotalAttempts, | ||
| long responseTimeoutMillisForEachAttempt, | ||
| int maxContentLength) { | ||
|
|
@@ -120,6 +126,7 @@ private RetryConfig( | |
| } else { | ||
| fromRetryRuleWithContent = RetryRuleUtil.fromRetryRuleWithContent(retryRuleWithContent); | ||
| } | ||
| this.retryLimiter = retryLimiter; | ||
| } | ||
|
|
||
| private static void checkArguments(int maxTotalAttempts, long responseTimeoutMillisForEachAttempt) { | ||
|
|
@@ -145,9 +152,12 @@ public RetryConfigBuilder<T> toBuilder() { | |
| assert retryRule != null; | ||
| builder = builder0(retryRule); | ||
| } | ||
| return builder | ||
| .maxTotalAttempts(maxTotalAttempts) | ||
| .responseTimeoutMillisForEachAttempt(responseTimeoutMillisForEachAttempt); | ||
| builder.maxTotalAttempts(maxTotalAttempts) | ||
| .responseTimeoutMillisForEachAttempt(responseTimeoutMillisForEachAttempt); | ||
| if (retryLimiter != null) { | ||
| builder.limiter(retryLimiter); | ||
| } | ||
| return builder; | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -183,6 +193,15 @@ public RetryRuleWithContent<T> retryRuleWithContent() { | |
| return retryRuleWithContent; | ||
| } | ||
|
|
||
| /** | ||
| * Returns the {@link RetryLimiter} which was specified with | ||
| * {@link RetryConfigBuilder#limiter(RetryLimiter)}. | ||
| */ | ||
| @Nullable | ||
| public RetryLimiter retryLimiter() { | ||
| return retryLimiter; | ||
| } | ||
|
|
||
| /** | ||
| * Returns the {@code maxContentLength}, which is non-zero only if a {@link RetryRuleWithContent} is used. | ||
| */ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,6 +32,8 @@ | |
| import com.linecorp.armeria.client.ClientRequestContext; | ||
| import com.linecorp.armeria.client.HttpClient; | ||
| import com.linecorp.armeria.client.ResponseTimeoutException; | ||
| import com.linecorp.armeria.client.retry.limiter.RetryLimiter; | ||
| import com.linecorp.armeria.client.retry.limiter.RetryLimiterExecutor; | ||
| import com.linecorp.armeria.common.AggregatedHttpResponse; | ||
| import com.linecorp.armeria.common.AggregationOptions; | ||
| import com.linecorp.armeria.common.HttpHeaderNames; | ||
|
|
@@ -330,6 +332,8 @@ private void doExecute0(ClientRequestContext ctx, HttpRequestDuplicator rootReqD | |
|
|
||
| final RetryConfig<HttpResponse> config = mappedRetryConfig(ctx); | ||
| if (!ctx.exchangeType().isResponseStreaming() || config.requiresResponseTrailers()) { | ||
| RetryLimiterExecutor.onCompletedAttempt(config.retryLimiter(), ctx, ctx.log().partial(), | ||
|
||
| totalAttempts); | ||
| response.aggregate().handle((aggregated, cause) -> { | ||
| if (cause != null) { | ||
| derivedCtx.logBuilder().endRequest(cause); | ||
|
|
@@ -346,6 +350,8 @@ private void doExecute0(ClientRequestContext ctx, HttpRequestDuplicator rootReqD | |
| return null; | ||
| }); | ||
| } else { | ||
| RetryLimiterExecutor.onCompletedAttempt(config.retryLimiter(), ctx, ctx.log().partial(), | ||
| totalAttempts); | ||
| handleStreamingResponse(config, ctx, rootReqDuplicator, originalReq, returnedRes, | ||
| future, derivedCtx, response); | ||
| } | ||
|
|
@@ -362,10 +368,11 @@ private void handleResponseWithoutContent(RetryConfig<HttpResponse> config, Clie | |
| } | ||
| try { | ||
| final RetryRule retryRule = retryRule(config); | ||
| final RetryLimiter limiter = config.retryLimiter(); | ||
| final CompletionStage<RetryDecision> f = retryRule.shouldRetry(derivedCtx, responseCause); | ||
| f.handle((decision, shouldRetryCause) -> { | ||
| warnIfExceptionIsRaised(retryRule, shouldRetryCause); | ||
| handleRetryDecision(decision, ctx, derivedCtx, rootReqDuplicator, | ||
| handleRetryDecision(decision, limiter, ctx, derivedCtx, rootReqDuplicator, | ||
| originalReq, returnedRes, future, response); | ||
| return null; | ||
| }); | ||
|
|
@@ -412,8 +419,9 @@ private void handleStreamingResponse(RetryConfig<HttpResponse> retryConfig, Clie | |
| .handle((decision, cause) -> { | ||
| warnIfExceptionIsRaised(ruleWithContent, cause); | ||
| truncatingHttpResponse.abort(); | ||
| handleRetryDecision(decision, ctx, derivedCtx, rootReqDuplicator, | ||
| originalReq, returnedRes, future, duplicated); | ||
| handleRetryDecision(decision, retryConfig.retryLimiter(), ctx, | ||
| derivedCtx, rootReqDuplicator, originalReq, | ||
| returnedRes, future, duplicated); | ||
| return null; | ||
| }); | ||
| } catch (Throwable cause) { | ||
|
|
@@ -449,9 +457,9 @@ private void handleAggregatedResponse(RetryConfig<HttpResponse> retryConfig, Cli | |
| ruleWithContent.shouldRetry(derivedCtx, aggregatedRes.toHttpResponse(), null) | ||
| .handle((decision, cause) -> { | ||
| warnIfExceptionIsRaised(ruleWithContent, cause); | ||
| handleRetryDecision( | ||
| decision, ctx, derivedCtx, rootReqDuplicator, originalReq, | ||
| returnedRes, future, aggregatedRes.toHttpResponse()); | ||
| handleRetryDecision(decision, retryConfig.retryLimiter(), ctx, derivedCtx, | ||
| rootReqDuplicator, originalReq, returnedRes, future, | ||
| aggregatedRes.toHttpResponse()); | ||
| return null; | ||
| }); | ||
| } catch (Throwable cause) { | ||
|
|
@@ -521,14 +529,15 @@ private static void handleException(ClientRequestContext ctx, | |
| ctx.logBuilder().endResponse(cause); | ||
| } | ||
|
|
||
| private void handleRetryDecision(@Nullable RetryDecision decision, ClientRequestContext ctx, | ||
| ClientRequestContext derivedCtx, HttpRequestDuplicator rootReqDuplicator, | ||
| HttpRequest originalReq, HttpResponse returnedRes, | ||
| CompletableFuture<HttpResponse> future, HttpResponse originalRes) { | ||
| private void handleRetryDecision(@Nullable RetryDecision decision, @Nullable RetryLimiter limiter, | ||
| ClientRequestContext ctx, ClientRequestContext derivedCtx, | ||
| HttpRequestDuplicator rootReqDuplicator, HttpRequest originalReq, | ||
| HttpResponse returnedRes, CompletableFuture<HttpResponse> future, | ||
| HttpResponse originalRes) { | ||
| final Backoff backoff = decision != null ? decision.backoff() : null; | ||
| if (backoff != null) { | ||
| final long millisAfter = useRetryAfter ? getRetryAfterMillis(derivedCtx) : -1; | ||
| final long nextDelay = getNextDelay(ctx, backoff, millisAfter); | ||
| final long nextDelay = getNextDelay(ctx, limiter, backoff, millisAfter); | ||
| if (nextDelay >= 0) { | ||
| abortResponse(originalRes, derivedCtx); | ||
| scheduleNextRetry( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note:
AbstractRetryingClientis public so this and the change above are breaking changes. Let us note it in the PR description.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But these are protected methods, do they also count as public api?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As this class is also not final people could inherit from it and use those methods. I cannot think of a use-case for it but theoretically it is possible. In the doc comments we are also not advising against it 🤷