Skip to content

Commit 80be146

Browse files
authored
4.x: Improves support for new gRPC V2 blocking stub calls (#10453)
* Improves support for V2 blocking stub calls. Handles use case of messages in receiving queue that need to be delivered after all data from the server stream is consumed. Some new tests. Use a configurable duration that waits for the listener to request more data when the receiving queue is not empty. If data is received within that period, and more data is in the queue, then the duration resets.
1 parent 3b70e1f commit 80be146

File tree

3 files changed

+86
-14
lines changed

3 files changed

+86
-14
lines changed

webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientCall.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222
import java.util.concurrent.ExecutorService;
2323
import java.util.concurrent.Future;
2424
import java.util.concurrent.LinkedBlockingQueue;
25+
import java.util.concurrent.Semaphore;
2526
import java.util.concurrent.TimeUnit;
26-
import java.util.concurrent.atomic.AtomicInteger;
2727

2828
import io.helidon.common.buffers.BufferData;
2929
import io.helidon.webclient.http2.StreamTimeoutException;
@@ -36,9 +36,7 @@
3636
import static java.lang.System.Logger.Level.ERROR;
3737

3838
/**
39-
* An implementation of a gRPC call. Expects:
40-
* <p>
41-
* start (request | sendMessage)* (halfClose | cancel)
39+
* An implementation of a gRPC call.
4240
*
4341
* @param <ReqT> request type
4442
* @param <ResT> response type
@@ -47,7 +45,7 @@ class GrpcClientCall<ReqT, ResT> extends GrpcBaseClientCall<ReqT, ResT> {
4745
private static final System.Logger LOGGER = System.getLogger(GrpcClientCall.class.getName());
4846

4947
private final ExecutorService executor;
50-
private final AtomicInteger messageRequest = new AtomicInteger();
48+
private final Semaphore messageRequest = new Semaphore(0);
5149

5250
private final LinkedBlockingQueue<BufferData> sendingQueue = new LinkedBlockingQueue<>();
5351
private final LinkedBlockingQueue<BufferData> receivingQueue = new LinkedBlockingQueue<>();
@@ -67,7 +65,7 @@ class GrpcClientCall<ReqT, ResT> extends GrpcBaseClientCall<ReqT, ResT> {
6765
@Override
6866
public void request(int numMessages) {
6967
socket().log(LOGGER, DEBUG, "request called %d", numMessages);
70-
messageRequest.addAndGet(numMessages);
68+
messageRequest.release(numMessages);
7169
startReadBarrier.countDown();
7270
}
7371

@@ -202,8 +200,24 @@ protected void startStreamingThreads() {
202200
socket().log(LOGGER, DEBUG, "[Reading thread] adding bufferData to receiving queue");
203201
}
204202

205-
socket().log(LOGGER, DEBUG, "[Reading thread] closing listener");
206-
responseListener().onClose(Status.OK, EMPTY_METADATA);
203+
// attempt to drain our receiving queue if permits arrive on time
204+
Status status = Status.OK;
205+
if (!receivingQueue.isEmpty()) {
206+
Duration waitTime = grpcClient().prototype().protocolConfig().nextRequestWaitTime();
207+
do {
208+
if (messageRequest.tryAcquire(waitTime.toNanos(), TimeUnit.NANOSECONDS)) {
209+
ResT res = toResponse(receivingQueue.remove());
210+
responseListener().onMessage(res);
211+
} else {
212+
socket().log(LOGGER, DEBUG, "[Reading thread] unable to drain receiving queue");
213+
status = Status.CANCELLED;
214+
break; // wait time expired
215+
}
216+
} while (!receivingQueue.isEmpty());
217+
}
218+
219+
// report onClose call with status
220+
responseListener().onClose(status, EMPTY_METADATA);
207221
} catch (StreamTimeoutException e) {
208222
responseListener().onClose(Status.DEADLINE_EXCEEDED, EMPTY_METADATA);
209223
} catch (Throwable e) {
@@ -236,8 +250,7 @@ private void close() {
236250

237251
private void drainReceivingQueue() {
238252
socket().log(LOGGER, DEBUG, "[Reading thread] draining receiving queue");
239-
while (messageRequest.get() > 0 && !receivingQueue.isEmpty()) {
240-
messageRequest.getAndDecrement();
253+
while (!receivingQueue.isEmpty() && messageRequest.tryAcquire()) {
241254
ResT res = toResponse(receivingQueue.remove());
242255
responseListener().onMessage(res);
243256
}

webclient/grpc/src/main/java/io/helidon/webclient/grpc/GrpcClientProtocolConfigBlueprint.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2024 Oracle and/or its affiliates.
2+
* Copyright (c) 2024, 2025 Oracle and/or its affiliates.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -92,4 +92,19 @@ default String type() {
9292
@Option.Configured
9393
@Option.Default("2048")
9494
int initBufferSize();
95+
96+
/**
97+
* When data has been received from the server but not yet requested by the client
98+
* (i.e., listener), the implementation will wait for this duration before signaling
99+
* an error. If data is requested and more data is still in the queue, this time
100+
* wait restarts until the next request is received. If duration expires, a
101+
* status of {@link io.grpc.Status#CANCELLED} is reported in the call to
102+
* {@link io.grpc.ClientCall.Listener#onClose(io.grpc.Status, io.grpc.Metadata)}.
103+
*
104+
* @return duration to wait for the next data request from listener
105+
* @see io.grpc.ClientCall.Listener#request(int)
106+
*/
107+
@Option.Configured
108+
@Option.Default("PT1S")
109+
Duration nextRequestWaitTime();
95110
}

