|
16 | 16 |
|
17 | 17 | package scalapb.reactor |
18 | 18 |
|
19 | | -import com.google.protobuf.Descriptors.{FileDescriptor, MethodDescriptor, ServiceDescriptor} |
20 | 19 | import com.google.protobuf.ExtensionRegistry |
21 | | -import com.google.protobuf.compiler.PluginProtos.CodeGeneratorResponse |
22 | 20 | import protocbridge.Artifact |
23 | 21 | import protocgen.{CodeGenApp, CodeGenRequest, CodeGenResponse} |
24 | | -import scala.jdk.CollectionConverters._ |
25 | 22 | import scalapb.compiler._ |
26 | 23 | import scalapb.options.Scalapb |
27 | 24 |
|
@@ -68,242 +65,3 @@ object ReactorCodeGenerator extends CodeGenApp { |
68 | 65 | CodeGenResponse.fail(error) |
69 | 66 | } |
70 | 67 | } |
71 | | - |
72 | | -class ReactorFilePrinter( |
73 | | - implicits: DescriptorImplicits, |
74 | | - file: FileDescriptor |
75 | | -) { |
76 | | - |
77 | | - import implicits._ |
78 | | - |
79 | | - private val AbstractStub = "_root_.io.grpc.stub.AbstractStub" |
80 | | - private val Channel = "_root_.io.grpc.Channel" |
81 | | - private val CallOptions = "_root_.io.grpc.CallOptions" |
82 | | - private val serverServiceDef = "_root_.io.grpc.ServerServiceDefinition" |
83 | | - private val ReactorServerCalls = "_root_.com.salesforce.reactorgrpc.stub.ServerCalls" |
84 | | - private val ReactorClientCalls = "_root_.com.salesforce.reactorgrpc.stub.ClientCalls" |
85 | | - private val ServerCalls = "_root_.io.grpc.stub.ServerCalls" |
86 | | - private val ClientCalls = "_root_.io.grpc.stub.ClientCalls" |
87 | | - private val StreamObserver = "_root_.io.grpc.stub.StreamObserver" |
88 | | - private val Mono = "_root_.reactor.core.publisher.Mono" |
89 | | - private val SMono = "_root_.reactor.core.scala.publisher.SMono" |
90 | | - private val SFlux = "_root_.reactor.core.scala.publisher.SFlux" |
91 | | - |
92 | | - private val FileName = file.scalaPackage / |
93 | | - s"Reactor${NameUtils.snakeCaseToCamelCase(baseName(file.getName), true)}" |
94 | | - |
95 | | - def scalaFileName: String = |
96 | | - FileName.fullName.replace('.', '/') + ".scala" |
97 | | - |
98 | | - def content: String = { |
99 | | - val fp = new FunctionalPrinter() |
100 | | - fp.add( |
101 | | - s"package ${file.scalaPackage.fullName}", |
102 | | - "", |
103 | | - "import scala.language.implicitConversions", |
104 | | - "" |
105 | | - ).print(file.getServices().asScala)((fp, s) => new ServicePrinter(s).print(fp)) |
106 | | - .result() |
107 | | - } |
108 | | - |
109 | | - def result(): CodeGeneratorResponse.File = { |
110 | | - val b = CodeGeneratorResponse.File.newBuilder() |
111 | | - b.setName(scalaFileName) |
112 | | - b.setContent(content) |
113 | | - b.build() |
114 | | - } |
115 | | - |
116 | | - class ServicePrinter(service: ServiceDescriptor) { |
117 | | - |
118 | | - import implicits._ |
119 | | - |
120 | | - private val OuterObject = file.scalaPackage / s"Reactor${service.name}Grpc" |
121 | | - |
122 | | - private val traitName = OuterObject / s"Reactor${service.name}" |
123 | | - private val asyncStubName = s"${traitName.name}Stub" |
124 | | - |
125 | | - def print(fp: FunctionalPrinter): FunctionalPrinter = |
126 | | - fp.add(s"object ${OuterObject.name} {") |
127 | | - .indent |
128 | | - .add(s"val SERVICE: _root_.io.grpc.ServiceDescriptor = ${service.grpcDescriptor.fullName}") |
129 | | - .add("") |
130 | | - .add( |
131 | | - s"trait ${traitName.name} {" |
132 | | - ) |
133 | | - .indented( |
134 | | - _.print(service.getMethods().asScala.toVector)(printMethodSignature()) |
135 | | - ) |
136 | | - .add("}") |
137 | | - .add("") |
138 | | - .add(s"object ${traitName.name} {") |
139 | | - .indented( |
140 | | - _.add(s"def bindService(serviceImpl: ${traitName.fullName}): $serverServiceDef = ").indent |
141 | | - .add(s"$serverServiceDef.builder(${service.grpcDescriptor.fullName})") |
142 | | - .print(service.getMethods().asScala.toVector)( |
143 | | - printBindService(_, _) |
144 | | - ) |
145 | | - .add(".build()") |
146 | | - .outdent |
147 | | - ) |
148 | | - .add("}") |
149 | | - .add("") |
150 | | - .add( |
151 | | - s"class $asyncStubName(channel: $Channel, options: $CallOptions = $CallOptions.DEFAULT) ", |
152 | | - s"extends $AbstractStub[$asyncStubName](channel, options) with ${traitName.name} {" |
153 | | - ) |
154 | | - .indented( |
155 | | - _.print(service.getMethods().asScala.toVector)(printAsyncClientStub(_, _)) |
156 | | - .add( |
157 | | - s"override def build(channel: $Channel, options: $CallOptions): $asyncStubName = new $asyncStubName(channel, options)" |
158 | | - ) |
159 | | - ) |
160 | | - .add("}") |
161 | | - .add("") |
162 | | - .add(s"def stub(channel: $Channel): $asyncStubName = new $asyncStubName(channel)") |
163 | | - .add("") |
164 | | - .outdent |
165 | | - .add("}") |
166 | | - |
167 | | - private def methodSignature(method: MethodDescriptor): String = { |
168 | | - val reqType = method.inputType.scalaType |
169 | | - val resType = method.outputType.scalaType |
170 | | - |
171 | | - s"def ${method.name}" + (method.streamType match { |
172 | | - case StreamType.Unary => |
173 | | - s"(request: $reqType): ${smono(resType)}" |
174 | | - case StreamType.ClientStreaming => |
175 | | - s"(request: ${sflux(reqType)}): ${smono(resType)}" |
176 | | - case StreamType.ServerStreaming => |
177 | | - s"(request: $reqType): ${sflux(resType)}" |
178 | | - case StreamType.Bidirectional => |
179 | | - s"(request: ${sflux(reqType)}): ${sflux(resType)}" |
180 | | - }) |
181 | | - } |
182 | | - |
183 | | - private def printMethodSignature()(fp: FunctionalPrinter, method: MethodDescriptor): FunctionalPrinter = |
184 | | - fp.add(methodSignature(method)) |
185 | | - |
186 | | - private def printBindService(fp: FunctionalPrinter, method: MethodDescriptor): FunctionalPrinter = { |
187 | | - val reqType = method.inputType.scalaType |
188 | | - val resType = method.outputType.scalaType |
189 | | - val serviceCall = s"serviceImpl.${method.name}" |
190 | | - |
191 | | - val fp0 = fp |
192 | | - .add(".addMethod(") |
193 | | - .indent |
194 | | - .add( |
195 | | - s"${method.grpcDescriptor.fullName}," |
196 | | - ) |
197 | | - |
198 | | - val fp1 = method.streamType match { |
199 | | - case StreamType.Unary => |
200 | | - fp0 |
201 | | - .add(s"$ServerCalls.asyncUnaryCall(new $ServerCalls.UnaryMethod[$reqType, $resType] {") |
202 | | - .indent |
203 | | - .add(s"override def invoke(request: $reqType, observer: ${StreamObserver}[$resType]): Unit =") |
204 | | - .indent |
205 | | - // Mono.block() is not a blocking operation. The given Mono is always an instance of JustMono. |
206 | | - // https://github.com/salesforce/reactive-grpc/blob/ce3b3b20e8192c5e6b38b2ed596531242d9708c0/reactor/reactor-grpc-stub/src/main/java/com/salesforce/reactorgrpc/stub/ServerCalls.java#L38 |
207 | | - .add(s"$ReactorServerCalls.oneToOne(request, observer, (mono: ${mono(reqType)}) =>") |
208 | | - .indent |
209 | | - .add(s"$serviceCall(mono.block()).asJava())") |
210 | | - case StreamType.ClientStreaming => |
211 | | - fp0 |
212 | | - .add(s"$ServerCalls.asyncClientStreamingCall(new $ServerCalls.ClientStreamingMethod[$reqType, $resType] {") |
213 | | - .indent |
214 | | - .add(s"override def invoke(observer: ${StreamObserver}[$resType]): $StreamObserver[$reqType] =") |
215 | | - .add(s"$ReactorServerCalls.manyToOne(observer, (flux: ${flux(reqType)}) =>") |
216 | | - .indent |
217 | | - .add(s"$serviceCall($SFlux.fromPublisher(flux)).asJava(), null)") |
218 | | - case StreamType.ServerStreaming => |
219 | | - fp0 |
220 | | - .add( |
221 | | - s"$ServerCalls.asyncServerStreamingCall(new ${ServerCalls}.ServerStreamingMethod[$reqType, $resType] {" |
222 | | - ) |
223 | | - .indent |
224 | | - .add(s"override def invoke(request: $reqType, observer: $StreamObserver[${resType}]): Unit =") |
225 | | - .add(s"$ReactorServerCalls.oneToMany(request, observer, (mono: ${mono(reqType)}) =>") |
226 | | - .indent |
227 | | - .add(s"$serviceCall(mono.block()).asJava())") |
228 | | - case StreamType.Bidirectional => |
229 | | - fp0 |
230 | | - .add(s"$ServerCalls.asyncBidiStreamingCall(new $ServerCalls.BidiStreamingMethod[$reqType, $resType] {") |
231 | | - .indent |
232 | | - .add(s"override def invoke(observer: $StreamObserver[$resType]): ${StreamObserver}[$reqType] =") |
233 | | - .add(s"$ReactorServerCalls.manyToMany(observer, (flux: ${flux(reqType)}) =>") |
234 | | - .indent |
235 | | - .add(s"$serviceCall($SFlux.fromPublisher(flux)).asJava(), null)") |
236 | | - } |
237 | | - |
238 | | - fp1.outdent.outdent |
239 | | - .add("})") |
240 | | - .outdent |
241 | | - .add(")") |
242 | | - } |
243 | | - |
244 | | - private def printAsyncClientStub(fp: FunctionalPrinter, method: MethodDescriptor): FunctionalPrinter = { |
245 | | - val reqType = method.inputType.scalaType |
246 | | - val resType = method.outputType.scalaType |
247 | | - val serviceCall = s"serviceImpl.${method.name}" |
248 | | - |
249 | | - val fp1 = method.streamType match { |
250 | | - case StreamType.Unary => |
251 | | - fp.add(s"override def ${method.name}(request: $reqType): ${smono(resType)} =") |
252 | | - .indent |
253 | | - .add( |
254 | | - s"$SMono.fromPublisher($ReactorClientCalls.oneToOne($Mono.just(request), (req: $reqType, observer: $StreamObserver[$resType]) => {" |
255 | | - ) |
256 | | - .indent |
257 | | - .add( |
258 | | - s"$ClientCalls.asyncUnaryCall(getChannel().newCall(${method.grpcDescriptor.fullName}, getCallOptions()), req, observer)" |
259 | | - ) |
260 | | - case StreamType.ClientStreaming => |
261 | | - fp.add(s"override def ${method.name}(request: ${sflux(reqType)}): ${smono(resType)} =") |
262 | | - .indent |
263 | | - .add( |
264 | | - s"$SMono.fromPublisher($ReactorClientCalls.manyToOne(request.asJava(), (res: $StreamObserver[$resType]) => {" |
265 | | - ) |
266 | | - .indent |
267 | | - .add( |
268 | | - s"$ClientCalls.asyncClientStreamingCall(getChannel().newCall(${method.grpcDescriptor.fullName}, getCallOptions()), res)" |
269 | | - ) |
270 | | - case StreamType.ServerStreaming => |
271 | | - fp.add(s"override def ${method.name}(request: $reqType): ${sflux(resType)} =") |
272 | | - .indent |
273 | | - .add( |
274 | | - s"$SFlux.fromPublisher($ReactorClientCalls.oneToMany($Mono.just(request), (req: $reqType, res: $StreamObserver[$resType]) => {" |
275 | | - ) |
276 | | - .indent |
277 | | - .add( |
278 | | - s"$ClientCalls.asyncServerStreamingCall(getChannel().newCall(${method.grpcDescriptor.fullName}, getCallOptions()), req, res)" |
279 | | - ) |
280 | | - case StreamType.Bidirectional => |
281 | | - fp.add(s"override def ${method.name}(request: ${sflux(reqType)}): ${sflux(resType)} =") |
282 | | - .indent |
283 | | - .add( |
284 | | - s"$SFlux.fromPublisher($ReactorClientCalls.manyToMany(request.asJava(), (res: $StreamObserver[$resType]) => {" |
285 | | - ) |
286 | | - .indent |
287 | | - .add( |
288 | | - s"$ClientCalls.asyncBidiStreamingCall(getChannel().newCall(${method.grpcDescriptor.fullName}, getCallOptions()), res)" |
289 | | - ) |
290 | | - } |
291 | | - |
292 | | - fp1.outdent |
293 | | - .add("}, getCallOptions()))") |
294 | | - .outdent |
295 | | - .add("") |
296 | | - } |
297 | | - |
298 | | - private def sflux(tpe: String) = s"_root_.reactor.core.scala.publisher.SFlux[$tpe]" |
299 | | - |
300 | | - private def smono(tpe: String) = s"_root_.reactor.core.scala.publisher.SMono[$tpe]" |
301 | | - |
302 | | - private def mono(tpe: String) = s"_root_.reactor.core.publisher.Mono[$tpe]" |
303 | | - |
304 | | - private def justMono(value: String) = s"_root_.reactor.core.publisher.Mono.just($value)" |
305 | | - |
306 | | - private def flux(tpe: String) = s"_root_.reactor.core.publisher.Flux[$tpe]" |
307 | | - } |
308 | | - |
309 | | -} |
0 commit comments