From d0ea81204111ab94ef037571763848d2173a5600 Mon Sep 17 00:00:00 2001
From: Paul Dix
Date: Tue, 20 Jul 2021 19:34:35 -0400
Subject: [PATCH] feat: add skeleton for object store file cache

---
 object_store/src/aws.rs      |  5 +++
 object_store/src/azure.rs    |  5 +++
 object_store/src/cache.rs    | 62 ++++++++++++++++++++++++++++++++++++
 object_store/src/disk.rs     | 32 ++++++++++++++++++-
 object_store/src/dummy.rs    |  6 +++-
 object_store/src/gcp.rs      |  5 +++
 object_store/src/lib.rs      |  9 ++++++
 object_store/src/memory.rs   |  7 +++-
 object_store/src/throttle.rs |  5 +++
 parquet_file/src/storage.rs  |  3 ++
 10 files changed, 136 insertions(+), 3 deletions(-)
 create mode 100644 object_store/src/cache.rs

diff --git a/object_store/src/aws.rs b/object_store/src/aws.rs
index 94d66986b7..73c8ce7fc6 100644
--- a/object_store/src/aws.rs
+++ b/object_store/src/aws.rs
@@ -2,6 +2,7 @@
 //! store.
 use crate::{
     buffer::slurp_stream_tempfile,
+    cache::{Cache, LocalFSCache},
     path::{cloud::CloudPath, DELIMITER},
     ListResult, ObjectMeta, ObjectStoreApi,
 };
@@ -285,6 +286,10 @@ impl ObjectStoreApi for AmazonS3 {
     async fn list_with_delimiter(&self, prefix: &Self::Path) -> Result<ListResult<Self::Path>> {
         self.list_with_delimiter_and_token(prefix, &None).await
     }
+
+    fn cache(&self) -> Option<&dyn Cache> {
+        todo!()
+    }
 }
 
 /// Configure a connection to Amazon S3 using the specified credentials in
diff --git a/object_store/src/azure.rs b/object_store/src/azure.rs
index 7fc5b815ed..2eaf74f320 100644
--- a/object_store/src/azure.rs
+++ b/object_store/src/azure.rs
@@ -1,6 +1,7 @@
 //! This module contains the IOx implementation for using Azure Blob storage as
 //! the object store.
 use crate::{
+    cache::{Cache, LocalFSCache},
     path::{cloud::CloudPath, DELIMITER},
     ListResult, ObjectMeta, ObjectStoreApi,
 };
@@ -239,6 +240,10 @@ impl ObjectStoreApi for MicrosoftAzure {
             objects,
         })
     }
+
+    fn cache(&self) -> Option<&dyn Cache> {
+        todo!()
+    }
 }
 
 /// Configure a connection to container with given name on Microsoft Azure
diff --git a/object_store/src/cache.rs b/object_store/src/cache.rs
new file mode 100644
index 0000000000..11d2c09a99
--- /dev/null
+++ b/object_store/src/cache.rs
@@ -0,0 +1,62 @@
+//! This module contains a trait and implementation for caching object storage objects
+//! in the local filesystem. In the case of the disk backed object store implementation,
+//! it yields locations to its files for cache locations and no-ops any cache modifications.
+
+use crate::path::Path;
+use crate::ObjectStore;
+use async_trait::async_trait;
+use snafu::Snafu;
+use std::sync::Arc;
+
+/// Result for the cache
+pub type Result<T, E = Error> = std::result::Result<T, E>;
+
+/// A specialized `Error` for Cache related errors
+#[derive(Debug, Snafu)]
+#[allow(missing_docs)]
+pub enum Error {
+    #[snafu(display("unable to evict '{}' from the local cache", name))]
+    UnableToEvict { name: String },
+}
+
+/// Defines an LRU cache with local file locations for objects from object store.
+#[async_trait]
+pub trait Cache {
+    /// Evicts an object from the local filesystem cache.
+    fn evict(&self, path: &Path) -> Result<()>;
+
+    /// Returns the local filesystem path for the given object. If it isn't present, this
+    /// will get the object from object storage and write it to the local filesystem cache.
+    /// If the cache is over its limit, it will evict other cached objects based on an LRU
+    /// policy.
+    async fn fs_path_or_cache(&self, path: &Path, store: Arc<ObjectStore>) -> Result<&str>;
+
+    /// The size in bytes of all files in the cache.
+    fn size(&self) -> u64;
+
+    /// The user configured limit in bytes for all files in the cache.
+    fn limit(&self) -> u64;
+}
+
+/// Implementation of the local file system cache that keeps the LRU stats and
+/// performs any evictions to load new objects in.
+pub(crate) struct LocalFSCache {}
+
+#[async_trait]
+impl Cache for LocalFSCache {
+    fn evict(&self, _path: &Path) -> Result<()> {
+        todo!()
+    }
+
+    async fn fs_path_or_cache(&self, _path: &Path, _store: Arc<ObjectStore>) -> Result<&str> {
+        todo!()
+    }
+
+    fn size(&self) -> u64 {
+        todo!()
+    }
+
+    fn limit(&self) -> u64 {
+        todo!()
+    }
+}
diff --git a/object_store/src/disk.rs b/object_store/src/disk.rs
index 089b82e6b8..3d7a818f56 100644
--- a/object_store/src/disk.rs
+++ b/object_store/src/disk.rs
@@ -1,6 +1,8 @@
 //! This module contains the IOx implementation for using local disk as the
 //! object store.
-use crate::{path::file::FilePath, ListResult, ObjectMeta, ObjectStoreApi};
+use crate::cache::Cache;
+use crate::path::Path;
+use crate::{path::file::FilePath, ListResult, ObjectMeta, ObjectStore, ObjectStoreApi};
 use async_trait::async_trait;
 use bytes::Bytes;
 use futures::{
@@ -8,6 +10,7 @@ use futures::{
     Stream, StreamExt, TryStreamExt,
 };
 use snafu::{ensure, futures::TryStreamExt as _, OptionExt, ResultExt, Snafu};
+use std::sync::Arc;
 use std::{collections::BTreeSet, convert::TryFrom, io, path::PathBuf};
 use tokio::fs;
 use tokio_util::codec::{BytesCodec, FramedRead};
@@ -247,6 +250,33 @@ impl ObjectStoreApi for File {
             objects,
         })
     }
+
+    fn cache(&self) -> Option<&dyn Cache> {
+        Some(self)
+    }
+}
+
+#[async_trait]
+impl Cache for File {
+    fn evict(&self, _path: &Path) -> crate::cache::Result<()> {
+        todo!()
+    }
+
+    async fn fs_path_or_cache(
+        &self,
+        _path: &Path,
+        _store: Arc<ObjectStore>,
+    ) -> crate::cache::Result<&str> {
+        todo!()
+    }
+
+    fn size(&self) -> u64 {
+        todo!()
+    }
+
+    fn limit(&self) -> u64 {
+        todo!()
+    }
 }
 
 /// Convert walkdir results and converts not-found errors into `None`.
diff --git a/object_store/src/dummy.rs b/object_store/src/dummy.rs
index c0ab46c794..5d8d3b090c 100644
--- a/object_store/src/dummy.rs
+++ b/object_store/src/dummy.rs
@@ -3,7 +3,7 @@
 use async_trait::async_trait;
 use snafu::Snafu;
 
-use crate::{path::cloud::CloudPath, ObjectStoreApi};
+use crate::{cache::Cache, path::cloud::CloudPath, ObjectStoreApi};
 
 /// A specialized `Error` for Azure object store-related errors
 #[derive(Debug, Snafu, Clone)]
@@ -84,6 +84,10 @@ impl ObjectStoreApi for DummyObjectStore {
     ) -> crate::Result<ListResult<Self::Path>, Self::Error> {
         NotSupported { name: &self.name }.fail()
     }
+
+    fn cache(&self) -> Option<&dyn Cache> {
+        todo!()
+    }
 }
 
 /// Stub when s3 is not configured
diff --git a/object_store/src/gcp.rs b/object_store/src/gcp.rs
index f5bc49385a..4a4a201fc5 100644
--- a/object_store/src/gcp.rs
+++ b/object_store/src/gcp.rs
@@ -1,6 +1,7 @@
 //! This module contains the IOx implementation for using Google Cloud Storage
 //! as the object store.
 use crate::{
+    cache::{Cache, LocalFSCache},
     path::{cloud::CloudPath, DELIMITER},
     ListResult, ObjectMeta, ObjectStoreApi,
 };
@@ -249,6 +250,10 @@ impl ObjectStoreApi for GoogleCloudStorage {
 
         Ok(result)
     }
+
+    fn cache(&self) -> Option<&dyn Cache> {
+        todo!()
+    }
 }
 
 /// Configure a connection to Google Cloud Storage.
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 2f29fc11b0..dfe96d22e8 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -29,6 +29,7 @@ mod memory;
 pub mod path;
 mod throttle;
 
+pub mod cache;
 pub mod dummy;
 
 #[cfg(not(feature = "aws"))]
@@ -49,6 +50,7 @@ use throttle::ThrottledStore;
 /// Publically expose throttling configuration
 pub use throttle::ThrottleConfig;
 
+use crate::cache::Cache;
 use async_trait::async_trait;
 use bytes::Bytes;
 use chrono::{DateTime, Utc};
@@ -100,6 +102,9 @@ pub trait ObjectStoreApi: Send + Sync + 'static {
         &self,
         prefix: &Self::Path,
     ) -> Result<ListResult<Self::Path>, Self::Error>;
+
+    /// Return the local filesystem cache, if configured, for this object store.
+    fn cache(&self) -> Option<&dyn Cache>;
 }
 
 /// Universal interface to multiple object store services.
@@ -413,6 +418,10 @@ impl ObjectStoreApi for ObjectStore {
             _ => unreachable!(),
         }
     }
+
+    fn cache(&self) -> Option<&dyn Cache> {
+        todo!()
+    }
 }
 
 /// All supported object storage integrations
diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs
index 95e24cf198..8f8fc4567b 100644
--- a/object_store/src/memory.rs
+++ b/object_store/src/memory.rs
@@ -1,7 +1,8 @@
 //! This module contains the IOx implementation for using memory as the object
 //! store.
 use crate::{
-    path::parsed::DirsAndFileName, ListResult, ObjectMeta, ObjectStoreApi, ObjectStorePath,
+    cache::Cache, path::parsed::DirsAndFileName, ListResult, ObjectMeta, ObjectStoreApi,
+    ObjectStorePath,
 };
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -153,6 +154,10 @@ impl ObjectStoreApi for InMemory {
             next_token: None,
         })
     }
+
+    fn cache(&self) -> Option<&dyn Cache> {
+        todo!()
+    }
 }
 
 impl InMemory {
diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs
index 52f064fbc3..64f1656f9b 100644
--- a/object_store/src/throttle.rs
+++ b/object_store/src/throttle.rs
@@ -1,6 +1,7 @@
 //! This module contains the IOx implementation for wrapping existing object store types into an artificial "sleep" wrapper.
 use std::{convert::TryInto, io, sync::Arc};
 
+use crate::cache::Cache;
 use crate::{ListResult, ObjectStoreApi, Result};
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -224,6 +225,10 @@ impl<T: ObjectStoreApi> ObjectStoreApi for ThrottledStore<T> {
             Err(err) => Err(err),
         }
     }
+
+    fn cache(&self) -> Option<&dyn Cache> {
+        todo!()
+    }
 }
 
 /// Saturated `usize` to `u32` cast.
diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs
index 5690d87048..ebe9052d18 100644
--- a/parquet_file/src/storage.rs
+++ b/parquet_file/src/storage.rs
@@ -278,6 +278,9 @@ impl Storage {
         // Limit of total rows to read
         let limit: Option<usize> = None; // Todo: this should be a parameter of the function
 
+        // todo(paul): Here is where I'd get the cache from object store. If it has
+        // one, I'd do the `fs_path_or_cache`. Otherwise, do the temp file like below.
+
         // read parquet file to local file
        let mut temp_file = tempfile::Builder::new()
            .prefix("iox-parquet-cache")
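
Below is a minimal, hypothetical sketch of how a consumer such as the parquet read path referenced by the `todo(paul)` note might use the new `cache()` hook once this skeleton is filled in. The `cached_parquet_path` helper is not part of the patch, and every trait method in the diff is still `todo!()`, so this only illustrates the intended call pattern of `fs_path_or_cache`.

use std::sync::Arc;

use object_store::{cache::Cache, path::Path, ObjectStore, ObjectStoreApi};

// Hypothetical helper (not in the patch): resolve an object to a local file via
// the cache when one is configured, otherwise signal the caller to fall back to
// the existing temp-file download path.
async fn cached_parquet_path(store: Arc<ObjectStore>, path: &Path) -> Option<String> {
    match store.cache() {
        // A cache is configured: this would fetch the object on a miss and
        // evict older entries per the LRU policy described in cache.rs.
        Some(cache) => cache
            .fs_path_or_cache(path, Arc::clone(&store))
            .await
            .ok()
            .map(str::to_string),
        // No cache: keep using the tempfile::Builder::new().prefix("iox-parquet-cache")
        // route shown in parquet_file/src/storage.rs.
        None => None,
    }
}

Per the diff, the disk-backed `File` store returns `Some(self)` from `cache()` and can hand back its own file locations directly, while the cloud stores import `LocalFSCache` and would presumably return a configured instance of it once the skeleton is implemented.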