Skip to content

Commit

Permalink
feat: add debug spans for decoding requests
Browse files Browse the repository at this point in the history
  • Loading branch information
neoeinstein committed Jun 26, 2024
1 parent c783652 commit ab00ecb
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 15 deletions.
7 changes: 3 additions & 4 deletions tonic/src/codec/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,8 @@ impl CompressionEncoding {
}

#[allow(missing_docs)]
#[cfg(any(feature = "gzip", feature = "zstd"))]
pub(crate) fn as_str(&self) -> &'static str {
match self {
pub(crate) const fn as_str(&self) -> &'static str {
match *self {
#[cfg(feature = "gzip")]
CompressionEncoding::Gzip => "gzip",
#[cfg(feature = "zstd")]
Expand All @@ -175,7 +174,7 @@ impl CompressionEncoding {
http::HeaderValue::from_static(self.as_str())
}

pub(crate) fn encodings() -> &'static [Self] {
pub(crate) const fn encodings() -> &'static [Self] {
&[
#[cfg(feature = "gzip")]
CompressionEncoding::Gzip,
Expand Down
69 changes: 58 additions & 11 deletions tonic/src/codec/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,52 @@ struct StreamingInner {

impl<T> Unpin for Streaming<T> {}

#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone)]
enum State {
ReadHeader,
ReadHeader {
span: tracing::Span,
},
ReadBody {
span: tracing::Span,
compression: Option<CompressionEncoding>,
len: usize,
},
Error,
}

impl State {
fn read_header() -> Self {
let span = tracing::debug_span!(
"read_header",
compression = tracing::field::Empty,
body.bytes = tracing::field::Empty,
);
Self::ReadHeader { span }
}

fn read_body(compression: Option<CompressionEncoding>, len: usize) -> Self {
let span = tracing::debug_span!(
"read_body",
compression = compression.map(|c| c.as_str()),
compressed.bytes = len,
uncompressed.bytes = compression.is_none().then_some(len),
);
Self::ReadBody {
span,
compression,
len,
}
}

fn span(&self) -> Option<&tracing::Span> {
match self {
Self::ReadHeader { span } => Some(span),
Self::ReadBody { span, .. } => Some(span),
Self::Error => None,
}
}
}

#[derive(Debug, PartialEq, Eq)]
enum Direction {
Request,
Expand Down Expand Up @@ -124,7 +160,7 @@ impl<T> Streaming<T> {
.map_data(|mut buf| buf.copy_to_bytes(buf.remaining()))
.map_err(|err| Status::map_error(err.into()))
.boxed_unsync(),
state: State::ReadHeader,
state: State::read_header(),
direction,
buf: BytesMut::with_capacity(buffer_size),
trailers: None,
Expand All @@ -141,7 +177,8 @@ impl StreamingInner {
&mut self,
buffer_settings: BufferSettings,
) -> Result<Option<DecodeBuf<'_>>, Status> {
if let State::ReadHeader = self.state {
if let State::ReadHeader { span } = &self.state {
let _guard = span.enter();
if self.buf.remaining() < HEADER_SIZE {
return Ok(None);
}
Expand All @@ -150,7 +187,8 @@ impl StreamingInner {
0 => None,
1 => {
{
if self.encoding.is_some() {
if let Some(ce) = self.encoding {
span.record("compression", ce.as_str());
self.encoding
} else {
// https://grpc.github.io/grpc/core/md_doc_compression.html
Expand All @@ -176,6 +214,7 @@ impl StreamingInner {
};

let len = self.buf.get_u32() as usize;
span.record("body.bytes", len);
let limit = self
.max_message_size
.unwrap_or(DEFAULT_MAX_RECV_MESSAGE_SIZE);
Expand All @@ -190,14 +229,19 @@ impl StreamingInner {
}

self.buf.reserve(len);
drop(_guard);

self.state = State::ReadBody {
compression: compression_encoding,
len,
}
self.state = State::read_body(compression_encoding, len)
}

if let State::ReadBody { len, compression } = self.state {
if let State::ReadBody {
len,
span,
compression,
} = &self.state
{
let (len, compression) = (*len, *compression);
let _guard = span.enter();
// if we haven't read enough of the message then return and keep
// reading
if self.buf.remaining() < len || self.buf.len() < len {
Expand Down Expand Up @@ -227,6 +271,7 @@ impl StreamingInner {
return Err(Status::new(Code::Internal, message));
}
let decompressed_len = self.decompress_buf.len();
span.record("uncompressed.bytes", decompressed_len);
DecodeBuf::new(&mut self.decompress_buf, decompressed_len)
} else {
DecodeBuf::new(&mut self.buf, len)
Expand All @@ -240,13 +285,15 @@ impl StreamingInner {

// Returns Some(()) if data was found or None if the loop in `poll_next` should break
fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<()>, Status>> {
let _guard = self.state.span().map(|s| s.enter());
let chunk = match ready!(Pin::new(&mut self.body).poll_data(cx)) {
Some(Ok(d)) => Some(d),
Some(Err(status)) => {
if self.direction == Direction::Request && status.code() == Code::Cancelled {
return Poll::Ready(Ok(None));
}

drop(_guard);
let _ = std::mem::replace(&mut self.state, State::Error);
debug!("decoder inner stream error: {:?}", status);
return Poll::Ready(Err(status));
Expand Down Expand Up @@ -376,7 +423,7 @@ impl<T> Streaming<T> {
match self.inner.decode_chunk(self.decoder.buffer_settings())? {
Some(mut decode_buf) => match self.decoder.decode(&mut decode_buf)? {
Some(msg) => {
self.inner.state = State::ReadHeader;
self.inner.state = State::read_header();
Ok(Some(msg))
}
None => Ok(None),
Expand Down

0 comments on commit ab00ecb

Please sign in to comment.