diff --git a/core/src/main/java/com/linecorp/armeria/client/ClientFactoryBuilder.java b/core/src/main/java/com/linecorp/armeria/client/ClientFactoryBuilder.java index 2f0f795a0c8..9383f392b3c 100644 --- a/core/src/main/java/com/linecorp/armeria/client/ClientFactoryBuilder.java +++ b/core/src/main/java/com/linecorp/armeria/client/ClientFactoryBuilder.java @@ -53,6 +53,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.primitives.Ints; +import com.linecorp.armeria.client.metric.ClientRequestLifecycleListener; import com.linecorp.armeria.client.proxy.ProxyConfig; import com.linecorp.armeria.client.proxy.ProxyConfigSelector; import com.linecorp.armeria.common.CommonPools; @@ -135,6 +136,8 @@ public final class ClientFactoryBuilder implements TlsSetters { private ClientTlsSpec clientTlsSpec = ClientTlsSpec.of(); private boolean staticTlsSettingsSet; private boolean autoCloseConnectionPoolListener = true; + private ClientRequestLifecycleListener clientRequestLifecycleListener = + ClientRequestLifecycleListener.noop(); ClientFactoryBuilder() { connectTimeoutMillis(Flags.defaultConnectTimeoutMillis()); @@ -261,6 +264,21 @@ public ClientFactoryBuilder channelOption(ChannelOption option, T value) return this; } + /** + * Sets the {@link ClientRequestLifecycleListener} that listens to the lifecycle events of + * client requests created by the {@link ClientFactory}. + * + *

If not set, {@link ClientRequestLifecycleListener#noop()} is used by default. + * + * @param listener the listener to be notified of client request lifecycle events. Must not be {@code null}. + * @return {@code this} to support method chaining. + */ + public ClientFactoryBuilder clientRequestLifecycleListener(ClientRequestLifecycleListener listener) { + requireNonNull(listener, "listener"); + clientRequestLifecycleListener = listener; + return this; + } + private void channelOptions(Map, Object> newChannelOptions) { @SuppressWarnings("unchecked") final ClientFactoryOptionValue, Object>> castOptions = @@ -1127,7 +1145,7 @@ public ClientFactory build() { final ClientFactoryOptions options = buildOptions(); final ClientTlsSpec baseClientTlsSpec = buildTlsSpec(clientTlsSpec, tlsNoVerifySet, insecureHosts); return new DefaultClientFactory(new HttpClientFactory( - options, autoCloseConnectionPoolListener, baseClientTlsSpec)); + options, autoCloseConnectionPoolListener, baseClientTlsSpec, clientRequestLifecycleListener)); } private static ClientTlsSpec buildTlsSpec( diff --git a/core/src/main/java/com/linecorp/armeria/client/HttpClientDelegate.java b/core/src/main/java/com/linecorp/armeria/client/HttpClientDelegate.java index 3517a52f22c..3207d02b699 100644 --- a/core/src/main/java/com/linecorp/armeria/client/HttpClientDelegate.java +++ b/core/src/main/java/com/linecorp/armeria/client/HttpClientDelegate.java @@ -104,6 +104,12 @@ public HttpResponse execute(ClientRequestContext ctx, HttpRequest req) throws Ex final DecodedHttpResponse res = new DecodedHttpResponse(eventLoop); updateCancellationTask(ctx, req, res); + factory.clientRequestLifecycleListener().onRequestPending(ctx); + ctx.log().addListener( + factory.clientRequestLifecycleListener() + .asRequestLogListener() + ); + try { resolveProxyConfig(protocol, endpoint, ctx, (proxyConfig, thrown) -> { if (thrown != null) { diff --git a/core/src/main/java/com/linecorp/armeria/client/HttpClientFactory.java b/core/src/main/java/com/linecorp/armeria/client/HttpClientFactory.java index c93b0fef12e..c55ecf69e2a 100644 --- a/core/src/main/java/com/linecorp/armeria/client/HttpClientFactory.java +++ b/core/src/main/java/com/linecorp/armeria/client/HttpClientFactory.java @@ -38,6 +38,7 @@ import com.google.common.collect.MapMaker; import com.linecorp.armeria.client.endpoint.EndpointGroup; +import com.linecorp.armeria.client.metric.ClientRequestLifecycleListener; import com.linecorp.armeria.client.proxy.ProxyConfigSelector; import com.linecorp.armeria.client.redirect.RedirectConfig; import com.linecorp.armeria.common.Http1HeaderNaming; @@ -123,15 +124,23 @@ final class HttpClientFactory implements ClientFactory { private final boolean autoCloseConnectionPoolListener; private final AsyncCloseableSupport closeable = AsyncCloseableSupport.of(this::closeAsync); private final BootstrapSslContexts bootstrapSslContexts; + private final ClientRequestLifecycleListener clientRequestLifecycleListener; HttpClientFactory(ClientFactoryOptions options, boolean autoCloseConnectionPoolListener, ClientTlsSpec baseClientTlsSpec) { + this(options, autoCloseConnectionPoolListener, baseClientTlsSpec, + ClientRequestLifecycleListener.noop()); + } + + HttpClientFactory(ClientFactoryOptions options, boolean autoCloseConnectionPoolListener, + ClientTlsSpec baseClientTlsSpec, + ClientRequestLifecycleListener clientRequestLifecycleListener) { workerGroup = options.workerGroup(); @SuppressWarnings("unchecked") final AddressResolverGroup group = (AddressResolverGroup) options.addressResolverGroupFactory() - .apply(workerGroup); + .apply(workerGroup); addressResolverGroup = group; final Bootstrap bootstrap = new Bootstrap(); @@ -199,6 +208,7 @@ final class HttpClientFactory implements ClientFactory { this.autoCloseConnectionPoolListener = autoCloseConnectionPoolListener; this.options = options; + this.clientRequestLifecycleListener = clientRequestLifecycleListener; clientDelegate = new HttpClientDelegate(this, addressResolverGroup); RequestTargetCache.registerClientMetrics(meterRegistry); @@ -306,6 +316,10 @@ Http1HeaderNaming http1HeaderNaming() { return http1HeaderNaming; } + ClientRequestLifecycleListener clientRequestLifecycleListener() { + return clientRequestLifecycleListener; + } + @VisibleForTesting AddressResolverGroup addressResolverGroup() { return addressResolverGroup; diff --git a/core/src/main/java/com/linecorp/armeria/client/logging/DefaultClientRequestLogListener.java b/core/src/main/java/com/linecorp/armeria/client/logging/DefaultClientRequestLogListener.java new file mode 100644 index 00000000000..98cda1a65a2 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/client/logging/DefaultClientRequestLogListener.java @@ -0,0 +1,75 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client.logging; + +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.client.metric.ClientRequestLifecycleListener; +import com.linecorp.armeria.common.annotation.UnstableApi; +import com.linecorp.armeria.common.logging.RequestLog; +import com.linecorp.armeria.common.logging.RequestLogListener; +import com.linecorp.armeria.common.logging.RequestLogProperty; + +/** + * A default implementation of {@link RequestLogListener} that delegates {@link RequestLog} events + * to a {@link ClientRequestLifecycleListener}. + * + *

This listener monitors the progress of a {@link RequestLog} and invokes the corresponding + * callback methods of the {@link ClientRequestLifecycleListener} when specific {@link RequestLogProperty} + * become available. + */ +@UnstableApi +public class DefaultClientRequestLogListener implements RequestLogListener { + + private final ClientRequestLifecycleListener lifecycleListener; + + /** + * Creates a new instance with the specified {@link ClientRequestLifecycleListener}. + * + * @param lifecycleListener the listener to which the {@link RequestLog} events will be delegated + */ + public DefaultClientRequestLogListener(ClientRequestLifecycleListener lifecycleListener) { + this.lifecycleListener = lifecycleListener; + } + + @Override + public void onEvent(RequestLogProperty property, RequestLog log) { + if (!(log.context() instanceof ClientRequestContext)) { + return; + } + + final ClientRequestContext ctx = (ClientRequestContext) log.context(); + switch (property) { + case REQUEST_FIRST_BYTES_TRANSFERRED_TIME: + lifecycleListener.onRequestStart(ctx); + break; + case REQUEST_COMPLETE: + lifecycleListener.onRequestSendComplete(ctx); + break; + case RESPONSE_HEADERS: + lifecycleListener.onResponseHeaders(ctx, log.responseHeaders()); + break; + case RESPONSE_COMPLETE: + lifecycleListener.onResponseComplete(ctx); + break; + case ALL_COMPLETE: + lifecycleListener.onRequestComplete(ctx, log.responseCause()); + break; + default: + break; + } + } +} diff --git a/core/src/main/java/com/linecorp/armeria/client/metric/ClientRequestLifecycleListener.java b/core/src/main/java/com/linecorp/armeria/client/metric/ClientRequestLifecycleListener.java new file mode 100644 index 00000000000..9757bf25185 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/client/metric/ClientRequestLifecycleListener.java @@ -0,0 +1,111 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client.metric; + +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.client.logging.DefaultClientRequestLogListener; +import com.linecorp.armeria.common.ResponseHeaders; +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.common.annotation.UnstableApi; +import com.linecorp.armeria.common.logging.RequestLogListener; + +/** + * Listens to the lifecycle events of client requests. + * + *

This interface provides a high-level view of a request's lifecycle (e.g., pending, started, completed). + * unlike {@link RequestLogListener} which provides low-level property changes. + * + *

Note: The methods in this interface are typically not invoked directly by the + * transport layer. Instead, they are triggered by inspecting the changes in + * {@link com.linecorp.armeria.common.logging.RequestLog}. + * Implementations should use {@link #asRequestLogListener()} to bridge + * {@link com.linecorp.armeria.common.logging.RequestLogProperty} + * changes to these lifecycle methods. + * + *

For example, a standard implementation might map: + *

+ * + * @see com.linecorp.armeria.client.ClientFactoryBuilder#clientRequestLifecycleListener( + * ClientRequestLifecycleListener) + */ +@UnstableApi +public interface ClientRequestLifecycleListener { + + /** + * Invoked when a request is created and scheduled for execution but has not yet started. + * Note: This method is explicitly invoked by HttpClientDelegate when + * HttpClientDelegate starts to call execute(). + */ + void onRequestPending(ClientRequestContext ctx); + + /** + * Called when the client begins execution (connection acquired, headers sent). + */ + void onRequestStart(ClientRequestContext ctx); + + /** + * Called when the request is fully sent. + */ + void onRequestSendComplete(ClientRequestContext ctx); + + /** + * Called when the first response headers are received. + */ + void onResponseHeaders(ClientRequestContext ctx, ResponseHeaders headers); + + /** + * Called when the full response body is received successfully. + */ + void onResponseComplete(ClientRequestContext ctx); + + /** + * Called when a request is completed with either success or failure. + */ + void onRequestComplete(ClientRequestContext ctx, @Nullable Throwable cause); + + /** + * Returns a {@link RequestLogListener} that delegates + * {@link com.linecorp.armeria.common.logging.RequestLog} + * events to this lifecycle listener. + * This method bridges the low-level {@link com.linecorp.armeria.common.logging.RequestLog} + * updates to the high-level lifecycle methods + * defined in this interface. The returned listener is registered to the {@link ClientRequestContext} + * automatically when the request starts. + */ + default RequestLogListener asRequestLogListener() { + return new DefaultClientRequestLogListener(this); + } + + /** + * Returns a {@link ClientRequestMetrics} that collects the number of pending and active requests. + */ + static ClientRequestMetrics counting() { + return new ClientRequestMetrics(); + } + + /** + * Returns a {@link ClientRequestLifecycleListener} that does nothing. + */ + static ClientRequestLifecycleListener noop() { + return NoopClientRequestLifecycleListener.getInstance(); + } +} diff --git a/core/src/main/java/com/linecorp/armeria/client/metric/ClientRequestMetrics.java b/core/src/main/java/com/linecorp/armeria/client/metric/ClientRequestMetrics.java new file mode 100644 index 00000000000..dac61a97eb7 --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/client/metric/ClientRequestMetrics.java @@ -0,0 +1,196 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client.metric; + +import java.util.EnumMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.LongAdder; + +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.client.Endpoint; +import com.linecorp.armeria.common.ResponseHeaders; +import com.linecorp.armeria.common.SessionProtocol; +import com.linecorp.armeria.common.annotation.Nullable; + +import io.netty.util.AttributeKey; + +/** + * A {@link ClientRequestLifecycleListener} that collects the number of pending and active requests. + * + *

This class tracks: + *

+ */ +public class ClientRequestMetrics implements ClientRequestLifecycleListener { + + private static final AttributeKey METRICS_STATE = + AttributeKey.valueOf(ClientRequestMetrics.class, "METRICS_STATE"); + + private final ConcurrentMap activeRequestsPerEndpoint = + new ConcurrentHashMap<>(); + + private final EnumMap pendingRequestsPerProtocol = + new EnumMap<>(SessionProtocol.class); + + /** + * Creates a new {@link ClientRequestMetrics} instance. + */ + public ClientRequestMetrics() { + for (SessionProtocol protocol : SessionProtocol.values()) { + pendingRequestsPerProtocol.put(protocol, new LongAdder()); + } + } + + @Override + public void onRequestPending(ClientRequestContext ctx) { + final State s = state(ctx); + if (s.pendingCounted.compareAndSet(false, true)) { + final LongAdder longAdder = pendingRequestsPerProtocol.get(ctx.sessionProtocol()); + if (longAdder != null) { + longAdder.increment(); + } + } + } + + @Override + public void onRequestStart(ClientRequestContext ctx) { + final State s = state(ctx); + if (s.pendingCounted.getAndSet(false)) { + final LongAdder longAdder = pendingRequestsPerProtocol.get(ctx.sessionProtocol()); + if (longAdder != null) { + longAdder.decrement(); + } + } + + if (s.activeCounted.compareAndSet(false, true)) { + final Endpoint endpoint = ctx.endpoint(); + if (endpoint != null) { + activeRequestsPerEndpoint.compute(endpoint, (k, adder) -> { + if (adder == null) { + adder = new LongAdder(); + } + adder.increment(); + return adder; + }); + } + } + } + + @Override + public void onRequestSendComplete(ClientRequestContext ctx) { + // no-op + } + + @Override + public void onResponseHeaders(ClientRequestContext ctx, ResponseHeaders headers) { + // no-op + } + + @Override + public void onResponseComplete(ClientRequestContext ctx) { + // no-op + } + + @Override + public void onRequestComplete(ClientRequestContext ctx, @Nullable Throwable cause) { + final State s = state(ctx); + + if (s.pendingCounted.getAndSet(false)) { + final LongAdder longAdder = pendingRequestsPerProtocol.get(ctx.sessionProtocol()); + if (longAdder != null) { + longAdder.decrement(); + } + } + + if (!s.activeCounted.getAndSet(false)) { + return; + } + + final Endpoint endpoint = ctx.endpoint(); + if (endpoint != null) { + activeRequestsPerEndpoint.compute(endpoint, (key, adder) -> { + if (adder == null) { + return null; + } + adder.decrement(); + return adder.sum() == 0 ? null : adder; + }); + } + } + + /** + * Returns the number of active requests for the specified {@link Endpoint}. + * An active request is one that has started sending data but has not yet completed. + */ + public long activeRequestsPerEndpoint(Endpoint endpoint) { + final LongAdder longAdder = activeRequestsPerEndpoint.get(endpoint); + return longAdder != null ? longAdder.sum() : 0L; + } + + /** + * Returns the total number of active requests across all endpoints. + */ + public long activeRequests() { + long sum = 0L; + for (LongAdder adder : activeRequestsPerEndpoint.values()) { + sum += adder.sum(); + } + return sum; + } + + /** + * Returns the number of pending requests for the specified {@link SessionProtocol}. + * A pending request is one that is scheduled but has not yet acquired a connection or started sending. + */ + public long pendingRequestsPerProtocol(SessionProtocol protocol) { + final LongAdder longAdder = pendingRequestsPerProtocol.get(protocol); + return longAdder != null ? longAdder.sum() : 0L; + } + + /** + * Returns the total number of pending requests across all protocols. + */ + public long pendingRequest() { + long sum = 0L; + for (LongAdder adder : pendingRequestsPerProtocol.values()) { + sum += adder.sum(); + } + return sum; + } + + private static State state(ClientRequestContext ctx) { + State s = ctx.ownAttr(METRICS_STATE); + if (s != null) { + return s; + } + + s = new State(); + ctx.setAttr(METRICS_STATE, s); + return s; + } + + private static final class State { + final AtomicBoolean pendingCounted = new AtomicBoolean(false); + final AtomicBoolean activeCounted = new AtomicBoolean(false); + } +} diff --git a/core/src/main/java/com/linecorp/armeria/client/metric/NoopClientRequestLifecycleListener.java b/core/src/main/java/com/linecorp/armeria/client/metric/NoopClientRequestLifecycleListener.java new file mode 100644 index 00000000000..a819fae8fec --- /dev/null +++ b/core/src/main/java/com/linecorp/armeria/client/metric/NoopClientRequestLifecycleListener.java @@ -0,0 +1,80 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client.metric; + +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.common.ResponseHeaders; +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.common.annotation.UnstableApi; +import com.linecorp.armeria.common.logging.RequestLogListener; + +/** + * A {@link ClientRequestLifecycleListener} that does nothing. + */ +@UnstableApi +public final class NoopClientRequestLifecycleListener implements ClientRequestLifecycleListener { + + private static final NoopClientRequestLifecycleListener INSTANCE = + new NoopClientRequestLifecycleListener(); + private final RequestLogListener requestLogListener = ((property, log) -> {}); + + /** + * Returns NoopClientRequestLifecycleListener. + * @return Returns NoopClientRequestLifecycleListener. + */ + public static NoopClientRequestLifecycleListener getInstance() { + return INSTANCE; + } + + private NoopClientRequestLifecycleListener() { } + + @Override + public void onRequestPending(ClientRequestContext ctx) { + // no-op + } + + @Override + public void onRequestStart(ClientRequestContext ctx) { + // no-op + } + + @Override + public void onRequestSendComplete(ClientRequestContext ctx) { + // no-op + } + + @Override + public void onResponseHeaders(ClientRequestContext ctx, ResponseHeaders headers) { + // no-op + } + + @Override + public void onResponseComplete(ClientRequestContext ctx) { + // no-op + } + + @Override + public void onRequestComplete(ClientRequestContext ctx, @Nullable Throwable cause) { + // no-op + } + + @Override + public RequestLogListener asRequestLogListener() { + // no-op + return requestLogListener; + } +} diff --git a/core/src/test/java/com/linecorp/armeria/client/metric/ClientRequestMetricsIntegrationTest.java b/core/src/test/java/com/linecorp/armeria/client/metric/ClientRequestMetricsIntegrationTest.java new file mode 100644 index 00000000000..f4973cf4543 --- /dev/null +++ b/core/src/test/java/com/linecorp/armeria/client/metric/ClientRequestMetricsIntegrationTest.java @@ -0,0 +1,227 @@ +/* + * Copyright 2025 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package com.linecorp.armeria.client.metric; + +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.InetSocketAddress; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import com.linecorp.armeria.client.ClientFactory; +import com.linecorp.armeria.client.ClientFactoryBuilder; +import com.linecorp.armeria.client.ClientRequestContextCaptor; +import com.linecorp.armeria.client.Clients; +import com.linecorp.armeria.client.ConnectionPoolListener; +import com.linecorp.armeria.client.UnprocessedRequestException; +import com.linecorp.armeria.client.WebClient; +import com.linecorp.armeria.common.CancellationException; +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.SessionProtocol; +import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.testing.junit5.server.ServerExtension; + +import io.netty.util.AttributeMap; + +class ClientRequestMetricsIntegrationTest { + + private static final CountDownLatch SCENARIO1_CLIENT1_ARRIVED = new CountDownLatch(1); + private static final CountDownLatch SCENARIO1_CLIENT1_RESPONSE = new CountDownLatch(1); + private static final String SCENARIO1_CLIENT1_PATH = "/scenario1-client1"; + + private static final CountDownLatch SCENARIO1_CLIENT2_ARRIVED = new CountDownLatch(1); + private static final CountDownLatch SCENARIO1_CLIENT2_RESPONSE = new CountDownLatch(1); + private static final String SCENARIO1_CLIENT2_PATH = "/scenario2-client2"; + + private static final CountDownLatch SCENARIO2_NEVER_CALLED = new CountDownLatch(1); + private static final String SCENARIO2_PATH = "/scenario2"; + + private static final String SCENARIO3_PATH = "/scenario3"; + + @RegisterExtension + static ServerExtension server = new ServerExtension() { + @Override + protected void configure(ServerBuilder sb) { + sb.service(SCENARIO1_CLIENT1_PATH, (ctx, req) -> HttpResponse.of( + () -> { + SCENARIO1_CLIENT1_ARRIVED.countDown(); + try { + SCENARIO1_CLIENT1_RESPONSE.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return HttpResponse.of("ok"); + }, + ctx.blockingTaskExecutor() + )); + + sb.service(SCENARIO1_CLIENT2_PATH, (ctx, req) -> HttpResponse.of( + () -> { + SCENARIO1_CLIENT2_ARRIVED.countDown(); + try { + SCENARIO1_CLIENT2_RESPONSE.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return HttpResponse.of("ok"); + }, + ctx.blockingTaskExecutor() + )); + + sb.service(SCENARIO2_PATH, (ctx, req) -> { + SCENARIO2_NEVER_CALLED.countDown(); + return HttpResponse.of("ok"); + }); + } + }; + + @Test + void client_request_metric_basic_test() throws InterruptedException { + final ClientRequestMetrics metrics = ClientRequestLifecycleListener.counting(); + final ClientFactoryBuilder factoryBuilder = ClientFactory + .builder() + .clientRequestLifecycleListener(metrics); + try (ClientFactory clientFactory = factoryBuilder.build()) { + final HttpResponse res1 = WebClient.builder("http://127.0.0.1:" + server.httpPort()) + .factory(clientFactory) + .build() + .get(SCENARIO1_CLIENT1_PATH); + + final HttpResponse res2 = WebClient.builder("http://127.0.0.1:" + server.httpPort()) + .factory(clientFactory) + .build() + .get(SCENARIO1_CLIENT2_PATH); + + assertEquals(2L, metrics.pendingRequest()); + assertEquals(0L, metrics.activeRequests()); + + assertTrue(SCENARIO1_CLIENT1_ARRIVED.await(3, TimeUnit.SECONDS)); + assertTrue(SCENARIO1_CLIENT2_ARRIVED.await(3, TimeUnit.SECONDS)); + assertEquals(0L, metrics.pendingRequest()); + assertEquals(2L, metrics.activeRequests()); + + SCENARIO1_CLIENT1_RESPONSE.countDown(); + res1.aggregate().join(); + + await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> { + assertEquals(1L, metrics.activeRequests()); + assertEquals(0L, metrics.pendingRequest()); + }); + + SCENARIO1_CLIENT2_RESPONSE.countDown(); + res2.aggregate().join(); + + await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> { + assertEquals(0L, metrics.activeRequests()); + assertEquals(0L, metrics.pendingRequest()); + }); + } + } + + @Test + void client_request_early_cancel_test() throws InterruptedException { + final ClientRequestMetrics metrics = ClientRequestLifecycleListener.counting(); + final ConnectionPoolListener delayListener = new ConnectionPoolListener() { + @Override + public void connectionOpen(SessionProtocol protocol, InetSocketAddress remoteAddr, + InetSocketAddress localAddr, AttributeMap attrs) { + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + // ignore + } + } + + @Override + public void connectionClosed(SessionProtocol protocol, InetSocketAddress remoteAddr, + InetSocketAddress localAddr, AttributeMap attrs) throws Exception { + // ignore + } + }; + + final ClientFactoryBuilder factoryBuilder = ClientFactory + .builder() + .connectionPoolListener(delayListener) + .clientRequestLifecycleListener(metrics); + try (ClientFactory clientFactory = factoryBuilder.build(); + ClientRequestContextCaptor captor = Clients.newContextCaptor()) { + + final HttpResponse unused = WebClient.builder("http://127.0.0.1:" + server.httpPort()) + .factory(clientFactory) + .build() + .get(SCENARIO2_PATH); + + await().pollInSameThread().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> { + assertEquals(1L, metrics.pendingRequest()); + assertEquals(0L, metrics.activeRequests()); + }); + + // early cancel. + captor.get().cancel(new CancellationException("test-cancel")); + + await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> { + assertEquals(0L, metrics.activeRequests()); + assertEquals(0L, metrics.pendingRequest()); + }); + + await().during(500, TimeUnit.MILLISECONDS) + .atMost(1, TimeUnit.SECONDS) + .untilAsserted(() -> + // because of early cancel, CountdownLatch in Server Side never be called. + assertEquals(1L, SCENARIO2_NEVER_CALLED.getCount()) + ); + } + } + + @Test + void cannot_find_proper_server_test() throws InterruptedException { + final ClientRequestMetrics metrics = ClientRequestLifecycleListener.counting(); + final ClientFactoryBuilder factoryBuilder = ClientFactory + .builder() + .clientRequestLifecycleListener(metrics); + try (ClientFactory clientFactory = factoryBuilder.build()) { + final String invalidHostName = "http://255.255.255.255:" + server.httpPort(); + final HttpResponse res1 = WebClient.builder(invalidHostName) + .factory(clientFactory) + .build() + .get(SCENARIO3_PATH); + + assertEquals(1L, metrics.pendingRequest()); + assertEquals(0L, metrics.activeRequests()); + + try { + res1.aggregate().join(); + } catch (Exception e) { + // It fails to get channel because there is no server such as http://255.255.255.255 + assertInstanceOf(CompletionException.class, e); + assertInstanceOf(UnprocessedRequestException.class, e.getCause()); + } + + // Check clean up + await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> { + assertEquals(0L, metrics.activeRequests()); + assertEquals(0L, metrics.pendingRequest()); + }); + } + } +}