@@ -347,14 +347,12 @@ private void flushQueue() {
347347 static Http2StreamState nextStreamState (Http2StreamState currentStreamState ,
348348 Http2StreamState desiredStreamState ) {
349349 return switch (desiredStreamState ) {
350- case HALF_CLOSED_LOCAL ->
351- currentStreamState == Http2StreamState .HALF_CLOSED_REMOTE
352- ? Http2StreamState .CLOSED
353- : Http2StreamState .HALF_CLOSED_LOCAL ;
354- case HALF_CLOSED_REMOTE ->
355- currentStreamState == Http2StreamState .HALF_CLOSED_LOCAL
356- ? Http2StreamState .CLOSED
357- : Http2StreamState .HALF_CLOSED_REMOTE ;
350+ case HALF_CLOSED_LOCAL -> currentStreamState == Http2StreamState .HALF_CLOSED_REMOTE
351+ ? Http2StreamState .CLOSED
352+ : Http2StreamState .HALF_CLOSED_LOCAL ;
353+ case HALF_CLOSED_REMOTE -> currentStreamState == Http2StreamState .HALF_CLOSED_LOCAL
354+ ? Http2StreamState .CLOSED
355+ : Http2StreamState .HALF_CLOSED_REMOTE ;
358356 default -> desiredStreamState ;
359357 };
360358 }
@@ -364,8 +362,7 @@ private ServerCall<REQ, RES> createServerCall() {
364362
365363 private long bytesSent ;
366364 private boolean headersSent ;
367- private final BufferData headerBufferData = BufferData .create (GRPC_HEADER_SIZE );
368- private final BufferData writeBufferData = BufferData .growing (INITIAL_BUFFER_SIZE );
365+ private BufferData writeBufferData = BufferData .growing (INITIAL_BUFFER_SIZE );
369366
370367 @ Override
371368 public void request (int numMessages ) {
@@ -402,30 +399,24 @@ public void sendMessage(RES message) {
402399 try (InputStream inputStream = route .method ().streamResponse (message )) {
403400 // prepare buffer for writing
404401 BufferData bufferData ;
405- if (identityCompressor ) {
406- // avoid buffer copy if length is known
407- if (inputStream instanceof KnownLength knownLength ) {
408- int bytesLength = knownLength .available ();
409- bufferData = writeBufferData .reset ();
410- bufferData .write (0 ); // off for identity compressor
411- bufferData .writeUnsignedInt32 (bytesLength );
412- bufferData .readFrom (inputStream );
413- } else {
414- BufferData entity = writeBufferData .reset ();
415- entity .readFrom (inputStream );
416- BufferData prefix = headerBufferData .reset ();
417- prefix .write (0 ); // off for identity compressor
418- prefix .writeUnsignedInt32 (entity .capacity ());
419- bufferData = BufferData .create (prefix , entity );
420- }
402+ if (identityCompressor && inputStream instanceof KnownLength knownLength ) {
403+ int bytesLength = knownLength .available ();
404+ bufferData = allocateWriteBuffer (GRPC_HEADER_SIZE + bytesLength );
405+ bufferData .write (0 ); // 0 for identity compressor
406+ bufferData .writeUnsignedInt32 (bytesLength );
407+ bufferData .readFrom (inputStream );
421408 } else {
422409 ByteArrayOutputStream baos = new ByteArrayOutputStream ();
423- try (OutputStream os = compressor .compress (baos )) {
424- inputStream .transferTo (os );
410+ if (identityCompressor ) {
411+ inputStream .transferTo (baos );
412+ } else {
413+ try (OutputStream os = compressor .compress (baos )) {
414+ inputStream .transferTo (os );
415+ }
425416 }
426417 byte [] bytes = baos .toByteArray ();
427- bufferData = writeBufferData . reset ( );
428- bufferData .write (1 );
418+ bufferData = allocateWriteBuffer ( GRPC_HEADER_SIZE + bytes . length );
419+ bufferData .write (identityCompressor ? 0 : 1 );
429420 bufferData .writeUnsignedInt32 (bytes .length );
430421 bufferData .write (bytes );
431422 }
@@ -496,6 +487,15 @@ public boolean isCancelled() {
496487 public MethodDescriptor <REQ , RES > getMethodDescriptor () {
497488 return route .method ();
498489 }
490+
491+ private BufferData allocateWriteBuffer (int length ) {
492+ writeBufferData .reset ();
493+ int capacity = writeBufferData .capacity ();
494+ if (length > capacity ) {
495+ writeBufferData = BufferData .create (length );
496+ }
497+ return writeBufferData ;
498+ }
499499 };
500500 }
501501
0 commit comments