Skip to content

Commit 573c8a6

Browse files
committed
Experiment work stealing pools
1 parent f390e3f commit 573c8a6

File tree

8 files changed

+337
-30
lines changed

8 files changed

+337
-30
lines changed

reactor-netty-core/src/main/java/reactor/netty/Metrics.java

+4
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,10 @@ public class Metrics {
204204
*/
205205
public static final String PENDING_STREAMS = ".pending.streams";
206206

207+
/**
208+
* The number of HTTP/2 stream acquisitions steal count.
209+
*/
210+
public static final String STEAL_STREAMS = ".steal.streams";
207211

208212
// ByteBufAllocator Metrics
209213
/**

reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java

+45-5
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,20 @@ public InstrumentedPool<T> newPool(
528528
return newPoolInternal(allocator, destroyHandler, defaultEvictionPredicate).build(poolFactory);
529529
}
530530

531+
public InstrumentedPool<T> newPool(
532+
PoolBuilder<T, PoolConfig<T>> poolBuilder,
533+
int maxConnections,
534+
@Nullable AllocationStrategy<?> allocationStrategy,
535+
Function<T, Publisher<Void>> destroyHandler,
536+
BiPredicate<T, PooledRefMetadata> defaultEvictionPredicate,
537+
Function<PoolConfig<T>, InstrumentedPool<T>> poolFactory) {
538+
if (disposeTimeout != null) {
539+
return newPoolInternal(poolBuilder, maxConnections, allocationStrategy, destroyHandler, defaultEvictionPredicate, null)
540+
.build(poolFactory.andThen(InstrumentedPoolDecorators::gracefulShutdown));
541+
}
542+
return newPoolInternal(poolBuilder, maxConnections, allocationStrategy, destroyHandler, defaultEvictionPredicate, null).build(poolFactory);
543+
}
544+
531545
public InstrumentedPool<T> newPool(
532546
Publisher<T> allocator,
533547
Function<T, Publisher<Void>> destroyHandler,
@@ -541,6 +555,21 @@ public InstrumentedPool<T> newPool(
541555
return newPoolInternal(allocator, destroyHandler, defaultEvictionPredicate, poolMetricsRecorder).build(poolFactory);
542556
}
543557

558+
public InstrumentedPool<T> newPool(
559+
PoolBuilder<T, PoolConfig<T>> poolBuilder,
560+
int maxConnections,
561+
@Nullable AllocationStrategy<?> allocationStrategy,
562+
Function<T, Publisher<Void>> destroyHandler,
563+
BiPredicate<T, PooledRefMetadata> defaultEvictionPredicate,
564+
PoolMetricsRecorder poolMetricsRecorder,
565+
Function<PoolConfig<T>, InstrumentedPool<T>> poolFactory) {
566+
if (disposeTimeout != null) {
567+
return newPoolInternal(poolBuilder, maxConnections, allocationStrategy, destroyHandler, defaultEvictionPredicate, poolMetricsRecorder)
568+
.build(poolFactory.andThen(InstrumentedPoolDecorators::gracefulShutdown));
569+
}
570+
return newPoolInternal(poolBuilder, maxConnections, allocationStrategy, destroyHandler, defaultEvictionPredicate, poolMetricsRecorder).build(poolFactory);
571+
}
572+
544573
PoolBuilder<T, PoolConfig<T>> newPoolInternal(
545574
Publisher<T> allocator,
546575
Function<T, Publisher<Void>> destroyHandler,
@@ -553,11 +582,22 @@ PoolBuilder<T, PoolConfig<T>> newPoolInternal(
553582
Function<T, Publisher<Void>> destroyHandler,
554583
BiPredicate<T, PooledRefMetadata> defaultEvictionPredicate,
555584
@Nullable PoolMetricsRecorder poolMetricsRecorder) {
556-
PoolBuilder<T, PoolConfig<T>> poolBuilder =
557-
PoolBuilder.from(allocator)
558-
.destroyHandler(destroyHandler)
559-
.maxPendingAcquire(pendingAcquireMaxCount)
560-
.evictInBackground(evictionInterval);
585+
return newPoolInternal(PoolBuilder.from(allocator), -1, null, destroyHandler, defaultEvictionPredicate, poolMetricsRecorder);
586+
}
587+
588+
PoolBuilder<T, PoolConfig<T>> newPoolInternal(
589+
PoolBuilder<T, PoolConfig<T>> poolBuilder,
590+
int maxConnections,
591+
@Nullable AllocationStrategy<?> allocationStrategy,
592+
Function<T, Publisher<Void>> destroyHandler,
593+
BiPredicate<T, PooledRefMetadata> defaultEvictionPredicate,
594+
@Nullable PoolMetricsRecorder poolMetricsRecorder) {
595+
maxConnections = (maxConnections == -1) ? this.maxConnections : maxConnections;
596+
allocationStrategy = (allocationStrategy == null) ? this.allocationStrategy : allocationStrategy;
597+
poolBuilder = poolBuilder
598+
.destroyHandler(destroyHandler)
599+
.maxPendingAcquire(pendingAcquireMaxCount)
600+
.evictInBackground(evictionInterval);
561601

562602
if (this.evictionPredicate != null) {
563603
poolBuilder = poolBuilder.evictionPredicate(

reactor-netty-http/src/main/java/reactor/netty/http/client/Http2AllocationStrategy.java

+54-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2022-2023 VMware, Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2022-2024 VMware, Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -66,6 +66,23 @@ public interface Builder {
6666
* @return {@code this}
6767
*/
6868
Builder minConnections(int minConnections);
69+
70+
/**
71+
* Enables or disables work stealing mode for managing HTTP2 Connection Pools.
72+
* <p>
73+
* By default, a single Connection Pool is used by multiple Netty event loop threads.
74+
* When work stealing is enabled, each Netty event loop will maintain its own
75+
* HTTP2 Connection Pool, and HTTP2 streams allocation will be distributed over all available
76+
* pools using a work stealing strategy. This approach maximizes throughput and
77+
* resource utilization in a multithreaded environment.
78+
*
79+
* @param progressive true if the HTTP2 Connection pools should be enabled gradually (when the nth pool becomes
80+
* is starting to get some pendingg acquisitions request, then enable one more
81+
* pool until all available pools are enabled).
82+
*
83+
* @return {@code this}
84+
*/
85+
Builder enableWorkStealing(boolean progressive);
6986
}
7087

