diff --git a/relay-server/Cargo.toml b/relay-server/Cargo.toml index 6bfaaeea77..0e24b7a312 100644 --- a/relay-server/Cargo.toml +++ b/relay-server/Cargo.toml @@ -44,7 +44,7 @@ chrono = { workspace = true, features = ["clock"] } data-encoding = { workspace = true } flate2 = { workspace = true } fnv = { workspace = true } -futures = { workspace = true } +futures = { workspace = true, features = ["async-await"] } hashbrown = { workspace = true } hyper-util = { workspace = true } itertools = { workspace = true } diff --git a/relay-server/src/utils/stask.rs b/relay-server/src/utils/stask.rs index 8a36b1dd7c..66290947b0 100644 --- a/relay-server/src/utils/stask.rs +++ b/relay-server/src/utils/stask.rs @@ -85,3 +85,113 @@ where false } } + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use futures::future::{ready, Ready}; + + use super::*; + + #[tokio::test] + async fn test_stask_empty() { + let mut stask = ScheduledTasks::>::new(); + + assert_eq!(stask.len(), 0); + let mut next = stask.next(); + for _ in 0..10 { + assert_eq!(futures::poll!(&mut next), Poll::Pending); + } + assert_eq!(stask.len(), 0); + } + + #[tokio::test] + async fn test_stask_immediate_task() { + let mut stask = ScheduledTasks::new(); + + stask.schedule(None, ready(())); + assert_eq!(stask.len(), 1); + + let mut next = stask.next(); + assert_eq!(futures::poll!(&mut next), Poll::Ready(Some(()))); + assert_eq!(stask.len(), 0); + } + + #[tokio::test(start_paused = true)] + async fn test_stask_scheduled_task() { + let mut stask = ScheduledTasks::new(); + + stask.schedule(Some(Instant::now() + Duration::from_secs(3)), ready(())); + assert_eq!(stask.len(), 1); + + let mut next = stask.next(); + assert_eq!(futures::poll!(&mut next), Poll::Pending); + tokio::time::sleep(Duration::from_millis(2800)).await; + assert_eq!(futures::poll!(&mut next), Poll::Pending); + tokio::time::sleep(Duration::from_millis(201)).await; + assert_eq!(futures::poll!(&mut next), Poll::Ready(Some(()))); + + assert_eq!(stask.len(), 0); + } + + #[tokio::test(start_paused = true)] + async fn test_stask_scheduled_task_next_cancelled() { + let mut stask = ScheduledTasks::new(); + + stask.schedule(Some(Instant::now() + Duration::from_secs(3)), ready(())); + assert_eq!(stask.len(), 1); + + let mut next = stask.next(); + assert_eq!(futures::poll!(&mut next), Poll::Pending); + tokio::time::sleep(Duration::from_millis(2800)).await; + assert_eq!(futures::poll!(&mut next), Poll::Pending); + drop(next); + + assert_eq!(stask.len(), 1); + tokio::time::sleep(Duration::from_millis(201)).await; + assert_eq!(futures::poll!(stask.next()), Poll::Ready(Some(()))); + + assert_eq!(stask.len(), 0); + } + + #[tokio::test(start_paused = true)] + async fn test_stask_mixed_tasks() { + let mut stask = ScheduledTasks::new(); + + let now = Instant::now(); + + stask.schedule(None, ready(0)); + stask.schedule(Some(now + Duration::from_secs(2)), ready(2)); + stask.schedule(Some(now + Duration::from_secs(1)), ready(1)); + stask.schedule(Some(now + Duration::from_secs(3)), ready(3)); + assert_eq!(stask.len(), 4); + + assert_eq!(stask.next().await, Some(0)); + assert_eq!(stask.next().await, Some(1)); + stask.schedule(None, ready(90)); + assert_eq!(stask.next().await, Some(90)); + stask.schedule(Some(now), ready(91)); // Now in the past. + assert_eq!(stask.next().await, Some(91)); + assert_eq!(stask.next().await, Some(2)); + stask.schedule(Some(now + Duration::from_secs(4)), ready(4)); + assert_eq!(stask.len(), 2); + assert_eq!(stask.next().await, Some(3)); + assert_eq!(stask.next().await, Some(4)); + + assert_eq!(stask.len(), 0); + assert!(Instant::now() < now + Duration::from_millis(4001)); + assert_eq!(futures::poll!(stask.next()), Poll::Pending); + + stask.schedule(Some(Instant::now()), ready(92)); + assert_eq!( + tokio::time::timeout(Duration::from_millis(1), stask.next()) + .await + .unwrap(), + Some(92) + ); + + assert_eq!(futures::poll!(stask.next()), Poll::Pending); + assert_eq!(stask.len(), 0); + } +}