From 4297c6d859da4cc38ba2d6a77bb7a08f54725c27 Mon Sep 17 00:00:00 2001 From: tottoto Date: Sat, 28 Oct 2023 12:19:04 +0900 Subject: [PATCH] chore: Refactor use of tokio-stream --- examples/src/mock/mock.rs | 2 +- interop/src/client.rs | 6 +++--- interop/src/server.rs | 2 +- tests/compression/src/bidirectional_stream.rs | 2 +- tests/compression/src/client_stream.rs | 11 +++++------ tests/compression/src/compressing_request.rs | 6 +++--- tests/compression/src/compressing_response.rs | 17 ++++++++--------- tests/compression/src/server_stream.rs | 6 +++--- .../integration_tests/tests/max_message_size.rs | 4 ++-- tests/integration_tests/tests/routes_builder.rs | 2 +- tonic-reflection/tests/server.rs | 2 +- 11 files changed, 29 insertions(+), 31 deletions(-) diff --git a/examples/src/mock/mock.rs b/examples/src/mock/mock.rs index 87f5fc6ba..0d3754921 100644 --- a/examples/src/mock/mock.rs +++ b/examples/src/mock/mock.rs @@ -23,7 +23,7 @@ async fn main() -> Result<(), Box> { tokio::spawn(async move { Server::builder() .add_service(GreeterServer::new(greeter)) - .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) .await }); diff --git a/interop/src/client.rs b/interop/src/client.rs index aa5a9a93f..389264684 100644 --- a/interop/src/client.rs +++ b/interop/src/client.rs @@ -207,7 +207,7 @@ pub async fn ping_pong(client: &mut TestClient, assertions: &mut Vec) { - let stream = tokio_stream::iter(Vec::new()); + let stream = tokio_stream::empty(); let result = client.full_duplex_call(Request::new(stream)).await; assertions.push(test_assert!( @@ -270,7 +270,7 @@ pub async fn status_code_and_message(client: &mut TestClient, assertions: &mut V let result = client.unary_call(Request::new(simple_req)).await; validate_response(result, assertions); - let stream = tokio_stream::iter(vec![duplex_req]); + let stream = tokio_stream::once(duplex_req); let result = match client.full_duplex_call(Request::new(stream)).await { Ok(response) => { let stream = response.into_inner(); @@ -356,7 +356,7 @@ pub async fn custom_metadata(client: &mut TestClient, assertions: &mut Vec(server)])) + .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) .await .unwrap(); } diff --git a/tests/compression/src/client_stream.rs b/tests/compression/src/client_stream.rs index 99d72751b..9c210c574 100644 --- a/tests/compression/src/client_stream.rs +++ b/tests/compression/src/client_stream.rs @@ -29,7 +29,7 @@ async fn client_enabled_server_enabled() { .into_inner(), ) .add_service(svc) - .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) .await .unwrap(); } @@ -75,7 +75,7 @@ async fn client_disabled_server_enabled() { .into_inner(), ) .add_service(svc) - .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) .await .unwrap(); } @@ -102,7 +102,7 @@ async fn client_enabled_server_disabled() { tokio::spawn(async move { Server::builder() .add_service(svc) - .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) .await .unwrap(); }); @@ -147,7 +147,7 @@ async fn compressing_response_from_client_stream() { .into_inner(), ) .add_service(svc) - .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) .await .unwrap(); } @@ -156,8 +156,7 @@ async fn compressing_response_from_client_stream() { let mut client = test_client::TestClient::new(mock_io_channel(client).await) .accept_compressed(CompressionEncoding::Gzip); - let stream = tokio_stream::iter(vec![]); - let req = Request::new(Box::pin(stream)); + let req = Request::new(Box::pin(tokio_stream::empty())); let res = client.compress_output_client_stream(req).await.unwrap(); assert_eq!(res.metadata().get("grpc-encoding").unwrap(), "gzip"); diff --git a/tests/compression/src/compressing_request.rs b/tests/compression/src/compressing_request.rs index 7cdfd7cec..b1c9009b8 100644 --- a/tests/compression/src/compressing_request.rs +++ b/tests/compression/src/compressing_request.rs @@ -31,7 +31,7 @@ async fn client_enabled_server_enabled() { .into_inner(), ) .add_service(svc) - .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) .await .unwrap(); } @@ -61,7 +61,7 @@ async fn client_enabled_server_disabled() { tokio::spawn(async move { Server::builder() .add_service(svc) - .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) .await .unwrap(); }); @@ -99,7 +99,7 @@ async fn client_mark_compressed_without_header_server_enabled() { async move { Server::builder() .add_service(svc) - .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) .await .unwrap(); } diff --git a/tests/compression/src/compressing_response.rs b/tests/compression/src/compressing_response.rs index cc5d4f4cd..a9b73e6d9 100644 --- a/tests/compression/src/compressing_response.rs +++ b/tests/compression/src/compressing_response.rs @@ -53,7 +53,7 @@ async fn client_enabled_server_enabled() { .into_inner(), ) .add_service(svc) - .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) .await .unwrap(); } @@ -94,7 +94,7 @@ async fn client_enabled_server_disabled() { .into_inner(), ) .add_service(svc) - .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) .await .unwrap(); } @@ -160,7 +160,7 @@ async fn client_disabled() { .into_inner(), ) .add_service(svc) - .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) .await .unwrap(); } @@ -198,7 +198,7 @@ async fn server_replying_with_unsupported_encoding() { .into_inner(), ) .add_service(svc) - .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) .await .unwrap(); }); @@ -240,7 +240,7 @@ async fn disabling_compression_on_single_response() { .into_inner(), ) .add_service(svc) - .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) .await .unwrap(); } @@ -281,7 +281,7 @@ async fn disabling_compression_on_response_but_keeping_compression_on_stream() { .into_inner(), ) .add_service(svc) - .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) .await .unwrap(); } @@ -337,7 +337,7 @@ async fn disabling_compression_on_response_from_client_stream() { .into_inner(), ) .add_service(svc) - .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) .await .unwrap(); } @@ -346,8 +346,7 @@ async fn disabling_compression_on_response_from_client_stream() { let mut client = test_client::TestClient::new(mock_io_channel(client).await) .accept_compressed(CompressionEncoding::Gzip); - let stream = tokio_stream::iter(vec![]); - let req = Request::new(Box::pin(stream)); + let req = Request::new(Box::pin(tokio_stream::empty())); let res = client.compress_output_client_stream(req).await.unwrap(); assert_eq!(res.metadata().get("grpc-encoding").unwrap(), "gzip"); diff --git a/tests/compression/src/server_stream.rs b/tests/compression/src/server_stream.rs index 453c055c8..3a7fe7104 100644 --- a/tests/compression/src/server_stream.rs +++ b/tests/compression/src/server_stream.rs @@ -26,7 +26,7 @@ async fn client_enabled_server_enabled() { .into_inner(), ) .add_service(svc) - .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) .await .unwrap(); } @@ -80,7 +80,7 @@ async fn client_disabled_server_enabled() { .into_inner(), ) .add_service(svc) - .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) .await .unwrap(); } @@ -125,7 +125,7 @@ async fn client_enabled_server_disabled() { .into_inner(), ) .add_service(svc) - .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) .await .unwrap(); } diff --git a/tests/integration_tests/tests/max_message_size.rs b/tests/integration_tests/tests/max_message_size.rs index bda1e9d47..9ae524dbc 100644 --- a/tests/integration_tests/tests/max_message_size.rs +++ b/tests/integration_tests/tests/max_message_size.rs @@ -148,7 +148,7 @@ async fn response_stream_limit() { tokio::spawn(async move { Server::builder() .add_service(svc) - .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) .await .unwrap(); }); @@ -317,7 +317,7 @@ async fn max_message_run(case: &TestCase) -> Result<(), Status> { tokio::spawn(async move { Server::builder() .add_service(svc) - .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server))) .await .unwrap(); }); diff --git a/tests/integration_tests/tests/routes_builder.rs b/tests/integration_tests/tests/routes_builder.rs index f820be0d1..7bcbfad37 100644 --- a/tests/integration_tests/tests/routes_builder.rs +++ b/tests/integration_tests/tests/routes_builder.rs @@ -43,7 +43,7 @@ async fn multiple_service_using_routes_builder() { let output = Output1 { buf: request.into_inner().buf, }; - let stream = tokio_stream::iter(vec![Ok(output)]); + let stream = tokio_stream::once(Ok(output)); Ok(Response::new(Box::pin(stream))) } diff --git a/tonic-reflection/tests/server.rs b/tonic-reflection/tests/server.rs index 00c98fc8d..ab210d5db 100644 --- a/tonic-reflection/tests/server.rs +++ b/tonic-reflection/tests/server.rs @@ -123,7 +123,7 @@ async fn make_test_reflection_request(request: ServerReflectionRequest) -> Messa .unwrap(); let mut client = ServerReflectionClient::new(conn); - let request = Request::new(tokio_stream::iter(vec![request])); + let request = Request::new(tokio_stream::once(request)); let mut inbound = client .server_reflection_info(request) .await