Skip to content

Commit

Permalink
feat: upgrade to hyper 1.0.0-rc.4, add TokioIo
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Jul 13, 2023
1 parent 4390d51 commit f898015
Show file tree
Hide file tree
Showing 8 changed files with 208 additions and 25 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ jobs:
with:
toolchain: ${{ matrix.rust }}

- name: Make sure log v0.4.18 is used
run: |
cargo update
cargo update -p log --precise 0.4.18
- run: cargo check

miri:
Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ edition = "2018"
publish = false # no accidents while in dev

[dependencies]
hyper = "=1.0.0-rc.3"
hyper = "=1.0.0-rc.4"
futures-channel = "0.3"
futures-util = { version = "0.3", default-features = false }
http = "0.2"
Expand All @@ -31,7 +31,7 @@ tower = { version = "0.4", features = ["make", "util"] }

[dev-dependencies]
bytes = "1"
http-body-util = "0.1.0-rc.2"
http-body-util = "0.1.0-rc.3"
tokio = { version = "1", features = ["macros", "test-util"] }

[target.'cfg(any(target_os = "linux", target_os = "macos"))'.dev-dependencies]
Expand Down
15 changes: 9 additions & 6 deletions src/client/connect/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use tracing::{debug, trace, warn};

use super::dns::{self, resolve, GaiResolver, Resolve};
use super::{Connected, Connection};
use crate::rt::TokioIo;

