Skip to content

Commit fb06ba3

Browse files
committed
gRPC io stub generation based on common service/client interfaces.
Motivation: Provides a gRPC io stub generation implementing the Vert.x client/service contracts that is compatible with gRPC io.
1 parent cd4811b commit fb06ba3

File tree

28 files changed

+2392
-630
lines changed

28 files changed

+2392
-630
lines changed

vertx-grpc-client/src/main/java/io/vertx/grpc/client/impl/GrpcClientImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ protected GrpcClientImpl(Vertx vertx, GrpcClientOptions grpcOptions, HttpClient
5858
this.closeClient = close;
5959
}
6060

61+
public Vertx vertx() {
62+
return vertx;
63+
}
64+
6165
public Future<GrpcClientRequest<Buffer, Buffer>> request(RequestOptions options) {
6266
return client.request(options)
6367
.map(httpRequest -> {

vertx-grpc-docs/src/main/java/examples/GrpcServerExamples.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ public void messageLevelAPI(GrpcServer server) {
256256
}
257257

258258
public void unaryStub1(GrpcServer server) {
259-
GreeterService stub = new GreeterService() {
259+
GreeterService service = new GreeterService() {
260260
@Override
261261
public Future<HelloReply> sayHello(HelloRequest request) {
262262
return Future.succeededFuture(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
@@ -265,24 +265,24 @@ public Future<HelloReply> sayHello(HelloRequest request) {
265265
}
266266

267267
public void unaryStub2(GrpcServer server) {
268-
GreeterService stub = new GreeterService() {
268+
GreeterService service = new GreeterService() {
269269
@Override
270270
public void sayHello(HelloRequest request, Completable<HelloReply> response) {
271271
response.succeed(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
272272
}
273273
};
274274
}
275275

276-
public void unaryStub3(GreeterGrpcService stub, GrpcServer server) {
277-
server.addService(stub);
276+
public void unaryStub3(GreeterGrpcService service, GrpcServer server) {
277+
server.addService(service);
278278
}
279279

280-
public void unaryStub4(GreeterGrpcService stub, GrpcServer server) {
281-
server.addService(GreeterGrpcService.of(stub));
280+
public void unaryStub4(GreeterService service, GrpcServer server) {
281+
server.addService(GreeterGrpcService.of(service));
282282
}
283283

284284
public void streamingRequestStub(GrpcServer server) {
285-
StreamingGrpcService stub = new StreamingGrpcService() {
285+
StreamingGrpcService service = new StreamingGrpcService() {
286286
@Override
287287
public void sink(ReadStream<Item> stream, Completable<Empty> response) {
288288
stream.handler(item -> {
@@ -292,15 +292,15 @@ public void sink(ReadStream<Item> stream, Completable<Empty> response) {
292292
stream.endHandler(v -> response.succeed(Empty.getDefaultInstance()));
293293
}
294294
};
295-
server.addService(stub);
295+
server.addService(service);
296296
}
297297

298298
private Future<ReadStream<Item>> streamOfItems() {
299299
throw new UnsupportedOperationException();
300300
}
301301

302302
public void streamingResponseStub1() {
303-
StreamingService stub = new StreamingService() {
303+
StreamingService service = new StreamingService() {
304304
@Override
305305
public Future<ReadStream<Item>> source(Empty request) {
306306
return streamOfItems();
@@ -309,7 +309,7 @@ public Future<ReadStream<Item>> source(Empty request) {
309309
}
310310

311311
public void streamingResponseStub2() {
312-
StreamingService stub = new StreamingService() {
312+
StreamingService service = new StreamingService() {
313313
@Override
314314
public void source(Empty request, WriteStream<Item> response) {
315315
response.write(Item.newBuilder().setValue("value-1").build());
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package examples.grpc;
2+
3+
import static examples.grpc.GreeterGrpc.getServiceDescriptor;
4+
import static io.grpc.stub.ServerCalls.asyncUnaryCall;
5+
import static io.grpc.stub.ServerCalls.asyncServerStreamingCall;
6+
import static io.grpc.stub.ServerCalls.asyncClientStreamingCall;
7+
import static io.grpc.stub.ServerCalls.asyncBidiStreamingCall;
8+
9+
import io.grpc.ClientCall;
10+
11+
import io.grpc.stub.StreamObserver;
12+
13+
import io.vertx.grpcio.client.GrpcIoClientChannel;
14+
import io.vertx.grpcio.client.impl.GrpcIoClientImpl;
15+
16+
public final class GreeterGrpcIo {
17+
private GreeterGrpcIo() {}
18+
19+
public static GreeterStub newStub(io.vertx.grpcio.client.GrpcIoClient client, io.vertx.core.net.SocketAddress socketAddress) {
20+
return newStub(new io.vertx.grpcio.client.GrpcIoClientChannel(client, socketAddress));
21+
}
22+
23+
public static GreeterStub newStub(io.grpc.Channel channel) {
24+
return new GreeterStub(channel);
25+
}
26+
27+
28+
public static final class GreeterStub extends io.grpc.stub.AbstractStub<GreeterStub> implements GreeterClient {
29+
private final io.vertx.core.internal.ContextInternal context;
30+
private GreeterGrpc.GreeterStub delegateStub;
31+
32+
private GreeterStub(io.grpc.Channel channel) {
33+
super(channel);
34+
this.delegateStub = GreeterGrpc.newStub(channel);
35+
this.context = (io.vertx.core.internal.ContextInternal) ((GrpcIoClientImpl)((GrpcIoClientChannel)getChannel()).client()).vertx().getOrCreateContext();
36+
}
37+
38+
private GreeterStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
39+
super(channel, callOptions);
40+
delegateStub = GreeterGrpc.newStub(channel).build(channel, callOptions);
41+
this.context = (io.vertx.core.internal.ContextInternal) ((GrpcIoClientImpl)((GrpcIoClientChannel)getChannel()).client()).vertx().getOrCreateContext();
42+
}
43+
44+
@Override
45+
protected GreeterStub build(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
46+
return new GreeterStub(channel, callOptions);
47+
}
48+
49+
50+
public io.vertx.core.Future<examples.grpc.HelloReply> sayHello(examples.grpc.HelloRequest request) {
51+
return io.vertx.grpcio.common.impl.stub.ClientCalls.oneToOne(context, request, delegateStub::sayHello);
52+
}
53+
54+
}
55+
56+
public static io.vertx.grpc.server.Service of(GreeterService service) {
57+
String compression = null;
58+
return io.vertx.grpcio.server.GrpcIoServiceBridge.bridge(io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
59+
.addMethod(
60+
examples.grpc.GreeterGrpc.getSayHelloMethod(),
61+
asyncUnaryCall(
62+
new MethodHandlers<
63+
examples.grpc.HelloRequest,
64+
examples.grpc.HelloReply>(
65+
service, METHODID_SAY_HELLO, compression)))
66+
.build());
67+
}
68+
69+
private static final int METHODID_SAY_HELLO = 0;
70+
71+
private static final class MethodHandlers<Req, Resp> implements
72+
io.grpc.stub.ServerCalls.UnaryMethod<Req, Resp>,
73+
io.grpc.stub.ServerCalls.ServerStreamingMethod<Req, Resp>,
74+
io.grpc.stub.ServerCalls.ClientStreamingMethod<Req, Resp>,
75+
io.grpc.stub.ServerCalls.BidiStreamingMethod<Req, Resp> {
76+
77+
private final GreeterService serviceImpl;
78+
private final int methodId;
79+
private final String compression;
80+
81+
MethodHandlers(GreeterService serviceImpl, int methodId, String compression) {
82+
this.serviceImpl = serviceImpl;
83+
this.methodId = methodId;
84+
this.compression = compression;
85+
}
86+
87+
@java.lang.Override
88+
@java.lang.SuppressWarnings("unchecked")
89+
public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserver) {
90+
switch (methodId) {
91+
case METHODID_SAY_HELLO:
92+
io.vertx.grpcio.common.impl.stub.ServerCalls.<examples.grpc.HelloRequest, examples.grpc.HelloReply>oneToOne(
93+
(io.vertx.core.internal.ContextInternal) io.vertx.core.Vertx.currentContext(),
94+
(examples.grpc.HelloRequest) request,
95+
(io.grpc.stub.StreamObserver<examples.grpc.HelloReply>) responseObserver,
96+
compression,
97+
serviceImpl::sayHello);
98+
break;
99+
default:
100+
throw new java.lang.AssertionError();
101+
}
102+
}
103+
104+
@java.lang.Override
105+
@java.lang.SuppressWarnings("unchecked")
106+
public io.grpc.stub.StreamObserver<Req> invoke(io.grpc.stub.StreamObserver<Resp> responseObserver) {
107+
StreamObserver<Req> reqStreamObserver;
108+
switch (methodId) {
109+
default:
110+
throw new java.lang.AssertionError();
111+
}
112+
}
113+
}
114+
}

vertx-grpc-docs/src/main/java/examples/grpc/GreeterGrpcService.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,13 @@ public void bind(GrpcServer server) {
5757
builder(this).bind(all()).build().bind(server);
5858
}
5959

60+
/**
61+
* @return a service binding all methods of the given {@code service}
62+
*/
63+
public static Service of(GreeterService service) {
64+
return builder(service).bind(all()).build();
65+
}
66+
6067
/**
6168
* SayHello protobuf RPC server service method.
6269
*/
@@ -115,13 +122,6 @@ public static Service of(GreeterService service) {
115122
}
116123
}
117124

118-
/**
119-
* @return a service binding all methods of the given {@code service}
120-
*/
121-
public static Service of(GreeterService service) {
122-
return builder(service).bind(all()).build();
123-
}
124-
125125
/**
126126
* @return a free form builder that gives the opportunity to bind only certain methods of a service
127127
*/
@@ -219,13 +219,13 @@ private void handle_sayHello(io.vertx.grpc.server.GrpcServerRequest<examples.grp
219219
if (err == null) {
220220
request.response().end(res);
221221
} else {
222-
request.response().status(GrpcStatus.INTERNAL).end();
222+
request.response().status(GrpcStatus.UNKNOWN).end();
223223
}
224224
});
225225
} catch (UnsupportedOperationException err) {
226226
request.response().status(GrpcStatus.UNIMPLEMENTED).end();
227227
} catch (RuntimeException err) {
228-
request.response().status(GrpcStatus.INTERNAL).end();
228+
request.response().status(GrpcStatus.UNKNOWN).end();
229229
}
230230
});
231231
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package examples.grpc;
2+
3+
import static examples.grpc.GreeterGrpc.getServiceDescriptor;
4+
import static io.grpc.stub.ServerCalls.asyncUnaryCall;
5+
import static io.grpc.stub.ServerCalls.asyncServerStreamingCall;
6+
import static io.grpc.stub.ServerCalls.asyncClientStreamingCall;
7+
import static io.grpc.stub.ServerCalls.asyncBidiStreamingCall;
8+
9+
import io.grpc.ClientCall;
10+
11+
import io.grpc.stub.StreamObserver;
12+
13+
import io.vertx.grpcio.client.GrpcIoClientChannel;
14+
import io.vertx.grpcio.client.impl.GrpcIoClientImpl;
15+
16+
public final class GreeterGrpcStub {
17+
private GreeterGrpcStub() {}
18+
19+
public static GreeterVertxStub newVertxStub(io.vertx.grpcio.client.GrpcIoClient client, io.vertx.core.net.SocketAddress socketAddress) {
20+
return newVertxStub(new io.vertx.grpcio.client.GrpcIoClientChannel(client, socketAddress));
21+
}
22+
23+
public static GreeterVertxStub newVertxStub(io.grpc.Channel channel) {
24+
return new GreeterVertxStub(channel);
25+
}
26+
27+
28+
public static final class GreeterVertxStub extends io.grpc.stub.AbstractStub<GreeterVertxStub> implements GreeterClient {
29+
private final io.vertx.core.internal.ContextInternal context;
30+
private GreeterGrpc.GreeterStub delegateStub;
31+
32+
private GreeterVertxStub(io.grpc.Channel channel) {
33+
super(channel);
34+
delegateStub = GreeterGrpc.newStub(channel);
35+
this.context = (io.vertx.core.internal.ContextInternal) ((GrpcIoClientImpl)((GrpcIoClientChannel)getChannel()).client()).vertx().getOrCreateContext();
36+
}
37+
38+
private GreeterVertxStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
39+
super(channel, callOptions);
40+
delegateStub = GreeterGrpc.newStub(channel).build(channel, callOptions);
41+
this.context = (io.vertx.core.internal.ContextInternal) ((GrpcIoClientImpl)((GrpcIoClientChannel)getChannel()).client()).vertx().getOrCreateContext();
42+
}
43+
44+
@Override
45+
protected GreeterVertxStub build(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
46+
return new GreeterVertxStub(channel, callOptions);
47+
}
48+
49+
50+
public io.vertx.core.Future<examples.grpc.HelloReply> sayHello(examples.grpc.HelloRequest request) {
51+
return io.vertx.grpcio.common.impl.stub.ClientCalls.oneToOne(context, request, delegateStub::sayHello);
52+
}
53+
54+
}
55+
56+
public static io.vertx.grpc.server.Service of(GreeterService service) {
57+
String compression = null;
58+
return io.vertx.grpcio.server.GrpcIoServiceBridge.bridge(io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
59+
.addMethod(
60+
examples.grpc.GreeterGrpc.getSayHelloMethod(),
61+
asyncUnaryCall(
62+
new MethodHandlers<
63+
examples.grpc.HelloRequest,
64+
examples.grpc.HelloReply>(
65+
service, METHODID_SAY_HELLO, compression)))
66+
.build());
67+
}
68+
69+
private static final int METHODID_SAY_HELLO = 0;
70+
71+
private static final class MethodHandlers<Req, Resp> implements
72+
io.grpc.stub.ServerCalls.UnaryMethod<Req, Resp>,
73+
io.grpc.stub.ServerCalls.ServerStreamingMethod<Req, Resp>,
74+
io.grpc.stub.ServerCalls.ClientStreamingMethod<Req, Resp>,
75+
io.grpc.stub.ServerCalls.BidiStreamingMethod<Req, Resp> {
76+
77+
private final GreeterService serviceImpl;
78+
private final int methodId;
79+
private final String compression;
80+
81+
MethodHandlers(GreeterService serviceImpl, int methodId, String compression) {
82+
this.serviceImpl = serviceImpl;
83+
this.methodId = methodId;
84+
this.compression = compression;
85+
}
86+
87+
@java.lang.Override
88+
@java.lang.SuppressWarnings("unchecked")
89+
public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserver) {
90+
switch (methodId) {
91+
case METHODID_SAY_HELLO:
92+
io.vertx.grpcio.common.impl.stub.ServerCalls.<examples.grpc.HelloRequest, examples.grpc.HelloReply>oneToOne(
93+
(io.vertx.core.internal.ContextInternal) io.vertx.core.Vertx.currentContext(),
94+
(examples.grpc.HelloRequest) request,
95+
(io.grpc.stub.StreamObserver<examples.grpc.HelloReply>) responseObserver,
96+
compression,
97+
serviceImpl::sayHello);
98+
break;
99+
default:
100+
throw new java.lang.AssertionError();
101+
}
102+
}
103+
104+
@java.lang.Override
105+
@java.lang.SuppressWarnings("unchecked")
106+
public io.grpc.stub.StreamObserver<Req> invoke(io.grpc.stub.StreamObserver<Resp> responseObserver) {
107+
StreamObserver<Req> reqStreamObserver;
108+
switch (methodId) {
109+
default:
110+
throw new java.lang.AssertionError();
111+
}
112+
}
113+
}
114+
}

0 commit comments

Comments
 (0)