Skip to content

Commit 3bcfc28

Browse files
committed
Notify endpoint of drained connections synchronously
1 parent f99daa1 commit 3bcfc28

File tree

6 files changed

+36
-127
lines changed

6 files changed

+36
-127
lines changed

quinn-proto/src/connection/mod.rs

Lines changed: 13 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,7 @@ use crate::{
2424
frame::{Close, Datagram, FrameStruct},
2525
packet::{Header, LongType, Packet, PartialDecode, SpaceId},
2626
range_set::ArrayRangeSet,
27-
shared::{
28-
ConnectionEvent, ConnectionEventInner, ConnectionId, EcnCodepoint, EndpointEvent,
29-
EndpointEventInner,
30-
},
27+
shared::{ConnectionEvent, ConnectionEventInner, ConnectionId, EcnCodepoint},
3128
token::ResetToken,
3229
transport_parameters::TransportParameters,
3330
ConnectionHandle, Dir, Endpoint, EndpointConfig, Frame, Side, StreamId, Transmit,
@@ -84,10 +81,10 @@ use timer::{Timer, TimerTable};
8481

8582
/// Protocol state and logic for a single QUIC connection
8683
///
87-
/// Objects of this type receive [`ConnectionEvent`]s and emit [`EndpointEvent`]s and application
88-
/// [`Event`]s to make progress. To handle timeouts, a `Connection` returns timer updates and
89-
/// expects timeouts through various methods. A number of simple getter methods are exposed
90-
/// to allow callers to inspect some of the connection state.
84+
/// Objects of this type receive [`ConnectionEvent`]s and emit application [`Event`]s to make
85+
/// progress. To handle timeouts, a `Connection` returns timer updates and expects timeouts through
86+
/// various methods. A number of simple getter methods are exposed to allow callers to inspect some
87+
/// of the connection state.
9188
///
9289
/// `Connection` has roughly 4 types of methods:
9390
///
@@ -108,8 +105,7 @@ use timer::{Timer, TimerTable};
108105
///
109106
/// 1. [`poll_transmit`](Self::poll_transmit)
110107
/// 2. [`poll_timeout`](Self::poll_timeout)
111-
/// 3. [`poll_endpoint_events`](Self::poll_endpoint_events)
112-
/// 4. [`poll`](Self::poll)
108+
/// 3. [`poll`](Self::poll)
113109
///
114110
/// Currently the only actual dependency is from (2) to (1), however additional
115111
/// dependencies may be added in future, so the above order is recommended.
@@ -156,7 +152,6 @@ pub struct Connection {
156152
/// Total number of outgoing packets that have been deemed lost
157153
lost_packets: u64,
158154
events: VecDeque<Event>,
159-
endpoint_events: VecDeque<EndpointEventInner>,
160155
/// Whether the spin bit is in use for this connection
161156
spin_enabled: bool,
162157
/// Outgoing spin bit state
@@ -294,7 +289,6 @@ impl Connection {
294289
retry_src_cid: None,
295290
lost_packets: 0,
296291
events: VecDeque::new(),
297-
endpoint_events: VecDeque::new(),
298292
spin_enabled: config.allow_spin && rng.gen_ratio(7, 8),
299293
spin: false,
300294
spaces: [initial_space, PacketSpace::new(now), PacketSpace::new(now)],
@@ -376,12 +370,6 @@ impl Connection {
376370
None
377371
}
378372

379-
/// Return endpoint-facing events
380-
#[must_use]
381-
pub fn poll_endpoint_events(&mut self) -> Option<EndpointEvent> {
382-
self.endpoint_events.pop_front().map(EndpointEvent)
383-
}
384-
385373
/// Provide control over streams
386374
#[must_use]
387375
pub fn streams(&mut self) -> Streams<'_> {
@@ -888,8 +876,8 @@ impl Connection {
888876
/// Process `ConnectionEvent`s generated by the associated `Endpoint`
889877
///
890878
/// Will execute protocol logic upon receipt of a connection event, in turn preparing signals
891-
/// (including application `Event`s, `EndpointEvent`s and outgoing datagrams) that should be
892-
/// extracted through the relevant methods.
879+
/// (including application `Event`s and outgoing datagrams) that should be extracted through the
880+
/// relevant methods.
893881
pub fn handle_event(&mut self, event: ConnectionEvent, endpoint: &Endpoint) {
894882
use self::ConnectionEventInner::*;
895883
match event.0 {
@@ -956,9 +944,8 @@ impl Connection {
956944

957945
/// Process timer expirations
958946
///
959-
/// Executes protocol logic, potentially preparing signals (including application `Event`s,
960-
/// `EndpointEvent`s and outgoing datagrams) that should be extracted through the relevant
961-
/// methods.
947+
/// Executes protocol logic, potentially preparing signals (including application `Event`s, and
948+
/// outgoing datagrams) that should be extracted through the relevant methods.
962949
///
963950
/// It is most efficient to call this immediately after the system clock reaches the latest
964951
/// `Instant` that was output by `poll_timeout`; however spurious extra calls will simply
@@ -973,10 +960,11 @@ impl Connection {
973960
match timer {
974961
Timer::Close => {
975962
self.state = State::Drained;
976-
self.endpoint_events.push_back(EndpointEventInner::Drained);
963+
endpoint.handle_drained(self.handle);
977964
}
978965
Timer::Idle => {
979966
self.kill(ConnectionError::TimedOut);
967+
endpoint.handle_drained(self.handle);
980968
}
981969
Timer::KeepAlive => {
982970
trace!("sending keep-alive");
@@ -2123,7 +2111,7 @@ impl Connection {
21232111
}
21242112
}
21252113
if !was_drained && self.state.is_drained() {
2126-
self.endpoint_events.push_back(EndpointEventInner::Drained);
2114+
endpoint.handle_drained(self.handle);
21272115
// Close timer may have been started previously, e.g. if we sent a close and got a
21282116
// stateless reset in response
21292117
self.timers.stop(Timer::Close);
@@ -3276,7 +3264,6 @@ impl Connection {
32763264
self.close_common();
32773265
self.error = Some(reason);
32783266
self.state = State::Drained;
3279-
self.endpoint_events.push_back(EndpointEventInner::Drained);
32803267
}
32813268
}
32823269

quinn-proto/src/endpoint.rs

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,7 @@ use crate::{
2323
crypto::{self, Keys, UnsupportedVersion},
2424
frame,
2525
packet::{Header, Packet, PacketDecodeError, PacketNumber, PartialDecode},
26-
shared::{
27-
ConnectionEvent, ConnectionEventInner, ConnectionId, EcnCodepoint, EndpointEvent,
28-
EndpointEventInner, IssuedCid,
29-
},
26+
shared::{ConnectionEvent, ConnectionEventInner, ConnectionId, EcnCodepoint, IssuedCid},
3027
transport_parameters::TransportParameters,
3128
ResetToken, RetryToken, Side, Transmit, TransportConfig, TransportError, INITIAL_MTU,
3229
MAX_CID_SIZE, MIN_INITIAL_SIZE, RESET_TOKEN_SIZE,
@@ -75,23 +72,9 @@ impl Endpoint {
7572
*self.server_config.write().unwrap() = server_config;
7673
}
7774

78-
/// Process `EndpointEvent`s emitted from related `Connection`s
79-
///
80-
/// In turn, processing this event may return a `ConnectionEvent` for the same
81-
/// `Connection`. Must never be called concurrently with the same `ch`.
82-
pub fn handle_event(
83-
&self,
84-
ch: ConnectionHandle,
85-
event: EndpointEvent,
86-
) -> Option<ConnectionEvent> {
87-
use EndpointEventInner::*;
88-
match event.0 {
89-
Drained => {
90-
let conn = self.connections.lock().unwrap().remove(ch.0);
91-
self.index.write().unwrap().remove(&conn);
92-
}
93-
}
94-
None
75+
pub(crate) fn handle_drained(&self, ch: ConnectionHandle) {
76+
let conn = self.connections.lock().unwrap().remove(ch.0);
77+
self.index.write().unwrap().remove(&conn);
9578
}
9679

9780
pub(crate) fn set_reset_token(
@@ -573,7 +556,7 @@ impl Endpoint {
573556
}
574557
Err(e) => {
575558
debug!("handshake failed: {}", e);
576-
self.handle_event(ch, EndpointEvent(EndpointEventInner::Drained));
559+
self.handle_drained(ch);
577560
match e {
578561
ConnectionError::TransportError(e) => Some(DatagramEvent::Response(
579562
self.initial_close(version, addresses, crypto, &src_cid, e),

quinn-proto/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ mod endpoint;
6565
pub use crate::endpoint::{ConnectError, ConnectionHandle, DatagramEvent, Endpoint};
6666

6767
mod shared;
68-
pub use crate::shared::{ConnectionEvent, ConnectionId, EcnCodepoint, EndpointEvent};
68+
pub use crate::shared::{ConnectionEvent, ConnectionId, EcnCodepoint};
6969

7070
mod transport_error;
7171
pub use crate::transport_error::{Code as TransportErrorCode, Error as TransportError};

quinn-proto/src/shared.rs

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,33 +20,6 @@ pub(crate) enum ConnectionEventInner {
2020
},
2121
}
2222

23-
/// Events sent from a Connection to an Endpoint
24-
#[derive(Debug)]
25-
pub struct EndpointEvent(pub(crate) EndpointEventInner);
26-
27-
impl EndpointEvent {
28-
/// Construct an event that indicating that a `Connection` will no longer emit events
29-
///
30-
/// Useful for notifying an `Endpoint` that a `Connection` has been destroyed outside of the
31-
/// usual state machine flow, e.g. when being dropped by the user.
32-
pub fn drained() -> Self {
33-
Self(EndpointEventInner::Drained)
34-
}
35-
36-
/// Determine whether this is the last event a `Connection` will emit
37-
///
38-
/// Useful for determining when connection-related event loop state can be freed.
39-
pub fn is_drained(&self) -> bool {
40-
self.0 == EndpointEventInner::Drained
41-
}
42-
}
43-
44-
#[derive(Clone, Debug, Eq, PartialEq)]
45-
pub(crate) enum EndpointEventInner {
46-
/// The connection has been drained
47-
Drained,
48-
}
49-
5023
/// Protocol-level identifier for a connection.
5124
///
5225
/// Mainly useful for identifying this connection's packets on the wire with tools like Wireshark.

quinn-proto/src/tests/util.rs

Lines changed: 11 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -329,41 +329,22 @@ impl TestEndpoint {
329329
}
330330
}
331331

332-
loop {
333-
let mut endpoint_events: Vec<(ConnectionHandle, EndpointEvent)> = vec![];
334-
for (ch, conn) in self.connections.iter_mut() {
335-
if self.timeout.map_or(false, |x| x <= now) {
336-
self.timeout = None;
337-
conn.handle_timeout(now, &self.endpoint);
338-
}
339-
340-
for (_, mut events) in self.conn_events.drain() {
341-
for event in events.drain(..) {
342-
conn.handle_event(event, &self.endpoint);
343-
}
344-
}
345-
346-
while let Some(event) = conn.poll_endpoint_events() {
347-
endpoint_events.push((*ch, event));
348-
}
349-
350-
while let Some(x) = conn.poll_transmit(now, MAX_DATAGRAMS) {
351-
self.outbound.extend(split_transmit(x));
352-
}
353-
self.timeout = conn.poll_timeout();
332+
for conn in self.connections.values_mut() {
333+
if self.timeout.map_or(false, |x| x <= now) {
334+
self.timeout = None;
335+
conn.handle_timeout(now, &self.endpoint);
354336
}
355337

356-
if endpoint_events.is_empty() {
357-
break;
338+
for (_, mut events) in self.conn_events.drain() {
339+
for event in events.drain(..) {
340+
conn.handle_event(event, &self.endpoint);
341+
}
358342
}
359343

360-
for (ch, event) in endpoint_events {
361-
if let Some(event) = self.handle_event(ch, event) {
362-
if let Some(conn) = self.connections.get_mut(&ch) {
363-
conn.handle_event(event, &self.endpoint);
364-
}
365-
}
344+
while let Some(x) = conn.poll_transmit(now, MAX_DATAGRAMS) {
345+
self.outbound.extend(split_transmit(x));
366346
}
347+
self.timeout = conn.poll_timeout();
367348
}
368349
}
369350

quinn/src/connection.rs

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -236,12 +236,14 @@ impl Future for ConnectionDriver {
236236
// If a timer expires, there might be more to transmit. When we transmit something, we
237237
// might need to reset a timer. Hence, we must loop until neither happens.
238238
keep_going |= conn.drive_timer(cx, &self.0.shared);
239-
// Connection might request and receive new CIDs, which prompts a transmit. Future work:
240-
// this should probably happen synchronously inside the connection.
241-
keep_going |= conn.forward_endpoint_events(&self.0.shared);
242239
conn.forward_app_events(&self.0.shared);
243240

244-
if !conn.inner.is_drained() {
241+
if conn.inner.is_drained() {
242+
// Notify endpoint driver to clean up its own resources
243+
let _ = conn
244+
.endpoint_events
245+
.send((conn.handle, EndpointEvent::Drained));
246+
} else {
245247
if keep_going {
246248
// If the connection hasn't processed all tasks, schedule it again
247249
cx.waker().wake_by_ref();
@@ -893,23 +895,6 @@ impl State {
893895
false
894896
}
895897

896-
fn forward_endpoint_events(&mut self, shared: &Shared) -> bool {
897-
let mut keep_going = false;
898-
while let Some(event) = self.inner.poll_endpoint_events() {
899-
if event.is_drained() {
900-
// Notify endpoint driver to clean up its own resources
901-
let _ = self
902-
.endpoint_events
903-
.send((self.handle, EndpointEvent::Drained));
904-
}
905-
if let Some(event) = shared.endpoint.handle_event(self.handle, event) {
906-
self.inner.handle_event(event, &shared.endpoint);
907-
keep_going = true;
908-
}
909-
}
910-
keep_going
911-
}
912-
913898
/// If this returns `Err`, the endpoint is dead, so the driver should exit immediately.
914899
fn process_conn_events(
915900
&mut self,

0 commit comments

Comments
 (0)