Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ async fn main() {
.max_delay(Duration::from_secs(3)) // Cap maximum delay at 3 seconds
.exponential(Duration::from_secs(1)) // Use exponential backoff
.full_jitter() // Add randomized jitter
.retry(|| async {
.execute(|| async {
fallible_operation("connection failed").await
})
.await;
Expand All @@ -86,11 +86,11 @@ async fn main() {
.max_delay(Duration::from_secs(3)) // Cap maximum delay at 3 seconds
.exponential(Duration::from_secs(1)) // Use exponential backoff
.full_jitter() // Add randomized jitter
.on_retry(|prev, attempts| { // Run before each retry.
.after_attempt(|prev, attempts| { // Run before each retry.
println!("In the {}-th attempt, the returned result is {:?}.", attempts, prev);
println!("Start next attempt");
})
.retry(|| async {
.execute(|| async {
fallible_operation("connection failed").await
})
.await;
Expand Down
22 changes: 6 additions & 16 deletions examples/hello_world.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ async fn main() {
.max_delay(Duration::from_secs(3))
.full_jitter()
.exponential(Duration::from_secs(1))
.retry(|| async { this_errors("hello").await })
.execute(|| async { this_errors("hello").await })
.await
});
let world = tokio::spawn(async move {
mulligan::until(|r| r.is_ok())
.stop_after(10)
.jitter(mulligan::jitter::Full)
.fixed(Duration::from_secs(1))
.retry(|| async { this_errors("world").await })
.execute(|| async { this_errors("world").await })
.await
});

Expand All @@ -30,13 +30,8 @@ async fn main() {
.stop_after(10)
.full_jitter()
.fixed(Duration::from_millis(200))
.on_retry(|res, attempt| {
println!(
"[retry] start to call retry(): attempt = {}, prev = {:?}",
attempt, res
)
})
.retry(|| async { this_errors("[retry] running").await })
.after_attempt(|res, attempt| println!("Attempt = {}, result = {:?}", attempt, res))
.execute(|| async { this_errors("Oh uh!!!").await })
.await
});

Expand All @@ -49,13 +44,8 @@ async fn main() {
.stop_after(3)
.full_jitter()
.fixed(Duration::from_millis(200))
.on_retry(|res, attempt| {
println!(
"[on_retry] start to call retry() again. In last attempt = {}, result = {:?}",
attempt, res
)
})
.retry(|| async { this_errors("[retry] call `.retry()` and failed").await })
.after_attempt(|res, attempt| println!("Attempt = {}, result = {:?}", attempt, res))
.execute(|| async { this_errors("Uh oh!!!").await })
.await
})
.await;
Expand Down
120 changes: 99 additions & 21 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub use jitter::{Decorrelated, Equal, Full, Jitter, NoJitter};
/// .max_delay(Duration::from_secs(3))
/// .full_jitter()
/// .exponential(Duration::from_secs(1))
/// .retry(|| async { this_errors("hello").await })
/// .execute(|| async { this_errors("hello").await })
/// .await;
/// # }
/// ```
Expand All @@ -55,7 +55,7 @@ pub fn until_ok<T, E>() -> Mulligan<T, E, impl Fn(&Result<T, E>) -> bool, NoJitt
/// .max_delay(Duration::from_secs(3))
/// .full_jitter()
/// .exponential(Duration::from_secs(1))
/// .retry(|| async { this_errors("hello").await })
/// .execute(|| async { this_errors("hello").await })
/// .await;
/// # }
/// ```
Expand All @@ -69,7 +69,8 @@ where
backoff: Fixed::base(Duration::from_secs(0)),
jitterable: jitter::NoJitter,
max: None,
on_retry: None,
before_attempt: None,
after_attempt: None,
_phantom: PhantomData,
}
}
Expand All @@ -86,7 +87,8 @@ where
backoff: Back,
jitterable: Jit,
max: Option<Duration>,
on_retry: Option<Box<dyn Fn(&Result<T, E>, u32) + Send + Sync + 'static>>,
before_attempt: Option<Box<dyn Fn(u32) + Send + Sync + 'static>>,
after_attempt: Option<Box<dyn Fn(&Result<T, E>, u32) + Send + Sync + 'static>>,
_phantom: PhantomData<(T, E)>,
}

Expand All @@ -112,30 +114,81 @@ where
///
/// # async fn example() {
/// mulligan::until_ok()
/// .retry(|| async { this_errors("hello").await })
/// .execute(|| async { this_errors("hello").await })
/// .await;
/// # }
/// ```
pub async fn retry<F, Fut>(mut self, f: F) -> Result<T, E>
pub async fn execute<F, Fut>(mut self, f: F) -> Result<T, E>
where
F: Fn() -> Fut + 'static,
Fut: Future<Output = Result<T, E>> + Send,
{
let mut attempt: u32 = 0;
loop {
if let Some(before_attempt) = &self.before_attempt {
before_attempt(attempt);
}

let res = f().await;

if self.stop_after.map_or(false, |max| attempt >= max) | (self.until)(&res) {
return res;
}
let delay = self.backoff.delay(attempt);
let jittered = self.jitterable.jitter(delay, self.max);

Self::sleep(jittered).await;
let delay = self.calculate_delay(attempt);

Self::sleep(delay).await;

if let Some(after_attempt) = &self.after_attempt {
after_attempt(&res, attempt);
}

attempt += 1;
}
}
/// Retries a provided function until the stopping condition has been met. The default settings will
/// retry forever with no delay between attempts. Backoff, Maximum Backoff, and Maximum Attempts
/// can be configured with the other methods on the struct.
///
/// # Examples
///
/// ```
/// use std::time::Duration;
///
/// fn this_errors(msg: &str) -> std::io::Result<()> {
/// println!("{msg}");
/// Err(std::io::Error::other("uh oh!"))
/// }
///
/// # async fn example() {
/// mulligan::until_ok()
/// .stop_after(2)
/// .execute_sync(move || { this_errors("hello") });
/// # }
/// ```
pub fn execute_sync<F>(mut self, f: F) -> Result<T, E>
where
F: Fn() -> Result<T, E>,
{
let mut attempt: u32 = 0;
loop {
if let Some(before_attempt) = &self.before_attempt {
before_attempt(attempt);
}

let res = f();

if let Some(on_retry) = &self.on_retry {
on_retry(&res, attempt);
if self.stop_after.map_or(false, |max| attempt >= max) | (self.until)(&res) {
return res;
}

let delay = self.calculate_delay(attempt);

std::thread::sleep(delay);

if let Some(after_attempt) = &self.after_attempt {
after_attempt(&res, attempt);
}
attempt += 1;
}
}
Expand All @@ -145,18 +198,35 @@ where
/// For the incoming function, the first parameter represents
/// the result of the last execution, and the second parameter
/// represents the number of times it has been executed.
pub fn on_retry<F>(mut self, on_retry: F) -> Self
pub fn before_attempt<F>(mut self, before_attempt: F) -> Self
where
F: Fn(u32) + Send + Sync + 'static,
{
self.before_attempt = Some(Box::new(before_attempt));
self
}
/// Sets the function to be called before each retry;
/// it will not be called before the first execution.
///
/// For the incoming function, the first parameter represents
/// the result of the last execution, and the second parameter
/// represents the number of times it has been executed.
pub fn after_attempt<F>(mut self, after_attempt: F) -> Self
where
F: Fn(&Result<T, E>, u32) + Send + Sync + 'static,
{
self.on_retry = Some(Box::new(on_retry));
self.after_attempt = Some(Box::new(after_attempt));
self
}
/// Sets the maximum number of attempts to retry before stopping regardless of whether `until` condition has been met.
pub fn stop_after(mut self, attempts: u32) -> Self {
self.stop_after = Some(attempts);
self
}
fn calculate_delay(&mut self, attempt: u32) -> Duration {
let delay = self.backoff.delay(attempt);
self.jitterable.jitter(delay, self.max)
}
/// Adjust the backoff by the provided jitter strategy
pub fn jitter<J>(self, jitter: J) -> Mulligan<T, E, Cond, J, Back>
where
Expand All @@ -168,7 +238,8 @@ where
backoff: self.backoff,
jitterable: jitter,
max: self.max,
on_retry: self.on_retry,
before_attempt: self.before_attempt,
after_attempt: self.after_attempt,
_phantom: PhantomData,
}
}
Expand All @@ -180,7 +251,8 @@ where
backoff: self.backoff,
jitterable: jitter::Full,
max: self.max,
on_retry: self.on_retry,
before_attempt: self.before_attempt,
after_attempt: self.after_attempt,
_phantom: PhantomData,
}
}
Expand All @@ -192,7 +264,8 @@ where
backoff: self.backoff,
jitterable: jitter::Equal,
max: self.max,
on_retry: self.on_retry,
before_attempt: self.before_attempt,
after_attempt: self.after_attempt,
_phantom: PhantomData,
}
}
Expand All @@ -207,7 +280,8 @@ where
backoff: self.backoff,
jitterable: jitter::Decorrelated::base(base),
max: self.max,
on_retry: self.on_retry,
before_attempt: self.before_attempt,
after_attempt: self.after_attempt,
_phantom: PhantomData,
}
}
Expand All @@ -222,7 +296,8 @@ where
backoff,
jitterable: self.jitterable,
max: self.max,
on_retry: self.on_retry,
before_attempt: self.before_attempt,
after_attempt: self.after_attempt,
_phantom: PhantomData,
}
}
Expand All @@ -234,7 +309,8 @@ where
backoff: Fixed::base(dur),
jitterable: self.jitterable,
max: self.max,
on_retry: self.on_retry,
before_attempt: self.before_attempt,
after_attempt: self.after_attempt,
_phantom: PhantomData,
}
}
Expand All @@ -246,7 +322,8 @@ where
backoff: Linear::base(dur),
jitterable: self.jitterable,
max: self.max,
on_retry: self.on_retry,
before_attempt: self.before_attempt,
after_attempt: self.after_attempt,
_phantom: PhantomData,
}
}
Expand All @@ -258,7 +335,8 @@ where
backoff: Exponential::base(dur),
jitterable: self.jitterable,
max: self.max,
on_retry: self.on_retry,
before_attempt: self.before_attempt,
after_attempt: self.after_attempt,
_phantom: PhantomData,
}
}
Expand Down