diff --git a/ci/h3spec.sh b/ci/h3spec.sh index d250dc37..73cc6ba0 100755 --- a/ci/h3spec.sh +++ b/ci/h3spec.sh @@ -3,7 +3,7 @@ LOGFILE=h3server.log if ! [ -e "h3spec-linux-x86_64" ] ; then # if we don't already have a h3spec executable, wget it from github - wget https://github.com/kazu-yamamoto/h3spec/releases/download/v0.1.10/h3spec-linux-x86_64 + wget https://github.com/kazu-yamamoto/h3spec/releases/download/v0.1.11/h3spec-linux-x86_64 chmod +x h3spec-linux-x86_64 fi diff --git a/examples/webtransport_server.rs b/examples/webtransport_server.rs index 52fbfe7f..d9a480fe 100644 --- a/examples/webtransport_server.rs +++ b/examples/webtransport_server.rs @@ -61,6 +61,7 @@ async fn main() -> Result<(), Box> { .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) .with_span_events(tracing_subscriber::fmt::format::FmtSpan::FULL) .with_writer(std::io::stderr) + .with_max_level(tracing::Level::INFO) .init(); #[cfg(feature = "tree")] @@ -69,6 +70,7 @@ async fn main() -> Result<(), Box> { tracing_subscriber::registry() .with(tracing_subscriber::EnvFilter::from_default_env()) .with(tracing_tree::HierarchicalLayer::new(4).with_bracketed_fields(true)) + .with_max_level(tracing::Level::INFO) .init(); // process cli arguments @@ -156,7 +158,7 @@ async fn main() -> Result<(), Box> { Ok(()) } -async fn handle_connection(mut conn: Connection) -> Result<()> { +async fn handle_connection(mut conn: Connection, Bytes>) -> Result<()> { // 3. TODO: Conditionally, if the client indicated that this is a webtransport session, we should accept it here, else use regular h3. // if this is a webtransport session, then h3 needs to stop handing the datagrams, bidirectional streams, and unidirectional streams and give them // to the webtransport session. @@ -314,16 +316,19 @@ where tracing::info!("Finished sending datagram"); } } - uni_stream = session.accept_uni() => { - let (id, stream) = uni_stream?.unwrap(); - - let send = session.open_uni(id).await?; - tokio::spawn( async move { log_result!(echo_stream(send, stream).await); }); - } - stream = session.accept_bi() => { - if let Some(server::AcceptedBi::BidiStream(_, stream)) = stream? { - let (send, recv) = quic::BidiStream::split(stream); - tokio::spawn( async move { log_result!(echo_stream(send, recv).await); }); + stream = session.accept_streams() => { + match stream? { + Some(server::AcceptStream::BidiStream(_, stream)) =>{ + info!("Received bidirectional stream"); + let (send, recv) = quic::BidiStream::split(stream); + tokio::spawn( async move { log_result!(echo_stream(send, recv).await); }); + }, + Some(server::AcceptStream::UnidirectionalStream(id, stream)) => { + info!("Received unidirectional stream with id: {:?}", id); + let send = session.open_uni(id).await?; + tokio::spawn( async move { log_result!(echo_stream(send, stream).await); }); + }, + _ => (), } } else => { diff --git a/h3-quinn/Cargo.toml b/h3-quinn/Cargo.toml index bea10395..9a9250e0 100644 --- a/h3-quinn/Cargo.toml +++ b/h3-quinn/Cargo.toml @@ -13,11 +13,11 @@ categories = ["network-programming", "web-programming"] license = "MIT" [dependencies] -h3 = { version = "0.0.6", path = "../h3" } -bytes = "1" quinn = { version = "0.11", default-features = false, features = [ "futures-io", ] } +h3 = { version = "0.0.6", path = "../h3" } +bytes = "1" tokio-util = { version = "0.7.9" } futures = { version = "0.3.28" } tokio = { version = "1", features = ["io-util"], default-features = false } diff --git a/h3-quinn/src/lib.rs b/h3-quinn/src/lib.rs index dd374a3e..5965328f 100644 --- a/h3-quinn/src/lib.rs +++ b/h3-quinn/src/lib.rs @@ -16,15 +16,15 @@ use bytes::{Buf, Bytes, BytesMut}; use futures::{ ready, - stream::{self}, + stream::{self, select, Select}, Stream, StreamExt, }; -pub use quinn::{self, AcceptBi, AcceptUni, Endpoint, OpenBi, OpenUni, VarInt, WriteError}; +pub use quinn::{self, Endpoint, OpenBi, OpenUni, VarInt, WriteError}; use quinn::{ApplicationClose, ClosedStream, ReadDatagram}; use h3::{ ext::Datagram, - quic::{self, Error, StreamId, WriteBuf}, + quic::{self, Error, IncomingStreamType, StreamId, WriteBuf}, }; use tokio_util::sync::ReusableBoxFuture; @@ -37,31 +37,67 @@ type BoxStreamSync<'a, T> = Pin + Sync + Send + 'a>>; /// A QUIC connection backed by Quinn /// /// Implements a [`quic::Connection`] backed by a [`quinn::Connection`]. -pub struct Connection { +pub struct Connection +where + B: Buf, +{ conn: quinn::Connection, - incoming_bi: BoxStreamSync<'static, as Future>::Output>, opening_bi: Option as Future>::Output>>, - incoming_uni: BoxStreamSync<'static, as Future>::Output>, opening_uni: Option as Future>::Output>>, datagrams: BoxStreamSync<'static, as Future>::Output>, -} - -impl Connection { + incoming: Select< + BoxStreamSync< + 'static, + Result, RecvStream, B>, quinn::ConnectionError>, + >, + BoxStreamSync< + 'static, + Result, RecvStream, B>, quinn::ConnectionError>, + >, + >, +} + +impl Connection +where + B: Buf, +{ /// Create a [`Connection`] from a [`quinn::Connection`] pub fn new(conn: quinn::Connection) -> Self { + let incoming_uni = Box::pin(stream::unfold(conn.clone(), |conn| async { + Some(( + conn.accept_uni().await.map(|recv_stream| { + IncomingStreamType::, RecvStream, B>::Unidirectional( + RecvStream::new(recv_stream), + ) + }), + conn, + )) + })); + let incoming_bi = Box::pin(stream::unfold(conn.clone(), |conn| async { + Some(( + conn.accept_bi().await.map(|bidi_stream| { + IncomingStreamType::, RecvStream, B>::Bidirectional( + BidiStream { + send: SendStream::new(bidi_stream.0), + recv: RecvStream::new(bidi_stream.1), + }, + std::marker::PhantomData, + ) + }), + conn, + )) + })); + Self { conn: conn.clone(), - incoming_bi: Box::pin(stream::unfold(conn.clone(), |conn| async { - Some((conn.accept_bi().await, conn)) - })), + opening_bi: None, - incoming_uni: Box::pin(stream::unfold(conn.clone(), |conn| async { - Some((conn.accept_uni().await, conn)) - })), + opening_uni: None, datagrams: Box::pin(stream::unfold(conn, |conn| async { Some((conn.read_datagram().await, conn)) })), + incoming: select(incoming_bi, incoming_uni), } } } @@ -153,7 +189,7 @@ impl From for SendDatagramError { } } -impl quic::Connection for Connection +impl quic::Connection for Connection where B: Buf, { @@ -161,33 +197,6 @@ where type OpenStreams = OpenStreams; type AcceptError = ConnectionError; - #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] - fn poll_accept_bidi( - &mut self, - cx: &mut task::Context<'_>, - ) -> Poll, Self::AcceptError>> { - let (send, recv) = match ready!(self.incoming_bi.poll_next_unpin(cx)) { - Some(x) => x?, - None => return Poll::Ready(Ok(None)), - }; - Poll::Ready(Ok(Some(Self::BidiStream { - send: Self::SendStream::new(send), - recv: Self::RecvStream::new(recv), - }))) - } - - #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] - fn poll_accept_recv( - &mut self, - cx: &mut task::Context<'_>, - ) -> Poll, Self::AcceptError>> { - let recv = match ready!(self.incoming_uni.poll_next_unpin(cx)) { - Some(x) => x?, - None => return Poll::Ready(Ok(None)), - }; - Poll::Ready(Ok(Some(Self::RecvStream::new(recv)))) - } - fn opener(&self) -> Self::OpenStreams { OpenStreams { conn: self.conn.clone(), @@ -195,9 +204,22 @@ where opening_uni: None, } } + #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] + fn poll_incoming( + &mut self, + cx: &mut task::Context<'_>, + ) -> Poll, Self::AcceptError>> + { + // put the two streams together + + match ready!(self.incoming.poll_next_unpin(cx)).unwrap() { + Ok(x) => Poll::Ready(Ok(x)), + Err(e) => Poll::Ready(Err(ConnectionError(e).into())), + } + } } -impl quic::OpenStreams for Connection +impl quic::OpenStreams for Connection where B: Buf, { @@ -248,7 +270,7 @@ where } } -impl quic::SendDatagramExt for Connection +impl quic::SendDatagramExt for Connection where B: Buf, { @@ -265,7 +287,10 @@ where } } -impl quic::RecvDatagramExt for Connection { +impl quic::RecvDatagramExt for Connection +where + B: Buf, +{ type Buf = Bytes; type Error = ConnectionError; diff --git a/h3-webtransport/src/server.rs b/h3-webtransport/src/server.rs index 05970b61..840d44d5 100644 --- a/h3-webtransport/src/server.rs +++ b/h3-webtransport/src/server.rs @@ -151,23 +151,48 @@ where Ok(()) } - /// Accept an incoming unidirectional stream from the client, it reads the stream until EOF. - pub fn accept_uni(&self) -> AcceptUni { - AcceptUni { - conn: &self.server_conn, + /// Accepts an incoming bidirectional stream or request + pub async fn accept_streams(&self) -> Result>, Error> { + { + let mut conn = self.server_conn.lock().unwrap(); + // if there are pending unidirectional streams, return them before the first await + if let Some((id, stream)) = conn.inner.accepted_streams_mut().wt_uni_streams.pop() { + return Ok(Some(AcceptStream::UnidirectionalStream::( + id, + RecvStream::new(stream), + ))); + } } - } - /// Accepts an incoming bidirectional stream or request - pub async fn accept_bi(&self) -> Result>, Error> { // Get the next stream // Accept the incoming stream let stream = poll_fn(|cx| { let mut conn = self.server_conn.lock().unwrap(); - conn.poll_accept_request(cx) + let accepted_stream = conn.poll_accept_request(cx); + if matches!(accepted_stream, Poll::Pending) { + // if there are accepted uni streams, return them + if let Some((id, stream)) = conn.inner.accepted_streams_mut().wt_uni_streams.pop() { + return Poll::Ready(PollAcceptRequestResult::UnidirectionalStream::( + id, stream, + )); + } + return Poll::Pending; + } + let bidi_stream = ready!(accepted_stream); + Poll::Ready(PollAcceptRequestResult::BidirectionalStream(bidi_stream)) }) .await; + let stream = match stream { + PollAcceptRequestResult::UnidirectionalStream(id, stream) => { + return Ok(Some(AcceptStream::UnidirectionalStream( + id, + RecvStream::new(stream), + ))); + } + PollAcceptRequestResult::BidirectionalStream(stream) => stream, + }; + let mut stream = match stream { Ok(Some(s)) => FrameStream::new(BufRecvStream::new(s)), Ok(None) => { @@ -204,7 +229,7 @@ where // Take the stream out of the framed reader and split it in half like Paul Allen let stream = stream.into_inner(); - Ok(Some(AcceptedBi::BidiStream( + Ok(Some(AcceptStream::BidiStream( session_id, BidiStream::new(stream), ))) @@ -217,7 +242,7 @@ where }; if let Some(req) = req { let (req, resp) = req.resolve().await?; - Ok(Some(AcceptedBi::Request(req, resp))) + Ok(Some(AcceptStream::Request(req, resp))) } else { Ok(None) } @@ -249,6 +274,15 @@ where } } +enum PollAcceptRequestResult +where + C: quic::Connection, + B: Buf, +{ + UnidirectionalStream(SessionId, BufRecvStream), + BidirectionalStream(Result, Error>), +} + /// Streams are opened, but the initial webtransport header has not been sent type PendingStreams = ( BidiStream<>::BidiStream, B>, @@ -348,12 +382,15 @@ where } } -/// An accepted incoming bidirectional stream. +/// An accepted incoming bidirectional or unidirectional stream. /// /// Since -pub enum AcceptedBi, B: Buf> { +pub enum AcceptStream, B: Buf> { /// An incoming bidirectional stream BidiStream(SessionId, BidiStream), + /// An incoming unidirectional stream + UnidirectionalStream(SessionId, RecvStream), + /// An incoming HTTP/3 request, passed through a webtransport session. /// /// This makes it possible to respond to multiple CONNECT requests @@ -392,36 +429,6 @@ where } } -/// Future for [`WebTransportSession::accept_uni`] -pub struct AcceptUni<'a, C, B> -where - C: quic::Connection, - B: Buf, -{ - conn: &'a Mutex>, -} - -impl<'a, C, B> Future for AcceptUni<'a, C, B> -where - C: quic::Connection, - B: Buf, -{ - type Output = Result)>, Error>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut conn = self.conn.lock().unwrap(); - conn.inner.poll_accept_recv(cx)?; - - // Get the currently available streams - let streams = conn.inner.accepted_streams_mut(); - if let Some((id, stream)) = streams.wt_uni_streams.pop() { - return Poll::Ready(Ok(Some((id, RecvStream::new(stream))))); - } - - Poll::Pending - } -} - fn validate_wt_connect(request: &Request<()>) -> bool { let protocol = request.extensions().get::(); matches!((request.method(), protocol), (&Method::CONNECT, Some(p)) if p == &Protocol::WEB_TRANSPORT) diff --git a/h3/Cargo.toml b/h3/Cargo.toml index 6531585b..24194cb9 100644 --- a/h3/Cargo.toml +++ b/h3/Cargo.toml @@ -40,6 +40,7 @@ quinn = { version = "0.11", default-features = false, features = [ "runtime-tokio", "rustls", "ring", + "futures-io", ] } quinn-proto = { version = "0.11", default-features = false } rcgen = "0.13" diff --git a/h3/src/client/connection.rs b/h3/src/client/connection.rs index 49e9ed2f..71c9cd63 100644 --- a/h3/src/client/connection.rs +++ b/h3/src/client/connection.rs @@ -364,6 +364,31 @@ where /// Maintain the connection state until it is closed #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { + let incoming_result = self.inner.poll_handle_incoming(cx); + + match incoming_result { + Poll::Ready(Err(e)) => { + let connection_error = self.inner.shared.set_error(e, "poll_close error"); + if connection_error.is_closed() { + return Poll::Ready(Ok(())); + } + return Poll::Ready(Err(connection_error)); + } + //= https://www.rfc-editor.org/rfc/rfc9114#section-6.1 + //# Clients MUST treat + //# receipt of a server-initiated bidirectional stream as a connection + //# error of type H3_STREAM_CREATION_ERROR unless such an extension has + //# been negotiated. + Poll::Ready(Ok(_)) => { + return Poll::Ready(Err(self.inner.close( + Code::H3_STREAM_CREATION_ERROR, + "client received a bidirectional stream", + ))) + } + // if pending, continue to poll the control stream + Poll::Pending => (), + } + while let Poll::Ready(result) = self.inner.poll_control(cx) { match result { //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.4.2 @@ -425,14 +450,7 @@ where ))) } Err(e) => { - let connection_error = self.inner.shared.read("poll_close").error.clone(); - let connection_error = match connection_error { - Some(e) => e, - None => { - self.inner.shared.write("poll_close error").error = Some(e.clone()); - e - } - }; + let connection_error = self.inner.shared.set_error(e, "poll_close error"); if connection_error.is_closed() { return Poll::Ready(Ok(())); } @@ -440,19 +458,6 @@ where } } } - - //= https://www.rfc-editor.org/rfc/rfc9114#section-6.1 - //# Clients MUST treat - //# receipt of a server-initiated bidirectional stream as a connection - //# error of type H3_STREAM_CREATION_ERROR unless such an extension has - //# been negotiated. - if self.inner.poll_accept_request(cx).is_ready() { - return Poll::Ready(Err(self.inner.close( - Code::H3_STREAM_CREATION_ERROR, - "client received a bidirectional stream", - ))); - } - Poll::Pending } } diff --git a/h3/src/connection.rs b/h3/src/connection.rs index a2fcf6ce..b98462ef 100644 --- a/h3/src/connection.rs +++ b/h3/src/connection.rs @@ -24,7 +24,7 @@ use crate::{ varint::VarInt, }, qpack, - quic::{self, SendStream}, + quic::{self, Connection, SendStream}, stream::{self, AcceptRecvStream, AcceptedRecvStream, BufRecvStream, UniStreamHeader}, webtransport::SessionId, }; @@ -52,6 +52,16 @@ impl SharedStateRef { pub fn write(&self, panic_msg: &'static str) -> RwLockWriteGuard { self.0.write().expect(panic_msg) } + + /// if a error is set, return it, + /// otherwise the passed error is stored and returned + pub fn set_error(&self, err: Error, panic_msg: &'static str) -> Error { + if let Some(err) = self.read(panic_msg).error.clone() { + return err; + } + self.write(panic_msg).error = Some(err.clone()); + err + } } impl Default for SharedStateRef { @@ -109,7 +119,7 @@ where /// TODO: breaking encapsulation just to see if we can get this to work, will fix before merging pub conn: C, control_send: C::SendStream, - control_recv: Option>, + pub(crate) control_recv: Option>, decoder_send: Option, decoder_recv: Option>, encoder_send: Option, @@ -323,47 +333,40 @@ where #[allow(missing_docs)] #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] - pub fn poll_accept_request( + pub fn poll_handle_incoming( &mut self, cx: &mut Context<'_>, - ) -> Poll, Error>> { - { - let state = self.shared.read("poll_accept_request"); - if let Some(ref e) = state.error { - return Poll::Ready(Err(e.clone())); - } - } + ) -> Poll> { + if let Some(ref e) = self.shared.read("poll_handle_incoming").error { + return Poll::Ready(Err(e.clone())); + }; + let incoming_bidirectional_stream = loop { + match ready!(self.conn.poll_incoming(cx))? { + quic::IncomingStreamType::Bidirectional(bi_di_stream, _) => break bi_di_stream, + quic::IncomingStreamType::Unidirectional(recv_steam) => { + self.poll_handle_receive_stream(cx, recv_steam)?; + } + }; + }; - // Accept the request by accepting the next bidirectional stream - // .into().into() converts the impl QuicError into crate::error::Error. - // The `?` operator doesn't work here for some reason. - self.conn.poll_accept_bidi(cx).map_err(|e| e.into().into()) + Poll::Ready(Ok(incoming_bidirectional_stream)) } /// Polls incoming streams /// /// Accepted streams which are not control, decoder, or encoder streams are buffer in `accepted_recv_streams` #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] - pub fn poll_accept_recv(&mut self, cx: &mut Context<'_>) -> Result<(), Error> { + pub fn poll_handle_receive_stream( + &mut self, + cx: &mut Context<'_>, + recv_stream: >::RecvStream, + ) -> Result<(), Error> { if let Some(ref e) = self.shared.read("poll_accept_request").error { return Err(e.clone()); } - // Get all currently pending streams - loop { - match self.conn.poll_accept_recv(cx)? { - Poll::Ready(Some(stream)) => self - .pending_recv_streams - .push(AcceptRecvStream::new(stream)), - Poll::Ready(None) => { - return Err(Code::H3_GENERAL_PROTOCOL_ERROR.with_reason( - "Connection closed unexpected", - crate::error::ErrorLevel::ConnectionError, - )) - } - Poll::Pending => break, - } - } + self.pending_recv_streams + .push(AcceptRecvStream::new(recv_stream)); let mut resolved = vec![]; @@ -440,7 +443,6 @@ where let recv = { // TODO - self.poll_accept_recv(cx)?; if let Some(v) = &mut self.control_recv { v } else { diff --git a/h3/src/quic.rs b/h3/src/quic.rs index 7d8086fe..d4d6790b 100644 --- a/h3/src/quic.rs +++ b/h3/src/quic.rs @@ -3,6 +3,7 @@ //! This module includes traits and types meant to allow being generic over any //! QUIC implementation. +use std::marker::PhantomData; use std::task::{self, Poll}; use bytes::Buf; @@ -30,6 +31,19 @@ impl<'a, E: Error + 'a> From for Box { } } +/// Trait representing a incoming QUIC stream. +pub enum IncomingStreamType +where + B: Buf, + BiDi: SendStream + RecvStream, + UniDi: RecvStream, +{ + /// A bidirectional stream + Bidirectional(BiDi, PhantomData), + /// A unidirectional stream + Unidirectional(UniDi), +} + /// Trait representing a QUIC connection. pub trait Connection: OpenStreams { /// The type produced by `poll_accept_recv()` @@ -39,21 +53,14 @@ pub trait Connection: OpenStreams { /// Error type yielded by these trait methods type AcceptError: Into>; - /// Accept an incoming unidirectional stream - /// - /// Returning `None` implies the connection is closing or closed. - fn poll_accept_recv( - &mut self, - cx: &mut task::Context<'_>, - ) -> Poll, Self::AcceptError>>; - - /// Accept an incoming bidirectional stream + /// Polls the Connection for Incoming Streams /// - /// Returning `None` implies the connection is closing or closed. - fn poll_accept_bidi( + /// Returning ['IncomingStreamType'] for the next incoming stream. + /// Returning `Err` implies the connection is closing or closed. + fn poll_incoming( &mut self, cx: &mut task::Context<'_>, - ) -> Poll, Self::AcceptError>>; + ) -> Poll, Self::AcceptError>>; /// Get an object to open outgoing streams. fn opener(&self) -> Self::OpenStreams; diff --git a/h3/src/server/connection.rs b/h3/src/server/connection.rs index c4e5687d..7f05e28d 100644 --- a/h3/src/server/connection.rs +++ b/h3/src/server/connection.rs @@ -309,20 +309,25 @@ where &mut self, cx: &mut Context<'_>, ) -> Poll, Error>> { - let _ = self.poll_control(cx)?; let _ = self.poll_requests_completion(cx); loop { - match self.inner.poll_accept_request(cx) { + let incoming = self.inner.poll_handle_incoming(cx); + + if self.inner.control_recv.is_some() { + let _ = self.poll_control(cx)?; + } + + match incoming { Poll::Ready(Err(x)) => break Poll::Ready(Err(x)), - Poll::Ready(Ok(None)) => { - if self.poll_requests_completion(cx).is_ready() { - break Poll::Ready(Ok(None)); - } else { - // Wait for all the requests to be finished, request_end_recv will wake - // us on each request completion. - break Poll::Pending; - } - } + // Poll::Ready(Ok(None)) => { + // if self.poll_requests_completion(cx).is_ready() { + // break Poll::Ready(Ok(None)); + // } else { + // // Wait for all the requests to be finished, request_end_recv will wake + // // us on each request completion. + // break Poll::Pending; + // } + // } Poll::Pending => { if self.recv_closing.is_some() && self.poll_requests_completion(cx).is_ready() { // The connection is now idle. @@ -331,7 +336,7 @@ where return Poll::Pending; } } - Poll::Ready(Ok(Some(mut s))) => { + Poll::Ready(Ok(mut s)) => { // When the connection is in a graceful shutdown procedure, reject all // incoming requests not belonging to the grace interval. It's possible that // some acceptable request streams arrive after rejected requests. diff --git a/h3/src/tests/connection.rs b/h3/src/tests/connection.rs index fbf86fc4..4b5e20ab 100644 --- a/h3/src/tests/connection.rs +++ b/h3/src/tests/connection.rs @@ -748,11 +748,7 @@ async fn graceful_shutdown_client() { let (mut driver, mut _send_request) = client::new(pair.client().await).await.unwrap(); driver.shutdown(0).await.unwrap(); assert_matches!( - future::poll_fn(|cx| { - println!("client drive"); - driver.poll_close(cx) - }) - .await, + future::poll_fn(|cx| { driver.poll_close(cx) }).await, Ok(()) ); }; diff --git a/h3/src/tests/mod.rs b/h3/src/tests/mod.rs index d34d234c..bf02ec02 100644 --- a/h3/src/tests/mod.rs +++ b/h3/src/tests/mod.rs @@ -18,9 +18,10 @@ use std::{ time::Duration, }; -use bytes::Bytes; +use bytes::{Buf, Bytes}; use quinn::crypto::rustls::{QuicClientConfig, QuicServerConfig}; use rustls::pki_types::{CertificateDer, PrivateKeyDer}; +use tracing::level_filters::LevelFilter; use crate::quic; use h3_quinn::{quinn::TransportConfig, Connection}; @@ -30,6 +31,7 @@ pub fn init_tracing() { .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) .with_span_events(tracing_subscriber::fmt::format::FmtSpan::FULL) .with_test_writer() + .with_max_level(LevelFilter::DEBUG) .try_init(); } @@ -124,7 +126,7 @@ impl Pair { .unwrap() } - pub async fn client(&self) -> h3_quinn::Connection { + pub async fn client(&self) -> h3_quinn::Connection { Connection::new(self.client_inner().await) } }