Skip to content

Commit

Permalink
add wait for remote settings being set before sending requests
Browse files Browse the repository at this point in the history
  • Loading branch information
Yneth committed Oct 30, 2022
1 parent e4cf88c commit 81aac76
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 0 deletions.
5 changes: 5 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,11 @@ where
pub fn is_extended_connect_protocol_enabled(&self) -> bool {
self.inner.is_extended_connect_protocol_enabled()
}

/// Returns negotiated max send streams
pub fn max_send_streams(&self) -> usize {
self.inner.max_send_streams()
}
}

impl<B> fmt::Debug for SendRequest<B>
Expand Down
26 changes: 26 additions & 0 deletions src/proto/streams/counts.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::*;

use std::task::{Context, Waker};
use std::usize;

#[derive(Debug)]
Expand All @@ -25,6 +26,11 @@ pub(super) struct Counts {

/// Current number of pending locally reset streams
num_reset_streams: usize,

/// If remote settings were applied
remote_settings_applied: bool,

remote_settings_applied_task: Option<Waker>,
}

impl Counts {
Expand All @@ -38,6 +44,8 @@ impl Counts {
num_recv_streams: 0,
max_reset_streams: config.local_reset_max,
num_reset_streams: 0,
remote_settings_applied: false,
remote_settings_applied_task: None,
}
}

Expand Down Expand Up @@ -108,6 +116,8 @@ impl Counts {
if let Some(val) = settings.max_concurrent_streams() {
self.max_send_streams = val as usize;
}
self.remote_settings_applied = true;
self.notify_remote_settings_applied()
}

/// Run a block of code that could potentially transition a stream's state.
Expand Down Expand Up @@ -173,6 +183,16 @@ impl Counts {
self.max_send_streams
}

/// Returns if remote settings were applied
pub(crate) fn remote_settings_applied(&self) -> bool {
self.remote_settings_applied
}

/// Sets waker task for remote settings being set
pub(crate) fn wait_remote_settings_applied(&mut self, cx: &Context) {
self.remote_settings_applied_task = Some(cx.waker().clone());
}

/// Returns the maximum number of streams that can be initiated by the
/// remote peer.
pub(crate) fn max_recv_streams(&self) -> usize {
Expand All @@ -197,6 +217,12 @@ impl Counts {
assert!(self.num_reset_streams > 0);
self.num_reset_streams -= 1;
}

fn notify_remote_settings_applied(&mut self) {
if let Some(task) = self.remote_settings_applied_task.take() {
task.wake();
}
}
}

impl Drop for Counts {
Expand Down
4 changes: 4 additions & 0 deletions src/proto/streams/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,10 @@ where

me.actions.ensure_no_conn_error()?;
me.actions.send.ensure_next_stream_id()?;
if !me.counts.remote_settings_applied() {
me.counts.wait_remote_settings_applied(cx);
return Poll::Pending;
}

if let Some(pending) = pending {
let mut stream = me.store.resolve(pending.key);
Expand Down

0 comments on commit 81aac76

Please sign in to comment.