webclient/tests/grpc/src/test/java/io/helidon/webclient/grpc/tests/GrpcStubTest.java

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2024 Oracle and/or its affiliates.
2+
* Copyright (c) 2024, 2025 Oracle and/or its affiliates.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,10 +31,12 @@
3131
import io.helidon.webclient.grpc.GrpcClient;
3232
import io.helidon.webserver.WebServer;
3333
import io.helidon.webserver.testing.junit5.ServerTest;
34-
import org.junit.jupiter.api.Assertions;
35-
import org.junit.jupiter.api.Test;
3634

35+
import io.grpc.StatusException;
36+
import io.grpc.stub.BlockingClientCall;
3737
import io.grpc.stub.StreamObserver;
38+
import org.junit.jupiter.api.Assertions;
39+
import org.junit.jupiter.api.Test;
3840

3941
import static org.hamcrest.CoreMatchers.is;
4042
import static org.hamcrest.MatcherAssert.assertThat;
@@ -119,6 +121,16 @@ void testServerStreamingSplitAsync() throws ExecutionException, InterruptedExcep
119121
assertThat(res.hasNext(), is(false));
120122
}
121123

124+
@Test
125+
void testServerStreamingSplitV2() throws InterruptedException, StatusException, TimeoutException {
126+
GrpcClient grpcClient = webClient.client(GrpcClient.PROTOCOL);
127+
StringServiceGrpc.StringServiceBlockingV2Stub service = StringServiceGrpc.newBlockingV2Stub(grpcClient.channel());
128+
BlockingClientCall<?, Strings.StringMessage> call = service.split(newStringMessage("hello world"));
129+
assertThat(call.read(100, TimeUnit.MILLISECONDS).getText(), is("hello"));
130+
assertThat(call.read(100, TimeUnit.MILLISECONDS).getText(), is("world"));
131+
assertThat(call.hasNext(), is(false));
132+
}
133+
122134
@Test
123135
void testClientStreamingJoinAsync() throws ExecutionException, InterruptedException, TimeoutException {
124136
GrpcClient grpcClient = webClient.client(GrpcClient.PROTOCOL);
@@ -132,6 +144,18 @@ void testClientStreamingJoinAsync() throws ExecutionException, InterruptedExcept
132144
assertThat(res.getText(), is("hello world"));
133145
}
134146

147+
@Test
148+
void testClientStreamingJoinV2() throws InterruptedException, TimeoutException, StatusException {
149+
GrpcClient grpcClient = webClient.client(GrpcClient.PROTOCOL);
150+
StringServiceGrpc.StringServiceBlockingV2Stub service = StringServiceGrpc.newBlockingV2Stub(grpcClient.channel());
151+
BlockingClientCall<Strings.StringMessage, Strings.StringMessage> call = service.join();
152+
call.write(newStringMessage("hello"));
153+
call.write(newStringMessage("world"));
154+
call.halfClose();
155+
Strings.StringMessage res = call.read(100, TimeUnit.MILLISECONDS);
156+
assertThat(res.getText(), is("hello world"));
157+
}
158+
135159
@Test
136160
void testBidirectionalEchoAsync() throws ExecutionException, InterruptedException, TimeoutException {
137161
GrpcClient grpcClient = webClient.client(GrpcClient.PROTOCOL);
@@ -147,6 +171,26 @@ void testBidirectionalEchoAsync() throws ExecutionException, InterruptedExceptio
147171
assertThat(res.hasNext(), is(false));
148172
}
149173

174+
@Test
175+
void testBidirectionalEchoV2() throws InterruptedException, TimeoutException, StatusException, ExecutionException {
176+
GrpcClient grpcClient = webClient.client(GrpcClient.PROTOCOL);
177+
StringServiceGrpc.StringServiceBlockingV2Stub service = StringServiceGrpc.newBlockingV2Stub(grpcClient.channel());
178+
BlockingClientCall<Strings.StringMessage, Strings.StringMessage> call = service.echo();
179+
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
180+
try {
181+
assertThat(call.read(100, TimeUnit.MILLISECONDS).getText(), is("hello"));
182+
assertThat(call.read(100, TimeUnit.MILLISECONDS).getText(), is("world"));
183+
assertThat(call.hasNext(), is(false));
184+
} catch (InterruptedException | TimeoutException | StatusException e) {
185+
throw new RuntimeException(e);
186+
}
187+
});
188+
assertThat(call.write(newStringMessage("hello")), is(true));
189+
assertThat(call.write(newStringMessage("world")), is(true));
190+
call.halfClose();
191+
future.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
192+
}
193+
150194
@Test
151195
void testBidirectionalEchoAsyncEmpty() throws ExecutionException, InterruptedException, TimeoutException {
152196
GrpcClient grpcClient = webClient.client(GrpcClient.PROTOCOL);

0 commit comments

Comments
 (0)