chore: make cache a type in object store trait
parent
d0ea812041
commit
47044d537c
|
@ -134,6 +134,7 @@ impl fmt::Debug for AmazonS3 {
|
|||
|
||||
#[async_trait]
|
||||
impl ObjectStoreApi for AmazonS3 {
|
||||
type Cache = LocalFSCache;
|
||||
type Path = CloudPath;
|
||||
type Error = Error;
|
||||
|
||||
|
@ -287,7 +288,7 @@ impl ObjectStoreApi for AmazonS3 {
|
|||
self.list_with_delimiter_and_token(prefix, &None).await
|
||||
}
|
||||
|
||||
fn cache(&self) -> Option<&dyn Cache> {
|
||||
fn cache(&self) -> Option<&Self::Cache> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -65,6 +65,7 @@ pub struct MicrosoftAzure {
|
|||
|
||||
#[async_trait]
|
||||
impl ObjectStoreApi for MicrosoftAzure {
|
||||
type Cache = LocalFSCache;
|
||||
type Path = CloudPath;
|
||||
type Error = Error;
|
||||
|
||||
|
@ -241,7 +242,7 @@ impl ObjectStoreApi for MicrosoftAzure {
|
|||
})
|
||||
}
|
||||
|
||||
fn cache(&self) -> Option<&dyn Cache> {
|
||||
fn cache(&self) -> Option<&Self::Cache> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -40,7 +40,9 @@ pub trait Cache {
|
|||
|
||||
/// Implementation of the local file system cache that keeps the LRU stats and
|
||||
/// performs any evictions to load new objects in.
|
||||
pub(crate) struct LocalFSCache {}
|
||||
#[derive(Debug)]
|
||||
#[allow(missing_copy_implementations)]
|
||||
pub struct LocalFSCache {}
|
||||
|
||||
#[async_trait]
|
||||
impl Cache for LocalFSCache {
|
||||
|
|
|
@ -72,6 +72,7 @@ pub struct File {
|
|||
|
||||
#[async_trait]
|
||||
impl ObjectStoreApi for File {
|
||||
type Cache = Self;
|
||||
type Path = FilePath;
|
||||
type Error = Error;
|
||||
|
||||
|
@ -251,7 +252,7 @@ impl ObjectStoreApi for File {
|
|||
})
|
||||
}
|
||||
|
||||
fn cache(&self) -> Option<&dyn Cache> {
|
||||
fn cache(&self) -> Option<&Self::Cache> {
|
||||
Some(self)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,7 +3,8 @@
|
|||
use async_trait::async_trait;
|
||||
use snafu::Snafu;
|
||||
|
||||
use crate::{cache::Cache, path::cloud::CloudPath, ObjectStoreApi};
|
||||
use crate::cache::LocalFSCache;
|
||||
use crate::{path::cloud::CloudPath, ObjectStoreApi};
|
||||
|
||||
/// A specialized `Error` for Azure object store-related errors
|
||||
#[derive(Debug, Snafu, Clone)]
|
||||
|
@ -35,6 +36,7 @@ pub type GoogleCloudStorage = DummyObjectStore;
|
|||
|
||||
#[async_trait]
|
||||
impl ObjectStoreApi for DummyObjectStore {
|
||||
type Cache = LocalFSCache;
|
||||
type Path = CloudPath;
|
||||
type Error = Error;
|
||||
|
||||
|
@ -85,7 +87,7 @@ impl ObjectStoreApi for DummyObjectStore {
|
|||
NotSupported { name: &self.name }.fail()
|
||||
}
|
||||
|
||||
fn cache(&self) -> Option<&dyn Cache> {
|
||||
fn cache(&self) -> Option<&Self::Cache> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -80,6 +80,7 @@ pub struct GoogleCloudStorage {
|
|||
|
||||
#[async_trait]
|
||||
impl ObjectStoreApi for GoogleCloudStorage {
|
||||
type Cache = LocalFSCache;
|
||||
type Path = CloudPath;
|
||||
type Error = Error;
|
||||
|
||||
|
@ -251,7 +252,7 @@ impl ObjectStoreApi for GoogleCloudStorage {
|
|||
Ok(result)
|
||||
}
|
||||
|
||||
fn cache(&self) -> Option<&dyn Cache> {
|
||||
fn cache(&self) -> Option<&Self::Cache> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -50,17 +50,22 @@ use throttle::ThrottledStore;
|
|||
/// Publically expose throttling configuration
|
||||
pub use throttle::ThrottleConfig;
|
||||
|
||||
use crate::cache::Cache;
|
||||
use crate::cache::{Cache, LocalFSCache};
|
||||
use crate::path::Path;
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
use chrono::{DateTime, Utc};
|
||||
use futures::{stream::BoxStream, Stream, StreamExt, TryFutureExt, TryStreamExt};
|
||||
use snafu::{ResultExt, Snafu};
|
||||
use std::sync::Arc;
|
||||
use std::{io, path::PathBuf};
|
||||
|
||||
/// Universal API to multiple object store services.
|
||||
#[async_trait]
|
||||
pub trait ObjectStoreApi: Send + Sync + 'static {
|
||||
/// The type of the local filesystem cache to use with this object store.
|
||||
type Cache: cache::Cache;
|
||||
|
||||
/// The type of the locations used in interacting with this object store.
|
||||
type Path: path::ObjectStorePath;
|
||||
|
||||
|
@ -104,7 +109,7 @@ pub trait ObjectStoreApi: Send + Sync + 'static {
|
|||
) -> Result<ListResult<Self::Path>, Self::Error>;
|
||||
|
||||
/// Return the local filesystem cache, if configured, for this object store.
|
||||
fn cache(&self) -> Option<&dyn Cache>;
|
||||
fn cache(&self) -> Option<&Self::Cache>;
|
||||
}
|
||||
|
||||
/// Universal interface to multiple object store services.
|
||||
|
@ -195,6 +200,7 @@ impl ObjectStore {
|
|||
|
||||
#[async_trait]
|
||||
impl ObjectStoreApi for ObjectStore {
|
||||
type Cache = ObjectStoreFileCache;
|
||||
type Path = path::Path;
|
||||
type Error = Error;
|
||||
|
||||
|
@ -419,7 +425,7 @@ impl ObjectStoreApi for ObjectStore {
|
|||
}
|
||||
}
|
||||
|
||||
fn cache(&self) -> Option<&dyn Cache> {
|
||||
fn cache(&self) -> Option<&Self::Cache> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
@ -441,6 +447,51 @@ pub enum ObjectStoreIntegration {
|
|||
MicrosoftAzure(Box<MicrosoftAzure>),
|
||||
}
|
||||
|
||||
/// Cache wrapper so local file object store can pass through to its implementation
|
||||
/// while others use the `LocalFSCache`.
|
||||
#[derive(Debug)]
|
||||
pub enum ObjectStoreFileCache {
|
||||
/// If using the local filesystem for object store, don't create additional copies for caching
|
||||
Passthrough(File),
|
||||
/// Remote object stores should use the LocalFSCache implementation
|
||||
File(LocalFSCache),
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Cache for ObjectStoreFileCache {
|
||||
fn evict(&self, path: &Path) -> crate::cache::Result<()> {
|
||||
match &self {
|
||||
Self::Passthrough(f) => f.evict(path),
|
||||
Self::File(f) => f.evict(path),
|
||||
}
|
||||
}
|
||||
|
||||
async fn fs_path_or_cache(
|
||||
&self,
|
||||
path: &Path,
|
||||
store: Arc<ObjectStore>,
|
||||
) -> crate::cache::Result<&str> {
|
||||
match &self {
|
||||
Self::Passthrough(f) => f.fs_path_or_cache(path, store).await,
|
||||
Self::File(f) => f.fs_path_or_cache(path, store).await,
|
||||
}
|
||||
}
|
||||
|
||||
fn size(&self) -> u64 {
|
||||
match &self {
|
||||
Self::Passthrough(f) => f.size(),
|
||||
Self::File(f) => f.size(),
|
||||
}
|
||||
}
|
||||
|
||||
fn limit(&self) -> u64 {
|
||||
match &self {
|
||||
Self::Passthrough(f) => f.size(),
|
||||
Self::File(f) => f.size(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Result of a list call that includes objects, prefixes (directories) and a
|
||||
/// token for the next set of results. Individual result sets may be limited to
|
||||
/// 1,000 objects based on the underlying object storage's limitations.
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
//! This module contains the IOx implementation for using memory as the object
|
||||
//! store.
|
||||
use crate::cache::LocalFSCache;
|
||||
use crate::{
|
||||
cache::Cache, path::parsed::DirsAndFileName, ListResult, ObjectMeta, ObjectStoreApi,
|
||||
ObjectStorePath,
|
||||
path::parsed::DirsAndFileName, ListResult, ObjectMeta, ObjectStoreApi, ObjectStorePath,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
|
@ -39,6 +39,7 @@ pub struct InMemory {
|
|||
|
||||
#[async_trait]
|
||||
impl ObjectStoreApi for InMemory {
|
||||
type Cache = LocalFSCache;
|
||||
type Path = DirsAndFileName;
|
||||
type Error = Error;
|
||||
|
||||
|
@ -155,7 +156,7 @@ impl ObjectStoreApi for InMemory {
|
|||
})
|
||||
}
|
||||
|
||||
fn cache(&self) -> Option<&dyn Cache> {
|
||||
fn cache(&self) -> Option<&Self::Cache> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
//! This module contains the IOx implementation for wrapping existing object store types into an artificial "sleep" wrapper.
|
||||
use std::{convert::TryInto, io, sync::Arc};
|
||||
|
||||
use crate::cache::Cache;
|
||||
use crate::{ListResult, ObjectStoreApi, Result};
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
|
@ -107,6 +106,8 @@ impl<T: ObjectStoreApi> ThrottledStore<T> {
|
|||
|
||||
#[async_trait]
|
||||
impl<T: ObjectStoreApi> ObjectStoreApi for ThrottledStore<T> {
|
||||
type Cache = T::Cache;
|
||||
|
||||
type Path = T::Path;
|
||||
|
||||
type Error = T::Error;
|
||||
|
@ -226,7 +227,7 @@ impl<T: ObjectStoreApi> ObjectStoreApi for ThrottledStore<T> {
|
|||
}
|
||||
}
|
||||
|
||||
fn cache(&self) -> Option<&dyn Cache> {
|
||||
fn cache(&self) -> Option<&Self::Cache> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue