Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add debug spans for decoding requests #1760

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 66 additions & 13 deletions tonic/src/codec/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,43 @@ impl<T> Unpin for Streaming<T> {}

#[derive(Debug, Clone)]
enum State {
ReadHeader,
ReadHeader {
span: Option<tracing::Span>,
},
ReadBody {
span: tracing::Span,
compression: Option<CompressionEncoding>,
len: usize,
},
Error(Status),
Error(Box<Status>),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Boxing here significantly reduces the size of State and StreamingInner as a result, even with the addition of tracing::Span to the other variants.

}

impl State {
fn read_header() -> Self {
Self::ReadHeader { span: None }
}

fn read_body(compression: Option<CompressionEncoding>, len: usize) -> Self {
let span = tracing::debug_span!(
"read_body",
body.compression = compression.map(|c| c.as_str()).unwrap_or("none"),
body.bytes.compressed = compression.is_some().then_some(len),
body.bytes.uncompressed = compression.is_none().then_some(len),
);
Self::ReadBody {
span,
compression,
len,
}
}

fn span(&self) -> Option<&tracing::Span> {
match self {
Self::ReadHeader { span } => span.as_ref(),
Self::ReadBody { span, .. } => Some(span),
Self::Error(_) => None,
}
}
}

#[derive(Debug, PartialEq, Eq)]
Expand Down Expand Up @@ -125,7 +156,7 @@ impl<T> Streaming<T> {
.map_frame(|frame| frame.map_data(|mut buf| buf.copy_to_bytes(buf.remaining())))
.map_err(|err| Status::map_error(err.into()))
.boxed_unsync(),
state: State::ReadHeader,
state: State::read_header(),
direction,
buf: BytesMut::with_capacity(buffer_size),
trailers: None,
Expand All @@ -142,7 +173,19 @@ impl StreamingInner {
&mut self,
buffer_settings: BufferSettings,
) -> Result<Option<DecodeBuf<'_>>, Status> {
if let State::ReadHeader = self.state {
if let State::ReadHeader { span } = &mut self.state {
if !self.buf.has_remaining() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why add this guard here? Can we just do the self.buf.remaining() < HEADER_SIZE check here instead?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had considered that. The reason for the guard at all is that, even with unary requests, Tonic still polls the stream after reading the first message. That was generating read_header, read_body, read_header for these unary requests, with the last read_header really being vestigial, since the stream would then be dropped after the message. I opted to try to remove this vestigial span, but we could simplify by re-introducing it.

With that in mind, if there are still bytes in the stream, then that would indicate that there's a header already partially in the input, so opening a new span is indicated. If not, then we'd delay opening a new span (but that also means that a new span wouldn't capture the time from the empty poll until the next header started appearing in the buffer).

Changing it so that the span is only opened once 5 bytes are in the buffer would mean that the span would only open once we've already completely read the header, making this really just timing the extraction of the u8 compression encoding and u32 payload size, which probably then isn't all that valuable.

Another option here is to have a debug span one level higher for the entire stream itself, skip the span for the header, and keep a span for the body.

return Ok(None);
}

let span = span.get_or_insert_with(|| {
tracing::debug_span!(
"read_header",
body.compression = "none",
body.bytes = tracing::field::Empty,
)
});
let _guard = span.enter();
if self.buf.remaining() < HEADER_SIZE {
return Ok(None);
}
Expand All @@ -151,7 +194,8 @@ impl StreamingInner {
0 => None,
1 => {
{
if self.encoding.is_some() {
if let Some(ce) = self.encoding {
span.record("body.compression", ce.as_str());
self.encoding
} else {
// https://grpc.github.io/grpc/core/md_doc_compression.html
Expand All @@ -177,6 +221,7 @@ impl StreamingInner {
};

let len = self.buf.get_u32() as usize;
span.record("body.bytes", len);
let limit = self
.max_message_size
.unwrap_or(DEFAULT_MAX_RECV_MESSAGE_SIZE);
Expand All @@ -191,14 +236,19 @@ impl StreamingInner {
}

self.buf.reserve(len);
drop(_guard);

self.state = State::ReadBody {
compression: compression_encoding,
len,
}
self.state = State::read_body(compression_encoding, len)
}

if let State::ReadBody { len, compression } = self.state {
if let State::ReadBody {
len,
span,
compression,
} = &self.state
{
let (len, compression) = (*len, *compression);
let _guard = span.enter();
// if we haven't read enough of the message then return and keep
// reading
if self.buf.remaining() < len || self.buf.len() < len {
Expand Down Expand Up @@ -228,6 +278,7 @@ impl StreamingInner {
return Err(Status::new(Code::Internal, message));
}
let decompressed_len = self.decompress_buf.len();
span.record("body.bytes.uncompressed", decompressed_len);
DecodeBuf::new(&mut self.decompress_buf, decompressed_len)
} else {
DecodeBuf::new(&mut self.buf, len)
Expand All @@ -241,14 +292,16 @@ impl StreamingInner {

// Returns Some(()) if data was found or None if the loop in `poll_next` should break
fn poll_frame(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<()>, Status>> {
let _guard = self.state.span().map(|s| s.enter());
let chunk = match ready!(Pin::new(&mut self.body).poll_frame(cx)) {
Some(Ok(d)) => Some(d),
Some(Err(status)) => {
if self.direction == Direction::Request && status.code() == Code::Cancelled {
return Poll::Ready(Ok(None));
}

let _ = std::mem::replace(&mut self.state, State::Error(status.clone()));
drop(_guard);
let _ = std::mem::replace(&mut self.state, State::Error(Box::new(status.clone())));
debug!("decoder inner stream error: {:?}", status);
return Poll::Ready(Err(status));
}
Expand Down Expand Up @@ -378,7 +431,7 @@ impl<T> Streaming<T> {
match self.inner.decode_chunk(self.decoder.buffer_settings())? {
Some(mut decode_buf) => match self.decoder.decode(&mut decode_buf)? {
Some(msg) => {
self.inner.state = State::ReadHeader;
self.inner.state = State::read_header();
Ok(Some(msg))
}
None => Ok(None),
Expand All @@ -394,7 +447,7 @@ impl<T> Stream for Streaming<T> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
if let State::Error(status) = &self.inner.state {
return Poll::Ready(Some(Err(status.clone())));
return Poll::Ready(Some(Err(*status.clone())));
}

if let Some(item) = self.decode_chunk()? {
Expand Down