diff --git a/quinn-proto/src/connection/datagrams.rs b/quinn-proto/src/connection/datagrams.rs index c22e8d7155..cf8e4d3ff0 100644 --- a/quinn-proto/src/connection/datagrams.rs +++ b/quinn-proto/src/connection/datagrams.rs @@ -57,6 +57,12 @@ impl Datagrams<'_> { Ok(()) } + /// Marks a sender as blocked. + /// Triggers a `datagram_unblocked` notification once sending becomes possible again. + pub fn set_send_blocked(&mut self) { + self.conn.datagrams.send_blocked = true; + } + /// Compute the maximum size of datagrams that may passed to `send_datagram` /// /// Returns `None` if datagrams are unsupported by the peer or disabled locally. @@ -87,6 +93,11 @@ impl Datagrams<'_> { self.conn.datagrams.recv() } + /// Recv Bytes currently stored in the read buffer + pub fn recv_buffered(&self) -> usize { + self.conn.datagrams.recv_buffered + } + /// Bytes available in the outgoing datagram buffer /// /// When greater than zero, [`send`](Self::send)ing a datagram of at most this size is diff --git a/quinn/src/connection.rs b/quinn/src/connection.rs index ca2014b6d0..171c432617 100644 --- a/quinn/src/connection.rs +++ b/quinn/src/connection.rs @@ -345,6 +345,14 @@ impl Connection { } } + /// Resolves as soon as a readable datagram is buffered + pub fn datagram_readable(&self) -> DatagramReadable<'_> { + DatagramReadable { + conn: &self.0, + notify: self.0.shared.datagram_received.notified(), + } + } + /// Receive an application datagram pub fn read_datagram(&self) -> ReadDatagram<'_> { ReadDatagram { @@ -353,6 +361,19 @@ impl Connection { } } + /// Attempts to receive an application datagram + /// + /// If there are no readable datagrams, this will return [TryReceiveDatagramError::WouldBlock] + pub fn try_read_datagram(&self) -> Result, ConnectionError> { + let mut state = self.0.state.lock("try_read_datagram"); + + if let Some(ref e) = state.error { + return Err(e.clone()); + } + + Ok(state.inner.datagrams().recv()) + } + /// Wait for the connection to be closed for any reason /// /// Despite the return type's name, closed connections are often not an error condition at the @@ -422,6 +443,15 @@ impl Connection { conn.close(error_code, Bytes::copy_from_slice(reason), &self.0.shared); } + /// Creates a future, resolving as soon as a Datagram with the given size in byte can be sent + pub fn datagram_sendable(&self, size: usize) -> DatagramSendable<'_> { + DatagramSendable { + conn: &self.0, + required_space: size, + notify: self.0.shared.datagrams_unblocked.notified(), + } + } + /// Transmit `data` as an unreliable, unordered application datagram /// /// Application datagrams are a low-level primitive. They may be lost or delivered out of order, @@ -450,6 +480,33 @@ impl Connection { } } + /// Transmit `data` as an unreliable, unordered application datagram + /// + /// Application datagrams are a low-level primitive. They may be lost or delivered out of order, + /// and `data` must both fit inside a single QUIC packet and be smaller than the maximum + /// dictated by the peer. + /// + /// If the send buffer doesn't have enough available space, this will return [TrySendDatagramError::WouldBlock] + pub fn try_send_datagram(&self, data: Bytes) -> Result<(), TrySendDatagramError> { + let conn = &mut *self.0.state.lock("try_send_datagram"); + if let Some(ref x) = conn.error { + return Err(SendDatagramError::ConnectionLost(x.clone()).into()); + } + use proto::SendDatagramError::*; + match conn.inner.datagrams().send(data, false) { + Ok(()) => { + conn.wake(); + Ok(()) + } + Err(e) => Err(match e { + Blocked(bytes) => TrySendDatagramError::WouldBlock(bytes), + UnsupportedByPeer => SendDatagramError::UnsupportedByPeer.into(), + Disabled => SendDatagramError::Disabled.into(), + TooLarge => SendDatagramError::TooLarge.into(), + }), + } + } + /// Transmit `data` as an unreliable, unordered application datagram /// /// Unlike [`send_datagram()`], this method will wait for buffer space during congestion @@ -821,6 +878,46 @@ impl Future for ReadDatagram<'_> { } } +pin_project! { + /// Future produced by [`Connection::datagram_readable`] + pub struct DatagramReadable<'a> { + conn: &'a ConnectionRef, + #[pin] + notify: Notified<'a>, + } +} + +impl Future for DatagramReadable<'_> { + type Output = Result<(), ConnectionError>; + fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + let mut state = this.conn.state.lock("ReadDatagram::poll"); + + // Check for buffered datagrams before checking `state.error` so that already-received + // datagrams, which are necessarily finite, can be drained from a closed connection. + + if state.inner.datagrams().recv_buffered() > 0 { + return Poll::Ready(Ok(())); + } + + if let Some(ref e) = state.error { + return Poll::Ready(Err(e.clone())); + } + + loop { + // Poll next datagram received notification + match this.notify.as_mut().poll(ctx) { + // `state` lock ensures we didn't race with readiness + Poll::Pending => return Poll::Pending, + // Replace already used Notify from previous poll + Poll::Ready(()) => this + .notify + .set(this.conn.shared.datagram_received.notified()), + } + } + } +} + pin_project! { /// Future produced by [`Connection::send_datagram_wait`] pub struct SendDatagram<'a> { @@ -854,6 +951,7 @@ impl Future for SendDatagram<'_> { this.data.replace(data); loop { match this.notify.as_mut().poll(ctx) { + // `state` lock ensures we didn't race with readiness Poll::Pending => return Poll::Pending, // Spurious wakeup, get a new future Poll::Ready(()) => this @@ -870,6 +968,59 @@ impl Future for SendDatagram<'_> { } } +pin_project! { + /// Future produced by [`Connection::sendable_datagram`] + pub struct DatagramSendable<'a> { + conn: &'a ConnectionRef, + required_space: usize, + #[pin] + notify: Notified<'a>, + } +} + +impl Future for DatagramSendable<'_> { + type Output = Result<(), SendDatagramError>; + fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + let mut state = this.conn.state.lock("DatagramSendable::poll"); + if let Some(ref e) = state.error { + return Poll::Ready(Err(SendDatagramError::ConnectionLost(e.clone()))); + } + + // Check if peer support datagrams + let max = state + .inner + .datagrams() + .max_size() + .ok_or(SendDatagramError::UnsupportedByPeer)?; + + if *this.required_space > max { + return Poll::Ready(Err(SendDatagramError::TooLarge)); + } + + // Check if send can be satisfied + if state.inner.datagrams().send_buffer_space() > *this.required_space { + // We currently have enough space to satisfy the requirements + return Poll::Ready(Ok(())); + } + + // Otherwise - set send_blocked and wait for the unblocked notification + state.inner.datagrams().set_send_blocked(); + + loop { + // Poll next datagram unblocked + match this.notify.as_mut().poll(ctx) { + // `state` lock ensures we didn't race with readiness + Poll::Pending => return Poll::Pending, + // Replace already used Notify + Poll::Ready(()) => this + .notify + .set(this.conn.shared.datagrams_unblocked.notified()), + } + } + } +} + #[derive(Debug)] pub(crate) struct ConnectionRef(Arc); @@ -1313,6 +1464,37 @@ pub enum SendDatagramError { ConnectionLost(#[from] ConnectionError), } +/// Errors that can arise when trying to send a datagram without blocking +#[derive(Debug, Error, Clone, Eq, PartialEq)] +pub enum TrySendDatagramError { + /// Send Would Block - contains the unsent Bytes + #[error("send would block")] + WouldBlock(Bytes), + /// Actual Error sending the Datagram + #[error(transparent)] + SendDatagramError(#[from] SendDatagramError), +} + +/// Errors that can arise when trying to receive a datagram without blocking +#[derive(Debug, Error, Clone, PartialEq, Eq)] +pub enum TryReceiveDatagramError { + /// The operation would block + #[error("operation would block")] + WouldBlock, + /// A Connection error has occurred + #[error(transparent)] + ConnectionError(#[from] ConnectionError), +} + +impl From for io::Error { + fn from(err: TryReceiveDatagramError) -> Self { + match err { + TryReceiveDatagramError::ConnectionError(err) => err.into(), + TryReceiveDatagramError::WouldBlock => Self::new(io::ErrorKind::WouldBlock, err), + } + } +} + /// The maximum amount of datagrams which will be produced in a single `drive_transmit` call /// /// This limits the amount of CPU resources consumed by datagram generation, diff --git a/quinn/src/lib.rs b/quinn/src/lib.rs index 82becdb1de..59560660e2 100644 --- a/quinn/src/lib.rs +++ b/quinn/src/lib.rs @@ -76,7 +76,7 @@ pub use udp; pub use crate::connection::{ AcceptBi, AcceptUni, Connecting, Connection, OpenBi, OpenUni, ReadDatagram, SendDatagram, - SendDatagramError, ZeroRttAccepted, + SendDatagramError, TryReceiveDatagramError, TrySendDatagramError, ZeroRttAccepted, }; pub use crate::endpoint::{Accept, Endpoint, EndpointStats}; pub use crate::incoming::{Incoming, IncomingFuture, RetryError};