From 5c7a69ec1cc0e3ef3fc8a513a1e83f9c8af49880 Mon Sep 17 00:00:00 2001 From: tottoto Date: Sat, 11 Jan 2025 16:29:30 +0900 Subject: [PATCH] chore(interop): Use http-body-util WithTrailers (#2134) --- interop/Cargo.toml | 1 + interop/src/server.rs | 44 +++++++------------------------------------ 2 files changed, 8 insertions(+), 37 deletions(-) diff --git a/interop/Cargo.toml b/interop/Cargo.toml index 0ae949f76..e460f2f55 100644 --- a/interop/Cargo.toml +++ b/interop/Cargo.toml @@ -19,6 +19,7 @@ pico-args = {version = "0.5", features = ["eq-separator"]} console = "0.15" http = "1" http-body = "1" +http-body-util = "0.1" hyper = "1" prost = "0.13" tokio = {version = "1.0", features = ["rt-multi-thread", "time", "macros"]} diff --git a/interop/src/server.rs b/interop/src/server.rs index c5cdda052..f64297041 100644 --- a/interop/src/server.rs +++ b/interop/src/server.rs @@ -1,11 +1,11 @@ use crate::pb::{self, *}; use async_stream::try_stream; -use http::header::{HeaderName, HeaderValue}; -use http_body::Body as HttpBody; +use http::header::{HeaderMap, HeaderName}; +use http_body_util::BodyExt; use std::future::Future; use std::pin::Pin; use std::result::Result as StdResult; -use std::task::{ready, Context, Poll}; +use std::task::{Context, Poll}; use std::time::Duration; use tokio_stream::StreamExt; use tonic::{body::Body, server::NamedService, Code, Request, Response, Status}; @@ -196,11 +196,12 @@ where fn call(&mut self, req: http::Request) -> Self::Future { let echo_header = req.headers().get("x-grpc-test-echo-initial").cloned(); + let trailer_name = HeaderName::from_static("x-grpc-test-echo-trailing-bin"); let echo_trailer = req .headers() - .get("x-grpc-test-echo-trailing-bin") + .get(&trailer_name) .cloned() - .map(|v| (HeaderName::from_static("x-grpc-test-echo-trailing-bin"), v)); + .map(|v| HeaderMap::from_iter(std::iter::once((trailer_name, v)))); let call = self.inner.call(req); @@ -211,7 +212,7 @@ where res.headers_mut() .insert("x-grpc-test-echo-initial", echo_header); Ok(res - .map(|b| MergeTrailers::new(b, echo_trailer)) + .map(|b| b.with_trailers(async move { echo_trailer.map(Ok) })) .map(Body::new)) } else { Ok(res) @@ -219,34 +220,3 @@ where }) } } - -pub struct MergeTrailers { - inner: B, - trailer: Option<(HeaderName, HeaderValue)>, -} - -impl MergeTrailers { - pub fn new(inner: B, trailer: Option<(HeaderName, HeaderValue)>) -> Self { - Self { inner, trailer } - } -} - -impl HttpBody for MergeTrailers { - type Data = B::Data; - type Error = B::Error; - - fn poll_frame( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>>> { - let this = self.get_mut(); - let mut frame = ready!(Pin::new(&mut this.inner).poll_frame(cx)?); - if let (Some(trailers), Some((key, value))) = ( - frame.as_mut().and_then(|frame| frame.trailers_mut()), - &this.trailer, - ) { - trailers.insert(key.clone(), value.clone()); - } - Poll::Ready(frame.map(Ok)) - } -}