refactor: Extract an s3_request method that handles retries
parent
fd3a027ca8
commit
5c78b7d5ae
|
@ -1202,6 +1202,17 @@ dependencies = [
|
|||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures-retry"
|
||||
version = "0.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fde5a672a61f96552aa5ed9fd9c81c3fbdae4be9b1e205d6eaf17c83705adc0f"
|
||||
dependencies = [
|
||||
"futures",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "futures-sink"
|
||||
version = "0.3.17"
|
||||
|
@ -2484,6 +2495,7 @@ dependencies = [
|
|||
"cloud-storage",
|
||||
"dotenv",
|
||||
"futures",
|
||||
"futures-retry",
|
||||
"futures-test",
|
||||
"indexmap",
|
||||
"itertools",
|
||||
|
|
|
@ -15,6 +15,7 @@ chrono = "0.4"
|
|||
# Google Cloud Storage integration
|
||||
cloud-storage = {version = "0.10.2", optional = true}
|
||||
futures = "0.3"
|
||||
futures-retry = "0.6"
|
||||
# https://github.com/tkaitchuck/aHash/issues/95
|
||||
indexmap = { version = "~1.6.2", optional = true }
|
||||
itertools = "0.10.1"
|
||||
|
|
|
@ -10,14 +10,14 @@ use bytes::Bytes;
|
|||
use chrono::{DateTime, Utc};
|
||||
use futures::{
|
||||
stream::{self, BoxStream},
|
||||
Stream, StreamExt, TryStreamExt,
|
||||
Future, Stream, StreamExt, TryStreamExt,
|
||||
};
|
||||
use futures_retry::{FutureRetry, RetryPolicy};
|
||||
use rusoto_core::ByteStream;
|
||||
use rusoto_credential::{InstanceMetadataProvider, StaticProvider};
|
||||
use rusoto_s3::S3;
|
||||
use snafu::{OptionExt, ResultExt, Snafu};
|
||||
use std::convert::TryFrom;
|
||||
use std::{fmt, io};
|
||||
use std::{convert::TryFrom, fmt, io, time::Duration};
|
||||
|
||||
/// A specialized `Result` for object store-related errors
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
@ -379,39 +379,52 @@ impl AmazonS3 {
|
|||
use ListState::*;
|
||||
|
||||
let raw_prefix = prefix.map(|p| p.to_raw());
|
||||
let bucket = self.bucket_name.clone();
|
||||
|
||||
let request_factory = move || rusoto_s3::ListObjectsV2Request {
|
||||
bucket: self.bucket_name.clone(),
|
||||
bucket,
|
||||
prefix: raw_prefix.clone(),
|
||||
delimiter: delimiter.clone(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let bucket = self.bucket_name.clone();
|
||||
Ok(stream::unfold(ListState::Start, move |state| {
|
||||
let request_factory = request_factory.clone();
|
||||
let (bucket, request_factory) = (bucket.clone(), request_factory.clone());
|
||||
let s3 = self.client.clone();
|
||||
|
||||
async move {
|
||||
let mut list_request = request_factory();
|
||||
|
||||
match state.clone() {
|
||||
HasMore(continuation_token) => {
|
||||
list_request.continuation_token = Some(continuation_token);
|
||||
}
|
||||
let continuation_token = match state.clone() {
|
||||
HasMore(continuation_token) => Some(continuation_token),
|
||||
Done => {
|
||||
return None;
|
||||
}
|
||||
// If this is the first request we've made, we don't need to make any
|
||||
// modifications to the request
|
||||
Start => {}
|
||||
}
|
||||
Start => None,
|
||||
};
|
||||
|
||||
let resp = s3_request(move || {
|
||||
let (s3, bucket, request_factory, continuation_token) = (
|
||||
s3.clone(),
|
||||
bucket.clone(),
|
||||
request_factory.clone(),
|
||||
continuation_token.clone(),
|
||||
);
|
||||
|
||||
async move {
|
||||
Ok(async move {
|
||||
s3.list_objects_v2(rusoto_s3::ListObjectsV2Request {
|
||||
continuation_token,
|
||||
..request_factory()
|
||||
})
|
||||
.await
|
||||
.context(UnableToListData { bucket })
|
||||
})
|
||||
}
|
||||
})
|
||||
.await;
|
||||
|
||||
let resp =
|
||||
self.client
|
||||
.list_objects_v2(list_request)
|
||||
.await
|
||||
.context(UnableToListData {
|
||||
bucket: &self.bucket_name,
|
||||
});
|
||||
let resp = match resp {
|
||||
Ok(resp) => resp,
|
||||
Err(e) => return Some((Err(e), state)),
|
||||
|
@ -435,6 +448,45 @@ impl AmazonS3 {
|
|||
}
|
||||
}
|
||||
|
||||
async fn s3_request<F, G, H>(future_factory: F) -> Result<rusoto_s3::ListObjectsV2Output>
|
||||
where
|
||||
F: Fn() -> G + Unpin + Clone + Send + Sync + 'static,
|
||||
G: Future<Output = Result<H, Error>> + Send,
|
||||
H: Future<Output = Result<rusoto_s3::ListObjectsV2Output>> + Send,
|
||||
{
|
||||
let mut attempts = 0;
|
||||
// TODO: configurable
|
||||
let n_retries = 10;
|
||||
// TODO: let backoff =
|
||||
|
||||
FutureRetry::new(
|
||||
move || {
|
||||
let future_factory = future_factory.clone();
|
||||
|
||||
async move {
|
||||
let request = future_factory().await?;
|
||||
|
||||
request.await
|
||||
}
|
||||
},
|
||||
// retry
|
||||
{
|
||||
move |e| {
|
||||
attempts += 1;
|
||||
if attempts > n_retries {
|
||||
RetryPolicy::ForwardError(e)
|
||||
} else {
|
||||
RetryPolicy::WaitRetry(Duration::from_millis(200))
|
||||
}
|
||||
}
|
||||
},
|
||||
)
|
||||
.await
|
||||
// TODO: log number of attempts?
|
||||
.map(|(response, _attempts)| response)
|
||||
.map_err(|(err, _attempts)| err)
|
||||
}
|
||||
|
||||
impl Error {
|
||||
#[cfg(test)]
|
||||
fn s3_error_due_to_credentials(&self) -> bool {
|
||||
|
|
Loading…
Reference in New Issue