diff --git a/core/src/test/java/com/linecorp/armeria/client/retry/RetryingClientTest.java b/core/src/test/java/com/linecorp/armeria/client/retry/RetryingClientTest.java index c68446ad7e3..2630d102125 100644 --- a/core/src/test/java/com/linecorp/armeria/client/retry/RetryingClientTest.java +++ b/core/src/test/java/com/linecorp/armeria/client/retry/RetryingClientTest.java @@ -18,9 +18,19 @@ import static com.linecorp.armeria.client.retry.AbstractRetryingClient.ARMERIA_RETRY_COUNT; import static com.linecorp.armeria.common.util.Exceptions.peel; +import static com.linecorp.armeria.internal.testing.RequestContextUtils.assertValidRequestContext; +import static com.linecorp.armeria.internal.testing.RequestContextUtils.verifyAllVerifierValid; +import static com.linecorp.armeria.internal.testing.RequestContextUtils.verifyExactlyOneVerifierValid; +import static com.linecorp.armeria.internal.testing.RequestContextUtils.verifyRequestExistsAndCompleted; +import static com.linecorp.armeria.internal.testing.RequestContextUtils.verifyResponseCause; +import static com.linecorp.armeria.internal.testing.RequestContextUtils.verifyResponseHeader; +import static com.linecorp.armeria.internal.testing.RequestContextUtils.verifyResponseTrailer; +import static com.linecorp.armeria.internal.testing.RequestContextUtils.verifyStatusCode; +import static com.linecorp.armeria.internal.testing.RequestContextUtils.verifyUnprocessedRequestException; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.catchThrowable; +import static org.awaitility.Awaitility.await; import java.time.Duration; import java.util.Arrays; @@ -55,6 +65,8 @@ import com.linecorp.armeria.client.ClientFactory; import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.client.ClientRequestContextCaptor; +import com.linecorp.armeria.client.Clients; import com.linecorp.armeria.client.HttpClient; import com.linecorp.armeria.client.ResponseTimeoutException; import com.linecorp.armeria.client.UnprocessedRequestException; @@ -75,8 +87,10 @@ import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.logging.RequestLog; import com.linecorp.armeria.common.stream.AbortedStreamException; +import com.linecorp.armeria.common.stream.CancelledSubscriptionException; import com.linecorp.armeria.common.util.UnmodifiableFuture; import com.linecorp.armeria.internal.testing.AnticipatedException; +import com.linecorp.armeria.internal.testing.RequestContextUtils.RequestLogVerifier; import com.linecorp.armeria.server.AbstractHttpService; import com.linecorp.armeria.server.ServerBuilder; import com.linecorp.armeria.server.ServiceRequestContext; @@ -102,6 +116,9 @@ static void afterAll() { clientFactory.closeAsync(); } + @Nullable + ClientRequestContext ctx; + private final AtomicInteger responseAbortServiceCallCounter = new AtomicInteger(); private final AtomicInteger requestAbortServiceCallCounter = new AtomicInteger(); @@ -320,7 +337,7 @@ void setUp() { } @Test - void retryWhenContentMatched() { + void retryContentMatched() { final Function retryingDecorator = RetryingClient.builder(new RetryIfContentMatch("Need to retry"), 1024) .newDecorator(); @@ -328,16 +345,30 @@ void retryWhenContentMatched() { .factory(clientFactory) .decorator(retryingDecorator) .build(); - - final AggregatedHttpResponse res = client.get("/retry-content").aggregate().join(); + final AggregatedHttpResponse res; + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + res = client.get("/retry-content").aggregate().join(); + ctx = captor.get(); + } assertThat(res.contentUtf8()).isEqualTo("Succeeded after retry"); + awaitValidClientRequestContext(ctx, + verifyStatusCode(HttpStatus.OK), + verifyStatusCode(HttpStatus.OK), + verifyStatusCode(HttpStatus.OK) + ); } @Test void retryWhenStatusMatched() { final WebClient client = client(RetryRule.builder().onServerErrorStatus().onException().thenBackoff()); - final AggregatedHttpResponse res = client.get("/503-then-success").aggregate().join(); + final AggregatedHttpResponse res; + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + res = client.get("/503-then-success").aggregate().join(); + ctx = captor.get(); + } assertThat(res.contentUtf8()).isEqualTo("Succeeded after retry"); + awaitValidClientRequestContext(ctx, verifyStatusCode(HttpStatus.SERVICE_UNAVAILABLE), + verifyStatusCode(HttpStatus.OK)); } @Test @@ -346,8 +377,16 @@ void retryWhenStatusMatchedWithContent() { .onServerErrorStatus() .onException() .thenBackoff(), 10000, 0, 100); - final AggregatedHttpResponse res = client.get("/503-then-success").aggregate().join(); + final AggregatedHttpResponse res; + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + res = client.get("/503-then-success").aggregate().join(); + ctx = captor.get(); + } assertThat(res.contentUtf8()).isEqualTo("Succeeded after retry"); + await().untilAsserted( + () -> awaitValidClientRequestContext(ctx, verifyStatusCode(HttpStatus.SERVICE_UNAVAILABLE), + verifyStatusCode(HttpStatus.OK)) + ); } @Test @@ -358,8 +397,22 @@ void retryWhenTrailerMatched() { return trailers.getInt("grpc-status", -1) != 0; }) .thenBackoff()); - final AggregatedHttpResponse res = client.get("/trailers-then-success").aggregate().join(); + final AggregatedHttpResponse res; + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + res = client.get("/trailers-then-success").aggregate().join(); + ctx = captor.get(); + } assertThat(res.contentUtf8()).isEqualTo("Succeeded after retry"); + awaitValidClientRequestContext(ctx, + verifyAllVerifierValid( + verifyStatusCode(HttpStatus.OK), + verifyResponseTrailer("grpc-status", "3") + ), + verifyAllVerifierValid( + verifyStatusCode(HttpStatus.OK), + verifyResponseTrailer("grpc-status", "0") + ) + ); } @Test @@ -368,16 +421,36 @@ void retryWhenTotalDurationIsHigh() { client(RetryRule.builder() .onTotalDuration((unused, duration) -> duration.toNanos() > 100) .thenBackoff()); - final AggregatedHttpResponse res = client.get("/1sleep-then-success").aggregate().join(); + final AggregatedHttpResponse res; + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + res = client.get("/1sleep-then-success").aggregate().join(); + ctx = captor.get(); + } assertThat(res.contentUtf8()).isEqualTo("Succeeded after retry"); + + await().untilAsserted(() -> { + assertThat(ctx.log().isComplete()).isTrue(); + }); + + final int actualAttemptCount = ctx.log().children().size(); + final RequestLogVerifier[] clientLogVerifiers = new RequestLogVerifier[actualAttemptCount]; + Arrays.fill(clientLogVerifiers, verifyStatusCode(HttpStatus.OK)); + clientLogVerifiers[0] = verifyStatusCode(HttpStatus.SERVICE_UNAVAILABLE); + awaitValidClientRequestContext(ctx, clientLogVerifiers); } @Test void disableResponseTimeout() { final WebClient client = client(RetryRule.failsafe(), 0, 0, 100); - final AggregatedHttpResponse res = client.get("/503-then-success").aggregate().join(); + final AggregatedHttpResponse res; + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + res = client.get("/503-then-success").aggregate().join(); + ctx = captor.get(); + } assertThat(res.contentUtf8()).isEqualTo("Succeeded after retry"); // response timeout did not happen. + awaitValidClientRequestContext(ctx, verifyStatusCode(HttpStatus.SERVICE_UNAVAILABLE), + verifyStatusCode(HttpStatus.OK)); } @Test @@ -385,23 +458,53 @@ void respectRetryAfter() { final WebClient client = client(RetryRule.failsafe()); final Stopwatch sw = Stopwatch.createStarted(); - final AggregatedHttpResponse res = client.get("/retry-after-1-second").aggregate().join(); + final AggregatedHttpResponse res; + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + res = client.get("/retry-after-1-second").aggregate().join(); + ctx = captor.get(); + } + assertThat(res.contentUtf8()).isEqualTo("Succeeded after retry"); + assertThat(res.headers().get(HttpHeaderNames.RETRY_AFTER.toString())).isNull(); assertThat(sw.elapsed(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo( (long) (TimeUnit.SECONDS.toMillis(1) * 0.9)); + awaitValidClientRequestContext(ctx, + verifyAllVerifierValid( + verifyStatusCode(HttpStatus.SERVICE_UNAVAILABLE), + verifyResponseHeader( + HttpHeaderNames.RETRY_AFTER.toString(), + "1")), + verifyStatusCode(HttpStatus.OK)); } @Test - void respectRetryAfterWithHttpDate() { + void respectRetryAfterWithHttpDate() throws InterruptedException { final WebClient client = client(RetryRule.failsafe()); final Stopwatch sw = Stopwatch.createStarted(); - final AggregatedHttpResponse res = client.get("/retry-after-with-http-date").aggregate().join(); + final AggregatedHttpResponse res; + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + res = client.get("/retry-after-with-http-date").aggregate().join(); + ctx = captor.get(); + } + assertThat(res.contentUtf8()).isEqualTo("Succeeded after retry"); + final String expectedRetryAfterHeader = + server.requestContextCaptor().take().log().partial().responseHeaders().get( + HttpHeaderNames.RETRY_AFTER.toString()); + assertThat(expectedRetryAfterHeader).isNotNull(); + // This header should be transferred to the caller as retrying should be transparent. + assertThat(res.headers().get(HttpHeaderNames.RETRY_AFTER.toString())).isNull(); // Since ZonedDateTime doesn't express exact time, // just check out whether it is retried after delayed some time. assertThat(sw.elapsed(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(1000); + awaitValidClientRequestContext(ctx, verifyAllVerifierValid(verifyStatusCode( + HttpStatus.SERVICE_UNAVAILABLE), + verifyResponseHeader( + HttpHeaderNames.RETRY_AFTER.toString(), + expectedRetryAfterHeader)), + verifyStatusCode(HttpStatus.OK)); } @Test @@ -410,27 +513,58 @@ void propagateLastResponseWhenNextRetryIsAfterTimeout() { .onServerErrorStatus() .onException() .thenBackoff(Backoff.fixed(10000000))); - final AggregatedHttpResponse res = client.get("/service-unavailable").aggregate().join(); + final AggregatedHttpResponse res; + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + res = client.get("/service-unavailable").aggregate().join(); + ctx = captor.get(); + } assertThat(res.status()).isSameAs(HttpStatus.SERVICE_UNAVAILABLE); + awaitValidClientRequestContext(ctx, verifyStatusCode(HttpStatus.SERVICE_UNAVAILABLE)); } @Test void propagateLastResponseWhenExceedMaxAttempts() { final WebClient client = client( RetryRule.builder().onServerErrorStatus().onException().thenBackoff(Backoff.fixed(1)), 0, 0, 3); - final AggregatedHttpResponse res = client.get("/service-unavailable").aggregate().join(); + final AggregatedHttpResponse res; + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + res = client.get("/service-unavailable").aggregate().join(); + ctx = captor.get(); + } assertThat(res.status()).isSameAs(HttpStatus.SERVICE_UNAVAILABLE); + // maximum number of attempts + awaitValidClientRequestContext(ctx, + verifyStatusCode(HttpStatus.SERVICE_UNAVAILABLE), + verifyStatusCode(HttpStatus.SERVICE_UNAVAILABLE), + verifyStatusCode(HttpStatus.SERVICE_UNAVAILABLE)); } @Test - void retryAfterOneYear() { + void retryAfterOneYear() throws InterruptedException { final WebClient client = client(RetryRule.failsafe()); - // The response will be the last response whose headers contains HttpHeaderNames.RETRY_AFTER // because next retry is after timeout - final ResponseHeaders headers = client.get("retry-after-one-year").aggregate().join().headers(); + final ResponseHeaders headers; + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + headers = client.get("retry-after-one-year").aggregate().join().headers(); + ctx = captor.get(); + } assertThat(headers.status()).isSameAs(HttpStatus.SERVICE_UNAVAILABLE); assertThat(headers.get(HttpHeaderNames.RETRY_AFTER)).isNotNull(); + final String expectedRetryAfterHeader = server + .requestContextCaptor() + .take() + .log() + .partial() + .responseHeaders() + .get(HttpHeaderNames.RETRY_AFTER.toString()); + assertThat(expectedRetryAfterHeader).isNotNull(); + + awaitValidClientRequestContext(ctx, verifyAllVerifierValid(verifyStatusCode( + HttpStatus.SERVICE_UNAVAILABLE), + verifyResponseHeader( + HttpHeaderNames.RETRY_AFTER.toString(), + expectedRetryAfterHeader))); } @Test @@ -445,8 +579,18 @@ void retryOnResponseTimeout() { }; final WebClient client = client(strategy, 0, 500, 100); - final AggregatedHttpResponse res = client.get("/1sleep-then-success").aggregate().join(); + + final AggregatedHttpResponse res; + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + res = client.get("/1sleep-then-success").aggregate().join(); + ctx = captor.get(); + } + assertThat(res.contentUtf8()).isEqualTo("Succeeded after retry"); + awaitValidClientRequestContext(ctx, verifyAllVerifierValid( + verifyStatusCode(HttpStatus.UNKNOWN), + verifyResponseCause(ResponseTimeoutException.class) + ), verifyStatusCode(HttpStatus.OK)); } @Test @@ -467,10 +611,19 @@ void retryWithContentOnResponseTimeout() { .onException(ResponseTimeoutException.class) .thenBackoff(backoff))); final WebClient client = client(strategy, 0, 500, 100); - final AggregatedHttpResponse res = client.get("/1sleep-then-success").aggregate().join(); + final AggregatedHttpResponse res; + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + res = client.get("/1sleep-then-success").aggregate().join(); + ctx = captor.get(); + } assertThat(res.contentUtf8()).isEqualTo("Succeeded after retry"); // Make sure that all customized RetryRuleWithContents are called. assertThat(queue).containsExactly(1, 2, 3); + awaitValidClientRequestContext(ctx, verifyAllVerifierValid( + verifyStatusCode(HttpStatus.UNKNOWN), + verifyResponseCause(ResponseTimeoutException.class) + ), + verifyStatusCode(HttpStatus.OK)); } @Test @@ -507,49 +660,80 @@ void honorRetryMapping() { final WebClient client = client(mapping); Stopwatch stopwatch = Stopwatch.createStarted(); - assertThat(client.get("/500-always").aggregate().join().status()) - .isEqualTo(HttpStatus.valueOf(500)); + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + assertThat(client.get("/500-always").aggregate().join().status()) + .isEqualTo(HttpStatus.valueOf(500)); + ctx = captor.get(); + } assertThat(stopwatch.elapsed()).isBetween(Duration.ofSeconds(2), Duration.ofSeconds(6)); + awaitValidClientRequestContext(ctx, verifyStatusCode(HttpStatus.INTERNAL_SERVER_ERROR), + verifyStatusCode(HttpStatus.INTERNAL_SERVER_ERROR)); stopwatch = Stopwatch.createStarted(); - assertThat(client.get("/501-always").aggregate().join().status()) - .isEqualTo(HttpStatus.valueOf(501)); + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + assertThat(client.get("/501-always").aggregate().join().status()) + .isEqualTo(HttpStatus.valueOf(501)); + ctx = captor.get(); + } assertThat(stopwatch.elapsed()).isBetween(Duration.ofSeconds(14), Duration.ofSeconds(28)); + awaitValidClientRequestContext(ctx, + verifyStatusCode(HttpStatus.NOT_IMPLEMENTED), + verifyStatusCode(HttpStatus.NOT_IMPLEMENTED), + verifyStatusCode(HttpStatus.NOT_IMPLEMENTED), + verifyStatusCode(HttpStatus.NOT_IMPLEMENTED), + verifyStatusCode(HttpStatus.NOT_IMPLEMENTED), + verifyStatusCode(HttpStatus.NOT_IMPLEMENTED), + verifyStatusCode(HttpStatus.NOT_IMPLEMENTED), + verifyStatusCode(HttpStatus.NOT_IMPLEMENTED)); stopwatch = Stopwatch.createStarted(); - assertThat(client.get("/502-always").aggregate().join().status()) - .isEqualTo(HttpStatus.valueOf(502)); + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + assertThat(client.get("/502-always").aggregate().join().status()) + .isEqualTo(HttpStatus.valueOf(502)); + ctx = captor.get(); + } assertThat(stopwatch.elapsed()).isBetween(Duration.ofSeconds(0), Duration.ofSeconds(2)); + awaitValidClientRequestContext(ctx, verifyStatusCode(HttpStatus.BAD_GATEWAY)); } @Test void evaluatesMappingOnce() { final AtomicInteger evaluations = new AtomicInteger(0); final RetryConfigMapping mapping = - (ctx, req) -> { - evaluations.incrementAndGet(); - return RetryConfig - .builder0(RetryRule.builder() - .onStatus(HttpStatus.valueOf(500)) - .thenBackoff()) - .maxTotalAttempts(2) - .build(); - }; + (ctx, req) -> { + evaluations.incrementAndGet(); + return RetryConfig + .builder0(RetryRule.builder() + .onStatus(HttpStatus.valueOf(500)) + .thenBackoff()) + .maxTotalAttempts(2) + .build(); + }; final WebClient client = client(mapping); - assertThat(client.get("/500-then-success").aggregate().join().status()) - .isEqualTo(HttpStatus.valueOf(200)); + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + assertThat(client.get("/500-then-success").aggregate().join().status()) + .isEqualTo(HttpStatus.valueOf(200)); + ctx = captor.get(); + } // 1 logical request; 2 retries assertThat(evaluations.get()).isEqualTo(1); + awaitValidClientRequestContext(ctx, verifyStatusCode(HttpStatus.INTERNAL_SERVER_ERROR), + verifyStatusCode(HttpStatus.OK)); reqCount.set(0); - assertThat(client.get("/500-then-success").aggregate().join().status()) - .isEqualTo(HttpStatus.valueOf(200)); + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + assertThat(client.get("/500-then-success").aggregate().join().status()) + .isEqualTo(HttpStatus.valueOf(200)); + ctx = captor.get(); + } - // 2 logical requests; 4 retries + // 2 logical requests; 2 retries assertThat(evaluations.get()).isEqualTo(2); + awaitValidClientRequestContext(ctx, verifyStatusCode(HttpStatus.INTERNAL_SERVER_ERROR), + verifyStatusCode(HttpStatus.OK)); } @Test @@ -582,10 +766,19 @@ void retryWithContentOnUnprocessedException() { .decorator(retryingDecorator) .build(); final Stopwatch stopwatch = Stopwatch.createStarted(); - assertThatThrownBy(() -> client.get("/unprocessed-exception").aggregate().join()) - .isInstanceOf(CompletionException.class) - .hasCauseInstanceOf(UnprocessedRequestException.class); + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + assertThatThrownBy(() -> client.get("/unprocessed-exception").aggregate().join()) + .isInstanceOf(CompletionException.class) + .hasCauseInstanceOf(UnprocessedRequestException.class); + ctx = captor.get(); + } assertThat(stopwatch.elapsed()).isBetween(Duration.ofSeconds(7), Duration.ofSeconds(20)); + awaitValidClientRequestContext(ctx, + verifyUnprocessedRequestException(), + verifyUnprocessedRequestException(), + verifyUnprocessedRequestException(), + verifyUnprocessedRequestException(), + verifyUnprocessedRequestException()); } } @@ -594,17 +787,28 @@ void retryWithContentOnUnprocessedException() { void differentBackoffBasedOnStatus(RetryRule retryRule) { final WebClient client = client(retryRule); + AggregatedHttpResponse res; final Stopwatch sw = Stopwatch.createStarted(); - AggregatedHttpResponse res = client.get("/503-then-success").aggregate().join(); + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + res = client.get("/503-then-success").aggregate().join(); + ctx = captor.get(); + } assertThat(res.contentUtf8()).isEqualTo("Succeeded after retry"); assertThat(sw.elapsed(TimeUnit.MILLISECONDS)).isBetween((long) (10 * 0.9), (long) (1000 * 1.1)); + awaitValidClientRequestContext(ctx, verifyStatusCode(HttpStatus.SERVICE_UNAVAILABLE), + verifyStatusCode(HttpStatus.OK)); reqCount.set(0); sw.reset().start(); - res = client.get("/500-then-success").aggregate().join(); + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + res = client.get("/500-then-success").aggregate().join(); + ctx = captor.get(); + } assertThat(res.contentUtf8()).isEqualTo("Succeeded after retry"); assertThat(sw.elapsed(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo((long) (1000 * 0.9)); + awaitValidClientRequestContext(ctx, verifyStatusCode(HttpStatus.INTERNAL_SERVER_ERROR), + verifyStatusCode(HttpStatus.OK)); } @Test @@ -613,8 +817,15 @@ void retryWithRequestBody() { .onServerErrorStatus() .onException() .thenBackoff(Backoff.fixed(10))); - final AggregatedHttpResponse res = client.post("/post-ping-pong", "bar").aggregate().join(); + final AggregatedHttpResponse res; + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + res = client.post("/post-ping-pong", "bar").aggregate().join(); + ctx = captor.get(); + } assertThat(res.contentUtf8()).isEqualTo("bar"); + awaitValidClientRequestContext(ctx, + verifyStatusCode(HttpStatus.SERVICE_UNAVAILABLE), + verifyStatusCode(HttpStatus.OK)); } @Test @@ -652,11 +863,25 @@ void shouldGetExceptionWhenFactoryIsClosed() { // // Peel CompletionException first. - Throwable t = peel(catchThrowable(() -> client.get("/service-unavailable").aggregate().join())); + Throwable t; + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + t = peel(catchThrowable(() -> client.get("/service-unavailable").aggregate().join())); + ctx = captor.get(); + } if (t instanceof UnprocessedRequestException) { final Throwable cause = t.getCause(); assertThat(cause).isInstanceOf(IllegalStateException.class); t = cause; + // not able to schedule second retry. + awaitValidClientRequestContext(ctx, verifyUnprocessedRequestException()); + } else { + awaitValidClientRequestContext(ctx, verifyExactlyOneVerifierValid( + verifyStatusCode(HttpStatus.SERVICE_UNAVAILABLE), + verifyAllVerifierValid( + verifyStatusCode(HttpStatus.UNKNOWN), + verifyResponseCause(IllegalStateException.class) + ) + )); } assertThat(t).isInstanceOf(IllegalStateException.class) .satisfies(cause -> assertThat(cause.getMessage()).matches( @@ -668,37 +893,44 @@ void doNotRetryWhenResponseIsAborted() throws Exception { final List abortCauses = Arrays.asList(null, new IllegalStateException("abort stream with a specified cause")); for (Throwable abortCause : abortCauses) { - final AtomicReference context = new AtomicReference<>(); final WebClient client = WebClient.builder(server.httpUri()) .decorator(RetryingClient.newDecorator(retryAlways)) - .decorator((delegate, ctx, req) -> { - context.set(ctx); - return delegate.execute(ctx, req); - }) .decorator(LoggingClient.newDecorator()) .build(); responseAbortServiceCallCounter.set(0); - final HttpResponse httpResponse = client.get("/response-abort"); + final HttpResponse httpResponse; + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + httpResponse = client.get("/response-abort"); + ctx = captor.get(); + } if (abortCause == null) { httpResponse.abort(); } else { httpResponse.abort(abortCause); } - final RequestLog log = context.get().log().whenComplete().join(); + final RequestLog log = ctx.log().whenComplete().join(); final Throwable requestCause = log.requestCause(); if (abortCause == null) { assertThat(log.responseCause()).isExactlyInstanceOf(AbortedStreamException.class); + awaitValidClientRequestContext(ctx, verifyResponseCause(AbortedStreamException.class)); if (requestCause != null) { // A request can either successfully complete or fail depending on timing. assertThat(requestCause).isExactlyInstanceOf(AbortedStreamException.class); } } else { - assertThat(log.responseCause()).isSameAs(abortCause); if (requestCause != null) { + awaitValidClientRequestContext(ctx, + verifyExactlyOneVerifierValid( + verifyResponseCause(abortCause), + verifyResponseCause(AbortedStreamException.class) + ) + ); // A request can either successfully complete or fail depending on timing. assertThat(requestCause).isSameAs(abortCause); + } else { + awaitValidClientRequestContext(ctx, verifyResponseCause(requestCause)); } } @@ -712,25 +944,40 @@ void doNotRetryWhenResponseIsAborted() throws Exception { @Test void doNotRetryWhenSubscriberIsCancelled() throws Exception { final WebClient client = client(retryAlways); - client.get("/subscriber-cancel").subscribe( - new Subscriber() { - @Override - public void onSubscribe(Subscription s) { - s.cancel(); // Cancel as soon as getting the subscription. - } - @Override - public void onNext(HttpObject httpObject) {} + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + client.get("/subscriber-cancel").subscribe( + new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + s.cancel(); // Cancel as soon as getting the subscription. + } + + @Override + public void onNext(HttpObject httpObject) {} - @Override - public void onError(Throwable t) {} + @Override + public void onError(Throwable t) {} - @Override - public void onComplete() {} - }); + @Override + public void onComplete() {} + }); + + ctx = captor.get(); + } TimeUnit.SECONDS.sleep(1L); // Sleep to check if there's a retry. assertThat(subscriberCancelServiceCallCounter.get()).isEqualTo(1); + awaitValidClientRequestContext(ctx, verifyExactlyOneVerifierValid( + verifyAllVerifierValid( + verifyStatusCode(HttpStatus.SERVICE_UNAVAILABLE), + verifyResponseCause(AbortedStreamException.class) + ), + verifyAllVerifierValid( + verifyStatusCode(HttpStatus.UNKNOWN), + verifyResponseCause(CancelledSubscriptionException.class) + ) + )); } @Test @@ -755,11 +1002,15 @@ void doNotRetryWhenRequestIsAborted() throws Exception { } else { req.abort(abortCause); } - client.execute(req).aggregate(); + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + client.execute(req).aggregate(); + ctx = captor.get(); + } TimeUnit.SECONDS.sleep(1); // No request is made. assertThat(responseAbortServiceCallCounter.get()).isZero(); + awaitValidClientRequestContext(ctx); final RequestLog log = context.get().log().whenComplete().join(); if (abortCause == null) { assertThat(log.requestCause()).isExactlyInstanceOf(AbortedStreamException.class); @@ -785,9 +1036,19 @@ void exceptionInDecorator() { .decorator(RetryingClient.newDecorator(strategy, 5)) .build(); - assertThatThrownBy(() -> client.get("/").aggregate().join()) - .hasCauseExactlyInstanceOf(AnticipatedException.class); + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + assertThatThrownBy(() -> client.get("/").aggregate().join()) + .isInstanceOf(CompletionException.class) + .hasCauseExactlyInstanceOf(AnticipatedException.class); + ctx = captor.get(); + } assertThat(retryCounter.get()).isEqualTo(5); + // max attempts + awaitValidClientRequestContext(ctx, verifyResponseCause(AnticipatedException.class), + verifyResponseCause(AnticipatedException.class), + verifyResponseCause(AnticipatedException.class), + verifyResponseCause(AnticipatedException.class), + verifyResponseCause(AnticipatedException.class)); } @Test @@ -798,9 +1059,16 @@ void exceptionInRule() { }; final WebClient client = client(rule); - assertThatThrownBy(client.get("/").aggregate()::join) - .isInstanceOf(CompletionException.class) - .hasCauseReference(exception); + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + assertThatThrownBy(client.get("/").aggregate()::join) + .isInstanceOf(CompletionException.class) + .hasCauseReference(exception); + ctx = captor.get(); + } + awaitValidClientRequestContext(ctx, verifyExactlyOneVerifierValid( + verifyResponseCause(AbortedStreamException.class), + verifyResponseCause(exception) + )); } @Test @@ -811,9 +1079,21 @@ void exceptionInRuleWithContent() { }; final WebClient client = client(rule, 10000, 0, 100); - assertThatThrownBy(client.get("/").aggregate()::join) - .isInstanceOf(CompletionException.class) - .hasCauseReference(exception); + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + assertThatThrownBy(client.get("/").aggregate()::join) + .isInstanceOf(CompletionException.class) + .hasCauseReference(exception); + ctx = captor.get(); + } + + await().untilAsserted(() -> { + ctx.log().isComplete(); + }); + assertThat(ctx.log().children()).hasSize(1); + + awaitValidClientRequestContextWithParentLogVerifier(ctx, + verifyStatusCode(HttpStatus.UNKNOWN), + verifyStatusCode(HttpStatus.NOT_FOUND)); } @Test @@ -828,11 +1108,17 @@ void useSameEventLoopWhenAggregate() throws InterruptedException { }) .decorator(RetryingClient.newDecorator(RetryRule.failsafe(), 2)) .build(); - client.get("/503-then-success").aggregate().whenComplete((unused, cause) -> { - assertThat(eventLoop.get().inEventLoop()).isTrue(); - latch.countDown(); - }); + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + client.get("/503-then-success").aggregate() + .whenComplete((unused, cause) -> { + assertThat(eventLoop.get().inEventLoop()).isTrue(); + latch.countDown(); + }); + ctx = captor.get(); + } latch.await(); + awaitValidClientRequestContext(ctx, verifyStatusCode(HttpStatus.SERVICE_UNAVAILABLE), + verifyStatusCode(HttpStatus.OK)); } private WebClient client(RetryRule retryRule) { @@ -886,6 +1172,23 @@ private WebClient client(RetryRuleWithContent retryRuleWithContent .build(); } + private static void awaitValidClientRequestContext(ClientRequestContext ctx, + RequestLogVerifier... childLogVerifiers) { + await().untilAsserted(() -> + assertValidRequestContext( + ctx, verifyRequestExistsAndCompleted(), childLogVerifiers + ) + ); + } + + public static void awaitValidClientRequestContextWithParentLogVerifier( + ClientRequestContext ctx, + RequestLogVerifier parentLogVerifier, + RequestLogVerifier... childLogVerifiers) { + await().untilAsserted(() -> assertValidRequestContext(ctx, parentLogVerifier, + childLogVerifiers)); + } + private static class RetryIfContentMatch implements RetryRuleWithContent { private final String retryContent; private final RetryDecision decision = RetryDecision.retry(Backoff.fixed(100)); diff --git a/testing-internal/src/main/java/com/linecorp/armeria/internal/testing/RequestContextUtils.java b/testing-internal/src/main/java/com/linecorp/armeria/internal/testing/RequestContextUtils.java new file mode 100644 index 00000000000..e04277f7086 --- /dev/null +++ b/testing-internal/src/main/java/com/linecorp/armeria/internal/testing/RequestContextUtils.java @@ -0,0 +1,205 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.internal.testing; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.catchThrowable; +import static org.assertj.core.api.Assertions.fail; + +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.client.UnprocessedRequestException; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.RequestContext; +import com.linecorp.armeria.common.ResponseHeaders; +import com.linecorp.armeria.common.RpcRequest; +import com.linecorp.armeria.common.RpcResponse; +import com.linecorp.armeria.common.logging.RequestLog; + +public final class RequestContextUtils { + private RequestContextUtils() {} + + @FunctionalInterface + public interface RequestLogVerifier { + void verifyLog(RequestLog requestLog) throws Exception; + } + + public static final RequestLogVerifier VERIFY_NOTHING = requestLog -> { + // No verification is performed. + }; + + public static RequestLogVerifier verifyAllVerifierValid(RequestLogVerifier... requestLogVerifiers) { + return requestLog -> { + for (RequestLogVerifier requestLogVerifier : requestLogVerifiers) { + requestLogVerifier.verifyLog(requestLog); + } + }; + } + + public static RequestLogVerifier verifyExactlyOneVerifierValid(RequestLogVerifier... requestLogVerifiers) { + return requestLog -> { + final Throwable[] verifierCauses = new Throwable[requestLogVerifiers.length]; + + for (int i = 0; i < requestLogVerifiers.length; i++) { + final int index = i; + verifierCauses[i] = catchThrowable(() -> requestLogVerifiers[index].verifyLog(requestLog)); + } + + final List nonNullVerifierCauses = Arrays.stream(verifierCauses) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + + if (nonNullVerifierCauses.size() != requestLogVerifiers.length - 1) { + final Throwable allCauses = nonNullVerifierCauses.get(0); + + for (int i = 1; i < nonNullVerifierCauses.size(); i++) { + allCauses.addSuppressed(nonNullVerifierCauses.get(i)); + } + + fail(allCauses); + } + }; + } + + public static RequestLogVerifier verifyStatusCode(HttpStatus expectedStatus) { + return requestLog -> assertThat(requestLog.responseHeaders().status()).isEqualTo(expectedStatus); + } + + public static RequestLogVerifier verifyUnprocessedRequestException() { + return verifyAllVerifierValid( + verifyStatusCode(HttpStatus.UNKNOWN), + verifyResponseCause(UnprocessedRequestException.class) + ); + } + + public static RequestLogVerifier verifyRequestCause(Class expectedCauseClass) { + return requestLog -> { + assertThat(requestLog.requestCause()).isExactlyInstanceOf(expectedCauseClass); + }; + } + + public static RequestLogVerifier verifyRequestExistsAndCompleted() { + return requestLog -> { + final RequestContext requestContext = requestLog.context(); + assert requestContext instanceof ClientRequestContext; + final ClientRequestContext ctx = (ClientRequestContext) requestContext; + + final HttpRequest req = ctx.request(); + assertThat(req).isNotNull(); + assert req != null; + assertThat(req.isComplete()).isTrue(); + }; + } + + public static RequestLogVerifier verifyLastChildHasSameHttpRequest() { + return requestLog -> { + final RequestContext requestContext = requestLog.context(); + assert requestContext instanceof ClientRequestContext; + final ClientRequestContext ctx = (ClientRequestContext) requestContext; + + // We expect at least one child log. + assertThat(ctx.log().children().size()).isGreaterThanOrEqualTo(1); + + final HttpRequest lastHttpReq = + ctx.log().children() + .get(ctx.log().children().size() - 1).context().request(); + + if (lastHttpReq != null) { + assertThat(lastHttpReq).isSameAs(ctx.log().context().request()); + } + }; + } + + public static RequestLogVerifier verifyResponseCause(Class expectedCauseClass) { + return requestLog -> { + assertThat(requestLog.responseCause()).isExactlyInstanceOf(expectedCauseClass); + }; + } + + public static RequestLogVerifier verifyResponseCause(Throwable expectedCause) { + return requestLog -> { + assertThat(requestLog.responseCause()).isSameAs(expectedCause); + }; + } + + public static RequestLogVerifier verifyResponseHeader(String headerName, + String expectedHeaderValue) { + return requestLog -> { + final ResponseHeaders headers = requestLog.responseHeaders(); + assertThat(headers.get(headerName)).isEqualTo(expectedHeaderValue); + }; + } + + public static RequestLogVerifier verifyResponseTrailer(String headerName, + String expectedHeaderValue) { + return requestLog -> { + assertThat(requestLog.responseTrailers().get(headerName)).isEqualTo(expectedHeaderValue); + }; + } + + public static RequestLogVerifier verifyResponseContent(String expectedResponseContent) { + return requestLog -> { + assertThat(requestLog.responseContent()).isExactlyInstanceOf(String.class); + assertThat(requestLog.responseContent()).isEqualTo(expectedResponseContent); + }; + } + + public static void assertValidRequestContext( + RequestContext ctx, + RequestLogVerifier parentLogVerifier, + RequestLogVerifier... childLogVerifiers) { + assertValidRequestContext0(ctx, parentLogVerifier, childLogVerifiers); + } + + private static void assertValidRequestContext0( + RequestContext ctx, + RequestLogVerifier parentLogVerifier, + RequestLogVerifier[] childLogVerifiers) { + final int expectedNumRequests = childLogVerifiers.length; + assertThat(ctx.log().isComplete()).isTrue(); + assertThat(ctx.log().children()).hasSize(expectedNumRequests); + + for (int childLogIndex = 0; childLogIndex < expectedNumRequests; childLogIndex++) { + final RequestLog childLog = ctx.log().children().get(childLogIndex).whenComplete().join(); + assertThat(childLog).isNotNull(); + assertThat(childLog.isComplete()).isTrue(); + assertThat(childLog.children()).isEmpty(); + if (ctx.rpcRequest() != null) { + assertThat(childLog.requestContent()).isInstanceOf(RpcRequest.class); + assertThat(childLog.responseContent()).isInstanceOf(RpcResponse.class); + } + + try { + childLogVerifiers[childLogIndex].verifyLog(childLog); + } catch (Throwable e) { + fail("Failed to verify child log (" + (childLogIndex + 1) + + '/' + expectedNumRequests + ')', e); + } + } + + try { + parentLogVerifier.verifyLog(ctx.log().partial()); + } catch (Throwable e) { + fail("Failed to verify parent log", e); + } + } +} diff --git a/thrift/thrift0.13/src/test/java/com/linecorp/armeria/it/client/retry/RetryingRpcClientTest.java b/thrift/thrift0.13/src/test/java/com/linecorp/armeria/it/client/retry/RetryingRpcClientTest.java index 1e56c8b0472..6d1c51c89cf 100644 --- a/thrift/thrift0.13/src/test/java/com/linecorp/armeria/it/client/retry/RetryingRpcClientTest.java +++ b/thrift/thrift0.13/src/test/java/com/linecorp/armeria/it/client/retry/RetryingRpcClientTest.java @@ -16,6 +16,13 @@ package com.linecorp.armeria.it.client.retry; import static com.linecorp.armeria.client.retry.AbstractRetryingClient.ARMERIA_RETRY_COUNT; +import static com.linecorp.armeria.internal.testing.RequestContextUtils.assertValidRequestContext; +import static com.linecorp.armeria.internal.testing.RequestContextUtils.verifyAllVerifierValid; +import static com.linecorp.armeria.internal.testing.RequestContextUtils.verifyLastChildHasSameHttpRequest; +import static com.linecorp.armeria.internal.testing.RequestContextUtils.verifyRequestCause; +import static com.linecorp.armeria.internal.testing.RequestContextUtils.verifyRequestExistsAndCompleted; +import static com.linecorp.armeria.internal.testing.RequestContextUtils.verifyResponseCause; +import static com.linecorp.armeria.internal.testing.RequestContextUtils.verifyStatusCode; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.catchThrowable; @@ -28,10 +35,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.CancellationException; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -42,6 +47,8 @@ import com.linecorp.armeria.client.ClientFactory; import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.client.ClientRequestContextCaptor; +import com.linecorp.armeria.client.Clients; import com.linecorp.armeria.client.UnprocessedRequestException; import com.linecorp.armeria.client.retry.Backoff; import com.linecorp.armeria.client.retry.RetryConfig; @@ -50,10 +57,12 @@ import com.linecorp.armeria.client.retry.RetryRuleWithContent; import com.linecorp.armeria.client.retry.RetryingRpcClient; import com.linecorp.armeria.client.thrift.ThriftClients; -import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.CompletableRpcResponse; +import com.linecorp.armeria.common.HttpStatus; import com.linecorp.armeria.common.RpcResponse; -import com.linecorp.armeria.common.logging.RequestLog; +import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.common.util.UnmodifiableFuture; +import com.linecorp.armeria.internal.testing.RequestContextUtils.RequestLogVerifier; import com.linecorp.armeria.server.ServerBuilder; import com.linecorp.armeria.server.thrift.THttpService; import com.linecorp.armeria.testing.junit5.server.ServerExtension; @@ -74,6 +83,8 @@ class RetryingRpcClientTest { private final DevNullService.Iface devNullServiceHandler = mock(DevNullService.Iface.class); private final AtomicInteger serviceRetryCount = new AtomicInteger(); + private ClientRequestContext ctx; + @RegisterExtension final ServerExtension server = new ServerExtension() { @Override @@ -99,10 +110,15 @@ protected void configure(ServerBuilder sb) throws Exception { @Test void execute() throws Exception { final HelloService.Iface client = helloClient(retryOnException, 100); + when(serviceHandler.hello(anyString())).thenReturn("world"); - assertThat(client.hello("hello")).isEqualTo("world"); + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + assertThat(client.hello("hello")).isEqualTo("world"); + ctx = captor.get(); + } verify(serviceHandler, only()).hello("hello"); + awaitValidClientRequestContext(ctx, verifyResponse("world")); } @Test @@ -127,8 +143,15 @@ void execute_honorMapping() throws Exception { .thenThrow(new IllegalArgumentException()) .thenReturn("Hey"); serviceRetryCount.set(0); - assertThat(client.hello("Alice")).isEqualTo("Hey"); + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + assertThat(client.hello("Alice")).isEqualTo("Hey"); + ctx = captor.get(); + } verify(serviceHandler, times(3)).hello("Alice"); + awaitValidClientRequestContext(ctx, + verifyResponseException(), + verifyResponseException(), + verifyResponse("Hey")); when(serviceHandler.hello(anyString())) .thenThrow(new IllegalArgumentException()) @@ -139,6 +162,10 @@ void execute_honorMapping() throws Exception { serviceRetryCount.set(0); assertThat(client.hello("Bob")).isEqualTo("Hey"); verify(serviceHandler, times(5)).hello("Bob"); + awaitValidClientRequestContext(ctx, + verifyResponseException(), + verifyResponseException(), + verifyResponse("Hey")); } @Test @@ -159,10 +186,15 @@ void evaluatesMappingOnce() throws Exception { .thenThrow(new IllegalArgumentException()) .thenReturn("Hey"); - assertThat(client.hello("Alice")).isEqualTo("Hey"); + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + assertThat(client.hello("Alice")).isEqualTo("Hey"); + ctx = captor.get(); + } // 1 logical request; 3 retries assertThat(evaluations.get()).isEqualTo(1); verify(serviceHandler, times(3)).hello("Alice"); + awaitValidClientRequestContext(ctx, verifyResponseException(), verifyResponseException(), + verifyResponse("Hey")); serviceRetryCount.set(0); @@ -171,10 +203,15 @@ void evaluatesMappingOnce() throws Exception { .thenThrow(new IllegalArgumentException()) .thenReturn("Hey"); - assertThat(client.hello("Alice")).isEqualTo("Hey"); + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + assertThat(client.hello("Alice")).isEqualTo("Hey"); + ctx = captor.get(); + } // 2 logical requests total; 6 retries total assertThat(evaluations.get()).isEqualTo(2); verify(serviceHandler, times(6)).hello("Alice"); + awaitValidClientRequestContext(ctx, verifyResponseException(), verifyResponseException(), + verifyResponse("Hey")); } @Test @@ -185,8 +222,13 @@ void execute_retry() throws Exception { .thenThrow(new IllegalArgumentException()) .thenReturn("world"); - assertThat(client.hello("hello")).isEqualTo("world"); + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + assertThat(client.hello("hello")).isEqualTo("world"); + ctx = captor.get(); + } verify(serviceHandler, times(3)).hello("hello"); + awaitValidClientRequestContext(ctx, verifyResponseException(), + verifyResponseException(), verifyResponse("world")); } @Test @@ -194,31 +236,33 @@ void execute_reachedMaxAttempts() throws Exception { final HelloService.Iface client = helloClient(retryAlways, 2); when(serviceHandler.hello(anyString())).thenThrow(new IllegalArgumentException()); - final Throwable thrown = catchThrowable(() -> client.hello("hello")); + final Throwable thrown; + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + thrown = catchThrowable(() -> client.hello("hello")); + ctx = captor.get(); + } assertThat(thrown).isInstanceOf(TApplicationException.class); assertThat(((TApplicationException) thrown).getType()).isEqualTo(TApplicationException.INTERNAL_ERROR); verify(serviceHandler, times(2)).hello("hello"); + awaitValidClientRequestContext(ctx, verifyResponseException(), verifyResponseException()); } @Test void propagateLastResponseWhenNextRetryIsAfterTimeout() throws Exception { - final BlockingQueue logQueue = new LinkedTransferQueue<>(); final RetryRuleWithContent rule = (ctx, response, cause) -> UnmodifiableFuture.completedFuture( RetryDecision.retry(Backoff.fixed(10000000))); - final HelloService.Iface client = helloClient(rule, 100, logQueue); + final HelloService.Iface client = helloClient(rule, 100); when(serviceHandler.hello(anyString())).thenThrow(new IllegalArgumentException()); - final Throwable thrown = catchThrowable(() -> client.hello("hello")); + final Throwable thrown; + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + thrown = catchThrowable(() -> client.hello("hello")); + ctx = captor.get(); + } assertThat(thrown).isInstanceOf(TApplicationException.class); assertThat(((TApplicationException) thrown).getType()).isEqualTo(TApplicationException.INTERNAL_ERROR); verify(serviceHandler, only()).hello("hello"); - - // Make sure the last HTTP request is set to the parent's HTTP request. - final RequestLog log = logQueue.poll(10, TimeUnit.SECONDS); - assertThat(log).isNotNull(); - assertThat(log.children()).isNotEmpty(); - final HttpRequest lastHttpReq = log.children().get(log.children().size() - 1).context().request(); - assertThat(lastHttpReq).isSameAs(log.context().request()); + awaitValidClientRequestContext(ctx, verifyResponseException()); } @Test @@ -228,7 +272,18 @@ void exceptionInStrategy() { throw exception; }, Integer.MAX_VALUE); - assertThatThrownBy(() -> client.hello("bar")).isSameAs(exception); + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + assertThatThrownBy(() -> client.hello("bar")).isSameAs(exception); + ctx = captor.get(); + } + + awaitValidRequestContextWithParentLogVerifier(ctx, + verifyAllVerifierValid( + verifyStatusCode(HttpStatus.UNKNOWN), + verifyResponseCause(exception) + ), + verifyResponseException( + TApplicationException.MISSING_RESULT)); } private HelloService.Iface helloClient(RetryConfigMapping mapping) { @@ -249,20 +304,6 @@ private HelloService.Iface helloClient(RetryRuleWithContent rule, i .build(HelloService.Iface.class); } - private HelloService.Iface helloClient(RetryRuleWithContent rule, int maxAttempts, - BlockingQueue logQueue) { - return ThriftClients.builder(server.httpUri()) - .path("/thrift") - .rpcDecorator(RetryingRpcClient.builder(rule) - .maxTotalAttempts(maxAttempts) - .newDecorator()) - .rpcDecorator((delegate, ctx, req) -> { - ctx.log().whenComplete().thenAccept(logQueue::add); - return delegate.execute(ctx, req); - }) - .build(HelloService.Iface.class); - } - @Test void execute_void() throws Exception { final DevNullService.Iface client = @@ -277,8 +318,14 @@ void execute_void() throws Exception { .doThrow(new IllegalArgumentException()) .doNothing() .when(devNullServiceHandler).consume(anyString()); - client.consume("hello"); + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + client.consume("hello"); + ctx = captor.get(); + } verify(devNullServiceHandler, times(3)).consume("hello"); + awaitValidClientRequestContext(ctx, verifyResponseException(), + verifyResponseException(), verifyResponse(null) + ); } @Test @@ -315,11 +362,32 @@ void shouldGetExceptionWhenFactoryIsClosed() throws Exception { // 3 - In HttpClientDelegate, addressResolverGroup.getResolver(eventLoop) can raise // IllegalStateException("executor not accepting a task"). // - Throwable t = catchThrowable(() -> client.hello("hello")); + Throwable t; + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + t = catchThrowable(() -> client.hello("hello")); + ctx = captor.get(); + } if (t instanceof UnprocessedRequestException) { final Throwable cause = t.getCause(); assertThat(cause).isInstanceOf(IllegalStateException.class); t = cause; + awaitValidClientRequestContext(ctx, verifyAllVerifierValid( + // We cannot be sure that we set + // the request cause so we are not checking + // with verifyRequestException/ + // verifyRequestCause(). + verifyStatusCode(HttpStatus.UNKNOWN), + verifyResponseCause(t) + )); + } else { + awaitValidRequestContextWithParentLogVerifier(ctx, + verifyAllVerifierValid( + verifyStatusCode(HttpStatus.UNKNOWN), + verifyResponseCause(t) + ), + verifyResponseException( + TApplicationException.INTERNAL_ERROR) + ); } assertThat(t).isInstanceOf(IllegalStateException.class) .satisfies(cause -> assertThat(cause.getMessage()).matches( @@ -342,19 +410,78 @@ void doNotRetryWhenResponseIsCancelled() throws Exception { .build(HelloService.Iface.class); when(serviceHandler.hello(anyString())).thenThrow(new IllegalArgumentException()); - assertThatThrownBy(() -> client.hello("hello")).isInstanceOf(CancellationException.class); + try (ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + assertThatThrownBy(() -> client.hello("hello")).isInstanceOf(CancellationException.class); + ctx = captor.get(); + } await().untilAsserted(() -> { verify(serviceHandler, only()).hello("hello"); }); - final RequestLog log = context.get().log().whenComplete().join(); - - // ClientUtil.completeLogIfIncomplete() records exceptions caused by response cancellations. - assertThat(log.requestCause()).isExactlyInstanceOf(CancellationException.class); - assertThat(log.responseCause()).isExactlyInstanceOf(CancellationException.class); - // Sleep 1 second more to check if there was another retry. TimeUnit.SECONDS.sleep(1); verify(serviceHandler, only()).hello("hello"); + assertValidRequestContext( + ctx, + // ClientUtil.completeLogIfIncomplete() records exceptions caused by response cancellations. + verifyRequestException(CancellationException.class), + verifyResponseException(TApplicationException.INTERNAL_ERROR)); + } + + private static void awaitValidClientRequestContext(ClientRequestContext ctx, + RequestLogVerifier... childLogVerifiers) { + await().untilAsserted(() -> assertValidRequestContext(ctx, + verifyAllVerifierValid( + verifyRequestExistsAndCompleted(), + verifyLastChildHasSameHttpRequest() + ), + childLogVerifiers) + ); + } + + private static void awaitValidRequestContextWithParentLogVerifier(ClientRequestContext ctx, + RequestLogVerifier parentLogVerifier, + RequestLogVerifier... childLogVerifiers) { + await().untilAsserted(() -> assertValidRequestContext( + ctx, parentLogVerifier, childLogVerifiers)); + } + + private static RequestLogVerifier verifyRequestException(Class causeClass) { + return verifyAllVerifierValid( + verifyStatusCode(HttpStatus.UNKNOWN), + verifyRequestCause(causeClass), + verifyResponseCause(causeClass) + ); + } + + private static RequestLogVerifier verifyResponseException() { + return verifyResponseException(TApplicationException.INTERNAL_ERROR); + } + + private static RequestLogVerifier verifyResponseException(int type) { + return verifyAllVerifierValid( + verifyStatusCode(HttpStatus.OK), + verifyResponseCause(TApplicationException.class), + childLog -> { + final TApplicationException responseCause = + (TApplicationException) childLog.responseCause(); + + assertThat(responseCause.getType()).isEqualTo(type); + } + ); + } + + private static RequestLogVerifier verifyResponse(@Nullable String expectedResponse) { + return verifyAllVerifierValid( + verifyStatusCode(HttpStatus.OK), + childLog -> { + assertThat(childLog.responseContent()).isExactlyInstanceOf(CompletableRpcResponse.class); + final CompletableRpcResponse rpcResponse = + (CompletableRpcResponse) childLog.responseContent(); + assertThat(rpcResponse.isDone()).isTrue(); + assertThat(rpcResponse.isCompletedExceptionally()).isFalse(); + assertThat(rpcResponse.getNow("should-not-be-returned")).isEqualTo(expectedResponse); + } + ); } }