fix: Test using the object store wrapper interface

pull/24376/head
Carol (Nichols || Goulding) 2021-03-04 15:15:03 -05:00
parent bc255529f2
commit ca2f74063e
6 changed files with 244 additions and 168 deletions

View File

@ -412,7 +412,7 @@ mod tests {
use super::*;
use crate::{
tests::{get_nonexistent_object, list_with_delimiter, put_get_delete_list},
AmazonS3, ObjectStoreApi, ObjectStorePath,
AmazonS3, Error as ObjectStoreError, ObjectStore, ObjectStoreApi, ObjectStorePath,
};
use bytes::Bytes;
use std::env;
@ -458,7 +458,7 @@ mod tests {
"TEST_INTEGRATION is set, \
but variable(s) {} need to be set",
unset_var_names
)
);
} else if force.is_err() && !unset_var_names.is_empty() {
eprintln!(
"skipping AWS integration test - set \
@ -500,13 +500,15 @@ mod tests {
#[tokio::test]
async fn s3_test() -> Result<()> {
let config = maybe_skip_integration!();
let integration = AmazonS3::new(
config.access_key_id,
config.secret_access_key,
config.region,
config.bucket,
)
.expect("Valid S3 config");
let integration = ObjectStore::new_amazon_s3(
AmazonS3::new(
config.access_key_id,
config.secret_access_key,
config.region,
config.bucket,
)
.expect("Valid S3 config"),
);
check_credentials(put_get_delete_list(&integration).await)?;
check_credentials(list_with_delimiter(&integration).await).unwrap();
@ -520,13 +522,15 @@ mod tests {
// Assumes environment variables do not provide credentials to AWS US West 1
config.region = "us-west-1".into();
let integration = AmazonS3::new(
config.access_key_id,
config.secret_access_key,
config.region,
&config.bucket,
)
.expect("Valid S3 config");
let integration = ObjectStore::new_amazon_s3(
AmazonS3::new(
config.access_key_id,
config.secret_access_key,
config.region,
&config.bucket,
)
.expect("Valid S3 config"),
);
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
@ -534,11 +538,14 @@ mod tests {
let err = get_nonexistent_object(&integration, Some(location))
.await
.unwrap_err();
if let Some(Error::UnableToListData { source, bucket }) = err.downcast_ref::<Error>() {
if let Some(ObjectStoreError::AwsObjectStoreError {
source: Error::UnableToListData { source, bucket },
}) = err.downcast_ref::<ObjectStoreError>()
{
assert!(matches!(source, rusoto_core::RusotoError::Unknown(_)));
assert_eq!(bucket, &config.bucket);
} else {
panic!("unexpected error type")
panic!("unexpected error type: {:?}", err);
}
Ok(())
@ -547,13 +554,15 @@ mod tests {
#[tokio::test]
async fn s3_test_get_nonexistent_location() -> Result<()> {
let config = maybe_skip_integration!();
let integration = AmazonS3::new(
config.access_key_id,
config.secret_access_key,
config.region,
&config.bucket,
)
.expect("Valid S3 config");
let integration = ObjectStore::new_amazon_s3(
AmazonS3::new(
config.access_key_id,
config.secret_access_key,
config.region,
&config.bucket,
)
.expect("Valid S3 config"),
);
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
@ -561,11 +570,14 @@ mod tests {
let err = get_nonexistent_object(&integration, Some(location))
.await
.unwrap_err();
if let Some(Error::UnableToGetData {
source,
bucket,
location,
}) = err.downcast_ref::<Error>()
if let Some(ObjectStoreError::AwsObjectStoreError {
source:
Error::UnableToGetData {
source,
bucket,
location,
},
}) = err.downcast_ref::<ObjectStoreError>()
{
assert!(matches!(
source,
@ -574,7 +586,7 @@ mod tests {
assert_eq!(bucket, &config.bucket);
assert_eq!(location, NON_EXISTENT_NAME);
} else {
panic!("unexpected error type")
panic!("unexpected error type: {:?}", err);
}
Ok(())
@ -585,13 +597,15 @@ mod tests {
let mut config = maybe_skip_integration!();
config.bucket = NON_EXISTENT_NAME.into();
let integration = AmazonS3::new(
config.access_key_id,
config.secret_access_key,
config.region,
&config.bucket,
)
.expect("Valid S3 config");
let integration = ObjectStore::new_amazon_s3(
AmazonS3::new(
config.access_key_id,
config.secret_access_key,
config.region,
&config.bucket,
)
.expect("Valid S3 config"),
);
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
@ -599,14 +613,17 @@ mod tests {
let err = get_nonexistent_object(&integration, Some(location))
.await
.unwrap_err();
if let Some(Error::UnableToListData { source, bucket }) = err.downcast_ref::<Error>() {
if let Some(ObjectStoreError::AwsObjectStoreError {
source: Error::UnableToListData { source, bucket },
}) = err.downcast_ref::<ObjectStoreError>()
{
assert!(matches!(
source,
rusoto_core::RusotoError::Service(rusoto_s3::ListObjectsV2Error::NoSuchBucket(_))
));
assert_eq!(bucket, &config.bucket);
} else {
panic!("unexpected error type")
panic!("unexpected error type: {:?}", err);
}
Ok(())
@ -618,13 +635,15 @@ mod tests {
// Assumes environment variables do not provide credentials to AWS US West 1
config.region = "us-west-1".into();
let integration = AmazonS3::new(
config.access_key_id,
config.secret_access_key,
config.region,
&config.bucket,
)
.expect("Valid S3 config");
let integration = ObjectStore::new_amazon_s3(
AmazonS3::new(
config.access_key_id,
config.secret_access_key,
config.region,
&config.bucket,
)
.expect("Valid S3 config"),
);
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
@ -640,17 +659,20 @@ mod tests {
.await
.unwrap_err();
if let Error::UnableToPutData {
source,
bucket,
location,
if let ObjectStoreError::AwsObjectStoreError {
source:
Error::UnableToPutData {
source,
bucket,
location,
},
} = err
{
assert!(matches!(source, rusoto_core::RusotoError::Unknown(_)));
assert_eq!(bucket, config.bucket);
assert_eq!(location, NON_EXISTENT_NAME);
} else {
panic!("unexpected error type")
panic!("unexpected error type: {:?}", err);
}
Ok(())
@ -661,13 +683,15 @@ mod tests {
let mut config = maybe_skip_integration!();
config.bucket = NON_EXISTENT_NAME.into();
let integration = AmazonS3::new(
config.access_key_id,
config.secret_access_key,
config.region,
&config.bucket,
)
.expect("Valid S3 config");
let integration = ObjectStore::new_amazon_s3(
AmazonS3::new(
config.access_key_id,
config.secret_access_key,
config.region,
&config.bucket,
)
.expect("Valid S3 config"),
);
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
@ -683,17 +707,20 @@ mod tests {
.await
.unwrap_err();
if let Error::UnableToPutData {
source,
bucket,
location,
if let ObjectStoreError::AwsObjectStoreError {
source:
Error::UnableToPutData {
source,
bucket,
location,
},
} = err
{
assert!(matches!(source, rusoto_core::RusotoError::Unknown(_)));
assert_eq!(bucket, config.bucket);
assert_eq!(location, NON_EXISTENT_NAME);
} else {
panic!("unexpected error type")
panic!("unexpected error type: {:?}", err);
}
Ok(())
@ -702,13 +729,15 @@ mod tests {
#[tokio::test]
async fn s3_test_delete_nonexistent_location() -> Result<()> {
let config = maybe_skip_integration!();
let integration = AmazonS3::new(
config.access_key_id,
config.secret_access_key,
config.region,
config.bucket,
)
.expect("Valid S3 config");
let integration = ObjectStore::new_amazon_s3(
AmazonS3::new(
config.access_key_id,
config.secret_access_key,
config.region,
config.bucket,
)
.expect("Valid S3 config"),
);
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
@ -726,29 +755,34 @@ mod tests {
// Assumes environment variables do not provide credentials to AWS US West 1
config.region = "us-west-1".into();
let integration = AmazonS3::new(
config.access_key_id,
config.secret_access_key,
config.region,
&config.bucket,
)
.expect("Valid S3 config");
let integration = ObjectStore::new_amazon_s3(
AmazonS3::new(
config.access_key_id,
config.secret_access_key,
config.region,
&config.bucket,
)
.expect("Valid S3 config"),
);
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
let err = integration.delete(&location).await.unwrap_err();
if let Error::UnableToDeleteData {
source,
bucket,
location,
if let ObjectStoreError::AwsObjectStoreError {
source:
Error::UnableToDeleteData {
source,
bucket,
location,
},
} = err
{
assert!(matches!(source, rusoto_core::RusotoError::Unknown(_)));
assert_eq!(bucket, config.bucket);
assert_eq!(location, NON_EXISTENT_NAME);
} else {
panic!("unexpected error type")
panic!("unexpected error type: {:?}", err);
}
Ok(())
@ -759,29 +793,34 @@ mod tests {
let mut config = maybe_skip_integration!();
config.bucket = NON_EXISTENT_NAME.into();
let integration = AmazonS3::new(
config.access_key_id,
config.secret_access_key,
config.region,
&config.bucket,
)
.expect("Valid S3 config");
let integration = ObjectStore::new_amazon_s3(
AmazonS3::new(
config.access_key_id,
config.secret_access_key,
config.region,
&config.bucket,
)
.expect("Valid S3 config"),
);
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
let err = integration.delete(&location).await.unwrap_err();
if let Error::UnableToDeleteData {
source,
bucket,
location,
if let ObjectStoreError::AwsObjectStoreError {
source:
Error::UnableToDeleteData {
source,
bucket,
location,
},
} = err
{
assert!(matches!(source, rusoto_core::RusotoError::Unknown(_)));
assert_eq!(bucket, config.bucket);
assert_eq!(location, NON_EXISTENT_NAME);
} else {
panic!("unexpected error type")
panic!("unexpected error type: {:?}", err);
}
Ok(())

View File

@ -277,6 +277,7 @@ impl MicrosoftAzure {
mod tests {
use super::*;
use crate::tests::{list_with_delimiter, put_get_delete_list};
use crate::ObjectStore;
use std::env;
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
@ -340,8 +341,11 @@ mod tests {
#[tokio::test]
async fn azure_blob_test() -> Result<()> {
let config = maybe_skip_integration!();
let integration =
MicrosoftAzure::new(config.storage_account, config.access_key, config.bucket);
let integration = ObjectStore::new_microsoft_azure(MicrosoftAzure::new(
config.storage_account,
config.access_key,
config.bucket,
));
put_get_delete_list(&integration).await?;
list_with_delimiter(&integration).await?;

View File

@ -268,7 +268,7 @@ mod tests {
use crate::{
tests::{list_with_delimiter, put_get_delete_list},
ObjectStoreApi, ObjectStorePath,
Error as ObjectStoreError, ObjectStore, ObjectStoreApi, ObjectStorePath,
};
use futures::stream;
use tempfile::TempDir;
@ -276,7 +276,7 @@ mod tests {
#[tokio::test]
async fn file_test() -> Result<()> {
let root = TempDir::new()?;
let integration = File::new(root.path());
let integration = ObjectStore::new_file(File::new(root.path()));
put_get_delete_list(&integration).await?;
list_with_delimiter(&integration).await?;
@ -287,7 +287,7 @@ mod tests {
#[tokio::test]
async fn length_mismatch_is_an_error() -> Result<()> {
let root = TempDir::new()?;
let integration = File::new(root.path());
let integration = ObjectStore::new_file(File::new(root.path()));
let bytes = stream::once(async { Ok(Bytes::from("hello world")) });
let mut location = integration.new_path();
@ -296,9 +296,11 @@ mod tests {
assert!(matches!(
res.err().unwrap(),
Error::DataDoesNotMatchLength {
expected: 0,
actual: 11,
ObjectStoreError::FileObjectStoreError {
source: Error::DataDoesNotMatchLength {
expected: 0,
actual: 11,
}
}
));
@ -308,7 +310,7 @@ mod tests {
#[tokio::test]
async fn creates_dir_if_not_present() -> Result<()> {
let root = TempDir::new()?;
let integration = File::new(root.path());
let integration = ObjectStore::new_file(File::new(root.path()));
let data = Bytes::from("arbitrary data");
let mut location = integration.new_path();
@ -337,7 +339,7 @@ mod tests {
#[tokio::test]
async fn unknown_length() -> Result<()> {
let root = TempDir::new()?;
let integration = File::new(root.path());
let integration = ObjectStore::new_file(File::new(root.path()));
let data = Bytes::from("arbitrary data");
let stream_data = std::io::Result::Ok(data.clone());

View File

@ -258,7 +258,8 @@ mod test {
use super::*;
use crate::{
tests::{get_nonexistent_object, list_with_delimiter, put_get_delete_list},
GoogleCloudStorage, ObjectStoreApi, ObjectStorePath,
Error as ObjectStoreError, GoogleCloudStorage, ObjectStore, ObjectStoreApi,
ObjectStorePath,
};
use bytes::Bytes;
use std::env;
@ -319,7 +320,10 @@ mod test {
#[tokio::test]
async fn gcs_test() -> Result<()> {
let config = maybe_skip_integration!();
let integration = GoogleCloudStorage::new(config.service_account, config.bucket);
let integration = ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(
config.service_account,
config.bucket,
));
put_get_delete_list(&integration).await?;
list_with_delimiter(&integration).await?;
@ -329,7 +333,10 @@ mod test {
#[tokio::test]
async fn gcs_test_get_nonexistent_location() -> Result<()> {
let config = maybe_skip_integration!();
let integration = GoogleCloudStorage::new(config.service_account, &config.bucket);
let integration = ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(
config.service_account,
&config.bucket,
));
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
@ -338,17 +345,20 @@ mod test {
.await
.unwrap_err();
if let Some(Error::UnableToGetData {
source,
bucket,
location,
}) = err.downcast_ref::<Error>()
if let Some(ObjectStoreError::GcsObjectStoreError {
source:
Error::UnableToGetData {
source,
bucket,
location,
},
}) = err.downcast_ref::<ObjectStoreError>()
{
assert!(matches!(source, cloud_storage::Error::Reqwest(_)));
assert_eq!(bucket, &config.bucket);
assert_eq!(location, NON_EXISTENT_NAME);
} else {
panic!("unexpected error type")
panic!("unexpected error type: {:?}", err)
}
Ok(())
@ -358,7 +368,10 @@ mod test {
async fn gcs_test_get_nonexistent_bucket() -> Result<()> {
let mut config = maybe_skip_integration!();
config.bucket = NON_EXISTENT_NAME.into();
let integration = GoogleCloudStorage::new(config.service_account, &config.bucket);
let integration = ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(
config.service_account,
&config.bucket,
));
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
@ -367,12 +380,14 @@ mod test {
.await
.unwrap_err();
if let Some(Error::UnableToStreamListData { source, bucket }) = err.downcast_ref::<Error>()
if let Some(ObjectStoreError::GcsObjectStoreError {
source: Error::UnableToStreamListData { source, bucket },
}) = err.downcast_ref::<ObjectStoreError>()
{
assert!(matches!(source, cloud_storage::Error::Google(_)));
assert_eq!(bucket, &config.bucket);
} else {
panic!("unexpected error type")
panic!("unexpected error type: {:?}", err);
}
Ok(())
@ -381,24 +396,30 @@ mod test {
#[tokio::test]
async fn gcs_test_delete_nonexistent_location() -> Result<()> {
let config = maybe_skip_integration!();
let integration = GoogleCloudStorage::new(config.service_account, &config.bucket);
let integration = ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(
config.service_account,
&config.bucket,
));
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
let err = integration.delete(&location).await.unwrap_err();
if let Error::UnableToDeleteData {
source,
bucket,
location,
if let ObjectStoreError::GcsObjectStoreError {
source:
Error::UnableToDeleteData {
source,
bucket,
location,
},
} = err
{
assert!(matches!(source, cloud_storage::Error::Google(_)));
assert_eq!(bucket, config.bucket);
assert_eq!(location, NON_EXISTENT_NAME);
} else {
panic!("unexpected error type")
panic!("unexpected error type: {:?}", err)
}
Ok(())
@ -408,24 +429,30 @@ mod test {
async fn gcs_test_delete_nonexistent_bucket() -> Result<()> {
let mut config = maybe_skip_integration!();
config.bucket = NON_EXISTENT_NAME.into();
let integration = GoogleCloudStorage::new(config.service_account, &config.bucket);
let integration = ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(
config.service_account,
&config.bucket,
));
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
let err = integration.delete(&location).await.unwrap_err();
if let Error::UnableToDeleteData {
source,
bucket,
location,
if let ObjectStoreError::GcsObjectStoreError {
source:
Error::UnableToDeleteData {
source,
bucket,
location,
},
} = err
{
assert!(matches!(source, cloud_storage::Error::Google(_)));
assert_eq!(bucket, config.bucket);
assert_eq!(location, NON_EXISTENT_NAME);
} else {
panic!("unexpected error type")
panic!("unexpected error type: {:?}", err)
}
Ok(())
@ -435,7 +462,10 @@ mod test {
async fn gcs_test_put_nonexistent_bucket() -> Result<()> {
let mut config = maybe_skip_integration!();
config.bucket = NON_EXISTENT_NAME.into();
let integration = GoogleCloudStorage::new(config.service_account, &config.bucket);
let integration = ObjectStore::new_google_cloud_storage(GoogleCloudStorage::new(
config.service_account,
&config.bucket,
));
let mut location = integration.new_path();
location.set_file_name(NON_EXISTENT_NAME);
@ -452,17 +482,20 @@ mod test {
.await
.unwrap_err();
if let Error::UnableToPutData {
source,
bucket,
location,
if let ObjectStoreError::GcsObjectStoreError {
source:
Error::UnableToPutData {
source,
bucket,
location,
},
} = err
{
assert!(matches!(source, cloud_storage::Error::Other(_)));
assert_eq!(bucket, config.bucket);
assert_eq!(location, NON_EXISTENT_NAME);
} else {
panic!("unexpected error type");
panic!("unexpected error type: {:?}", err);
}
Ok(())

View File

@ -454,10 +454,10 @@ mod tests {
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type Result<T, E = Error> = std::result::Result<T, E>;
async fn flatten_list_stream<T: ObjectStoreApi>(
storage: &T,
prefix: Option<&T::Path>,
) -> Result<Vec<T::Path>> {
async fn flatten_list_stream(
storage: &ObjectStore,
prefix: Option<&path::Path>,
) -> Result<Vec<path::Path>> {
storage
.list(prefix)
.await?
@ -467,10 +467,7 @@ mod tests {
.await
}
pub(crate) async fn put_get_delete_list<T: ObjectStoreApi>(storage: &T) -> Result<()>
where
T::Path: From<DirsAndFileName>,
{
pub(crate) async fn put_get_delete_list(storage: &ObjectStore) -> Result<()> {
delete_fixtures(storage).await;
let content_list = flatten_list_stream(storage, None).await?;
@ -526,10 +523,7 @@ mod tests {
Ok(())
}
pub(crate) async fn list_with_delimiter<T: ObjectStoreApi>(storage: &T) -> Result<()>
where
T::Path: From<DirsAndFileName>,
{
pub(crate) async fn list_with_delimiter(storage: &ObjectStore) -> Result<()> {
delete_fixtures(storage).await;
let content_list = flatten_list_stream(storage, None).await?;
@ -547,7 +541,7 @@ mod tests {
"mydb/data/whatevs",
]
.iter()
.map(|&s| str_to_path(s))
.map(|&s| str_to_path(storage, s))
.collect();
for f in &files {
@ -609,9 +603,9 @@ mod tests {
Ok(())
}
pub(crate) async fn get_nonexistent_object<T: ObjectStoreApi>(
storage: &T,
location: Option<T::Path>,
pub(crate) async fn get_nonexistent_object(
storage: &ObjectStore,
location: Option<<ObjectStore as ObjectStoreApi>::Path>,
) -> Result<Bytes> {
let location = location.unwrap_or_else(|| {
let mut loc = storage.new_path();
@ -635,20 +629,22 @@ mod tests {
/// associated storage might not be cloud storage, to reuse the cloud
/// path parsing logic. Then convert into the correct type of path for
/// the given storage.
fn str_to_path<P>(val: &str) -> P
where
P: From<DirsAndFileName> + ObjectStorePath,
{
fn str_to_path(storage: &ObjectStore, val: &str) -> path::Path {
let cloud_path = CloudPath::raw(val);
let parsed: DirsAndFileName = cloud_path.into();
parsed.into()
let mut new_path = storage.new_path();
for part in parsed.directories {
new_path.push_dir(part.to_string());
}
if let Some(file_name) = parsed.file_name {
new_path.set_file_name(file_name.to_string());
}
new_path
}
async fn delete_fixtures<T: ObjectStoreApi>(storage: &T)
where
T::Path: From<DirsAndFileName>,
{
async fn delete_fixtures(storage: &ObjectStore) {
let files: Vec<_> = [
"test_file",
"mydb/wal/000/000/000.segment",
@ -659,7 +655,7 @@ mod tests {
"mydb/data/whatevs",
]
.iter()
.map(|&s| str_to_path(s))
.map(|&s| str_to_path(storage, s))
.collect();
for f in &files {

View File

@ -181,13 +181,13 @@ mod tests {
use crate::{
tests::{list_with_delimiter, put_get_delete_list},
ObjectStoreApi, ObjectStorePath,
Error as ObjectStoreError, ObjectStore, ObjectStoreApi, ObjectStorePath,
};
use futures::stream;
#[tokio::test]
async fn in_memory_test() -> Result<()> {
let integration = InMemory::new();
let integration = ObjectStore::new_in_memory(InMemory::new());
put_get_delete_list(&integration).await?;
@ -198,7 +198,7 @@ mod tests {
#[tokio::test]
async fn length_mismatch_is_an_error() -> Result<()> {
let integration = InMemory::new();
let integration = ObjectStore::new_in_memory(InMemory::new());
let bytes = stream::once(async { Ok(Bytes::from("hello world")) });
let mut location = integration.new_path();
@ -207,9 +207,11 @@ mod tests {
assert!(matches!(
res.err().unwrap(),
Error::DataDoesNotMatchLength {
expected: 0,
actual: 11,
ObjectStoreError::InMemoryObjectStoreError {
source: Error::DataDoesNotMatchLength {
expected: 0,
actual: 11,
}
}
));
@ -218,7 +220,7 @@ mod tests {
#[tokio::test]
async fn unknown_length() -> Result<()> {
let integration = InMemory::new();
let integration = ObjectStore::new_in_memory(InMemory::new());
let data = Bytes::from("arbitrary data");
let stream_data = std::io::Result::Ok(data.clone());