diff --git a/Cargo.lock b/Cargo.lock index f41eed1208..f7ba4f732a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -966,12 +966,6 @@ dependencies = [ "workspace-hack", ] -[[package]] -name = "clru" -version = "0.6.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbd0f76e066e64fdc5631e3bb46381254deab9ef1158292f27c8c57e3bf3fe59" - [[package]] name = "colorchoice" version = "1.0.2" @@ -2822,15 +2816,16 @@ dependencies = [ name = "influxdb3_write" version = "0.1.0" dependencies = [ + "anyhow", "arrow", "arrow_util", "async-trait", "byteorder", "bytes", "chrono", - "clru", "crc32fast", "crossbeam-channel", + "dashmap", "data_types", "datafusion", "datafusion_util", diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs index abfb9a6a19..63877620a3 100644 --- a/influxdb3/src/commands/serve.rs +++ b/influxdb3/src/commands/serve.rs @@ -260,10 +260,17 @@ pub async fn command(config: Config) -> Result<()> { let object_store: Arc = make_object_store(&config.object_store_config).map_err(Error::ObjectStoreParsing)?; - // TODO(trevor): make this configurable/optional: + let time_provider = Arc::new(SystemProvider::new()); + + // TODO(trevor): make the cache capacity and prune percent configurable/optional: let cache_capacity = 1024 * 1024 * 1024; - let (object_store, parquet_cache) = - create_cached_obj_store_and_oracle(object_store, cache_capacity); + let prune_percent = 0.1; + let (object_store, parquet_cache) = create_cached_obj_store_and_oracle( + object_store, + Arc::clone(&time_provider) as _, + cache_capacity, + prune_percent, + ); let trace_exporter = config.tracing_config.build()?; @@ -322,7 +329,6 @@ pub async fn command(config: Config) -> Result<()> { snapshot_size: config.wal_snapshot_size, }; - let time_provider = Arc::new(SystemProvider::new()); let catalog = persister .load_or_create_catalog() .await diff --git a/influxdb3_server/src/lib.rs b/influxdb3_server/src/lib.rs index 2735d2dcbd..140c6b47fa 100644 --- a/influxdb3_server/src/lib.rs +++ b/influxdb3_server/src/lib.rs @@ -745,7 +745,9 @@ mod tests { let common_state = crate::CommonServerState::new(Arc::clone(&metrics), None, trace_header_parser).unwrap(); let object_store: Arc = Arc::new(object_store::memory::InMemory::new()); - let (object_store, parquet_cache) = test_cached_obj_store_and_oracle(object_store); + let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(start_time))); + let (object_store, parquet_cache) = + test_cached_obj_store_and_oracle(object_store, Arc::clone(&time_provider) as _); let parquet_store = ParquetStorage::new(Arc::clone(&object_store), StorageId::from("influxdb3")); let exec = Arc::new(Executor::new_with_config_and_executor( @@ -761,7 +763,6 @@ mod tests { DedicatedExecutor::new_testing(), )); let persister = Arc::new(Persister::new(Arc::clone(&object_store), "test_host")); - let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(start_time))); let dummy_host_id = Arc::from("dummy-host-id"); let instance_id = Arc::from("dummy-instance-id"); diff --git a/influxdb3_server/src/query_executor.rs b/influxdb3_server/src/query_executor.rs index 58a6feecee..d238f48f50 100644 --- a/influxdb3_server/src/query_executor.rs +++ b/influxdb3_server/src/query_executor.rs @@ -630,9 +630,10 @@ mod tests { // Set up QueryExecutor let object_store: Arc = Arc::new(LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap()); - let (object_store, parquet_cache) = test_cached_obj_store_and_oracle(object_store); - let 
persister = Arc::new(Persister::new(Arc::clone(&object_store), "test_host")); let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); + let (object_store, parquet_cache) = + test_cached_obj_store_and_oracle(object_store, Arc::clone(&time_provider) as _); + let persister = Arc::new(Persister::new(Arc::clone(&object_store), "test_host")); let executor = make_exec(Arc::clone(&object_store)); let host_id = Arc::from("dummy-host-id"); let instance_id = Arc::from("instance-id"); diff --git a/influxdb3_write/Cargo.toml b/influxdb3_write/Cargo.toml index 0c11c21581..9fb21822ff 100644 --- a/influxdb3_write/Cargo.toml +++ b/influxdb3_write/Cargo.toml @@ -24,14 +24,15 @@ influxdb3_id = { path = "../influxdb3_id" } influxdb3_wal = { path = "../influxdb3_wal" } # crates.io dependencies +anyhow.workspace = true arrow.workspace = true async-trait.workspace = true byteorder.workspace = true bytes.workspace = true chrono.workspace = true -clru.workspace = true crc32fast.workspace = true crossbeam-channel.workspace = true +dashmap.workspace = true datafusion.workspace = true futures.workspace = true futures-util.workspace = true diff --git a/influxdb3_write/src/last_cache/mod.rs b/influxdb3_write/src/last_cache/mod.rs index f8d8478ff2..b40f0137f8 100644 --- a/influxdb3_write/src/last_cache/mod.rs +++ b/influxdb3_write/src/last_cache/mod.rs @@ -1579,13 +1579,15 @@ mod tests { use influxdb3_id::DbId; use influxdb3_wal::{LastCacheDefinition, WalConfig}; use insta::assert_json_snapshot; - use iox_time::{MockProvider, Time}; + use iox_time::{MockProvider, Time, TimeProvider}; async fn setup_write_buffer() -> WriteBufferImpl { let obj_store: Arc = Arc::new(InMemory::new()); - let (obj_store, parquet_cache) = test_cached_obj_store_and_oracle(obj_store); + let time_provider: Arc = + Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); + let (obj_store, parquet_cache) = + test_cached_obj_store_and_oracle(obj_store, Arc::clone(&time_provider)); let persister = Arc::new(Persister::new(obj_store, "test_host")); - let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); let host_id = Arc::from("dummy-host-id"); let instance_id = Arc::from("dummy-instance-id"); WriteBufferImpl::new( diff --git a/influxdb3_write/src/parquet_cache/mod.rs b/influxdb3_write/src/parquet_cache/mod.rs index 23687f8da0..5cbe481dee 100644 --- a/influxdb3_write/src/parquet_cache/mod.rs +++ b/influxdb3_write/src/parquet_cache/mod.rs @@ -1,24 +1,41 @@ //! 
An in-memory cache of Parquet files that are persisted to object storage use std::{ - fmt::Debug, hash::RandomState, num::NonZeroUsize, ops::Range, sync::Arc, time::Duration, + collections::BinaryHeap, + fmt::Debug, + ops::Range, + sync::{ + atomic::{AtomicI64, AtomicUsize, Ordering}, + Arc, + }, + time::Duration, }; +use anyhow::bail; use async_trait::async_trait; use bytes::Bytes; -use clru::{CLruCache, CLruCacheConfig, WeightScale}; -use futures::{StreamExt, TryStreamExt}; -use futures_util::stream::BoxStream; +use dashmap::{DashMap, Entry}; +use futures::{ + future::{BoxFuture, Shared}, + stream::BoxStream, + FutureExt, StreamExt, TryStreamExt, +}; +use iox_time::TimeProvider; use object_store::{ path::Path, Error, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, - Result as ObjectStoreResult, }; -use observability_deps::tracing::{error, info}; +use observability_deps::tracing::{error, info, warn}; use tokio::sync::{ mpsc::{channel, Receiver, Sender}, - oneshot, Mutex, + oneshot, }; +/// Shared future type for cache values that are being fetched +type SharedCacheValueFuture = Shared, DynError>>>; + +/// Dynamic error type that can be cloned +type DynError = Arc; + /// A request to fetch an item at the given `path` from an object store /// /// Contains a notifier to notify the caller that registers the cache request when the item @@ -84,9 +101,16 @@ impl ParquetCacheOracle for MemCacheOracle { /// that returns them as their `Arc` equivalent. pub fn create_cached_obj_store_and_oracle( object_store: Arc, + time_provider: Arc, cache_capacity: usize, + prune_percent: f64, ) -> (Arc, Arc) { - let store = Arc::new(MemCachedObjectStore::new(object_store, cache_capacity)); + let store = Arc::new(MemCachedObjectStore::new( + object_store, + cache_capacity, + time_provider, + prune_percent, + )); let oracle = Arc::new(MemCacheOracle::new(Arc::clone(&store))); (store, oracle) } @@ -94,11 +118,12 @@ pub fn create_cached_obj_store_and_oracle( /// Create a test cached object store with a cache capacity of 1GB pub fn test_cached_obj_store_and_oracle( object_store: Arc, + time_provider: Arc, ) -> (Arc, Arc) { - create_cached_obj_store_and_oracle(object_store, 1024 * 1024 * 1024) + create_cached_obj_store_and_oracle(object_store, time_provider, 1024 * 1024 * 1024, 0.1) } -/// An entry in the cache, containing the actual bytes as well as object store metadata +/// A value in the cache, containing the actual bytes as well as object store metadata #[derive(Debug)] struct CacheValue { data: Bytes, @@ -108,59 +133,263 @@ struct CacheValue { impl CacheValue { /// Get the size of the cache value's memory footprint in bytes fn size(&self) -> usize { - // TODO(trevor): could also calculate the size of the metadata... 
- self.data.len() + let Self { + data, + meta: + ObjectMeta { + location, + last_modified: _, + size: _, + e_tag, + version, + }, + } = self; + + data.len() + + location.as_ref().len() + + e_tag.as_ref().map(|s| s.capacity()).unwrap_or_default() + + version.as_ref().map(|s| s.capacity()).unwrap_or_default() + } + + /// Fetch the value from an object store + async fn fetch(store: Arc, path: Path) -> object_store::Result { + let res = store.get(&path).await?; + let meta = res.meta.clone(); + let data = res.bytes().await?; + Ok(Self { data, meta }) + } +} + +/// Holds the state and hit time for an entry in the cache +#[derive(Debug)] +struct CacheEntry { + state: CacheEntryState, + /// The nano-second timestamp of when this value was last hit + hit_time: AtomicI64, +} + +impl CacheEntry { + /// Get the approximate memory footprint of this entry in bytes + fn size(&self) -> usize { + self.state.size() + std::mem::size_of::() + } + + fn is_fetching(&self) -> bool { + matches!(self.state, CacheEntryState::Fetching(_)) + } + + fn is_success(&self) -> bool { + matches!(self.state, CacheEntryState::Success(_)) } } /// The state of a cache entry -#[derive(Debug)] -enum CacheEntry { +/// +/// This implements `Clone` so that a reference to the entry in the `Cache` does not need to be +/// held for long. +#[derive(Debug, Clone)] +enum CacheEntryState { /// The cache entry is being fetched from object store - Fetching, + Fetching(SharedCacheValueFuture), /// The cache entry was successfully fetched and is stored in the cache as a [`CacheValue`] Success(Arc), - /// The request to the object store failed - Failed, - /// The cache entry was deleted - Deleted, - /// The object is too large for the cache - TooLarge, } -impl CacheEntry { - /// Get the size of thje cache entry in bytes +impl CacheEntryState { + /// Get the approximate size of the cache entry in bytes fn size(&self) -> usize { match self { - CacheEntry::Fetching => 0, - CacheEntry::Success(v) => v.size(), - CacheEntry::Failed => 0, - CacheEntry::Deleted => 0, - CacheEntry::TooLarge => 0, + CacheEntryState::Fetching(_) => 0, + CacheEntryState::Success(v) => v.size(), } } - fn is_fetching(&self) -> bool { - matches!(self, CacheEntry::Fetching) - } - - fn is_success(&self) -> bool { - matches!(self, CacheEntry::Success(_)) - } - - fn keep(&self) -> bool { - self.is_fetching() || self.is_success() + /// Get the value in this state, or wait for it if it is still fetching + /// + /// This takes `self` as it is meant to be used on an entry's state that has been cloned. + async fn value(self) -> object_store::Result> { + match self { + CacheEntryState::Fetching(fut) => fut.await.map_err(|e| Error::Generic { + store: STORE_NAME, + source: Box::new(e), + }), + CacheEntryState::Success(v) => Ok(v), + } } } -/// Implements the [`WeightScale`] trait to determine a [`CacheEntry`]'s size on insertion to -/// the cache +/// A cache for storing objects from object storage by their [`Path`] +/// +/// This acts as a Least-Recently-Used (LRU) cache that allows for concurrent reads and writes. See +/// the [`Cache::prune`] method for implementation of how the cache entries are pruned. Pruning must +/// be invoked externally, e.g., on an interval. 
#[derive(Debug)] -struct CacheEntryScale; +struct Cache { + /// The maximum amount of memory this cache should occupy in bytes + capacity: usize, + /// The current amount of memory being used by the cache in bytes + used: AtomicUsize, + /// What percentage of the total number of cache entries will be pruned during a pruning operation + prune_percent: f64, + /// The map storing cache entries + map: DashMap<Path, CacheEntry>, + /// Provides timestamps for updating the hit time of each cache entry + time_provider: Arc<dyn TimeProvider>, +} -impl WeightScale for CacheEntryScale { - fn weight(&self, key: &Path, value: &CacheEntry) -> usize { - key.as_ref().len() + value.size() +impl Cache { + /// Create a new cache with a given capacity and prune percent + fn new(capacity: usize, prune_percent: f64, time_provider: Arc<dyn TimeProvider>) -> Self { + Self { + capacity, + used: AtomicUsize::new(0), + prune_percent, + map: DashMap::new(), + time_provider, + } + } + + /// Get an entry in the cache, or `None` if there is no entry + /// + /// This updates the hit time of the entry and returns a cloned copy of the entry state so that + /// the reference into the map is dropped + fn get(&self, path: &Path) -> Option<CacheEntryState> { + let entry = self.map.get(path)?; + if entry.is_success() { + entry + .hit_time + .store(self.time_provider.now().timestamp_nanos(), Ordering::SeqCst); + } + Some(entry.state.clone()) + } + + /// Check if an entry in the cache is in the process of being fetched or if it was already fetched + /// successfully + /// + /// This does not update the hit time of the entry + fn path_already_fetched(&self, path: &Path) -> bool { + self.map.get(path).is_some() + } + + /// Insert a `Fetching` entry into the cache along with the shared future for polling the value + /// being fetched + fn set_fetching(&self, path: &Path, fut: SharedCacheValueFuture) { + let entry = CacheEntry { + state: CacheEntryState::Fetching(fut), + hit_time: AtomicI64::new(self.time_provider.now().timestamp_nanos()), + }; + let additional = entry.size(); + self.map.insert(path.clone(), entry); + self.used.fetch_add(additional, Ordering::SeqCst); + } + + /// Update a `Fetching` entry to a `Success` entry in the cache + fn set_success(&self, path: &Path, value: Arc<CacheValue>) -> Result<(), anyhow::Error> { + match self.map.entry(path.clone()) { + Entry::Occupied(mut o) => { + let entry = o.get_mut(); + if !entry.is_fetching() { + // NOTE(trevor): the only other state is Success, so bailing here just + // means that we leave the entry alone, and since objects in the store are + // treated as immutable, this should be okay. + bail!("attempted to store value in non-fetching cache entry"); + } + entry.state = CacheEntryState::Success(value); + entry + .hit_time + .store(self.time_provider.now().timestamp_nanos(), Ordering::SeqCst); + // TODO(trevor): what if size is greater than cache capacity? 
+ let additional = entry.size(); + self.used.fetch_add(additional, Ordering::SeqCst); + Ok(()) + } + Entry::Vacant(_) => bail!("attempted to set success state on an empty cache entry"), + } + } + + /// Remove an entry from the cache, as well as its associated size from the used capacity + fn remove(&self, path: &Path) { + let Some((_, entry)) = self.map.remove(path) else { + return; + }; + self.used.fetch_sub(entry.state.size(), Ordering::SeqCst); + } + + /// Prune least recently hit entries from the cache + /// + /// This is a no-op if the cache's `used` amount is less than its `capacity` + fn prune(&self) { + let used = self.used.load(Ordering::SeqCst); + if used < self.capacity { + return; + } + let n_to_prune = (self.map.len() as f64 * self.prune_percent).floor() as usize; + // use a BinaryHeap to determine the cut-off time; entries that were + // last hit before that time will be pruned: + let mut prune_heap = BinaryHeap::with_capacity(n_to_prune); + + for map_ref in self.map.iter() { + let hit_time = map_ref.value().hit_time.load(Ordering::SeqCst); + let size = map_ref.value().size(); + let path = map_ref.key().as_ref(); + if prune_heap.len() < n_to_prune { + // if the heap isn't full yet, throw this item on: + prune_heap.push(PruneHeapItem { + hit_time, + path_ref: path.into(), + size, + }); + } else if hit_time < prune_heap.peek().map(|item| item.hit_time).unwrap() { + // otherwise, the heap is at its capacity, so only push if the hit_time + // in question is older than the top of the heap (after popping the top + // of the heap to make room) + prune_heap.pop(); + prune_heap.push(PruneHeapItem { + path_ref: path.into(), + hit_time, + size, + }); + } + } + + // track the total size of entries that get freed: + let mut freed = 0; + // drop entries with hit times before the cut-off: + for item in prune_heap { + self.map.remove(&Path::from(item.path_ref.as_ref())); + freed += item.size; + } + // update used mem size with freed amount: + self.used.fetch_sub(freed, Ordering::SeqCst); + } +} + +/// An item that stores what is needed for pruning [`CacheEntry`]s +#[derive(Debug, Eq)] +struct PruneHeapItem { + /// Reference to the entry's `Path` key + path_ref: Arc<str>, + /// Entry's hit time for comparison and heap insertion + hit_time: i64, + /// Entry size used to calculate the amount of memory freed after a prune + size: usize, +} + +impl PartialEq for PruneHeapItem { + fn eq(&self, other: &Self) -> bool { + self.hit_time.eq(&other.hit_time) + } +} + +impl PartialOrd for PruneHeapItem { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.hit_time.cmp(&other.hit_time)) + } +} + +impl Ord for PruneHeapItem { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.hit_time.cmp(&other.hit_time) } } @@ -168,63 +397,26 @@ impl WeightScale for CacheEntryScale { const STORE_NAME: &str = "mem_cached_object_store"; /// An object store with an associated cache that can serve GET-style requests using the cache -/// -/// The least-recently used (LRU) entries will be evicted when new entries are inserted, if the -/// new entry would exceed the cache's memory capacity #[derive(Debug)] pub struct MemCachedObjectStore { /// An inner object store for which items will be cached inner: Arc<dyn ObjectStore>, - /// A weighted LRU cache for storing the objects associated with a given path in memory - // NOTE(trevor): this uses a mutex as the CLruCache type needs &mut self for its get method, so - // we always need an exclusive lock on the cache. 
If this creates a performance bottleneck then - // we will need to look for alternatives. - // - // A Tokio mutex is used to prevent blocking the thread while waiting for a lock, and so that - // the lock can be held accross await points. - cache: Arc>>, + cache: Arc, } impl MemCachedObjectStore { - /// Create a new [`MemCachedObjectStore`] with the given memory capacity - fn new(inner: Arc, memory_capacity: usize) -> Self { - let cache = CLruCache::with_config( - CLruCacheConfig::new(NonZeroUsize::new(memory_capacity).unwrap()) - .with_scale(CacheEntryScale), - ); + /// Create a new [`MemCachedObjectStore`] + fn new( + inner: Arc, + memory_capacity: usize, + time_provider: Arc, + prune_percent: f64, + ) -> Self { Self { inner, - cache: Arc::new(Mutex::new(cache)), + cache: Arc::new(Cache::new(memory_capacity, prune_percent, time_provider)), } } - - /// Get an entry in the cache if it contains a successful fetch result, or `None` otherwise - /// - /// This requires `&mut self` as the underlying method on the cache requires a mutable reference - /// in order to update the recency of the entry in the cache - async fn get_cache_value(&self, path: &Path) -> Option> { - self.cache - .lock() - .await - .get(path) - .and_then(|entry| match entry { - CacheEntry::Fetching - | CacheEntry::Failed - | CacheEntry::Deleted - | CacheEntry::TooLarge => None, - CacheEntry::Success(v) => Some(Arc::clone(v)), - }) - } - - /// Set the state of a cache entry to `Deleted`, since we cannot remove elements from the - /// cache directly. - async fn delete_cache_value(&self, path: &Path) { - let _ = self - .cache - .lock() - .await - .put_with_weight(path.clone(), CacheEntry::Deleted); - } } impl std::fmt::Display for MemCachedObjectStore { @@ -242,7 +434,7 @@ impl std::fmt::Display for MemCachedObjectStore { /// from the inner store. #[async_trait] impl ObjectStore for MemCachedObjectStore { - async fn put(&self, location: &Path, bytes: PutPayload) -> ObjectStoreResult { + async fn put(&self, location: &Path, bytes: PutPayload) -> object_store::Result { self.inner.put(location, bytes).await } @@ -251,11 +443,14 @@ impl ObjectStore for MemCachedObjectStore { location: &Path, bytes: PutPayload, opts: PutOptions, - ) -> ObjectStoreResult { + ) -> object_store::Result { self.inner.put_opts(location, bytes, opts).await } - async fn put_multipart(&self, location: &Path) -> ObjectStoreResult> { + async fn put_multipart( + &self, + location: &Path, + ) -> object_store::Result> { self.inner.put_multipart(location).await } @@ -263,14 +458,15 @@ impl ObjectStore for MemCachedObjectStore { &self, location: &Path, opts: PutMultipartOpts, - ) -> ObjectStoreResult> { + ) -> object_store::Result> { self.inner.put_multipart_opts(location, opts).await } /// Get an object from the object store. If this object is cached, then it will not make a request /// to the inner object store. 
- async fn get(&self, location: &Path) -> ObjectStoreResult { - if let Some(v) = self.get_cache_value(location).await { + async fn get(&self, location: &Path) -> object_store::Result { + if let Some(state) = self.cache.get(location) { + let v = state.value().await?; Ok(GetResult { payload: GetResultPayload::Stream( futures::stream::iter([Ok(v.data.clone())]).boxed(), @@ -284,13 +480,17 @@ impl ObjectStore for MemCachedObjectStore { } } - async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult { + async fn get_opts( + &self, + location: &Path, + options: GetOptions, + ) -> object_store::Result { // NOTE(trevor): this could probably be supported through the cache if we need it via the // ObjectMeta stored in the cache. For now this is conservative: self.inner.get_opts(location, options).await } - async fn get_range(&self, location: &Path, range: Range) -> ObjectStoreResult { + async fn get_range(&self, location: &Path, range: Range) -> object_store::Result { Ok(self .get_ranges(location, &[range]) .await? @@ -305,8 +505,9 @@ impl ObjectStore for MemCachedObjectStore { &self, location: &Path, ranges: &[Range], - ) -> ObjectStoreResult> { - if let Some(v) = self.get_cache_value(location).await { + ) -> object_store::Result> { + if let Some(state) = self.cache.get(location) { + let v = state.value().await?; ranges .iter() .map(|range| { @@ -339,8 +540,9 @@ impl ObjectStore for MemCachedObjectStore { } } - async fn head(&self, location: &Path) -> ObjectStoreResult { - if let Some(v) = self.get_cache_value(location).await { + async fn head(&self, location: &Path) -> object_store::Result { + if let Some(state) = self.cache.get(location) { + let v = state.value().await?; Ok(v.meta.clone()) } else { self.inner.head(location).await @@ -348,22 +550,22 @@ impl ObjectStore for MemCachedObjectStore { } /// Delete an object on object store, but also remove it from the cache. 
- async fn delete(&self, location: &Path) -> ObjectStoreResult<()> { + async fn delete(&self, location: &Path) -> object_store::Result<()> { let result = self.inner.delete(location).await?; - self.delete_cache_value(location).await; + self.cache.remove(location); Ok(result) } fn delete_stream<'a>( &'a self, - locations: BoxStream<'a, ObjectStoreResult>, - ) -> BoxStream<'a, ObjectStoreResult> { + locations: BoxStream<'a, object_store::Result>, + ) -> BoxStream<'a, object_store::Result> { locations .and_then(|_| futures::future::err(Error::NotImplemented)) .boxed() } - fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, ObjectStoreResult> { + fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, object_store::Result> { self.inner.list(prefix) } @@ -371,27 +573,27 @@ impl ObjectStore for MemCachedObjectStore { &self, prefix: Option<&Path>, offset: &Path, - ) -> BoxStream<'_, ObjectStoreResult> { + ) -> BoxStream<'_, object_store::Result> { self.inner.list_with_offset(prefix, offset) } - async fn list_with_delimiter(&self, prefix: Option<&Path>) -> ObjectStoreResult { + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result { self.inner.list_with_delimiter(prefix).await } - async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { + async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> { self.inner.copy(from, to).await } - async fn rename(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { + async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> { self.inner.rename(from, to).await } - async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> { self.inner.copy_if_not_exists(from, to).await } - async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> { + async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> { self.inner.rename_if_not_exists(from, to).await } } @@ -409,61 +611,41 @@ fn background_cache_request_handler( ) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { while let Some(CacheRequest { path, notifier }) = rx.recv().await { - // clone the path before acquiring the lock: - let path_cloned = path.clone(); - // Check that the cache does not already contain an entry for the provide path, or that - // it is not already in the process of fetching the given path: - let mut cache_lock = mem_store.cache.lock().await; - if cache_lock - .get(&path) - .is_some_and(|entry| entry.is_fetching() || entry.is_success()) - { + // We assume that objects on object store are immutable, so we can skip objects that + // we have already fetched: + if mem_store.cache.path_already_fetched(&path) { continue; } + // Create a future that will go and fetch the cache value from the store: + let path_cloned = path.clone(); + let store_cloned = Arc::clone(&mem_store.inner); + let fut = async move { + CacheValue::fetch(store_cloned, path_cloned) + .await + .map(Arc::new) + .map_err(|e| Arc::new(e) as _) + } + .boxed() + .shared(); // Put a `Fetching` state in the entry to prevent concurrent requests to the same path: - let _ = cache_lock.put_with_weight(path_cloned, CacheEntry::Fetching); - // Drop the lock before spawning the task below - drop(cache_lock); + mem_store.cache.set_fetching(&path, fut.clone()); let mem_store_captured = Arc::clone(&mem_store); tokio::spawn(async move { - let cache_insertion_result = match 
mem_store_captured.inner.get(&path).await { - Ok(result) => { - let meta = result.meta.clone(); - match result.bytes().await { - Ok(data) => mem_store_captured.cache.lock().await.put_with_weight( - path, - CacheEntry::Success(Arc::new(CacheValue { data, meta })), - ), - Err(error) => { - error!(%error, "failed to retrieve payload from object store get result"); - mem_store_captured - .cache - .lock() - .await - .put_with_weight(path, CacheEntry::Failed) - } - } + match fut.await { + Ok(value) => { + if let Err(error) = mem_store_captured.cache.set_success(&path, value) { + // NOTE(trevor): this would be an error if A) it tried to insert on an already + // successful entry, or B) it tried to insert on an empty entry; in either case + // we do not need to remove the entry to clear a fetching state, as in the + // other failure modes below... + warn!(%error, "failed to set the success state on the cache"); + }; } Err(error) => { error!(%error, "failed to fulfill cache request with object store"); - mem_store_captured - .cache - .lock() - .await - .put_with_weight(path, CacheEntry::Failed) + mem_store_captured.cache.remove(&path); } }; - // If an entry would not fit in the cache at all, the put_with_weight method returns - // it as an Err from above, and we would not have cleared the `Fetching` entry, so - // we need to do that here: - if let Err((k, _)) = cache_insertion_result { - mem_store_captured - .cache - .lock() - .await - .put_with_weight(k, CacheEntry::TooLarge) - .expect("cache capacity is too small"); - } // notify that the cache request has been fulfilled: let _ = notifier.send(()); }); @@ -476,31 +658,32 @@ fn background_cache_request_handler( // TODO(trevor): the interval could be configurable fn background_cache_pruner(mem_store: Arc<MemCachedObjectStore>) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { - let mut interval = tokio::time::interval(Duration::from_secs(60)); + let mut interval = tokio::time::interval(Duration::from_millis(10)); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { interval.tick().await; - - mem_store.cache.lock().await.retain(|_, entry| entry.keep()); + mem_store.cache.prune(); } }) } #[cfg(test)] mod tests { - use std::{ops::Range, sync::Arc}; + use std::{ops::Range, sync::Arc, time::Duration}; use arrow::datatypes::ToByteSlice; use async_trait::async_trait; use bytes::Bytes; use futures::stream::BoxStream; use hashbrown::HashMap; + use iox_time::{MockProvider, Time, TimeProvider}; use object_store::{ memory::InMemory, path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, }; use parking_lot::RwLock; use pretty_assertions::assert_eq; + use tokio::sync::Notify; use crate::parquet_cache::{ create_cached_obj_store_and_oracle, test_cached_obj_store_and_oracle, CacheRequest, @@ -526,8 +709,12 @@ mod tests { async fn hit_cache_instead_of_object_store() { // set up the inner test object store and then wrap it with the mem cached store: let inner_store = Arc::new(TestObjectStore::new(Arc::new(InMemory::new()))); - let (cached_store, oracle) = - test_cached_obj_store_and_oracle(Arc::clone(&inner_store) as _); + let time_provider: Arc<dyn TimeProvider> = + Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); + let (cached_store, oracle) = test_cached_obj_store_and_oracle( + Arc::clone(&inner_store) as _, + Arc::clone(&time_provider), + ); // PUT a payload into the object store through the outer mem cached store: let path = Path::from("0.parquet"); let payload = 
b"hello world"; @@ -564,12 +751,19 @@ mod tests { #[tokio::test] async fn cache_evicts_lru_when_full() { let inner_store = Arc::new(TestObjectStore::new(Arc::new(InMemory::new()))); - let cache_capacity_bytes = 32; - let (cached_store, oracle) = - create_cached_obj_store_and_oracle(Arc::clone(&inner_store) as _, cache_capacity_bytes); + let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); + // these are magic numbers that will make it so the third entry exceeds the cache capacity: + let cache_capacity_bytes = 60; + let cache_prune_percent = 0.4; + let (cached_store, oracle) = create_cached_obj_store_and_oracle( + Arc::clone(&inner_store) as _, + Arc::clone(&time_provider) as _, + cache_capacity_bytes, + cache_prune_percent, + ); // PUT an entry into the store: - let path_1 = Path::from("0.parquet"); // 9 bytes for path - let payload_1 = b"Janeway"; // 7 bytes for payload + let path_1 = Path::from("0.parquet"); + let payload_1 = b"Janeway"; cached_store .put(&path_1, PutPayload::from_static(payload_1)) .await @@ -583,6 +777,9 @@ mod tests { assert_eq!(1, inner_store.total_get_request_count()); assert_eq!(1, inner_store.get_request_count(&path_1)); + // update time: + time_provider.set(Time::from_timestamp_nanos(1)); + // GET the entry to check its there and was retrieved from cache, i.e., that the request // counts do not change: assert_payload_at_equals!(cached_store, payload_1, path_1); @@ -590,15 +787,18 @@ mod tests { assert_eq!(1, inner_store.get_request_count(&path_1)); // PUT a second entry into the store: - let path_2 = Path::from("1.parquet"); // 9 bytes for path - let payload_2 = b"Paris"; // 5 bytes for payload + let path_2 = Path::from("1.parquet"); + let payload_2 = b"Paris"; cached_store .put(&path_2, PutPayload::from_static(payload_2)) .await .unwrap(); + // update time: + time_provider.set(Time::from_timestamp_nanos(2)); + // cache the second entry and wait for it to complete, this will not evict the first entry - // as both can fit in the cache whose capacity is 32 bytes: + // as both can fit in the cache: let (cache_request, notifier_rx) = CacheRequest::create(path_2.clone()); oracle.register(cache_request); let _ = notifier_rx.await; @@ -607,6 +807,9 @@ mod tests { assert_eq!(1, inner_store.get_request_count(&path_1)); assert_eq!(1, inner_store.get_request_count(&path_2)); + // update time: + time_provider.set(Time::from_timestamp_nanos(3)); + // GET the second entry and assert that it was retrieved from the cache, i.e., that the // request counts do not change: assert_payload_at_equals!(cached_store, payload_2, path_2); @@ -614,22 +817,29 @@ mod tests { assert_eq!(1, inner_store.get_request_count(&path_1)); assert_eq!(1, inner_store.get_request_count(&path_2)); + // update time: + time_provider.set(Time::from_timestamp_nanos(4)); + // GET the first entry again and assert that it was retrieved from the cache as before. 
This - // will also update the LRU so that the first entry (janeway) was used more recently than the - // second entry (paris): + // will also update the hit count so that the first entry (janeway) was used more recently + // than the second entry (paris): assert_payload_at_equals!(cached_store, payload_1, path_1); assert_eq!(2, inner_store.total_get_request_count()); assert_eq!(1, inner_store.get_request_count(&path_1)); // PUT a third entry into the store: - let path_3 = Path::from("2.parquet"); // 9 bytes for the path - let payload_3 = b"Neelix"; // 6 bytes for the payload + let path_3 = Path::from("2.parquet"); + let payload_3 = b"Neelix"; cached_store .put(&path_3, PutPayload::from_static(payload_3)) .await .unwrap(); - // cache the third entry and wait for it to complete, this will evict paris from the cache - // as the LRU entry: + + // update time: + time_provider.set(Time::from_timestamp_nanos(5)); + + // cache the third entry and wait for it to complete, this will push the cache past its + // capacity: let (cache_request, notifier_rx) = CacheRequest::create(path_3.clone()); oracle.register(cache_request); let _ = notifier_rx.await; @@ -639,6 +849,9 @@ mod tests { assert_eq!(1, inner_store.get_request_count(&path_2)); assert_eq!(1, inner_store.get_request_count(&path_3)); + // update time: + time_provider.set(Time::from_timestamp_nanos(6)); + // GET the new entry from the strore, and check that it was served by the cache: assert_payload_at_equals!(cached_store, payload_3, path_3); assert_eq!(3, inner_store.total_get_request_count()); @@ -646,6 +859,9 @@ mod tests { assert_eq!(1, inner_store.get_request_count(&path_2)); assert_eq!(1, inner_store.get_request_count(&path_3)); + // allow some time for pruning: + tokio::time::sleep(Duration::from_millis(100)).await; + // GET paris from the cached store, this will not be served by the cache, because paris was // evicted by neelix: assert_payload_at_equals!(cached_store, payload_2, path_2); @@ -655,12 +871,69 @@ mod tests { assert_eq!(1, inner_store.get_request_count(&path_3)); } + #[tokio::test] + async fn cache_hit_while_fetching() { + // Create a test store with a barrier: + let to_store_notify = Arc::new(Notify::new()); + let from_store_notify = Arc::new(Notify::new()); + let inner_store = Arc::new( + TestObjectStore::new(Arc::new(InMemory::new())) + .with_notifies(Arc::clone(&to_store_notify), Arc::clone(&from_store_notify)), + ); + let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); + let (cached_store, oracle) = test_cached_obj_store_and_oracle( + Arc::clone(&inner_store) as _, + Arc::clone(&time_provider) as _, + ); + + // PUT an entry into the store: + let path = Path::from("0.parquet"); + let payload = b"Picard"; + cached_store + .put(&path, PutPayload::from_static(payload)) + .await + .unwrap(); + + // cache the entry, but don't wait on it until below in spawned task: + let (cache_request, notifier_rx) = CacheRequest::create(path.clone()); + oracle.register(cache_request); + + // we are in the middle of a get request, i.e., the cache entry is "fetching" + // once this call to notified wakes: + let _ = from_store_notify.notified().await; + + // spawn a thread to wake the in-flight get request initiated by the cache oracle + // after we have started a get request below, such that the get request below hits + // the cache while the entry is still "fetching" state: + let h = tokio::spawn(async move { + to_store_notify.notify_one(); + let _ = notifier_rx.await; + }); + + // make the request to the store, 
which hits the cache in the "fetching" state + // since we haven't made the call to notify the store to continue yet: + assert_payload_at_equals!(cached_store, payload, path); + + // drive the task to completion to ensure that the cache request has been fulfilled: + h.await.unwrap(); + + // there should only have been one request made, i.e., from the cache oracle: + assert_eq!(1, inner_store.total_get_request_count()); + assert_eq!(1, inner_store.get_request_count(&path)); + + // make another request to the store, to be sure that it is in the cache: + assert_payload_at_equals!(cached_store, payload, path); + assert_eq!(1, inner_store.total_get_request_count()); + assert_eq!(1, inner_store.get_request_count(&path)); + } + type RequestCounter = RwLock>; #[derive(Debug)] struct TestObjectStore { inner: Arc, get: RequestCounter, + notifies: Option<(Arc, Arc)>, } impl TestObjectStore { @@ -668,9 +941,15 @@ mod tests { Self { inner, get: Default::default(), + notifies: None, } } + fn with_notifies(mut self, inbound: Arc, outbound: Arc) -> Self { + self.notifies = Some((inbound, outbound)); + self + } + fn total_get_request_count(&self) -> usize { self.get.read().iter().map(|(_, size)| size).sum() } @@ -686,13 +965,6 @@ mod tests { } } - /// [`MemCachedObjectStore`] implements most [`ObjectStore`] methods as a pass-through, since - /// caching is decided externally. The exception is `delete`, which will have the entry removed - /// from the cache if the delete to the object store was successful. - /// - /// GET-style methods will first check the cache for the object at the given path, before forwarding - /// to the inner [`ObjectStore`]. They do not, however, populate the cache after data has been fetched - /// from the inner store. #[async_trait] impl ObjectStore for TestObjectStore { async fn put(&self, location: &Path, bytes: PutPayload) -> object_store::Result { @@ -725,6 +997,10 @@ mod tests { async fn get(&self, location: &Path) -> object_store::Result { *self.get.write().entry(location.clone()).or_insert(0) += 1; + if let Some((inbound, outbound)) = &self.notifies { + outbound.notify_one(); + inbound.notified().await; + } self.inner.get(location).await } diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs index 09284c935d..9f1a852e01 100644 --- a/influxdb3_write/src/write_buffer/mod.rs +++ b/influxdb3_write/src/write_buffer/mod.rs @@ -570,12 +570,13 @@ mod tests { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn writes_data_to_wal_and_is_queryable() { let object_store: Arc = Arc::new(InMemory::new()); - let (object_store, parquet_cache) = test_cached_obj_store_and_oracle(object_store); + let time_provider: Arc = + Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); + let (object_store, parquet_cache) = + test_cached_obj_store_and_oracle(object_store, Arc::clone(&time_provider)); let persister = Arc::new(Persister::new(Arc::clone(&object_store), "test_host")); let catalog = persister.load_or_create_catalog().await.unwrap(); let last_cache = LastCacheProvider::new_from_catalog(&catalog.clone_inner()).unwrap(); - let time_provider: Arc = - Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); let write_buffer = WriteBufferImpl::new( Arc::clone(&persister), Arc::new(catalog), @@ -1655,9 +1656,10 @@ mod tests { object_store: Arc, wal_config: WalConfig, ) -> (WriteBufferImpl, IOxSessionContext) { - let (object_store, parquet_cache) = test_cached_obj_store_and_oracle(object_store); - let persister = 
Arc::new(Persister::new(Arc::clone(&object_store), "test_host")); let time_provider: Arc = Arc::new(MockProvider::new(start)); + let (object_store, parquet_cache) = + test_cached_obj_store_and_oracle(object_store, Arc::clone(&time_provider)); + let persister = Arc::new(Persister::new(Arc::clone(&object_store), "test_host")); let catalog = persister.load_or_create_catalog().await.unwrap(); let last_cache = LastCacheProvider::new_from_catalog(&catalog.clone_inner()).unwrap(); let wbuf = WriteBufferImpl::new(
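
A note on the pruning strategy introduced above: `Cache::prune` never sorts the whole map. It keeps a bounded max-heap of `n_to_prune` candidates keyed on `hit_time`, so the most recently hit candidate sits on top and is displaced whenever an older entry is found; when the scan completes, the heap holds exactly the oldest entries. The self-contained sketch below shows that selection pattern in isolation; `Candidate`, `select_eviction_candidates`, and the sample data are illustrative names for this note, not code from the diff.

```rust
use std::collections::BinaryHeap;

/// A candidate for eviction. The heap is a max-heap on `hit_time`, so the
/// most recently hit of the current candidates sits on top and is the first
/// to be displaced when an older entry is found (mirrors `PruneHeapItem`).
#[derive(Debug, Eq)]
struct Candidate {
    hit_time: i64,
    key: String,
}

impl PartialEq for Candidate {
    fn eq(&self, other: &Self) -> bool {
        self.hit_time == other.hit_time
    }
}

impl PartialOrd for Candidate {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Candidate {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.hit_time.cmp(&other.hit_time)
    }
}

/// Select the `n` least-recently-hit keys in O(len * log n) time and O(n)
/// space, without sorting the full entry set.
fn select_eviction_candidates(entries: &[(String, i64)], n: usize) -> Vec<String> {
    let mut heap: BinaryHeap<Candidate> = BinaryHeap::with_capacity(n);
    for (key, hit_time) in entries {
        if heap.len() < n {
            // heap not full yet, accept unconditionally:
            heap.push(Candidate { hit_time: *hit_time, key: key.clone() });
        } else if heap.peek().is_some_and(|top| *hit_time < top.hit_time) {
            // older than the newest current candidate: displace it
            heap.pop();
            heap.push(Candidate { hit_time: *hit_time, key: key.clone() });
        }
    }
    heap.into_iter().map(|c| c.key).collect()
}

fn main() {
    // hit times mirror the eviction test scenario: paris has the oldest hit
    let entries = vec![
        ("janeway".to_string(), 4),
        ("paris".to_string(), 3),
        ("neelix".to_string(), 5),
    ];
    assert_eq!(select_eviction_candidates(&entries, 1), ["paris"]);
}
```

Compared to collecting and sorting every entry, this keeps each prune pass cheap in time and allocation, which matters given that `background_cache_pruner` now ticks every 10 ms.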
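The other core mechanism in the diff is the `Fetching(SharedCacheValueFuture)` entry state: the fetch future is boxed, wrapped in `Shared`, stored in the map, and cloned out to any GET that arrives while the fetch is still in flight, which is exactly what the `cache_hit_while_fetching` test exercises. Below is a minimal sketch of that de-duplication pattern under stated assumptions: `FetchMap` and `get_or_fetch` are hypothetical names for this note, and the error side is simplified to `Arc<String>` where the diff uses a cloneable `DynError` (`Arc<dyn std::error::Error + Send + Sync>`).

```rust
use std::{
    collections::HashMap,
    sync::{Arc, Mutex},
};

use futures::future::{BoxFuture, FutureExt, Shared};

/// The shared handle: cloneable, and the underlying future runs only once.
type SharedFetch = Shared<BoxFuture<'static, Result<Arc<Vec<u8>>, Arc<String>>>>;

#[derive(Default)]
struct FetchMap {
    inflight: Mutex<HashMap<String, SharedFetch>>,
}

impl FetchMap {
    /// The first caller for a key installs the shared future; later callers
    /// get a clone of it and await the same underlying fetch.
    fn get_or_fetch(&self, key: &str) -> SharedFetch {
        let mut map = self.inflight.lock().unwrap();
        if let Some(fut) = map.get(key) {
            return fut.clone();
        }
        let key_owned = key.to_string();
        let fut = async move {
            // stand-in for the real object store GET:
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            Ok(Arc::new(key_owned.into_bytes()))
        }
        .boxed()
        .shared();
        map.insert(key.to_string(), fut.clone());
        fut
    }
}

#[tokio::main]
async fn main() {
    let fetcher = Arc::new(FetchMap::default());
    let (a, b) = (Arc::clone(&fetcher), Arc::clone(&fetcher));
    // two concurrent requests for the same key share one underlying fetch:
    let (ra, rb) = tokio::join!(
        tokio::spawn(async move { a.get_or_fetch("0.parquet").await }),
        tokio::spawn(async move { b.get_or_fetch("0.parquet").await }),
    );
    assert_eq!(ra.unwrap().unwrap(), rb.unwrap().unwrap());
}
```

This is also why `CacheEntryState::value` can take `self`: the state is cloned out of the map entry, so waiting on a `Fetching` future never holds a `DashMap` reference across an await point.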