feat: Gets google cloud list_with_delimiter tests passing
parent
d5d4e7a3b0
commit
ef54131afb
|
@ -514,8 +514,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "cloud-storage"
|
||||
version = "0.7.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "33dc6ee89f0440f1fc8356fc01d5451831bd9f390d9cce6a42b5805b63b36e27"
|
||||
source = "git+https://github.com/integer32llc/cloud-storage-rs.git?branch=list-with-delimiter#ca7be2ff7d0a0dce5d130b7ed1f81c0e1f392c95"
|
||||
dependencies = [
|
||||
"base64 0.13.0",
|
||||
"bytes",
|
||||
|
|
|
@ -13,7 +13,9 @@ azure_storage = { git = "https://github.com/Azure/azure-sdk-for-rust.git", rev =
|
|||
bytes = "1.0"
|
||||
chrono = "0.4"
|
||||
# Google Cloud Storage integration
|
||||
cloud-storage = { version = "0.7.0" }
|
||||
# Fork needed until https://github.com/ThouCheese/cloud-storage-rs/pull/57 or similar is merged
|
||||
# and released to support listing with a delimiter parameter
|
||||
cloud-storage = { git = "https://github.com/integer32llc/cloud-storage-rs.git", branch = "list-with-delimiter" }
|
||||
futures = "0.3.5"
|
||||
itertools = "0.9.0"
|
||||
percent-encoding = "2.1"
|
||||
|
|
|
@ -1,14 +1,14 @@
|
|||
//! This module contains the IOx implementation for using Google Cloud Storage
|
||||
//! as the object store.
|
||||
use crate::{path::cloud::CloudPath, ListResult, ObjectStoreApi};
|
||||
use crate::{
|
||||
path::{cloud::CloudPath, DELIMITER},
|
||||
ListResult, ObjectMeta, ObjectStoreApi,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
use futures::{
|
||||
stream::{self, BoxStream},
|
||||
Stream, StreamExt, TryStreamExt,
|
||||
};
|
||||
use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt};
|
||||
use snafu::{ensure, futures::TryStreamExt as _, ResultExt, Snafu};
|
||||
use std::io;
|
||||
use std::{convert::TryFrom, io};
|
||||
|
||||
/// A specialized `Result` for Google Cloud Storage object store-related errors
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
@ -156,31 +156,21 @@ impl ObjectStoreApi for GoogleCloudStorage {
|
|||
&'a self,
|
||||
prefix: Option<&'a Self::Path>,
|
||||
) -> Result<BoxStream<'a, Result<Vec<Self::Path>>>> {
|
||||
let objects = match prefix {
|
||||
Some(prefix) => {
|
||||
let cloud_prefix = prefix.to_raw();
|
||||
let list = cloud_storage::Object::list_prefix(&self.bucket_name, &cloud_prefix)
|
||||
.await
|
||||
.context(UnableToListData {
|
||||
bucket: &self.bucket_name,
|
||||
})?;
|
||||
|
||||
// TODO: Remove collect when the path no longer needs
|
||||
// to be converted into an owned object that would be
|
||||
// dropped too early.
|
||||
stream::iter(list.collect::<Vec<_>>().await).left_stream()
|
||||
}
|
||||
None => cloud_storage::Object::list(&self.bucket_name)
|
||||
.await
|
||||
.context(UnableToListData {
|
||||
bucket: &self.bucket_name,
|
||||
})?
|
||||
.right_stream(),
|
||||
let converted_prefix = prefix.map(|p| p.to_raw());
|
||||
let list_request = cloud_storage::ListRequest {
|
||||
prefix: converted_prefix,
|
||||
..Default::default()
|
||||
};
|
||||
let object_lists = cloud_storage::Object::list(&self.bucket_name, list_request)
|
||||
.await
|
||||
.context(UnableToListData {
|
||||
bucket: &self.bucket_name,
|
||||
})?;
|
||||
|
||||
let objects = objects
|
||||
let objects = object_lists
|
||||
.map_ok(|list| {
|
||||
list.into_iter()
|
||||
list.items
|
||||
.into_iter()
|
||||
.map(|o| CloudPath::raw(o.name))
|
||||
.collect::<Vec<_>>()
|
||||
})
|
||||
|
@ -191,8 +181,57 @@ impl ObjectStoreApi for GoogleCloudStorage {
|
|||
Ok(objects.boxed())
|
||||
}
|
||||
|
||||
async fn list_with_delimiter(&self, _prefix: &Self::Path) -> Result<ListResult<Self::Path>> {
|
||||
unimplemented!();
|
||||
async fn list_with_delimiter(&self, prefix: &Self::Path) -> Result<ListResult<Self::Path>> {
|
||||
let converted_prefix = prefix.to_raw();
|
||||
let list_request = cloud_storage::ListRequest {
|
||||
prefix: Some(converted_prefix),
|
||||
delimiter: Some(DELIMITER.to_string()),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let mut object_lists = Box::pin(
|
||||
cloud_storage::Object::list(&self.bucket_name, list_request)
|
||||
.await
|
||||
.context(UnableToListData {
|
||||
bucket: &self.bucket_name,
|
||||
})?,
|
||||
);
|
||||
|
||||
let result = match object_lists.next().await {
|
||||
None => ListResult {
|
||||
objects: vec![],
|
||||
common_prefixes: vec![],
|
||||
next_token: None,
|
||||
},
|
||||
Some(list_response) => {
|
||||
let list_response = list_response.context(UnableToStreamListData {
|
||||
bucket: &self.bucket_name,
|
||||
})?;
|
||||
|
||||
ListResult {
|
||||
objects: list_response
|
||||
.items
|
||||
.iter()
|
||||
.map(|object| {
|
||||
let location = CloudPath::raw(&object.name);
|
||||
let last_modified = object.updated;
|
||||
let size = usize::try_from(object.size)
|
||||
.expect("unsupported size on this platform");
|
||||
|
||||
ObjectMeta {
|
||||
location,
|
||||
last_modified,
|
||||
size,
|
||||
}
|
||||
})
|
||||
.collect(),
|
||||
common_prefixes: list_response.prefixes.iter().map(CloudPath::raw).collect(),
|
||||
next_token: list_response.next_page_token,
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -209,7 +248,7 @@ impl GoogleCloudStorage {
|
|||
mod test {
|
||||
use super::*;
|
||||
use crate::{
|
||||
tests::{get_nonexistent_object, put_get_delete_list},
|
||||
tests::{get_nonexistent_object, list_with_delimiter, put_get_delete_list},
|
||||
GoogleCloudStorage, ObjectStoreApi, ObjectStorePath,
|
||||
};
|
||||
use bytes::Bytes;
|
||||
|
@ -254,6 +293,7 @@ mod test {
|
|||
|
||||
let integration = GoogleCloudStorage::new(&bucket_name);
|
||||
put_get_delete_list(&integration).await?;
|
||||
list_with_delimiter(&integration).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -359,14 +399,27 @@ mod test {
|
|||
let data = Bytes::from("arbitrary data");
|
||||
let stream_data = std::io::Result::Ok(data.clone());
|
||||
|
||||
let result = integration
|
||||
let err = integration
|
||||
.put(
|
||||
&location,
|
||||
futures::stream::once(async move { stream_data }),
|
||||
data.len(),
|
||||
)
|
||||
.await;
|
||||
assert!(result.is_ok());
|
||||
.await
|
||||
.unwrap_err();
|
||||
|
||||
if let Error::UnableToPutData {
|
||||
source,
|
||||
bucket,
|
||||
location,
|
||||
} = err
|
||||
{
|
||||
assert!(matches!(source, cloud_storage::Error::Other(_)));
|
||||
assert_eq!(bucket, bucket_name);
|
||||
assert_eq!(location, NON_EXISTENT_NAME);
|
||||
} else {
|
||||
panic!("unexpected error type");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue