@@ -240,7 +240,7 @@ public void data(Http2FrameHeader header, BufferData data) {
240240
241241 // read and possibly decompress data
242242 bytesReceived += entityBytes .available ();
243- InputStream is = entityBytes . asInputStream ( );
243+ InputStream is = new BufferDataInputStream ( entityBytes );
244244 REQ request = route .method ().parseRequest (isCompressed ? decompressor .decompress (is ) : is );
245245 listenerQueue .add (request );
246246 flushQueue ();
@@ -535,4 +535,53 @@ private void initMetrics() {
535535 return new MethodMetrics (callStarted , callDuration , sentMessageSize , recvMessageSize );
536536 });
537537 }
538+
539+ /**
540+ * An input stream that can return its length. gRPC parsers can use this extra
541+ * knowledge for optimizations.
542+ */
543+ static class BufferDataInputStream extends InputStream implements KnownLength {
544+ private final int length ;
545+ private final InputStream delegate ;
546+
547+ BufferDataInputStream (BufferData bufferData ) {
548+ this .length = bufferData .available ();
549+ this .delegate = bufferData .asInputStream ();
550+ }
551+
552+ @ Override
553+ public int read () throws IOException {
554+ return delegate .read ();
555+ }
556+
557+ @ Override
558+ public int read (byte [] b ) throws IOException {
559+ return delegate .read (b );
560+ }
561+
562+ @ Override
563+ public int read (byte [] b , int off , int len ) throws IOException {
564+ return delegate .read (b , off , len );
565+ }
566+
567+ @ Override
568+ public byte [] readAllBytes () throws IOException {
569+ return delegate .readAllBytes ();
570+ }
571+
572+ @ Override
573+ public byte [] readNBytes (int len ) throws IOException {
574+ return delegate .readNBytes (len );
575+ }
576+
577+ @ Override
578+ public int readNBytes (byte [] b , int off , int len ) throws IOException {
579+ return delegate .readNBytes (b , off , len );
580+ }
581+
582+ @ Override
583+ public int available () {
584+ return length ;
585+ }
586+ }
538587}
0 commit comments