diff --git a/web-transport-quinn/src/recv.rs b/web-transport-quinn/src/recv.rs index 6734c20..b789f02 100644 --- a/web-transport-quinn/src/recv.rs +++ b/web-transport-quinn/src/recv.rs @@ -15,7 +15,7 @@ pub struct RecvStream { } impl RecvStream { - pub(crate) fn new(stream: quinn::RecvStream) -> Self { + pub fn new(stream: quinn::RecvStream) -> Self { Self { inner: stream } } diff --git a/web-transport-quinn/src/send.rs b/web-transport-quinn/src/send.rs index 836490f..5777b89 100644 --- a/web-transport-quinn/src/send.rs +++ b/web-transport-quinn/src/send.rs @@ -18,7 +18,7 @@ pub struct SendStream { } impl SendStream { - pub(crate) fn new(stream: quinn::SendStream) -> Self { + pub fn new(stream: quinn::SendStream) -> Self { Self { stream } } diff --git a/web-transport-quinn/src/session.rs b/web-transport-quinn/src/session.rs index d91d868..3e3290d 100644 --- a/web-transport-quinn/src/session.rs +++ b/web-transport-quinn/src/session.rs @@ -93,19 +93,35 @@ impl Session { } // Keep reading from the control stream until it's closed. + // NOTE: We must keep the send half alive to prevent sending FIN to the peer. async fn run_closed(&mut self, connect: Connect) -> (u32, String) { - let (_send, mut recv) = connect.into_inner(); + let (_send_keep_alive, mut recv) = connect.into_inner(); + + // Log when the capsule reader starts + log::debug!("WebTransport capsule reader started, waiting for capsules on CONNECT stream"); loop { match web_transport_proto::Capsule::read(&mut recv).await { Ok(web_transport_proto::Capsule::CloseWebTransportSession { code, reason }) => { + log::info!("WebTransport session closed gracefully: code={}, reason={}", code, reason); return (code, reason); } Ok(web_transport_proto::Capsule::Unknown { typ, payload }) => { log::warn!("unknown capsule: type={typ} size={}", payload.len()); } - Err(_) => { - return (1, "capsule error".to_string()); + Err(e) => { + // Distinguish between expected and unexpected stream closure. + // UnexpectedEnd typically means the peer closed the CONNECT stream, + // which is a valid way to end the WebTransport session (graceful close). + // Other errors (parse errors, connection errors) are true errors. + let error_str = format!("{e:?}"); + if error_str.contains("UnexpectedEnd") { + log::info!("WebTransport CONNECT stream closed by peer (graceful)"); + return (0, "session closed".to_string()); + } else { + log::error!("WebTransport CONNECT stream capsule read error: {e:?}"); + return (1, format!("capsule error: {e:?}")); + } } } } @@ -176,7 +192,12 @@ impl Session { // Otherwise the application could write data with lower priority than the header, resulting in queuing. // Also the header is very important for determining the session ID without reliable reset. send.set_priority(i32::MAX).ok(); + let hex_str: String = self.header_bi.iter().map(|b| format!("{:02x}", b)).collect::>().join(" "); + let stream_id_u64: u64 = send.id().into(); + println!("open_bi: Created QUIC stream ID: {} (index={}, {}), writing header ({} bytes): [{}]", + stream_id_u64, send.id().index(), send.id(), self.header_bi.len(), hex_str); Self::write_full(&mut send, &self.header_bi).await?; + println!("open_bi: QUIC stream ID {} header written successfully", stream_id_u64); // Reset the stream priority back to the default of 0. send.set_priority(0).ok(); @@ -452,6 +473,7 @@ impl SessionAccept { if let Poll::Ready(Some(res)) = self.accept_bi.poll_next_unpin(cx) { // Start decoding the header and add the future to the list of pending streams. let (send, recv) = res?; + println!("poll_accept_bi: Accepted stream ID: {}, queuing for decode", recv.id()); let pending = Self::decode_bi(send, recv, self.session_id); self.pending_bi.push(Box::pin(pending)); @@ -471,6 +493,7 @@ impl SessionAccept { if let Some((send, recv)) = res { // Wrap the streams in our own types for correct error codes. + println!("poll_accept_bi: Returning validated stream ID: {} to application", recv.id()); let send = SendStream::new(send); let recv = RecvStream::new(recv); return Poll::Ready(Ok((send, recv))); @@ -486,10 +509,13 @@ impl SessionAccept { mut recv: quinn::RecvStream, expected_session: VarInt, ) -> Result, SessionError> { + println!("decode_bi: START - stream ID: {}", recv.id()); let typ = VarInt::read(&mut recv) .await .map_err(|_| WebTransportError::UnknownSession)?; + println!("decode_bi: Stream {} - read type byte: 0x{:02x} (expected WEBTRANSPORT = 0x41)", recv.id(), typ.into_inner()); if Frame(typ) != Frame::WEBTRANSPORT { + println!("decode_bi: Stream {} - type mismatch! Got 0x{:02x}, ignoring stream", recv.id(), typ.into_inner()); log::debug!("ignoring unknown bidirectional stream: {typ:?}"); return Ok(None); } @@ -498,10 +524,13 @@ impl SessionAccept { let session_id = VarInt::read(&mut recv) .await .map_err(|_| WebTransportError::UnknownSession)?; + println!("decode_bi: Stream {} - read session_id: {}, expected: {}", recv.id(), session_id.into_inner(), expected_session.into_inner()); if session_id != expected_session { + println!("decode_bi: Stream {} - session ID mismatch!", recv.id()); return Err(WebTransportError::UnknownSession.into()); } + println!("decode_bi: Stream {} - SUCCESS, returning to application", recv.id()); Ok(Some((send, recv))) } }