Skip to content

Commit 93cbff3

Browse files
committed
Experiment work stealing pools
1 parent 4bed04d commit 93cbff3

File tree

8 files changed

+342
-35
lines changed

8 files changed

+342
-35
lines changed

reactor-netty-core/src/main/java/reactor/netty/Metrics.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2019-2023 VMware, Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2019-2024 VMware, Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -204,6 +204,10 @@ public class Metrics {
204204
*/
205205
public static final String PENDING_STREAMS = ".pending.streams";
206206

207+
/**
208+
* The number of HTTP/2 stream acquisitions steal count.
209+
*/
210+
public static final String STEAL_STREAMS = ".steal.streams";
207211

208212
// ByteBufAllocator Metrics
209213
/**

reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java

+46-6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2018-2023 VMware, Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2018-2024 VMware, Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -525,6 +525,20 @@ public InstrumentedPool<T> newPool(
525525
return newPoolInternal(allocator, destroyHandler, defaultEvictionPredicate).build(poolFactory);
526526
}
527527

528+
public InstrumentedPool<T> newPool(
529+
PoolBuilder<T, PoolConfig<T>> poolBuilder,
530+
int maxConnections,
531+
@Nullable AllocationStrategy<?> allocationStrategy,
532+
Function<T, Publisher<Void>> destroyHandler,
533+
BiPredicate<T, PooledRefMetadata> defaultEvictionPredicate,
534+
Function<PoolConfig<T>, InstrumentedPool<T>> poolFactory) {
535+
if (disposeTimeout != null) {
536+
return newPoolInternal(poolBuilder, maxConnections, allocationStrategy, destroyHandler, defaultEvictionPredicate, null)
537+
.build(poolFactory.andThen(InstrumentedPoolDecorators::gracefulShutdown));
538+
}
539+
return newPoolInternal(poolBuilder, maxConnections, allocationStrategy, destroyHandler, defaultEvictionPredicate, null).build(poolFactory);
540+
}
541+
528542
public InstrumentedPool<T> newPool(
529543
Publisher<T> allocator,
530544
Function<T, Publisher<Void>> destroyHandler,
@@ -538,6 +552,21 @@ public InstrumentedPool<T> newPool(
538552
return newPoolInternal(allocator, destroyHandler, defaultEvictionPredicate, poolMetricsRecorder).build(poolFactory);
539553
}
540554

555+
public InstrumentedPool<T> newPool(
556+
PoolBuilder<T, PoolConfig<T>> poolBuilder,
557+
int maxConnections,
558+
@Nullable AllocationStrategy<?> allocationStrategy,
559+
Function<T, Publisher<Void>> destroyHandler,
560+
BiPredicate<T, PooledRefMetadata> defaultEvictionPredicate,
561+
PoolMetricsRecorder poolMetricsRecorder,
562+
Function<PoolConfig<T>, InstrumentedPool<T>> poolFactory) {
563+
if (disposeTimeout != null) {
564+
return newPoolInternal(poolBuilder, maxConnections, allocationStrategy, destroyHandler, defaultEvictionPredicate, poolMetricsRecorder)
565+
.build(poolFactory.andThen(InstrumentedPoolDecorators::gracefulShutdown));
566+
}
567+
return newPoolInternal(poolBuilder, maxConnections, allocationStrategy, destroyHandler, defaultEvictionPredicate, poolMetricsRecorder).build(poolFactory);
568+
}
569+
541570
PoolBuilder<T, PoolConfig<T>> newPoolInternal(
542571
Publisher<T> allocator,
543572
Function<T, Publisher<Void>> destroyHandler,
@@ -550,11 +579,22 @@ PoolBuilder<T, PoolConfig<T>> newPoolInternal(
550579
Function<T, Publisher<Void>> destroyHandler,
551580
BiPredicate<T, PooledRefMetadata> defaultEvictionPredicate,
552581
@Nullable PoolMetricsRecorder poolMetricsRecorder) {
553-
PoolBuilder<T, PoolConfig<T>> poolBuilder =
554-
PoolBuilder.from(allocator)
555-
.destroyHandler(destroyHandler)
556-
.maxPendingAcquire(pendingAcquireMaxCount)
557-
.evictInBackground(evictionInterval);
582+
return newPoolInternal(PoolBuilder.from(allocator), -1, null, destroyHandler, defaultEvictionPredicate, poolMetricsRecorder);
583+
}
584+
585+
PoolBuilder<T, PoolConfig<T>> newPoolInternal(
586+
PoolBuilder<T, PoolConfig<T>> poolBuilder,
587+
int maxConnections,
588+
@Nullable AllocationStrategy<?> allocationStrategy,
589+
Function<T, Publisher<Void>> destroyHandler,
590+
BiPredicate<T, PooledRefMetadata> defaultEvictionPredicate,
591+
@Nullable PoolMetricsRecorder poolMetricsRecorder) {
592+
maxConnections = (maxConnections == -1) ? this.maxConnections : maxConnections;
593+
allocationStrategy = (allocationStrategy == null) ? this.allocationStrategy : allocationStrategy;
594+
poolBuilder = poolBuilder
595+
.destroyHandler(destroyHandler)
596+
.maxPendingAcquire(pendingAcquireMaxCount)
597+
.evictInBackground(evictionInterval);
558598

559599
if (this.evictionPredicate != null) {
560600
poolBuilder = poolBuilder.evictionPredicate(

reactor-netty-http/src/main/java/reactor/netty/http/client/Http2AllocationStrategy.java

+54-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2022-2023 VMware, Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2022-2024 VMware, Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -66,6 +66,23 @@ public interface Builder {
6666
* @return {@code this}
6767
*/
6868
Builder minConnections(int minConnections);
69+
70+
/**
71+
* Enables or disables work stealing mode for managing HTTP2 Connection Pools.
72+
* <p>
73+
* By default, a single Connection Pool is used by multiple Netty event loop threads.
74+
* When work stealing is enabled, each Netty event loop will maintain its own
75+
* HTTP2 Connection Pool, and HTTP2 streams allocation will be distributed over all available
76+
* pools using a work stealing strategy. This approach maximizes throughput and
77+
* resource utilization in a multithreaded environment.
78+
*
79+
* @param progressive true if the HTTP2 Connection pools should be enabled gradually (when the nth pool becomes
80+
* is starting to get some pendingg acquisitions request, then enable one more
81+
* pool until all available pools are enabled).
82+
*
83+
* @return {@code this}
84+
*/
85+
Builder enableWorkStealing(boolean progressive);
6986
}
7087

