feat: add deadline config to backoff system (#5489)
This will simplify event emission in #5464.

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
parent
4f119d1e40
commit
e441b5b307
|
@ -319,6 +319,7 @@ version = "0.1.0"
|
|||
dependencies = [
|
||||
"observability_deps",
|
||||
"rand",
|
||||
"snafu",
|
||||
"tokio",
|
||||
"workspace-hack",
|
||||
]
|
||||
|
|
|
@ -7,4 +7,5 @@ edition = "2021"
|
|||
tokio = { version = "1.20", features = ["macros", "time"] }
|
||||
observability_deps = { path = "../observability_deps" }
|
||||
rand = "0.8"
|
||||
snafu = "0.7"
|
||||
workspace-hack = { path = "../workspace-hack"}
|
||||
|
|
|
@ -11,6 +11,7 @@
|
|||
)]
|
||||
use observability_deps::tracing::info;
|
||||
use rand::prelude::*;
|
||||
use snafu::Snafu;
|
||||
use std::ops::ControlFlow;
|
||||
use std::time::Duration;
|
||||
|
||||
|
@ -28,6 +29,9 @@ pub struct BackoffConfig {
|
|||
|
||||
/// Multiplier for each backoff round.
|
||||
pub base: f64,
|
||||
|
||||
/// Upper bound on the accumulated backoff time; once it is reached, retrying gives up with an error. `None` means no deadline.
|
||||
pub deadline: Option<Duration>,
|
||||
}
|
||||
|
||||
impl Default for BackoffConfig {
|
||||
|
@ -36,15 +40,18 @@ impl Default for BackoffConfig {
|
|||
init_backoff: Duration::from_millis(100),
|
||||
max_backoff: Duration::from_secs(500),
|
||||
base: 3.,
|
||||
deadline: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Currently, retrying can't fail, but there should be a global maximum timeout that
|
||||
// causes an error if the total time retrying exceeds that amount.
|
||||
|
||||
/// Error after giving up retrying.
|
||||
pub type BackoffError = std::convert::Infallible;
|
||||
#[derive(Debug, Snafu, PartialEq, Eq)]
|
||||
#[allow(missing_copy_implementations, missing_docs)]
|
||||
pub enum BackoffError {
|
||||
/// The running total of backoff time reached the configured deadline.
///
/// `deadline` carries the configured limit that was hit.
#[snafu(display("Deadline exceeded: {deadline:?}"))]
|
||||
DeadlineExceeded { deadline: Duration },
|
||||
}
|
||||
|
||||
/// Backoff result.
|
||||
pub type BackoffResult<T> = Result<T, BackoffError>;
|
||||
|
@ -58,6 +65,8 @@ pub struct Backoff {
|
|||
next_backoff_secs: f64,
|
||||
max_backoff_secs: f64,
|
||||
base: f64,
|
||||
total: f64,
|
||||
deadline: Option<f64>,
|
||||
rng: Option<Box<dyn RngCore + Sync + Send>>,
|
||||
}
|
||||
|
||||
|
@ -68,6 +77,8 @@ impl std::fmt::Debug for Backoff {
|
|||
.field("next_backoff_secs", &self.next_backoff_secs)
|
||||
.field("max_backoff_secs", &self.max_backoff_secs)
|
||||
.field("base", &self.base)
|
||||
.field("total", &self.total)
|
||||
.field("deadline", &self.deadline)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
@ -91,12 +102,14 @@ impl Backoff {
|
|||
next_backoff_secs: init_backoff,
|
||||
max_backoff_secs: config.max_backoff.as_secs_f64(),
|
||||
base: config.base,
|
||||
total: 0.0,
|
||||
deadline: config.deadline.map(|d| d.as_secs_f64()),
|
||||
rng,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the next backoff duration to wait for
|
||||
fn next(&mut self) -> Duration {
|
||||
fn next(&mut self) -> BackoffResult<Duration> {
|
||||
let range = self.init_backoff..(self.next_backoff_secs * self.base);
|
||||
|
||||
let rand_backoff = match self.rng.as_mut() {
|
||||
|
@ -105,7 +118,18 @@ impl Backoff {
|
|||
};
|
||||
|
||||
let next_backoff = self.max_backoff_secs.min(rand_backoff);
|
||||
Duration::from_secs_f64(std::mem::replace(&mut self.next_backoff_secs, next_backoff))
|
||||
self.total += next_backoff;
|
||||
if let Some(deadline) = self.deadline {
|
||||
if self.total >= deadline {
|
||||
return Err(BackoffError::DeadlineExceeded {
|
||||
deadline: Duration::from_secs_f64(deadline),
|
||||
});
|
||||
}
|
||||
}
|
||||
Ok(Duration::from_secs_f64(std::mem::replace(
|
||||
&mut self.next_backoff_secs,
|
||||
next_backoff,
|
||||
)))
|
||||
}
|
||||
|
||||
/// Perform an async operation that retries with a backoff
|
||||
|
@ -130,7 +154,7 @@ impl Backoff {
|
|||
ControlFlow::Continue(e) => e,
|
||||
};
|
||||
|
||||
let backoff = self.next();
|
||||
let backoff = self.next()?;
|
||||
info!(
|
||||
e=%e,
|
||||
task_name,
|
||||
|
@ -181,6 +205,7 @@ mod tests {
|
|||
let config = BackoffConfig {
|
||||
init_backoff: Duration::from_secs_f64(init_backoff_secs),
|
||||
max_backoff: Duration::from_secs_f64(max_backoff_secs),
|
||||
deadline: None,
|
||||
base,
|
||||
};
|
||||
|
||||
|
@ -191,7 +216,7 @@ mod tests {
|
|||
let mut backoff = Backoff::new_with_rng(&config, Some(rng));
|
||||
|
||||
for _ in 0..20 {
|
||||
assert_eq!(backoff.next().as_secs_f64(), init_backoff_secs);
|
||||
assert_eq!(backoff.next().unwrap().as_secs_f64(), init_backoff_secs);
|
||||
}
|
||||
|
||||
// Create a static rng that takes the maximum of the range
|
||||
|
@ -200,7 +225,7 @@ mod tests {
|
|||
|
||||
for i in 0..20 {
|
||||
let value = (base.powi(i) * init_backoff_secs).min(max_backoff_secs);
|
||||
assert_fuzzy_eq(backoff.next().as_secs_f64(), value);
|
||||
assert_fuzzy_eq(backoff.next().unwrap().as_secs_f64(), value);
|
||||
}
|
||||
|
||||
// Create a static rng that takes the mid point of the range
|
||||
|
@ -209,9 +234,22 @@ mod tests {
|
|||
|
||||
let mut value = init_backoff_secs;
|
||||
for _ in 0..20 {
|
||||
assert_fuzzy_eq(backoff.next().as_secs_f64(), value);
|
||||
assert_fuzzy_eq(backoff.next().unwrap().as_secs_f64(), value);
|
||||
value =
|
||||
(init_backoff_secs + (value * base - init_backoff_secs) / 2.).min(max_backoff_secs);
|
||||
}
|
||||
|
||||
// deadline
|
||||
let rng = Box::new(StepRng::new(u64::MAX, 0));
|
||||
let deadline = Duration::from_secs_f64(init_backoff_secs);
|
||||
let mut backoff = Backoff::new_with_rng(
|
||||
&BackoffConfig {
|
||||
deadline: Some(deadline),
|
||||
..config
|
||||
},
|
||||
Some(rng),
|
||||
);
|
||||
let err = backoff.next().unwrap_err();
|
||||
assert_eq!(err, BackoffError::DeadlineExceeded { deadline });
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue