Skip to content

Commit

Permalink
chore(interop): Use http-body-util WithTrailers
Browse files Browse the repository at this point in the history
  • Loading branch information
tottoto committed Jan 11, 2025
1 parent 66aaa5b commit 55460ab
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 37 deletions.
1 change: 1 addition & 0 deletions interop/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pico-args = {version = "0.5", features = ["eq-separator"]}
console = "0.15"
http = "1"
http-body = "1"
http-body-util = "0.1"
hyper = "1"
prost = "0.13"
tokio = {version = "1.0", features = ["rt-multi-thread", "time", "macros"]}
Expand Down
44 changes: 7 additions & 37 deletions interop/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::pb::{self, *};
use async_stream::try_stream;
use http::header::{HeaderName, HeaderValue};
use http_body::Body as HttpBody;
use http::header::{HeaderMap, HeaderName};
use http_body_util::BodyExt;
use std::future::Future;
use std::pin::Pin;
use std::result::Result as StdResult;
use std::task::{ready, Context, Poll};
use std::task::{Context, Poll};
use std::time::Duration;
use tokio_stream::StreamExt;
use tonic::{body::Body, server::NamedService, Code, Request, Response, Status};
Expand Down Expand Up @@ -196,11 +196,12 @@ where
fn call(&mut self, req: http::Request<Body>) -> Self::Future {
let echo_header = req.headers().get("x-grpc-test-echo-initial").cloned();

let trailer_name = HeaderName::from_static("x-grpc-test-echo-trailing-bin");
let echo_trailer = req
.headers()
.get("x-grpc-test-echo-trailing-bin")
.get(&trailer_name)
.cloned()
.map(|v| (HeaderName::from_static("x-grpc-test-echo-trailing-bin"), v));
.map(|v| HeaderMap::from_iter(std::iter::once((trailer_name, v))));

let call = self.inner.call(req);

Expand All @@ -211,42 +212,11 @@ where
res.headers_mut()
.insert("x-grpc-test-echo-initial", echo_header);
Ok(res
.map(|b| MergeTrailers::new(b, echo_trailer))
.map(|b| b.with_trailers(async move { echo_trailer.map(Ok) }))
.map(Body::new))
} else {
Ok(res)
}
})
}
}

pub struct MergeTrailers<B> {
inner: B,
trailer: Option<(HeaderName, HeaderValue)>,
}

impl<B> MergeTrailers<B> {
pub fn new(inner: B, trailer: Option<(HeaderName, HeaderValue)>) -> Self {
Self { inner, trailer }
}
}

impl<B: HttpBody + Unpin> HttpBody for MergeTrailers<B> {
type Data = B::Data;
type Error = B::Error;

fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<StdResult<http_body::Frame<Self::Data>, Self::Error>>> {
let this = self.get_mut();
let mut frame = ready!(Pin::new(&mut this.inner).poll_frame(cx)?);
if let (Some(trailers), Some((key, value))) = (
frame.as_mut().and_then(|frame| frame.trailers_mut()),
&this.trailer,
) {
trailers.insert(key.clone(), value.clone());
}
Poll::Ready(frame.map(Ok))
}
}

0 comments on commit 55460ab

Please sign in to comment.