7188
/**
@@ -77,6 +94,18 @@ public static Http2AllocationStrategy.Builder builder() {
7794
return new Http2AllocationStrategy.Build();
7895
}
7996

97+
/**
98+
* Creates a builder for {@link Http2AllocationStrategy} and initialize it
99+
* with an existing strategy. This method can be used to create a mutated version
100+
* of an existing strategy.
101+
*
102+
* @return a new {@link Http2AllocationStrategy.Builder} initialized with an existing http2
103+
* allocation strategy.
104+
*/
105+
public static Http2AllocationStrategy.Builder builder(Http2AllocationStrategy existing) {
106+
return new Http2AllocationStrategy.Build(existing);
107+
}
108+
80109
@Override
81110
public Http2AllocationStrategy copy() {
82111
return new Http2AllocationStrategy(this);
@@ -141,9 +170,14 @@ public void returnPermits(int returned) {
141170
}
142171
}
143172

173+
public boolean enableWorkStealing() {
174+
return enableWorkStealing;
175+
}
176+
144177
final long maxConcurrentStreams;
145178
final int maxConnections;
146179
final int minConnections;
180+
final boolean enableWorkStealing;
147181

148182
volatile int permits;
149183
static final AtomicIntegerFieldUpdater<Http2AllocationStrategy> PERMITS = AtomicIntegerFieldUpdater.newUpdater(Http2AllocationStrategy.class, "permits");
@@ -152,13 +186,15 @@ public void returnPermits(int returned) {
152186
this.maxConcurrentStreams = build.maxConcurrentStreams;
153187
this.maxConnections = build.maxConnections;
154188
this.minConnections = build.minConnections;
189+
this.enableWorkStealing = build.enableWorkStealing;
155190
PERMITS.lazySet(this, this.maxConnections);
156191
}
157192

158193
Http2AllocationStrategy(Http2AllocationStrategy copy) {
159194
this.maxConcurrentStreams = copy.maxConcurrentStreams;
160195
this.maxConnections = copy.maxConnections;
161196
this.minConnections = copy.minConnections;
197+
this.enableWorkStealing = copy.enableWorkStealing;
162198
PERMITS.lazySet(this, this.maxConnections);
163199
}
164200

@@ -170,6 +206,17 @@ static final class Build implements Builder {
170206
long maxConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
171207
int maxConnections = DEFAULT_MAX_CONNECTIONS;
172208
int minConnections = DEFAULT_MIN_CONNECTIONS;
209+
boolean enableWorkStealing = Boolean.getBoolean("reactor.netty.pool.h2.enableworkstealing");
210+
211+
Build() {
212+
}
213+
214+
Build(Http2AllocationStrategy existing) {
215+
this.maxConcurrentStreams = existing.maxConcurrentStreams;
216+
this.minConnections = existing.minConnections;
217+
this.maxConnections = existing.maxConnections;
218+
this.enableWorkStealing = existing.enableWorkStealing;
219+
}
173220

174221
@Override
175222
public Http2AllocationStrategy build() {
@@ -206,5 +253,11 @@ public Builder minConnections(int minConnections) {
206253
this.minConnections = minConnections;
207254
return this;
208255
}
256+
257+
@Override
258+
public Builder enableWorkStealing(boolean progressive) {
259+
this.enableWorkStealing = true;
260+
return this;
261+
}
209262
}
210263
}

reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java

+60-6
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@
3939
import reactor.netty.ConnectionObserver;
4040
import reactor.netty.channel.ChannelMetricsRecorder;
4141
import reactor.netty.channel.ChannelOperations;
42+
import reactor.netty.internal.shaded.reactor.pool.PoolBuilder;
43+
import reactor.netty.internal.shaded.reactor.pool.PoolConfig;
44+
import reactor.netty.internal.shaded.reactor.pool.decorators.InstrumentedPoolDecorators;
4245
import reactor.netty.resources.ConnectionProvider;
4346
import reactor.netty.resources.PooledConnectionProvider;
4447
import reactor.netty.transport.ClientTransportConfig;
@@ -51,13 +54,20 @@
5154
import reactor.util.annotation.Nullable;
5255
import reactor.util.concurrent.Queues;
5356
import reactor.util.context.Context;
57+
import reactor.util.function.Tuples;
5458

