Skip to content

Commit

Permalink
feat: add debug spans for decoding requests
Browse files Browse the repository at this point in the history
  • Loading branch information
neoeinstein committed Jun 26, 2024
1 parent d312dcc commit b894439
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 17 deletions.
7 changes: 3 additions & 4 deletions tonic/src/codec/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,7 @@ impl CompressionEncoding {
}

#[allow(missing_docs)]
#[cfg(any(feature = "gzip", feature = "zstd"))]
pub(crate) fn as_str(&self) -> &'static str {
pub(crate) const fn as_str(&self) -> &'static str {
match self {
#[cfg(feature = "gzip")]
CompressionEncoding::Gzip => "gzip",
Expand All @@ -169,11 +168,11 @@ impl CompressionEncoding {
}

#[cfg(any(feature = "gzip", feature = "zstd"))]
pub(crate) fn into_header_value(self) -> http::HeaderValue {
pub(crate) const fn into_header_value(self) -> http::HeaderValue {
http::HeaderValue::from_static(self.as_str())
}

pub(crate) fn encodings() -> &'static [Self] {
pub(crate) const fn encodings() -> &'static [Self] {
&[
#[cfg(feature = "gzip")]
CompressionEncoding::Gzip,
Expand Down
75 changes: 62 additions & 13 deletions tonic/src/codec/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,43 @@ impl<T> Unpin for Streaming<T> {}

#[derive(Debug, Clone)]
enum State {
ReadHeader,
ReadHeader {
span: Option<tracing::Span>,
},
ReadBody {
span: tracing::Span,
compression: Option<CompressionEncoding>,
len: usize,
},
Error(Status),
Error(Box<Status>),
}

impl State {
fn read_header() -> Self {
Self::ReadHeader { span: None }
}

fn read_body(compression: Option<CompressionEncoding>, len: usize) -> Self {
let span = tracing::debug_span!(
"read_body",
compression = compression.map(|c| c.as_str()),
compressed.bytes = compression.is_some().then_some(len),
uncompressed.bytes = compression.is_none().then_some(len),
);
Self::ReadBody {
span,
compression,
len,
}
}

fn span(&self) -> Option<&tracing::Span> {
match self {
Self::ReadHeader { span } => span.as_ref(),
Self::ReadBody { span, .. } => Some(span),
Self::Error(_) => None,
}
}
}

#[derive(Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -125,7 +156,7 @@ impl<T> Streaming<T> {
.map_frame(|frame| frame.map_data(|mut buf| buf.copy_to_bytes(buf.remaining())))
.map_err(|err| Status::map_error(err.into()))
.boxed_unsync(),
state: State::ReadHeader,
state: State::read_header(),
direction,
buf: BytesMut::with_capacity(buffer_size),
trailers: None,
Expand All @@ -142,7 +173,15 @@ impl StreamingInner {
&mut self,
buffer_settings: BufferSettings,
) -> Result<Option<DecodeBuf<'_>>, Status> {
if let State::ReadHeader = self.state {
if let State::ReadHeader { span } = &mut self.state {
let span = span.get_or_insert_with(|| {
tracing::debug_span!(
"read_header",
compression = tracing::field::Empty,
body.bytes = tracing::field::Empty,
)
});
let _guard = span.enter();
if self.buf.remaining() < HEADER_SIZE {
return Ok(None);
}
Expand All @@ -151,7 +190,8 @@ impl StreamingInner {
0 => None,
1 => {
{
if self.encoding.is_some() {
if let Some(ce) = self.encoding {
span.record("compression", ce.as_str());
self.encoding
} else {
// https://grpc.github.io/grpc/core/md_doc_compression.html
Expand All @@ -177,6 +217,7 @@ impl StreamingInner {
};

let len = self.buf.get_u32() as usize;
span.record("body.bytes", len);
let limit = self
.max_message_size
.unwrap_or(DEFAULT_MAX_RECV_MESSAGE_SIZE);
Expand All @@ -191,14 +232,19 @@ impl StreamingInner {
}

self.buf.reserve(len);
drop(_guard);

self.state = State::ReadBody {
compression: compression_encoding,
len,
}
self.state = State::read_body(compression_encoding, len)
}

if let State::ReadBody { len, compression } = self.state {
if let State::ReadBody {
len,
span,
compression,
} = &self.state
{
let (len, compression) = (*len, *compression);
let _guard = span.enter();
// if we haven't read enough of the message then return and keep
// reading
if self.buf.remaining() < len || self.buf.len() < len {
Expand Down Expand Up @@ -228,6 +274,7 @@ impl StreamingInner {
return Err(Status::new(Code::Internal, message));
}
let decompressed_len = self.decompress_buf.len();
span.record("uncompressed.bytes", decompressed_len);
DecodeBuf::new(&mut self.decompress_buf, decompressed_len)
} else {
DecodeBuf::new(&mut self.buf, len)
Expand All @@ -241,14 +288,16 @@ impl StreamingInner {

// Returns Some(()) if data was found or None if the loop in `poll_next` should break
fn poll_frame(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<()>, Status>> {
let _guard = self.state.span().map(|s| s.enter());
let chunk = match ready!(Pin::new(&mut self.body).poll_frame(cx)) {
Some(Ok(d)) => Some(d),
Some(Err(status)) => {
if self.direction == Direction::Request && status.code() == Code::Cancelled {
return Poll::Ready(Ok(None));
}

let _ = std::mem::replace(&mut self.state, State::Error(status.clone()));
drop(_guard);
let _ = std::mem::replace(&mut self.state, State::Error(Box::new(status.clone())));
debug!("decoder inner stream error: {:?}", status);
return Poll::Ready(Err(status));
}
Expand Down Expand Up @@ -378,7 +427,7 @@ impl<T> Streaming<T> {
match self.inner.decode_chunk(self.decoder.buffer_settings())? {
Some(mut decode_buf) => match self.decoder.decode(&mut decode_buf)? {
Some(msg) => {
self.inner.state = State::ReadHeader;
self.inner.state = State::read_header();
Ok(Some(msg))
}
None => Ok(None),
Expand All @@ -394,7 +443,7 @@ impl<T> Stream for Streaming<T> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
if let State::Error(status) = &self.inner.state {
return Poll::Ready(Some(Err(status.clone())));
return Poll::Ready(Some(Err(*status.clone())));
}

if let Some(item) = self.decode_chunk()? {
Expand Down

0 comments on commit b894439

Please sign in to comment.