Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ members = [
resolver = "2"

[workspace.dependencies]
web-transport-proto = { path = "web-transport-proto", version = "0.2.8" }
web-transport-trait = { path = "web-transport-trait", version = "0.2" }
web-transport-proto = { path = "web-transport-proto", version = "0.3" }
web-transport-trait = { path = "web-transport-trait", version = "0.3" }
2 changes: 1 addition & 1 deletion web-transport-proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ authors = ["Luke Curley"]
repository = "https://github.com/kixelated/web-transport"
license = "MIT OR Apache-2.0"

version = "0.2.8"
version = "0.3.0"
edition = "2021"

keywords = ["quic", "http3", "webtransport"]
Expand Down
2 changes: 1 addition & 1 deletion web-transport-quiche/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ tokio = { version = "1", default-features = false, features = [
"time",
] }

tokio-quiche = "0.10"
tokio-quiche = "0.12"
tracing = "0.1"
url = "2"
web-transport-proto = { workspace = true }
Expand Down
6 changes: 3 additions & 3 deletions web-transport-quiche/src/ez/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ impl RecvStream {
/// Tell the other end to stop sending data with the given error code.
///
/// This sends a STOP_SENDING frame to the remote.
pub fn close(&mut self, code: u64) {
pub fn stop(&mut self, code: u64) {
self.state.lock().stop = Some(code);

let waker = self.driver.lock().recv(self.id);
Expand All @@ -313,7 +313,7 @@ impl RecvStream {
/// Returns true if the stream is closed by either side.
///
/// This includes:
/// - We sent a STOP_SENDING via [RecvStream::close]
/// - We sent a STOP_SENDING via [RecvStream::stop]
/// - We received a RESET_STREAM from the remote
/// - We received a FIN from the remote
pub fn is_closed(&self) -> bool {
Expand All @@ -335,7 +335,7 @@ impl RecvStream {
/// Wait until the stream is closed by either side.
///
/// This includes:
/// - We sent a STOP_SENDING via [RecvStream::close]
/// - We sent a STOP_SENDING via [RecvStream::stop]
/// - We received a RESET_STREAM from the remote
/// - We received a FIN from the remote
///
Expand Down
17 changes: 9 additions & 8 deletions web-transport-quiche/src/ez/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
pin::Pin,
task::{ready, Context, Poll, Waker},
};
use tokio_quiche::quiche;
use tokio_quiche::quiche::{self};

use bytes::{Buf, Bytes};
use tokio::io::AsyncWrite;
Expand Down Expand Up @@ -281,8 +281,9 @@ impl SendStream {

/// Mark the stream as finished, such that no more data can be written.
///
/// **WARN**: If this is not called explicitly, [SendStream::close] will be called on [Drop].
/// **NOTE**: [SendStream::closed] will block until the FIN has been sent.
/// [SendStream::closed] will block until the FIN has been sent.
///
/// **WARN**: If this is not called explicitly, [SendStream::reset] will be called on [Drop].
pub fn finish(&mut self) -> Result<(), StreamError> {
{
let mut state = self.state.lock();
Expand Down Expand Up @@ -313,7 +314,7 @@ impl SendStream {
/// Abruptly reset the stream with the provided error code.
///
/// This sends a RESET_STREAM frame to the remote.
pub fn close(&mut self, code: u64) {
pub fn reset(&mut self, code: u64) {
self.state.lock().reset = Some(code);

let waker = self.driver.lock().send(self.id);
Expand All @@ -325,8 +326,8 @@ impl SendStream {
/// Returns true if the stream is closed by either side.
///
/// This includes:
/// - We sent a RESET_STREAM via [SendStream::close]
/// - We received a STOP_SENDING via [RecvStream::close]
/// - We sent a RESET_STREAM via [SendStream::reset]
/// - We received a STOP_SENDING via [super::RecvStream::stop]
/// - We sent a FIN via [SendStream::finish]
pub fn is_closed(&self) -> bool {
self.state.lock().is_closed()
Expand All @@ -347,8 +348,8 @@ impl SendStream {
/// Wait until the stream is closed by either side.
///
/// This includes:
/// - We sent a RESET_STREAM via [SendStream::close]
/// - We received a STOP_SENDING via [RecvStream::close]
/// - We sent a RESET_STREAM via [SendStream::reset]
/// - We received a STOP_SENDING via [super::RecvStream::stop]
/// - We sent a FIN via [SendStream::finish]
///
/// Note: This takes `&mut` to match quiche and to simplify the implementation.
Expand Down
12 changes: 6 additions & 6 deletions web-transport-quiche/src/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ impl RecvStream {
/// Tell the other end to stop sending data with the given error code.
///
/// This is a u32 with WebTransport since it shares the error space with HTTP/3.
pub fn close(&mut self, code: u32) {
self.inner.close(web_transport_proto::error_to_http3(code));
pub fn stop(&mut self, code: u32) {
self.inner.stop(web_transport_proto::error_to_http3(code));
}

/// Block until the stream has been reset and return the error code.
Expand All @@ -65,8 +65,8 @@ impl RecvStream {
impl Drop for RecvStream {
fn drop(&mut self) {
if !self.inner.is_closed() {
tracing::warn!("stream dropped without `close` or reading all contents");
self.inner.close(DROP_CODE)
tracing::warn!("stream dropped without `stop` or reading all contents");
self.inner.stop(DROP_CODE)
}
}
}
Expand Down Expand Up @@ -94,8 +94,8 @@ impl web_transport_trait::RecvStream for RecvStream {
self.read_chunk(max).await
}

fn close(&mut self, code: u32) {
self.close(code);
fn stop(&mut self, code: u32) {
self.stop(code);
}

async fn closed(&mut self) -> Result<(), Self::Error> {
Expand Down
17 changes: 7 additions & 10 deletions web-transport-quiche/src/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ impl SendStream {
}

/// Mark the stream as finished, such that no more data can be written.
///
/// **WARNING**: This is implicitly called on Drop, but it's a common footgun.
/// If you cancel futures by dropping them you'll get incomplete writes.
pub fn finish(&mut self) -> Result<(), StreamError> {
self.inner.finish().map_err(Into::into)
}
Expand All @@ -65,9 +62,9 @@ impl SendStream {
/// Abruptly reset the stream with the provided error code.
///
/// This is a u32 with WebTransport because it shares the error space with HTTP/3.
pub fn close(&mut self, code: u32) {
pub fn reset(&mut self, code: u32) {
let code = web_transport_proto::error_to_http3(code);
self.inner.close(code)
self.inner.reset(code)
}

/// Wait until the stream has been stopped and return the error code.
Expand All @@ -78,10 +75,10 @@ impl SendStream {

impl Drop for SendStream {
fn drop(&mut self) {
// Reset the stream if we dropped without calling `close` or `finish`
// Reset the stream if we dropped without calling `close` or `reset`
if !self.inner.is_finished().unwrap_or(true) {
tracing::warn!("stream dropped without `close` or `finish`");
self.inner.close(DROP_CODE)
tracing::warn!("stream dropped without `close` or `reset`");
self.inner.reset(DROP_CODE)
}
}
}
Expand Down Expand Up @@ -121,8 +118,8 @@ impl web_transport_trait::SendStream for SendStream {
self.set_priority(order)
}

fn close(&mut self, code: u32) {
self.close(code)
fn reset(&mut self, code: u32) {
self.reset(code)
}

fn finish(&mut self) -> Result<(), Self::Error> {
Expand Down
18 changes: 5 additions & 13 deletions web-transport-quinn/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub enum SessionError {
ConnectionError(quinn::ConnectionError),

#[error("webtransport error: {0}")]
WebTransport(#[from] WebTransportError),
WebTransportError(#[from] WebTransportError),

#[error("send datagram error: {0}")]
SendDatagramError(#[from] quinn::SendDatagramError),
Expand All @@ -54,15 +54,14 @@ impl From<quinn::ConnectionError> for SessionError {
match &e {
quinn::ConnectionError::ApplicationClosed(close) => {
match web_transport_proto::error_from_http3(close.error_code.into_inner()) {
Some(code) => WebTransportError::ApplicationClosed(
Some(code) => WebTransportError::Closed(
code,
String::from_utf8_lossy(&close.reason).into_owned(),
)
.into(),
None => SessionError::ConnectionError(e),
}
}
quinn::ConnectionError::LocallyClosed => WebTransportError::LocallyClosed.into(),
_ => SessionError::ConnectionError(e),
}
}
Expand All @@ -71,18 +70,12 @@ impl From<quinn::ConnectionError> for SessionError {
/// An error that can occur when reading/writing the WebTransport stream header.
#[derive(Clone, Error, Debug)]
pub enum WebTransportError {
#[error("application closed: code={0} reason={1}")]
ApplicationClosed(u32, String),

#[error("locally closed")]
LocallyClosed,
#[error("closed: code={0} reason={1}")]
Closed(u32, String),

#[error("unknown session")]
UnknownSession,

#[error("unknown stream")]
UnknownStream,

#[error("read error: {0}")]
ReadError(#[from] quinn::ReadExactError),

Expand Down Expand Up @@ -264,8 +257,7 @@ pub enum ServerError {

impl web_transport_trait::Error for SessionError {
fn session_error(&self) -> Option<(u32, String)> {
if let SessionError::WebTransport(WebTransportError::ApplicationClosed(code, reason)) = self
{
if let SessionError::WebTransportError(WebTransportError::Closed(code, reason)) = self {
return Some((*code, reason.to_string()));
}

Expand Down
2 changes: 1 addition & 1 deletion web-transport-quinn/src/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl tokio::io::AsyncRead for RecvStream {
impl web_transport_trait::RecvStream for RecvStream {
type Error = ReadError;

fn close(&mut self, code: u32) {
fn stop(&mut self, code: u32) {
Self::stop(self, code).ok();
}

Expand Down
2 changes: 1 addition & 1 deletion web-transport-quinn/src/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl web_transport_trait::SendStream for SendStream {
Self::set_priority(self, order.into()).ok();
}

fn close(&mut self, code: u32) {
fn reset(&mut self, code: u32) {
Self::reset(self, code).ok();
}

Expand Down
4 changes: 2 additions & 2 deletions web-transport-quinn/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ impl SessionAccept {
// Read the VarInt at the start of the stream.
let typ = VarInt::read(&mut recv)
.await
.map_err(|_| WebTransportError::UnknownStream)?;
.map_err(|_| WebTransportError::UnknownSession)?;
let typ = StreamUni(typ);

if typ == StreamUni::WEBTRANSPORT {
Expand Down Expand Up @@ -488,7 +488,7 @@ impl SessionAccept {
) -> Result<Option<(quinn::SendStream, quinn::RecvStream)>, SessionError> {
let typ = VarInt::read(&mut recv)
.await
.map_err(|_| WebTransportError::UnknownStream)?;
.map_err(|_| WebTransportError::UnknownSession)?;
if Frame(typ) != Frame::WEBTRANSPORT {
log::debug!("ignoring unknown bidirectional stream: {typ:?}");
return Ok(None);
Expand Down
2 changes: 1 addition & 1 deletion web-transport-trait/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ authors = ["Luke Curley"]
repository = "https://github.com/kixelated/web-transport"
license = "MIT OR Apache-2.0"

version = "0.2.0"
version = "0.3.0"
edition = "2021"

keywords = ["quic", "http3", "webtransport"]
Expand Down
16 changes: 8 additions & 8 deletions web-transport-trait/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,24 +141,24 @@ pub trait SendStream: MaybeSend {

/// Mark the stream as finished, erroring on any future writes.
///
/// [SendStream::close] can still be called to abandon any queued data.
/// [SendStream::reset] can still be called to abandon any queued data.
/// [SendStream::closed] should return when the FIN is acknowledged by the peer.
///
/// NOTE: Quinn implicitly calls this on Drop, but it's a common footgun.
/// Implementations SHOULD [SendStream::close] on Drop instead.
/// Implementations SHOULD [SendStream::reset] on Drop instead.
fn finish(&mut self) -> Result<(), Self::Error>;

/// Immediately closes the stream and discards any remaining data.
///
/// This translates into a RESET_STREAM QUIC code.
/// The peer may not receive the reset code if the stream is already closed.
fn close(&mut self, code: u32);
fn reset(&mut self, code: u32);

/// Block until the stream is closed by either side.
///
/// This includes:
/// - We sent a RESET_STREAM via [SendStream::close]
/// - We received a STOP_SENDING via [RecvStream::close]
/// - We sent a RESET_STREAM via [SendStream::reset]
/// - We received a STOP_SENDING via [RecvStream::stop]
/// - A FIN is acknowledged by the peer via [SendStream::finish]
///
/// Some implementations do not support FIN acknowledgement, in which case this will block until the FIN is sent.
Expand Down Expand Up @@ -225,13 +225,13 @@ pub trait RecvStream: MaybeSend {
///
/// An implementation MUST do this on Drop otherwise flow control will be leaked.
/// Call this method manually if you want to specify a code yourself.
fn close(&mut self, code: u32);
fn stop(&mut self, code: u32);

/// Block until the stream has been closed by either side.
///
/// This includes:
/// - We received a RESET_STREAM via [SendStream::close]
/// - We sent a STOP_SENDING via [RecvStream::close]
/// - We received a RESET_STREAM via [SendStream::reset]
/// - We sent a STOP_SENDING via [RecvStream::stop]
/// - We received a FIN via [SendStream::finish] and read all data.
fn closed(&mut self) -> impl Future<Output = Result<(), Self::Error>> + MaybeSend;

Expand Down
8 changes: 4 additions & 4 deletions web-transport-ws/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ impl SendStream {
impl Drop for SendStream {
fn drop(&mut self) {
if !self.fin && self.closed.is_none() {
generic::SendStream::close(self, 0);
generic::SendStream::reset(self, 0);
}
}
}
Expand Down Expand Up @@ -570,7 +570,7 @@ impl generic::SendStream for SendStream {
// Priority not implemented in this version
}

fn close(&mut self, code: u32) {
fn reset(&mut self, code: u32) {
if self.fin || self.closed.is_some() {
return;
}
Expand Down Expand Up @@ -653,7 +653,7 @@ impl RecvStream {
impl Drop for RecvStream {
fn drop(&mut self) {
if !self.fin && self.closed.is_none() {
generic::RecvStream::close(self, 0);
generic::RecvStream::stop(self, 0);
}
}
}
Expand Down Expand Up @@ -714,7 +714,7 @@ impl generic::RecvStream for RecvStream {
self.read_buf(&mut buf).await
}

fn close(&mut self, code: u32) {
fn stop(&mut self, code: u32) {
let code = VarInt::from(code);
let frame = StopSending { id: self.id, code };

Expand Down
2 changes: 1 addition & 1 deletion web-transport/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ authors = ["Luke Curley"]
repository = "https://github.com/kixelated/web-transport"
license = "MIT OR Apache-2.0"

version = "0.9.7"
version = "0.10.0"
edition = "2021"

keywords = ["quic", "http3", "webtransport"]
Expand Down