feat: add skeleton for object store file cache

pull/24376/head
Paul Dix 2021-07-20 19:34:35 -04:00
parent e1b2909818
commit d0ea812041
10 changed files with 136 additions and 3 deletions

View File

@ -2,6 +2,7 @@
//! store.
use crate::{
buffer::slurp_stream_tempfile,
cache::{Cache, LocalFSCache},
path::{cloud::CloudPath, DELIMITER},
ListResult, ObjectMeta, ObjectStoreApi,
};
@ -285,6 +286,10 @@ impl ObjectStoreApi for AmazonS3 {
async fn list_with_delimiter(&self, prefix: &Self::Path) -> Result<ListResult<Self::Path>> {
self.list_with_delimiter_and_token(prefix, &None).await
}
fn cache(&self) -> Option<&dyn Cache> {
todo!()
}
}
/// Configure a connection to Amazon S3 using the specified credentials in

View File

@ -1,6 +1,7 @@
//! This module contains the IOx implementation for using Azure Blob storage as
//! the object store.
use crate::{
cache::{Cache, LocalFSCache},
path::{cloud::CloudPath, DELIMITER},
ListResult, ObjectMeta, ObjectStoreApi,
};
@ -239,6 +240,10 @@ impl ObjectStoreApi for MicrosoftAzure {
objects,
})
}
fn cache(&self) -> Option<&dyn Cache> {
todo!()
}
}
/// Configure a connection to container with given name on Microsoft Azure

62
object_store/src/cache.rs Normal file
View File

@ -0,0 +1,62 @@
//! This module contains a trait and implementation for caching object storage objects
//! in the local filesystem. In the case of the disk backed object store implementation,
//! it yields locations to its files for cache locations and no-ops any cache modifications.
use crate::path::Path;
use crate::ObjectStore;
use async_trait::async_trait;
use snafu::Snafu;
use std::sync::Arc;
/// Result for the cache
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// A specialized `Error` for Cache related errors
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
pub enum Error {
#[snafu(display("unable to evict '{}' from the local cache", name))]
UnableToEvict { name: String },
}
/// Defines an LRU cache with local file locations for objects from object store.
#[async_trait]
pub trait Cache {
/// Evicts an object from the local filesystem cache.
fn evict(&self, path: &Path) -> Result<()>;
/// Returns the local filesystem path for the given object. If it isn't present, this
/// will get the object from object storage and write it to the local filesystem cache.
/// If the cache is over its limit, it will evict other cached objects based on an LRU
/// policy.
async fn fs_path_or_cache(&self, path: &Path, store: Arc<ObjectStore>) -> Result<&str>;
/// The size in bytes of all files in the cache.
fn size(&self) -> u64;
/// The user configured limit in bytes for all files in the cache.
fn limit(&self) -> u64;
}
/// Implementation of the local file system cache that keeps the LRU stats and
/// performs any evictions to load new objects in.
pub(crate) struct LocalFSCache {}
#[async_trait]
impl Cache for LocalFSCache {
fn evict(&self, _path: &Path) -> Result<()> {
todo!()
}
async fn fs_path_or_cache(&self, _path: &Path, _store: Arc<ObjectStore>) -> Result<&str> {
todo!()
}
fn size(&self) -> u64 {
todo!()
}
fn limit(&self) -> u64 {
todo!()
}
}

View File

@ -1,6 +1,8 @@
//! This module contains the IOx implementation for using local disk as the
//! object store.
use crate::{path::file::FilePath, ListResult, ObjectMeta, ObjectStoreApi};
use crate::cache::Cache;
use crate::path::Path;
use crate::{path::file::FilePath, ListResult, ObjectMeta, ObjectStore, ObjectStoreApi};
use async_trait::async_trait;
use bytes::Bytes;
use futures::{
@ -8,6 +10,7 @@ use futures::{
Stream, StreamExt, TryStreamExt,
};
use snafu::{ensure, futures::TryStreamExt as _, OptionExt, ResultExt, Snafu};
use std::sync::Arc;
use std::{collections::BTreeSet, convert::TryFrom, io, path::PathBuf};
use tokio::fs;
use tokio_util::codec::{BytesCodec, FramedRead};
@ -247,6 +250,33 @@ impl ObjectStoreApi for File {
objects,
})
}
fn cache(&self) -> Option<&dyn Cache> {
Some(self)
}
}
#[async_trait]
impl Cache for File {
fn evict(&self, _path: &Path) -> crate::cache::Result<()> {
todo!()
}
async fn fs_path_or_cache(
&self,
_path: &Path,
_store: Arc<ObjectStore>,
) -> crate::cache::Result<&str> {
todo!()
}
fn size(&self) -> u64 {
todo!()
}
fn limit(&self) -> u64 {
todo!()
}
}
/// Convert walkdir results and converts not-found errors into `None`.

