Skip to content

Commit bb46c58

Browse files
committed
Grpc io generation based on common service/client interfaces.
1 parent e67d3fe commit bb46c58

File tree

20 files changed

+1611
-556
lines changed

20 files changed

+1611
-556
lines changed

vertx-grpc-docs/src/main/java/examples/grpc/GreeterGrpcService.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -243,19 +243,22 @@ private <Req, Resp> Handler<io.vertx.grpc.server.GrpcServerRequest<Req, Resp>> r
243243

244244

245245
private void handle_sayHello(io.vertx.grpc.server.GrpcServerRequest<examples.grpc.HelloRequest, examples.grpc.HelloReply> request) {
246-
Promise<examples.grpc.HelloReply> promise = Promise.promise();
247246
request.handler(msg -> {
248247
try {
249-
instance.sayHello(msg, promise);
248+
instance.sayHello(msg, (res, err) -> {
249+
if (err == null) {
250+
request.response().end(res);
251+
} else {
252+
request.response().status(GrpcStatus.INTERNAL).end();
253+
}
254+
});
255+
} catch (UnsupportedOperationException err) {
256+
request.response().status(GrpcStatus.UNIMPLEMENTED).end();
250257
} catch (RuntimeException err) {
251-
promise.tryFail(err);
258+
request.response().status(GrpcStatus.INTERNAL).end();
252259
}
253260
});
254-
promise.future()
255-
.onFailure(err -> request.response().status(GrpcStatus.INTERNAL).end())
256-
.onSuccess(resp -> request.response().end(resp));
257261
}
258-
259262
}
260263
}
261264
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package examples.grpc;
2+
3+
import static examples.grpc.GreeterGrpc.getServiceDescriptor;
4+
import static io.grpc.stub.ServerCalls.asyncUnaryCall;
5+
import static io.grpc.stub.ServerCalls.asyncServerStreamingCall;
6+
import static io.grpc.stub.ServerCalls.asyncClientStreamingCall;
7+
import static io.grpc.stub.ServerCalls.asyncBidiStreamingCall;
8+
9+
import io.grpc.stub.StreamObserver;
10+
11+
public final class GreeterGrpcStub {
12+
private GreeterGrpcStub() {}
13+
14+
public static GreeterVertxStub newVertxStub(io.vertx.grpcio.client.GrpcIoClient client, io.vertx.core.net.SocketAddress socketAddress) {
15+
return newVertxStub(new io.vertx.grpcio.client.GrpcIoClientChannel(client, socketAddress));
16+
}
17+
18+
public static GreeterVertxStub newVertxStub(io.grpc.Channel channel) {
19+
return new GreeterVertxStub(channel);
20+
}
21+
22+
23+
public static final class GreeterVertxStub extends io.grpc.stub.AbstractStub<GreeterVertxStub> implements GreeterClient {
24+
private final io.vertx.core.internal.ContextInternal ctx;
25+
private GreeterGrpc.GreeterStub delegateStub;
26+
27+
private GreeterVertxStub(io.grpc.Channel channel) {
28+
super(channel);
29+
delegateStub = GreeterGrpc.newStub(channel);
30+
this.ctx = (io.vertx.core.internal.ContextInternal) io.vertx.core.Vertx.currentContext();
31+
}
32+
33+
private GreeterVertxStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
34+
super(channel, callOptions);
35+
delegateStub = GreeterGrpc.newStub(channel).build(channel, callOptions);
36+
this.ctx = (io.vertx.core.internal.ContextInternal) io.vertx.core.Vertx.currentContext();
37+
}
38+
39+
@Override
40+
protected GreeterVertxStub build(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
41+
return new GreeterVertxStub(channel, callOptions);
42+
}
43+
44+
45+
public io.vertx.core.Future<examples.grpc.HelloReply> sayHello(examples.grpc.HelloRequest request) {
46+
return io.vertx.grpcio.common.impl.stub.ClientCalls.oneToOne(ctx, request, delegateStub::sayHello);
47+
}
48+
49+
}
50+
51+
public static io.vertx.grpc.server.Service of(GreeterService service) {
52+
String compression = null;
53+
return io.vertx.grpcio.server.GrpcIoServiceBridge.bridge(io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
54+
.addMethod(
55+
examples.grpc.GreeterGrpc.getSayHelloMethod(),
56+
asyncUnaryCall(
57+
new MethodHandlers<
58+
examples.grpc.HelloRequest,
59+
examples.grpc.HelloReply>(
60+
service, METHODID_SAY_HELLO, compression)))
61+
.build());
62+
}
63+
64+
private static final int METHODID_SAY_HELLO = 0;
65+
66+
private static final class MethodHandlers<Req, Resp> implements
67+
io.grpc.stub.ServerCalls.UnaryMethod<Req, Resp>,
68+
io.grpc.stub.ServerCalls.ServerStreamingMethod<Req, Resp>,
69+
io.grpc.stub.ServerCalls.ClientStreamingMethod<Req, Resp>,
70+
io.grpc.stub.ServerCalls.BidiStreamingMethod<Req, Resp> {
71+
72+
private final GreeterService serviceImpl;
73+
private final int methodId;
74+
private final String compression;
75+
76+
MethodHandlers(GreeterService serviceImpl, int methodId, String compression) {
77+
this.serviceImpl = serviceImpl;
78+
this.methodId = methodId;
79+
this.compression = compression;
80+
}
81+
82+
@java.lang.Override
83+
@java.lang.SuppressWarnings("unchecked")
84+
public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserver) {
85+
switch (methodId) {
86+
case METHODID_SAY_HELLO:
87+
io.vertx.grpcio.common.impl.stub.ServerCalls.<examples.grpc.HelloRequest, examples.grpc.HelloReply>oneToOne(
88+
(examples.grpc.HelloRequest) request,
89+
(io.grpc.stub.StreamObserver<examples.grpc.HelloReply>) responseObserver,
90+
compression,
91+
serviceImpl::sayHello);
92+
break;
93+
default:
94+
throw new java.lang.AssertionError();
95+
}
96+
}
97+
98+
@java.lang.Override
99+
@java.lang.SuppressWarnings("unchecked")
100+
public io.grpc.stub.StreamObserver<Req> invoke(io.grpc.stub.StreamObserver<Resp> responseObserver) {
101+
StreamObserver<Req> reqStreamObserver;
102+
switch (methodId) {
103+
default:
104+
throw new java.lang.AssertionError();
105+
}
106+
}
107+
}
108+
}

