Skip to content

Commit

Permalink
fix client shutdown; distinguish push and stream ids (#174, #175) (#176)
Browse files Browse the repository at this point in the history
  • Loading branch information
FlorianUekermann authored Mar 15, 2023
1 parent 6a2be4a commit da29aea
Show file tree
Hide file tree
Showing 12 changed files with 240 additions and 171 deletions.
47 changes: 27 additions & 20 deletions h3/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ use crate::{
connection::{self, ConnectionInner, ConnectionState, SharedStateRef},
error::{Code, Error, ErrorLevel},
frame::FrameStream,
proto::{frame::Frame, headers::Header, varint::VarInt},
qpack, quic, stream,
proto::{frame::Frame, headers::Header, push::PushId, varint::VarInt},
qpack,
quic::{self, StreamId},
stream,
};

/// Start building a new HTTP/3 client
Expand Down Expand Up @@ -147,7 +149,7 @@ where
(state.peer_max_field_section_size, state.closing)
};

if closing.is_some() {
if closing {
return Err(Error::closing());
}

Expand Down Expand Up @@ -348,16 +350,21 @@ where
B: Buf,
{
inner: ConnectionInner<C, B>,
// Has a GOAWAY frame been sent? If so, this PushId is the last we are willing to accept.
sent_closing: Option<PushId>,
// Has a GOAWAY frame been received? If so, this is StreamId the last the remote will accept.
recv_closing: Option<StreamId>,
}

impl<C, B> Connection<C, B>
where
C: quic::Connection<B>,
B: Buf,
{
/// Itiniate a graceful shutdown, accepting `max_request` potentially in-flight server push
pub async fn shutdown(&mut self, max_requests: usize) -> Result<(), Error> {
self.inner.shutdown(max_requests).await
/// Initiate a graceful shutdown, accepting `max_push` potentially in-flight server pushes
pub async fn shutdown(&mut self, _max_push: usize) -> Result<(), Error> {
// TODO: Calculate remaining pushes once server push is implemented.
self.inner.shutdown(&mut self.sent_closing, PushId(0)).await
}

/// Wait until the connection is closed
Expand Down Expand Up @@ -396,12 +403,14 @@ where
//# initiated bidirectional stream encoded as a variable-length integer.
//# A client MUST treat receipt of a GOAWAY frame containing a stream ID
//# of any other type as a connection error of type H3_ID_ERROR.
if !id.is_request() {
if !StreamId::from(id).is_request() {
return Poll::Ready(Err(Code::H3_ID_ERROR.with_reason(
format!("non-request StreamId in a GoAway frame: {}", id),
ErrorLevel::ConnectionError,
)));
}
self.inner.process_goaway(&mut self.recv_closing, id)?;

info!("Server initiated graceful shutdown, last: StreamId({})", id);
}

Expand All @@ -420,22 +429,18 @@ where
)))
}
Err(e) => {
let connection_error = self
.inner
.shared
.read("poll_close error read")
.error
.as_ref()
.cloned();

match connection_error {
Some(e) if e.is_closed() => return Poll::Ready(Ok(())),
Some(e) => return Poll::Ready(Err(e)),
let connection_error = self.inner.shared.read("poll_close").error.clone();
let connection_error = match connection_error {
Some(e) => e,
None => {
self.inner.shared.write("poll_close error").error = e.clone().into();
return Poll::Ready(Err(e));
self.inner.shared.write("poll_close error").error = Some(e.clone());
e
}
};
if connection_error.is_closed() {
return Poll::Ready(Ok(()));
}
return Poll::Ready(Err(connection_error));
}
}
}
Expand Down Expand Up @@ -523,6 +528,8 @@ impl Builder {
self.send_grease,
)
.await?,
sent_closing: None,
recv_closing: None,
},
SendRequest {
open,
Expand Down
120 changes: 63 additions & 57 deletions h3/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
proto::{
frame::{Frame, PayloadLen, SettingId, Settings},
headers::Header,
stream::{StreamId, StreamType},
stream::StreamType,
varint::VarInt,
},
qpack,
Expand All @@ -29,10 +29,8 @@ pub struct SharedState {
pub peer_max_field_section_size: u64,
// connection-wide error, concerns all RequestStreams and drivers
pub error: Option<Error>,
// Has the connection received a GoAway frame? If so, this StreamId is the last
// we're willing to accept. This lets us finish the requests or pushes that were
// already in flight when the graceful shutdown was initiated.
pub closing: Option<StreamId>,
// Has a GOAWAY frame been sent or received?
pub closing: bool,
}

#[derive(Clone)]
Expand All @@ -54,7 +52,7 @@ impl Default for SharedStateRef {
Self(Arc::new(RwLock::new(SharedState {
peer_max_field_section_size: VarInt::MAX.0,
error: None,
closing: None,
closing: false,
})))
}
}
Expand Down Expand Up @@ -83,9 +81,6 @@ where
decoder_recv: Option<AcceptedRecvStream<C::RecvStream, B>>,
encoder_recv: Option<AcceptedRecvStream<C::RecvStream, B>>,
pending_recv_streams: Vec<AcceptRecvStream<C::RecvStream>>,
// The id of the last stream received by this connection:
// request and push stream for server and clients respectively.
last_accepted_stream: Option<StreamId>,
got_peer_settings: bool,
pub(super) send_grease_frame: bool,
}
Expand Down Expand Up @@ -180,7 +175,6 @@ where
decoder_recv: None,
encoder_recv: None,
pending_recv_streams: Vec::with_capacity(3),
last_accepted_stream: None,
got_peer_settings: false,
send_grease_frame: grease,
};
Expand All @@ -198,22 +192,32 @@ where
Ok(conn_inner)
}