View File

@ -3,7 +3,7 @@
use async_trait::async_trait;
use snafu::Snafu;
use crate::{path::cloud::CloudPath, ObjectStoreApi};
use crate::{cache::Cache, path::cloud::CloudPath, ObjectStoreApi};
/// A specialized `Error` for Azure object store-related errors
#[derive(Debug, Snafu, Clone)]
@ -84,6 +84,10 @@ impl ObjectStoreApi for DummyObjectStore {
) -> crate::Result<crate::ListResult<Self::Path>, Self::Error> {
NotSupported { name: &self.name }.fail()
}
fn cache(&self) -> Option<&dyn Cache> {
todo!()
}
}
/// Stub when s3 is not configured

View File

@ -1,6 +1,7 @@
//! This module contains the IOx implementation for using Google Cloud Storage
//! as the object store.
use crate::{
cache::{Cache, LocalFSCache},
path::{cloud::CloudPath, DELIMITER},
ListResult, ObjectMeta, ObjectStoreApi,
};
@ -249,6 +250,10 @@ impl ObjectStoreApi for GoogleCloudStorage {
Ok(result)
}
fn cache(&self) -> Option<&dyn Cache> {
todo!()
}
}
/// Configure a connection to Google Cloud Storage.

View File

@ -29,6 +29,7 @@ mod memory;
pub mod path;
mod throttle;
pub mod cache;
pub mod dummy;
#[cfg(not(feature = "aws"))]
@ -49,6 +50,7 @@ use throttle::ThrottledStore;
/// Publically expose throttling configuration
pub use throttle::ThrottleConfig;
use crate::cache::Cache;
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
@ -100,6 +102,9 @@ pub trait ObjectStoreApi: Send + Sync + 'static {
&self,
prefix: &Self::Path,
) -> Result<ListResult<Self::Path>, Self::Error>;
/// Return the local filesystem cache, if configured, for this object store.
fn cache(&self) -> Option<&dyn Cache>;
}
/// Universal interface to multiple object store services.
@ -413,6 +418,10 @@ impl ObjectStoreApi for ObjectStore {
_ => unreachable!(),
}
}
fn cache(&self) -> Option<&dyn Cache> {
todo!()
}
}
/// All supported object storage integrations

View File

@ -1,7 +1,8 @@
//! This module contains the IOx implementation for using memory as the object
//! store.
use crate::{
path::parsed::DirsAndFileName, ListResult, ObjectMeta, ObjectStoreApi, ObjectStorePath,
cache::Cache, path::parsed::DirsAndFileName, ListResult, ObjectMeta, ObjectStoreApi,
ObjectStorePath,
};
use async_trait::async_trait;
use bytes::Bytes;
@ -153,6 +154,10 @@ impl ObjectStoreApi for InMemory {
next_token: None,
})
}
fn cache(&self) -> Option<&dyn Cache> {
todo!()
}
}
impl InMemory {

View File

@ -1,6 +1,7 @@
//! This module contains the IOx implementation for wrapping existing object store types into an artificial "sleep" wrapper.
use std::{convert::TryInto, io, sync::Arc};
use crate::cache::Cache;
use crate::{ListResult, ObjectStoreApi, Result};
use async_trait::async_trait;
use bytes::Bytes;
@ -224,6 +225,10 @@ impl<T: ObjectStoreApi> ObjectStoreApi for ThrottledStore<T> {
Err(err) => Err(err),
}
}
fn cache(&self) -> Option<&dyn Cache> {
todo!()
}
}
/// Saturated `usize` to `u32` cast.

View File

@ -278,6 +278,9 @@ impl Storage {
// Limit of total rows to read
let limit: Option<usize> = None; // Todo: this should be a parameter of the function
// todo(paul): Here is where I'd get the cache from object store. If it has
// one, I'd do the `fs_path_or_cache`. Otherwise, do the temp file like below.
// read parquet file to local file
let mut temp_file = tempfile::Builder::new()
.prefix("iox-parquet-cache")