refactor: path_from_dirs_and_filename trait method
Moves the path_from_dirs_and_filename from an ObjectStoreImpl method to a trait method, completing the abstraction over all object store backends.pull/24376/head
parent
1d5066c421
commit
b727d26dab
|
@ -1,7 +1,7 @@
|
|||
//! This module contains the IOx implementation for using S3 as the object
|
||||
//! store.
|
||||
use crate::{
|
||||
path::{cloud::CloudPath, DELIMITER},
|
||||
path::{cloud::CloudPath, parsed::DirsAndFileName, DELIMITER},
|
||||
GetResult, ListResult, ObjectMeta, ObjectStoreApi, ObjectStorePath,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
|
@ -176,6 +176,10 @@ impl ObjectStoreApi for AmazonS3 {
|
|||
CloudPath::raw(raw)
|
||||
}
|
||||
|
||||
fn path_from_dirs_and_filename(&self, path: DirsAndFileName) -> Self::Path {
|
||||
path.into()
|
||||
}
|
||||
|
||||
async fn put(&self, location: &Self::Path, bytes: Bytes) -> Result<()> {
|
||||
let bucket_name = self.bucket_name.clone();
|
||||
let key = location.to_raw();
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
//! This module contains the IOx implementation for using Azure Blob storage as
|
||||
//! the object store.
|
||||
use crate::{
|
||||
path::{cloud::CloudPath, DELIMITER},
|
||||
path::{cloud::CloudPath, parsed::DirsAndFileName, DELIMITER},
|
||||
GetResult, ListResult, ObjectMeta, ObjectStoreApi, ObjectStorePath,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
|
@ -77,6 +77,10 @@ impl ObjectStoreApi for MicrosoftAzure {
|
|||
CloudPath::raw(raw)
|
||||
}
|
||||
|
||||
fn path_from_dirs_and_filename(&self, path: DirsAndFileName) -> Self::Path {
|
||||
path.into()
|
||||
}
|
||||
|
||||
async fn put(&self, location: &Self::Path, bytes: Bytes) -> Result<()> {
|
||||
let location = location.to_raw();
|
||||
|
||||
|
|
|
@ -1,8 +1,10 @@
|
|||
//! This module contains the IOx implementation for using local disk as the
|
||||
//! object store.
|
||||
use crate::cache::Cache;
|
||||
use crate::path::Path;
|
||||
use crate::{path::file::FilePath, GetResult, ListResult, ObjectMeta, ObjectStoreImpl, ObjectStoreApi};
|
||||
use crate::path::{parsed::DirsAndFileName, Path};
|
||||
use crate::{
|
||||
path::file::FilePath, GetResult, ListResult, ObjectMeta, ObjectStoreApi, ObjectStoreImpl,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
use futures::{
|
||||
|
@ -100,6 +102,10 @@ impl ObjectStoreApi for File {
|
|||
FilePath::raw(raw, true)
|
||||
}
|
||||
|
||||
fn path_from_dirs_and_filename(&self, path: DirsAndFileName) -> Self::Path {
|
||||
path.into()
|
||||
}
|
||||
|
||||
async fn put(&self, location: &Self::Path, bytes: Bytes) -> Result<()> {
|
||||
let content = bytes::BytesMut::from(&*bytes);
|
||||
|
||||
|
@ -330,7 +336,7 @@ mod tests {
|
|||
get_nonexistent_object, list_uses_directories_correctly, list_with_delimiter,
|
||||
put_get_delete_list,
|
||||
},
|
||||
Error as ObjectStoreError, ObjectStoreImpl, ObjectStoreApi, ObjectStorePath,
|
||||
Error as ObjectStoreError, ObjectStoreApi, ObjectStoreImpl, ObjectStorePath,
|
||||
};
|
||||
use std::{fs::set_permissions, os::unix::prelude::PermissionsExt};
|
||||
use tempfile::TempDir;
|
||||
|
|
|
@ -6,7 +6,10 @@ use async_trait::async_trait;
|
|||
use bytes::Bytes;
|
||||
use snafu::Snafu;
|
||||
|
||||
use crate::{path::cloud::CloudPath, GetResult, ObjectStoreApi};
|
||||
use crate::{
|
||||
path::{cloud::CloudPath, parsed::DirsAndFileName},
|
||||
GetResult, ObjectStoreApi,
|
||||
};
|
||||
|
||||
/// A specialized `Error` for Azure object store-related errors
|
||||
#[derive(Debug, Snafu, Clone)]
|
||||
|
@ -49,6 +52,10 @@ impl ObjectStoreApi for DummyObjectStore {
|
|||
CloudPath::raw(raw)
|
||||
}
|
||||
|
||||
fn path_from_dirs_and_filename(&self, path: DirsAndFileName) -> Self::Path {
|
||||
path.into()
|
||||
}
|
||||
|
||||
async fn put(&self, _location: &Self::Path, _bytes: Bytes) -> crate::Result<(), Self::Error> {
|
||||
NotSupportedSnafu { name: &self.name }.fail()
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
//! This module contains the IOx implementation for using Google Cloud Storage
|
||||
//! as the object store.
|
||||
use crate::{
|
||||
path::{cloud::CloudPath, DELIMITER},
|
||||
path::{cloud::CloudPath, parsed::DirsAndFileName, DELIMITER},
|
||||
GetResult, ListResult, ObjectMeta, ObjectStoreApi, ObjectStorePath,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
|
@ -95,6 +95,10 @@ impl ObjectStoreApi for GoogleCloudStorage {
|
|||
CloudPath::raw(raw)
|
||||
}
|
||||
|
||||
fn path_from_dirs_and_filename(&self, path: DirsAndFileName) -> Self::Path {
|
||||
path.into()
|
||||
}
|
||||
|
||||
async fn put(&self, location: &Self::Path, bytes: Bytes) -> Result<()> {
|
||||
let location = location.to_raw();
|
||||
let location_copy = location.clone();
|
||||
|
|
|
@ -78,6 +78,10 @@ pub trait ObjectStoreApi: Send + Sync + 'static {
|
|||
/// Return a new location path constructed from a string appropriate for this object storage
|
||||
fn path_from_raw(&self, raw: &str) -> Self::Path;
|
||||
|
||||
/// Construct an implementation-specific path from the parsed
|
||||
/// representation.
|
||||
fn path_from_dirs_and_filename(&self, path: DirsAndFileName) -> Self::Path;
|
||||
|
||||
/// Save the provided bytes to the specified location.
|
||||
async fn put(&self, location: &Self::Path, bytes: Bytes) -> Result<(), Self::Error>;
|
||||
|
||||
|
@ -209,19 +213,6 @@ impl ObjectStoreImpl {
|
|||
})
|
||||
}
|
||||
|
||||
/// Create implementation-specific path from parsed representation.
|
||||
pub fn path_from_dirs_and_filename(&self, path: DirsAndFileName) -> path::Path {
|
||||
use ObjectStoreIntegration::*;
|
||||
match &self.integration {
|
||||
AmazonS3(_) => path::Path::AmazonS3(path.into()),
|
||||
GoogleCloudStorage(_) => path::Path::GoogleCloudStorage(path.into()),
|
||||
InMemory(_) => path::Path::InMemory(path),
|
||||
InMemoryThrottled(_) => path::Path::InMemory(path),
|
||||
File(_) => path::Path::File(path.into()),
|
||||
MicrosoftAzure(_) => path::Path::MicrosoftAzure(path.into()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the filesystem cache if configured
|
||||
pub fn cache(&self) -> &Option<ObjectStoreFileCache> {
|
||||
&self.cache
|
||||
|
@ -455,6 +446,18 @@ impl ObjectStoreApi for ObjectStoreImpl {
|
|||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
fn path_from_dirs_and_filename(&self, path: DirsAndFileName) -> path::Path {
|
||||
use ObjectStoreIntegration::*;
|
||||
match &self.integration {
|
||||
AmazonS3(_) => path::Path::AmazonS3(path.into()),
|
||||
GoogleCloudStorage(_) => path::Path::GoogleCloudStorage(path.into()),
|
||||
InMemory(_) => path::Path::InMemory(path),
|
||||
InMemoryThrottled(_) => path::Path::InMemory(path),
|
||||
File(_) => path::Path::File(path.into()),
|
||||
MicrosoftAzure(_) => path::Path::MicrosoftAzure(path.into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// All supported object storage integrations
|
||||
|
|
|
@ -47,6 +47,10 @@ impl ObjectStoreApi for InMemory {
|
|||
cloud_path.into()
|
||||
}
|
||||
|
||||
fn path_from_dirs_and_filename(&self, path: DirsAndFileName) -> Self::Path {
|
||||
path.into()
|
||||
}
|
||||
|
||||
async fn put(&self, location: &Self::Path, bytes: Bytes) -> Result<()> {
|
||||
self.storage
|
||||
.write()
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
//! This module contains the IOx implementation for wrapping existing object store types into an artificial "sleep" wrapper.
|
||||
use std::{convert::TryInto, sync::Mutex};
|
||||
|
||||
use crate::{GetResult, ListResult, ObjectStoreApi, Result};
|
||||
use crate::{path::parsed::DirsAndFileName, GetResult, ListResult, ObjectStoreApi, Result};
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
use futures::{stream::BoxStream, StreamExt};
|
||||
|
@ -110,7 +110,10 @@ impl<T: ObjectStoreApi> ThrottledStore<T> {
|
|||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: ObjectStoreApi> ObjectStoreApi for ThrottledStore<T> {
|
||||
impl<T: ObjectStoreApi> ObjectStoreApi for ThrottledStore<T>
|
||||
where
|
||||
DirsAndFileName: Into<T::Path>,
|
||||
{
|
||||
type Path = T::Path;
|
||||
|
||||
type Error = T::Error;
|
||||
|
@ -123,6 +126,10 @@ impl<T: ObjectStoreApi> ObjectStoreApi for ThrottledStore<T> {
|
|||
self.inner.path_from_raw(raw)
|
||||
}
|
||||
|
||||
fn path_from_dirs_and_filename(&self, path: DirsAndFileName) -> Self::Path {
|
||||
path.into()
|
||||
}
|
||||
|
||||
async fn put(&self, location: &Self::Path, bytes: Bytes) -> Result<(), Self::Error> {
|
||||
sleep(self.config().wait_put_per_call).await;
|
||||
|
||||
|
|
Loading…
Reference in New Issue