7188
/**
@@ -77,6 +94,18 @@ public static Http2AllocationStrategy.Builder builder() {
7794
return new Http2AllocationStrategy.Build();
7895
}
7996

97+
/**
98+
* Creates a builder for {@link Http2AllocationStrategy} and initialize it
99+
* with an existing strategy. This method can be used to create a mutated version
100+
* of an existing strategy.
101+
*
102+
* @return a new {@link Http2AllocationStrategy.Builder} initialized with an existing http2
103+
* allocation strategy.
104+
*/
105+
public static Http2AllocationStrategy.Builder builder(Http2AllocationStrategy existing) {
106+
return new Http2AllocationStrategy.Build(existing);
107+
}
108+
80109
@Override
81110
public Http2AllocationStrategy copy() {
82111
return new Http2AllocationStrategy(this);
@@ -141,9 +170,14 @@ public void returnPermits(int returned) {
141170
}
142171
}
143172

173+
public boolean enableWorkStealing() {
174+
return enableWorkStealing;
175+
}
176+
144177
final long maxConcurrentStreams;
145178
final int maxConnections;
146179
final int minConnections;
180+
final boolean enableWorkStealing;
147181

148182
volatile int permits;
149183
static final AtomicIntegerFieldUpdater<Http2AllocationStrategy> PERMITS = AtomicIntegerFieldUpdater.newUpdater(Http2AllocationStrategy.class, "permits");
@@ -152,13 +186,15 @@ public void returnPermits(int returned) {
152186
this.maxConcurrentStreams = build.maxConcurrentStreams;
153187
this.maxConnections = build.maxConnections;
154188
this.minConnections = build.minConnections;
189+
this.enableWorkStealing = build.enableWorkStealing;
155190
PERMITS.lazySet(this, this.maxConnections);
156191
}
157192

