From 5c78b7d5ae637715b2117d2bf06b795f0aab29ac Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 24 Sep 2021 11:26:05 -0400 Subject: [PATCH] refactor: Extract an s3_request method that handles retries --- Cargo.lock | 12 ++++++ object_store/Cargo.toml | 1 + object_store/src/aws.rs | 92 ++++++++++++++++++++++++++++++++--------- 3 files changed, 85 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dcda9a15dd..92b6735367 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1202,6 +1202,17 @@ dependencies = [ "syn", ] +[[package]] +name = "futures-retry" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fde5a672a61f96552aa5ed9fd9c81c3fbdae4be9b1e205d6eaf17c83705adc0f" +dependencies = [ + "futures", + "pin-project-lite", + "tokio", +] + [[package]] name = "futures-sink" version = "0.3.17" @@ -2484,6 +2495,7 @@ dependencies = [ "cloud-storage", "dotenv", "futures", + "futures-retry", "futures-test", "indexmap", "itertools", diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml index 6803bcf34d..3901c41461 100644 --- a/object_store/Cargo.toml +++ b/object_store/Cargo.toml @@ -15,6 +15,7 @@ chrono = "0.4" # Google Cloud Storage integration cloud-storage = {version = "0.10.2", optional = true} futures = "0.3" +futures-retry = "0.6" # https://github.com/tkaitchuck/aHash/issues/95 indexmap = { version = "~1.6.2", optional = true } itertools = "0.10.1" diff --git a/object_store/src/aws.rs b/object_store/src/aws.rs index 1b3690e259..42dc395e9a 100644 --- a/object_store/src/aws.rs +++ b/object_store/src/aws.rs @@ -10,14 +10,14 @@ use bytes::Bytes; use chrono::{DateTime, Utc}; use futures::{ stream::{self, BoxStream}, - Stream, StreamExt, TryStreamExt, + Future, Stream, StreamExt, TryStreamExt, }; +use futures_retry::{FutureRetry, RetryPolicy}; use rusoto_core::ByteStream; use rusoto_credential::{InstanceMetadataProvider, StaticProvider}; use rusoto_s3::S3; use snafu::{OptionExt, ResultExt, Snafu}; -use std::convert::TryFrom; -use std::{fmt, io}; +use std::{convert::TryFrom, fmt, io, time::Duration}; /// A specialized `Result` for object store-related errors pub type Result = std::result::Result; @@ -379,39 +379,52 @@ impl AmazonS3 { use ListState::*; let raw_prefix = prefix.map(|p| p.to_raw()); + let bucket = self.bucket_name.clone(); let request_factory = move || rusoto_s3::ListObjectsV2Request { - bucket: self.bucket_name.clone(), + bucket, prefix: raw_prefix.clone(), delimiter: delimiter.clone(), ..Default::default() }; + let bucket = self.bucket_name.clone(); Ok(stream::unfold(ListState::Start, move |state| { - let request_factory = request_factory.clone(); + let (bucket, request_factory) = (bucket.clone(), request_factory.clone()); + let s3 = self.client.clone(); async move { - let mut list_request = request_factory(); - - match state.clone() { - HasMore(continuation_token) => { - list_request.continuation_token = Some(continuation_token); - } + let continuation_token = match state.clone() { + HasMore(continuation_token) => Some(continuation_token), Done => { return None; } // If this is the first request we've made, we don't need to make any // modifications to the request - Start => {} - } + Start => None, + }; + + let resp = s3_request(move || { + let (s3, bucket, request_factory, continuation_token) = ( + s3.clone(), + bucket.clone(), + request_factory.clone(), + continuation_token.clone(), + ); + + async move { + Ok(async move { + s3.list_objects_v2(rusoto_s3::ListObjectsV2Request { + continuation_token, + ..request_factory() + }) + .await + .context(UnableToListData { bucket }) + }) + } + }) + .await; - let resp = - self.client - .list_objects_v2(list_request) - .await - .context(UnableToListData { - bucket: &self.bucket_name, - }); let resp = match resp { Ok(resp) => resp, Err(e) => return Some((Err(e), state)), @@ -435,6 +448,45 @@ impl AmazonS3 { } } +async fn s3_request(future_factory: F) -> Result +where + F: Fn() -> G + Unpin + Clone + Send + Sync + 'static, + G: Future> + Send, + H: Future> + Send, +{ + let mut attempts = 0; + // TODO: configurable + let n_retries = 10; + // TODO: let backoff = + + FutureRetry::new( + move || { + let future_factory = future_factory.clone(); + + async move { + let request = future_factory().await?; + + request.await + } + }, + // retry + { + move |e| { + attempts += 1; + if attempts > n_retries { + RetryPolicy::ForwardError(e) + } else { + RetryPolicy::WaitRetry(Duration::from_millis(200)) + } + } + }, + ) + .await + // TODO: log number of attempts? + .map(|(response, _attempts)| response) + .map_err(|(err, _attempts)| err) +} + impl Error { #[cfg(test)] fn s3_error_due_to_credentials(&self) -> bool {