Skip to content

Commit a2d6fa3

Browse files
authored
Fix http2 client dynamic table RC #10820
Signed-off-by: Daniel Kec <[email protected]>
1 parent ab5cbd2 commit a2d6fa3

File tree

3 files changed

+114
-27
lines changed

3 files changed

+114
-27
lines changed

webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientConnection.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.concurrent.atomic.AtomicReference;
2929
import java.util.concurrent.locks.Lock;
3030
import java.util.concurrent.locks.ReadWriteLock;
31+
import java.util.concurrent.locks.ReentrantLock;
3132
import java.util.concurrent.locks.ReentrantReadWriteLock;
3233

3334
import io.helidon.common.buffers.BufferData;
@@ -46,6 +47,7 @@
4647
import io.helidon.http.http2.Http2FrameTypes;
4748
import io.helidon.http.http2.Http2GoAway;
4849
import io.helidon.http.http2.Http2Headers;
50+
import io.helidon.http.http2.Http2HuffmanDecoder;
4951
import io.helidon.http.http2.Http2LoggingFrameListener;
5052
import io.helidon.http.http2.Http2Ping;
5153
import io.helidon.http.http2.Http2RstStream;
@@ -76,6 +78,7 @@ public class Http2ClientConnection {
7678
private final ConnectionFlowControl connectionFlowControl;
7779
private final Http2Headers.DynamicTable inboundDynamicTable =
7880
Http2Headers.DynamicTable.create(Http2Setting.HEADER_TABLE_SIZE.defaultValue());
81+
private final ReentrantLock inboundDynamicTableLock = new ReentrantLock();
7982
private final Http2ClientProtocolConfig protocolConfig;
8083
private final ClientConnection connection;
8184
private final SocketContext ctx;
@@ -259,6 +262,32 @@ static Http2Settings settings(Http2ClientProtocolConfig config) {
259262
.build();
260263
}
261264

265+
/**
266+
* Reads the HTTP/2 headers for the specified client stream from this connection.
267+
* Thread-safe: Uses connection inbound dynamic table synchronized per connection.
268+
*
269+
* @param stream the HTTP/2 client stream for which headers are being read
270+
* @param decoder the Huffman decoder to decode the headers
271+
* @param headers the existing headers object to populate or use as a basis
272+
* @param array the array of HTTP/2 frame data to process
273+
* @return the processed HTTP/2 headers
274+
*/
275+
Http2Headers readHeaders(Http2ClientStream stream,
276+
Http2HuffmanDecoder decoder,
277+
Http2Headers headers,
278+
Http2FrameData[] array) {
279+
inboundDynamicTableLock.lock();
280+
try {
281+
return Http2Headers.create(stream,
282+
inboundDynamicTable,
283+
decoder,
284+
headers,
285+
array);
286+
} finally {
287+
inboundDynamicTableLock.unlock();
288+
}
289+
}
290+
262291
private void sendPreface(Http2ClientProtocolConfig config, boolean sendSettings) {
263292
BufferData prefaceData = Http2Util.prefaceData();
264293
sendListener.frame(ctx, 0, prefaceData);

webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientStream.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -87,11 +87,11 @@ public class Http2ClientStream implements Http2Stream, ReleasableResource {
8787
private StreamBuffer buffer;
8888

8989
protected Http2ClientStream(Http2ClientConnection connection,
90-
Http2Settings serverSettings,
91-
SocketContext ctx,
92-
Http2StreamConfig http2StreamConfig,
93-
Http2ClientConfig http2ClientConfig,
94-
LockingStreamIdSequence streamIdSeq) {
90+
Http2Settings serverSettings,
91+
SocketContext ctx,
92+
Http2StreamConfig http2StreamConfig,
93+
Http2ClientConfig http2ClientConfig,
94+
LockingStreamIdSequence streamIdSeq) {
9595
this.connection = connection;
9696
this.serverSettings = serverSettings;
9797
this.ctx = ctx;
@@ -289,7 +289,7 @@ Status waitFor100Continue() {
289289
* Writes HTTP2 headers to the stream.
290290
*
291291
* @param http2Headers the headers
292-
* @param endOfStream end of stream marker
292+
* @param endOfStream end of stream marker
293293
*/
294294
public void writeHeaders(Http2Headers http2Headers, boolean endOfStream) {
295295
this.state = Http2StreamState.checkAndGetState(this.state, Http2FrameType.HEADERS, true, endOfStream, true);
@@ -477,13 +477,13 @@ private void continue100(Http2Headers headers, boolean endOfStream) {
477477
}
478478

479479
private Http2Headers readHeaders(Http2HuffmanDecoder decoder, boolean mergeWithPrevious) {
480-
Http2Headers http2Headers = Http2Headers.create(this,
481-
connection.getInboundDynamicTable(),
482-
decoder,
483-
mergeWithPrevious && currentHeaders != null
484-
? currentHeaders
485-
: Http2Headers.create(WritableHeaders.create()),
486-
continuationData.toArray(new Http2FrameData[0]));
480+
Http2Headers http2Headers =
481+
connection.readHeaders(this,
482+
decoder,
483+
mergeWithPrevious && currentHeaders != null
484+
? currentHeaders
485+
: Http2Headers.create(WritableHeaders.create()),
486+
continuationData.toArray(new Http2FrameData[0]));
487487
recvListener.headers(ctx, streamId, http2Headers);
488488
return http2Headers;
489489
}
@@ -518,7 +518,7 @@ enum ReadState {
518518

519519
private final Set<ReadState> allowedTransitions;
520520

521-
ReadState(ReadState... allowedTransitions){
521+
ReadState(ReadState... allowedTransitions) {
522522
this.allowedTransitions = Set.of(allowedTransitions);
523523
}
524524

webclient/http2/src/test/java/io/helidon/webclient/http2/Http2WebClientTest.java

Lines changed: 71 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2023 Oracle and/or its affiliates.
2+
* Copyright (c) 2023, 2025 Oracle and/or its affiliates.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.TimeoutException;
2828
import java.util.function.Consumer;
2929
import java.util.function.Supplier;
30+
import java.util.stream.IntStream;
3031
import java.util.stream.Stream;
3132

3233
import io.helidon.common.LazyValue;
@@ -141,17 +142,36 @@ static void setUpServer(WebServerConfig.Builder serverBuilder) {
141142
res.send("POST " + req.content().as(String.class));
142143
}))
143144
.route(Http2Route.route(GET, "/versionspecific/h2streaming", (req, res) -> {
144-
res.status(Status.OK_200);
145-
String execId = req.query().get("execId");
146-
try (OutputStream os = res.outputStream()) {
147-
for (int i = 0; i < 5; i++) {
148-
os.write(String.format(execId + "BAF%03d", i).getBytes());
149-
Thread.sleep(10);
150-
}
151-
} catch (IOException | InterruptedException e) {
152-
throw new RuntimeException(e);
153-
}
154-
}));
145+
res.status(Status.OK_200);
146+
String execId = req.query().get("execId");
147+
try (OutputStream os = res.outputStream()) {
148+
for (int i = 0; i < 5; i++) {
149+
os.write(String.format(execId + "BAF%03d", i).getBytes());
150+
Thread.sleep(10);
151+
}
152+
} catch (IOException | InterruptedException e) {
153+
throw new RuntimeException(e);
154+
}
155+
}))
156+
.route(Http2Route.route(GET, "/versionspecific/h2streaming-headers", (req, res) -> {
157+
res.status(Status.OK_200);
158+
int phase = req.headers()
159+
.first(HeaderNames.create("phase"))
160+
.map(Integer::parseInt)
161+
.orElse(0);
162+
for (int i = phase; i < 1000 + phase; i++) {
163+
res.headers().add(HeaderNames.create("test" + i), "test" + i);
164+
}
165+
String execId = req.query().get("execId");
166+
try (OutputStream os = res.outputStream()) {
167+
for (int i = 0; i < 5; i++) {
168+
os.write(String.format(execId + "BAF%03d", i).getBytes());
169+
Thread.sleep(10);
170+
}
171+
} catch (IOException | InterruptedException e) {
172+
throw new RuntimeException(e);
173+
}
174+
}));
155175

156176
serverBuilder
157177
.port(-1)
@@ -253,7 +273,6 @@ void clientPost(String clientType, Supplier<Http2Client> client) {
253273
}
254274
}
255275

256-
// @Disabled("Failing intermittently, to be investigated")
257276
@ParameterizedTest(name = "{0}")
258277
@MethodSource("clientTypes")
259278
void multiplexParallelStreamsGet(String clientType, Supplier<Http2Client> client)
@@ -289,4 +308,43 @@ void multiplexParallelStreamsGet(String clientType, Supplier<Http2Client> client
289308
, CompletableFuture.runAsync(() -> callable.accept(4), executorService)
290309
).get(5, TimeUnit.MINUTES);
291310
}
311+
312+
@ParameterizedTest(name = "{0}")
313+
@MethodSource("clientTypes")
314+
void multiplexParallelStreamsGetWithUniqueHeaders(String clientType, Supplier<Http2Client> client)
315+
throws ExecutionException, InterruptedException, TimeoutException {
316+
317+
Http2Client http2Client = client.get();
318+
Consumer<Integer> callable = id -> {
319+
try (Http2ClientResponse response = http2Client
320+
.get("/h2streaming-headers")
321+
.queryParam("execId", id.toString())
322+
.header(HeaderNames.create("phase"), String.valueOf(id * 1000))
323+
.request()
324+
) {
325+
326+
InputStream is = response.inputStream();
327+
for (int i = 0; ; i++) {
328+
byte[] bytes = is.readNBytes("0BAF000".getBytes().length);
329+
if (bytes.length == 0) {
330+
break;
331+
}
332+
String message = new String(bytes);
333+
assertThat(message, is(String.format(id + "BAF%03d", i)));
334+
}
335+
336+
} catch (IOException e) {
337+
throw new RuntimeException(e);
338+
}
339+
};
340+
341+
CompletableFuture.allOf(
342+
IntStream.range(1, 6)
343+
.boxed()
344+
.map(i -> CompletableFuture
345+
.runAsync(() -> callable.accept(i), executorService))
346+
.toList()
347+
.toArray(new CompletableFuture[0])
348+
).get(5, TimeUnit.MINUTES);
349+
}
292350
}

0 commit comments

Comments
 (0)