Merge branch 'main' into dom/deprecate-mem-qe

pull/24376/head
Dom 2020-12-10 18:47:55 +00:00 committed by GitHub
commit c2156c6271
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 450 additions and 38 deletions

View File

@ -13,7 +13,7 @@
# AWS_ACCESS_KEY_ID=access_key_value # AWS_ACCESS_KEY_ID=access_key_value
# AWS_SECRET_ACCESS_KEY=secret_access_key_value # AWS_SECRET_ACCESS_KEY=secret_access_key_value
# AWS_DEFAULT_REGION=us-east-2 # AWS_DEFAULT_REGION=us-east-2
# AWS_S3_BUCKET_NAME=bucket_name # AWS_S3_BUCKET_NAME=bucket-name
# #
# If using Google Cloud Storage as an object store: # If using Google Cloud Storage as an object store:
# GCS_BUCKET_NAME=bucket_name # GCS_BUCKET_NAME=bucket_name

View File

@ -153,19 +153,22 @@ impl GoogleCloudStorage {
} }
); );
let location = location.to_string(); let location_copy = location.to_string();
let bucket_name = self.bucket_name.clone(); let bucket_name = self.bucket_name.clone();
let _ = tokio::task::spawn_blocking(move || { let _ = tokio::task::spawn_blocking(move || {
cloud_storage::Object::create( cloud_storage::Object::create(
&bucket_name, &bucket_name,
&temporary_non_streaming, &temporary_non_streaming,
&location, &location_copy,
"application/octet-stream", "application/octet-stream",
) )
}) })
.await .await
.context(UnableToPutDataToGcs)?; .context(UnableToPutDataToGcs {
bucket: &self.bucket_name,
location,
})?;
Ok(()) Ok(())
} }
@ -174,28 +177,42 @@ impl GoogleCloudStorage {
&self, &self,
location: &str, location: &str,
) -> InternalResult<impl Stream<Item = InternalResult<Bytes>>> { ) -> InternalResult<impl Stream<Item = InternalResult<Bytes>>> {
let location = location.to_string(); let location_copy = location.to_string();
let bucket_name = self.bucket_name.clone(); let bucket_name = self.bucket_name.clone();
let bytes = tokio::task::spawn_blocking(move || { let bytes = tokio::task::spawn_blocking(move || {
cloud_storage::Object::download(&bucket_name, &location) cloud_storage::Object::download(&bucket_name, &location_copy)
}) })
.await .await
.context(UnableToGetDataFromGcs)? .context(UnableToGetDataFromGcs {
.context(UnableToGetDataFromGcs2)?; bucket: &self.bucket_name,
location,
})?
.context(UnableToGetDataFromGcs2 {
bucket: &self.bucket_name,
location,
})?;
Ok(futures::stream::once(async move { Ok(bytes.into()) })) Ok(futures::stream::once(async move { Ok(bytes.into()) }))
} }
/// Delete the object at the specified location. /// Delete the object at the specified location.
async fn delete(&self, location: &str) -> InternalResult<()> { async fn delete(&self, location: &str) -> InternalResult<()> {
let location = location.to_string(); let location_copy = location.to_string();
let bucket_name = self.bucket_name.clone(); let bucket_name = self.bucket_name.clone();
tokio::task::spawn_blocking(move || cloud_storage::Object::delete(&bucket_name, &location)) tokio::task::spawn_blocking(move || {
.await cloud_storage::Object::delete(&bucket_name, &location_copy)
.context(UnableToDeleteDataFromGcs)? })
.context(UnableToDeleteDataFromGcs2)?; .await
.context(UnableToDeleteDataFromGcs {
bucket: &self.bucket_name,
location,
})?
.context(UnableToDeleteDataFromGcs2 {
bucket: &self.bucket_name,
location,
})?;
Ok(()) Ok(())
} }
@ -213,8 +230,12 @@ impl GoogleCloudStorage {
None => cloud_storage::Object::list(&bucket_name), None => cloud_storage::Object::list(&bucket_name),
}) })
.await .await
.context(UnableToListDataFromGcs)? .context(UnableToListDataFromGcs {
.context(UnableToListDataFromGcs2)?; bucket: &self.bucket_name,
})?
.context(UnableToListDataFromGcs2 {
bucket: &self.bucket_name,
})?;
Ok(futures::stream::once(async move { Ok(futures::stream::once(async move {
Ok(objects.into_iter().map(|o| o.name).collect()) Ok(objects.into_iter().map(|o| o.name).collect())
@ -272,7 +293,13 @@ impl AmazonS3 {
..Default::default() ..Default::default()
}; };
self.client.put_object(put_request).await?; self.client
.put_object(put_request)
.await
.context(UnableToPutDataToS3 {
bucket: &self.bucket_name,
location: location.to_string(),
})?;
Ok(()) Ok(())
} }
@ -289,10 +316,20 @@ impl AmazonS3 {
Ok(self Ok(self
.client .client
.get_object(get_request) .get_object(get_request)
.await? .await
.context(UnableToGetDataFromS3 {
bucket: self.bucket_name.to_owned(),
location: location.to_owned(),
})?
.body .body
.context(NoDataFromS3)? .context(NoDataFromS3 {
.context(UnableToGetPieceOfDataFromS3) bucket: self.bucket_name.to_owned(),
location: location.to_string(),
})?
.context(UnableToGetPieceOfDataFromS3 {
bucket: self.bucket_name.to_owned(),
location: location.to_string(),
})
.err_into()) .err_into())
} }
@ -304,7 +341,13 @@ impl AmazonS3 {
..Default::default() ..Default::default()
}; };
self.client.delete_object(delete_request).await?; self.client
.delete_object(delete_request)
.await
.context(UnableToDeleteDataFromS3 {
bucket: self.bucket_name.to_owned(),
location: location.to_owned(),
})?;
Ok(()) Ok(())
} }
@ -342,7 +385,15 @@ impl AmazonS3 {
let resp = match self.client.list_objects_v2(list_request).await { let resp = match self.client.list_objects_v2(list_request).await {
Ok(resp) => resp, Ok(resp) => resp,
Err(e) => return Some((Err(e.into()), state)), Err(e) => {
return Some((
Err(InternalError::UnableToListDataFromS3 {
source: e,
bucket: self.bucket_name.clone(),
}),
state,
))
}
}; };
let contents = resp.contents.unwrap_or_default(); let contents = resp.contents.unwrap_or_default();
@ -589,15 +640,22 @@ impl Error {
match self.0 { match self.0 {
UnableToPutDataToS3 { UnableToPutDataToS3 {
source: RusotoError::Credentials(_), source: RusotoError::Credentials(_),
bucket: _,
location: _,
} => true, } => true,
UnableToGetDataFromS3 { UnableToGetDataFromS3 {
source: RusotoError::Credentials(_), source: RusotoError::Credentials(_),
bucket: _,
location: _,
} => true, } => true,
UnableToDeleteDataFromS3 { UnableToDeleteDataFromS3 {
source: RusotoError::Credentials(_), source: RusotoError::Credentials(_),
bucket: _,
location: _,
} => true, } => true,
UnableToListDataFromS3 { UnableToListDataFromS3 {
source: RusotoError::Credentials(_), source: RusotoError::Credentials(_),
bucket: _,
} => true, } => true,
_ => false, _ => false,
} }
@ -613,49 +671,70 @@ enum InternalError {
UnableToPutDataToGcs { UnableToPutDataToGcs {
source: tokio::task::JoinError, source: tokio::task::JoinError,
bucket: String,
location: String,
}, },
UnableToListDataFromGcs { UnableToListDataFromGcs {
source: tokio::task::JoinError, source: tokio::task::JoinError,
bucket: String,
}, },
UnableToListDataFromGcs2 { UnableToListDataFromGcs2 {
source: cloud_storage::Error, source: cloud_storage::Error,
bucket: String,
}, },
UnableToDeleteDataFromGcs { UnableToDeleteDataFromGcs {
source: tokio::task::JoinError, source: tokio::task::JoinError,
bucket: String,
location: String,
}, },
UnableToDeleteDataFromGcs2 { UnableToDeleteDataFromGcs2 {
source: cloud_storage::Error, source: cloud_storage::Error,
bucket: String,
location: String,
}, },
UnableToGetDataFromGcs { UnableToGetDataFromGcs {
source: tokio::task::JoinError, source: tokio::task::JoinError,
bucket: String,
location: String,
}, },
UnableToGetDataFromGcs2 { UnableToGetDataFromGcs2 {
source: cloud_storage::Error, source: cloud_storage::Error,
bucket: String,
location: String,
}, },
#[snafu(context(false))]
UnableToPutDataToS3 { UnableToPutDataToS3 {
source: rusoto_core::RusotoError<rusoto_s3::PutObjectError>, source: rusoto_core::RusotoError<rusoto_s3::PutObjectError>,
bucket: String,
location: String,
}, },
#[snafu(context(false))]
UnableToGetDataFromS3 { UnableToGetDataFromS3 {
source: rusoto_core::RusotoError<rusoto_s3::GetObjectError>, source: rusoto_core::RusotoError<rusoto_s3::GetObjectError>,
bucket: String,
location: String,
}, },
#[snafu(context(false))]
UnableToDeleteDataFromS3 { UnableToDeleteDataFromS3 {
source: rusoto_core::RusotoError<rusoto_s3::DeleteObjectError>, source: rusoto_core::RusotoError<rusoto_s3::DeleteObjectError>,
bucket: String,
location: String,
},
NoDataFromS3 {
bucket: String,
location: String,
}, },
NoDataFromS3,
UnableToReadBytesFromS3 { UnableToReadBytesFromS3 {
source: std::io::Error, source: std::io::Error,
bucket: String,
location: String,
}, },
UnableToGetPieceOfDataFromS3 { UnableToGetPieceOfDataFromS3 {
source: std::io::Error, source: std::io::Error,
bucket: String,
location: String,
}, },
#[snafu(context(false))]
UnableToListDataFromS3 { UnableToListDataFromS3 {
source: rusoto_core::RusotoError<rusoto_s3::ListObjectsV2Error>, source: rusoto_core::RusotoError<rusoto_s3::ListObjectsV2Error>,
bucket: String,
}, },
UnableToPutDataInMemory { UnableToPutDataInMemory {
@ -712,6 +791,9 @@ mod tests {
type Error = Box<dyn std::error::Error + Send + Sync + 'static>; type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type Result<T, E = Error> = std::result::Result<T, E>; type Result<T, E = Error> = std::result::Result<T, E>;
/// Placeholder name used by the integration tests as a bucket and/or object
/// location that is expected not to exist. Only compiled when the `test_aws`
/// or `test_gcs` cfg flag is set.
#[cfg(any(test_aws, test_gcs))]
const NON_EXISTENT_NAME: &str = "nonexistentname";
macro_rules! assert_error { macro_rules! assert_error {
($res:expr, $error_pat:pat$(,)?) => { ($res:expr, $error_pat:pat$(,)?) => {
assert!( assert!(
@ -779,8 +861,27 @@ mod tests {
Ok(()) Ok(())
} }
/// Test helper: tries to read an object that is expected to be absent.
///
/// When `location` is `None`, the name `this_file_should_not_exist` is used.
/// The helper first lists the store with that name as the prefix and asserts
/// the listing is empty, then issues the `get` and concatenates every chunk of
/// the returned stream into a single `Bytes`.
///
/// Any error from the listing, the `get` call, or the stream is propagated.
#[cfg(any(test_aws, test_gcs))]
async fn get_nonexistent_object(
    storage: &ObjectStore,
    location: Option<&str>,
) -> Result<Bytes> {
    let target = location.unwrap_or("this_file_should_not_exist");

    // Sanity check: nothing may be stored under this name before we fetch it.
    let listed = flatten_list_stream(storage, Some(target)).await?;
    assert!(listed.is_empty());

    let chunks = storage.get(target).await?;
    let buffer = chunks
        .map_ok(|chunk| bytes::BytesMut::from(&chunk[..]))
        .try_concat()
        .await?;

    Ok(buffer.freeze())
}
// Tests TODO: // Tests TODO:
// GET nonexisting location // GET nonexisting location (in_memory/file)
// DELETE nonexisting location // DELETE nonexisting location
// PUT overwriting // PUT overwriting
@ -807,6 +908,108 @@ mod tests {
put_get_delete_list(&integration).await?; put_get_delete_list(&integration).await?;
Ok(()) Ok(())
} }
/// Fetching a missing object from an existing GCS bucket.
///
/// Pins the currently observed behavior: the call succeeds and the returned
/// body is the text `No such object: <bucket>/<location>`.
#[tokio::test]
async fn gcs_test_get_nonexistent_location() -> Result<()> {
    let bucket = bucket_name()?;
    let gcs = GoogleCloudStorage::new(&bucket);
    let store = ObjectStore::new_google_cloud_storage(gcs);

    let body = get_nonexistent_object(&store, Some(NON_EXISTENT_NAME)).await?;
    let expected = Bytes::from(format!(
        "No such object: {}/{}",
        bucket, NON_EXISTENT_NAME
    ));
    assert_eq!(body, expected);

    Ok(())
}
/// Fetching an object from a GCS bucket that does not exist.
///
/// Pins the currently observed behavior: the call succeeds and the returned
/// body is the literal text `Not Found`.
#[tokio::test]
async fn gcs_test_get_nonexistent_bucket() -> Result<()> {
    let gcs = GoogleCloudStorage::new(NON_EXISTENT_NAME);
    let store = ObjectStore::new_google_cloud_storage(gcs);

    let body = get_nonexistent_object(&store, Some(NON_EXISTENT_NAME)).await?;
    assert_eq!(body, Bytes::from("Not Found"));

    Ok(())
}
#[tokio::test]
async fn gcs_test_delete_nonexistent_location() -> Result<()> {
let bucket_name = bucket_name()?;
let location_name = NON_EXISTENT_NAME;
let integration =
ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(&bucket_name));
let err = integration.delete(location_name).await.unwrap_err();
if let Error(InternalError::UnableToDeleteDataFromGcs2 {
source,
bucket,
location,
}) = err
{
assert!(matches!(source, cloud_storage::Error::Google(_)));
assert_eq!(bucket, bucket_name);
assert_eq!(location, location_name);
} else {
panic!("unexpected error type")
}
Ok(())
}
#[tokio::test]
async fn gcs_test_delete_nonexistent_bucket() -> Result<()> {
let bucket_name = NON_EXISTENT_NAME;
let location_name = NON_EXISTENT_NAME;
let integration =
ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(bucket_name));
let err = integration.delete(location_name).await.unwrap_err();
if let Error(InternalError::UnableToDeleteDataFromGcs2 {
source,
bucket,
location,
}) = err
{
assert!(matches!(source, cloud_storage::Error::Google(_)));
assert_eq!(bucket, bucket_name);
assert_eq!(location, location_name);
} else {
panic!("unexpected error type")
}
Ok(())
}
/// Uploading a small payload to a GCS bucket that does not exist.
///
/// Pins the currently observed behavior: `put` reports success.
// NOTE(review): success against a bucket that does not exist is surprising;
// confirm this is intended rather than an upload error being dropped.
#[tokio::test]
async fn gcs_test_put_nonexistent_bucket() -> Result<()> {
    let gcs = GoogleCloudStorage::new(NON_EXISTENT_NAME);
    let store = ObjectStore::new_google_cloud_storage(gcs);

    let payload = Bytes::from("arbitrary data");
    let payload_len = payload.len();
    let chunk = std::io::Result::Ok(payload);
    let upload = futures::stream::once(async move { chunk });

    let outcome = store.put(NON_EXISTENT_NAME, upload, payload_len).await;
    assert!(outcome.is_ok());

    Ok(())
}
} }
#[cfg(test_aws)] #[cfg(test_aws)]
@ -815,16 +1018,6 @@ mod tests {
use super::*; use super::*;
/// Runs the shared put/get/delete/list scenario against the configured S3
/// bucket, passing the outcome through `check_credentials` before propagating
/// any error.
#[tokio::test]
async fn s3_test() -> Result<()> {
    let (region, bucket) = region_and_bucket_name()?;
    let s3 = AmazonS3::new(region, &bucket);
    let store = ObjectStore::new_amazon_s3(s3);

    let outcome = put_get_delete_list(&store).await;
    check_credentials(outcome)?;

    Ok(())
}
fn region_and_bucket_name() -> Result<(rusoto_core::Region, String)> { fn region_and_bucket_name() -> Result<(rusoto_core::Region, String)> {
dotenv::dotenv().ok(); dotenv::dotenv().ok();
@ -848,6 +1041,225 @@ mod tests {
r r
} }
/// Runs the shared put/get/delete/list scenario against the configured S3
/// bucket, passing the outcome through `check_credentials` before propagating
/// any error.
#[tokio::test]
async fn s3_test() -> Result<()> {
    let (region, bucket) = region_and_bucket_name()?;
    let s3 = AmazonS3::new(region, &bucket);
    let store = ObjectStore::new_amazon_s3(s3);

    let outcome = put_get_delete_list(&store).await;
    check_credentials(outcome)?;

    Ok(())
}
#[tokio::test]
async fn s3_test_get_nonexistent_region() -> Result<()> {
// Assumes environment variables do not provide credentials to AWS US West 1
let (_, bucket_name) = region_and_bucket_name()?;
let region = rusoto_core::Region::UsWest1;
let integration = ObjectStore::new_amazon_s3(AmazonS3::new(region, &bucket_name));
let location_name = NON_EXISTENT_NAME;
let err = get_nonexistent_object(&integration, Some(location_name))
.await
.unwrap_err();
if let Some(Error(InternalError::UnableToListDataFromS3 { source, bucket })) =
err.downcast_ref::<crate::Error>()
{
assert!(matches!(source, rusoto_core::RusotoError::Unknown(_)));
assert_eq!(bucket, &bucket_name);
} else {
panic!("unexpected error type")
}
Ok(())
}
#[tokio::test]
async fn s3_test_get_nonexistent_location() -> Result<()> {
let (region, bucket_name) = region_and_bucket_name()?;
let integration = ObjectStore::new_amazon_s3(AmazonS3::new(region, &bucket_name));
let location_name = NON_EXISTENT_NAME;
let err = get_nonexistent_object(&integration, Some(location_name))
.await
.unwrap_err();
if let Some(Error(InternalError::UnableToGetDataFromS3 {
source,
bucket,
location,
})) = err.downcast_ref::<crate::Error>()
{
assert!(matches!(
source,
rusoto_core::RusotoError::Service(rusoto_s3::GetObjectError::NoSuchKey(_))
));
assert_eq!(bucket, &bucket_name);
assert_eq!(location, location_name);
} else {
panic!("unexpected error type")
}
Ok(())
}
#[tokio::test]
async fn s3_test_get_nonexistent_bucket() -> Result<()> {
let (region, _) = region_and_bucket_name()?;
let bucket_name = NON_EXISTENT_NAME;
let integration = ObjectStore::new_amazon_s3(AmazonS3::new(region, bucket_name));
let location_name = NON_EXISTENT_NAME;
let err = get_nonexistent_object(&integration, Some(location_name))
.await
.unwrap_err();
if let Some(Error(InternalError::UnableToListDataFromS3 { source, bucket })) =
e.downcast_ref::<crate::Error>()
{
assert!(matches!(
source,
rusoto_core::RusotoError::Service(
rusoto_s3::ListObjectsV2Error::NoSuchBucket(_),
)
));
assert_eq!(bucket, &bucket_name);
} else {
panic!("unexpected error type")
}
Ok(())
}
#[tokio::test]
async fn s3_test_put_nonexistent_region() -> Result<()> {
// Assumes environment variables do not provide credentials to AWS US West 1
let (_, bucket_name) = region_and_bucket_name()?;
let region = rusoto_core::Region::UsWest1;
let integration = ObjectStore::new_amazon_s3(AmazonS3::new(region, &bucket_name));
let location_name = NON_EXISTENT_NAME;
let data = Bytes::from("arbitrary data");
let stream_data = std::io::Result::Ok(data.clone());
let err = integration
.put(
location_name,
futures::stream::once(async move { stream_data }),
data.len(),
)
.await
.unwrap_err();
if let Error(InternalError::UnableToPutDataToS3 {
source,
bucket,
location,
}) = err
{
assert!(matches!(source, rusoto_core::RusotoError::Unknown(_)));
assert_eq!(bucket, bucket_name);
assert_eq!(location, location_name);
} else {
panic!("unexpected error type")
}
Ok(())
}
#[tokio::test]
async fn s3_test_put_nonexistent_bucket() -> Result<()> {
let (region, _) = region_and_bucket_name()?;
let bucket_name = NON_EXISTENT_NAME;
let integration = ObjectStore::new_amazon_s3(AmazonS3::new(region, bucket_name));
let location_name = NON_EXISTENT_NAME;
let data = Bytes::from("arbitrary data");
let stream_data = std::io::Result::Ok(data.clone());
let err = integration
.put(
location_name,
futures::stream::once(async move { stream_data }),
data.len(),
)
.await
.unwrap_err();
if let Error(InternalError::UnableToPutDataToS3 {
source,
bucket,
location,
}) = err
{
assert!(matches!(source, rusoto_core::RusotoError::Unknown(_)));
assert_eq!(bucket, bucket_name);
assert_eq!(location, location_name);
} else {
panic!("unexpected error type")
}
Ok(())
}
/// Deleting a missing key from an existing S3 bucket is expected to report
/// success.
#[tokio::test]
async fn s3_test_delete_nonexistent_location() -> Result<()> {
    let (region, bucket) = region_and_bucket_name()?;
    let s3 = AmazonS3::new(region, &bucket);
    let store = ObjectStore::new_amazon_s3(s3);

    let outcome = store.delete(NON_EXISTENT_NAME).await;
    assert!(outcome.is_ok());

    Ok(())
}
#[tokio::test]
async fn s3_test_delete_nonexistent_region() -> Result<()> {
// Assumes environment variables do not provide credentials to AWS US West 1
let (_, bucket_name) = region_and_bucket_name()?;
let region = rusoto_core::Region::UsWest1;
let integration = ObjectStore::new_amazon_s3(AmazonS3::new(region, &bucket_name));
let location_name = NON_EXISTENT_NAME;
let err = integration.delete(location_name).await.unwrap_err();
if let Error(InternalError::UnableToDeleteDataFromS3 {
source,
bucket,
location,
}) = err
{
assert!(matches!(source, rusoto_core::RusotoError::Unknown(_)));
assert_eq!(bucket, bucket_name);
assert_eq!(location, location_name);
} else {
panic!("unexpected error type")
}
Ok(())
}
#[tokio::test]
async fn s3_test_delete_nonexistent_bucket() -> Result<()> {
let (region, _) = region_and_bucket_name()?;
let bucket_name = NON_EXISTENT_NAME;
let integration = ObjectStore::new_amazon_s3(AmazonS3::new(region, bucket_name));
let location_name = NON_EXISTENT_NAME;
let err = integration.delete(location_name).await.unwrap_err();
if let Error(InternalError::UnableToDeleteDataFromS3 {
source,
bucket,
location,
}) = err
{
assert!(matches!(source, rusoto_core::RusotoError::Unknown(_)));
assert_eq!(bucket, bucket_name);
assert_eq!(location, location_name);
} else {
panic!("unexpected error type")
}
Ok(())
}
} }
mod in_memory { mod in_memory {