From b09403276b2cd9c586926335347d0fbd1da004f9 Mon Sep 17 00:00:00 2001 From: Mansur Azatbek Date: Wed, 20 Nov 2024 02:43:47 +0500 Subject: [PATCH 1/5] feat(fix): added auto_encoding and corrected default compression behavior The service was initially configured to send compressed data only in Gzip format. Different clients requested varying compression algorithms (e.g., Gzip and Zstd). Logs showed the following behavior: 1. Client 1 requested Gzip: ``` Server received request: data="gzip", headers=MetadataMap { headers: {"te": "trailers", "content-type": "application/grpc", "grpc-encoding": "gzip", "grpc-accept-encoding": "gzip,identity", "user-agent": "tonic/0.13.0"} } Server sending response: gzip Responding with gzip compression ``` 2. Client 2 requested Zstd: ``` Server received request: data="zstd", headers=MetadataMap { headers: {"te": "trailers", "content-type": "application/grpc", "grpc-encoding": "gzip", "grpc-accept-encoding": "zstd,identity", "user-agent": "tonic/0.13.0"} } ``` It is clear that clients requested different encoders using `grpc-accept-encoding`. **Expected behavior:** - The server should select a compression algorithm from the intersection of the supported algorithms from the server and the client. - If the client requests a compression algorithm not supported by the server, the server should either use `identity` (no compression) or return a `UNIMPLEMENTED` status. **Actual behavior:** If any `send_compressed` configuration was set for the service, the `from_accept_encoding_header` method was triggered regardless of the `grpc-accept-encoding` header provided by the client. The function responsible for compression selection on the server did not respect `send_compression_encodings`. The server defaulted to the first supported compression algorithm from `grpc-accept-encoding` without checking if the algorithm was allowed by `send_compression_encodings`. **Resolution:** - If `send_compressed` is not specified, the server defaults to `identity`. - If `send_compressed` is specified, the server only compresses responses for clients explicitly requesting a supported algorithm. **Feature:** The `auto_encoding` mechanism was added. This allows the server to automatically select a compression algorithm supported by the client without requiring explicit configuration of `send_compressed` on the server. The server dynamically adjusts compression behavior, ensuring compatibility with client preferences for efficient communication. --- Cargo.toml | 5 +- tests/various_compression_formats/Cargo.toml | 13 ++ tests/various_compression_formats/build.rs | 4 + .../proto/proto_box.proto | 15 ++ tests/various_compression_formats/src/lib.rs | 3 + .../tests/auto_encoding.rs | 197 ++++++++++++++++++ tonic-build/src/server.rs | 31 +++ tonic/src/codec/compression.rs | 20 +- tonic/src/server/grpc.rs | 35 ++++ 9 files changed, 317 insertions(+), 6 deletions(-) create mode 100644 tests/various_compression_formats/Cargo.toml create mode 100644 tests/various_compression_formats/build.rs create mode 100644 tests/various_compression_formats/proto/proto_box.proto create mode 100644 tests/various_compression_formats/src/lib.rs create mode 100644 tests/various_compression_formats/tests/auto_encoding.rs diff --git a/Cargo.toml b/Cargo.toml index ee6337877..03b647892 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,10 +5,10 @@ members = [ "tonic-health", "tonic-types", "tonic-reflection", - "tonic-web", # Non-published crates + "tonic-web", # Non-published crates "examples", "codegen", - "interop", # Tests + "interop", # Tests "tests/disable_comments", "tests/included_service", "tests/same_name", @@ -22,6 +22,7 @@ members = [ "tests/stream_conflict", "tests/root-crate-path", "tests/compression", + "tests/various_compression_formats", "tests/web", "tests/service_named_result", "tests/use_arc_self", diff --git a/tests/various_compression_formats/Cargo.toml b/tests/various_compression_formats/Cargo.toml new file mode 100644 index 000000000..4212dcf8f --- /dev/null +++ b/tests/various_compression_formats/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "various_compression_formats" +version = "0.1.0" +edition = "2021" +publish = false + +[dependencies] +prost = "0.13" +tonic = { path = "../../tonic", features = ["gzip","zstd"]} +tokio = { version = "1.36.2", features = ["macros", "rt-multi-thread"] } + +[build-dependencies] +tonic-build = { path = "../../tonic-build" } diff --git a/tests/various_compression_formats/build.rs b/tests/various_compression_formats/build.rs new file mode 100644 index 000000000..e2374410d --- /dev/null +++ b/tests/various_compression_formats/build.rs @@ -0,0 +1,4 @@ +fn main() -> Result<(), Box> { + tonic_build::compile_protos("proto/proto_box.proto")?; + Ok(()) +} diff --git a/tests/various_compression_formats/proto/proto_box.proto b/tests/various_compression_formats/proto/proto_box.proto new file mode 100644 index 000000000..0d09b88b3 --- /dev/null +++ b/tests/various_compression_formats/proto/proto_box.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +package proto_box; + +service ProtoService { + rpc Rpc(Input) returns (Output); +} + +message Input { + string data = 1; +} + +message Output { + string data = 1; +} diff --git a/tests/various_compression_formats/src/lib.rs b/tests/various_compression_formats/src/lib.rs new file mode 100644 index 000000000..eca63f58e --- /dev/null +++ b/tests/various_compression_formats/src/lib.rs @@ -0,0 +1,3 @@ +pub mod proto_box { + tonic::include_proto!("proto_box"); +} diff --git a/tests/various_compression_formats/tests/auto_encoding.rs b/tests/various_compression_formats/tests/auto_encoding.rs new file mode 100644 index 000000000..e0af7e9e7 --- /dev/null +++ b/tests/various_compression_formats/tests/auto_encoding.rs @@ -0,0 +1,197 @@ +use std::error::Error; +use tokio::sync::oneshot; +use tonic::codegen::CompressionEncoding; +use tonic::transport::{Channel, Server}; +use tonic::{Request, Response, Status}; + +use various_compression_formats::proto_box::{ + proto_service_client::ProtoServiceClient, + proto_service_server::{ProtoService, ProtoServiceServer}, + Input, Output, +}; + +#[derive(Default)] +pub struct ServerTest; + +#[tonic::async_trait] +impl ProtoService for ServerTest { + async fn rpc(&self, request: Request) -> Result, Status> { + println!("Server received request: {:?}", request); + + let response = Output { + data: format!("Received: {}", request.into_inner().data), + }; + + Ok(Response::new(response)) + } +} + +struct ClientWrapper { + client: ProtoServiceClient, +} + +impl ClientWrapper { + async fn new( + address: &str, + accept: Option, + send: Option, + ) -> Result> { + let channel = Channel::from_shared(address.to_string())?.connect().await?; + let mut client = ProtoServiceClient::new(channel); + + if let Some(encoding) = accept { + client = client.accept_compressed(encoding); + } + + if let Some(encoding) = send { + client = client.send_compressed(encoding); + } + + Ok(Self { client }) + } + + async fn send_request( + &mut self, + data: String, + ) -> Result, Box> { + let request = Request::new(Input { data }); + + println!("Client sending request: {:?}", request); + + let response = self.client.rpc(request).await?; + + println!("Client received response: {:?}", response); + + // Output response headers + println!("Client response headers: {:?}", response.metadata()); + + Ok(response) + } +} + +#[tokio::test] +async fn test_compression_behavior() -> Result<(), Box> { + let port = "50051"; + let address = format!("http://[::1]:{}", port); + + // Создаем канал для остановки сервера + let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); + + // Запускаем сервер + let server_handle = tokio::spawn(async move { + let srv = ServerTest::default(); + println!("Starting server on port {}", port); + + Server::builder() + .add_service(ProtoServiceServer::new(srv)) + .serve_with_shutdown( + format!("[::1]:{}", port) + .parse() + .expect("Failed to parse address"), + async { + shutdown_rx.await.ok(); + }, + ) + .await + .expect("Server crashed"); + }); + + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + // Клиент 1: запрашивает gzip + let client1_address = address.clone(); + let client1 = async { + let mut client = + ClientWrapper::new(&client1_address, Some(CompressionEncoding::Gzip), None).await?; + let response = client.send_request("Client 1".to_string()).await?; + + // Проверяем, что заголовок grpc-encoding отсутствует + assert!( + !response.metadata().contains_key("grpc-encoding"), + "Expected no 'grpc-encoding' header" + ); + Ok::<(), Box>(()) + }; + + // Клиент 2: не запрашивает сжатие + let client2_address = address.clone(); + let client2 = async { + let mut client = ClientWrapper::new(&client2_address, None, None).await?; + let response = client.send_request("Client 2".to_string()).await?; + + // Проверяем, что заголовок grpc-encoding отсутствует + assert!( + !response.metadata().contains_key("grpc-encoding"), + "Expected no 'grpc-encoding' header" + ); + Ok::<(), Box>(()) + }; + + tokio::try_join!(client1, client2)?; + + // Останавливаем сервер + shutdown_tx.send(()).unwrap(); + server_handle.await?; + + // Запускаем второй сервер с send_compressed(CompressionEncoding::Zstd) + let (shutdown_tx2, shutdown_rx2) = oneshot::channel::<()>(); + + let server_handle2 = tokio::spawn(async move { + let srv = ServerTest::default(); + println!("Starting server on port {} with Zstd compression", port); + + Server::builder() + .add_service(ProtoServiceServer::new(srv).send_compressed(CompressionEncoding::Zstd)) + .serve_with_shutdown( + format!("[::1]:{}", port) + .parse() + .expect("Failed to parse address"), + async { + shutdown_rx2.await.ok(); + }, + ) + .await + .expect("Server crashed"); + }); + + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + // Клиент 3: запрашивает Zstd + let client3_address = address.clone(); + let client3 = async { + let mut client = + ClientWrapper::new(&client3_address, Some(CompressionEncoding::Zstd), None).await?; + let response = client.send_request("Client 3".to_string()).await?; + + // Проверяем, что заголовок grpc-encoding установлен + let grpc_encoding = response + .metadata() + .get("grpc-encoding") + .expect("Missing 'grpc-encoding' header"); + assert_eq!(grpc_encoding, "zstd"); + Ok::<(), Box>(()) + }; + + // Клиент 4: запрашивает Gzip, который не поддерживается сервером + let client4_address = address.clone(); + let client4 = async { + let mut client = + ClientWrapper::new(&client4_address, Some(CompressionEncoding::Gzip), None).await?; + let response = client.send_request("Client 4".to_string()).await?; + + // Поскольку сервер не поддерживает Gzip, заголовок grpc-encoding должен отсутствовать + assert!( + !response.metadata().contains_key("grpc-encoding"), + "Expected no 'grpc-encoding' header" + ); + Ok::<(), Box>(()) + }; + + tokio::try_join!(client3, client4)?; + + // Останавливаем сервер + shutdown_tx2.send(()).unwrap(); + server_handle2.await?; + + Ok(()) +} diff --git a/tonic-build/src/server.rs b/tonic-build/src/server.rs index e2d0aacd9..e1e6ca39f 100644 --- a/tonic-build/src/server.rs +++ b/tonic-build/src/server.rs @@ -70,6 +70,13 @@ pub(crate) fn generate_internal( self.send_compression_encodings.enable(encoding); self } + + /// Automatically determine the encoding to use based on the request headers. + #[must_use] + pub fn auto_encoding(mut self) -> Self { + self.auto_encoding = true; + self + } }; let configure_max_message_size_methods = quote! { @@ -117,6 +124,7 @@ pub(crate) fn generate_internal( send_compression_encodings: EnabledCompressionEncodings, max_decoding_message_size: Option, max_encoding_message_size: Option, + auto_encoding: bool, } impl #server_service { @@ -131,6 +139,7 @@ pub(crate) fn generate_internal( send_compression_encodings: Default::default(), max_decoding_message_size: None, max_encoding_message_size: None, + auto_encoding: false, } } @@ -184,6 +193,7 @@ pub(crate) fn generate_internal( send_compression_encodings: self.send_compression_encodings, max_decoding_message_size: self.max_decoding_message_size, max_encoding_message_size: self.max_encoding_message_size, + auto_encoding: self.auto_encoding, } } } @@ -473,6 +483,7 @@ fn generate_unary( let send_compression_encodings = self.send_compression_encodings; let max_decoding_message_size = self.max_decoding_message_size; let max_encoding_message_size = self.max_encoding_message_size; + let auto_encoding = self.auto_encoding; let inner = self.inner.clone(); let fut = async move { let method = #service_ident(inner); @@ -482,6 +493,10 @@ fn generate_unary( .apply_compression_config(accept_compression_encodings, send_compression_encodings) .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + if auto_encoding { + grpc = grpc.auto_encoding(); + } + let res = grpc.unary(method, req).await; Ok(res) }; @@ -540,6 +555,7 @@ fn generate_server_streaming( let send_compression_encodings = self.send_compression_encodings; let max_decoding_message_size = self.max_decoding_message_size; let max_encoding_message_size = self.max_encoding_message_size; + let auto_encoding = self.auto_encoding; let inner = self.inner.clone(); let fut = async move { let method = #service_ident(inner); @@ -549,6 +565,10 @@ fn generate_server_streaming( .apply_compression_config(accept_compression_encodings, send_compression_encodings) .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + if auto_encoding { + grpc = grpc.auto_encoding(); + } + let res = grpc.server_streaming(method, req).await; Ok(res) }; @@ -598,6 +618,7 @@ fn generate_client_streaming( let send_compression_encodings = self.send_compression_encodings; let max_decoding_message_size = self.max_decoding_message_size; let max_encoding_message_size = self.max_encoding_message_size; + let auto_encoding = self.auto_encoding; let inner = self.inner.clone(); let fut = async move { let method = #service_ident(inner); @@ -607,6 +628,10 @@ fn generate_client_streaming( .apply_compression_config(accept_compression_encodings, send_compression_encodings) .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + if auto_encoding { + grpc = grpc.auto_encoding(); + } + let res = grpc.client_streaming(method, req).await; Ok(res) }; @@ -666,7 +691,9 @@ fn generate_streaming( let send_compression_encodings = self.send_compression_encodings; let max_decoding_message_size = self.max_decoding_message_size; let max_encoding_message_size = self.max_encoding_message_size; + let auto_encoding = self.auto_encoding; let inner = self.inner.clone(); + let fut = async move { let method = #service_ident(inner); let codec = #codec_name::default(); @@ -675,6 +702,10 @@ fn generate_streaming( .apply_compression_config(accept_compression_encodings, send_compression_encodings) .apply_max_message_size_config(max_decoding_message_size, max_encoding_message_size); + if auto_encoding { + grpc = grpc.auto_encoding(); + } + let res = grpc.streaming(method, req).await; Ok(res) }; diff --git a/tonic/src/codec/compression.rs b/tonic/src/codec/compression.rs index d5d4eb9e7..285ce9c30 100644 --- a/tonic/src/codec/compression.rs +++ b/tonic/src/codec/compression.rs @@ -101,21 +101,33 @@ impl CompressionEncoding { pub(crate) fn from_accept_encoding_header( map: &http::HeaderMap, enabled_encodings: EnabledCompressionEncodings, + auto_encoding: bool, ) -> Option { - if enabled_encodings.is_empty() { + if enabled_encodings.is_empty() && !auto_encoding { return None; } let header_value = map.get(ACCEPT_ENCODING_HEADER)?; let header_value_str = header_value.to_str().ok()?; - split_by_comma(header_value_str).find_map(|value| match value { + let encoding = split_by_comma(header_value_str).find_map(|value| match value { #[cfg(feature = "gzip")] "gzip" => Some(CompressionEncoding::Gzip), #[cfg(feature = "zstd")] "zstd" => Some(CompressionEncoding::Zstd), - _ => None, - }) + _ => return None, + }); + + if auto_encoding { + return encoding; + } else { + if let Some(encoding) = encoding { + if enabled_encodings.is_enabled(encoding) { + return Some(encoding); + } + } + } + None } /// Get the value of `grpc-encoding` header. Returns an error if the encoding isn't supported. diff --git a/tonic/src/server/grpc.rs b/tonic/src/server/grpc.rs index aff4ce0df..e8defa137 100644 --- a/tonic/src/server/grpc.rs +++ b/tonic/src/server/grpc.rs @@ -41,6 +41,7 @@ pub struct Grpc { max_decoding_message_size: Option, /// Limits the maximum size of an encoded message. max_encoding_message_size: Option, + auto_encoding: bool, } impl Grpc @@ -55,6 +56,7 @@ where send_compression_encodings: EnabledCompressionEncodings::default(), max_decoding_message_size: None, max_encoding_message_size: None, + auto_encoding: false, } } @@ -90,6 +92,35 @@ where self } + /// Automatically determine the encoding to use based on the request headers. + /// + /// # Example + /// + /// The most common way of using this is through a server generated by tonic-build: + /// + /// ```rust + /// # enum CompressionEncoding { Gzip } + /// # struct Svc; + /// # struct ExampleServer(T); + /// # impl ExampleServer { + /// # fn new(svc: T) -> Self { Self(svc) } + /// # fn auto_encoding(self) -> Self { self } + /// # } + /// # #[tonic::async_trait] + /// # trait Example {} + /// + /// #[tonic::async_trait] + /// impl Example for Svc { + /// // ... + /// } + /// + /// let service = ExampleServer::new(Svc).auto_encoding(); + /// ``` + pub fn auto_encoding(mut self) -> Self { + self.auto_encoding = true; + self + } + /// Enable sending compressed responses. /// /// Requires the client to also support receiving compressed responses. @@ -233,6 +264,7 @@ where let accept_encoding = CompressionEncoding::from_accept_encoding_header( req.headers(), self.send_compression_encodings, + self.auto_encoding, ); let request = match self.map_request_unary(req).await { @@ -277,6 +309,7 @@ where let accept_encoding = CompressionEncoding::from_accept_encoding_header( req.headers(), self.send_compression_encodings, + self.auto_encoding, ); let request = match self.map_request_unary(req).await { @@ -317,6 +350,7 @@ where let accept_encoding = CompressionEncoding::from_accept_encoding_header( req.headers(), self.send_compression_encodings, + self.auto_encoding, ); let request = t!(self.map_request_streaming(req)); @@ -351,6 +385,7 @@ where let accept_encoding = CompressionEncoding::from_accept_encoding_header( req.headers(), self.send_compression_encodings, + self.auto_encoding, ); let request = t!(self.map_request_streaming(req)); From 3872fcf6bc7150f01e9f9f9ef717816197b5ac4c Mon Sep 17 00:00:00 2001 From: Mansur Azatbek Date: Wed, 20 Nov 2024 02:57:06 +0500 Subject: [PATCH 2/5] test: add test --- .../tests/auto_encoding.rs | 75 +++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/tests/various_compression_formats/tests/auto_encoding.rs b/tests/various_compression_formats/tests/auto_encoding.rs index e0af7e9e7..c0d981f3a 100644 --- a/tests/various_compression_formats/tests/auto_encoding.rs +++ b/tests/various_compression_formats/tests/auto_encoding.rs @@ -195,3 +195,78 @@ async fn test_compression_behavior() -> Result<(), Box> Ok(()) } + +#[tokio::test] +async fn test_auto_encoding_behavior() -> Result<(), Box> { + let port = "50052"; // Используем другой порт, чтобы избежать конфликтов + let address = format!("http://[::1]:{}", port); + + // Создаем канал для остановки сервера + let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); + + // Запускаем сервер с auto_encoding + let server_handle = tokio::spawn(async move { + let srv = ServerTest::default(); + println!("Starting server on port {} with auto_encoding", port); + + Server::builder() + .add_service( + ProtoServiceServer::new(srv) + .accept_compressed(CompressionEncoding::Gzip) + .accept_compressed(CompressionEncoding::Zstd) + .auto_encoding(), + ) + .serve_with_shutdown( + format!("[::1]:{}", port) + .parse() + .expect("Failed to parse address"), + async { + shutdown_rx.await.ok(); + }, + ) + .await + .expect("Server crashed"); + }); + + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + // Клиент 5: запрашивает Gzip + let client5_address = address.clone(); + let client5 = async { + let mut client = + ClientWrapper::new(&client5_address, Some(CompressionEncoding::Gzip), None).await?; + let response = client.send_request("Client 5".to_string()).await?; + + // Проверяем, что заголовок grpc-encoding установлен на gzip + let grpc_encoding = response + .metadata() + .get("grpc-encoding") + .expect("Missing 'grpc-encoding' header"); + assert_eq!(grpc_encoding, "gzip"); + Ok::<(), Box>(()) + }; + + // Клиент 6: запрашивает Zstd + let client6_address = address.clone(); + let client6 = async { + let mut client = + ClientWrapper::new(&client6_address, Some(CompressionEncoding::Zstd), None).await?; + let response = client.send_request("Client 6".to_string()).await?; + + // Проверяем, что заголовок grpc-encoding установлен на zstd + let grpc_encoding = response + .metadata() + .get("grpc-encoding") + .expect("Missing 'grpc-encoding' header"); + assert_eq!(grpc_encoding, "zstd"); + Ok::<(), Box>(()) + }; + + tokio::try_join!(client5, client6)?; + + // Останавливаем сервер + shutdown_tx.send(()).unwrap(); + server_handle.await?; + + Ok(()) +} From df50a7966e9134ac019ada191b3f11481d043fc4 Mon Sep 17 00:00:00 2001 From: Mansur Azatbek Date: Wed, 20 Nov 2024 03:00:40 +0500 Subject: [PATCH 3/5] refactor: remove warnings --- tonic/src/codec/compression.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/tonic/src/codec/compression.rs b/tonic/src/codec/compression.rs index 285ce9c30..e0af0cda6 100644 --- a/tonic/src/codec/compression.rs +++ b/tonic/src/codec/compression.rs @@ -115,18 +115,17 @@ impl CompressionEncoding { "gzip" => Some(CompressionEncoding::Gzip), #[cfg(feature = "zstd")] "zstd" => Some(CompressionEncoding::Zstd), - _ => return None, + _ => None, }); if auto_encoding { return encoding; - } else { - if let Some(encoding) = encoding { - if enabled_encodings.is_enabled(encoding) { - return Some(encoding); - } + } else if let Some(encoding) = encoding { + if enabled_encodings.is_enabled(encoding) { + return Some(encoding); } } + None } From e1a86e1d2354aad782ddaa111420f51ad7b35a87 Mon Sep 17 00:00:00 2001 From: Mansur Azatbek Date: Wed, 20 Nov 2024 16:42:01 +0500 Subject: [PATCH 4/5] chore(docs): update comments --- .../tests/auto_encoding.rs | 37 ++++++++----------- 1 file changed, 15 insertions(+), 22 deletions(-) diff --git a/tests/various_compression_formats/tests/auto_encoding.rs b/tests/various_compression_formats/tests/auto_encoding.rs index c0d981f3a..1e8a79745 100644 --- a/tests/various_compression_formats/tests/auto_encoding.rs +++ b/tests/various_compression_formats/tests/auto_encoding.rs @@ -62,7 +62,6 @@ impl ClientWrapper { println!("Client received response: {:?}", response); - // Output response headers println!("Client response headers: {:?}", response.metadata()); Ok(response) @@ -74,10 +73,8 @@ async fn test_compression_behavior() -> Result<(), Box> let port = "50051"; let address = format!("http://[::1]:{}", port); - // Создаем канал для остановки сервера let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); - // Запускаем сервер let server_handle = tokio::spawn(async move { let srv = ServerTest::default(); println!("Starting server on port {}", port); @@ -98,14 +95,14 @@ async fn test_compression_behavior() -> Result<(), Box> tokio::time::sleep(std::time::Duration::from_secs(1)).await; - // Клиент 1: запрашивает gzip + // Client 1: Requests Gzip let client1_address = address.clone(); let client1 = async { let mut client = ClientWrapper::new(&client1_address, Some(CompressionEncoding::Gzip), None).await?; let response = client.send_request("Client 1".to_string()).await?; - // Проверяем, что заголовок grpc-encoding отсутствует + // Checking that the rpc-encoding header is missing assert!( !response.metadata().contains_key("grpc-encoding"), "Expected no 'grpc-encoding' header" @@ -113,13 +110,13 @@ async fn test_compression_behavior() -> Result<(), Box> Ok::<(), Box>(()) }; - // Клиент 2: не запрашивает сжатие + // Client 2: does not request compression let client2_address = address.clone(); let client2 = async { let mut client = ClientWrapper::new(&client2_address, None, None).await?; let response = client.send_request("Client 2".to_string()).await?; - // Проверяем, что заголовок grpc-encoding отсутствует + // Checking that the rpc-encoding header is missing assert!( !response.metadata().contains_key("grpc-encoding"), "Expected no 'grpc-encoding' header" @@ -129,11 +126,10 @@ async fn test_compression_behavior() -> Result<(), Box> tokio::try_join!(client1, client2)?; - // Останавливаем сервер shutdown_tx.send(()).unwrap(); server_handle.await?; - // Запускаем второй сервер с send_compressed(CompressionEncoding::Zstd) + // Starting the second server with send_compressed(CompressionEncoding::Zstd) let (shutdown_tx2, shutdown_rx2) = oneshot::channel::<()>(); let server_handle2 = tokio::spawn(async move { @@ -156,14 +152,14 @@ async fn test_compression_behavior() -> Result<(), Box> tokio::time::sleep(std::time::Duration::from_secs(1)).await; - // Клиент 3: запрашивает Zstd + // Client 3: Requests Zstd let client3_address = address.clone(); let client3 = async { let mut client = ClientWrapper::new(&client3_address, Some(CompressionEncoding::Zstd), None).await?; let response = client.send_request("Client 3".to_string()).await?; - // Проверяем, что заголовок grpc-encoding установлен + // Check that the rpc-encoding header is set let grpc_encoding = response .metadata() .get("grpc-encoding") @@ -172,14 +168,14 @@ async fn test_compression_behavior() -> Result<(), Box> Ok::<(), Box>(()) }; - // Клиент 4: запрашивает Gzip, который не поддерживается сервером + // Client 4: Requests Gzip, which is not supported by the server let client4_address = address.clone(); let client4 = async { let mut client = ClientWrapper::new(&client4_address, Some(CompressionEncoding::Gzip), None).await?; let response = client.send_request("Client 4".to_string()).await?; - // Поскольку сервер не поддерживает Gzip, заголовок grpc-encoding должен отсутствовать + // Since the server does not support Gzip, the grpc-encoding header should be omitted assert!( !response.metadata().contains_key("grpc-encoding"), "Expected no 'grpc-encoding' header" @@ -189,7 +185,6 @@ async fn test_compression_behavior() -> Result<(), Box> tokio::try_join!(client3, client4)?; - // Останавливаем сервер shutdown_tx2.send(()).unwrap(); server_handle2.await?; @@ -198,13 +193,12 @@ async fn test_compression_behavior() -> Result<(), Box> #[tokio::test] async fn test_auto_encoding_behavior() -> Result<(), Box> { - let port = "50052"; // Используем другой порт, чтобы избежать конфликтов + let port = "50052"; let address = format!("http://[::1]:{}", port); - // Создаем канал для остановки сервера let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); - // Запускаем сервер с auto_encoding + // Starting the server with auto_encoding let server_handle = tokio::spawn(async move { let srv = ServerTest::default(); println!("Starting server on port {} with auto_encoding", port); @@ -230,14 +224,14 @@ async fn test_auto_encoding_behavior() -> Result<(), Box Result<(), Box>(()) }; - // Клиент 6: запрашивает Zstd + // Client 6: Requests Zstd let client6_address = address.clone(); let client6 = async { let mut client = ClientWrapper::new(&client6_address, Some(CompressionEncoding::Zstd), None).await?; let response = client.send_request("Client 6".to_string()).await?; - // Проверяем, что заголовок grpc-encoding установлен на zstd + // Check that the rpc-encoding header is set to zstd let grpc_encoding = response .metadata() .get("grpc-encoding") @@ -264,7 +258,6 @@ async fn test_auto_encoding_behavior() -> Result<(), Box Date: Sun, 24 Nov 2024 21:02:35 +0500 Subject: [PATCH 5/5] test: update tests to verify server compression handling --- .../tests/auto_encoding.rs | 301 +++++++----------- 1 file changed, 121 insertions(+), 180 deletions(-) diff --git a/tests/various_compression_formats/tests/auto_encoding.rs b/tests/various_compression_formats/tests/auto_encoding.rs index 1e8a79745..60865a25b 100644 --- a/tests/various_compression_formats/tests/auto_encoding.rs +++ b/tests/various_compression_formats/tests/auto_encoding.rs @@ -1,7 +1,10 @@ use std::error::Error; + +use tokio::net::TcpListener; use tokio::sync::oneshot; + use tonic::codegen::CompressionEncoding; -use tonic::transport::{Channel, Server}; +use tonic::transport::{server::TcpIncoming, Channel, Server}; use tonic::{Request, Response, Status}; use various_compression_formats::proto_box::{ @@ -10,6 +13,8 @@ use various_compression_formats::proto_box::{ Input, Output, }; +const LOCALHOST: &str = "127.0.0.1:0"; + #[derive(Default)] pub struct ServerTest; @@ -18,11 +23,9 @@ impl ProtoService for ServerTest { async fn rpc(&self, request: Request) -> Result, Status> { println!("Server received request: {:?}", request); - let response = Output { + Ok(Response::new(Output { data: format!("Received: {}", request.into_inner().data), - }; - - Ok(Response::new(response)) + })) } } @@ -34,7 +37,6 @@ impl ClientWrapper { async fn new( address: &str, accept: Option, - send: Option, ) -> Result> { let channel = Channel::from_shared(address.to_string())?.connect().await?; let mut client = ProtoServiceClient::new(channel); @@ -43,10 +45,6 @@ impl ClientWrapper { client = client.accept_compressed(encoding); } - if let Some(encoding) = send { - client = client.send_compressed(encoding); - } - Ok(Self { client }) } @@ -60,206 +58,149 @@ impl ClientWrapper { let response = self.client.rpc(request).await?; - println!("Client received response: {:?}", response); - println!("Client response headers: {:?}", response.metadata()); Ok(response) } } -#[tokio::test] -async fn test_compression_behavior() -> Result<(), Box> { - let port = "50051"; - let address = format!("http://[::1]:{}", port); - +async fn start_server( + listener: TcpListener, + send: Option, + auto: bool, +) -> oneshot::Sender<()> { let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); + let srv = ServerTest::default(); + let mut service = ProtoServiceServer::new(srv); - let server_handle = tokio::spawn(async move { - let srv = ServerTest::default(); - println!("Starting server on port {}", port); - - Server::builder() - .add_service(ProtoServiceServer::new(srv)) - .serve_with_shutdown( - format!("[::1]:{}", port) - .parse() - .expect("Failed to parse address"), - async { - shutdown_rx.await.ok(); - }, - ) - .await - .expect("Server crashed"); - }); + if let Some(encoding) = send { + service = service.send_compressed(encoding); + } - tokio::time::sleep(std::time::Duration::from_secs(1)).await; + if auto { + service = service.auto_encoding(); + } - // Client 1: Requests Gzip - let client1_address = address.clone(); - let client1 = async { - let mut client = - ClientWrapper::new(&client1_address, Some(CompressionEncoding::Gzip), None).await?; - let response = client.send_request("Client 1".to_string()).await?; - - // Checking that the rpc-encoding header is missing - assert!( - !response.metadata().contains_key("grpc-encoding"), - "Expected no 'grpc-encoding' header" + let server = Server::builder() + .add_service(service) + .serve_with_incoming_shutdown( + TcpIncoming::from_listener(listener, true, None).unwrap(), + async { + shutdown_rx.await.ok(); + }, ); - Ok::<(), Box>(()) - }; - - // Client 2: does not request compression - let client2_address = address.clone(); - let client2 = async { - let mut client = ClientWrapper::new(&client2_address, None, None).await?; - let response = client.send_request("Client 2".to_string()).await?; - - // Checking that the rpc-encoding header is missing - assert!( - !response.metadata().contains_key("grpc-encoding"), - "Expected no 'grpc-encoding' header" - ); - Ok::<(), Box>(()) - }; - - tokio::try_join!(client1, client2)?; - shutdown_tx.send(()).unwrap(); - server_handle.await?; - - // Starting the second server with send_compressed(CompressionEncoding::Zstd) - let (shutdown_tx2, shutdown_rx2) = oneshot::channel::<()>(); - - let server_handle2 = tokio::spawn(async move { - let srv = ServerTest::default(); - println!("Starting server on port {} with Zstd compression", port); - - Server::builder() - .add_service(ProtoServiceServer::new(srv).send_compressed(CompressionEncoding::Zstd)) - .serve_with_shutdown( - format!("[::1]:{}", port) - .parse() - .expect("Failed to parse address"), - async { - shutdown_rx2.await.ok(); - }, - ) - .await - .expect("Server crashed"); + tokio::spawn(async move { + server.await.expect("Server crashed"); }); + shutdown_tx +} + +async fn run_client_test( + address: &str, + client_accept: Option, + expected_encoding: Option<&str>, + data: &str, +) -> Result<(), Box> { + let mut client = ClientWrapper::new(address, client_accept).await?; + let response = client.send_request(data.to_string()).await?; + + match expected_encoding { + Some(encoding) => { + let grpc_encoding = response + .metadata() + .get("grpc-encoding") + .expect("Missing 'grpc-encoding' header"); + assert_eq!(grpc_encoding, encoding); + } + None => { + assert!( + !response.metadata().contains_key("grpc-encoding"), + "Expected no 'grpc-encoding' header" + ); + } + } + + Ok(()) +} + +#[tokio::test] +async fn test_compression_behavior() -> Result<(), Box> { + let listener = TcpListener::bind(LOCALHOST).await?; + let address = format!("http://{}", listener.local_addr().unwrap()); + + // The server is not specified to send data with any compression + let shutdown_tx = start_server(listener, None, false).await; + tokio::time::sleep(std::time::Duration::from_secs(1)).await; - // Client 3: Requests Zstd - let client3_address = address.clone(); - let client3 = async { - let mut client = - ClientWrapper::new(&client3_address, Some(CompressionEncoding::Zstd), None).await?; - let response = client.send_request("Client 3".to_string()).await?; - - // Check that the rpc-encoding header is set - let grpc_encoding = response - .metadata() - .get("grpc-encoding") - .expect("Missing 'grpc-encoding' header"); - assert_eq!(grpc_encoding, "zstd"); - Ok::<(), Box>(()) - }; - - // Client 4: Requests Gzip, which is not supported by the server - let client4_address = address.clone(); - let client4 = async { - let mut client = - ClientWrapper::new(&client4_address, Some(CompressionEncoding::Gzip), None).await?; - let response = client.send_request("Client 4".to_string()).await?; - - // Since the server does not support Gzip, the grpc-encoding header should be omitted - assert!( - !response.metadata().contains_key("grpc-encoding"), - "Expected no 'grpc-encoding' header" - ); - Ok::<(), Box>(()) - }; + tokio::try_join!( + // Client 1 can only accept gzip encoding or uncompressed, + // so all data must be returned uncompressed + run_client_test(&address, Some(CompressionEncoding::Gzip), None, "Client 1"), + // Client 2 can only accept non-compressed data, + // so all data must be returned uncompressed + run_client_test(&address, None, None, "Client 2") + )?; - tokio::try_join!(client3, client4)?; + shutdown_tx.send(()).unwrap(); + + let listener = TcpListener::bind(LOCALHOST).await?; + let address = format!("http://{}", listener.local_addr().unwrap()); + + // The server is specified to send data with zstd compression + let shutdown_tx = start_server(listener, Some(CompressionEncoding::Zstd), false).await; + + tokio::time::sleep(std::time::Duration::from_secs(1)).await; - shutdown_tx2.send(()).unwrap(); - server_handle2.await?; + tokio::try_join!( + // Client 3 can only accept zstd encoding or uncompressed, + // so all data must be returned compressed with zstd + run_client_test( + &address, + Some(CompressionEncoding::Zstd), + Some("zstd"), + "Client 3" + ), + // Client 4 can only accept Gzip encoding or uncompressed, + // so all data must be returned uncompressed + run_client_test(&address, Some(CompressionEncoding::Gzip), None, "Client 4") + )?; + + shutdown_tx.send(()).unwrap(); Ok(()) } #[tokio::test] async fn test_auto_encoding_behavior() -> Result<(), Box> { - let port = "50052"; - let address = format!("http://[::1]:{}", port); + let listener = TcpListener::bind(LOCALHOST).await?; + let address = format!("http://{}", listener.local_addr().unwrap()); - let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); - - // Starting the server with auto_encoding - let server_handle = tokio::spawn(async move { - let srv = ServerTest::default(); - println!("Starting server on port {} with auto_encoding", port); - - Server::builder() - .add_service( - ProtoServiceServer::new(srv) - .accept_compressed(CompressionEncoding::Gzip) - .accept_compressed(CompressionEncoding::Zstd) - .auto_encoding(), - ) - .serve_with_shutdown( - format!("[::1]:{}", port) - .parse() - .expect("Failed to parse address"), - async { - shutdown_rx.await.ok(); - }, - ) - .await - .expect("Server crashed"); - }); + // The server returns in the compression format that the client prefers + let shutdown_tx = start_server(listener, Some(CompressionEncoding::Gzip), true).await; tokio::time::sleep(std::time::Duration::from_secs(1)).await; - // Client 5: Requests Gzip - let client5_address = address.clone(); - let client5 = async { - let mut client = - ClientWrapper::new(&client5_address, Some(CompressionEncoding::Gzip), None).await?; - let response = client.send_request("Client 5".to_string()).await?; - - // Check that the grpc-encoding header is set to gzip - let grpc_encoding = response - .metadata() - .get("grpc-encoding") - .expect("Missing 'grpc-encoding' header"); - assert_eq!(grpc_encoding, "gzip"); - Ok::<(), Box>(()) - }; - - // Client 6: Requests Zstd - let client6_address = address.clone(); - let client6 = async { - let mut client = - ClientWrapper::new(&client6_address, Some(CompressionEncoding::Zstd), None).await?; - let response = client.send_request("Client 6".to_string()).await?; - - // Check that the rpc-encoding header is set to zstd - let grpc_encoding = response - .metadata() - .get("grpc-encoding") - .expect("Missing 'grpc-encoding' header"); - assert_eq!(grpc_encoding, "zstd"); - Ok::<(), Box>(()) - }; - - tokio::try_join!(client5, client6)?; + tokio::try_join!( + // Client 5 can accept gzip encoding or uncompressed, so all data must be returned compressed with gzip + run_client_test( + &address, + Some(CompressionEncoding::Gzip), + Some("gzip"), + "Client 5" + ), + // Client 6 can accept zstd encoding or uncompressed, so all data must be returned compressed with zstd + run_client_test( + &address, + Some(CompressionEncoding::Zstd), + Some("zstd"), + "Client 6" + ) + )?; shutdown_tx.send(()).unwrap(); - server_handle.await?; Ok(()) }