Skip to content

Commit c6b7d6c

Browse files
authored
Proper handling of ACK frames and long gRPC data frames (#10303)
* Do not ack or process an HTTP/2 SETTINGS frame with the ACK flag on. * Handle gRPC data frames delivered in two or more HTTP/2 frames in the gRPC client. * Share gRPC reading code in base class. * Add new test that verifies gRPC client can read large gRPC frames.
1 parent cb67955 commit c6b7d6c

File tree

6 files changed

+209
-44
lines changed

6 files changed

+209
-44
lines changed

webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcBaseClientCall.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,14 @@
3030

3131
import io.helidon.common.LazyValue;
3232
import io.helidon.common.buffers.BufferData;
33+
import io.helidon.common.buffers.CompositeBufferData;
3334
import io.helidon.common.socket.HelidonSocket;
3435
import io.helidon.grpc.core.GrpcHeadersUtil;
3536
import io.helidon.http.Header;
3637
import io.helidon.http.HeaderNames;
3738
import io.helidon.http.HeaderValues;
3839
import io.helidon.http.WritableHeaders;
40+
import io.helidon.http.http2.Http2FrameData;
3941
import io.helidon.http.http2.Http2Headers;
4042
import io.helidon.http.http2.Http2Settings;
4143
import io.helidon.http.http2.Http2StreamState;
@@ -56,6 +58,7 @@
5658
import io.helidon.webclient.http2.Http2ClientConnection;
5759
import io.helidon.webclient.http2.Http2ClientImpl;
5860
import io.helidon.webclient.http2.Http2StreamConfig;
61+
import io.helidon.webclient.http2.StreamTimeoutException;
5962

6063
import io.grpc.CallOptions;
6164
import io.grpc.ClientCall;
@@ -64,6 +67,7 @@
6467

6568
import static io.helidon.metrics.api.Meter.Scope.VENDOR;
6669
import static java.lang.System.Logger.Level.DEBUG;
70+
import static java.lang.System.Logger.Level.ERROR;
6771

6872
/**
6973
* Base class for gRPC client calls.
@@ -194,6 +198,58 @@ static WritableHeaders<?> setupHeaders(Metadata metadata, String authority, Stri
194198

195199
abstract void startStreamingThreads();
196200

201+
/**
202+
* Read a single gRPC frame, possibly assembled from multiple HTTP/2 frames.
203+
*
204+
* @return data for gRPC frame or {@code null}
205+
*/
206+
protected BufferData readGrpcFrame() {
207+
// attempt to read HTTP/2 frame
208+
Http2FrameData frameData;
209+
try {
210+
frameData = clientStream.readOne(pollWaitTime());
211+
} catch (StreamTimeoutException e) {
212+
handleStreamTimeout(e);
213+
return null;
214+
}
215+
if (frameData == null) {
216+
return null;
217+
}
218+
219+
// read more HTTP/2 frames if long gRPC frame
220+
BufferData bufferData = frameData.data();
221+
bufferData.read(); // skip compression
222+
long grpcLength = bufferData.readUnsignedInt32(); // length prefixed
223+
grpcLength -= bufferData.available();
224+
225+
if (grpcLength > 0) {
226+
// collect frames in composite buffer
227+
CompositeBufferData compositeBuffer = BufferData.createComposite(bufferData);
228+
do {
229+
try {
230+
frameData = clientStream.readOne(pollWaitTime());
231+
} catch (StreamTimeoutException e) {
232+
handleStreamTimeout(e);
233+
continue;
234+
}
235+
if (frameData == null) {
236+
continue;
237+
}
238+
239+
bufferData = frameData.data();
240+
compositeBuffer.add(bufferData);
241+
grpcLength -= bufferData.available();
242+
} while (grpcLength > 0);
243+
244+
// switch to composite buffer
245+
bufferData = compositeBuffer;
246+
}
247+
248+
// rewind and return
249+
bufferData.rewind();
250+
return bufferData;
251+
}
252+
197253
/**
198254
* Unary blocking calls that use stubs provide their own executor which needs
199255
* to be used at least once to unblock the calling thread and complete the
@@ -326,6 +382,14 @@ private ClientUri nextClientUri() {
326382
: clientUriSupplier.next();
327383
}
328384

385+
protected void handleStreamTimeout(StreamTimeoutException e) {
386+
if (abortPollTimeExpired()) {
387+
socket().log(LOGGER, ERROR, "[Reading thread] HTTP/2 stream timeout, aborting");
388+
throw e;
389+
}
390+
socket().log(LOGGER, ERROR, "[Reading thread] HTTP/2 stream timeout, retrying");
391+
}
392+
329393
protected void initMetrics() {
330394
String baseUri = grpcChannel.baseUri().toString();
331395
String methodName = methodDescriptor.getFullMethodName();

webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientCall.java

Lines changed: 9 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.util.concurrent.atomic.AtomicInteger;
2727

2828
import io.helidon.common.buffers.BufferData;
29-
import io.helidon.http.http2.Http2FrameData;
3029
import io.helidon.webclient.http2.StreamTimeoutException;
3130

3231
import io.grpc.CallOptions;
@@ -109,7 +108,6 @@ protected void startStreamingThreads() {
109108
try {
110109
startWriteBarrier.await();
111110
socket().log(LOGGER, DEBUG, "[Heartbeat thread] started with period " + period);
112-
113111
while (isRemoteOpen()) {
114112
Thread.sleep(period);
115113
if (sendingQueue.isEmpty()) {
@@ -182,7 +180,6 @@ protected void startStreamingThreads() {
182180

183181
// read data from stream
184182
while (isRemoteOpen()) {
185-
// drain queue
186183
drainReceivingQueue();
187184

188185
// trailers or eos received?
@@ -191,23 +188,18 @@ protected void startStreamingThreads() {
191188
break;
192189
}
193190

194-
// attempt to read and queue
195-
Http2FrameData frameData;
196-
try {
197-
frameData = clientStream().readOne(pollWaitTime());
198-
} catch (StreamTimeoutException e) {
199-
handleStreamTimeout(e);
191+
// read complete gRPC data
192+
BufferData bufferData = readGrpcFrame();
193+
if (bufferData == null) {
200194
continue;
201195
}
202-
if (frameData != null) {
203-
BufferData bufferData = frameData.data();
204-
// update bytes received excluding prefix
205-
if (enableMetrics()) {
206-
bytesRcvd().addAndGet(bufferData.available() - DATA_PREFIX_LENGTH);
207-
}
208-
receivingQueue.add(bufferData);
209-
socket().log(LOGGER, DEBUG, "[Reading thread] adding bufferData to receiving queue");
196+
197+
// update bytes received excluding prefix
198+
if (enableMetrics()) {
199+
bytesRcvd().addAndGet(bufferData.available() - DATA_PREFIX_LENGTH);
210200
}
201+
receivingQueue.add(bufferData);
202+
socket().log(LOGGER, DEBUG, "[Reading thread] adding bufferData to receiving queue");
211203
}
212204

213205
socket().log(LOGGER, DEBUG, "[Reading thread] closing listener");
@@ -250,12 +242,4 @@ private void drainReceivingQueue() {
250242
responseListener().onMessage(res);
251243
}
252244
}
253-
254-
private void handleStreamTimeout(StreamTimeoutException e) {
255-
if (abortPollTimeExpired()) {
256-
socket().log(LOGGER, ERROR, "[Reading thread] HTTP/2 stream timeout, aborting");
257-
throw e;
258-
}
259-
socket().log(LOGGER, ERROR, "[Reading thread] HTTP/2 stream timeout, retrying");
260-
}
261245
}

webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcUnaryClientCall.java

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,12 @@
1919
import java.time.Duration;
2020

2121
import io.helidon.common.buffers.BufferData;
22-
import io.helidon.http.http2.Http2FrameData;
23-
import io.helidon.webclient.http2.StreamTimeoutException;
2422

2523
import io.grpc.CallOptions;
2624
import io.grpc.MethodDescriptor;
2725
import io.grpc.Status;
2826

2927
import static java.lang.System.Logger.Level.DEBUG;
30-
import static java.lang.System.Logger.Level.ERROR;
3128

3229
/**
3330
* An implementation of a unary gRPC call. Expects:
@@ -102,22 +99,9 @@ public void sendMessage(ReqT message) {
10299
break;
103100
}
104101

105-
// attempt to read and queue
106-
Http2FrameData frameData;
107-
try {
108-
frameData = clientStream().readOne(pollWaitTime());
109-
} catch (StreamTimeoutException e) {
110-
// abort or retry based on config settings
111-
if (abortPollTimeExpired()) {
112-
socket().log(LOGGER, ERROR, "[Reading thread] HTTP/2 stream timeout, aborting");
113-
responseListener().onClose(Status.DEADLINE_EXCEEDED, EMPTY_METADATA);
114-
break;
115-
}
116-
socket().log(LOGGER, ERROR, "[Reading thread] HTTP/2 stream timeout, retrying");
117-
continue;
118-
}
119-
if (frameData != null) {
120-
BufferData bufferData = frameData.data();
102+
// read single gRPC frame
103+
BufferData bufferData = readGrpcFrame();
104+
if (bufferData != null) {
121105
socket().log(LOGGER, DEBUG, "response received");
122106

123107
// update bytes received excluding prefix

webclient/http2/src/main/java/io/helidon/webclient/http2/Http2ClientConnection.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,17 @@ private boolean handle() {
375375
http2GoAway.lastStreamId());
376376
return false;
377377
case SETTINGS:
378+
Http2Flag.SettingsFlags flags = frameHeader.flags(Http2FrameTypes.SETTINGS);
379+
380+
// if ack flag set, empty frame and no processing
381+
if (flags.ack()) {
382+
if (frameHeader.length() > 0) {
383+
throw new Http2Exception(Http2ErrorCode.FRAME_SIZE,
384+
"Settings with ACK should not have payload.");
385+
}
386+
return true;
387+
}
388+
378389
serverSettings = Http2Settings.create(data);
379390
recvListener.frameHeader(ctx, streamId, frameHeader);
380391
recvListener.frame(ctx, streamId, serverSettings);
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright (c) 2025 Oracle and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
18+
syntax = "proto3";
19+
option java_package = "io.helidon.webclient.grpc.tests";
20+
21+
import "google/protobuf/empty.proto";
22+
23+
service DownloadService {
24+
rpc Download (google.protobuf.Empty) returns (DownloadResponse);
25+
}
26+
27+
message DownloadResponse {
28+
repeated bytes data = 1;
29+
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Copyright (c) 2025 Oracle and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.helidon.webclient.grpc.tests;
18+
19+
import java.util.Arrays;
20+
21+
import io.helidon.common.tls.TlsConfig;
22+
import io.helidon.webclient.api.WebClient;
23+
import io.helidon.webclient.grpc.GrpcClient;
24+
import io.helidon.webserver.Router;
25+
import io.helidon.webserver.WebServer;
26+
import io.helidon.webserver.grpc.GrpcRouting;
27+
import io.helidon.webserver.grpc.GrpcService;
28+
import io.helidon.webserver.testing.junit5.ServerTest;
29+
import io.helidon.webserver.testing.junit5.SetUpRoute;
30+
31+
import com.google.protobuf.ByteString;
32+
import com.google.protobuf.Descriptors;
33+
import com.google.protobuf.Empty;
34+
import io.grpc.stub.StreamObserver;
35+
import org.hamcrest.MatcherAssert;
36+
import org.junit.jupiter.api.Test;
37+
38+
import static org.hamcrest.CoreMatchers.is;
39+
import static org.hamcrest.MatcherAssert.assertThat;
40+
41+
@ServerTest
42+
class DownloadServiceTest {
43+
44+
private final WebClient webClient;
45+
46+
DownloadServiceTest(WebServer server) {
47+
this.webClient = WebClient.builder()
48+
.baseUri("http://localhost:" + server.port())
49+
.tls(TlsConfig.builder().enabled(false).build())
50+
.build();
51+
}
52+
53+
@SetUpRoute
54+
static void routing(Router.RouterBuilder<?> router) {
55+
router.addRouting(GrpcRouting.builder().service(new DownloadService()));
56+
}
57+
58+
@Test
59+
void testDownload() {
60+
GrpcClient grpcClient = webClient.client(GrpcClient.PROTOCOL);
61+
DownloadServiceGrpc.DownloadServiceBlockingStub stub = DownloadServiceGrpc.newBlockingStub(grpcClient.channel());
62+
Downloads.DownloadResponse res = stub.download(Empty.getDefaultInstance());
63+
MatcherAssert.assertThat(res.getDataCount(), is(1));
64+
ByteString byteString = res.getData(0);
65+
assertThat(byteString.size(), is(40 * 1024));
66+
}
67+
68+
static class DownloadService implements GrpcService {
69+
70+
private static final byte[] DATA = new byte[40 * 1024];
71+
72+
static {
73+
Arrays.fill(DATA, (byte) 'A');
74+
}
75+
76+
@Override
77+
public Descriptors.FileDescriptor proto() {
78+
return Downloads.getDescriptor();
79+
}
80+
81+
@Override
82+
public void update(Routing router) {
83+
router.unary("Download", this::download);
84+
}
85+
86+
private void download(Empty request, StreamObserver<Downloads.DownloadResponse> observer) {
87+
Downloads.DownloadResponse.Builder builder = Downloads.DownloadResponse.newBuilder();
88+
builder.addData(ByteString.copyFrom(DATA));
89+
observer.onNext(builder.build());
90+
observer.onCompleted();
91+
}
92+
}
93+
}

0 commit comments

Comments
 (0)