Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion web-transport-quinn/src/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub struct RecvStream {
}

impl RecvStream {
pub(crate) fn new(stream: quinn::RecvStream) -> Self {
pub fn new(stream: quinn::RecvStream) -> Self {
Self { inner: stream }
}

Expand Down
2 changes: 1 addition & 1 deletion web-transport-quinn/src/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub struct SendStream {
}

impl SendStream {
pub(crate) fn new(stream: quinn::SendStream) -> Self {
pub fn new(stream: quinn::SendStream) -> Self {
Self { stream }
}

Expand Down
35 changes: 32 additions & 3 deletions web-transport-quinn/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,35 @@ impl Session {
}

// Keep reading from the control stream until it's closed.
// NOTE: We must keep the send half alive to prevent sending FIN to the peer.
async fn run_closed(&mut self, connect: Connect) -> (u32, String) {
let (_send, mut recv) = connect.into_inner();
let (_send_keep_alive, mut recv) = connect.into_inner();

// Log when the capsule reader starts
log::debug!("WebTransport capsule reader started, waiting for capsules on CONNECT stream");

loop {
match web_transport_proto::Capsule::read(&mut recv).await {
Ok(web_transport_proto::Capsule::CloseWebTransportSession { code, reason }) => {
log::info!("WebTransport session closed gracefully: code={}, reason={}", code, reason);
return (code, reason);
}
Ok(web_transport_proto::Capsule::Unknown { typ, payload }) => {
log::warn!("unknown capsule: type={typ} size={}", payload.len());
}
Err(_) => {
return (1, "capsule error".to_string());
Err(e) => {
// Distinguish between expected and unexpected stream closure.
// UnexpectedEnd typically means the peer closed the CONNECT stream,
// which is a valid way to end the WebTransport session (graceful close).
// Other errors (parse errors, connection errors) are true errors.
let error_str = format!("{e:?}");
if error_str.contains("UnexpectedEnd") {
log::info!("WebTransport CONNECT stream closed by peer (graceful)");
return (0, "session closed".to_string());
} else {
log::error!("WebTransport CONNECT stream capsule read error: {e:?}");
return (1, format!("capsule error: {e:?}"));
}
}
}
}
Expand Down Expand Up @@ -176,7 +192,12 @@ impl Session {
// Otherwise the application could write data with lower priority than the header, resulting in queuing.
// Also the header is very important for determining the session ID without reliable reset.
send.set_priority(i32::MAX).ok();
let hex_str: String = self.header_bi.iter().map(|b| format!("{:02x}", b)).collect::<Vec<_>>().join(" ");
let stream_id_u64: u64 = send.id().into();
println!("open_bi: Created QUIC stream ID: {} (index={}, {}), writing header ({} bytes): [{}]",
stream_id_u64, send.id().index(), send.id(), self.header_bi.len(), hex_str);
Self::write_full(&mut send, &self.header_bi).await?;
println!("open_bi: QUIC stream ID {} header written successfully", stream_id_u64);

// Reset the stream priority back to the default of 0.
send.set_priority(0).ok();
Expand Down Expand Up @@ -452,6 +473,7 @@ impl SessionAccept {
if let Poll::Ready(Some(res)) = self.accept_bi.poll_next_unpin(cx) {
// Start decoding the header and add the future to the list of pending streams.
let (send, recv) = res?;
println!("poll_accept_bi: Accepted stream ID: {}, queuing for decode", recv.id());
let pending = Self::decode_bi(send, recv, self.session_id);
self.pending_bi.push(Box::pin(pending));

Expand All @@ -471,6 +493,7 @@ impl SessionAccept {

if let Some((send, recv)) = res {
// Wrap the streams in our own types for correct error codes.
println!("poll_accept_bi: Returning validated stream ID: {} to application", recv.id());
let send = SendStream::new(send);
let recv = RecvStream::new(recv);
return Poll::Ready(Ok((send, recv)));
Expand All @@ -486,10 +509,13 @@ impl SessionAccept {
mut recv: quinn::RecvStream,
expected_session: VarInt,
) -> Result<Option<(quinn::SendStream, quinn::RecvStream)>, SessionError> {
println!("decode_bi: START - stream ID: {}", recv.id());
let typ = VarInt::read(&mut recv)
.await
.map_err(|_| WebTransportError::UnknownSession)?;
println!("decode_bi: Stream {} - read type byte: 0x{:02x} (expected WEBTRANSPORT = 0x41)", recv.id(), typ.into_inner());
if Frame(typ) != Frame::WEBTRANSPORT {
println!("decode_bi: Stream {} - type mismatch! Got 0x{:02x}, ignoring stream", recv.id(), typ.into_inner());
log::debug!("ignoring unknown bidirectional stream: {typ:?}");
return Ok(None);
}
Expand All @@ -498,10 +524,13 @@ impl SessionAccept {
let session_id = VarInt::read(&mut recv)
.await
.map_err(|_| WebTransportError::UnknownSession)?;
println!("decode_bi: Stream {} - read session_id: {}, expected: {}", recv.id(), session_id.into_inner(), expected_session.into_inner());
if session_id != expected_session {
println!("decode_bi: Stream {} - session ID mismatch!", recv.id());
return Err(WebTransportError::UnknownSession.into());
}

println!("decode_bi: Stream {} - SUCCESS, returning to application", recv.id());
Ok(Some((send, recv)))
}
}
Expand Down