Skip to content

Commit

Permalink
chore: Refactor use of tokio-stream (#1560)
Browse files Browse the repository at this point in the history
  • Loading branch information
tottoto authored Nov 13, 2023
1 parent b3fca19 commit f089e7a
Show file tree
Hide file tree
Showing 11 changed files with 29 additions and 31 deletions.
2 changes: 1 addition & 1 deletion examples/src/mock/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
tokio::spawn(async move {
Server::builder()
.add_service(GreeterServer::new(greeter))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
});

Expand Down
6 changes: 3 additions & 3 deletions interop/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ pub async fn ping_pong(client: &mut TestClient, assertions: &mut Vec<TestAsserti
}

pub async fn empty_stream(client: &mut TestClient, assertions: &mut Vec<TestAssertion>) {
let stream = tokio_stream::iter(Vec::new());
let stream = tokio_stream::empty();
let result = client.full_duplex_call(Request::new(stream)).await;

assertions.push(test_assert!(
Expand Down Expand Up @@ -270,7 +270,7 @@ pub async fn status_code_and_message(client: &mut TestClient, assertions: &mut V
let result = client.unary_call(Request::new(simple_req)).await;
validate_response(result, assertions);

let stream = tokio_stream::iter(vec![duplex_req]);
let stream = tokio_stream::once(duplex_req);
let result = match client.full_duplex_call(Request::new(stream)).await {
Ok(response) => {
let stream = response.into_inner();
Expand Down Expand Up @@ -356,7 +356,7 @@ pub async fn custom_metadata(client: &mut TestClient, assertions: &mut Vec<TestA
req_unary.metadata_mut().insert(key1, value1.clone());
req_unary.metadata_mut().insert_bin(key2, value2.clone());

let stream = tokio_stream::iter(vec![make_ping_pong_request(0)]);
let stream = tokio_stream::once(make_ping_pong_request(0));
let mut req_stream = Request::new(stream);
req_stream.metadata_mut().insert(key1, value1.clone());
req_stream.metadata_mut().insert_bin(key2, value2.clone());
Expand Down
2 changes: 1 addition & 1 deletion interop/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl pb::test_service_server::TestService for TestService {
return Err(status);
}

let single_message = tokio_stream::iter(vec![Ok(first_msg)]);
let single_message = tokio_stream::once(Ok(first_msg));
let mut stream = single_message.chain(stream);

let stream = try_stream! {
Expand Down
2 changes: 1 addition & 1 deletion tests/compression/src/bidirectional_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async fn client_enabled_server_enabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
}
Expand Down
11 changes: 5 additions & 6 deletions tests/compression/src/client_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async fn client_enabled_server_enabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
}
Expand Down Expand Up @@ -75,7 +75,7 @@ async fn client_disabled_server_enabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
}
Expand All @@ -102,7 +102,7 @@ async fn client_enabled_server_disabled() {
tokio::spawn(async move {
Server::builder()
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
});
Expand Down Expand Up @@ -147,7 +147,7 @@ async fn compressing_response_from_client_stream() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
}
Expand All @@ -156,8 +156,7 @@ async fn compressing_response_from_client_stream() {
let mut client = test_client::TestClient::new(mock_io_channel(client).await)
.accept_compressed(CompressionEncoding::Gzip);

let stream = tokio_stream::iter(vec![]);
let req = Request::new(Box::pin(stream));
let req = Request::new(Box::pin(tokio_stream::empty()));

let res = client.compress_output_client_stream(req).await.unwrap();
assert_eq!(res.metadata().get("grpc-encoding").unwrap(), "gzip");
Expand Down
6 changes: 3 additions & 3 deletions tests/compression/src/compressing_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async fn client_enabled_server_enabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
}
Expand Down Expand Up @@ -61,7 +61,7 @@ async fn client_enabled_server_disabled() {
tokio::spawn(async move {
Server::builder()
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
});
Expand Down Expand Up @@ -99,7 +99,7 @@ async fn client_mark_compressed_without_header_server_enabled() {
async move {
Server::builder()
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
}
Expand Down
17 changes: 8 additions & 9 deletions tests/compression/src/compressing_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ async fn client_enabled_server_enabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
}
Expand Down Expand Up @@ -94,7 +94,7 @@ async fn client_enabled_server_disabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
}
Expand Down Expand Up @@ -160,7 +160,7 @@ async fn client_disabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
}
Expand Down Expand Up @@ -198,7 +198,7 @@ async fn server_replying_with_unsupported_encoding() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
});
Expand Down Expand Up @@ -240,7 +240,7 @@ async fn disabling_compression_on_single_response() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
}
Expand Down Expand Up @@ -281,7 +281,7 @@ async fn disabling_compression_on_response_but_keeping_compression_on_stream() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
}
Expand Down Expand Up @@ -337,7 +337,7 @@ async fn disabling_compression_on_response_from_client_stream() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
}
Expand All @@ -346,8 +346,7 @@ async fn disabling_compression_on_response_from_client_stream() {
let mut client = test_client::TestClient::new(mock_io_channel(client).await)
.accept_compressed(CompressionEncoding::Gzip);

let stream = tokio_stream::iter(vec![]);
let req = Request::new(Box::pin(stream));
let req = Request::new(Box::pin(tokio_stream::empty()));

let res = client.compress_output_client_stream(req).await.unwrap();
assert_eq!(res.metadata().get("grpc-encoding").unwrap(), "gzip");
Expand Down
6 changes: 3 additions & 3 deletions tests/compression/src/server_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async fn client_enabled_server_enabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
}
Expand Down Expand Up @@ -80,7 +80,7 @@ async fn client_disabled_server_enabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
}
Expand Down Expand Up @@ -125,7 +125,7 @@ async fn client_enabled_server_disabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
}
Expand Down
4 changes: 2 additions & 2 deletions tests/integration_tests/tests/max_message_size.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ async fn response_stream_limit() {
tokio::spawn(async move {
Server::builder()
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
});
Expand Down Expand Up @@ -317,7 +317,7 @@ async fn max_message_run(case: &TestCase) -> Result<(), Status> {
tokio::spawn(async move {
Server::builder()
.add_service(svc)
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(server)))
.await
.unwrap();
});
Expand Down
2 changes: 1 addition & 1 deletion tests/integration_tests/tests/routes_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ async fn multiple_service_using_routes_builder() {
let output = Output1 {
buf: request.into_inner().buf,
};
let stream = tokio_stream::iter(vec![Ok(output)]);
let stream = tokio_stream::once(Ok(output));

Ok(Response::new(Box::pin(stream)))
}
Expand Down
2 changes: 1 addition & 1 deletion tonic-reflection/tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ async fn make_test_reflection_request(request: ServerReflectionRequest) -> Messa
.unwrap();
let mut client = ServerReflectionClient::new(conn);

let request = Request::new(tokio_stream::iter(vec![request]));
let request = Request::new(tokio_stream::once(request));
let mut inbound = client
.server_reflection_info(request)
.await
Expand Down

0 comments on commit f089e7a

Please sign in to comment.