Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,6 @@ private void doExecute0(ClientRequestContext ctx, HttpRequestDuplicator rootReqD

final RetryConfig<HttpResponse> config = mappedRetryConfig(ctx);
if (!ctx.exchangeType().isResponseStreaming() || config.requiresResponseTrailers()) {
RetryLimiterExecutor.onCompletedAttempt(config.retryLimiter(), ctx, ctx.log().partial(),
totalAttempts);
response.aggregate().handle((aggregated, cause) -> {
if (cause != null) {
derivedCtx.logBuilder().endRequest(cause);
Expand All @@ -349,8 +347,6 @@ private void doExecute0(ClientRequestContext ctx, HttpRequestDuplicator rootReqD
return null;
});
} else {
RetryLimiterExecutor.onCompletedAttempt(config.retryLimiter(), ctx, ctx.log().partial(),
totalAttempts);
handleStreamingResponse(config, ctx, rootReqDuplicator, originalReq, returnedRes,
future, derivedCtx, response);
}
Expand Down Expand Up @@ -533,6 +529,11 @@ private void handleRetryDecision(@Nullable RetryDecision decision, @Nullable Ret
HttpRequestDuplicator rootReqDuplicator, HttpRequest originalReq,
HttpResponse returnedRes, CompletableFuture<HttpResponse> future,
HttpResponse originalRes) {
// Notify the retry limiter that this attempt has completed
final RetryConfig<HttpResponse> config = mappedRetryConfig(ctx);
RetryLimiterExecutor.onCompletedAttempt(config.retryLimiter(), ctx, derivedCtx.log().partial(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can do this but then you also have to cover the exceptional case with handleException

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A test case for this would be great

getTotalAttempts(ctx));

final Backoff backoff = decision != null ? decision.backoff() : null;
if (backoff != null) {
final long millisAfter = useRetryAfter ? getRetryAfterMillis(derivedCtx) : -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.common.HttpHeaders;
import com.linecorp.armeria.common.logging.RequestLog;
import com.linecorp.armeria.common.logging.RequestLogProperty;
import com.linecorp.armeria.internal.common.InternalGrpcWebTrailers;

/**
Expand Down Expand Up @@ -178,10 +179,11 @@ public boolean shouldRetry(ClientRequestContext ctx, int numAttemptsSoFar) {
*/
@Override
public void onCompletedAttempt(ClientRequestContext ctx, RequestLog requestLog, int numAttemptsSoFar) {
// Exception flow, no response available, do nothing.
if (requestLog.responseCause() != null) {
// Check if response headers and trailers are available if not we don't have a valid response
if (!requestLog.isAvailable(RequestLogProperty.RESPONSE_HEADERS, RequestLogProperty.RESPONSE_TRAILERS)) {
return;
}

// Extract the headers to be able to evaluate the gRPC status
// Check HTTP trailers first, because most gRPC responses have non-empty payload + trailers.
HttpHeaders maybeGrpcTrailers = requestLog.responseTrailers();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the request log does not have trailers, we will throw an RequestLogAvailabilityException and not return null. You can check for its existence with log.isAvailable(RequestLogProperty.RESPONSE_HEADERS, RequestLogProperty.RESPONSE_TRAILERS). Then you can also remove the cause check above.

ref: https://javadoc.io/doc/com.linecorp.armeria/armeria-javadoc/latest/com/linecorp/armeria/common/logging/RequestLog.html#responseTrailers()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed 👍

Expand All @@ -203,9 +205,9 @@ public void onCompletedAttempt(ClientRequestContext ctx, RequestLog requestLog,
final String status = maybeGrpcTrailers.get("grpc-status");
final boolean decrement = retryableStatuses.contains(status);

while (true) {
boolean updated;
do {
final int currentCount = tokenCount.get();
final boolean updated;
if (decrement) {
if (currentCount == 0) {
break;
Expand All @@ -219,10 +221,7 @@ public void onCompletedAttempt(ClientRequestContext ctx, RequestLog requestLog,
final int incremented = currentCount + tokenRatio;
updated = tokenCount.compareAndSet(currentCount, Math.min(incremented, maxTokens));
}
if (updated) {
break;
}
}
} while (!updated);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.linecorp.armeria.common.HttpHeaders;
import com.linecorp.armeria.common.ResponseHeaders;
import com.linecorp.armeria.common.logging.RequestLog;
import com.linecorp.armeria.common.logging.RequestLogProperty;

class GrpcRetryLimiterTest {

Expand Down Expand Up @@ -250,8 +251,8 @@ void onCompletedAttempt_withRetryableStatus_decrementsTokens() {
final RequestLog requestLog = mock(RequestLog.class);
final HttpHeaders headers = createGrpcHeaders("14"); // UNAVAILABLE

when(requestLog.responseCause()).thenReturn(null);
when(requestLog.responseTrailers()).thenReturn(headers);
when(requestLog.isAvailable(RequestLogProperty.RESPONSE_HEADERS, RequestLogProperty.RESPONSE_TRAILERS)).thenReturn(true);

final int initialTokens = limiter.tokenCount.get();
limiter.onCompletedAttempt(ctx, requestLog, 1);
Expand All @@ -265,8 +266,8 @@ void onCompletedAttempt_withNonRetryableStatus_incrementsTokens() {
final RequestLog requestLog = mock(RequestLog.class);
final HttpHeaders headers = createGrpcHeaders("0"); // OK

when(requestLog.responseCause()).thenReturn(null);
when(requestLog.responseTrailers()).thenReturn(headers);
when(requestLog.isAvailable(RequestLogProperty.RESPONSE_HEADERS, RequestLogProperty.RESPONSE_TRAILERS)).thenReturn(true);

// Set token count to less than max to allow increment
limiter.tokenCount.set(5000);
Expand All @@ -281,9 +282,7 @@ void onCompletedAttempt_withNonRetryableStatus_incrementsTokens() {
void onCompletedAttempt_withException_doesNotChangeTokens() {
final GrpcRetryLimiter limiter = new GrpcRetryLimiter(10.0f, 1.0f);
final RequestLog requestLog = mock(RequestLog.class);
final Exception exception = new RuntimeException("Test exception");

when(requestLog.responseCause()).thenReturn(exception);
when(requestLog.isAvailable(RequestLogProperty.RESPONSE_HEADERS, RequestLogProperty.RESPONSE_TRAILERS)).thenReturn(false);

final int initialTokens = limiter.tokenCount.get();
limiter.onCompletedAttempt(ctx, requestLog, 1);
Expand All @@ -297,9 +296,9 @@ void onCompletedAttempt_withNoGrpcStatus_doesNotChangeTokens() {
final RequestLog requestLog = mock(RequestLog.class);
final HttpHeaders headers = createEmptyHeaders(); // No grpc-status

when(requestLog.responseCause()).thenReturn(null);
when(requestLog.responseTrailers()).thenReturn(headers);
when(requestLog.responseHeaders()).thenReturn(createEmptyResponseHeaders());
when(requestLog.isAvailable(RequestLogProperty.RESPONSE_HEADERS, RequestLogProperty.RESPONSE_TRAILERS)).thenReturn(true);

final int initialTokens = limiter.tokenCount.get();
limiter.onCompletedAttempt(ctx, requestLog, 1);
Expand All @@ -314,9 +313,9 @@ void onCompletedAttempt_withGrpcStatusInHeaders() {
final HttpHeaders trailers = createEmptyHeaders(); // No grpc-status in trailers
final ResponseHeaders headers = createResponseHeaders("14"); // UNAVAILABLE in headers

when(requestLog.responseCause()).thenReturn(null);
when(requestLog.responseTrailers()).thenReturn(trailers);
when(requestLog.responseHeaders()).thenReturn(headers);
when(requestLog.isAvailable(RequestLogProperty.RESPONSE_HEADERS, RequestLogProperty.RESPONSE_TRAILERS)).thenReturn(true);

final int initialTokens = limiter.tokenCount.get();
limiter.onCompletedAttempt(ctx, requestLog, 1);
Expand All @@ -330,8 +329,8 @@ void onCompletedAttempt_tokenCountNeverGoesBelowZero() {
final RequestLog requestLog = mock(RequestLog.class);
final HttpHeaders headers = createGrpcHeaders("14"); // UNAVAILABLE

when(requestLog.responseCause()).thenReturn(null);
when(requestLog.responseTrailers()).thenReturn(headers);
when(requestLog.isAvailable(RequestLogProperty.RESPONSE_HEADERS, RequestLogProperty.RESPONSE_TRAILERS)).thenReturn(true);

// Set token count to 0
limiter.tokenCount.set(0);
Expand All @@ -347,8 +346,8 @@ void onCompletedAttempt_tokenCountNeverExceedsMaxTokens() {
final RequestLog requestLog = mock(RequestLog.class);
final HttpHeaders headers = createGrpcHeaders("0"); // OK

when(requestLog.responseCause()).thenReturn(null);
when(requestLog.responseTrailers()).thenReturn(headers);
when(requestLog.isAvailable(RequestLogProperty.RESPONSE_HEADERS, RequestLogProperty.RESPONSE_TRAILERS)).thenReturn(true);

// Set token count to max
limiter.tokenCount.set(limiter.maxTokens);
Expand All @@ -368,8 +367,8 @@ void onCompletedAttempt_concurrentAccess() throws InterruptedException {
final RequestLog requestLog = mock(RequestLog.class);
final HttpHeaders headers = createGrpcHeaders("14"); // UNAVAILABLE

when(requestLog.responseCause()).thenReturn(null);
when(requestLog.responseTrailers()).thenReturn(headers);
when(requestLog.isAvailable(RequestLogProperty.RESPONSE_HEADERS, RequestLogProperty.RESPONSE_TRAILERS)).thenReturn(true);

// Start with max tokens
limiter.tokenCount.set(limiter.maxTokens);
Expand Down