diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/AbstractRetryingClient.java b/core/src/main/java/com/linecorp/armeria/client/retry/AbstractRetryingClient.java index e8e5cc277e3..24ebffdf850 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/AbstractRetryingClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/AbstractRetryingClient.java @@ -15,7 +15,6 @@ */ package com.linecorp.armeria.client.retry; -import static com.google.common.base.Preconditions.checkState; import static java.util.Objects.requireNonNull; import java.util.concurrent.TimeUnit; @@ -27,19 +26,10 @@ import com.linecorp.armeria.client.Client; import com.linecorp.armeria.client.ClientFactory; import com.linecorp.armeria.client.ClientRequestContext; -import com.linecorp.armeria.client.Endpoint; import com.linecorp.armeria.client.SimpleDecoratingClient; -import com.linecorp.armeria.common.HttpHeaderNames; -import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.Request; import com.linecorp.armeria.common.Response; -import com.linecorp.armeria.common.RpcRequest; -import com.linecorp.armeria.common.annotation.Nullable; -import com.linecorp.armeria.common.util.TimeoutMode; -import com.linecorp.armeria.internal.client.ClientUtil; -import io.netty.util.AsciiString; -import io.netty.util.AttributeKey; import io.netty.util.concurrent.ScheduledFuture; /** @@ -48,98 +38,34 @@ * @param the {@link Request} type * @param the {@link Response} type */ -public abstract class AbstractRetryingClient +abstract class AbstractRetryingClient extends SimpleDecoratingClient { private static final Logger logger = LoggerFactory.getLogger(AbstractRetryingClient.class); - /** - * The header which indicates the retry count of a {@link Request}. - * The server might use this value to reject excessive retries, etc. - */ - public static final AsciiString ARMERIA_RETRY_COUNT = HttpHeaderNames.of("armeria-retry-count"); - - private static final AttributeKey STATE = - AttributeKey.valueOf(AbstractRetryingClient.class, "STATE"); - - private final RetryConfigMapping mapping; - - @Nullable - private final RetryConfig retryConfig; + private final RetryConfigMapping retryConfigMapping; /** * Creates a new instance that decorates the specified {@link Client}. */ AbstractRetryingClient( - Client delegate, RetryConfigMapping mapping, @Nullable RetryConfig retryConfig) { + Client delegate, RetryConfigMapping retryConfigMapping) { super(delegate); - this.mapping = requireNonNull(mapping, "mapping"); - this.retryConfig = retryConfig; + this.retryConfigMapping = requireNonNull(retryConfigMapping, "mapping"); } @Override public final O execute(ClientRequestContext ctx, I req) throws Exception { - final RetryConfig config = mapping.get(ctx, req); + final RetryConfig config = retryConfigMapping.get(ctx, req); requireNonNull(config, "mapping.get() returned null"); - - final State state = new State(config, ctx.responseTimeoutMillis()); - ctx.setAttr(STATE, state); - return doExecute(ctx, req); - } - - /** - * Returns the current {@link RetryConfigMapping} set for this client. - */ - protected final RetryConfigMapping mapping() { - return mapping; + return doExecute(ctx, req, config); } /** * Invoked by {@link #execute(ClientRequestContext, Request)} * after the deadline for response timeout is set. */ - protected abstract O doExecute(ClientRequestContext ctx, I req) throws Exception; - - /** - * This should be called when retrying is finished. - */ - protected static void onRetryingComplete(ClientRequestContext ctx) { - ctx.logBuilder().endResponseWithLastChild(); - } - - /** - * Returns the {@link RetryRule}. - * - * @throws IllegalStateException if the {@link RetryRule} is not set - */ - protected final RetryRule retryRule() { - checkState(retryConfig != null, "No retryRule set. Are you using RetryConfigMapping?"); - final RetryRule retryRule = retryConfig.retryRule(); - checkState(retryRule != null, "retryRule is not set."); - return retryRule; - } - - /** - * Fetches the {@link RetryConfig} that was mapped by the configured {@link RetryConfigMapping} for a given - * logical request. - */ - final RetryConfig mappedRetryConfig(ClientRequestContext ctx) { - @SuppressWarnings("unchecked") - final RetryConfig config = (RetryConfig) state(ctx).config; - return config; - } - - /** - * Returns the {@link RetryRuleWithContent}. - * - * @throws IllegalStateException if the {@link RetryRuleWithContent} is not set - */ - protected final RetryRuleWithContent retryRuleWithContent() { - checkState(retryConfig != null, "No retryRuleWithContent set. Are you using RetryConfigMapping?"); - final RetryRuleWithContent retryRuleWithContent = retryConfig.retryRuleWithContent(); - checkState(retryRuleWithContent != null, "retryRuleWithContent is not set."); - return retryRuleWithContent; - } + protected abstract O doExecute(ClientRequestContext ctx, I req, RetryConfig config) throws Exception; /** * Schedules next retry. @@ -170,39 +96,6 @@ protected static void scheduleNextRetry(ClientRequestContext ctx, } } - /** - * Resets the {@link ClientRequestContext#responseTimeoutMillis()}. - * - * @return {@code true} if the response timeout is set, {@code false} if it can't be set due to the timeout - */ - @SuppressWarnings("MethodMayBeStatic") // Intentionally left non-static for better user experience. - protected final boolean setResponseTimeout(ClientRequestContext ctx) { - requireNonNull(ctx, "ctx"); - final long responseTimeoutMillis = state(ctx).responseTimeoutMillis(); - if (responseTimeoutMillis < 0) { - return false; - } else if (responseTimeoutMillis == 0) { - ctx.clearResponseTimeout(); - return true; - } else { - ctx.setResponseTimeoutMillis(TimeoutMode.SET_FROM_NOW, responseTimeoutMillis); - return true; - } - } - - /** - * Returns the next delay which retry will be made after. The delay will be: - * - *

{@code Math.min(responseTimeoutMillis, Backoff.nextDelayMillis(int))} - * - * @return the number of milliseconds to wait for before attempting a retry. -1 if the - * {@code currentAttemptNo} exceeds the {@code maxAttempts} or the {@code nextDelay} is after - * the moment which timeout happens. - */ - protected final long getNextDelay(ClientRequestContext ctx, Backoff backoff) { - return getNextDelay(ctx, backoff, -1); - } - /** * Returns the next delay which retry will be made after. The delay will be: * @@ -214,128 +107,25 @@ protected final long getNextDelay(ClientRequestContext ctx, Backoff backoff) { * the moment which timeout happens. */ @SuppressWarnings("MethodMayBeStatic") // Intentionally left non-static for better user experience. - protected final long getNextDelay(ClientRequestContext ctx, Backoff backoff, long millisAfterFromServer) { - requireNonNull(ctx, "ctx"); - requireNonNull(backoff, "backoff"); - final State state = state(ctx); - final int currentAttemptNo = state.currentAttemptNoWith(backoff); - - if (currentAttemptNo < 0) { - logger.debug("Exceeded the default number of max attempt: {}", state.config.maxTotalAttempts()); + protected final long getNextDelay(RetryContext rctx, Backoff backoff, long millisAfterFromServer) { + if (rctx.counter().hasReachedMaxAttempts()) { + logger.debug("Exceeded the default number of max attempt: {}", rctx.config().maxTotalAttempts()); return -1; } - long nextDelay = backoff.nextDelayMillis(currentAttemptNo); + rctx.counter().consumeAttemptFrom(backoff); + long nextDelay = backoff.nextDelayMillis(rctx.counter().attemptsSoFarWithBackoff(backoff)); if (nextDelay < 0) { logger.debug("Exceeded the number of max attempts in the backoff: {}", backoff); return -1; } nextDelay = Math.max(nextDelay, millisAfterFromServer); - if (state.timeoutForWholeRetryEnabled() && nextDelay > state.actualResponseTimeoutMillis()) { + if (rctx.timeoutForWholeRetryEnabled() && nextDelay > rctx.actualResponseTimeoutMillis()) { // The nextDelay will be after the moment which timeout will happen. So return just -1. return -1; } return nextDelay; } - - /** - * Returns the total number of attempts of the current request represented by the specified - * {@link ClientRequestContext}. - */ - protected static int getTotalAttempts(ClientRequestContext ctx) { - final State state = ctx.attr(STATE); - if (state == null) { - return 0; - } - return state.totalAttemptNo; - } - - /** - * Creates a new derived {@link ClientRequestContext}, replacing the requests. - * If {@link ClientRequestContext#endpointGroup()} exists, a new {@link Endpoint} will be selected. - */ - protected static ClientRequestContext newDerivedContext(ClientRequestContext ctx, - @Nullable HttpRequest req, - @Nullable RpcRequest rpcReq, - boolean initialAttempt) { - return ClientUtil.newDerivedContext(ctx, req, rpcReq, initialAttempt); - } - - private static State state(ClientRequestContext ctx) { - final State state = ctx.attr(STATE); - assert state != null; - return state; - } - - private static final class State { - - private final RetryConfig config; - private final long deadlineNanos; - private final boolean isTimeoutEnabled; - - @Nullable - private Backoff lastBackoff; - private int currentAttemptNoWithLastBackoff; - private int totalAttemptNo; - - State(RetryConfig config, long responseTimeoutMillis) { - this.config = config; - - if (responseTimeoutMillis <= 0 || responseTimeoutMillis == Long.MAX_VALUE) { - deadlineNanos = 0; - isTimeoutEnabled = false; - } else { - deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(responseTimeoutMillis); - isTimeoutEnabled = true; - } - totalAttemptNo = 1; - } - - /** - * Returns the smaller value between {@link RetryConfig#responseTimeoutMillisForEachAttempt()} and - * remaining {@link #responseTimeoutMillis}. - * - * @return 0 if the response timeout for both of each request and whole retry is disabled or - * -1 if the elapsed time from the first request has passed {@code responseTimeoutMillis} - */ - long responseTimeoutMillis() { - if (!timeoutForWholeRetryEnabled()) { - return config.responseTimeoutMillisForEachAttempt(); - } - - final long actualResponseTimeoutMillis = actualResponseTimeoutMillis(); - - // Consider 0 or less than 0 of actualResponseTimeoutMillis as timed out. - if (actualResponseTimeoutMillis <= 0) { - return -1; - } - - if (config.responseTimeoutMillisForEachAttempt() > 0) { - return Math.min(config.responseTimeoutMillisForEachAttempt(), actualResponseTimeoutMillis); - } - - return actualResponseTimeoutMillis; - } - - boolean timeoutForWholeRetryEnabled() { - return isTimeoutEnabled; - } - - long actualResponseTimeoutMillis() { - return TimeUnit.NANOSECONDS.toMillis(deadlineNanos - System.nanoTime()); - } - - int currentAttemptNoWith(Backoff backoff) { - if (totalAttemptNo++ >= config.maxTotalAttempts()) { - return -1; - } - if (lastBackoff != backoff) { - lastBackoff = backoff; - currentAttemptNoWithLastBackoff = 1; - } - return currentAttemptNoWithLastBackoff++; - } - } } diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryAttempt.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryAttempt.java new file mode 100644 index 00000000000..c5778e204ef --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryAttempt.java @@ -0,0 +1,43 @@ +/* + * Copyright 2025 LINE Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.linecorp.armeria.client.retry; + +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.common.Response; + +class RetryAttempt { + private final ClientRequestContext ctx; + private final O res; + + RetryAttempt(ClientRequestContext ctx, O res) { + this.ctx = ctx; + this.res = res; + } + + ClientRequestContext ctx() { + return ctx; + } + + O res() { + return res; + } + + RetryAttempt setRes(O res) { + return new RetryAttempt<>( + ctx, res + ); + } +} diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryContext.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryContext.java new file mode 100644 index 00000000000..eb67ddd5ba4 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryContext.java @@ -0,0 +1,127 @@ +/* + * Copyright 2025 LINE Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.linecorp.armeria.client.retry; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.common.Request; +import com.linecorp.armeria.common.Response; +import com.linecorp.armeria.common.util.TimeoutMode; + +class RetryContext { + private final ClientRequestContext ctx; + private final I req; + private final O res; + private final CompletableFuture resFuture; + private final RetryCounter counter; + private final RetryConfig config; + private final long deadlineNanos; + private final boolean isTimeoutEnabled; + + RetryContext( + ClientRequestContext ctx, I req, O res, + CompletableFuture resFuture, + RetryConfig config, long responseTimeoutMillis + ) { + this.ctx = ctx; + this.req = req; + this.res = res; + this.resFuture = resFuture; + this.config = config; + counter = new RetryCounter(config.maxTotalAttempts()); + + if (responseTimeoutMillis <= 0 || responseTimeoutMillis == Long.MAX_VALUE) { + deadlineNanos = 0; + isTimeoutEnabled = false; + } else { + deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(responseTimeoutMillis); + isTimeoutEnabled = true; + } + } + + long responseTimeoutMillis() { + if (!isTimeoutEnabled) { + return config.responseTimeoutMillisForEachAttempt(); + } + + final long actualResponseTimeoutMillis = actualResponseTimeoutMillis(); + + // Consider 0 or less than 0 of actualResponseTimeoutMillis as timed out. + if (actualResponseTimeoutMillis <= 0) { + return -1; + } + + if (config.responseTimeoutMillisForEachAttempt() > 0) { + return Math.min(config.responseTimeoutMillisForEachAttempt(), actualResponseTimeoutMillis); + } + + return actualResponseTimeoutMillis; + } + + public boolean timeoutForWholeRetryEnabled() { + return isTimeoutEnabled; + } + + public long actualResponseTimeoutMillis() { + return TimeUnit.NANOSECONDS.toMillis( + deadlineNanos - System.nanoTime()); + } + + /** + * Resets the {@link ClientRequestContext#responseTimeoutMillis()}. + * + * @return {@code true} if the response timeout is set, {@code false} if it can't be set due to the timeout + */ + boolean setResponseTimeout() { + final long responseTimeoutMillis = responseTimeoutMillis(); + if (responseTimeoutMillis < 0) { + return false; + } else if (responseTimeoutMillis == 0) { + ctx.clearResponseTimeout(); + return true; + } else { + ctx.setResponseTimeoutMillis(TimeoutMode.SET_FROM_NOW, responseTimeoutMillis); + return true; + } + } + + ClientRequestContext ctx() { + return ctx; + } + + I req() { + return req; + } + + O res() { + return res; + } + + RetryConfig config() { + return config; + } + + CompletableFuture resFuture() { + return resFuture; + } + + RetryCounter counter() { + return counter; + } +} diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryCounter.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryCounter.java new file mode 100644 index 00000000000..13a4e67c0ca --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryCounter.java @@ -0,0 +1,86 @@ +/* + * Copyright 2025 LINE Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.linecorp.armeria.client.retry; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + +import com.google.common.base.MoreObjects; + +import com.linecorp.armeria.common.annotation.Nullable; + +final class RetryCounter { + private final int maxAttempts; + + private int numberAttemptsSoFar; + @Nullable + private Backoff lastBackoff; + private int numberAttemptsSoFarForLastBackoff; + + RetryCounter(int maxAttempts) { + checkArgument(maxAttempts > 0, "maxAttempts: %s (expected: > 0)", maxAttempts); + this.maxAttempts = maxAttempts; + numberAttemptsSoFar = 0; + lastBackoff = null; + numberAttemptsSoFarForLastBackoff = 0; + } + + public void consumeAttemptFrom(@Nullable Backoff backoff) { + checkState(!hasReachedMaxAttempts(), "Exceeded the maximum number of attempts: %s", maxAttempts); + + ++numberAttemptsSoFar; + + if (backoff != null) { + if (lastBackoff != backoff) { + lastBackoff = backoff; + numberAttemptsSoFarForLastBackoff = 0; + } + numberAttemptsSoFarForLastBackoff++; + } else { + assert lastBackoff == null; + } + } + + public int attemptsSoFarWithBackoff(Backoff backoff) { + requireNonNull(backoff, "backoff"); + if (lastBackoff != backoff) { + return 0; + } else { + return numberAttemptsSoFarForLastBackoff; + } + } + + public boolean hasReachedMaxAttempts() { + return numberAttemptsSoFar >= maxAttempts; + } + + @Override + public String toString() { + return MoreObjects + .toStringHelper(this) + .add("maxAttempts", maxAttempts) + .add("numberAttemptsSoFar", numberAttemptsSoFar) + .add("lastBackoff", lastBackoff) + .add("numberAttemptsSoFarForLastBackoff", numberAttemptsSoFarForLastBackoff) + .toString(); + } + + public int numberAttemptsSoFar() { + return numberAttemptsSoFar; + } +} diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java index d914bfece68..146c856b30e 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingClient.java @@ -50,9 +50,11 @@ import com.linecorp.armeria.common.logging.RequestLogProperty; import com.linecorp.armeria.common.stream.AbortedStreamException; import com.linecorp.armeria.common.util.Exceptions; +import com.linecorp.armeria.common.util.UnmodifiableFuture; import com.linecorp.armeria.internal.client.AggregatedHttpRequestDuplicator; import com.linecorp.armeria.internal.client.ClientPendingThrowableUtil; import com.linecorp.armeria.internal.client.ClientRequestContextExtension; +import com.linecorp.armeria.internal.client.ClientUtil; import com.linecorp.armeria.internal.client.TruncatingHttpResponse; import io.netty.handler.codec.DateFormatter; @@ -236,260 +238,312 @@ public static Function newDecorator(RetryRul RetryConfigMapping mapping, @Nullable RetryConfig retryConfig, boolean useRetryAfter) { - super(delegate, mapping, retryConfig); + super(delegate, retryConfig != null ? (ctx, req) -> retryConfig : mapping); this.useRetryAfter = useRetryAfter; } @Override - protected HttpResponse doExecute(ClientRequestContext ctx, HttpRequest req) throws Exception { - final CompletableFuture responseFuture = new CompletableFuture<>(); - final HttpResponse res = HttpResponse.of(responseFuture, ctx.eventLoop()); - if (ctx.exchangeType().isRequestStreaming()) { - final HttpRequestDuplicator reqDuplicator = req.toDuplicator(ctx.eventLoop().withoutContext(), 0); - doExecute0(ctx, reqDuplicator, req, res, responseFuture); - } else { - req.aggregate(AggregationOptions.usePooledObjects(ctx.alloc(), ctx.eventLoop())) - .handle((agg, cause) -> { - if (cause != null) { - handleException(ctx, null, responseFuture, cause, true); - } else { - final HttpRequestDuplicator reqDuplicator = new AggregatedHttpRequestDuplicator(agg); - doExecute0(ctx, reqDuplicator, req, res, responseFuture); - } - return null; - }); - } + protected HttpResponse doExecute(ClientRequestContext ctx, HttpRequest req, + RetryConfig config) + throws Exception { + final CompletableFuture resFuture = new CompletableFuture<>(); + final HttpResponse res = HttpResponse.of(resFuture, ctx.eventLoop()); + + retryContext(ctx, req, res, resFuture, config) + .handle((rctx, cause) -> { + if (cause != null) { + resFuture.completeExceptionally(cause); + ctx.logBuilder().endRequest(cause); + ctx.logBuilder().endResponse(cause); + return null; + } + + // The request or attemptRes has been aborted by the client before it receives a attemptRes, + // so stop retrying. + rctx.req().whenComplete() + .exceptionally(reqCause -> { + handleException(rctx, reqCause); + return null; + }); + + rctx.res().whenComplete() + .handle((result, resCause) -> { + if (!rctx.resFuture().isDone()) { + // We are still retrying: we were not the ones completing rctx.res(). + handleException(rctx, + resCause == null ? AbortedStreamException.get() : resCause); + } + return null; + }); + + rctx.counter().consumeAttemptFrom(null); + retry(rctx); + + return null; + }); + return res; } - private void doExecute0(ClientRequestContext ctx, HttpRequestDuplicator rootReqDuplicator, - HttpRequest originalReq, HttpResponse returnedRes, - CompletableFuture future) { - final int totalAttempts = getTotalAttempts(ctx); - final boolean initialAttempt = totalAttempts <= 1; - // The request or response has been aborted by the client before it receives a response, - // so stop retrying. - if (originalReq.whenComplete().isCompletedExceptionally()) { - originalReq.whenComplete().handle((unused, cause) -> { - handleException(ctx, rootReqDuplicator, future, cause, initialAttempt); - return null; - }); + private void retry(HttpRetryContext rctx) { + if (isRequestCompletedExternally(rctx)) { return; } - if (returnedRes.isComplete()) { - returnedRes.whenComplete().handle((result, cause) -> { - final Throwable abortCause; - if (cause != null) { - abortCause = cause; - } else { - abortCause = AbortedStreamException.get(); - } - handleException(ctx, rootReqDuplicator, future, abortCause, initialAttempt); - return null; - }); + + if (!rctx.setResponseTimeout()) { + handleException(rctx, ResponseTimeoutException.get()); return; } - if (!setResponseTimeout(ctx)) { - handleException(ctx, rootReqDuplicator, future, ResponseTimeoutException.get(), initialAttempt); + final RetryAttempt attempt; + try { + attempt = executeAttempt(rctx); + } catch (Throwable cause) { + handleException(rctx, cause); return; } - final HttpRequest duplicateReq; - if (initialAttempt) { - duplicateReq = rootReqDuplicator.duplicate(); + if (rctx.ctx().exchangeType().isResponseStreaming() && !rctx.config().requiresResponseTrailers()) { + handleStreamingAttemptResponse(rctx, attempt); } else { - final RequestHeadersBuilder newHeaders = originalReq.headers().toBuilder(); - newHeaders.setInt(ARMERIA_RETRY_COUNT, totalAttempts - 1); - duplicateReq = rootReqDuplicator.duplicate(newHeaders.build()); + handleAggregatedAttemptResponse(rctx, attempt); } + } - final ClientRequestContext derivedCtx; - try { - derivedCtx = newDerivedContext(ctx, duplicateReq, ctx.rpcRequest(), initialAttempt); - } catch (Throwable t) { - handleException(ctx, rootReqDuplicator, future, t, initialAttempt); - return; + private void handleAggregatedAttemptResponse(HttpRetryContext rctx, RetryAttempt attempt) { + attempt.res().aggregate().handle((aggAttemptRes, cause) -> { + if (cause != null) { + attempt.ctx().logBuilder().endRequest(cause); + attempt.ctx().logBuilder().endResponse(cause); + decideAndHandleDecision(rctx, attempt.setRes(HttpResponse.ofFailure(cause)), + HttpResponse.ofFailure(cause), cause); + } else { + completeLogIfBytesNotTransferred(attempt.ctx(), aggAttemptRes); + attempt.ctx().log().whenAvailable(RequestLogProperty.RESPONSE_END_TIME).thenRun(() -> { + decideAndHandleDecision(rctx, attempt.setRes(aggAttemptRes.toHttpResponse()), + aggAttemptRes.toHttpResponse(), null); + }); + } + return null; + }); + } + + private void decideAndHandleDecision(HttpRetryContext rctx, + RetryAttempt attempt, + @Nullable HttpResponse resToDecide, + @Nullable Throwable causeToDecide) { + decide(rctx, attempt, resToDecide, causeToDecide) + .handle((decision, decisionCause) -> { + if (resToDecide != null) { + resToDecide.abort(); + } + + if (decisionCause != null) { + abortAttempt(attempt); + handleException(rctx, decisionCause); + } else { + handleDecision(rctx, attempt, decision); + } + return null; + }); + } + + private CompletionStage decide(HttpRetryContext rctx, + RetryAttempt attempt, + @Nullable HttpResponse resToDecide, + @Nullable Throwable causeToDecide) { + if (causeToDecide != null) { + causeToDecide = Exceptions.peel(causeToDecide); } - final HttpRequest ctxReq = derivedCtx.request(); - assert ctxReq != null; - final HttpResponse response; - final ClientRequestContextExtension ctxExtension = derivedCtx.as(ClientRequestContextExtension.class); - if (!initialAttempt && ctxExtension != null && derivedCtx.endpoint() == null) { - // clear the pending throwable to retry endpoint selection - ClientPendingThrowableUtil.removePendingThrowable(derivedCtx); - // if the endpoint hasn't been selected, try to initialize the ctx with a new endpoint/event loop - response = initContextAndExecuteWithFallback( - unwrap(), ctxExtension, HttpResponse::of, - (context, cause) -> HttpResponse.ofFailure(cause), ctxReq, false); - } else { - response = executeWithFallback(unwrap(), derivedCtx, - (context, cause) -> HttpResponse.ofFailure(cause), ctxReq, false); + try { + if (rctx.config().needsContentInRule()) { + assert resToDecide != null ^ causeToDecide != null; + final RetryRuleWithContent retryRuleWithContent = + rctx.config().retryRuleWithContent(); + assert retryRuleWithContent != null; + return retryRuleWithContent + .shouldRetry(attempt.ctx(), resToDecide, causeToDecide) + .handle((decision, cause) -> { + warnIfExceptionIsRaised(retryRuleWithContent, cause); + return decision; + }); + } else { + final RetryRule retryRuleWithoutContent = rctx.config().retryRule(); + assert retryRuleWithoutContent != null; + return retryRuleWithoutContent + .shouldRetry(attempt.ctx(), causeToDecide) + .handle((decision, cause) -> { + warnIfExceptionIsRaised(retryRuleWithoutContent, cause); + return decision; + }); + } + } catch (Throwable ruleCause) { + return UnmodifiableFuture.exceptionallyCompletedFuture(ruleCause); } + } - final RetryConfig config = mappedRetryConfig(ctx); - if (!ctx.exchangeType().isResponseStreaming() || config.requiresResponseTrailers()) { - response.aggregate().handle((aggregated, cause) -> { - if (cause != null) { - derivedCtx.logBuilder().endRequest(cause); - derivedCtx.logBuilder().endResponse(cause); - handleResponseWithoutContent(config, ctx, rootReqDuplicator, originalReq, returnedRes, - future, derivedCtx, HttpResponse.ofFailure(cause), cause); - } else { - completeLogIfBytesNotTransferred(aggregated, derivedCtx); - derivedCtx.log().whenAvailable(RequestLogProperty.RESPONSE_END_TIME).thenRun(() -> { - handleAggregatedResponse(config, ctx, rootReqDuplicator, originalReq, returnedRes, - future, derivedCtx, aggregated); - }); - } - return null; - }); - } else { - handleStreamingResponse(config, ctx, rootReqDuplicator, originalReq, returnedRes, - future, derivedCtx, response); + private void handleDecision(HttpRetryContext rctx, RetryAttempt attempt, + @Nullable RetryDecision decision) { + final Backoff backoff = decision != null ? decision.backoff() : null; + if (backoff != null) { + final long millisAfter = useRetryAfter ? getRetryAfterMillis(attempt.ctx()) : -1; + final long nextDelay = getNextDelay(rctx, backoff, millisAfter); + if (nextDelay >= 0) { + abortAttempt(attempt); + scheduleNextRetry( + rctx.ctx(), scheduleCause -> handleException(rctx, scheduleCause), + () -> retry(rctx), + nextDelay); + return; + } } + rctx.ctx().logBuilder().endResponseWithLastChild(); + rctx.resFuture().complete(attempt.res()); + rctx.reqDuplicator().close(); } - // TODO(ikhoon): Add a request-scope class such as RetryRequestContext to avoid passing too many parameters. - private void handleResponseWithoutContent(RetryConfig config, ClientRequestContext ctx, - HttpRequestDuplicator rootReqDuplicator, HttpRequest originalReq, - HttpResponse returnedRes, CompletableFuture future, - ClientRequestContext derivedCtx, HttpResponse response, - @Nullable Throwable responseCause) { - if (responseCause != null) { - responseCause = Exceptions.peel(responseCause); + private RetryAttempt executeAttempt(HttpRetryContext rctx) { + final boolean isInitialAttempt = rctx.counter().numberAttemptsSoFar() <= 1; + final HttpRequest duplicateReq; + if (isInitialAttempt) { + duplicateReq = rctx.reqDuplicator().duplicate(); + } else { + final RequestHeadersBuilder newHeaders = rctx.req().headers().toBuilder(); + newHeaders.setInt(ClientUtil.ARMERIA_RETRY_COUNT, rctx.counter().numberAttemptsSoFar() - 1); + duplicateReq = rctx.reqDuplicator().duplicate(newHeaders.build()); } - try { - final RetryRule retryRule = retryRule(config); - final CompletionStage f = retryRule.shouldRetry(derivedCtx, responseCause); - f.handle((decision, shouldRetryCause) -> { - warnIfExceptionIsRaised(retryRule, shouldRetryCause); - handleRetryDecision(decision, ctx, derivedCtx, rootReqDuplicator, - originalReq, returnedRes, future, response); - return null; - }); - } catch (Throwable cause) { - response.abort(); - handleException(ctx, rootReqDuplicator, future, cause, false); + + final ClientRequestContext attemptCtx; + + attemptCtx = ClientUtil.newDerivedContext(rctx.ctx(), duplicateReq, rctx.ctx().rpcRequest(), + isInitialAttempt); + + final HttpRequest attemptReq = attemptCtx.request(); + assert attemptReq != null; + final HttpResponse attemptRes; + final ClientRequestContextExtension ctxExtension = attemptCtx.as(ClientRequestContextExtension.class); + if (!isInitialAttempt && ctxExtension != null && attemptCtx.endpoint() == null) { + // clear the pending throwable to retry endpoint selection + ClientPendingThrowableUtil.removePendingThrowable(attemptCtx); + // if the endpoint hasn't been selected, try to initialize the ctx with a new endpoint/event loop + attemptRes = initContextAndExecuteWithFallback( + unwrap(), ctxExtension, HttpResponse::of, + (context, cause) -> HttpResponse.ofFailure(cause), attemptReq, false); + } else { + attemptRes = executeWithFallback(unwrap(), attemptCtx, + (context, cause) -> HttpResponse.ofFailure(cause), attemptReq, + false); } + + return new RetryAttempt<>(attemptCtx, attemptRes); } - private void handleStreamingResponse(RetryConfig retryConfig, ClientRequestContext ctx, - HttpRequestDuplicator rootReqDuplicator, - HttpRequest originalReq, HttpResponse returnedRes, - CompletableFuture future, - ClientRequestContext derivedCtx, - HttpResponse response) { - final SplitHttpResponse splitResponse = response.split(); - splitResponse.headers().handle((headers, headersCause) -> { + private void handleStreamingAttemptResponse(HttpRetryContext rctx, + RetryAttempt attempt) { + final SplitHttpResponse splitAttemptRes = attempt.res().split(); + splitAttemptRes.headers().handle((headers, headersCause) -> { final Throwable responseCause; if (headersCause == null) { - final RequestLog log = derivedCtx.log().getIfAvailable(RequestLogProperty.RESPONSE_CAUSE); + final RequestLog log = attempt.ctx().log().getIfAvailable(RequestLogProperty.RESPONSE_CAUSE); responseCause = log != null ? log.responseCause() : null; } else { responseCause = Exceptions.peel(headersCause); } - completeLogIfBytesNotTransferred(response, headers, derivedCtx, responseCause); - - derivedCtx.log().whenAvailable(RequestLogProperty.RESPONSE_HEADERS).thenRun(() -> { - if (retryConfig.needsContentInRule() && responseCause == null) { - final HttpResponse response0 = splitResponse.unsplit(); - final HttpResponseDuplicator duplicator = - response0.toDuplicator(derivedCtx.eventLoop().withoutContext(), - derivedCtx.maxResponseLength()); + completeLogIfBytesNotTransferred(attempt.ctx(), headers, responseCause, attempt.res()); + + attempt.ctx().log().whenAvailable(RequestLogProperty.RESPONSE_HEADERS).thenRun(() -> { + if (rctx.config().needsContentInRule() && responseCause == null) { + final HttpResponse unsplitAttemptRes = splitAttemptRes.unsplit(); + final HttpResponseDuplicator attemptResDuplicator = + unsplitAttemptRes.toDuplicator(attempt.ctx().eventLoop().withoutContext(), + attempt.ctx().maxResponseLength()); try { - final TruncatingHttpResponse truncatingHttpResponse = - new TruncatingHttpResponse(duplicator.duplicate(), - retryConfig.maxContentLength()); - final HttpResponse duplicated = duplicator.duplicate(); - duplicator.close(); - - final RetryRuleWithContent ruleWithContent = - retryConfig.retryRuleWithContent(); - assert ruleWithContent != null; - ruleWithContent.shouldRetry(derivedCtx, truncatingHttpResponse, null) - .handle((decision, cause) -> { - warnIfExceptionIsRaised(ruleWithContent, cause); - truncatingHttpResponse.abort(); - handleRetryDecision(decision, ctx, derivedCtx, rootReqDuplicator, - originalReq, returnedRes, future, duplicated); - return null; - }); + final RetryAttempt attemptWithResHeaders = attempt.setRes( + attemptResDuplicator.duplicate()); + final TruncatingHttpResponse truncatingAttemptRes = + new TruncatingHttpResponse(attemptResDuplicator.duplicate(), + rctx.config().maxContentLength()); + attemptResDuplicator.close(); + + decideAndHandleDecision(rctx, attemptWithResHeaders, truncatingAttemptRes, + null); } catch (Throwable cause) { - duplicator.abort(cause); - handleException(ctx, rootReqDuplicator, future, cause, false); + attemptResDuplicator.abort(cause); } } else { - final HttpResponse response0; if (responseCause != null) { - splitResponse.body().abort(responseCause); - response0 = HttpResponse.ofFailure(responseCause); + splitAttemptRes.body().abort(responseCause); + decideAndHandleDecision( + rctx, attempt.setRes(HttpResponse.ofFailure(responseCause)), null, + responseCause + ); } else { - response0 = splitResponse.unsplit(); + decideAndHandleDecision(rctx, + attempt.setRes(splitAttemptRes.unsplit()), + null, + null); } - handleResponseWithoutContent(retryConfig, ctx, rootReqDuplicator, originalReq, returnedRes, - future, derivedCtx, response0, responseCause); } }); return null; }); } - private void handleAggregatedResponse(RetryConfig retryConfig, ClientRequestContext ctx, - HttpRequestDuplicator rootReqDuplicator, - HttpRequest originalReq, HttpResponse returnedRes, - CompletableFuture future, - ClientRequestContext derivedCtx, - AggregatedHttpResponse aggregatedRes) { - if (retryConfig.needsContentInRule()) { - final RetryRuleWithContent ruleWithContent = retryConfig.retryRuleWithContent(); - assert ruleWithContent != null; - try { - ruleWithContent.shouldRetry(derivedCtx, aggregatedRes.toHttpResponse(), null) - .handle((decision, cause) -> { - warnIfExceptionIsRaised(ruleWithContent, cause); - handleRetryDecision( - decision, ctx, derivedCtx, rootReqDuplicator, originalReq, - returnedRes, future, aggregatedRes.toHttpResponse()); - return null; - }); - } catch (Throwable cause) { - handleException(ctx, rootReqDuplicator, future, cause, false); - } - return; + private CompletableFuture retryContext(ClientRequestContext ctx, HttpRequest req, + HttpResponse res, + CompletableFuture resFuture, + RetryConfig config) { + + if (ctx.exchangeType().isRequestStreaming()) { + return UnmodifiableFuture.completedFuture(new HttpRetryContext( + ctx, req, req.toDuplicator(ctx.eventLoop().withoutContext(), 0), res, resFuture, config, + ctx.responseTimeoutMillis() + )); + } else { + return req.aggregate(AggregationOptions.usePooledObjects(ctx.alloc(), ctx.eventLoop())) + .thenApply(AggregatedHttpRequestDuplicator::new) + .thenApply(reqDuplicator -> new HttpRetryContext( + ctx, req, reqDuplicator, res, resFuture, + config, ctx.responseTimeoutMillis() + )); } - handleResponseWithoutContent(retryConfig, ctx, rootReqDuplicator, originalReq, returnedRes, - future, derivedCtx, aggregatedRes.toHttpResponse(), null); } - private static void completeLogIfBytesNotTransferred(AggregatedHttpResponse response, - ClientRequestContext ctx) { + private boolean isRequestCompletedExternally(HttpRetryContext rctx) { + return rctx.req().whenComplete().isCompletedExceptionally() || + rctx.res() + .whenComplete() + .isDone(); + } + + private static void completeLogIfBytesNotTransferred( + ClientRequestContext ctx, AggregatedHttpResponse res) { if (!ctx.log().isAvailable(RequestLogProperty.REQUEST_FIRST_BYTES_TRANSFERRED_TIME)) { final RequestLogBuilder logBuilder = ctx.logBuilder(); logBuilder.endRequest(); - logBuilder.responseHeaders(response.headers()); - if (!response.trailers().isEmpty()) { - logBuilder.responseTrailers(response.trailers()); + logBuilder.responseHeaders(res.headers()); + if (!res.trailers().isEmpty()) { + logBuilder.responseTrailers(res.trailers()); } logBuilder.endResponse(); } } private static void completeLogIfBytesNotTransferred( - HttpResponse response, @Nullable ResponseHeaders headers, ClientRequestContext ctx, - @Nullable Throwable responseCause) { + ClientRequestContext ctx, + @Nullable ResponseHeaders headers, @Nullable Throwable resCause, HttpResponse res) { if (!ctx.log().isAvailable(RequestLogProperty.REQUEST_FIRST_BYTES_TRANSFERRED_TIME)) { final RequestLogBuilder logBuilder = ctx.logBuilder(); - if (responseCause != null) { - logBuilder.endRequest(responseCause); - logBuilder.endResponse(responseCause); + if (resCause != null) { + logBuilder.endRequest(resCause); + logBuilder.endResponse(resCause); } else { logBuilder.endRequest(); if (headers != null) { logBuilder.responseHeaders(headers); } - response.whenComplete().handle((unused, cause) -> { + res.whenComplete().handle((unused, cause) -> { if (cause != null) { logBuilder.endResponse(cause); } else { @@ -507,48 +561,21 @@ private static void warnIfExceptionIsRaised(Object retryRule, @Nullable Throwabl } } - private static void handleException(ClientRequestContext ctx, - @Nullable HttpRequestDuplicator rootReqDuplicator, - CompletableFuture future, Throwable cause, - boolean endRequestLog) { - future.completeExceptionally(cause); - if (rootReqDuplicator != null) { - rootReqDuplicator.abort(cause); + private static void handleException(HttpRetryContext rctx, Throwable cause) { + rctx.resFuture().completeExceptionally(cause); + rctx.reqDuplicator().abort(cause); + if (!rctx.ctx().logBuilder().isRequestComplete()) { + rctx.ctx().logBuilder().endRequest(cause); } - if (endRequestLog) { - ctx.logBuilder().endRequest(cause); - } - ctx.logBuilder().endResponse(cause); + rctx.ctx().logBuilder().endResponse(cause); } - private void handleRetryDecision(@Nullable RetryDecision decision, ClientRequestContext ctx, - ClientRequestContext derivedCtx, HttpRequestDuplicator rootReqDuplicator, - HttpRequest originalReq, HttpResponse returnedRes, - CompletableFuture future, HttpResponse originalRes) { - final Backoff backoff = decision != null ? decision.backoff() : null; - if (backoff != null) { - final long millisAfter = useRetryAfter ? getRetryAfterMillis(derivedCtx) : -1; - final long nextDelay = getNextDelay(ctx, backoff, millisAfter); - if (nextDelay >= 0) { - abortResponse(originalRes, derivedCtx); - scheduleNextRetry( - ctx, cause -> handleException(ctx, rootReqDuplicator, future, cause, false), - () -> doExecute0(ctx, rootReqDuplicator, originalReq, returnedRes, future), - nextDelay); - return; - } - } - onRetryingComplete(ctx); - future.complete(originalRes); - rootReqDuplicator.close(); - } - - private static void abortResponse(HttpResponse originalRes, ClientRequestContext derivedCtx) { + private static void abortAttempt(RetryAttempt attempt) { // Set response content with null to make sure that the log is complete. - final RequestLogBuilder logBuilder = derivedCtx.logBuilder(); + final RequestLogBuilder logBuilder = attempt.ctx().logBuilder(); logBuilder.responseContent(null, null); logBuilder.responseContentPreview(null); - originalRes.abort(); + attempt.res().abort(); } private static long getRetryAfterMillis(ClientRequestContext ctx) { @@ -591,4 +618,20 @@ private static RetryRule retryRule(RetryConfig retryConfig) { return rule; } } + + private static final class HttpRetryContext extends RetryContext { + private final HttpRequestDuplicator reqDuplicator; + + HttpRetryContext(ClientRequestContext ctx, HttpRequest req, + HttpRequestDuplicator reqDuplicator, + HttpResponse res, CompletableFuture resFuture, + RetryConfig config, long responseTimeoutMillis) { + super(ctx, req, res, resFuture, config, responseTimeoutMillis); + this.reqDuplicator = reqDuplicator; + } + + HttpRequestDuplicator reqDuplicator() { + return reqDuplicator; + } + } } diff --git a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java index a1c0f2749c7..a8f8adb7dd8 100644 --- a/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java +++ b/core/src/main/java/com/linecorp/armeria/client/retry/RetryingRpcClient.java @@ -32,6 +32,7 @@ import com.linecorp.armeria.common.RpcResponse; import com.linecorp.armeria.internal.client.ClientPendingThrowableUtil; import com.linecorp.armeria.internal.client.ClientRequestContextExtension; +import com.linecorp.armeria.internal.client.ClientUtil; import com.linecorp.armeria.internal.common.util.StringUtil; /** @@ -137,103 +138,117 @@ public static RetryingRpcClientBuilder builder(RetryConfigMapping m * Creates a new instance that decorates the specified {@link RpcClient}. */ RetryingRpcClient(RpcClient delegate, RetryConfigMapping mapping) { - super(delegate, mapping, null); + super(delegate, mapping); } @Override - protected RpcResponse doExecute(ClientRequestContext ctx, RpcRequest req) throws Exception { - final CompletableFuture future = new CompletableFuture<>(); - final RpcResponse res = RpcResponse.from(future); - doExecute0(ctx, req, res, future); + protected RpcResponse doExecute(ClientRequestContext ctx, RpcRequest req, RetryConfig config) + throws Exception { + final CompletableFuture resFuture = new CompletableFuture<>(); + final RpcResponse res = RpcResponse.from(resFuture); + final RetryContext rctx = + new RetryContext<>(ctx, req, res, resFuture, config, ctx.responseTimeoutMillis()); + rctx.counter().consumeAttemptFrom(null); + retry(rctx); return res; } - private void doExecute0(ClientRequestContext ctx, RpcRequest req, - RpcResponse returnedRes, CompletableFuture future) { - final int totalAttempts = getTotalAttempts(ctx); + private void retry(RetryContext rctx) { + final int totalAttempts = rctx.counter().numberAttemptsSoFar(); final boolean initialAttempt = totalAttempts <= 1; - if (ctx.isCancelled() || returnedRes.isDone()) { + if (rctx.ctx().isCancelled() || rctx.res().isDone()) { // The response has been cancelled by the client before it receives a response, so stop retrying. - handleException(ctx, future, new CancellationException( + handleException(rctx, new CancellationException( "the response returned to the client has been cancelled"), initialAttempt); return; } - if (!setResponseTimeout(ctx)) { - handleException(ctx, future, ResponseTimeoutException.get(), initialAttempt); + if (!rctx.setResponseTimeout()) { + handleException(rctx, ResponseTimeoutException.get(), initialAttempt); return; } - final ClientRequestContext derivedCtx = newDerivedContext(ctx, null, req, initialAttempt); + final RetryAttempt attempt = executeAttempt(rctx); - if (!initialAttempt) { - derivedCtx.mutateAdditionalRequestHeaders( - mutator -> mutator.add(ARMERIA_RETRY_COUNT, StringUtil.toString(totalAttempts - 1))); - } - - final RpcResponse res; - - final ClientRequestContextExtension ctxExtension = derivedCtx.as(ClientRequestContextExtension.class); - final EndpointGroup endpointGroup = derivedCtx.endpointGroup(); - if (!initialAttempt && ctxExtension != null && - endpointGroup != null && derivedCtx.endpoint() == null) { - // clear the pending throwable to retry endpoint selection - ClientPendingThrowableUtil.removePendingThrowable(derivedCtx); - // if the endpoint hasn't been selected, try to initialize the ctx with a new endpoint/event loop - res = initContextAndExecuteWithFallback(unwrap(), ctxExtension, RpcResponse::from, - (context, cause) -> RpcResponse.ofFailure(cause), - req, true); - } else { - res = executeWithFallback(unwrap(), derivedCtx, - (context, cause) -> RpcResponse.ofFailure(cause), - req, true); - } - - final RetryConfig retryConfig = mappedRetryConfig(ctx); final RetryRuleWithContent retryRule = - retryConfig.needsContentInRule() ? - retryConfig.retryRuleWithContent() : retryConfig.fromRetryRule(); - res.handle((unused1, cause) -> { + rctx.config().needsContentInRule() ? + rctx.config().retryRuleWithContent() : rctx.config().fromRetryRule(); + attempt.res().handle((unused1, cause) -> { try { assert retryRule != null; - retryRule.shouldRetry(derivedCtx, res, cause).handle((decision, unused3) -> { + retryRule.shouldRetry(attempt.ctx(), attempt.res(), cause).handle((decision, unused3) -> { final Backoff backoff = decision != null ? decision.backoff() : null; if (backoff != null) { - final long nextDelay = getNextDelay(derivedCtx, backoff); + final long nextDelay = getNextDelay(rctx, backoff, -1); if (nextDelay < 0) { - onRetryComplete(ctx, derivedCtx, res, future); + onRetryComplete(rctx, attempt); return null; } - scheduleNextRetry(ctx, cause0 -> handleException(ctx, future, cause0, false), - () -> doExecute0(ctx, req, returnedRes, future), nextDelay); + scheduleNextRetry(rctx.ctx(), cause0 -> handleException(rctx, cause0, false), + () -> retry(rctx), nextDelay); } else { - onRetryComplete(ctx, derivedCtx, res, future); + onRetryComplete(rctx, attempt); } return null; }); } catch (Throwable t) { - handleException(ctx, future, t, false); + handleException(rctx, t, false); } return null; }); } - private static void onRetryComplete(ClientRequestContext ctx, ClientRequestContext derivedCtx, - RpcResponse res, CompletableFuture future) { - onRetryingComplete(ctx); - final HttpRequest actualHttpReq = derivedCtx.request(); + private RetryAttempt executeAttempt(RetryContext rctx) { + final int totalAttempts = rctx.counter().numberAttemptsSoFar(); + final boolean initialAttempt = totalAttempts <= 1; + final ClientRequestContext attemptCtx = ClientUtil.newDerivedContext(rctx.ctx(), null, rctx.req(), + initialAttempt); + + if (!initialAttempt) { + attemptCtx.mutateAdditionalRequestHeaders( + mutator -> mutator.add(ClientUtil.ARMERIA_RETRY_COUNT, + StringUtil.toString(totalAttempts - 1))); + } + + final RpcResponse attemptRes; + final ClientRequestContextExtension attemptCtxExtension = attemptCtx.as( + ClientRequestContextExtension.class); + final EndpointGroup endpointGroup = attemptCtx.endpointGroup(); + if (!initialAttempt && attemptCtxExtension != null && + endpointGroup != null && attemptCtx.endpoint() == null) { + // clear the pending throwable to retry endpoint selection + ClientPendingThrowableUtil.removePendingThrowable(attemptCtx); + // if the endpoint hasn't been selected, try to initialize the ctx with a new endpoint/event loop + attemptRes = initContextAndExecuteWithFallback(unwrap(), attemptCtxExtension, RpcResponse::from, + (context, cause) -> + RpcResponse.ofFailure(cause), + rctx.req(), true); + } else { + attemptRes = executeWithFallback(unwrap(), attemptCtx, + (context, cause) -> + RpcResponse.ofFailure(cause), + rctx.req(), true); + } + + return new RetryAttempt<>(attemptCtx, attemptRes); + } + + private static void onRetryComplete(RetryContext rctx, + RetryAttempt attempt) { + rctx.ctx().logBuilder().endResponseWithLastChild(); + final HttpRequest actualHttpReq = attempt.ctx().request(); if (actualHttpReq != null) { - ctx.updateRequest(actualHttpReq); + rctx.ctx().updateRequest(actualHttpReq); } - future.complete(res); + rctx.resFuture().complete(attempt.res()); } - private static void handleException(ClientRequestContext ctx, CompletableFuture future, + private static void handleException(RetryContext rctx, Throwable cause, boolean endRequestLog) { - future.completeExceptionally(cause); + rctx.resFuture().completeExceptionally(cause); if (endRequestLog) { - ctx.logBuilder().endRequest(cause); + rctx.ctx().logBuilder().endRequest(cause); } - ctx.logBuilder().endResponse(cause); + rctx.ctx().logBuilder().endResponse(cause); } } diff --git a/core/src/main/java/com/linecorp/armeria/internal/client/ClientUtil.java b/core/src/main/java/com/linecorp/armeria/internal/client/ClientUtil.java index a4f65d598fc..15e10f59863 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/client/ClientUtil.java +++ b/core/src/main/java/com/linecorp/armeria/internal/client/ClientUtil.java @@ -30,6 +30,7 @@ import com.linecorp.armeria.client.UnprocessedRequestException; import com.linecorp.armeria.client.WebClient; import com.linecorp.armeria.client.endpoint.EndpointGroup; +import com.linecorp.armeria.common.HttpHeaderNames; import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.Request; import com.linecorp.armeria.common.RequestId; @@ -45,6 +46,8 @@ import com.linecorp.armeria.common.util.Exceptions; import com.linecorp.armeria.common.util.SafeCloseable; +import io.netty.util.AsciiString; + public final class ClientUtil { /** @@ -53,6 +56,12 @@ public final class ClientUtil { public static final URI UNDEFINED_URI = URI.create("http://" + ClientBuilderParamsUtil.UNDEFINED_URI_AUTHORITY); + /** + * The header which indicates the retry count of a {@link Request}. + * The server might use this value to reject excessive retries, etc. + */ + public static final AsciiString ARMERIA_RETRY_COUNT = HttpHeaderNames.of("armeria-retry-count"); + public static > O initContextAndExecuteWithFallback( U delegate, diff --git a/core/src/test/java/com/linecorp/armeria/client/retry/RetryingClientLoadBalancingTest.java b/core/src/test/java/com/linecorp/armeria/client/retry/RetryingClientLoadBalancingTest.java index a2b52679da6..b30d19582ef 100644 --- a/core/src/test/java/com/linecorp/armeria/client/retry/RetryingClientLoadBalancingTest.java +++ b/core/src/test/java/com/linecorp/armeria/client/retry/RetryingClientLoadBalancingTest.java @@ -105,7 +105,7 @@ void test(TestMode mode) { } // Retry only once on failure. - if (!HttpStatus.OK.equals(status) && AbstractRetryingClient.getTotalAttempts(ctx) <= 1) { + if (!HttpStatus.OK.equals(status) && ctx.log().partial().currentAttempt() <= 1) { return UnmodifiableFuture.completedFuture(RetryDecision.retry(Backoff.withoutDelay())); } else { return UnmodifiableFuture.completedFuture(RetryDecision.noRetry()); @@ -127,9 +127,9 @@ void test(TestMode mode) { case FAILURE: final List expectedPortsWhenRetried = ImmutableList.builder() - .addAll(expectedPorts) - .addAll(expectedPorts) - .build(); + .addAll(expectedPorts) + .addAll(expectedPorts) + .build(); assertThat(accessedPorts).isEqualTo(expectedPortsWhenRetried); break; } diff --git a/core/src/test/java/com/linecorp/armeria/client/retry/RetryingClientTest.java b/core/src/test/java/com/linecorp/armeria/client/retry/RetryingClientTest.java index c68446ad7e3..f00e5aa175b 100644 --- a/core/src/test/java/com/linecorp/armeria/client/retry/RetryingClientTest.java +++ b/core/src/test/java/com/linecorp/armeria/client/retry/RetryingClientTest.java @@ -16,8 +16,8 @@ package com.linecorp.armeria.client.retry; -import static com.linecorp.armeria.client.retry.AbstractRetryingClient.ARMERIA_RETRY_COUNT; import static com.linecorp.armeria.common.util.Exceptions.peel; +import static com.linecorp.armeria.internal.client.ClientUtil.ARMERIA_RETRY_COUNT; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.catchThrowable; @@ -526,15 +526,15 @@ void honorRetryMapping() { void evaluatesMappingOnce() { final AtomicInteger evaluations = new AtomicInteger(0); final RetryConfigMapping mapping = - (ctx, req) -> { - evaluations.incrementAndGet(); - return RetryConfig - .builder0(RetryRule.builder() - .onStatus(HttpStatus.valueOf(500)) - .thenBackoff()) - .maxTotalAttempts(2) - .build(); - }; + (ctx, req) -> { + evaluations.incrementAndGet(); + return RetryConfig + .builder0(RetryRule.builder() + .onStatus(HttpStatus.valueOf(500)) + .thenBackoff()) + .maxTotalAttempts(2) + .build(); + }; final WebClient client = client(mapping); diff --git a/thrift/thrift0.13/src/test/java/com/linecorp/armeria/it/client/retry/RetryingRpcClientTest.java b/thrift/thrift0.13/src/test/java/com/linecorp/armeria/it/client/retry/RetryingRpcClientTest.java index a93122a018a..eba3209fdc4 100644 --- a/thrift/thrift0.13/src/test/java/com/linecorp/armeria/it/client/retry/RetryingRpcClientTest.java +++ b/thrift/thrift0.13/src/test/java/com/linecorp/armeria/it/client/retry/RetryingRpcClientTest.java @@ -15,7 +15,7 @@ */ package com.linecorp.armeria.it.client.retry; -import static com.linecorp.armeria.client.retry.AbstractRetryingClient.ARMERIA_RETRY_COUNT; +import static com.linecorp.armeria.internal.client.ClientUtil.ARMERIA_RETRY_COUNT; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.catchThrowable;