Skip to content

Commit a06c842

Browse files
committed
Add configuration for HTTP/2 PING scheduler interval and retry threshold
- Added a method to configure the execution interval of the scheduler that sends HTTP/2 PING frames and periodically checks for ACK responses - Introduced a retry threshold setting to limit the number of PING transmission attempts before considering the connection as unresponsive - Default values: - Scheduler interval must be explicitly set - Retry threshold defaults to 0 (no retries, only one PING attempt) Signed-off-by: raccoonback <[email protected]>
1 parent 92be9aa commit a06c842

File tree

7 files changed

+320
-94
lines changed

7 files changed

+320
-94
lines changed

docs/modules/ROOT/pages/http-client.adoc

+4-1
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,10 @@ include::{examples-dir}/http2/H2Application.java[lines=18..42]
318318
----
319319
<1> Configures the client to support only `HTTP/2`
320320
<2> Configures `SSL`
321-
<3> You can configure the interval for checking `Ping` frames
321+
<3> Sets the interval for sending `HTTP/2` `PING` frames and receiving `ACK` responses
322+
<4> Sets the execution interval for the scheduler that sends `HTTP/2` `PING frames and periodically checks for `ACK` responses
323+
<5> Sets the threshold for retrying `HTTP/2` `PING` frame transmissions.
324+
322325

323326
The following listing presents a simple `H2C` example:
324327

reactor-netty-core/src/main/java/reactor/netty/NettyPipeline.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public interface NettyPipeline {
111111
String OnChannelReadIdle = LEFT + "onChannelReadIdle";
112112
String OnChannelWriteIdle = LEFT + "onChannelWriteIdle";
113113
String ProxyHandler = LEFT + "proxyHandler";
114-
String H2LivenessHandler = LEFT + "h2LivenessHandler";
114+
String H2LivenessHandler = LEFT + "h2LivenessHandler";
115115
/**
116116
* Use to register a special handler which ensures that any {@link io.netty.channel.VoidChannelPromise}
117117
* will be converted to "unvoided" promises.

reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/client/http2/H2Application.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,11 @@ public static void main(String[] args) {
3030
HttpClient.create()
3131
.protocol(HttpProtocol.H2) //<1>
3232
.secure() //<2>
33-
.http2Settings(builder -> builder.pingInterval(Duration.ofMillis(100))); // <3>
33+
.http2Settings(
34+
builder -> builder.pingAckTimeout(Duration.ofMillis(600)) // <3>
35+
.pingScheduleInterval(Duration.ofMillis(300)) // <4>
36+
.pingAckDropThreshold(2) // <5>
37+
);
3438

3539
Tuple2<String, HttpHeaders> response =
3640
client.get()

reactor-netty-http/src/main/java/reactor/netty/http/Http2SettingsSpec.java

+118-20
Original file line numberDiff line numberDiff line change
@@ -96,19 +96,75 @@ public interface Builder {
9696
//Builder pushEnabled(boolean pushEnabled);
9797

9898
/**
99-
* Sets the interval for checking ping frames.
100-
* If a ping ACK frame is not received within the configured interval, the connection will be closed.
99+
* Sets the interval for sending HTTP/2 PING frames and receiving ACK responses.
101100
*
102-
* <p>Be cautious when setting a very short interval, as it may cause the connection to be closed,
103-
* even if the keep-alive setting is enabled.</p>
101+
* <p>
102+
* This method configures the time interval at which PING frames are sent to the peer.
103+
* The interval should be chosen carefully to balance between detecting connection issues
104+
* and minimizing unnecessary network traffic.
105+
* </p>
104106
*
105-
* <p>If no interval is specified, no ping frame checking will be performed by default.</p>
107+
* <p>
108+
* If the interval is set too short, it may cause excessive network overhead.
109+
* If set too long, connection failures may not be detected in a timely manner.
110+
* </p>
106111
*
107-
* @param pingInterval the duration between sending ping frames. If not specified, ping frame checking is disabled.
108-
* @return {@code this}
109-
* @since 1.2.3
112+
* @param pingAckTimeout the interval in between consecutive PING frames
113+
* and ACK responses. Must be a positive value.
114+
*/
115+
default Builder pingAckTimeout(Duration pingAckTimeout) {
116+
return this;
117+
}
118+
119+
/**
120+
* Sets the execution interval for the scheduler that sends HTTP/2 PING frames
121+
* and periodically checks for ACK responses.
122+
*
123+
* <p>
124+
* This method configures the time interval at which the scheduler runs
125+
* to send PING frames and verify if ACK responses are received within
126+
* the expected timeframe.
127+
* Proper tuning of this interval helps in detecting connection issues
128+
* while avoiding unnecessary network overhead.
129+
* </p>
130+
*
131+
* <p>
132+
* If the interval is too short, it may increase network and CPU usage.
133+
* Conversely, setting it too long may delay the detection of connection failures.
134+
* </p>
135+
*
136+
* @param pingScheduleInterval the interval in at which the scheduler executes.
137+
* Must be a positive value.
110138
*/
111-
default Builder pingInterval(Duration pingInterval) {
139+
default Builder pingScheduleInterval(Duration pingScheduleInterval) {
140+
return this;
141+
}
142+
143+
/**
144+
* Sets the threshold for retrying HTTP/2 PING frame transmissions.
145+
*
146+
* <p>
147+
* This method defines the maximum number of attempts to send a PING frame
148+
* before considering the connection as unresponsive.
149+
* If the threshold is exceeded without receiving an ACK response,
150+
* the connection may be closed or marked as unhealthy.
151+
* </p>
152+
*
153+
* <p>
154+
* A lower threshold can detect connection failures more quickly but may lead
155+
* to premature disconnections. Conversely, a higher threshold allows more retries
156+
* but may delay failure detection.
157+
* </p>
158+
*
159+
* <p>
160+
* If this value is not specified, it defaults to 0, meaning only one attempt to send a PING frame is made without retries.
161+
* </p>
162+
*
163+
* @param pingAckDropThreshold the maximum number of PING transmission attempts.
164+
* Must be a positive integer.
165+
* The default value is 0, meaning no retries will occur and only one PING frame will be sent.
166+
*/
167+
default Builder pingAckDropThreshold(Integer pingAckDropThreshold) {
112168
return this;
113169
}
114170
}
@@ -195,13 +251,33 @@ public Boolean pushEnabled() {
195251
}
196252

197253
/**
198-
* Returns the configured {@code pingInterval} value or null.
254+
* Returns the configured {@code pingAckTimeout} value or null.
255+
*
256+
* @return the configured {@code pingAckTimeout} value or null
257+
*/
258+
@Nullable
259+
public Duration pingAckTimeout() {
260+
return pingAckTimeout;
261+
}
262+
263+
/**
264+
* Returns the configured {@code pingScheduleInterval} value or null.
199265
*
200-
* @return the configured {@code pingInterval} value or null
266+
* @return the configured {@code pingScheduleInterval} value or null
201267
*/
202268
@Nullable
203-
public Duration pingInterval() {
204-
return pingInterval;
269+
public Duration pingScheduleInterval() {
270+
return pingScheduleInterval;
271+
}
272+
273+
/**
274+
* Returns the configured {@code pingAckDropThreshold} value or null.
275+
*
276+
* @return the configured {@code pingAckDropThreshold} value or null
277+
*/
278+
@Nullable
279+
public Integer pingAckDropThreshold() {
280+
return pingAckDropThreshold;
205281
}
206282

207283
@Override
@@ -220,7 +296,9 @@ public boolean equals(Object o) {
220296
maxHeaderListSize.equals(that.maxHeaderListSize) &&
221297
Objects.equals(maxStreams, that.maxStreams) &&
222298
Objects.equals(pushEnabled, that.pushEnabled) &&
223-
Objects.equals(pingInterval, that.pingInterval);
299+
Objects.equals(pingAckTimeout, that.pingAckTimeout) &&
300+
Objects.equals(pingScheduleInterval, that.pingScheduleInterval) &&
301+
Objects.equals(pingAckDropThreshold, that.pingAckDropThreshold);
224302
}
225303

226304
@Override
@@ -233,7 +311,9 @@ public int hashCode() {
233311
result = 31 * result + Long.hashCode(maxHeaderListSize);
234312
result = 31 * result + Long.hashCode(maxStreams);
235313
result = 31 * result + Boolean.hashCode(pushEnabled);
236-
result = 31 * result + Objects.hashCode(pingInterval);
314+
result = 31 * result + Objects.hashCode(pingAckTimeout);
315+
result = 31 * result + Objects.hashCode(pingScheduleInterval);
316+
result = 31 * result + Objects.hashCode(pingAckDropThreshold);
237317
return result;
238318
}
239319

@@ -244,7 +324,9 @@ public int hashCode() {
244324
final Long maxHeaderListSize;
245325
final Long maxStreams;
246326
final Boolean pushEnabled;
247-
final Duration pingInterval;
327+
final Duration pingAckTimeout;
328+
final Duration pingScheduleInterval;
329+
final Integer pingAckDropThreshold;
248330

249331
Http2SettingsSpec(Build build) {
250332
Http2Settings settings = build.http2Settings;
@@ -261,12 +343,16 @@ public int hashCode() {
261343
maxHeaderListSize = settings.maxHeaderListSize();
262344
maxStreams = build.maxStreams;
263345
pushEnabled = settings.pushEnabled();
264-
pingInterval = build.pingInterval;
346+
pingAckTimeout = build.pingAckTimeout;
347+
pingScheduleInterval = build.pingScheduleInterval;
348+
pingAckDropThreshold = build.pingAckDropThreshold;
265349
}
266350

267351
static final class Build implements Builder {
268352
Long maxStreams;
269-
Duration pingInterval;
353+
Duration pingAckTimeout;
354+
Duration pingScheduleInterval;
355+
Integer pingAckDropThreshold;
270356
final Http2Settings http2Settings = Http2Settings.defaultSettings();
271357

272358
@Override
@@ -314,8 +400,20 @@ public Builder maxStreams(long maxStreams) {
314400
}
315401

316402
@Override
317-
public Builder pingInterval(Duration pingInterval) {
318-
this.pingInterval = pingInterval;
403+
public Builder pingAckTimeout(Duration pingAckTimeout) {
404+
this.pingAckTimeout = pingAckTimeout;
405+
return this;
406+
}
407+
408+
@Override
409+
public Builder pingScheduleInterval(Duration pingScheduleInterval) {
410+
this.pingScheduleInterval = pingScheduleInterval;
411+
return this;
412+
}
413+
414+
@Override
415+
public Builder pingAckDropThreshold(Integer pingAckDropThreshold) {
416+
this.pingAckDropThreshold = pingAckDropThreshold;
319417
return this;
320418
}
321419

reactor-netty-http/src/main/java/reactor/netty/http/client/Http2ConnectionLivenessHandler.java

+61-18
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import reactor.util.annotation.Nullable;
2929

3030
import java.time.Duration;
31+
import java.util.Objects;
3132
import java.util.concurrent.ScheduledFuture;
3233
import java.util.concurrent.ThreadLocalRandom;
3334

@@ -55,33 +56,57 @@ final class Http2ConnectionLivenessHandler extends ChannelDuplexHandler {
5556
private static final Logger log = Loggers.getLogger(Http2ConnectionLivenessHandler.class);
5657

5758
private ScheduledFuture<?> pingScheduler;
59+
5860
private final ChannelFutureListener pingWriteListener = new PingWriteListener();
5961
private final Http2ConnectionEncoder encoder;
60-
private final long pingIntervalNanos;
62+
private final long pingAckTimeoutNanos;
63+
private final long pingScheduleIntervalNanos;
64+
private final int pingAckDropThreshold;
65+
66+
private int pingAckDropCount;
6167
private long lastSentPingData;
6268
private long lastReceivedPingTime;
69+
private long lastSendingPingTime;
6370
private long lastIoTime;
6471
private boolean isPingAckPending;
6572

66-
public Http2ConnectionLivenessHandler(Http2ConnectionEncoder encoder, @Nullable Duration pingInterval) {
73+
public Http2ConnectionLivenessHandler(Http2ConnectionEncoder encoder, @Nullable Duration pingAckTimeout,
74+
@Nullable Duration pintScheduleInterval, @Nullable Integer pingAckDropThreshold) {
75+
Objects.requireNonNull(encoder, "encoder");
76+
6777
this.encoder = encoder;
6878

69-
if (pingInterval != null) {
70-
this.pingIntervalNanos = pingInterval.toNanos();
79+
if (pingAckTimeout != null) {
80+
this.pingAckTimeoutNanos = pingAckTimeout.toNanos();
81+
}
82+
else {
83+
this.pingAckTimeoutNanos = 0L;
84+
}
85+
86+
if (pintScheduleInterval != null) {
87+
this.pingScheduleIntervalNanos = pintScheduleInterval.toNanos();
7188
}
7289
else {
73-
this.pingIntervalNanos = 0L;
90+
this.pingScheduleIntervalNanos = 0L;
91+
}
92+
93+
if (pingAckDropThreshold != null) {
94+
this.pingAckDropThreshold = pingAckDropThreshold;
95+
}
96+
else {
97+
this.pingAckDropThreshold = 0;
7498
}
7599
}
76100

77101
@Override
78102
public void channelActive(ChannelHandlerContext ctx) throws Exception {
79103
if (isPingIntervalConfigured()) {
80104
isPingAckPending = false;
105+
pingAckDropCount = 0;
81106
pingScheduler = ctx.executor()
82107
.schedule(
83108
new PingChecker(ctx),
84-
pingIntervalNanos,
109+
pingAckTimeoutNanos,
85110
NANOSECONDS
86111
);
87112
}
@@ -118,14 +143,9 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
118143
ctx.fireChannelInactive();
119144
}
120145

121-
@Override
122-
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
123-
cancel();
124-
ctx.fireExceptionCaught(cause);
125-
}
126-
127146
private boolean isPingIntervalConfigured() {
128-
return pingIntervalNanos > 0;
147+
return pingAckTimeoutNanos > 0
148+
&& pingScheduleIntervalNanos > 0;
129149
}
130150

131151
private void cancel() {
@@ -155,6 +175,7 @@ public void run() {
155175
}
156176

157177
isPingAckPending = false;
178+
pingAckDropCount = 0;
158179
pingScheduler = invokeNextSchedule();
159180
return;
160181
}
@@ -170,15 +191,28 @@ public void run() {
170191
}
171192

172193
if (isOutOfTimeRange()) {
194+
countPingDrop();
195+
196+
if (isExceedAckDropThreshold()) {
197+
if (log.isInfoEnabled()) {
198+
log.info("Closing {} channel due to delayed ping frame response (timeout: {} ns). lastReceivedPingTime: {}, current: {}", channel, pingAckTimeoutNanos, lastReceivedPingTime, System.nanoTime());
199+
}
200+
201+
close(channel);
202+
return;
203+
}
204+
173205
if (log.isInfoEnabled()) {
174-
log.info("Closing {} channel due to delayed ping frame response (timeout: {} ns).", channel, pingIntervalNanos);
206+
log.info("Drop ping ack frame in {} channel. (ping: {})", channel, lastSentPingData);
175207
}
176208

177-
close(channel);
209+
writePing(ctx);
210+
pingScheduler = invokeNextSchedule();
178211
return;
179212
}
180213

181214
isPingAckPending = false;
215+
pingAckDropCount = 0;
182216
pingScheduler = invokeNextSchedule();
183217
}
184218

@@ -192,18 +226,26 @@ private void writePing(ChannelHandlerContext ctx) {
192226
}
193227

194228
private boolean isIoInProgress() {
195-
return pingIntervalNanos > (System.nanoTime() - lastIoTime);
229+
return pingAckTimeoutNanos >= (System.nanoTime() - lastIoTime);
196230
}
197231

198232
private boolean isOutOfTimeRange() {
199-
return pingIntervalNanos < (System.nanoTime() - lastReceivedPingTime);
233+
return pingAckTimeoutNanos < Math.abs(lastReceivedPingTime - lastSendingPingTime);
234+
}
235+
236+
private void countPingDrop() {
237+
pingAckDropCount++;
238+
}
239+
240+
private boolean isExceedAckDropThreshold() {
241+
return pingAckDropCount > pingAckDropThreshold;
200242
}
201243

202244
private ScheduledFuture<?> invokeNextSchedule() {
203245
return ctx.executor()
204246
.schedule(
205247
new PingChecker(ctx),
206-
pingIntervalNanos,
248+
pingScheduleIntervalNanos,
207249
NANOSECONDS
208250
);
209251
}
@@ -233,6 +275,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
233275
}
234276

235277
isPingAckPending = true;
278+
lastSendingPingTime = System.nanoTime();
236279
}
237280
else if (log.isDebugEnabled()) {
238281
log.debug("Failed to wrote PING frame to {} channel.", future.channel());

0 commit comments

Comments
 (0)