158193
Http2AllocationStrategy(Http2AllocationStrategy copy) {
159194
this.maxConcurrentStreams = copy.maxConcurrentStreams;
160195
this.maxConnections = copy.maxConnections;
161196
this.minConnections = copy.minConnections;
197+
this.enableWorkStealing = copy.enableWorkStealing;
162198
PERMITS.lazySet(this, this.maxConnections);
163199
}
164200

@@ -170,6 +206,17 @@ static final class Build implements Builder {
170206
long maxConcurrentStreams = DEFAULT_MAX_CONCURRENT_STREAMS;
171207
int maxConnections = DEFAULT_MAX_CONNECTIONS;
172208
int minConnections = DEFAULT_MIN_CONNECTIONS;
209+
boolean enableWorkStealing = Boolean.getBoolean("reactor.netty.pool.h2.enableworkstealing");
210+
211+
Build() {
212+
}
213+
214+
Build(Http2AllocationStrategy existing) {
215+
this.maxConcurrentStreams = existing.maxConcurrentStreams;
216+
this.minConnections = existing.minConnections;
217+
this.maxConnections = existing.maxConnections;
218+
this.enableWorkStealing = existing.enableWorkStealing;
219+
}
173220

174221
@Override
175222
public Http2AllocationStrategy build() {
@@ -206,5 +253,11 @@ public Builder minConnections(int minConnections) {
206253
this.minConnections = minConnections;
207254
return this;
208255
}
256+
257+
@Override
258+
public Builder enableWorkStealing(boolean progressive) {
259+
this.enableWorkStealing = true;
260+
return this;
261+
}
209262
}
210263
}

reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProvider.java

+61-7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2020-2023 VMware, Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2020-2024 VMware, Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -39,6 +39,9 @@
3939
import reactor.netty.ConnectionObserver;
4040
import reactor.netty.channel.ChannelMetricsRecorder;
4141
import reactor.netty.channel.ChannelOperations;
42+
import reactor.netty.internal.shaded.reactor.pool.PoolBuilder;
43+
import reactor.netty.internal.shaded.reactor.pool.PoolConfig;
44+
import reactor.netty.internal.shaded.reactor.pool.decorators.InstrumentedPoolDecorators;
4245
import reactor.netty.resources.ConnectionProvider;
4346
import reactor.netty.resources.PooledConnectionProvider;
4447
import reactor.netty.transport.TransportConfig;
@@ -50,13 +53,20 @@
5053
import reactor.util.annotation.Nullable;
5154
import reactor.util.concurrent.Queues;
5255
import reactor.util.context.Context;
56+
import reactor.util.function.Tuples;
5357

5458
import java.io.IOException;
5559
import java.net.SocketAddress;
5660
import java.time.Duration;
61+
import java.util.Iterator;
62+
import java.util.List;
5763
import java.util.Queue;
64+
import java.util.concurrent.Executor;
65+
import java.util.concurrent.atomic.AtomicInteger;
5866
import java.util.function.BiPredicate;
5967
import java.util.function.Function;
68+
import java.util.stream.Collectors;
69+
import java.util.stream.StreamSupport;
6070

