diff --git a/docs/env.example b/docs/env.example index 87be812bdd..1d476f8ffa 100644 --- a/docs/env.example +++ b/docs/env.example @@ -13,7 +13,7 @@ # AWS_ACCESS_KEY_ID=access_key_value # AWS_SECRET_ACCESS_KEY=secret_access_key_value # AWS_DEFAULT_REGION=us-east-2 -# AWS_S3_BUCKET_NAME=bucket_name +# AWS_S3_BUCKET_NAME=bucket-name # # If using Google Cloud Storage as an object store: # GCS_BUCKET_NAME=bucket_name diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 9997529ac8..199ec8e525 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -153,19 +153,22 @@ impl GoogleCloudStorage { } ); - let location = location.to_string(); + let location_copy = location.to_string(); let bucket_name = self.bucket_name.clone(); let _ = tokio::task::spawn_blocking(move || { cloud_storage::Object::create( &bucket_name, &temporary_non_streaming, - &location, + &location_copy, "application/octet-stream", ) }) .await - .context(UnableToPutDataToGcs)?; + .context(UnableToPutDataToGcs { + bucket: &self.bucket_name, + location, + })?; Ok(()) } @@ -174,28 +177,42 @@ impl GoogleCloudStorage { &self, location: &str, ) -> InternalResult>> { - let location = location.to_string(); + let location_copy = location.to_string(); let bucket_name = self.bucket_name.clone(); let bytes = tokio::task::spawn_blocking(move || { - cloud_storage::Object::download(&bucket_name, &location) + cloud_storage::Object::download(&bucket_name, &location_copy) }) .await - .context(UnableToGetDataFromGcs)? - .context(UnableToGetDataFromGcs2)?; + .context(UnableToGetDataFromGcs { + bucket: &self.bucket_name, + location, + })? + .context(UnableToGetDataFromGcs2 { + bucket: &self.bucket_name, + location, + })?; Ok(futures::stream::once(async move { Ok(bytes.into()) })) } /// Delete the object at the specified location. async fn delete(&self, location: &str) -> InternalResult<()> { - let location = location.to_string(); + let location_copy = location.to_string(); let bucket_name = self.bucket_name.clone(); - tokio::task::spawn_blocking(move || cloud_storage::Object::delete(&bucket_name, &location)) - .await - .context(UnableToDeleteDataFromGcs)? - .context(UnableToDeleteDataFromGcs2)?; + tokio::task::spawn_blocking(move || { + cloud_storage::Object::delete(&bucket_name, &location_copy) + }) + .await + .context(UnableToDeleteDataFromGcs { + bucket: &self.bucket_name, + location, + })? + .context(UnableToDeleteDataFromGcs2 { + bucket: &self.bucket_name, + location, + })?; Ok(()) } @@ -213,8 +230,12 @@ impl GoogleCloudStorage { None => cloud_storage::Object::list(&bucket_name), }) .await - .context(UnableToListDataFromGcs)? - .context(UnableToListDataFromGcs2)?; + .context(UnableToListDataFromGcs { + bucket: &self.bucket_name, + })? + .context(UnableToListDataFromGcs2 { + bucket: &self.bucket_name, + })?; Ok(futures::stream::once(async move { Ok(objects.into_iter().map(|o| o.name).collect()) @@ -272,7 +293,13 @@ impl AmazonS3 { ..Default::default() }; - self.client.put_object(put_request).await?; + self.client + .put_object(put_request) + .await + .context(UnableToPutDataToS3 { + bucket: &self.bucket_name, + location: location.to_string(), + })?; Ok(()) } @@ -289,10 +316,20 @@ impl AmazonS3 { Ok(self .client .get_object(get_request) - .await? + .await + .context(UnableToGetDataFromS3 { + bucket: self.bucket_name.to_owned(), + location: location.to_owned(), + })? .body - .context(NoDataFromS3)? - .context(UnableToGetPieceOfDataFromS3) + .context(NoDataFromS3 { + bucket: self.bucket_name.to_owned(), + location: location.to_string(), + })? + .context(UnableToGetPieceOfDataFromS3 { + bucket: self.bucket_name.to_owned(), + location: location.to_string(), + }) .err_into()) } @@ -304,7 +341,13 @@ impl AmazonS3 { ..Default::default() }; - self.client.delete_object(delete_request).await?; + self.client + .delete_object(delete_request) + .await + .context(UnableToDeleteDataFromS3 { + bucket: self.bucket_name.to_owned(), + location: location.to_owned(), + })?; Ok(()) } @@ -342,7 +385,15 @@ impl AmazonS3 { let resp = match self.client.list_objects_v2(list_request).await { Ok(resp) => resp, - Err(e) => return Some((Err(e.into()), state)), + Err(e) => { + return Some(( + Err(InternalError::UnableToListDataFromS3 { + source: e, + bucket: self.bucket_name.clone(), + }), + state, + )) + } }; let contents = resp.contents.unwrap_or_default(); @@ -589,15 +640,22 @@ impl Error { match self.0 { UnableToPutDataToS3 { source: RusotoError::Credentials(_), + bucket: _, + location: _, } => true, UnableToGetDataFromS3 { source: RusotoError::Credentials(_), + bucket: _, + location: _, } => true, UnableToDeleteDataFromS3 { source: RusotoError::Credentials(_), + bucket: _, + location: _, } => true, UnableToListDataFromS3 { source: RusotoError::Credentials(_), + bucket: _, } => true, _ => false, } @@ -613,49 +671,70 @@ enum InternalError { UnableToPutDataToGcs { source: tokio::task::JoinError, + bucket: String, + location: String, }, UnableToListDataFromGcs { source: tokio::task::JoinError, + bucket: String, }, UnableToListDataFromGcs2 { source: cloud_storage::Error, + bucket: String, }, UnableToDeleteDataFromGcs { source: tokio::task::JoinError, + bucket: String, + location: String, }, UnableToDeleteDataFromGcs2 { source: cloud_storage::Error, + bucket: String, + location: String, }, UnableToGetDataFromGcs { source: tokio::task::JoinError, + bucket: String, + location: String, }, UnableToGetDataFromGcs2 { source: cloud_storage::Error, + bucket: String, + location: String, }, - #[snafu(context(false))] UnableToPutDataToS3 { source: rusoto_core::RusotoError, + bucket: String, + location: String, }, - #[snafu(context(false))] UnableToGetDataFromS3 { source: rusoto_core::RusotoError, + bucket: String, + location: String, }, - #[snafu(context(false))] UnableToDeleteDataFromS3 { source: rusoto_core::RusotoError, + bucket: String, + location: String, + }, + NoDataFromS3 { + bucket: String, + location: String, }, - NoDataFromS3, UnableToReadBytesFromS3 { source: std::io::Error, + bucket: String, + location: String, }, UnableToGetPieceOfDataFromS3 { source: std::io::Error, + bucket: String, + location: String, }, - - #[snafu(context(false))] UnableToListDataFromS3 { source: rusoto_core::RusotoError, + bucket: String, }, UnableToPutDataInMemory { @@ -712,6 +791,9 @@ mod tests { type Error = Box; type Result = std::result::Result; + #[cfg(any(test_aws, test_gcs))] + const NON_EXISTENT_NAME: &str = "nonexistentname"; + macro_rules! assert_error { ($res:expr, $error_pat:pat$(,)?) => { assert!( @@ -779,8 +861,27 @@ mod tests { Ok(()) } + #[cfg(any(test_aws, test_gcs))] + async fn get_nonexistent_object( + storage: &ObjectStore, + location: Option<&str>, + ) -> Result { + let location = location.unwrap_or("this_file_should_not_exist"); + + let content_list = flatten_list_stream(storage, Some(location)).await?; + assert!(content_list.is_empty()); + + Ok(storage + .get(location) + .await? + .map_ok(|b| bytes::BytesMut::from(&b[..])) + .try_concat() + .await? + .freeze()) + } + // Tests TODO: - // GET nonexisting location + // GET nonexisting location (in_memory/file) // DELETE nonexisting location // PUT overwriting @@ -807,6 +908,108 @@ mod tests { put_get_delete_list(&integration).await?; Ok(()) } + + #[tokio::test] + async fn gcs_test_get_nonexistent_location() -> Result<()> { + let bucket_name = bucket_name()?; + let location_name = NON_EXISTENT_NAME; + let integration = + ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(&bucket_name)); + + let result = get_nonexistent_object(&integration, Some(location_name)).await?; + + assert_eq!( + result, + Bytes::from(format!("No such object: {}/{}", bucket_name, location_name)) + ); + + Ok(()) + } + + #[tokio::test] + async fn gcs_test_get_nonexistent_bucket() -> Result<()> { + let bucket_name = NON_EXISTENT_NAME; + let location_name = NON_EXISTENT_NAME; + let integration = + ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(bucket_name)); + + let result = get_nonexistent_object(&integration, Some(location_name)).await?; + + assert_eq!(result, Bytes::from("Not Found")); + + Ok(()) + } + + #[tokio::test] + async fn gcs_test_delete_nonexistent_location() -> Result<()> { + let bucket_name = bucket_name()?; + let location_name = NON_EXISTENT_NAME; + let integration = + ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(&bucket_name)); + + let err = integration.delete(location_name).await.unwrap_err(); + + if let Error(InternalError::UnableToDeleteDataFromGcs2 { + source, + bucket, + location, + }) = err + { + assert!(matches!(source, cloud_storage::Error::Google(_))); + assert_eq!(bucket, bucket_name); + assert_eq!(location, location_name); + } else { + panic!("unexpected error type") + } + + Ok(()) + } + + #[tokio::test] + async fn gcs_test_delete_nonexistent_bucket() -> Result<()> { + let bucket_name = NON_EXISTENT_NAME; + let location_name = NON_EXISTENT_NAME; + let integration = + ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(bucket_name)); + + let err = integration.delete(location_name).await.unwrap_err(); + + if let Error(InternalError::UnableToDeleteDataFromGcs2 { + source, + bucket, + location, + }) = err + { + assert!(matches!(source, cloud_storage::Error::Google(_))); + assert_eq!(bucket, bucket_name); + assert_eq!(location, location_name); + } else { + panic!("unexpected error type") + } + + Ok(()) + } + + #[tokio::test] + async fn gcs_test_put_nonexistent_bucket() -> Result<()> { + let bucket_name = NON_EXISTENT_NAME; + let location_name = NON_EXISTENT_NAME; + let integration = + ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(bucket_name)); + let data = Bytes::from("arbitrary data"); + let stream_data = std::io::Result::Ok(data.clone()); + + let result = integration + .put( + location_name, + futures::stream::once(async move { stream_data }), + data.len(), + ) + .await; + assert!(result.is_ok()); + + Ok(()) + } } #[cfg(test_aws)] @@ -815,16 +1018,6 @@ mod tests { use super::*; - #[tokio::test] - async fn s3_test() -> Result<()> { - let (region, bucket_name) = region_and_bucket_name()?; - - let integration = ObjectStore::new_amazon_s3(AmazonS3::new(region, &bucket_name)); - check_credentials(put_get_delete_list(&integration).await)?; - - Ok(()) - } - fn region_and_bucket_name() -> Result<(rusoto_core::Region, String)> { dotenv::dotenv().ok(); @@ -848,6 +1041,225 @@ mod tests { r } + + #[tokio::test] + async fn s3_test() -> Result<()> { + let (region, bucket_name) = region_and_bucket_name()?; + + let integration = ObjectStore::new_amazon_s3(AmazonS3::new(region, &bucket_name)); + check_credentials(put_get_delete_list(&integration).await)?; + + Ok(()) + } + + #[tokio::test] + async fn s3_test_get_nonexistent_region() -> Result<()> { + // Assumes environment variables do not provide credentials to AWS US West 1 + let (_, bucket_name) = region_and_bucket_name()?; + let region = rusoto_core::Region::UsWest1; + let integration = ObjectStore::new_amazon_s3(AmazonS3::new(region, &bucket_name)); + let location_name = NON_EXISTENT_NAME; + + let err = get_nonexistent_object(&integration, Some(location_name)) + .await + .unwrap_err(); + if let Some(Error(InternalError::UnableToListDataFromS3 { source, bucket })) = + err.downcast_ref::() + { + assert!(matches!(source, rusoto_core::RusotoError::Unknown(_))); + assert_eq!(bucket, &bucket_name); + } else { + panic!("unexpected error type") + } + + Ok(()) + } + + #[tokio::test] + async fn s3_test_get_nonexistent_location() -> Result<()> { + let (region, bucket_name) = region_and_bucket_name()?; + let integration = ObjectStore::new_amazon_s3(AmazonS3::new(region, &bucket_name)); + let location_name = NON_EXISTENT_NAME; + + let err = get_nonexistent_object(&integration, Some(location_name)) + .await + .unwrap_err(); + if let Some(Error(InternalError::UnableToGetDataFromS3 { + source, + bucket, + location, + })) = err.downcast_ref::() + { + assert!(matches!( + source, + rusoto_core::RusotoError::Service(rusoto_s3::GetObjectError::NoSuchKey(_)) + )); + assert_eq!(bucket, &bucket_name); + assert_eq!(location, location_name); + } else { + panic!("unexpected error type") + } + + Ok(()) + } + + #[tokio::test] + async fn s3_test_get_nonexistent_bucket() -> Result<()> { + let (region, _) = region_and_bucket_name()?; + let bucket_name = NON_EXISTENT_NAME; + let integration = ObjectStore::new_amazon_s3(AmazonS3::new(region, bucket_name)); + let location_name = NON_EXISTENT_NAME; + + let err = get_nonexistent_object(&integration, Some(location_name)) + .await + .unwrap_err(); + if let Some(Error(InternalError::UnableToListDataFromS3 { source, bucket })) = + e.downcast_ref::() + { + assert!(matches!( + source, + rusoto_core::RusotoError::Service( + rusoto_s3::ListObjectsV2Error::NoSuchBucket(_), + ) + )); + assert_eq!(bucket, &bucket_name); + } else { + panic!("unexpected error type") + } + + Ok(()) + } + + #[tokio::test] + async fn s3_test_put_nonexistent_region() -> Result<()> { + // Assumes environment variables do not provide credentials to AWS US West 1 + let (_, bucket_name) = region_and_bucket_name()?; + let region = rusoto_core::Region::UsWest1; + let integration = ObjectStore::new_amazon_s3(AmazonS3::new(region, &bucket_name)); + let location_name = NON_EXISTENT_NAME; + let data = Bytes::from("arbitrary data"); + let stream_data = std::io::Result::Ok(data.clone()); + + let err = integration + .put( + location_name, + futures::stream::once(async move { stream_data }), + data.len(), + ) + .await + .unwrap_err(); + + if let Error(InternalError::UnableToPutDataToS3 { + source, + bucket, + location, + }) = err + { + assert!(matches!(source, rusoto_core::RusotoError::Unknown(_))); + assert_eq!(bucket, bucket_name); + assert_eq!(location, location_name); + } else { + panic!("unexpected error type") + } + + Ok(()) + } + + #[tokio::test] + async fn s3_test_put_nonexistent_bucket() -> Result<()> { + let (region, _) = region_and_bucket_name()?; + let bucket_name = NON_EXISTENT_NAME; + let integration = ObjectStore::new_amazon_s3(AmazonS3::new(region, bucket_name)); + let location_name = NON_EXISTENT_NAME; + let data = Bytes::from("arbitrary data"); + let stream_data = std::io::Result::Ok(data.clone()); + + let err = integration + .put( + location_name, + futures::stream::once(async move { stream_data }), + data.len(), + ) + .await + .unwrap_err(); + + if let Error(InternalError::UnableToPutDataToS3 { + source, + bucket, + location, + }) = err + { + assert!(matches!(source, rusoto_core::RusotoError::Unknown(_))); + assert_eq!(bucket, bucket_name); + assert_eq!(location, location_name); + } else { + panic!("unexpected error type") + } + + Ok(()) + } + + #[tokio::test] + async fn s3_test_delete_nonexistent_location() -> Result<()> { + let (region, bucket_name) = region_and_bucket_name()?; + let integration = ObjectStore::new_amazon_s3(AmazonS3::new(region, &bucket_name)); + let location_name = NON_EXISTENT_NAME; + + let result = integration.delete(location_name).await; + + assert!(result.is_ok()); + + Ok(()) + } + + #[tokio::test] + async fn s3_test_delete_nonexistent_region() -> Result<()> { + // Assumes environment variables do not provide credentials to AWS US West 1 + let (_, bucket_name) = region_and_bucket_name()?; + let region = rusoto_core::Region::UsWest1; + let integration = ObjectStore::new_amazon_s3(AmazonS3::new(region, &bucket_name)); + let location_name = NON_EXISTENT_NAME; + + let err = integration.delete(location_name).await.unwrap_err(); + if let Error(InternalError::UnableToDeleteDataFromS3 { + source, + bucket, + location, + }) = err + { + assert!(matches!(source, rusoto_core::RusotoError::Unknown(_))); + assert_eq!(bucket, bucket_name); + assert_eq!(location, location_name); + } else { + panic!("unexpected error type") + } + + Ok(()) + } + + #[tokio::test] + async fn s3_test_delete_nonexistent_bucket() -> Result<()> { + let (region, _) = region_and_bucket_name()?; + let bucket_name = NON_EXISTENT_NAME; + let integration = ObjectStore::new_amazon_s3(AmazonS3::new(region, bucket_name)); + let location_name = NON_EXISTENT_NAME; + + let err = integration.delete(location_name).await.unwrap_err(); + if let Error(InternalError::UnableToDeleteDataFromS3 { + source, + bucket, + location, + }) = err + { + assert!(matches!(source, rusoto_core::RusotoError::Unknown(_))); + assert_eq!(bucket, bucket_name); + assert_eq!(location, location_name); + } else { + panic!("unexpected error type") + } + + Ok(()) + } } mod in_memory {