Skip to content

Commit

Permalink
fix: handle implicit resets at the right time (#833)
Browse files Browse the repository at this point in the history
A stream whose ref count reaches zero while open should
not immediately decrease the number of active streams,
otherwise MAX_CONCURRENT_STREAMS isn't respected anymore.
  • Loading branch information
nox authored Jan 24, 2025
1 parent e348cf3 commit 02eb53b
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 2 deletions.
2 changes: 1 addition & 1 deletion src/proto/streams/counts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ impl Counts {
}
}

if stream.is_counted {
if !stream.state.is_scheduled_reset() && stream.is_counted {
tracing::trace!("dec_num_streams; stream={:?}", stream.id);
// Decrement the number of active streams.
self.dec_num_streams(&mut stream);
Expand Down
5 changes: 4 additions & 1 deletion src/proto/streams/prioritize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -685,8 +685,11 @@ impl Prioritize {
}

pub fn clear_pending_send(&mut self, store: &mut Store, counts: &mut Counts) {
while let Some(stream) = self.pending_send.pop(store) {
while let Some(mut stream) = self.pending_send.pop(store) {
let is_pending_reset = stream.is_pending_reset_expiration();
if let Some(reason) = stream.state.get_scheduled_reset() {
stream.set_reset(reason, Initiator::Library);
}
counts.transition_after(stream, is_pending_reset);
}
}
Expand Down
137 changes: 137 additions & 0 deletions tests/h2-tests/tests/stream_states.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1218,3 +1218,140 @@ async fn reset_new_stream_before_send() {

join(srv, client).await;
}

#[tokio::test]
async fn explicit_reset_with_max_concurrent_stream() {
h2_support::trace_init!();

let (io, mut srv) = mock::new();

let mock = async move {
let settings = srv
.assert_client_handshake_with_settings(frames::settings().max_concurrent_streams(1))
.await;
assert_default_settings!(settings);

srv.recv_frame(frames::headers(1).request("POST", "https://www.example.com/"))
.await;
srv.send_frame(frames::headers(1).response(200)).await;

srv.recv_frame(frames::reset(1).cancel()).await;

srv.recv_frame(
frames::headers(3)
.request("POST", "https://www.example.com/")
.eos(),
)
.await;
srv.send_frame(frames::headers(3).response(200)).await;
};

let h2 = async move {
let (mut client, mut h2) = client::handshake(io).await.unwrap();

{
let request = Request::builder()
.method(Method::POST)
.uri("https://www.example.com/")
.body(())
.unwrap();

let (resp, mut stream) = client.send_request(request, false).unwrap();

{
let resp = h2.drive(resp).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
}

stream.send_reset(Reason::CANCEL);
};

{
let request = Request::builder()
.method(Method::POST)
.uri("https://www.example.com/")
.body(())
.unwrap();

let (resp, _) = client.send_request(request, true).unwrap();

{
let resp = h2.drive(resp).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
}
};

h2.await.unwrap();
};

join(mock, h2).await;
}

#[tokio::test]
async fn implicit_cancel_with_max_concurrent_stream() {
h2_support::trace_init!();

let (io, mut srv) = mock::new();

let mock = async move {
let settings = srv
.assert_client_handshake_with_settings(frames::settings().max_concurrent_streams(1))
.await;
assert_default_settings!(settings);

srv.recv_frame(frames::headers(1).request("POST", "https://www.example.com/"))
.await;
srv.send_frame(frames::headers(1).response(200)).await;

srv.recv_frame(frames::reset(1).cancel()).await;

srv.recv_frame(
frames::headers(3)
.request("POST", "https://www.example.com/")
.eos(),
)
.await;
srv.send_frame(frames::headers(3).response(200)).await;
};

let h2 = async move {
let (mut client, mut h2) = client::handshake(io).await.unwrap();

{
let request = Request::builder()
.method(Method::POST)
.uri("https://www.example.com/")
.body(())
.unwrap();

let (resp, stream) = client.send_request(request, false).unwrap();

{
let resp = h2.drive(resp).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
}

// This implicitly resets the stream with CANCEL.
drop(stream);
};

{
let request = Request::builder()
.method(Method::POST)
.uri("https://www.example.com/")
.body(())
.unwrap();

let (resp, _) = client.send_request(request, true).unwrap();

{
let resp = h2.drive(resp).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
}
};

h2.await.unwrap();
};

join(mock, h2).await;
}

0 comments on commit 02eb53b

Please sign in to comment.