6171
import static reactor.netty.ReactorNetty.format;
6272
import static reactor.netty.ReactorNetty.getChannelContext;
@@ -536,12 +546,56 @@ static final class PooledConnectionAllocator {
536546
this.config = (HttpClientConfig) config;
537547
this.remoteAddress = remoteAddress;
538548
this.resolver = resolver;
539-
this.pool = id == null ?
540-
poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
541-
poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy())) :
542-
poolFactory.newPool(connectChannel(), DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
543-
new MicrometerPoolMetricsRecorder(id, name, remoteAddress),
544-
poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy()));
549+
550+
Http2AllocationStrategy http2Strategy = poolFactory.allocationStrategy() instanceof Http2AllocationStrategy ?
551+
(Http2AllocationStrategy) poolFactory.allocationStrategy() : null;
552+
553+
if (http2Strategy == null || !http2Strategy.enableWorkStealing) {
554+
this.pool = id == null ?
555+
poolFactory.newPool(connectChannel(), null, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
556+
poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy())) :
557+
poolFactory.newPool(connectChannel(), DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
558+
new MicrometerPoolMetricsRecorder(id, name, remoteAddress),
559+
poolConfig -> new Http2Pool(poolConfig, poolFactory.allocationStrategy()));
560+
}
561+
else {
562+
// Create one connection allocator (it will be shared by all Http2Pool instances)
563+
Publisher<Connection> allocator = connectChannel();
564+
565+
List<Executor> execs = StreamSupport.stream(config.loopResources().onClient(true).spliterator(), false)
566+
.limit(http2Strategy.maxConnections)
567+
.collect(Collectors.toList());
568+
Iterator<Executor> execsIter = execs.iterator();
569+
570+
MicrometerPoolMetricsRecorder micrometerRecorder = id == null ? null : new MicrometerPoolMetricsRecorder(id, name, remoteAddress);
571+
AtomicInteger subPoolIndex = new AtomicInteger();
572+
573+
this.pool = InstrumentedPoolDecorators.concurrentPools(execs.size(), allocator,
574+
(PoolBuilder<Connection, PoolConfig<Connection>> poolBuilder) -> {
575+
int index = subPoolIndex.getAndIncrement();
576+
int minDiv = http2Strategy.minConnections / execs.size();
577+
int minMod = http2Strategy.minConnections % execs.size();
578+
int maxDiv = http2Strategy.maxConnections / execs.size();
579+
int maxMod = http2Strategy.maxConnections % execs.size();
580+
581+
int minConn = index < minMod ? minDiv + 1 : minDiv;
582+
int maxConn = index < maxMod ? maxDiv + 1 : maxDiv;
583+
584+
Http2AllocationStrategy adaptedH2Strategy = Http2AllocationStrategy.builder(http2Strategy)
585+
.minConnections(minConn)
586+
.maxConnections(maxConn)
587+
.build();
588+
589+
InstrumentedPool<Connection> pool =
590+
id == null ?
591+
poolFactory.newPool(poolBuilder, maxConn, adaptedH2Strategy, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
592+
poolConfig -> new Http2Pool(poolConfig, adaptedH2Strategy)) :
593+
poolFactory.newPool(poolBuilder, maxConn, adaptedH2Strategy, DEFAULT_DESTROY_HANDLER, DEFAULT_EVICTION_PREDICATE,
594+
micrometerRecorder,
595+
poolConfig -> new Http2Pool(poolConfig, adaptedH2Strategy));
596+
return Tuples.of(pool, execsIter.next());
597+
});
598+
}
545599
}
546600

547601
Publisher<Connection> connectChannel() {

reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionProviderMeters.java

+21-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2022-2023 VMware, Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2022-2024 VMware, Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -67,6 +67,26 @@ public Meter.Type getType() {
6767
}
6868
},
6969

70+
/**
71+
* The number of HTTP/2 stream acquisition steal count.
72+
*/
73+
STEAL_STREAMS {
74+
@Override
75+
public String getName() {
76+
return "reactor.netty.connection.provider.steal.streams";
77+
}
78+
79+
@Override
80+
public KeyName[] getKeyNames() {
81+
return Http2ConnectionProviderMetersTags.values();
82+
}
83+
84+
@Override
85+
public Meter.Type getType() {
86+
return Meter.Type.COUNTER;
87+
}
88+
},
89+
7090
/**
7191
* The number of the idle connections in the connection pool.
7292
*/

0 commit comments

Comments
 (0)