refactor: Extract a CloudPath type for use in cloud storage

This is the start of using the type system to enforce that only
CloudPaths will be used with S3, GCS, and Azure.

Still some mess in here, cleanup coming.
pull/24376/head
Carol (Nichols || Goulding) 2021-01-19 20:08:13 -05:00
parent 7d3b4db234
commit d39131ab49
8 changed files with 477 additions and 197 deletions

View File

@ -1,7 +1,7 @@
//! This module contains the IOx implementation for using S3 as the object
//! store.
use crate::{
path::{cloud::CloudConverter, DELIMITER},
path::{cloud::CloudPath, DELIMITER},
Error, ListResult, NoDataFromS3, ObjectMeta, Result, UnableToDeleteDataFromS3,
UnableToGetDataFromS3, UnableToGetPieceOfDataFromS3, UnableToPutDataToS3,
};
@ -55,12 +55,12 @@ impl AmazonS3 {
}
/// Return a new location path appropriate for this object storage
pub fn new_path(&self) -> crate::path::Path {
crate::path::Path::default()
pub fn new_path(&self) -> CloudPath {
CloudPath::default()
}
/// Save the provided bytes to the specified location.
pub async fn put<S>(&self, location: &crate::path::Path, bytes: S, length: usize) -> Result<()>
pub async fn put<S>(&self, location: &CloudPath, bytes: S, length: usize) -> Result<()>
where
S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
{
@ -68,7 +68,7 @@ impl AmazonS3 {
let put_request = rusoto_s3::PutObjectRequest {
bucket: self.bucket_name.clone(),
key: CloudConverter::convert(&location.inner),
key: location.to_raw(),
body: Some(bytes),
..Default::default()
};
@ -78,17 +78,14 @@ impl AmazonS3 {
.await
.context(UnableToPutDataToS3 {
bucket: &self.bucket_name,
location: CloudConverter::convert(&location.inner),
location: location.to_raw(),
})?;
Ok(())
}
/// Return the bytes that are stored at the specified location.
pub async fn get(
&self,
location: &crate::path::Path,
) -> Result<impl Stream<Item = Result<Bytes>>> {
let key = CloudConverter::convert(&location.inner);
pub async fn get(&self, location: &CloudPath) -> Result<impl Stream<Item = Result<Bytes>>> {
let key = location.to_raw();
let get_request = rusoto_s3::GetObjectRequest {
bucket: self.bucket_name.clone(),
key: key.clone(),
@ -115,8 +112,8 @@ impl AmazonS3 {
}
/// Delete the object at the specified location.
pub async fn delete(&self, location: &crate::path::Path) -> Result<()> {
let key = CloudConverter::convert(&location.inner);
pub async fn delete(&self, location: &CloudPath) -> Result<()> {
let key = location.to_raw();
let delete_request = rusoto_s3::DeleteObjectRequest {
bucket: self.bucket_name.clone(),
key: key.clone(),
@ -136,8 +133,8 @@ impl AmazonS3 {
/// List all the objects with the given prefix.
pub async fn list<'a>(
&'a self,
prefix: Option<&'a crate::path::Path>,
) -> Result<impl Stream<Item = Result<Vec<crate::path::Path>>> + 'a> {
prefix: Option<&'a CloudPath>,
) -> Result<impl Stream<Item = Result<Vec<CloudPath>>> + 'a> {
#[derive(Clone)]
enum ListState {
Start,
@ -149,7 +146,7 @@ impl AmazonS3 {
Ok(stream::unfold(ListState::Start, move |state| async move {
let mut list_request = rusoto_s3::ListObjectsV2Request {
bucket: self.bucket_name.clone(),
prefix: prefix.map(|p| CloudConverter::convert(&p.inner)),
prefix: prefix.map(|p| p.to_raw()),
..Default::default()
};
@ -181,7 +178,7 @@ impl AmazonS3 {
let contents = resp.contents.unwrap_or_default();
let names = contents
.into_iter()
.flat_map(|object| object.key.map(crate::path::Path::from_cloud_unchecked))
.flat_map(|object| object.key.map(CloudPath::raw))
.collect();
// The AWS response contains a field named `is_truncated` as well as
@ -202,10 +199,10 @@ impl AmazonS3 {
/// common prefixes (directories) in addition to object metadata.
pub async fn list_with_delimiter<'a>(
&'a self,
prefix: &'a crate::path::Path,
prefix: &'a CloudPath,
next_token: &Option<String>,
) -> Result<ListResult<crate::path::Path>> {
let converted_prefix = CloudConverter::convert(&prefix.inner);
) -> Result<ListResult<CloudPath>> {
let converted_prefix = prefix.to_raw();
let mut list_request = rusoto_s3::ListObjectsV2Request {
bucket: self.bucket_name.clone(),
@ -233,9 +230,8 @@ impl AmazonS3 {
let objects: Vec<_> = contents
.into_iter()
.map(|object| {
let location = crate::path::Path::from_cloud_unchecked(
object.key.expect("object doesn't exist without a key"),
);
let location =
CloudPath::raw(object.key.expect("object doesn't exist without a key"));
let last_modified = match object.last_modified {
Some(lm) => {
DateTime::parse_from_rfc3339(&lm)
@ -264,11 +260,7 @@ impl AmazonS3 {
.common_prefixes
.unwrap_or_default()
.into_iter()
.map(|p| {
crate::path::Path::from_cloud_unchecked(
p.prefix.expect("can't have a prefix without a value"),
)
})
.map(|p| CloudPath::raw(p.prefix.expect("can't have a prefix without a value")))
.collect();
let result = ListResult {

View File

@ -1,7 +1,7 @@
//! This module contains the IOx implementation for using Azure Blob storage as
//! the object store.
use crate::{
path::cloud::CloudConverter, DataDoesNotMatchLength, Result, UnableToDeleteDataFromAzure,
path::cloud::CloudPath, DataDoesNotMatchLength, Result, UnableToDeleteDataFromAzure,
UnableToGetDataFromAzure, UnableToListDataFromAzure, UnableToPutDataToAzure,
};
use azure_core::HttpClient;
@ -65,16 +65,16 @@ impl MicrosoftAzure {
}
/// Return a new location path appropriate for this object storage
pub fn new_path(&self) -> crate::path::Path {
crate::path::Path::default()
pub fn new_path(&self) -> CloudPath {
CloudPath::default()
}
/// Save the provided bytes to the specified location.
pub async fn put<S>(&self, location: &crate::path::Path, bytes: S, length: usize) -> Result<()>
pub async fn put<S>(&self, location: &CloudPath, bytes: S, length: usize) -> Result<()>
where
S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
{
let location = CloudConverter::convert(&location.inner);
let location = location.to_raw();
let temporary_non_streaming = bytes
.map_ok(|b| bytes::BytesMut::from(&b[..]))
.try_concat()
@ -102,12 +102,9 @@ impl MicrosoftAzure {
}
/// Return the bytes that are stored at the specified location.
pub async fn get(
&self,
location: &crate::path::Path,
) -> Result<impl Stream<Item = Result<Bytes>>> {
pub async fn get(&self, location: &CloudPath) -> Result<impl Stream<Item = Result<Bytes>>> {
let container_client = self.container_client.clone();
let location = CloudConverter::convert(&location.inner);
let location = location.to_raw();
Ok(async move {
container_client
.as_blob_client(&location)
@ -123,8 +120,8 @@ impl MicrosoftAzure {
}
/// Delete the object at the specified location.
pub async fn delete(&self, location: &crate::path::Path) -> Result<()> {
let location = CloudConverter::convert(&location.inner);
pub async fn delete(&self, location: &CloudPath) -> Result<()> {
let location = location.to_raw();
self.container_client
.as_blob_client(&location)
.delete()
@ -141,8 +138,8 @@ impl MicrosoftAzure {
/// List all the objects with the given prefix.
pub async fn list<'a>(
&'a self,
prefix: Option<&'a crate::path::Path>,
) -> Result<impl Stream<Item = Result<Vec<crate::path::Path>>> + 'a> {
prefix: Option<&'a CloudPath>,
) -> Result<impl Stream<Item = Result<Vec<CloudPath>>> + 'a> {
#[derive(Clone)]
enum ListState {
Start,
@ -153,7 +150,7 @@ impl MicrosoftAzure {
Ok(stream::unfold(ListState::Start, move |state| async move {
let mut request = self.container_client.list_blobs();
let prefix = prefix.map(|p| CloudConverter::convert(&p.inner));
let prefix = prefix.map(|p| p.to_raw());
if let Some(ref p) = prefix {
request = request.prefix(p as &str);
}
@ -183,7 +180,7 @@ impl MicrosoftAzure {
.incomplete_vector
.vector
.into_iter()
.map(|blob| crate::path::Path::from_cloud_unchecked(blob.name))
.map(|blob| CloudPath::raw(blob.name))
.collect();
Some((Ok(names), next_state))

View File

@ -1,7 +1,7 @@
//! This module contains the IOx implementation for using Google Cloud Storage
//! as the object store.
use crate::{
path::cloud::CloudConverter, DataDoesNotMatchLength, Result, UnableToDeleteDataFromGcs,
path::cloud::CloudPath, DataDoesNotMatchLength, Result, UnableToDeleteDataFromGcs,
UnableToGetDataFromGcs, UnableToListDataFromGcs, UnableToListDataFromGcs2,
UnableToPutDataToGcs,
};
@ -25,12 +25,12 @@ impl GoogleCloudStorage {
}
/// Return a new location path appropriate for this object storage
pub fn new_path(&self) -> crate::path::Path {
crate::path::Path::default()
pub fn new_path(&self) -> CloudPath {
CloudPath::default()
}
/// Save the provided bytes to the specified location.
pub async fn put<S>(&self, location: &crate::path::Path, bytes: S, length: usize) -> Result<()>
pub async fn put<S>(&self, location: &CloudPath, bytes: S, length: usize) -> Result<()>
where
S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
{
@ -49,7 +49,7 @@ impl GoogleCloudStorage {
}
);
let location = CloudConverter::convert(&location.inner);
let location = location.to_raw();
let location_copy = location.clone();
let bucket_name = self.bucket_name.clone();
@ -69,11 +69,8 @@ impl GoogleCloudStorage {
}
/// Return the bytes that are stored at the specified location.
pub async fn get(
&self,
location: &crate::path::Path,
) -> Result<impl Stream<Item = Result<Bytes>>> {
let location = CloudConverter::convert(&location.inner);
pub async fn get(&self, location: &CloudPath) -> Result<impl Stream<Item = Result<Bytes>>> {
let location = location.to_raw();
let location_copy = location.clone();
let bucket_name = self.bucket_name.clone();
@ -88,8 +85,8 @@ impl GoogleCloudStorage {
}
/// Delete the object at the specified location.
pub async fn delete(&self, location: &crate::path::Path) -> Result<()> {
let location = CloudConverter::convert(&location.inner);
pub async fn delete(&self, location: &CloudPath) -> Result<()> {
let location = location.to_raw();
let location_copy = location.clone();
let bucket_name = self.bucket_name.clone();
@ -106,11 +103,11 @@ impl GoogleCloudStorage {
/// List all the objects with the given prefix.
pub async fn list<'a>(
&'a self,
prefix: Option<&'a crate::path::Path>,
) -> Result<impl Stream<Item = Result<Vec<crate::path::Path>>> + 'a> {
prefix: Option<&'a CloudPath>,
) -> Result<impl Stream<Item = Result<Vec<CloudPath>>> + 'a> {
let objects = match prefix {
Some(prefix) => {
let cloud_prefix = CloudConverter::convert(&prefix.inner);
let cloud_prefix = prefix.to_raw();
let list = cloud_storage::Object::list_prefix(&self.bucket_name, &cloud_prefix)
.await
.context(UnableToListDataFromGcs {
@ -133,7 +130,7 @@ impl GoogleCloudStorage {
let objects = objects
.map_ok(|list| {
list.into_iter()
.map(|o| crate::path::Path::from_cloud_unchecked(o.name))
.map(|o| CloudPath::raw(o.name))
.collect::<Vec<_>>()
})
.context(UnableToListDataFromGcs2 {

View File

@ -69,13 +69,19 @@ impl ObjectStore {
pub fn new_path(&self) -> path::Path {
use ObjectStoreIntegration::*;
match &self.0 {
AmazonS3(s3) => s3.new_path(),
GoogleCloudStorage(gcs) => gcs.new_path(),
AmazonS3(s3) => path::Path {
inner: path::PathRepresentation::AmazonS3(s3.new_path()),
},
GoogleCloudStorage(gcs) => path::Path {
inner: path::PathRepresentation::GoogleCloudStorage(gcs.new_path()),
},
InMemory(in_mem) => path::Path {
inner: path::PathRepresentation::Parts(in_mem.new_path()),
},
File(file) => file.new_path(),
MicrosoftAzure(azure) => azure.new_path(),
MicrosoftAzure(azure) => path::Path {
inner: path::PathRepresentation::MicrosoftAzure(azure.new_path()),
},
}
}
@ -84,18 +90,34 @@ impl ObjectStore {
where
S: Stream<Item = io::Result<Bytes>> + Send + Sync + 'static,
{
use path::PathRepresentation;
use ObjectStoreIntegration::*;
match (&self.0, location) {
(AmazonS3(s3), _) => s3.put(location, bytes, length).await?,
(GoogleCloudStorage(gcs), _) => gcs.put(location, bytes, length).await?,
(
AmazonS3(s3),
path::Path {
inner: PathRepresentation::AmazonS3(location),
},
) => s3.put(location, bytes, length).await?,
(
GoogleCloudStorage(gcs),
path::Path {
inner: PathRepresentation::GoogleCloudStorage(location),
},
) => gcs.put(location, bytes, length).await?,
(
InMemory(in_mem),
path::Path {
inner: path::PathRepresentation::Parts(location),
inner: PathRepresentation::Parts(location),
},
) => in_mem.put(location, bytes, length).await?,
(File(file), _) => file.put(location, bytes, length).await?,
(MicrosoftAzure(azure), _) => azure.put(location, bytes, length).await?,
(
MicrosoftAzure(azure),
path::Path {
inner: PathRepresentation::MicrosoftAzure(location),
},
) => azure.put(location, bytes, length).await?,
_ => unreachable!(),
}
@ -104,18 +126,34 @@ impl ObjectStore {
/// Return the bytes that are stored at the specified location.
pub async fn get(&self, location: &path::Path) -> Result<impl Stream<Item = Result<Bytes>>> {
use path::PathRepresentation;
use ObjectStoreIntegration::*;
Ok(match (&self.0, location) {
(AmazonS3(s3), _) => s3.get(location).await?.boxed(),
(GoogleCloudStorage(gcs), _) => gcs.get(location).await?.boxed(),
(
AmazonS3(s3),
path::Path {
inner: PathRepresentation::AmazonS3(location),
},
) => s3.get(location).await?.boxed(),
(
GoogleCloudStorage(gcs),
path::Path {
inner: PathRepresentation::GoogleCloudStorage(location),
},
) => gcs.get(location).await?.boxed(),
(
InMemory(in_mem),
path::Path {
inner: path::PathRepresentation::Parts(location),
inner: PathRepresentation::Parts(location),
},
) => in_mem.get(location).await?.boxed(),
(File(file), _) => file.get(location).await?.boxed(),
(MicrosoftAzure(azure), _) => azure.get(location).await?.boxed(),
(
MicrosoftAzure(azure),
path::Path {
inner: PathRepresentation::MicrosoftAzure(location),
},
) => azure.get(location).await?.boxed(),
_ => unreachable!(),
}
.err_into())
@ -123,18 +161,34 @@ impl ObjectStore {
/// Delete the object at the specified location.
pub async fn delete(&self, location: &path::Path) -> Result<()> {
use path::PathRepresentation;
use ObjectStoreIntegration::*;
match (&self.0, location) {
(AmazonS3(s3), _) => s3.delete(location).await?,
(GoogleCloudStorage(gcs), _) => gcs.delete(location).await?,
(
AmazonS3(s3),
path::Path {
inner: PathRepresentation::AmazonS3(location),
},
) => s3.delete(location).await?,
(
GoogleCloudStorage(gcs),
path::Path {
inner: PathRepresentation::GoogleCloudStorage(location),
},
) => gcs.delete(location).await?,
(
InMemory(in_mem),
path::Path {
inner: path::PathRepresentation::Parts(location),
inner: PathRepresentation::Parts(location),
},
) => in_mem.delete(location).await?,
(File(file), _) => file.delete(location).await?,
(MicrosoftAzure(azure), _) => azure.delete(location).await?,
(
MicrosoftAzure(azure),
path::Path {
inner: PathRepresentation::MicrosoftAzure(location),
},
) => azure.delete(location).await?,
_ => unreachable!(),
}
@ -146,15 +200,68 @@ impl ObjectStore {
&'a self,
prefix: Option<&'a path::Path>,
) -> Result<impl Stream<Item = Result<Vec<path::Path>>> + 'a> {
use path::PathRepresentation;
use ObjectStoreIntegration::*;
Ok(match (&self.0, prefix) {
(AmazonS3(s3), _) => s3.list(prefix).await?.boxed(),
(GoogleCloudStorage(gcs), _) => gcs.list(prefix).await?.boxed(),
(
AmazonS3(s3),
Some(path::Path {
inner: PathRepresentation::AmazonS3(prefix),
}),
) => s3
.list(Some(prefix))
.await?
.map_ok(|s| {
s.into_iter()
.map(|p| path::Path {
inner: PathRepresentation::AmazonS3(p),
})
.collect()
})
.boxed(),
(AmazonS3(s3), None) => s3
.list(None)
.await?
.map_ok(|s| {
s.into_iter()
.map(|p| path::Path {
inner: PathRepresentation::AmazonS3(p),
})
.collect()
})
.boxed(),
(
GoogleCloudStorage(gcs),
Some(path::Path {
inner: PathRepresentation::GoogleCloudStorage(prefix),
}),
) => gcs
.list(Some(prefix))
.await?
.map_ok(|s| {
s.into_iter()
.map(|p| path::Path {
inner: PathRepresentation::GoogleCloudStorage(p),
})
.collect()
})
.boxed(),
(GoogleCloudStorage(gcs), None) => gcs
.list(None)
.await?
.map_ok(|s| {
s.into_iter()
.map(|p| path::Path {
inner: PathRepresentation::GoogleCloudStorage(p),
})
.collect()
})
.boxed(),
(
InMemory(in_mem),
Some(path::Path {
inner: path::PathRepresentation::Parts(dirs_and_file_name),
inner: PathRepresentation::Parts(dirs_and_file_name),
}),
) => in_mem
.list(Some(dirs_and_file_name))
@ -168,7 +275,33 @@ impl ObjectStore {
.boxed(),
(File(file), _) => file.list(prefix).await?.boxed(),
(MicrosoftAzure(azure), _) => azure.list(prefix).await?.boxed(),
(
MicrosoftAzure(azure),
Some(path::Path {
inner: PathRepresentation::MicrosoftAzure(prefix),
}),
) => azure
.list(Some(prefix))
.await?
.map_ok(|s| {
s.into_iter()
.map(|p| path::Path {
inner: PathRepresentation::MicrosoftAzure(p),
})
.collect()
})
.boxed(),
(MicrosoftAzure(azure), None) => azure
.list(None)
.await?
.map_ok(|s| {
s.into_iter()
.map(|p| path::Path {
inner: PathRepresentation::MicrosoftAzure(p),
})
.collect()
})
.boxed(),
_ => unreachable!(),
}
.err_into())
@ -181,9 +314,23 @@ impl ObjectStore {
&'a self,
prefix: &'a path::Path,
) -> Result<ListResult<path::Path>> {
use path::PathRepresentation;
use ObjectStoreIntegration::*;
match (&self.0, prefix) {
(AmazonS3(s3), _) => s3.list_with_delimiter(prefix, &None).await,
(
AmazonS3(s3),
path::Path {
inner: PathRepresentation::AmazonS3(prefix),
},
) => {
s3.list_with_delimiter(prefix, &None)
.map_ok(|list_result| {
list_result.map_paths(|p| path::Path {
inner: PathRepresentation::AmazonS3(p),
})
})
.await
}
(GoogleCloudStorage(_gcs), _) => unimplemented!(),
(
InMemory(in_mem),
@ -196,9 +343,9 @@ impl ObjectStore {
.map_ok(|list_result| list_result.map_paths(Into::into))
.await
}
(InMemory(_), _) => unreachable!(),
(File(_file), _) => unimplemented!(),
(MicrosoftAzure(_azure), _) => unimplemented!(),
_ => unreachable!(),
}
}
}
@ -419,6 +566,8 @@ pub enum Error {
#[cfg(test)]
mod tests {
use super::*;
use crate::path::{cloud::CloudPath, parsed::DirsAndFileName, ObjectStorePath};
use futures::stream;
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
@ -502,6 +651,7 @@ mod tests {
let data = Bytes::from("arbitrary data");
let files: Vec<_> = [
"test_file",
"mydb/wal/000/000/000.segment",
"mydb/wal/000/000/001.segment",
"mydb/wal/000/000/002.segment",
@ -510,7 +660,7 @@ mod tests {
"mydb/data/whatevs",
]
.iter()
.map(|&s| path::Path::from_cloud_unchecked(s))
.map(|&s| str_to_path(s, storage))
.collect();
let time_before_creation = Utc::now();
@ -597,6 +747,24 @@ mod tests {
.freeze())
}
/// Parse a str as a `CloudPath` into a `DirAndFileName` even though the
/// associated storage might not be cloud storage to reuse the cloud
/// path parsing logic. Then convert into the correct type of path for
/// the given storage.
fn str_to_path(val: &str, storage: &ObjectStore) -> path::Path {
let cloud_path = CloudPath::raw(val);
let parsed: DirsAndFileName = cloud_path.into();
let mut path = storage.new_path();
for dir in parsed.directories {
path.push_dir(dir.to_string())
}
if let Some(file_name) = parsed.file_name {
path.set_file_name(file_name.to_string());
}
path
}
async fn delete_fixtures(storage: &ObjectStore) {
let files: Vec<_> = [
"test_file",
@ -608,7 +776,7 @@ mod tests {
"mydb/data/whatevs",
]
.iter()
.map(|&s| path::Path::from_cloud_unchecked(s))
.map(|&s| str_to_path(s, storage))
.collect();
for f in &files {

View File

@ -5,6 +5,7 @@ use std::{mem, path::PathBuf};
/// Paths that came from or are to be used in cloud-based object storage
pub mod cloud;
use cloud::CloudPath;
/// Paths that come from or are to be used in file-based object storage
pub mod file;
@ -68,17 +69,6 @@ impl ObjectStorePath for Path {
}
impl Path {
/// For use when receiving a path from an object store API directly, not
/// when building a path. Assumes DELIMITER is the separator.
///
/// TODO: This should only be available to cloud storage
pub fn from_cloud_unchecked(path: impl Into<String>) -> Self {
let path = path.into();
Self {
inner: PathRepresentation::RawCloud(path),
}
}
/// For use when receiving a path from a filesystem directly, not
/// when building a path. Uses the standard library's path splitting
/// implementation to separate into parts.
@ -147,11 +137,16 @@ impl From<DirsAndFileName> for Path {
}
}
/// Temporary
/// Defines which object stores use which path logic.
#[derive(Clone, Eq, Debug)]
pub enum PathRepresentation {
/// Will be transformed into a CloudPath type
RawCloud(String),
/// Amazon storage
AmazonS3(CloudPath),
/// GCP storage
GoogleCloudStorage(CloudPath),
/// Microsoft Azure Blob storage
MicrosoftAzure(CloudPath),
/// Will be transformed into a FilePath type
RawPathBuf(PathBuf),
/// Will be able to be used directly
@ -221,7 +216,9 @@ impl PathRepresentation {
fn display(&self) -> String {
match self {
Self::Parts(dirs_and_file_name) => dirs_and_file_name.display(),
Self::RawCloud(path) => path.to_owned(),
Self::AmazonS3(path) | Self::GoogleCloudStorage(path) | Self::MicrosoftAzure(path) => {
path.display().to_string()
}
Self::RawPathBuf(path_buf) => path_buf.display().to_string(),
}
}
@ -384,43 +381,9 @@ mod tests {
);
}
#[test]
fn convert_raw_before_partial_eq() {
// dir and file_name
let cloud = Path::from_cloud_unchecked("test_dir/test_file.json");
let mut built = Path::default();
built.push_dir("test_dir");
built.set_file_name("test_file.json");
assert_eq!(built, cloud);
// dir, no file_name
let cloud = Path::from_cloud_unchecked("test_dir");
let mut built = Path::default();
built.push_dir("test_dir");
assert_eq!(built, cloud);
// file_name, no dir
let cloud = Path::from_cloud_unchecked("test_file.json");
let mut built = Path::default();
built.set_file_name("test_file.json");
assert_eq!(built, cloud);
// empty
let cloud = Path::from_cloud_unchecked("");
let built = Path::default();
assert_eq!(built, cloud);
}
#[test]
fn path_rep_conversions() {
// dir and file name
let cloud = PathRepresentation::RawCloud("foo/bar/blah.json".into());
let cloud_parts: DirsAndFileName = cloud.into();
let path_buf = PathRepresentation::RawPathBuf("foo/bar/blah.json".into());
let path_buf_parts: DirsAndFileName = path_buf.into();
@ -429,44 +392,27 @@ mod tests {
expected_parts.push_dir("bar");
expected_parts.file_name = Some("blah.json".into());
assert_eq!(cloud_parts, expected_parts);
assert_eq!(path_buf_parts, expected_parts);
// dir, no file name
let cloud = PathRepresentation::RawCloud("foo/bar".into());
let cloud_parts: DirsAndFileName = cloud.into();
let path_buf = PathRepresentation::RawPathBuf("foo/bar".into());
let path_buf_parts: DirsAndFileName = path_buf.into();
expected_parts.file_name = None;
assert_eq!(cloud_parts, expected_parts);
assert_eq!(path_buf_parts, expected_parts);
// no dir, file name
let cloud = PathRepresentation::RawCloud("blah.json".into());
let cloud_parts: DirsAndFileName = cloud.into();
let path_buf = PathRepresentation::RawPathBuf("blah.json".into());
let path_buf_parts: DirsAndFileName = path_buf.into();
assert!(cloud_parts.directories.is_empty());
assert_eq!(cloud_parts.file_name.unwrap().encoded(), "blah.json");
assert!(path_buf_parts.directories.is_empty());
assert_eq!(path_buf_parts.file_name.unwrap().encoded(), "blah.json");
// empty
let cloud = PathRepresentation::RawCloud("".into());
let cloud_parts: DirsAndFileName = cloud.into();
let path_buf = PathRepresentation::RawPathBuf("".into());
let path_buf_parts: DirsAndFileName = path_buf.into();
assert!(cloud_parts.directories.is_empty());
assert!(cloud_parts.file_name.is_none());
assert!(path_buf_parts.directories.is_empty());
assert!(path_buf_parts.file_name.is_none());
}

View File

@ -1,22 +1,53 @@
use super::{PathPart, PathRepresentation, DELIMITER};
use super::{DirsAndFileName, ObjectStorePath, PathPart, DELIMITER};
use std::mem;
use itertools::Itertools;
/// Converts `PathRepresentation`s to `String`s that are appropriate for use as
/// locations in cloud storage.
#[derive(Debug, Clone, Copy)]
pub struct CloudConverter {}
/// An object storage location suitable for passing to cloud storage APIs such
/// as AWS, GCS, and Azure.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct CloudPath {
inner: CloudPathRepresentation,
}
impl CloudConverter {
/// Creates a cloud storage location by joining this `PathRepresentation`'s
impl ObjectStorePath for CloudPath {
fn set_file_name(&mut self, part: impl Into<String>) {
self.inner = mem::take(&mut self.inner).set_file_name(part);
}
fn push_dir(&mut self, part: impl Into<String>) {
self.inner = mem::take(&mut self.inner).push_dir(part);
}
fn push_all_dirs<'a>(&mut self, parts: impl AsRef<[&'a str]>) {
self.inner = mem::take(&mut self.inner).push_all_dirs(parts);
}
fn display(&self) -> String {
self.to_raw()
}
}
impl CloudPath {
/// Creates a cloud storage location from a string received from a cloud
/// storage API without parsing or allocating unless other methods are
/// called on this instance that need it
pub fn raw(path: impl Into<String>) -> Self {
let path = path.into();
Self {
inner: CloudPathRepresentation::Raw(path),
}
}
/// Creates a cloud storage location by joining this `CloudPath`'s
/// parts with `DELIMITER`
pub fn convert(object_store_path: &PathRepresentation) -> String {
match object_store_path {
PathRepresentation::RawCloud(path) => path.to_owned(),
PathRepresentation::RawPathBuf(_path) => {
todo!("convert");
}
PathRepresentation::Parts(dirs_and_file_name) => {
pub fn to_raw(&self) -> String {
use CloudPathRepresentation::*;
match &self.inner {
Raw(path) => path.to_owned(),
Parsed(dirs_and_file_name) => {
let mut path = dirs_and_file_name
.directories
.iter()
@ -35,6 +66,105 @@ impl CloudConverter {
}
}
impl From<CloudPath> for DirsAndFileName {
fn from(cloud_path: CloudPath) -> Self {
cloud_path.inner.into()
}
}
impl From<DirsAndFileName> for CloudPath {
fn from(dirs_and_file_name: DirsAndFileName) -> Self {
Self {
inner: CloudPathRepresentation::Parsed(dirs_and_file_name),
}
}
}
#[derive(Debug, Clone, Eq)]
enum CloudPathRepresentation {
Raw(String),
Parsed(DirsAndFileName),
}
impl Default for CloudPathRepresentation {
fn default() -> Self {
Self::Parsed(DirsAndFileName::default())
}
}
impl PartialEq for CloudPathRepresentation {
fn eq(&self, other: &Self) -> bool {
use CloudPathRepresentation::*;
match (self, other) {
(Parsed(self_parts), Parsed(other_parts)) => self_parts == other_parts,
(Parsed(self_parts), _) => {
let other_parts: DirsAndFileName = other.to_owned().into();
*self_parts == other_parts
}
(_, Parsed(other_parts)) => {
let self_parts: DirsAndFileName = self.to_owned().into();
self_parts == *other_parts
}
_ => {
let self_parts: DirsAndFileName = self.to_owned().into();
let other_parts: DirsAndFileName = other.to_owned().into();
self_parts == other_parts
}
}
}
}
impl CloudPathRepresentation {
fn push_dir(self, part: impl Into<String>) -> Self {
let mut dirs_and_file_name: DirsAndFileName = self.into();
dirs_and_file_name.push_dir(part);
Self::Parsed(dirs_and_file_name)
}
fn push_all_dirs<'a>(self, parts: impl AsRef<[&'a str]>) -> Self {
let mut dirs_and_file_name: DirsAndFileName = self.into();
dirs_and_file_name.push_all_dirs(parts);
Self::Parsed(dirs_and_file_name)
}
fn set_file_name(self, part: impl Into<String>) -> Self {
let mut dirs_and_file_name: DirsAndFileName = self.into();
dirs_and_file_name.set_file_name(part);
Self::Parsed(dirs_and_file_name)
}
}
impl From<CloudPathRepresentation> for DirsAndFileName {
fn from(cloud_path_rep: CloudPathRepresentation) -> Self {
use CloudPathRepresentation::*;
match cloud_path_rep {
Raw(path) => {
let mut parts: Vec<PathPart> = path
.split_terminator(DELIMITER)
.map(|s| PathPart(s.to_string()))
.collect();
let maybe_file_name = match parts.pop() {
Some(file) if file.encoded().contains('.') => Some(file),
Some(dir) => {
parts.push(dir);
None
}
None => None,
};
Self {
directories: parts,
file_name: maybe_file_name,
}
}
Parsed(dirs_and_file_name) => dirs_and_file_name,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -44,10 +174,10 @@ mod tests {
// Use case: a file named `test_file.json` exists in object storage and it
// should be returned for a search on prefix `test`, so the prefix path
// should not get a trailing delimiter automatically added
let prefix = PathRepresentation::default();
let prefix = prefix.set_file_name("test");
let mut prefix = CloudPath::default();
prefix.set_file_name("test");
let converted = CloudConverter::convert(&prefix);
let converted = prefix.to_raw();
assert_eq!(converted, "test");
}
@ -56,29 +186,96 @@ mod tests {
// Use case: files exist in object storage named `foo/bar.json` and
// `foo_test.json`. A search for the prefix `foo/` should return
// `foo/bar.json` but not `foo_test.json'.
let prefix = PathRepresentation::default();
let prefix = prefix.push_dir("test");
let mut prefix = CloudPath::default();
prefix.push_dir("test");
let converted = CloudConverter::convert(&prefix);
let converted = prefix.to_raw();
assert_eq!(converted, "test/");
}
#[test]
fn push_encodes() {
let location = PathRepresentation::default();
let location = location.push_dir("foo/bar");
let location = location.push_dir("baz%2Ftest");
let mut location = CloudPath::default();
location.push_dir("foo/bar");
location.push_dir("baz%2Ftest");
let converted = CloudConverter::convert(&location);
let converted = location.to_raw();
assert_eq!(converted, "foo%2Fbar/baz%252Ftest/");
}
#[test]
fn push_all_encodes() {
let location = PathRepresentation::default();
let location = location.push_all_dirs(&["foo/bar", "baz%2Ftest"]);
let mut location = CloudPath::default();
location.push_all_dirs(&["foo/bar", "baz%2Ftest"]);
let converted = CloudConverter::convert(&location);
let converted = location.to_raw();
assert_eq!(converted, "foo%2Fbar/baz%252Ftest/");
}
#[test]
fn convert_raw_before_partial_eq() {
// dir and file_name
let cloud = CloudPath::raw("test_dir/test_file.json");
let mut built = CloudPath::default();
built.push_dir("test_dir");
built.set_file_name("test_file.json");
assert_eq!(built, cloud);
// dir, no file_name
let cloud = CloudPath::raw("test_dir");
let mut built = CloudPath::default();
built.push_dir("test_dir");
assert_eq!(built, cloud);
// file_name, no dir
let cloud = CloudPath::raw("test_file.json");
let mut built = CloudPath::default();
built.set_file_name("test_file.json");
assert_eq!(built, cloud);
// empty
let cloud = CloudPath::raw("");
let built = CloudPath::default();
assert_eq!(built, cloud);
}
#[test]
fn conversions() {
// dir and file name
let cloud = CloudPath::raw("foo/bar/blah.json");
let cloud_parts: DirsAndFileName = cloud.into();
let mut expected_parts = DirsAndFileName::default();
expected_parts.push_dir("foo");
expected_parts.push_dir("bar");
expected_parts.file_name = Some("blah.json".into());
assert_eq!(cloud_parts, expected_parts);
// dir, no file name
let cloud = CloudPath::raw("foo/bar");
let cloud_parts: DirsAndFileName = cloud.into();
expected_parts.file_name = None;
assert_eq!(cloud_parts, expected_parts);
// no dir, file name
let cloud = CloudPath::raw("blah.json");
let cloud_parts: DirsAndFileName = cloud.into();
assert!(cloud_parts.directories.is_empty());
assert_eq!(cloud_parts.file_name.unwrap().encoded(), "blah.json");
// empty
let cloud = CloudPath::raw("");
let cloud_parts: DirsAndFileName = cloud.into();
assert!(cloud_parts.directories.is_empty());
assert!(cloud_parts.file_name.is_none());
}
}

View File

@ -13,9 +13,6 @@ impl FileConverter {
/// platform.
pub fn convert(object_store_path: &PathRepresentation) -> PathBuf {
match object_store_path {
PathRepresentation::RawCloud(_path) => {
todo!("convert");
}
PathRepresentation::RawPathBuf(path) => path.to_owned(),
PathRepresentation::Parts(dirs_and_file_name) => {
let mut path: PathBuf = dirs_and_file_name
@ -28,6 +25,7 @@ impl FileConverter {
}
path
}
_ => unreachable!(),
}
}
}

View File

@ -127,24 +127,9 @@ impl DirsAndFileName {
impl From<PathRepresentation> for DirsAndFileName {
fn from(path_rep: PathRepresentation) -> Self {
match path_rep {
PathRepresentation::RawCloud(path) => {
let mut parts: Vec<PathPart> = path
.split_terminator(DELIMITER)
.map(|s| PathPart(s.to_string()))
.collect();
let maybe_file_name = match parts.pop() {
Some(file) if file.encoded().contains('.') => Some(file),
Some(dir) => {
parts.push(dir);
None
}
None => None,
};
Self {
directories: parts,
file_name: maybe_file_name,
}
}
PathRepresentation::AmazonS3(path)
| PathRepresentation::GoogleCloudStorage(path)
| PathRepresentation::MicrosoftAzure(path) => path.into(),
PathRepresentation::RawPathBuf(path) => {
let mut parts: Vec<PathPart> = path
.iter()