@@ -98,6 +98,9 @@ private record MethodMetrics(Counter callStarted,
9898
9999 private static final LazyValue <Map <String , MethodMetrics >> METHOD_METRICS = LazyValue .create (ConcurrentHashMap ::new );
100100
101+ private static final int GRPC_HEADER_SIZE = 5 ;
102+ private static final int INITIAL_BUFFER_SIZE = 16 * 1024 ;
103+
101104 private final Http2Headers headers ;
102105 private final Http2StreamWriter streamWriter ;
103106 private final int streamId ;
@@ -109,6 +112,9 @@ private record MethodMetrics(Counter callStarted,
109112
110113 private volatile ServerCall .Listener <REQ > listener ;
111114 private BufferData entityBytes ;
115+ private BufferData readBufferData = BufferData .create (INITIAL_BUFFER_SIZE );
116+ private BufferData unreadBufferData ;
117+ private long entityBytesLeft ;
112118 private Compressor compressor ;
113119 private Decompressor decompressor ;
114120 private boolean identityCompressor ;
@@ -185,35 +191,61 @@ public void rstStream(Http2RstStream rstStream) {
185191 public void windowUpdate (Http2WindowUpdate update ) {
186192 }
187193
194+ /**
195+ * Data received from HTTP/2 layer. Data may contain a partial gRPC request
196+ * or more than one request, making logic a bit more difficult.
197+ *
198+ * @param header frame header
199+ * @param data frame data
200+ */
188201 @ Override
189202 public void data (Http2FrameHeader header , BufferData data ) {
190203 try {
191204 boolean isCompressed = false ;
192205
193- while (data .available () > 0 ) {
194- // start of new chunk?
206+ // check for any unread data received before
207+ BufferData newData ;
208+ if (unreadBufferData != null ) {
209+ newData = BufferData .create (unreadBufferData , data );
210+ unreadBufferData = null ;
211+ } else {
212+ newData = data ;
213+ }
214+
215+ // process 0 or more requests from data
216+ while (newData .available () > 0 ) {
217+ // start of new request?
195218 if (entityBytes == null ) {
196- isCompressed = (data .read () == 1 );
197- long length = data .readUnsignedInt32 ();
198- entityBytes = BufferData .create ((int ) length );
219+ if (newData .available () >= GRPC_HEADER_SIZE ) {
220+ isCompressed = (newData .read () == 1 );
221+ entityBytesLeft = newData .readUnsignedInt32 ();
222+ entityBytes = allocateReadBuffer ((int ) entityBytesLeft );
223+ } else {
224+ unreadBufferData = newData ;
225+ return ; // need more for gRPC header
226+ }
199227 }
200228
201- // append data to current chunk
202- entityBytes .write (data );
229+ // append data to current entity
230+ int writableNow = (int ) Math .min (entityBytesLeft , newData .available ());
231+ entityBytes .write (newData , writableNow );
232+ entityBytesLeft -= writableNow ;
203233
204- // is chunk complete?
205- if (entityBytes . capacity () == 0 ) {
234+ // is the entity complete?
235+ if (entityBytesLeft == 0 ) {
206236 // fail if compressed and no decompressor
207237 if (isCompressed && decompressor == null ) {
208238 throw new IllegalStateException ("Unable to codec for compressed data" );
209239 }
210240
211241 // read and possibly decompress data
212242 bytesReceived += entityBytes .available ();
213- InputStream is = entityBytes . asInputStream ( );
243+ InputStream is = new BufferDataInputStream ( entityBytes );
214244 REQ request = route .method ().parseRequest (isCompressed ? decompressor .decompress (is ) : is );
215245 listenerQueue .add (request );
216246 flushQueue ();
247+
248+ // reset entityBytes
217249 entityBytes = null ;
218250 }
219251 }
@@ -234,6 +266,18 @@ public void data(Http2FrameHeader header, BufferData data) {
234266 }
235267 }
236268
269+ BufferData allocateReadBuffer (int length ) {
270+ readBufferData .reset ();
271+ int capacity = readBufferData .capacity ();
272+ if (length > capacity ) {
273+ if (length > grpcConfig .maxReadBufferSize ()) {
274+ throw new IllegalStateException ("gRPC message size exceeds max read buffer size" );
275+ }
276+ readBufferData = BufferData .create (length );
277+ }
278+ return readBufferData ;
279+ }
280+
237281 void initCompression (ServerCall <REQ , RES > serverCall , Headers httpHeaders ) {
238282 if (grpcConfig .enableCompression ()) {
239283 // check for encoding and respond using same algorithm
@@ -303,23 +347,22 @@ private void flushQueue() {
303347 static Http2StreamState nextStreamState (Http2StreamState currentStreamState ,
304348 Http2StreamState desiredStreamState ) {
305349 return switch (desiredStreamState ) {
306- case HALF_CLOSED_LOCAL ->
307- currentStreamState == Http2StreamState .HALF_CLOSED_REMOTE
308- ? Http2StreamState .CLOSED
309- : Http2StreamState .HALF_CLOSED_LOCAL ;
310- case HALF_CLOSED_REMOTE ->
311- currentStreamState == Http2StreamState .HALF_CLOSED_LOCAL
312- ? Http2StreamState .CLOSED
313- : Http2StreamState .HALF_CLOSED_REMOTE ;
350+ case HALF_CLOSED_LOCAL -> currentStreamState == Http2StreamState .HALF_CLOSED_REMOTE
351+ ? Http2StreamState .CLOSED
352+ : Http2StreamState .HALF_CLOSED_LOCAL ;
353+ case HALF_CLOSED_REMOTE -> currentStreamState == Http2StreamState .HALF_CLOSED_LOCAL
354+ ? Http2StreamState .CLOSED
355+ : Http2StreamState .HALF_CLOSED_REMOTE ;
314356 default -> desiredStreamState ;
315357 };
316358 }
317359
318360 private ServerCall <REQ , RES > createServerCall () {
319- return new ServerCall <>() {
361+ return new ServerCall <REQ , RES >() {
320362
321363 private long bytesSent ;
322364 private boolean headersSent ;
365+ private BufferData writeBufferData = BufferData .growing (INITIAL_BUFFER_SIZE );
323366
324367 @ Override
325368 public void request (int numMessages ) {
@@ -356,29 +399,24 @@ public void sendMessage(RES message) {
356399 try (InputStream inputStream = route .method ().streamResponse (message )) {
357400 // prepare buffer for writing
358401 BufferData bufferData ;
359- if (identityCompressor ) {
360- // avoid buffer copy if length is known
361- if (inputStream instanceof KnownLength knownLength ) {
362- int bytesLength = knownLength .available ();
363- bufferData = BufferData .create (5 + bytesLength );
364- bufferData .write (0 ); // off for identity compressor
365- bufferData .writeUnsignedInt32 (bytesLength );
366- bufferData .readFrom (inputStream );
367- } else {
368- byte [] bytes = inputStream .readAllBytes ();
369- bufferData = BufferData .create (5 + bytes .length );
370- bufferData .write (0 ); // off for identity compressor
371- bufferData .writeUnsignedInt32 (bytes .length );
372- bufferData .write (bytes );
373- }
402+ if (identityCompressor && inputStream instanceof KnownLength knownLength ) {
403+ int bytesLength = knownLength .available ();
404+ bufferData = allocateWriteBuffer (GRPC_HEADER_SIZE + bytesLength );
405+ bufferData .write (0 ); // 0 for identity compressor
406+ bufferData .writeUnsignedInt32 (bytesLength );
407+ bufferData .readFrom (inputStream );
374408 } else {
375409 ByteArrayOutputStream baos = new ByteArrayOutputStream ();
376- try (OutputStream os = compressor .compress (baos )) {
377- inputStream .transferTo (os );
410+ if (identityCompressor ) {
411+ inputStream .transferTo (baos );
412+ } else {
413+ try (OutputStream os = compressor .compress (baos )) {
414+ inputStream .transferTo (os );
415+ }
378416 }
379417 byte [] bytes = baos .toByteArray ();
380- bufferData = BufferData . create ( 5 + bytes .length );
381- bufferData .write (1 );
418+ bufferData = allocateWriteBuffer ( GRPC_HEADER_SIZE + bytes .length );
419+ bufferData .write (identityCompressor ? 0 : 1 );
382420 bufferData .writeUnsignedInt32 (bytes .length );
383421 bufferData .write (bytes );
384422 }
@@ -449,6 +487,15 @@ public boolean isCancelled() {
449487 public MethodDescriptor <REQ , RES > getMethodDescriptor () {
450488 return route .method ();
451489 }
490+
491+ private BufferData allocateWriteBuffer (int length ) {
492+ writeBufferData .reset ();
493+ int capacity = writeBufferData .capacity ();
494+ if (length > capacity ) {
495+ writeBufferData = BufferData .create (length );
496+ }
497+ return writeBufferData ;
498+ }
452499 };
453500 }
454501
@@ -490,4 +537,37 @@ private void initMetrics() {
490537 return new MethodMetrics (callStarted , callDuration , sentMessageSize , recvMessageSize );
491538 });
492539 }
540+
541+ /**
542+ * An input stream that can return its length. gRPC parsers can use this extra
543+ * knowledge for optimizations. It can also copy a byte array directly on a
544+ * single read.
545+ */
546+ static class BufferDataInputStream extends InputStream implements KnownLength {
547+ private final BufferData bufferData ;
548+
549+ BufferDataInputStream (BufferData bufferData ) {
550+ this .bufferData = bufferData ;
551+ }
552+
553+ @ Override
554+ public int read () {
555+ return bufferData .read ();
556+ }
557+
558+ @ Override
559+ public int read (byte [] b ) {
560+ return bufferData .read (b );
561+ }
562+
563+ @ Override
564+ public int read (byte [] b , int off , int len ) {
565+ return bufferData .read (b , off , len );
566+ }
567+
568+ @ Override
569+ public int available () {
570+ return bufferData .available ();
571+ }
572+ }
493573}
0 commit comments