From b727d26dab2fe0a8ee513ff3d4e8aef21f8440d7 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 11 Mar 2022 17:28:50 +0000 Subject: [PATCH] refactor: path_from_dirs_and_filename trait method Moves the path_from_dirs_and_filename from an ObjectStoreImpl method to a trait method, completing the abstraction over all object store backends. --- object_store/src/aws.rs | 6 +++++- object_store/src/azure.rs | 6 +++++- object_store/src/disk.rs | 12 +++++++++--- object_store/src/dummy.rs | 9 ++++++++- object_store/src/gcp.rs | 6 +++++- object_store/src/lib.rs | 29 ++++++++++++++++------------- object_store/src/memory.rs | 4 ++++ object_store/src/throttle.rs | 11 +++++++++-- 8 files changed, 61 insertions(+), 22 deletions(-) diff --git a/object_store/src/aws.rs b/object_store/src/aws.rs index 9bb04a8562..6f11d8119b 100644 --- a/object_store/src/aws.rs +++ b/object_store/src/aws.rs @@ -1,7 +1,7 @@ //! This module contains the IOx implementation for using S3 as the object //! store. use crate::{ - path::{cloud::CloudPath, DELIMITER}, + path::{cloud::CloudPath, parsed::DirsAndFileName, DELIMITER}, GetResult, ListResult, ObjectMeta, ObjectStoreApi, ObjectStorePath, }; use async_trait::async_trait; @@ -176,6 +176,10 @@ impl ObjectStoreApi for AmazonS3 { CloudPath::raw(raw) } + fn path_from_dirs_and_filename(&self, path: DirsAndFileName) -> Self::Path { + path.into() + } + async fn put(&self, location: &Self::Path, bytes: Bytes) -> Result<()> { let bucket_name = self.bucket_name.clone(); let key = location.to_raw(); diff --git a/object_store/src/azure.rs b/object_store/src/azure.rs index 5b6b2e407d..a207f10d00 100644 --- a/object_store/src/azure.rs +++ b/object_store/src/azure.rs @@ -1,7 +1,7 @@ //! This module contains the IOx implementation for using Azure Blob storage as //! the object store. use crate::{ - path::{cloud::CloudPath, DELIMITER}, + path::{cloud::CloudPath, parsed::DirsAndFileName, DELIMITER}, GetResult, ListResult, ObjectMeta, ObjectStoreApi, ObjectStorePath, }; use async_trait::async_trait; @@ -77,6 +77,10 @@ impl ObjectStoreApi for MicrosoftAzure { CloudPath::raw(raw) } + fn path_from_dirs_and_filename(&self, path: DirsAndFileName) -> Self::Path { + path.into() + } + async fn put(&self, location: &Self::Path, bytes: Bytes) -> Result<()> { let location = location.to_raw(); diff --git a/object_store/src/disk.rs b/object_store/src/disk.rs index 80d754aaad..c60c3e9725 100644 --- a/object_store/src/disk.rs +++ b/object_store/src/disk.rs @@ -1,8 +1,10 @@ //! This module contains the IOx implementation for using local disk as the //! object store. use crate::cache::Cache; -use crate::path::Path; -use crate::{path::file::FilePath, GetResult, ListResult, ObjectMeta, ObjectStoreImpl, ObjectStoreApi}; +use crate::path::{parsed::DirsAndFileName, Path}; +use crate::{ + path::file::FilePath, GetResult, ListResult, ObjectMeta, ObjectStoreApi, ObjectStoreImpl, +}; use async_trait::async_trait; use bytes::Bytes; use futures::{ @@ -100,6 +102,10 @@ impl ObjectStoreApi for File { FilePath::raw(raw, true) } + fn path_from_dirs_and_filename(&self, path: DirsAndFileName) -> Self::Path { + path.into() + } + async fn put(&self, location: &Self::Path, bytes: Bytes) -> Result<()> { let content = bytes::BytesMut::from(&*bytes); @@ -330,7 +336,7 @@ mod tests { get_nonexistent_object, list_uses_directories_correctly, list_with_delimiter, put_get_delete_list, }, - Error as ObjectStoreError, ObjectStoreImpl, ObjectStoreApi, ObjectStorePath, + Error as ObjectStoreError, ObjectStoreApi, ObjectStoreImpl, ObjectStorePath, }; use std::{fs::set_permissions, os::unix::prelude::PermissionsExt}; use tempfile::TempDir; diff --git a/object_store/src/dummy.rs b/object_store/src/dummy.rs index cb4862fbf9..01089e50c4 100644 --- a/object_store/src/dummy.rs +++ b/object_store/src/dummy.rs @@ -6,7 +6,10 @@ use async_trait::async_trait; use bytes::Bytes; use snafu::Snafu; -use crate::{path::cloud::CloudPath, GetResult, ObjectStoreApi}; +use crate::{ + path::{cloud::CloudPath, parsed::DirsAndFileName}, + GetResult, ObjectStoreApi, +}; /// A specialized `Error` for Azure object store-related errors #[derive(Debug, Snafu, Clone)] @@ -49,6 +52,10 @@ impl ObjectStoreApi for DummyObjectStore { CloudPath::raw(raw) } + fn path_from_dirs_and_filename(&self, path: DirsAndFileName) -> Self::Path { + path.into() + } + async fn put(&self, _location: &Self::Path, _bytes: Bytes) -> crate::Result<(), Self::Error> { NotSupportedSnafu { name: &self.name }.fail() } diff --git a/object_store/src/gcp.rs b/object_store/src/gcp.rs index 6225d6ada8..a740f4587b 100644 --- a/object_store/src/gcp.rs +++ b/object_store/src/gcp.rs @@ -1,7 +1,7 @@ //! This module contains the IOx implementation for using Google Cloud Storage //! as the object store. use crate::{ - path::{cloud::CloudPath, DELIMITER}, + path::{cloud::CloudPath, parsed::DirsAndFileName, DELIMITER}, GetResult, ListResult, ObjectMeta, ObjectStoreApi, ObjectStorePath, }; use async_trait::async_trait; @@ -95,6 +95,10 @@ impl ObjectStoreApi for GoogleCloudStorage { CloudPath::raw(raw) } + fn path_from_dirs_and_filename(&self, path: DirsAndFileName) -> Self::Path { + path.into() + } + async fn put(&self, location: &Self::Path, bytes: Bytes) -> Result<()> { let location = location.to_raw(); let location_copy = location.clone(); diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 7dbb2ae32c..542ab78259 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -78,6 +78,10 @@ pub trait ObjectStoreApi: Send + Sync + 'static { /// Return a new location path constructed from a string appropriate for this object storage fn path_from_raw(&self, raw: &str) -> Self::Path; + /// Construct an implementation-specific path from the parsed + /// representation. + fn path_from_dirs_and_filename(&self, path: DirsAndFileName) -> Self::Path; + /// Save the provided bytes to the specified location. async fn put(&self, location: &Self::Path, bytes: Bytes) -> Result<(), Self::Error>; @@ -209,19 +213,6 @@ impl ObjectStoreImpl { }) } - /// Create implementation-specific path from parsed representation. - pub fn path_from_dirs_and_filename(&self, path: DirsAndFileName) -> path::Path { - use ObjectStoreIntegration::*; - match &self.integration { - AmazonS3(_) => path::Path::AmazonS3(path.into()), - GoogleCloudStorage(_) => path::Path::GoogleCloudStorage(path.into()), - InMemory(_) => path::Path::InMemory(path), - InMemoryThrottled(_) => path::Path::InMemory(path), - File(_) => path::Path::File(path.into()), - MicrosoftAzure(_) => path::Path::MicrosoftAzure(path.into()), - } - } - /// Returns the filesystem cache if configured pub fn cache(&self) -> &Option { &self.cache @@ -455,6 +446,18 @@ impl ObjectStoreApi for ObjectStoreImpl { _ => unreachable!(), } } + + fn path_from_dirs_and_filename(&self, path: DirsAndFileName) -> path::Path { + use ObjectStoreIntegration::*; + match &self.integration { + AmazonS3(_) => path::Path::AmazonS3(path.into()), + GoogleCloudStorage(_) => path::Path::GoogleCloudStorage(path.into()), + InMemory(_) => path::Path::InMemory(path), + InMemoryThrottled(_) => path::Path::InMemory(path), + File(_) => path::Path::File(path.into()), + MicrosoftAzure(_) => path::Path::MicrosoftAzure(path.into()), + } + } } /// All supported object storage integrations diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index f51c5ce03a..7518cb71f7 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -47,6 +47,10 @@ impl ObjectStoreApi for InMemory { cloud_path.into() } + fn path_from_dirs_and_filename(&self, path: DirsAndFileName) -> Self::Path { + path.into() + } + async fn put(&self, location: &Self::Path, bytes: Bytes) -> Result<()> { self.storage .write() diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs index 8422d01286..cdde9817b9 100644 --- a/object_store/src/throttle.rs +++ b/object_store/src/throttle.rs @@ -1,7 +1,7 @@ //! This module contains the IOx implementation for wrapping existing object store types into an artificial "sleep" wrapper. use std::{convert::TryInto, sync::Mutex}; -use crate::{GetResult, ListResult, ObjectStoreApi, Result}; +use crate::{path::parsed::DirsAndFileName, GetResult, ListResult, ObjectStoreApi, Result}; use async_trait::async_trait; use bytes::Bytes; use futures::{stream::BoxStream, StreamExt}; @@ -110,7 +110,10 @@ impl ThrottledStore { } #[async_trait] -impl ObjectStoreApi for ThrottledStore { +impl ObjectStoreApi for ThrottledStore +where + DirsAndFileName: Into, +{ type Path = T::Path; type Error = T::Error; @@ -123,6 +126,10 @@ impl ObjectStoreApi for ThrottledStore { self.inner.path_from_raw(raw) } + fn path_from_dirs_and_filename(&self, path: DirsAndFileName) -> Self::Path { + path.into() + } + async fn put(&self, location: &Self::Path, bytes: Bytes) -> Result<(), Self::Error> { sleep(self.config().wait_put_per_call).await;