5559
import java.io.IOException;
5660
import java.net.SocketAddress;
5761
import java.time.Duration;
62+
import java.util.Iterator;
63+
import java.util.List;
5864
import java.util.Queue;
65+
import java.util.concurrent.Executor;
66+
import java.util.concurrent.atomic.AtomicInteger;
5967
import java.util.function.BiPredicate;
6068
import java.util.function.Function;
69+
import java.util.stream.Collectors;
70+
import java.util.stream.StreamSupport;
6171

6272
import static reactor.netty.ReactorNetty.format;
6373
import static reactor.netty.ReactorNetty.getChannelContext;
@@ -565,12 +575,56 @@ static final class PooledConnectionAllocator {
565575
this.config = (HttpClientConfig) config;
566576
this.remoteAddress = remoteAddress;
567577
this.resolver = resolver;
568-
this.pool = id == null ?
569-
poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
570-
poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy())) :
571-
poolFactory.newPool(connectChannel(), DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
572-
new MicrometerPoolMetricsRecorder(id, name, remoteAddress),
573-
poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy()));
578+
579+
Http2AllocationStrategy http2Strategy = poolFactory.allocationStrategy() instanceof Http2AllocationStrategy ?
580+
(Http2AllocationStrategy) poolFactory.allocationStrategy() : null;
581+
582+
if (http2Strategy == null || !http2Strategy.enableWorkStealing) {
583+
this.pool = id == null ?
584+
poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
585+
poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy())) :
586+
poolFactory.newPool(connectChannel(), DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
587+
new MicrometerPoolMetricsRecorder(id, name, remoteAddress),
588+
poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy()));
589+
}
590+
else {
591+
// Create one connection allocator (it will be shared by all Http2Pool instances)
592+
Publisher<Connection> allocator = connectChannel();
593+
594+
List<Executor> execs = StreamSupport.stream(config.loopResources().onClient(true).spliterator(), false)
595+
.limit(http2Strategy.maxConnections)
596+
.collect(Collectors.toList());
597+
Iterator<Executor> execsIter = execs.iterator();
598+
599+
MicrometerPoolMetricsRecorder micrometerRecorder = id == null ? null : new MicrometerPoolMetricsRecorder(id, name, remoteAddress);
600+
AtomicInteger subPoolIndex = new AtomicInteger();
601+
602+
this.pool = InstrumentedPoolDecorators.concurrentPools(execs.size(), allocator,
603+
(PoolBuilder<Connection, PoolConfig<Connection>> poolBuilder) -> {
604+
int index = subPoolIndex.getAndIncrement();
605+
int minDiv = http2Strategy.minConnections / execs.size();
606+
int minMod = http2Strategy.minConnections % execs.size();
607+
int maxDiv = http2Strategy.maxConnections / execs.size();
608+
int maxMod = http2Strategy.maxConnections % execs.size();
609+
610+
int minConn = index < minMod ? minDiv + 1 : minDiv;
611+
int maxConn = index < maxMod ? maxDiv + 1 : maxDiv;
612+
613+
Http2AllocationStrategy adaptedH2Strategy = Http2AllocationStrategy.builder(http2Strategy)
614+
.minConnections(minConn)
615+
.maxConnections(maxConn)
616+
.build();
617+
618+
InstrumentedPool<Connection> pool =
619+
id == null ?
620+
poolFactory.newPool(poolBuilder, maxConn, adaptedH2Strategy, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
621+
poolConfig -> new Http2Pool(poolConfig, adaptedH2Strategy)) :
622+
poolFactory.newPool(poolBuilder, maxConn, adaptedH2Strategy, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
623+
micrometerRecorder,
624+
poolConfig -> new Http2Pool(poolConfig, adaptedH2Strategy));
625+
return Tuples.of(pool, execsIter.next());
626+
});
627+
}
574628
}
575629

576630
Publisher<Connection> connectChannel() {

reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProviderMeters.java

+21-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2022-2023 VMware, Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2022-2024 VMware, Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -67,6 +67,26 @@ public Meter.Type getType() {
6767
}
6868
},
6969

70+
/**
71+
* The number of HTTP/2 stream acquisition steal count.
72+
*/
73+
STEAL_STREAMS {
74+
@Override
75+
public String getName() {
76+
return "reactor.netty.connection.provider.steal.streams";
77+
}
78+
79+
@Override
80+
public KeyName[] getKeyNames() {
81+
return Http2ConnectionProviderMetersTags.values();
82+
}
83+
84+
@Override
85+
public Meter.Type getType() {
86+
return Meter.Type.COUNTER;
87+
}
88+
},
89+
7090
/**
7191
* The number of the idle connections in the connection pool.
7292
*/

0 commit comments

Comments
 (0)