/// Initiate graceful shutdown, accepting `max_streams` potentially in-flight streams
pub async fn shutdown(&mut self, max_streams: usize) -> Result<(), Error> {
let max_id = self
.last_accepted_stream
.map(|id| id + max_streams)
.unwrap_or_else(StreamId::first_request);
/// Send GOAWAY with specified max_id, iff max_id is smaller than the previous one.
pub async fn shutdown<T>(
&mut self,
sent_closing: &mut Option<T>,
max_id: T,
) -> Result<(), Error>
where
T: From<VarInt> + PartialOrd<T> + Copy,
VarInt: From<T>,
{
if let Some(sent_id) = sent_closing {
if *sent_id <= max_id {
return Ok(());
}
}

self.shared.write("graceful shutdown").closing = Some(max_id);
*sent_closing = Some(max_id);
self.shared.write("shutdown").closing = true;

//= https://www.rfc-editor.org/rfc/rfc9114#section-3.3
//# When either endpoint chooses to close the HTTP/3
//# connection, the terminating endpoint SHOULD first send a GOAWAY frame
//# (Section 5.2) so that both endpoints can reliably determine whether
//# previously sent frames have been processed and gracefully complete or
//# terminate any necessary remaining tasks.
stream::write(&mut self.control_send, Frame::Goaway(max_id)).await
stream::write(&mut self.control_send, Frame::Goaway(max_id.into())).await
}

pub fn poll_accept_request(
Expand Down Expand Up @@ -370,43 +374,7 @@ where
.unwrap_or(VarInt::MAX.0);
Ok(Frame::Settings(settings))
}
Frame::Goaway(id) => {
let closing = self.shared.read("connection goaway read").closing;
match closing {
Some(closing_id) if closing_id.initiator() == id.initiator() => {
//= https://www.rfc-editor.org/rfc/rfc9114#section-5.2
//# An endpoint MAY send multiple GOAWAY frames indicating different
//# identifiers, but the identifier in each frame MUST NOT be greater
//# than the identifier in any previous frame, since clients might
//# already have retried unprocessed requests on another HTTP connection.

//= https://www.rfc-editor.org/rfc/rfc9114#section-5.2
//# Like the server,
//# the client MAY send subsequent GOAWAY frames so long as the specified
//# push ID is no greater than any previously sent value.
if id <= closing_id {
self.shared.write("connection goaway overwrite").closing =
Some(id);
Ok(Frame::Goaway(id))
} else {
//= https://www.rfc-editor.org/rfc/rfc9114#section-5.2
//# Receiving a GOAWAY containing a larger identifier than previously
//# received MUST be treated as a connection error of type H3_ID_ERROR.
Err(self.close(
Code::H3_ID_ERROR,
format!("received a GoAway({}) greater than the former one ({})", id, closing_id)
))
}
}
// When closing initiator is different, the current side has already started to close
// and should not be initiating any new requests / pushes anyway. So we can ignore it.
Some(_) => Ok(Frame::Goaway(id)),
None => {
self.shared.write("connection goaway write").closing = Some(id);
Ok(Frame::Goaway(id))
}
}
}
f @ Frame::Goaway(_) => Ok(f),
f @ Frame::CancelPush(_) | f @ Frame::MaxPushId(_) => {
if self.got_peer_settings {
//= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.3
Expand Down Expand Up @@ -460,8 +428,46 @@ where
Poll::Ready(res)
}

pub fn start_stream(&mut self, id: StreamId) {
self.last_accepted_stream = Some(id);
pub(crate) fn process_goaway<T>(
&mut self,
recv_closing: &mut Option<T>,
id: VarInt,
) -> Result<(), Error>
where
T: From<VarInt> + Copy,
VarInt: From<T>,
{
{
//= https://www.rfc-editor.org/rfc/rfc9114#section-5.2
//# An endpoint MAY send multiple GOAWAY frames indicating different
//# identifiers, but the identifier in each frame MUST NOT be greater
//# than the identifier in any previous frame, since clients might
//# already have retried unprocessed requests on another HTTP connection.

//= https://www.rfc-editor.org/rfc/rfc9114#section-5.2
//# Like the server,
//# the client MAY send subsequent GOAWAY frames so long as the specified
//# push ID is no greater than any previously sent value.
if let Some(prev_id) = recv_closing.map(VarInt::from) {
if prev_id < id {
//= https://www.rfc-editor.org/rfc/rfc9114#section-5.2
//# Receiving a GOAWAY containing a larger identifier than previously
//# received MUST be treated as a connection error of type H3_ID_ERROR.
return Err(self.close(
Code::H3_ID_ERROR,
format!(
"received a GoAway({}) greater than the former one ({})",
id, prev_id
),
));
}
}
*recv_closing = Some(id.into());
if !self.shared.read("connection goaway read").closing {
self.shared.write("connection goaway overwrite").closing = true;
}
Ok(())
}
}

/// Closes a Connection with code and reason.
Expand Down
3 changes: 2 additions & 1 deletion h3/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,8 @@ impl From<frame::FrameStreamError> for Error {
.with_reason("received incomplete frame", ErrorLevel::ConnectionError),

frame::FrameStreamError::Proto(e) => match e {
proto::frame::FrameError::InvalidStreamId(_) => Code::H3_ID_ERROR,
proto::frame::FrameError::InvalidStreamId(_)
| proto::frame::FrameError::InvalidPushId(_) => Code::H3_ID_ERROR,
proto::frame::FrameError::Settings(_) => Code::H3_SETTINGS_ERROR,
proto::frame::FrameError::UnsupportedFrame(_)
| proto::frame::FrameError::UnknownFrame(_) => Code::H3_FRAME_UNEXPECTED,
Expand Down
Loading

0 comments on commit da29aea

Please sign in to comment.