Skip to content

Commit 5270642

Browse files
committed
New input stream that implements KnownLength for parsing.
1 parent 98f8d2f commit 5270642

File tree

1 file changed

+51
-3
lines changed

1 file changed

+51
-3
lines changed

webserver/grpc/src/main/java/io/helidon/webserver/grpc/GrpcProtocolHandler.java

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -240,14 +240,13 @@ public void data(Http2FrameHeader header, BufferData data) {
240240

241241
// read and possibly decompress data
242242
bytesReceived += entityBytes.available();
243-
InputStream is = entityBytes.asInputStream();
243+
InputStream is = new BufferDataInputStream(entityBytes);
244244
REQ request = route.method().parseRequest(isCompressed ? decompressor.decompress(is) : is);
245245
listenerQueue.add(request);
246246
flushQueue();
247247

248-
// reset entity state
248+
// reset entityBytes
249249
entityBytes = null;
250-
entityBytesLeft = 0L;
251250
}
252251
}
253252

@@ -535,4 +534,53 @@ private void initMetrics() {
535534
return new MethodMetrics(callStarted, callDuration, sentMessageSize, recvMessageSize);
536535
});
537536
}
537+
538+
/**
539+
* An input stream that can return its length. gRPC parsers can use this extra
540+
* knowledge for optimizations.
541+
*/
542+
static class BufferDataInputStream extends InputStream implements KnownLength {
543+
private final int length;
544+
private final InputStream delegate;
545+
546+
BufferDataInputStream(BufferData bufferData) {
547+
this.length = bufferData.available();
548+
this.delegate = bufferData.asInputStream();
549+
}
550+
551+
@Override
552+
public int read() throws IOException {
553+
return delegate.read();
554+
}
555+
556+
@Override
557+
public int read(byte[] b) throws IOException {
558+
return delegate.read(b);
559+
}
560+
561+
@Override
562+
public int read(byte[] b, int off, int len) throws IOException {
563+
return delegate.read(b, off, len);
564+
}
565+
566+
@Override
567+
public byte[] readAllBytes() throws IOException {
568+
return delegate.readAllBytes();
569+
}
570+
571+
@Override
572+
public byte[] readNBytes(int len) throws IOException {
573+
return delegate.readNBytes(len);
574+
}
575+
576+
@Override
577+
public int readNBytes(byte[] b, int off, int len) throws IOException {
578+
return delegate.readNBytes(b, off, len);
579+
}
580+
581+
@Override
582+
public int available() {
583+
return length;
584+
}
585+
}
538586
}

0 commit comments

Comments
 (0)