Skip to content

Commit 1e3fa87

Browse files
authored
fix: transport context polling order (#456)
This PR provides a fix for the strange order identified in #455 and updates the `transport_events` test to validate the new behaviour. As explained in #455, polling order is now sanitized and resumes wherever it left off in a previous call to `poll_next`. resolves #455
1 parent c69b5d5 commit 1e3fa87

File tree

1 file changed

+54
-15
lines changed

1 file changed

+54
-15
lines changed

src/transport/manager/mod.rs

Lines changed: 54 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -174,9 +174,9 @@ impl Stream for TransportContext {
174174
}
175175

176176
let len = self.transports.len();
177-
self.index = (self.index + 1) % len;
178-
for index in 0..len {
179-
let current = (self.index + index) % len;
177+
for _ in 0..len {
178+
let current = self.index;
179+
self.index = (current + 1) % len;
180180
let (key, stream) = self.transports.get_index_mut(current).expect("transport to exist");
181181
match stream.poll_next_unpin(cx) {
182182
Poll::Pending => {}
@@ -1506,6 +1506,7 @@ mod tests {
15061506

15071507
#[tokio::test]
15081508
#[cfg(feature = "websocket")]
1509+
#[cfg(feature = "quic")]
15091510
async fn transport_events() {
15101511
let mut transports = TransportContext::new();
15111512

@@ -1517,23 +1518,27 @@ mod tests {
15171518
let transport = MockTransport::new(rx);
15181519
transports.register_transport(SupportedTransport::WebSocket, Box::new(transport));
15191520

1521+
let (tx_quic, rx) = tokio::sync::mpsc::channel(8);
1522+
let transport = MockTransport::new(rx);
1523+
transports.register_transport(SupportedTransport::Quic, Box::new(transport));
1524+
15201525
assert_eq!(transports.index, 0);
1521-
assert_eq!(transports.transports.len(), 2);
1526+
assert_eq!(transports.transports.len(), 3);
15221527
// No items.
15231528
futures::future::poll_fn(|cx| match transports.poll_next_unpin(cx) {
15241529
std::task::Poll::Ready(_) => panic!("didn't expect event from `TransportService`"),
15251530
std::task::Poll::Pending => std::task::Poll::Ready(()),
15261531
})
15271532
.await;
1528-
assert_eq!(transports.index, 1);
1533+
assert_eq!(transports.index, 0);
15291534

15301535
// Websocket events.
15311536
tx_ws
15321537
.send(TransportEvent::PendingInboundConnection {
15331538
connection_id: ConnectionId::from(1),
15341539
})
15351540
.await
1536-
.expect("chanel to be open");
1541+
.expect("channel to be open");
15371542

15381543
let event = futures::future::poll_fn(|cx| transports.poll_next_unpin(cx))
15391544
.await
@@ -1543,15 +1548,15 @@ mod tests {
15431548
event.1,
15441549
TransportEvent::PendingInboundConnection { .. }
15451550
));
1546-
assert_eq!(transports.index, 0);
1551+
assert_eq!(transports.index, 2);
15471552

15481553
// TCP events.
15491554
tx_tcp
15501555
.send(TransportEvent::PendingInboundConnection {
15511556
connection_id: ConnectionId::from(2),
15521557
})
15531558
.await
1554-
.expect("chanel to be open");
1559+
.expect("channel to be open");
15551560

15561561
let event = futures::future::poll_fn(|cx| transports.poll_next_unpin(cx))
15571562
.await
@@ -1563,19 +1568,43 @@ mod tests {
15631568
));
15641569
assert_eq!(transports.index, 1);
15651570

1566-
// Both transports produce events.
1567-
tx_ws
1571+
// QUIC events
1572+
tx_quic
15681573
.send(TransportEvent::PendingInboundConnection {
15691574
connection_id: ConnectionId::from(3),
15701575
})
15711576
.await
1572-
.expect("chanel to be open");
1573-
tx_tcp
1577+
.expect("channel to be open");
1578+
1579+
let event = futures::future::poll_fn(|cx| transports.poll_next_unpin(cx))
1580+
.await
1581+
.expect("expected event");
1582+
assert_eq!(event.0, SupportedTransport::Quic);
1583+
assert!(std::matches!(
1584+
event.1,
1585+
TransportEvent::PendingInboundConnection { .. }
1586+
));
1587+
assert_eq!(transports.index, 0);
1588+
1589+
// All three transports produce events.
1590+
tx_ws
15741591
.send(TransportEvent::PendingInboundConnection {
15751592
connection_id: ConnectionId::from(4),
15761593
})
15771594
.await
1578-
.expect("chanel to be open");
1595+
.expect("channel to be open");
1596+
tx_tcp
1597+
.send(TransportEvent::PendingInboundConnection {
1598+
connection_id: ConnectionId::from(5),
1599+
})
1600+
.await
1601+
.expect("channel to be open");
1602+
tx_quic
1603+
.send(TransportEvent::PendingInboundConnection {
1604+
connection_id: ConnectionId::from(6),
1605+
})
1606+
.await
1607+
.expect("channel to be open");
15791608

15801609
let event = futures::future::poll_fn(|cx| transports.poll_next_unpin(cx))
15811610
.await
@@ -1585,7 +1614,7 @@ mod tests {
15851614
event.1,
15861615
TransportEvent::PendingInboundConnection { .. }
15871616
));
1588-
assert_eq!(transports.index, 0);
1617+
assert_eq!(transports.index, 1);
15891618

15901619
let event = futures::future::poll_fn(|cx| transports.poll_next_unpin(cx))
15911620
.await
@@ -1595,7 +1624,17 @@ mod tests {
15951624
event.1,
15961625
TransportEvent::PendingInboundConnection { .. }
15971626
));
1598-
assert_eq!(transports.index, 1);
1627+
assert_eq!(transports.index, 2);
1628+
1629+
let event = futures::future::poll_fn(|cx| transports.poll_next_unpin(cx))
1630+
.await
1631+
.expect("expected event");
1632+
assert_eq!(event.0, SupportedTransport::Quic);
1633+
assert!(std::matches!(
1634+
event.1,
1635+
TransportEvent::PendingInboundConnection { .. }
1636+
));
1637+
assert_eq!(transports.index, 0);
15991638
}
16001639

16011640
#[test]

0 commit comments

Comments
 (0)