/// A connector for the `http` scheme.
///
Expand Down Expand Up @@ -335,7 +336,7 @@ where
R: Resolve + Clone + Send + Sync + 'static,
R::Future: Send,
{
type Response = TcpStream;
type Response = TokioIo<TcpStream>;
type Error = ConnectError;
type Future = HttpConnecting<R>;

Expand Down Expand Up @@ -402,7 +403,7 @@ impl<R> HttpConnector<R>
where
R: Resolve,
{
async fn call_async(&mut self, dst: Uri) -> Result<TcpStream, ConnectError> {
async fn call_async(&mut self, dst: Uri) -> Result<TokioIo<TcpStream>, ConnectError> {
let config = &self.config;

let (host, port) = get_host_port(config, &dst)?;
Expand Down Expand Up @@ -433,14 +434,16 @@ where
warn!("tcp set_nodelay error: {}", e);
}

Ok(sock)
Ok(TokioIo::new(sock))
}
}

impl Connection for TcpStream {
impl Connection for TokioIo<TcpStream> {
fn connected(&self) -> Connected {
let connected = Connected::new();
if let (Ok(remote_addr), Ok(local_addr)) = (self.peer_addr(), self.local_addr()) {
if let (Ok(remote_addr), Ok(local_addr)) =
(self.inner().peer_addr(), self.inner().local_addr())
{
connected.extra(HttpInfo {
remote_addr,
local_addr,
Expand Down Expand Up @@ -478,7 +481,7 @@ pin_project! {
}
}

type ConnectResult = Result<TcpStream, ConnectError>;
type ConnectResult = Result<TokioIo<TcpStream>, ConnectError>;
type BoxConnecting = Pin<Box<dyn Future<Output = ConnectResult> + Send>>;

impl<R: Resolve> Future for HttpConnecting<R> {
Expand Down
16 changes: 8 additions & 8 deletions src/client/connect/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
//! # Connectors
//!
//! A "connector" is a [`Service`][] that takes a [`Uri`][] destination, and
//! its `Response` is some type implementing [`AsyncRead`][], [`AsyncWrite`][],
//! its `Response` is some type implementing [`Read`][], [`Write`][],
//! and [`Connection`][].
//!
//! ## Custom Connectors
Expand Down Expand Up @@ -59,8 +59,8 @@
//! [`HttpConnector`]: HttpConnector
//! [`Service`]: tower::Service
//! [`Uri`]: ::http::Uri
//! [`AsyncRead`]: tokio::io::AsyncRead
//! [`AsyncWrite`]: tokio::io::AsyncWrite
//! [`Read`]: hyper::rt::Read
//! [`Write`]: hyper::rt::Write
//! [`Connection`]: Connection
use std::fmt;

Expand Down Expand Up @@ -248,7 +248,7 @@ pub(super) mod sealed {
use std::marker::Unpin;

use ::http::Uri;
use tokio::io::{AsyncRead, AsyncWrite};
use hyper::rt::{Read, Write};

use super::Connection;

Expand All @@ -272,7 +272,7 @@ pub(super) mod sealed {
}

pub trait ConnectSvc {
type Connection: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static;
type Connection: Read + Write + Connection + Unpin + Send + 'static;
type Error: Into<Box<dyn StdError + Send + Sync>>;
type Future: Future<Output = Result<Self::Connection, Self::Error>> + Unpin + Send + 'static;

Expand All @@ -284,7 +284,7 @@ pub(super) mod sealed {
S: tower_service::Service<Uri, Response = T> + Send + 'static,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
S::Future: Unpin + Send,
T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
T: Read + Write + Connection + Unpin + Send + 'static,
{
type _Svc = S;

Expand All @@ -298,7 +298,7 @@ pub(super) mod sealed {
S: tower_service::Service<Uri, Response = T> + Send + 'static,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
S::Future: Unpin + Send,
T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
T: Read + Write + Connection + Unpin + Send + 'static,
{
type Connection = T;
type Error = S::Error;
Expand All @@ -314,7 +314,7 @@ pub(super) mod sealed {
S: tower_service::Service<Uri, Response = T> + Send,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
S::Future: Unpin + Send,
T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static,
T: Read + Write + Connection + Unpin + Send + 'static,
{
}

Expand Down
19 changes: 10 additions & 9 deletions src/client/legacy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub struct Client<C, B> {
#[cfg(feature = "http1")]
h1_builder: hyper::client::conn::http1::Builder,
#[cfg(feature = "http2")]
h2_builder: hyper::client::conn::http2::Builder,
h2_builder: hyper::client::conn::http2::Builder<Exec>,
pool: pool::Pool<PoolClient<B>, PoolKey>,
}

Expand Down Expand Up @@ -128,7 +128,7 @@ impl Client<(), ()> {
impl<C, B> Client<C, B>
where
C: Connect + Clone + Send + Sync + 'static,
B: Body + Send + 'static,
B: Body + Send + 'static + Unpin,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
Expand Down Expand Up @@ -458,7 +458,8 @@ where
fn connect_to(
&self,
pool_key: PoolKey,
) -> impl Lazy<Output = Result<pool::Pooled<PoolClient<B>, PoolKey>, Error>> + Unpin {
) -> impl Lazy<Output = Result<pool::Pooled<PoolClient<B>, PoolKey>, Error>> + Send + Unpin
{
let executor = self.exec.clone();
let pool = self.pool.clone();
#[cfg(feature = "http1")]
Expand Down Expand Up @@ -574,7 +575,7 @@ where
impl<C, B> tower_service::Service<Request<B>> for Client<C, B>
where
C: Connect + Clone + Send + Sync + 'static,
B: Body + Send + 'static,
B: Body + Send + 'static + Unpin,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
Expand All @@ -594,7 +595,7 @@ where
impl<C, B> tower_service::Service<Request<B>> for &'_ Client<C, B>
where
C: Connect + Clone + Send + Sync + 'static,
B: Body + Send + 'static,
B: Body + Send + 'static + Unpin,
B::Data: Send,
B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
Expand Down Expand Up @@ -955,7 +956,7 @@ pub struct Builder {
#[cfg(feature = "http1")]
h1_builder: hyper::client::conn::http1::Builder,
#[cfg(feature = "http2")]
h2_builder: hyper::client::conn::http2::Builder,
h2_builder: hyper::client::conn::http2::Builder<Exec>,
pool_config: pool::Config,
}

Expand All @@ -965,18 +966,18 @@ impl Builder {
where
E: hyper::rt::Executor<BoxSendFuture> + Send + Sync + Clone + 'static,
{
let exec = Exec::new(executor.clone());
let exec = Exec::new(executor);
Self {
client_config: Config {
retry_canceled_requests: true,
set_host: true,
ver: Ver::Auto,
},
exec,
exec: exec.clone(),
#[cfg(feature = "http1")]
h1_builder: hyper::client::conn::http1::Builder::new(),
#[cfg(feature = "http2")]
h2_builder: hyper::client::conn::http2::Builder::new(executor),
h2_builder: hyper::client::conn::http2::Builder::new(exec),
pool_config: pool::Config {
idle_timeout: Some(Duration::from_secs(90)),
max_idle_per_host: std::usize::MAX,
Expand Down
9 changes: 9 additions & 0 deletions src/common/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,12 @@ impl fmt::Debug for Exec {
f.debug_struct("Exec").finish()
}
}

impl<F> hyper::rt::Executor<F> for Exec
where
F: Future<Output = ()> + Send + 'static,
{
fn execute(&self, fut: F) {
Exec::execute(self, fut);
}
}
2 changes: 2 additions & 0 deletions src/rt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,7 @@

/// Implementation of [`hyper::rt::Executor`] that utilises [`tokio::spawn`].
pub mod tokio_executor;
mod tokio_io;

pub use tokio_executor::TokioExecutor;
pub use tokio_io::TokioIo;
Loading

0 comments on commit f898015

Please sign in to comment.