vertx-grpc-docs/src/main/java/examples/grpc/StreamingGrpcClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ public Future<ReadStream<examples.grpc.Item>> source(examples.grpc.Empty request
142142
req.end(request);
143143
return req.response().flatMap(resp -> {
144144
if (resp.status() != null && resp.status() != GrpcStatus.OK) {
145-
return Future.failedFuture("Invalid gRPC status " + resp.status());
145+
return Future.failedFuture(new io.vertx.grpc.client.InvalidStatusException(GrpcStatus.OK, resp.status()));
146146
} else {
147147
return Future.succeededFuture(resp);
148148
}
@@ -186,7 +186,7 @@ public Future<ReadStream<examples.grpc.Item>> pipe(Completable<WriteStream<examp
186186
.compose(req -> {
187187
return req.response().flatMap(resp -> {
188188
if (resp.status() != null && resp.status() != GrpcStatus.OK) {
189-
return Future.failedFuture("Invalid gRPC status " + resp.status());
189+
return Future.failedFuture(new io.vertx.grpc.client.InvalidStatusException(GrpcStatus.OK, resp.status()));
190190
} else {
191191
return Future.succeededFuture(resp);
192192
}

vertx-grpc-docs/src/main/java/examples/grpc/StreamingGrpcService.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -277,32 +277,39 @@ private void handle_source(io.vertx.grpc.server.GrpcServerRequest<examples.grpc.
277277
request.handler(msg -> {
278278
try {
279279
instance.source(msg, request.response());
280+
} catch (UnsupportedOperationException e) {
281+
request.response().status(GrpcStatus.UNIMPLEMENTED).end();
280282
} catch (RuntimeException err) {
281283
request.response().status(GrpcStatus.INTERNAL).end();
282284
}
283285
});
284286
}
285287

286288
private void handle_sink(io.vertx.grpc.server.GrpcServerRequest<examples.grpc.Item, examples.grpc.Empty> request) {
287-
Promise<examples.grpc.Empty> promise = Promise.promise();
288-
promise.future()
289-
.onFailure(err -> request.response().status(GrpcStatus.INTERNAL).end())
290-
.onSuccess(resp -> request.response().end(resp));
291289
try {
292-
instance.sink(request, promise);
290+
instance.sink(request, (res, err) -> {
291+
if (err == null) {
292+
request.response().end(res);
293+
} else {
294+
request.response().status(GrpcStatus.INTERNAL).end();
295+
}
296+
});
297+
} catch (UnsupportedOperationException err) {
298+
request.response().status(GrpcStatus.UNIMPLEMENTED).end();
293299
} catch (RuntimeException err) {
294-
promise.tryFail(err);
300+
request.response().status(GrpcStatus.INTERNAL).end();
295301
}
296302
}
297303

298304
private void handle_pipe(io.vertx.grpc.server.GrpcServerRequest<examples.grpc.Item, examples.grpc.Item> request) {
299305
try {
300306
instance.pipe(request, request.response());
301-
} catch (RuntimeException err) {
307+
} catch (UnsupportedOperationException err) {
308+
request.response().status(GrpcStatus.UNIMPLEMENTED).end();
309+
} catch (RuntimeException err) {
302310
request.response().status(GrpcStatus.INTERNAL).end();
303311
}
304312
}
305-
306313
}
307314
}
308315
}
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package examples.grpc;
2+
3+
import static examples.grpc.StreamingGrpc.getServiceDescriptor;
4+
import static io.grpc.stub.ServerCalls.asyncUnaryCall;
5+
import static io.grpc.stub.ServerCalls.asyncServerStreamingCall;
6+
import static io.grpc.stub.ServerCalls.asyncClientStreamingCall;
7+
import static io.grpc.stub.ServerCalls.asyncBidiStreamingCall;
8+
9+
import io.grpc.stub.StreamObserver;
10+
11+
public final class StreamingGrpcStub {
12+
private StreamingGrpcStub() {}
13+
14+
public static StreamingVertxStub newVertxStub(io.vertx.grpcio.client.GrpcIoClient client, io.vertx.core.net.SocketAddress socketAddress) {
15+
return newVertxStub(new io.vertx.grpcio.client.GrpcIoClientChannel(client, socketAddress));
16+
}
17+
18+
public static StreamingVertxStub newVertxStub(io.grpc.Channel channel) {
19+
return new StreamingVertxStub(channel);
20+
}
21+
22+
23+
public static final class StreamingVertxStub extends io.grpc.stub.AbstractStub<StreamingVertxStub> implements StreamingClient {
24+
private final io.vertx.core.internal.ContextInternal ctx;
25+
private StreamingGrpc.StreamingStub delegateStub;
26+
27+
private StreamingVertxStub(io.grpc.Channel channel) {
28+
super(channel);
29+
delegateStub = StreamingGrpc.newStub(channel);
30+
this.ctx = (io.vertx.core.internal.ContextInternal) io.vertx.core.Vertx.currentContext();
31+
}
32+
33+
private StreamingVertxStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
34+
super(channel, callOptions);
35+
delegateStub = StreamingGrpc.newStub(channel).build(channel, callOptions);
36+
this.ctx = (io.vertx.core.internal.ContextInternal) io.vertx.core.Vertx.currentContext();
37+
}
38+
39+
@Override
40+
protected StreamingVertxStub build(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
41+
return new StreamingVertxStub(channel, callOptions);
42+
}
43+
44+
45+
public io.vertx.core.Future<io.vertx.core.streams.ReadStream<examples.grpc.Item>> source(examples.grpc.Empty request) {
46+
return io.vertx.grpcio.common.impl.stub.ClientCalls.oneToMany(ctx, request, delegateStub::source);
47+
}
48+
49+
50+
public io.vertx.core.Future<examples.grpc.Empty> sink(io.vertx.core.Completable<io.vertx.core.streams.WriteStream<examples.grpc.Item>> hdlr) {
51+
return io.vertx.grpcio.common.impl.stub.ClientCalls.manyToOne(ctx, hdlr, delegateStub::sink);
52+
}
53+
54+
55+
public io.vertx.core.Future<io.vertx.core.streams.ReadStream<examples.grpc.Item>> pipe(io.vertx.core.Completable<io.vertx.core.streams.WriteStream<examples.grpc.Item>> hdlr) {
56+
return io.vertx.grpcio.common.impl.stub.ClientCalls.manyToMany(ctx, hdlr, delegateStub::pipe);
57+
}
58+
}
59+
60+
public static io.vertx.grpc.server.Service of(StreamingService service) {
61+
String compression = null;
62+
return io.vertx.grpcio.server.GrpcIoServiceBridge.bridge(io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
63+
.addMethod(
64+
examples.grpc.StreamingGrpc.getSourceMethod(),
65+
asyncServerStreamingCall(
66+
new MethodHandlers<
67+
examples.grpc.Empty,
68+
examples.grpc.Item>(
69+
service, METHODID_SOURCE, compression)))
70+
.addMethod(
71+
examples.grpc.StreamingGrpc.getSinkMethod(),
72+
asyncClientStreamingCall(
73+
new MethodHandlers<
74+
examples.grpc.Item,
75+
examples.grpc.Empty>(
76+
service, METHODID_SINK, compression)))
77+
.addMethod(
78+
examples.grpc.StreamingGrpc.getPipeMethod(),
79+
asyncBidiStreamingCall(
80+
new MethodHandlers<
81+
examples.grpc.Item,
82+
examples.grpc.Item>(
83+
service, METHODID_PIPE, compression)))
84+
.build());
85+
}
86+
87+
private static final int METHODID_SOURCE = 0;
88+
private static final int METHODID_SINK = 1;
89+
private static final int METHODID_PIPE = 2;
90+
91+
private static final class MethodHandlers<Req, Resp> implements
92+
io.grpc.stub.ServerCalls.UnaryMethod<Req, Resp>,
93+
io.grpc.stub.ServerCalls.ServerStreamingMethod<Req, Resp>,
94+
io.grpc.stub.ServerCalls.ClientStreamingMethod<Req, Resp>,
95+
io.grpc.stub.ServerCalls.BidiStreamingMethod<Req, Resp> {
96+
97+
private final StreamingService serviceImpl;
98+
private final int methodId;
99+
private final String compression;
100+
101+
MethodHandlers(StreamingService serviceImpl, int methodId, String compression) {
102+
this.serviceImpl = serviceImpl;
103+
this.methodId = methodId;
104+
this.compression = compression;
105+
}
106+
107+
@java.lang.Override
108+
@java.lang.SuppressWarnings("unchecked")
109+
public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserver) {
110+
switch (methodId) {
111+
case METHODID_SOURCE:
112+
io.vertx.grpcio.common.impl.stub.ServerCalls.<examples.grpc.Empty, examples.grpc.Item>oneToMany(
113+
(examples.grpc.Empty) request,
114+
(io.grpc.stub.StreamObserver<examples.grpc.Item>) responseObserver,
115+
compression,
116+
serviceImpl::source);
117+
break;
118+
default:
119+
throw new java.lang.AssertionError();
120+
}
121+
}
122+
123+
@java.lang.Override
124+
@java.lang.SuppressWarnings("unchecked")
125+
public io.grpc.stub.StreamObserver<Req> invoke(io.grpc.stub.StreamObserver<Resp> responseObserver) {
126+
StreamObserver<Req> reqStreamObserver;
127+
switch (methodId) {
128+
case METHODID_SINK:
129+
reqStreamObserver = (io.grpc.stub.StreamObserver<Req>) io.vertx.grpcio.common.impl.stub.ServerCalls.<examples.grpc.Item, examples.grpc.Empty>manyToOne(
130+
(io.grpc.stub.StreamObserver<examples.grpc.Empty>) responseObserver,
131+
compression,
132+
serviceImpl::sink);
133+
case METHODID_PIPE:
134+
reqStreamObserver = (io.grpc.stub.StreamObserver<Req>) io.vertx.grpcio.common.impl.stub.ServerCalls.<examples.grpc.Item, examples.grpc.Item>manyToMany(
135+
(io.grpc.stub.StreamObserver<examples.grpc.Item>) responseObserver,
136+
compression,
137+
serviceImpl::pipe);
138+
default:
139+
throw new java.lang.AssertionError();
140+
}
141+
}
142+
}
143+
}

vertx-grpc-it/pom.xml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,16 @@
4040
<artifactId>vertx-grpc-server</artifactId>
4141
<version>${project.version}</version>
4242
</dependency>
43+
<dependency>
44+
<groupId>io.vertx</groupId>
45+
<artifactId>vertx-grpcio-client</artifactId>
46+
<version>${project.version}</version>
47+
</dependency>
48+
<dependency>
49+
<groupId>io.vertx</groupId>
50+
<artifactId>vertx-grpcio-server</artifactId>
51+
<version>${project.version}</version>
52+
</dependency>
4353
<dependency>
4454
<groupId>io.vertx</groupId>
4555
<artifactId>vertx-grpc-reflection</artifactId>
@@ -102,6 +112,9 @@
102112
<groupId>org.xolstice.maven.plugins</groupId>
103113
<artifactId>protobuf-maven-plugin</artifactId>
104114
<configuration>
115+
<protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
116+
<pluginId>grpc-java</pluginId>
117+
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
105118
<protocPlugins>
106119
<protocPlugin>
107120
<id>vertx-grpc-protoc-plugin</id>
@@ -117,6 +130,7 @@
117130
<id>test-compile</id>
118131
<goals>
119132
<goal>test-compile</goal>
133+
<goal>test-compile-custom</goal>
120134
</goals>
121135
</execution>
122136
</executions>

0 commit comments

Comments
 (0)