diff --git a/object_store/src/aws.rs b/object_store/src/aws.rs
index 191cb6bfd6..040acdf49d 100644
--- a/object_store/src/aws.rs
+++ b/object_store/src/aws.rs
@@ -412,7 +412,7 @@ mod tests {
     use super::*;
     use crate::{
         tests::{get_nonexistent_object, list_with_delimiter, put_get_delete_list},
-        AmazonS3, ObjectStoreApi, ObjectStorePath,
+        AmazonS3, Error as ObjectStoreError, ObjectStore, ObjectStoreApi, ObjectStorePath,
     };
     use bytes::Bytes;
     use std::env;
@@ -458,7 +458,7 @@ mod tests {
                     "TEST_INTEGRATION is set, \
                     but variable(s) {} need to be set",
                     unset_var_names
-                )
+                );
             } else if force.is_err() && !unset_var_names.is_empty() {
                 eprintln!(
                     "skipping AWS integration test - set \
@@ -500,13 +500,15 @@ mod tests {
     #[tokio::test]
     async fn s3_test() -> Result<()> {
         let config = maybe_skip_integration!();
-        let integration = AmazonS3::new(
-            config.access_key_id,
-            config.secret_access_key,
-            config.region,
-            config.bucket,
-        )
-        .expect("Valid S3 config");
+        let integration = ObjectStore::new_amazon_s3(
+            AmazonS3::new(
+                config.access_key_id,
+                config.secret_access_key,
+                config.region,
+                config.bucket,
+            )
+            .expect("Valid S3 config"),
+        );

         check_credentials(put_get_delete_list(&integration).await)?;
         check_credentials(list_with_delimiter(&integration).await).unwrap();
@@ -520,13 +522,15 @@ mod tests {
         // Assumes environment variables do not provide credentials to AWS US West 1
         config.region = "us-west-1".into();

-        let integration = AmazonS3::new(
-            config.access_key_id,
-            config.secret_access_key,
-            config.region,
-            &config.bucket,
-        )
-        .expect("Valid S3 config");
+        let integration = ObjectStore::new_amazon_s3(
+            AmazonS3::new(
+                config.access_key_id,
+                config.secret_access_key,
+                config.region,
+                &config.bucket,
+            )
+            .expect("Valid S3 config"),
+        );

         let mut location = integration.new_path();
         location.set_file_name(NON_EXISTENT_NAME);
@@ -534,11 +538,14 @@ mod tests {
         let err = get_nonexistent_object(&integration, Some(location))
             .await
             .unwrap_err();
-        if let Some(Error::UnableToListData { source, bucket }) = err.downcast_ref::<Error>() {
+        if let Some(ObjectStoreError::AwsObjectStoreError {
+            source: Error::UnableToListData { source, bucket },
+        }) = err.downcast_ref::<ObjectStoreError>()
+        {
             assert!(matches!(source, rusoto_core::RusotoError::Unknown(_)));
             assert_eq!(bucket, &config.bucket);
         } else {
-            panic!("unexpected error type")
+            panic!("unexpected error type: {:?}", err);
         }

         Ok(())
@@ -547,13 +554,15 @@ mod tests {
     #[tokio::test]
     async fn s3_test_get_nonexistent_location() -> Result<()> {
         let config = maybe_skip_integration!();
-        let integration = AmazonS3::new(
-            config.access_key_id,
-            config.secret_access_key,
-            config.region,
-            &config.bucket,
-        )
-        .expect("Valid S3 config");
+        let integration = ObjectStore::new_amazon_s3(
+            AmazonS3::new(
+                config.access_key_id,
+                config.secret_access_key,
+                config.region,
+                &config.bucket,
+            )
+            .expect("Valid S3 config"),
+        );

         let mut location = integration.new_path();
         location.set_file_name(NON_EXISTENT_NAME);
@@ -561,11 +570,14 @@ mod tests {
         let err = get_nonexistent_object(&integration, Some(location))
             .await
             .unwrap_err();
-        if let Some(Error::UnableToGetData {
-            source,
-            bucket,
-            location,
-        }) = err.downcast_ref::<Error>()
+        if let Some(ObjectStoreError::AwsObjectStoreError {
+            source:
+                Error::UnableToGetData {
+                    source,
+                    bucket,
+                    location,
+                },
+        }) = err.downcast_ref::<ObjectStoreError>()
         {
             assert!(matches!(
                 source,
@@ -574,7 +586,7 @@ mod tests {
             assert_eq!(bucket, &config.bucket);
             assert_eq!(location, NON_EXISTENT_NAME);
         } else {
-            panic!("unexpected error type")
+            panic!("unexpected error type: {:?}", err);
         }

         Ok(())
@@ -585,13 +597,15 @@ mod tests {
         let mut config = maybe_skip_integration!();
         config.bucket = NON_EXISTENT_NAME.into();

-        let integration = AmazonS3::new(
-            config.access_key_id,
-            config.secret_access_key,
-            config.region,
-            &config.bucket,
-        )
-        .expect("Valid S3 config");
+        let integration = ObjectStore::new_amazon_s3(
+            AmazonS3::new(
+                config.access_key_id,
+                config.secret_access_key,
+                config.region,
+                &config.bucket,
+            )
+            .expect("Valid S3 config"),
+        );

         let mut location = integration.new_path();
         location.set_file_name(NON_EXISTENT_NAME);
@@ -599,14 +613,17 @@ mod tests {
         let err = get_nonexistent_object(&integration, Some(location))
             .await
             .unwrap_err();
-        if let Some(Error::UnableToListData { source, bucket }) = err.downcast_ref::<Error>() {
+        if let Some(ObjectStoreError::AwsObjectStoreError {
+            source: Error::UnableToListData { source, bucket },
+        }) = err.downcast_ref::<ObjectStoreError>()
+        {
             assert!(matches!(
                 source,
                 rusoto_core::RusotoError::Service(rusoto_s3::ListObjectsV2Error::NoSuchBucket(_))
             ));
             assert_eq!(bucket, &config.bucket);
         } else {
-            panic!("unexpected error type")
+            panic!("unexpected error type: {:?}", err);
         }

         Ok(())
@@ -618,13 +635,15 @@ mod tests {
         // Assumes environment variables do not provide credentials to AWS US West 1
        config.region = "us-west-1".into();

-        let integration = AmazonS3::new(
-            config.access_key_id,
-            config.secret_access_key,
-            config.region,
-            &config.bucket,
-        )
-        .expect("Valid S3 config");
+        let integration = ObjectStore::new_amazon_s3(
+            AmazonS3::new(
+                config.access_key_id,
+                config.secret_access_key,
+                config.region,
+                &config.bucket,
+            )
+            .expect("Valid S3 config"),
+        );

         let mut location = integration.new_path();
         location.set_file_name(NON_EXISTENT_NAME);
@@ -640,17 +659,20 @@ mod tests {
             .await
             .unwrap_err();

-        if let Error::UnableToPutData {
-            source,
-            bucket,
-            location,
+        if let ObjectStoreError::AwsObjectStoreError {
+            source:
+                Error::UnableToPutData {
+                    source,
+                    bucket,
+                    location,
+                },
         } = err
         {
             assert!(matches!(source, rusoto_core::RusotoError::Unknown(_)));
             assert_eq!(bucket, config.bucket);
             assert_eq!(location, NON_EXISTENT_NAME);
         } else {
-            panic!("unexpected error type")
+            panic!("unexpected error type: {:?}", err);
         }

         Ok(())
@@ -661,13 +683,15 @@ mod tests {
         let mut config = maybe_skip_integration!();
         config.bucket = NON_EXISTENT_NAME.into();

-        let integration = AmazonS3::new(
-            config.access_key_id,
-            config.secret_access_key,
-            config.region,
-            &config.bucket,
-        )
-        .expect("Valid S3 config");
+        let integration = ObjectStore::new_amazon_s3(
+            AmazonS3::new(
+                config.access_key_id,
+                config.secret_access_key,
+                config.region,
+                &config.bucket,
+            )
+            .expect("Valid S3 config"),
+        );

         let mut location = integration.new_path();
         location.set_file_name(NON_EXISTENT_NAME);
@@ -683,17 +707,20 @@ mod tests {
             .await
             .unwrap_err();

-        if let Error::UnableToPutData {
-            source,
-            bucket,
-            location,
+        if let ObjectStoreError::AwsObjectStoreError {
+            source:
+                Error::UnableToPutData {
+                    source,
+                    bucket,
+                    location,
+                },
         } = err
         {
             assert!(matches!(source, rusoto_core::RusotoError::Unknown(_)));
             assert_eq!(bucket, config.bucket);
             assert_eq!(location, NON_EXISTENT_NAME);
         } else {
-            panic!("unexpected error type")
+            panic!("unexpected error type: {:?}", err);
         }

         Ok(())
@@ -702,13 +729,15 @@ mod tests {
     #[tokio::test]
     async fn s3_test_delete_nonexistent_location() -> Result<()> {
         let config = maybe_skip_integration!();
-        let integration = AmazonS3::new(
-            config.access_key_id,
-            config.secret_access_key,
-            config.region,
-            config.bucket,
-        )
-        .expect("Valid S3 config");
+        let integration = ObjectStore::new_amazon_s3(
+            AmazonS3::new(
+                config.access_key_id,
+                config.secret_access_key,
+                config.region,
+                config.bucket,
+            )
+            .expect("Valid S3 config"),
+        );

         let mut location = integration.new_path();
         location.set_file_name(NON_EXISTENT_NAME);
@@ -726,29 +755,34 @@ mod tests {
         // Assumes environment variables do not provide credentials to AWS US West 1
         config.region = "us-west-1".into();

-        let integration = AmazonS3::new(
-            config.access_key_id,
-            config.secret_access_key,
-            config.region,
-            &config.bucket,
-        )
-        .expect("Valid S3 config");
+        let integration = ObjectStore::new_amazon_s3(
+            AmazonS3::new(
+                config.access_key_id,
+                config.secret_access_key,
+                config.region,
+                &config.bucket,
+            )
+            .expect("Valid S3 config"),
+        );

         let mut location = integration.new_path();
         location.set_file_name(NON_EXISTENT_NAME);

         let err = integration.delete(&location).await.unwrap_err();

-        if let Error::UnableToDeleteData {
-            source,
-            bucket,
-            location,
+        if let ObjectStoreError::AwsObjectStoreError {
+            source:
+                Error::UnableToDeleteData {
+                    source,
+                    bucket,
+                    location,
+                },
         } = err
         {
             assert!(matches!(source, rusoto_core::RusotoError::Unknown(_)));
             assert_eq!(bucket, config.bucket);
             assert_eq!(location, NON_EXISTENT_NAME);
         } else {
-            panic!("unexpected error type")
+            panic!("unexpected error type: {:?}", err);
         }

         Ok(())
@@ -759,29 +793,34 @@ mod tests {
         let mut config = maybe_skip_integration!();
         config.bucket = NON_EXISTENT_NAME.into();

-        let integration = AmazonS3::new(
-            config.access_key_id,
-            config.secret_access_key,
-            config.region,
-            &config.bucket,
-        )
-        .expect("Valid S3 config");
+        let integration = ObjectStore::new_amazon_s3(
+            AmazonS3::new(
+                config.access_key_id,
+                config.secret_access_key,
+                config.region,
+                &config.bucket,
+            )
+            .expect("Valid S3 config"),
+        );

         let mut location = integration.new_path();
         location.set_file_name(NON_EXISTENT_NAME);

         let err = integration.delete(&location).await.unwrap_err();

-        if let Error::UnableToDeleteData {
-            source,
-            bucket,
-            location,
+        if let ObjectStoreError::AwsObjectStoreError {
+            source:
+                Error::UnableToDeleteData {
+                    source,
+                    bucket,
+                    location,
+                },
         } = err
         {
             assert!(matches!(source, rusoto_core::RusotoError::Unknown(_)));
             assert_eq!(bucket, config.bucket);
             assert_eq!(location, NON_EXISTENT_NAME);
         } else {
-            panic!("unexpected error type")
+            panic!("unexpected error type: {:?}", err);
         }

         Ok(())
diff --git a/object_store/src/azure.rs b/object_store/src/azure.rs
index 6f821381e8..32fe8e9966 100644
--- a/object_store/src/azure.rs
+++ b/object_store/src/azure.rs
@@ -277,6 +277,7 @@ impl MicrosoftAzure {
 mod tests {
     use super::*;
     use crate::tests::{list_with_delimiter, put_get_delete_list};
+    use crate::ObjectStore;
     use std::env;

     type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
@@ -340,8 +341,11 @@ mod tests {
     #[tokio::test]
     async fn azure_blob_test() -> Result<()> {
         let config = maybe_skip_integration!();
-        let integration =
-            MicrosoftAzure::new(config.storage_account, config.access_key, config.bucket);
+        let integration = ObjectStore::new_microsoft_azure(MicrosoftAzure::new(
+            config.storage_account,
+            config.access_key,
+            config.bucket,
+        ));

         put_get_delete_list(&integration).await?;
         list_with_delimiter(&integration).await?;
diff --git a/object_store/src/disk.rs b/object_store/src/disk.rs
index be80a37daa..5384cb567a 100644
--- a/object_store/src/disk.rs
+++ b/object_store/src/disk.rs
@@ -268,7 +268,7 @@ mod tests {
     use crate::{
         tests::{list_with_delimiter, put_get_delete_list},
-        ObjectStoreApi, ObjectStorePath,
+        Error as ObjectStoreError, ObjectStore, ObjectStoreApi, ObjectStorePath,
     };
     use futures::stream;
     use tempfile::TempDir;

@@ -276,7 +276,7 @@ mod tests {
     #[tokio::test]
     async fn file_test() -> Result<()> {
         let root = TempDir::new()?;
-        let integration = File::new(root.path());
+        let integration = ObjectStore::new_file(File::new(root.path()));

         put_get_delete_list(&integration).await?;
         list_with_delimiter(&integration).await?;
@@ -287,7 +287,7 @@ mod tests {
     #[tokio::test]
     async fn length_mismatch_is_an_error() -> Result<()> {
         let root = TempDir::new()?;
-        let integration = File::new(root.path());
+        let integration = ObjectStore::new_file(File::new(root.path()));

         let bytes = stream::once(async { Ok(Bytes::from("hello world")) });
         let mut location = integration.new_path();
@@ -296,9 +296,11 @@ mod tests {

         assert!(matches!(
             res.err().unwrap(),
-            Error::DataDoesNotMatchLength {
-                expected: 0,
-                actual: 11,
+            ObjectStoreError::FileObjectStoreError {
+                source: Error::DataDoesNotMatchLength {
+                    expected: 0,
+                    actual: 11,
+                }
             }
         ));

@@ -308,7 +310,7 @@ mod tests {
     #[tokio::test]
     async fn creates_dir_if_not_present() -> Result<()> {
         let root = TempDir::new()?;
-        let integration = File::new(root.path());
+        let integration = ObjectStore::new_file(File::new(root.path()));

         let data = Bytes::from("arbitrary data");
         let mut location = integration.new_path();
@@ -337,7 +339,7 @@ mod tests {
     #[tokio::test]
     async fn unknown_length() -> Result<()> {
         let root = TempDir::new()?;
-        let integration = File::new(root.path());
+        let integration = ObjectStore::new_file(File::new(root.path()));

         let data = Bytes::from("arbitrary data");
         let stream_data = std::io::Result::Ok(data.clone());
diff --git a/object_store/src/gcp.rs b/object_store/src/gcp.rs
index 91dcd3abde..eaaf833b27 100644
--- a/object_store/src/gcp.rs
+++ b/object_store/src/gcp.rs
@@ -258,7 +258,8 @@ mod test {
     use super::*;
     use crate::{
         tests::{get_nonexistent_object, list_with_delimiter, put_get_delete_list},
-        GoogleCloudStorage, ObjectStoreApi, ObjectStorePath,
+        Error as ObjectStoreError, GoogleCloudStorage, ObjectStore, ObjectStoreApi,
+        ObjectStorePath,
     };
     use bytes::Bytes;
     use std::env;
@@ -319,7 +320,10 @@ mod test {
     #[tokio::test]
     async fn gcs_test() -> Result<()> {
         let config = maybe_skip_integration!();
-        let integration = GoogleCloudStorage::new(config.service_account, config.bucket);
+        let integration = ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(
+            config.service_account,
+            config.bucket,
+        ));

         put_get_delete_list(&integration).await?;
         list_with_delimiter(&integration).await?;
@@ -329,7 +333,10 @@ mod test {
     #[tokio::test]
     async fn gcs_test_get_nonexistent_location() -> Result<()> {
         let config = maybe_skip_integration!();
-        let integration = GoogleCloudStorage::new(config.service_account, &config.bucket);
+        let integration = ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(
+            config.service_account,
+            &config.bucket,
+        ));

         let mut location = integration.new_path();
         location.set_file_name(NON_EXISTENT_NAME);
@@ -338,17 +345,20 @@ mod test {
             .await
             .unwrap_err();

-        if let Some(Error::UnableToGetData {
-            source,
-            bucket,
-            location,
-        }) = err.downcast_ref::<Error>()
+        if let Some(ObjectStoreError::GcsObjectStoreError {
+            source:
+                Error::UnableToGetData {
+                    source,
+                    bucket,
+                    location,
+                },
+        }) = err.downcast_ref::<ObjectStoreError>()
         {
             assert!(matches!(source, cloud_storage::Error::Reqwest(_)));
             assert_eq!(bucket, &config.bucket);
             assert_eq!(location, NON_EXISTENT_NAME);
         } else {
-            panic!("unexpected error type")
+            panic!("unexpected error type: {:?}", err)
         }

         Ok(())
@@ -358,7 +368,10 @@ mod test {
     async fn gcs_test_get_nonexistent_bucket() -> Result<()> {
         let mut config = maybe_skip_integration!();
         config.bucket = NON_EXISTENT_NAME.into();
-        let integration = GoogleCloudStorage::new(config.service_account, &config.bucket);
+        let integration = ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(
+            config.service_account,
+            &config.bucket,
+        ));

         let mut location = integration.new_path();
         location.set_file_name(NON_EXISTENT_NAME);
@@ -367,12 +380,14 @@ mod test {
             .await
             .unwrap_err();

-        if let Some(Error::UnableToStreamListData { source, bucket }) = err.downcast_ref::<Error>()
+        if let Some(ObjectStoreError::GcsObjectStoreError {
+            source: Error::UnableToStreamListData { source, bucket },
+        }) = err.downcast_ref::<ObjectStoreError>()
         {
             assert!(matches!(source, cloud_storage::Error::Google(_)));
             assert_eq!(bucket, &config.bucket);
         } else {
-            panic!("unexpected error type")
+            panic!("unexpected error type: {:?}", err);
         }

         Ok(())
@@ -381,24 +396,30 @@ mod test {
     #[tokio::test]
     async fn gcs_test_delete_nonexistent_location() -> Result<()> {
         let config = maybe_skip_integration!();
-        let integration = GoogleCloudStorage::new(config.service_account, &config.bucket);
+        let integration = ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(
+            config.service_account,
+            &config.bucket,
+        ));

         let mut location = integration.new_path();
         location.set_file_name(NON_EXISTENT_NAME);

         let err = integration.delete(&location).await.unwrap_err();

-        if let Error::UnableToDeleteData {
-            source,
-            bucket,
-            location,
+        if let ObjectStoreError::GcsObjectStoreError {
+            source:
+                Error::UnableToDeleteData {
+                    source,
+                    bucket,
+                    location,
+                },
         } = err
         {
             assert!(matches!(source, cloud_storage::Error::Google(_)));
             assert_eq!(bucket, config.bucket);
             assert_eq!(location, NON_EXISTENT_NAME);
         } else {
-            panic!("unexpected error type")
+            panic!("unexpected error type: {:?}", err)
         }

         Ok(())
@@ -408,24 +429,30 @@ mod test {
     async fn gcs_test_delete_nonexistent_bucket() -> Result<()> {
         let mut config = maybe_skip_integration!();
         config.bucket = NON_EXISTENT_NAME.into();
-        let integration = GoogleCloudStorage::new(config.service_account, &config.bucket);
+        let integration = ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(
+            config.service_account,
+            &config.bucket,
+        ));

         let mut location = integration.new_path();
         location.set_file_name(NON_EXISTENT_NAME);

         let err = integration.delete(&location).await.unwrap_err();

-        if let Error::UnableToDeleteData {
-            source,
-            bucket,
-            location,
+        if let ObjectStoreError::GcsObjectStoreError {
+            source:
+                Error::UnableToDeleteData {
+                    source,
+                    bucket,
+                    location,
+                },
         } = err
         {
             assert!(matches!(source, cloud_storage::Error::Google(_)));
             assert_eq!(bucket, config.bucket);
             assert_eq!(location, NON_EXISTENT_NAME);
         } else {
-            panic!("unexpected error type")
+            panic!("unexpected error type: {:?}", err)
         }

         Ok(())
@@ -435,7 +462,10 @@ mod test {
     async fn gcs_test_put_nonexistent_bucket() -> Result<()> {
         let mut config = maybe_skip_integration!();
         config.bucket = NON_EXISTENT_NAME.into();
-        let integration = GoogleCloudStorage::new(config.service_account, &config.bucket);
+        let integration = ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(
+            config.service_account,
+            &config.bucket,
+        ));

         let mut location = integration.new_path();
         location.set_file_name(NON_EXISTENT_NAME);
@@ -452,17 +482,20 @@ mod test {
             .await
             .unwrap_err();

-        if let Error::UnableToPutData {
-            source,
-            bucket,
-            location,
+        if let ObjectStoreError::GcsObjectStoreError {
+            source:
+                Error::UnableToPutData {
+                    source,
+                    bucket,
+                    location,
+                },
         } = err
         {
             assert!(matches!(source, cloud_storage::Error::Other(_)));
             assert_eq!(bucket, config.bucket);
             assert_eq!(location, NON_EXISTENT_NAME);
         } else {
-            panic!("unexpected error type");
+            panic!("unexpected error type: {:?}", err);
         }

         Ok(())
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index c230813c32..635bb55fae 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -454,10 +454,10 @@ mod tests {
     type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
     type Result<T, E = Error> = std::result::Result<T, E>;

-    async fn flatten_list_stream<T: ObjectStoreApi>(
-        storage: &T,
-        prefix: Option<&T::Path>,
-    ) -> Result<Vec<T::Path>> {
+    async fn flatten_list_stream(
+        storage: &ObjectStore,
+        prefix: Option<&path::Path>,
+    ) -> Result<Vec<path::Path>> {
         storage
             .list(prefix)
             .await?
@@ -467,10 +467,7 @@ mod tests {
             .await
     }

-    pub(crate) async fn put_get_delete_list<T: ObjectStoreApi>(storage: &T) -> Result<()>
-    where
-        T::Path: From<DirsAndFileName>,
-    {
+    pub(crate) async fn put_get_delete_list(storage: &ObjectStore) -> Result<()> {
         delete_fixtures(storage).await;

         let content_list = flatten_list_stream(storage, None).await?;
@@ -526,10 +523,7 @@ mod tests {
         Ok(())
     }

-    pub(crate) async fn list_with_delimiter<T: ObjectStoreApi>(storage: &T) -> Result<()>
-    where
-        T::Path: From<DirsAndFileName>,
-    {
+    pub(crate) async fn list_with_delimiter(storage: &ObjectStore) -> Result<()> {
         delete_fixtures(storage).await;

         let content_list = flatten_list_stream(storage, None).await?;
@@ -547,7 +541,7 @@ mod tests {
             "mydb/data/whatevs",
         ]
         .iter()
-        .map(|&s| str_to_path(s))
+        .map(|&s| str_to_path(storage, s))
         .collect();

         for f in &files {
@@ -609,9 +603,9 @@ mod tests {
         Ok(())
     }

-    pub(crate) async fn get_nonexistent_object<T: ObjectStoreApi>(
-        storage: &T,
-        location: Option<T::Path>,
+    pub(crate) async fn get_nonexistent_object(
+        storage: &ObjectStore,
+        location: Option<<ObjectStore as ObjectStoreApi>::Path>,
     ) -> Result<Bytes> {
         let location = location.unwrap_or_else(|| {
             let mut loc = storage.new_path();
@@ -635,20 +629,22 @@ mod tests {
     /// associated storage might not be cloud storage, to reuse the cloud
     /// path parsing logic. Then convert into the correct type of path for
     /// the given storage.
-    fn str_to_path<P>(val: &str) -> P
-    where
-        P: From<DirsAndFileName> + ObjectStorePath,
-    {
+    fn str_to_path(storage: &ObjectStore, val: &str) -> path::Path {
         let cloud_path = CloudPath::raw(val);
         let parsed: DirsAndFileName = cloud_path.into();

-        parsed.into()
+        let mut new_path = storage.new_path();
+        for part in parsed.directories {
+            new_path.push_dir(part.to_string());
+        }
+
+        if let Some(file_name) = parsed.file_name {
+            new_path.set_file_name(file_name.to_string());
+        }
+        new_path
     }

-    async fn delete_fixtures<T: ObjectStoreApi>(storage: &T)
-    where
-        T::Path: From<DirsAndFileName>,
-    {
+    async fn delete_fixtures(storage: &ObjectStore) {
         let files: Vec<_> = [
             "test_file",
             "mydb/wal/000/000/000.segment",
@@ -659,7 +655,7 @@ mod tests {
             "mydb/data/whatevs",
         ]
         .iter()
-        .map(|&s| str_to_path(s))
+        .map(|&s| str_to_path(storage, s))
         .collect();

         for f in &files {
diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs
index c72deadebf..f46daac623 100644
--- a/object_store/src/memory.rs
+++ b/object_store/src/memory.rs
@@ -181,13 +181,13 @@ mod tests {
     use crate::{
         tests::{list_with_delimiter, put_get_delete_list},
-        ObjectStoreApi, ObjectStorePath,
+        Error as ObjectStoreError, ObjectStore, ObjectStoreApi, ObjectStorePath,
     };
     use futures::stream;

     #[tokio::test]
     async fn in_memory_test() -> Result<()> {
-        let integration = InMemory::new();
+        let integration = ObjectStore::new_in_memory(InMemory::new());

         put_get_delete_list(&integration).await?;

@@ -198,7 +198,7 @@ mod tests {
     #[tokio::test]
     async fn length_mismatch_is_an_error() -> Result<()> {
-        let integration = InMemory::new();
+        let integration = ObjectStore::new_in_memory(InMemory::new());

         let bytes = stream::once(async { Ok(Bytes::from("hello world")) });
         let mut location = integration.new_path();
@@ -207,9 +207,11 @@ mod tests {

         assert!(matches!(
             res.err().unwrap(),
-            Error::DataDoesNotMatchLength {
-                expected: 0,
-                actual: 11,
+            ObjectStoreError::InMemoryObjectStoreError {
+                source: Error::DataDoesNotMatchLength {
+                    expected: 0,
+                    actual: 11,
+                }
             }
         ));

@@ -218,7 +220,7 @@ mod tests {
     #[tokio::test]
     async fn unknown_length() -> Result<()> {
-        let integration = InMemory::new();
+        let integration = ObjectStore::new_in_memory(InMemory::new());

         let data = Bytes::from("arbitrary data");
         let stream_data = std::io::Result::Ok(data.clone());