refactor: Use loop instead of futures_retry
parent
9cdccae49d
commit
9cf343db08
|
@ -1202,17 +1202,6 @@ dependencies = [
|
||||||
"syn",
|
"syn",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "futures-retry"
|
|
||||||
version = "0.6.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "fde5a672a61f96552aa5ed9fd9c81c3fbdae4be9b1e205d6eaf17c83705adc0f"
|
|
||||||
dependencies = [
|
|
||||||
"futures",
|
|
||||||
"pin-project-lite",
|
|
||||||
"tokio",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "futures-sink"
|
name = "futures-sink"
|
||||||
version = "0.3.17"
|
version = "0.3.17"
|
||||||
|
@ -2495,7 +2484,6 @@ dependencies = [
|
||||||
"cloud-storage",
|
"cloud-storage",
|
||||||
"dotenv",
|
"dotenv",
|
||||||
"futures",
|
"futures",
|
||||||
"futures-retry",
|
|
||||||
"futures-test",
|
"futures-test",
|
||||||
"indexmap",
|
"indexmap",
|
||||||
"itertools",
|
"itertools",
|
||||||
|
|
|
@ -15,7 +15,6 @@ chrono = "0.4"
|
||||||
# Google Cloud Storage integration
|
# Google Cloud Storage integration
|
||||||
cloud-storage = {version = "0.10.2", optional = true}
|
cloud-storage = {version = "0.10.2", optional = true}
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
futures-retry = "0.6"
|
|
||||||
# https://github.com/tkaitchuck/aHash/issues/95
|
# https://github.com/tkaitchuck/aHash/issues/95
|
||||||
indexmap = { version = "~1.6.2", optional = true }
|
indexmap = { version = "~1.6.2", optional = true }
|
||||||
itertools = "0.10.1"
|
itertools = "0.10.1"
|
||||||
|
|
|
@ -12,7 +12,6 @@ use futures::{
|
||||||
stream::{self, BoxStream},
|
stream::{self, BoxStream},
|
||||||
Future, Stream, StreamExt, TryStreamExt,
|
Future, Stream, StreamExt, TryStreamExt,
|
||||||
};
|
};
|
||||||
use futures_retry::{FutureRetry, RetryPolicy};
|
|
||||||
use rusoto_core::ByteStream;
|
use rusoto_core::ByteStream;
|
||||||
use rusoto_credential::{InstanceMetadataProvider, StaticProvider};
|
use rusoto_credential::{InstanceMetadataProvider, StaticProvider};
|
||||||
use rusoto_s3::S3;
|
use rusoto_s3::S3;
|
||||||
|
@ -480,20 +479,17 @@ where
|
||||||
// TODO: make the number of retries configurable
|
// TODO: make the number of retries configurable
|
||||||
let n_retries = 10;
|
let n_retries = 10;
|
||||||
|
|
||||||
FutureRetry::new(
|
loop {
|
||||||
move || {
|
let future_factory = future_factory.clone();
|
||||||
let future_factory = future_factory.clone();
|
let request = future_factory().await?;
|
||||||
|
|
||||||
async move {
|
let result = request.await;
|
||||||
let request = future_factory().await?;
|
|
||||||
|
|
||||||
request.await
|
match result {
|
||||||
}
|
Ok(r) => return Ok(r),
|
||||||
},
|
Err(e) => {
|
||||||
// retry
|
|
||||||
{
|
|
||||||
move |e| {
|
|
||||||
attempts += 1;
|
attempts += 1;
|
||||||
|
|
||||||
let should_retry = matches!(
|
let should_retry = matches!(
|
||||||
e,
|
e,
|
||||||
rusoto_core::RusotoError::Unknown(ref response)
|
rusoto_core::RusotoError::Unknown(ref response)
|
||||||
|
@ -501,17 +497,14 @@ where
|
||||||
);
|
);
|
||||||
|
|
||||||
if attempts > n_retries || !should_retry {
|
if attempts > n_retries || !should_retry {
|
||||||
RetryPolicy::ForwardError(e)
|
return Err(e);
|
||||||
} else {
|
} else {
|
||||||
RetryPolicy::WaitRetry(Duration::from_millis(2u64.pow(attempts) * 50))
|
let wait_time = Duration::from_millis(2u64.pow(attempts) * 50);
|
||||||
|
tokio::time::sleep(wait_time).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
}
|
||||||
)
|
}
|
||||||
.await
|
|
||||||
// TODO: log number of attempts?
|
|
||||||
.map(|(response, _attempts)| response)
|
|
||||||
.map_err(|(err, _attempts)| err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Error {
|
impl Error {
|
||||||
|
|
Loading…
Reference in New Issue