Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import io.helidon.common.buffers.BufferData;
Expand All @@ -46,6 +47,7 @@
import io.helidon.http.http2.Http2FrameTypes;
import io.helidon.http.http2.Http2GoAway;
import io.helidon.http.http2.Http2Headers;
import io.helidon.http.http2.Http2HuffmanDecoder;
import io.helidon.http.http2.Http2LoggingFrameListener;
import io.helidon.http.http2.Http2Ping;
import io.helidon.http.http2.Http2RstStream;
Expand Down Expand Up @@ -76,6 +78,7 @@ public class Http2ClientConnection {
private final ConnectionFlowControl connectionFlowControl;
private final Http2Headers.DynamicTable inboundDynamicTable =
Http2Headers.DynamicTable.create(Http2Setting.HEADER_TABLE_SIZE.defaultValue());
private final ReentrantLock inboundDynamicTableLock = new ReentrantLock();
private final Http2ClientProtocolConfig protocolConfig;
private final ClientConnection connection;
private final SocketContext ctx;
Expand Down Expand Up @@ -259,6 +262,32 @@ static Http2Settings settings(Http2ClientProtocolConfig config) {
.build();
}

/**
* Reads the HTTP/2 headers for the specified client stream from this connection.
* Thread-safe: Uses connection inbound dynamic table synchronized per connection.
*
* @param stream the HTTP/2 client stream for which headers are being read
* @param decoder the Huffman decoder to decode the headers
* @param headers the existing headers object to populate or use as a basis
* @param array the array of HTTP/2 frame data to process
* @return the processed HTTP/2 headers
*/
Http2Headers readHeaders(Http2ClientStream stream,
Http2HuffmanDecoder decoder,
Http2Headers headers,
Http2FrameData[] array) {
inboundDynamicTableLock.lock();
try {
return Http2Headers.create(stream,
inboundDynamicTable,
decoder,
headers,
array);
} finally {
inboundDynamicTableLock.unlock();
}
}

private void sendPreface(Http2ClientProtocolConfig config, boolean sendSettings) {
BufferData prefaceData = Http2Util.prefaceData();
sendListener.frame(ctx, 0, prefaceData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,13 +477,10 @@ private void continue100(Http2Headers headers, boolean endOfStream) {
}

private Http2Headers readHeaders(Http2HuffmanDecoder decoder, boolean mergeWithPrevious) {
Http2Headers http2Headers = Http2Headers.create(this,
connection.getInboundDynamicTable(),
decoder,
mergeWithPrevious && currentHeaders != null
? currentHeaders
: Http2Headers.create(WritableHeaders.create()),
continuationData.toArray(new Http2FrameData[0]));
Http2Headers http2Headers = connection.readHeaders(this, decoder, mergeWithPrevious && currentHeaders != null
? currentHeaders
: Http2Headers.create(WritableHeaders.create()),
continuationData.toArray(new Http2FrameData[0]));
recvListener.headers(ctx, streamId, http2Headers);
return http2Headers;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates.
* Copyright (c) 2023, 2025 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import io.helidon.common.LazyValue;
Expand Down Expand Up @@ -141,17 +142,36 @@ static void setUpServer(WebServerConfig.Builder serverBuilder) {
res.send("POST " + req.content().as(String.class));
}))
.route(Http2Route.route(GET, "/versionspecific/h2streaming", (req, res) -> {
res.status(Status.OK_200);
String execId = req.query().get("execId");
try (OutputStream os = res.outputStream()) {
for (int i = 0; i < 5; i++) {
os.write(String.format(execId + "BAF%03d", i).getBytes());
Thread.sleep(10);
}
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
}));
res.status(Status.OK_200);
String execId = req.query().get("execId");
try (OutputStream os = res.outputStream()) {
for (int i = 0; i < 5; i++) {
os.write(String.format(execId + "BAF%03d", i).getBytes());
Thread.sleep(10);
}
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
}))
.route(Http2Route.route(GET, "/versionspecific/h2streaming-headers", (req, res) -> {
res.status(Status.OK_200);
int phase = req.headers()
.first(HeaderNames.create("phase"))
.map(Integer::parseInt)
.orElse(0);
for (int i = phase; i < 1000 + phase; i++) {
res.headers().add(HeaderNames.create("test" + i), "test" + i);
}
String execId = req.query().get("execId");
try (OutputStream os = res.outputStream()) {
for (int i = 0; i < 5; i++) {
os.write(String.format(execId + "BAF%03d", i).getBytes());
Thread.sleep(10);
}
} catch (IOException | InterruptedException e) {
throw new RuntimeException(e);
}
}));

serverBuilder
.port(-1)
Expand Down Expand Up @@ -253,7 +273,6 @@ void clientPost(String clientType, Supplier<Http2Client> client) {
}
}

// @Disabled("Failing intermittently, to be investigated")
@ParameterizedTest(name = "{0}")
@MethodSource("clientTypes")
void multiplexParallelStreamsGet(String clientType, Supplier<Http2Client> client)
Expand Down Expand Up @@ -289,4 +308,43 @@ void multiplexParallelStreamsGet(String clientType, Supplier<Http2Client> client
, CompletableFuture.runAsync(() -> callable.accept(4), executorService)
).get(5, TimeUnit.MINUTES);
}

@ParameterizedTest(name = "{0}")
@MethodSource("clientTypes")
void multiplexParallelStreamsGetWithUniqueHeaders(String clientType, Supplier<Http2Client> client)
throws ExecutionException, InterruptedException, TimeoutException {

Http2Client http2Client = client.get();
Consumer<Integer> callable = id -> {
try (Http2ClientResponse response = http2Client
.get("/h2streaming-headers")
.queryParam("execId", id.toString())
.header(HeaderNames.create("phase"), String.valueOf(id * 1000))
.request()
) {

InputStream is = response.inputStream();
for (int i = 0; ; i++) {
byte[] bytes = is.readNBytes("0BAF000".getBytes().length);
if (bytes.length == 0) {
break;
}
String message = new String(bytes);
assertThat(message, is(String.format(id + "BAF%03d", i)));
}

} catch (IOException e) {
throw new RuntimeException(e);
}
};

CompletableFuture.allOf(
IntStream.range(1, 6)
.boxed()
.map(i -> CompletableFuture
.runAsync(() -> callable.accept(i), executorService))
.toList()
.toArray(new CompletableFuture[0])
).get(5, TimeUnit.MINUTES);
}
}