refactor: parquet cache with less locking (#25389)

Closes #25382 
Closes #25383 

This refactors the parquet cache to use less locking by switching from the `clru` crate to a hand-rolled cache implementation. The new cache still acts as an LRU, but it uses atomics to track a per-entry hit time, and it handles pruning in a separate background task that is decoupled from inserts to and gets from the cache.

The `Cache` type uses a [`DashMap`](https://docs.rs/dashmap/latest/dashmap/struct.DashMap.html) internally to store cache entries. This should help reduce lock contention, and it has the added benefit of not requiring mutability to insert into _or_ get from the map.
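As a minimal sketch (illustrative only, not code from this PR), both `insert` and `get` on a `DashMap` take `&self`, so callers need neither an external lock nor a mutable reference:

```rust
use dashmap::DashMap;

fn main() {
    // DashMap shards its keys across internal locks; both calls below take `&self`:
    let map: DashMap<String, usize> = DashMap::new();
    map.insert("0.parquet".to_string(), 11); // no `&mut self` needed
    if let Some(entry) = map.get("0.parquet") {
        // `get` returns a guard into the shard that holds this key
        println!("cached size: {}", *entry);
    }
}
```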

The cache maps an `object_store::Path` to a `CacheEntry`. On a hit, an entry has its `hit_time` (an `AtomicI64`) updated to the current time. During a prune operation, the entries with the oldest hit times are removed from the cache. See the `Cache::prune` method for details.
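A condensed sketch of that hit path (names assumed to mirror the diff below): recording a hit is a single atomic store, so concurrent readers never take a lock:

```rust
use std::sync::atomic::{AtomicI64, Ordering};

// Sketch only: a cache entry carrying its last-hit timestamp in nanoseconds.
struct Entry {
    hit_time: AtomicI64,
}

// Recording a hit needs only a shared reference; the cost is one atomic store.
fn record_hit(entry: &Entry, now_nanos: i64) {
    entry.hit_time.store(now_nanos, Ordering::SeqCst);
}

fn main() {
    let entry = Entry { hit_time: AtomicI64::new(0) };
    record_hit(&entry, 1_000);
    assert_eq!(entry.hit_time.load(Ordering::SeqCst), 1_000);
}
```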

The cache is set up with a memory _capacity_ and a _prune percent_. The cache tracks the memory used as entries are added, based on their _size_, and when a prune is invoked in the background, if the cache has exceeded its capacity, it prunes `prune_percent * cache.len()` entries from the cache.
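The trigger arithmetic reduces to the following (a sketch of the calculation, not the PR's exact code):

```rust
// Number of entries a prune pass removes: zero while under capacity,
// otherwise `prune_percent` of the current entry count, rounded down.
fn entries_to_prune(used: usize, capacity: usize, len: usize, prune_percent: f64) -> usize {
    if used < capacity {
        return 0; // pruning is a no-op until the cache reaches capacity
    }
    (len as f64 * prune_percent).floor() as usize
}

fn main() {
    // 1000 entries at a 10% prune percent, with the cache over capacity:
    assert_eq!(entries_to_prune(2_048, 1_024, 1000, 0.1), 100);
    // under capacity, nothing is pruned:
    assert_eq!(entries_to_prune(512, 1_024, 1000, 0.1), 0);
}
```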

Two tests were added:
* `cache_evicts_lru_when_full` to check LRU behaviour of the cache
* `cache_hit_while_fetching` to check that hitting a cache entry while a request to fetch that entry is in flight does not result in extra calls to the underlying object store
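For reference, the serve command now wires the cache up as follows (taken from the diff below; `object_store` and `time_provider` are built earlier in that function):

```rust
let cache_capacity = 1024 * 1024 * 1024; // 1 GiB
let prune_percent = 0.1; // prune 10% of entries per pass
let (object_store, parquet_cache) = create_cached_obj_store_and_oracle(
    object_store,
    Arc::clone(&time_provider) as _,
    cache_capacity,
    prune_percent,
);
```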
Trevor Hilton 2024-09-27 11:59:17 -04:00 committed by GitHub
parent 70643d0136
commit 4184a331ea
8 changed files with 493 additions and 209 deletions

Cargo.lock (generated)

@@ -966,12 +966,6 @@ dependencies = [
"workspace-hack",
]
[[package]]
name = "clru"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cbd0f76e066e64fdc5631e3bb46381254deab9ef1158292f27c8c57e3bf3fe59"
[[package]]
name = "colorchoice"
version = "1.0.2"
@@ -2822,15 +2816,16 @@ dependencies = [
name = "influxdb3_write"
version = "0.1.0"
dependencies = [
"anyhow",
"arrow",
"arrow_util",
"async-trait",
"byteorder",
"bytes",
"chrono",
"clru",
"crc32fast",
"crossbeam-channel",
"dashmap",
"data_types",
"datafusion",
"datafusion_util",


@@ -260,10 +260,17 @@ pub async fn command(config: Config) -> Result<()> {
let object_store: Arc<DynObjectStore> =
make_object_store(&config.object_store_config).map_err(Error::ObjectStoreParsing)?;
// TODO(trevor): make this configurable/optional:
let time_provider = Arc::new(SystemProvider::new());
// TODO(trevor): make the cache capacity and prune percent configurable/optional:
let cache_capacity = 1024 * 1024 * 1024;
let (object_store, parquet_cache) =
create_cached_obj_store_and_oracle(object_store, cache_capacity);
let prune_percent = 0.1;
let (object_store, parquet_cache) = create_cached_obj_store_and_oracle(
object_store,
Arc::clone(&time_provider) as _,
cache_capacity,
prune_percent,
);
let trace_exporter = config.tracing_config.build()?;
@@ -322,7 +329,6 @@ pub async fn command(config: Config) -> Result<()> {
snapshot_size: config.wal_snapshot_size,
};
let time_provider = Arc::new(SystemProvider::new());
let catalog = persister
.load_or_create_catalog()
.await


@@ -745,7 +745,9 @@ mod tests {
let common_state =
crate::CommonServerState::new(Arc::clone(&metrics), None, trace_header_parser).unwrap();
let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::new());
let (object_store, parquet_cache) = test_cached_obj_store_and_oracle(object_store);
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(start_time)));
let (object_store, parquet_cache) =
test_cached_obj_store_and_oracle(object_store, Arc::clone(&time_provider) as _);
let parquet_store =
ParquetStorage::new(Arc::clone(&object_store), StorageId::from("influxdb3"));
let exec = Arc::new(Executor::new_with_config_and_executor(
@@ -761,7 +763,6 @@
DedicatedExecutor::new_testing(),
));
let persister = Arc::new(Persister::new(Arc::clone(&object_store), "test_host"));
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(start_time)));
let dummy_host_id = Arc::from("dummy-host-id");
let instance_id = Arc::from("dummy-instance-id");


@@ -630,9 +630,10 @@ mod tests {
// Set up QueryExecutor
let object_store: Arc<dyn ObjectStore> =
Arc::new(LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap());
let (object_store, parquet_cache) = test_cached_obj_store_and_oracle(object_store);
let persister = Arc::new(Persister::new(Arc::clone(&object_store), "test_host"));
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
let (object_store, parquet_cache) =
test_cached_obj_store_and_oracle(object_store, Arc::clone(&time_provider) as _);
let persister = Arc::new(Persister::new(Arc::clone(&object_store), "test_host"));
let executor = make_exec(Arc::clone(&object_store));
let host_id = Arc::from("dummy-host-id");
let instance_id = Arc::from("instance-id");


@@ -24,14 +24,15 @@ influxdb3_id = { path = "../influxdb3_id" }
influxdb3_wal = { path = "../influxdb3_wal" }
# crates.io dependencies
anyhow.workspace = true
arrow.workspace = true
async-trait.workspace = true
byteorder.workspace = true
bytes.workspace = true
chrono.workspace = true
clru.workspace = true
crc32fast.workspace = true
crossbeam-channel.workspace = true
dashmap.workspace = true
datafusion.workspace = true
futures.workspace = true
futures-util.workspace = true


@@ -1579,13 +1579,15 @@ mod tests {
use influxdb3_id::DbId;
use influxdb3_wal::{LastCacheDefinition, WalConfig};
use insta::assert_json_snapshot;
use iox_time::{MockProvider, Time};
use iox_time::{MockProvider, Time, TimeProvider};
async fn setup_write_buffer() -> WriteBufferImpl {
let obj_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let (obj_store, parquet_cache) = test_cached_obj_store_and_oracle(obj_store);
let time_provider: Arc<dyn TimeProvider> =
Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
let (obj_store, parquet_cache) =
test_cached_obj_store_and_oracle(obj_store, Arc::clone(&time_provider));
let persister = Arc::new(Persister::new(obj_store, "test_host"));
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
let host_id = Arc::from("dummy-host-id");
let instance_id = Arc::from("dummy-instance-id");
WriteBufferImpl::new(


@@ -1,24 +1,41 @@
//! An in-memory cache of Parquet files that are persisted to object storage
use std::{
fmt::Debug, hash::RandomState, num::NonZeroUsize, ops::Range, sync::Arc, time::Duration,
collections::BinaryHeap,
fmt::Debug,
ops::Range,
sync::{
atomic::{AtomicI64, AtomicUsize, Ordering},
Arc,
},
time::Duration,
};
use anyhow::bail;
use async_trait::async_trait;
use bytes::Bytes;
use clru::{CLruCache, CLruCacheConfig, WeightScale};
use futures::{StreamExt, TryStreamExt};
use futures_util::stream::BoxStream;
use dashmap::{DashMap, Entry};
use futures::{
future::{BoxFuture, Shared},
stream::BoxStream,
FutureExt, StreamExt, TryStreamExt,
};
use iox_time::TimeProvider;
use object_store::{
path::Path, Error, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload,
ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult,
Result as ObjectStoreResult,
};
use observability_deps::tracing::{error, info};
use observability_deps::tracing::{error, info, warn};
use tokio::sync::{
mpsc::{channel, Receiver, Sender},
oneshot, Mutex,
oneshot,
};
/// Shared future type for cache values that are being fetched
type SharedCacheValueFuture = Shared<BoxFuture<'static, Result<Arc<CacheValue>, DynError>>>;
/// Dynamic error type that can be cloned
type DynError = Arc<dyn std::error::Error + Send + Sync>;
/// A request to fetch an item at the given `path` from an object store
///
/// Contains a notifier to notify the caller that registers the cache request when the item
@@ -84,9 +101,16 @@ impl ParquetCacheOracle for MemCacheOracle {
/// that returns them as their `Arc<dyn _>` equivalent.
pub fn create_cached_obj_store_and_oracle(
object_store: Arc<dyn ObjectStore>,
time_provider: Arc<dyn TimeProvider>,
cache_capacity: usize,
prune_percent: f64,
) -> (Arc<dyn ObjectStore>, Arc<dyn ParquetCacheOracle>) {
let store = Arc::new(MemCachedObjectStore::new(object_store, cache_capacity));
let store = Arc::new(MemCachedObjectStore::new(
object_store,
cache_capacity,
time_provider,
prune_percent,
));
let oracle = Arc::new(MemCacheOracle::new(Arc::clone(&store)));
(store, oracle)
}
@@ -94,11 +118,12 @@ pub fn create_cached_obj_store_and_oracle(
/// Create a test cached object store with a cache capacity of 1GB
pub fn test_cached_obj_store_and_oracle(
object_store: Arc<dyn ObjectStore>,
time_provider: Arc<dyn TimeProvider>,
) -> (Arc<dyn ObjectStore>, Arc<dyn ParquetCacheOracle>) {
create_cached_obj_store_and_oracle(object_store, 1024 * 1024 * 1024)
create_cached_obj_store_and_oracle(object_store, time_provider, 1024 * 1024 * 1024, 0.1)
}
/// An entry in the cache, containing the actual bytes as well as object store metadata
/// A value in the cache, containing the actual bytes as well as object store metadata
#[derive(Debug)]
struct CacheValue {
data: Bytes,
@@ -108,59 +133,263 @@ struct CacheValue {
impl CacheValue {
/// Get the size of the cache value's memory footprint in bytes
fn size(&self) -> usize {
// TODO(trevor): could also calculate the size of the metadata...
self.data.len()
let Self {
data,
meta:
ObjectMeta {
location,
last_modified: _,
size: _,
e_tag,
version,
},
} = self;
data.len()
+ location.as_ref().len()
+ e_tag.as_ref().map(|s| s.capacity()).unwrap_or_default()
+ version.as_ref().map(|s| s.capacity()).unwrap_or_default()
}
/// Fetch the value from an object store
async fn fetch(store: Arc<dyn ObjectStore>, path: Path) -> object_store::Result<Self> {
let res = store.get(&path).await?;
let meta = res.meta.clone();
let data = res.bytes().await?;
Ok(Self { data, meta })
}
}
/// Holds the state and hit time for an entry in the cache
#[derive(Debug)]
struct CacheEntry {
state: CacheEntryState,
/// The nano-second timestamp of when this value was last hit
hit_time: AtomicI64,
}
impl CacheEntry {
/// Get the approximate memory footprint of this entry in bytes
fn size(&self) -> usize {
self.state.size() + std::mem::size_of::<AtomicI64>()
}
fn is_fetching(&self) -> bool {
matches!(self.state, CacheEntryState::Fetching(_))
}
fn is_success(&self) -> bool {
matches!(self.state, CacheEntryState::Success(_))
}
}
/// The state of a cache entry
#[derive(Debug)]
enum CacheEntry {
///
/// This implements `Clone` so that a reference to the entry in the `Cache` does not need to be
/// held for long.
#[derive(Debug, Clone)]
enum CacheEntryState {
/// The cache entry is being fetched from object store
Fetching,
Fetching(SharedCacheValueFuture),
/// The cache entry was successfully fetched and is stored in the cache as a [`CacheValue`]
Success(Arc<CacheValue>),
/// The request to the object store failed
Failed,
/// The cache entry was deleted
Deleted,
/// The object is too large for the cache
TooLarge,
}
impl CacheEntry {
/// Get the size of the cache entry in bytes
impl CacheEntryState {
/// Get the approximate size of the cache entry in bytes
fn size(&self) -> usize {
match self {
CacheEntry::Fetching => 0,
CacheEntry::Success(v) => v.size(),
CacheEntry::Failed => 0,
CacheEntry::Deleted => 0,
CacheEntry::TooLarge => 0,
CacheEntryState::Fetching(_) => 0,
CacheEntryState::Success(v) => v.size(),
}
}
fn is_fetching(&self) -> bool {
matches!(self, CacheEntry::Fetching)
/// Get the value in this state, or wait for it if it is still fetching
///
/// This takes `self` as it is meant to be used on an entry's state that has been cloned.
async fn value(self) -> object_store::Result<Arc<CacheValue>> {
match self {
CacheEntryState::Fetching(fut) => fut.await.map_err(|e| Error::Generic {
store: STORE_NAME,
source: Box::new(e),
}),
CacheEntryState::Success(v) => Ok(v),
}
fn is_success(&self) -> bool {
matches!(self, CacheEntry::Success(_))
}
fn keep(&self) -> bool {
self.is_fetching() || self.is_success()
}
}
/// Implements the [`WeightScale`] trait to determine a [`CacheEntry`]'s size on insertion to
/// the cache
/// A cache for storing objects from object storage by their [`Path`]
///
/// This acts as a Least-Recently-Used (LRU) cache that allows for concurrent reads and writes. See
/// the [`Cache::prune`] method for implementation of how the cache entries are pruned. Pruning must
/// be invoked externally, e.g., on an interval.
#[derive(Debug)]
struct CacheEntryScale;
struct Cache {
/// The maximum amount of memory this cache should occupy in bytes
capacity: usize,
/// The current amount of memory being used by the cache in bytes
used: AtomicUsize,
/// What percentage of the total number of cache entries will be pruned during a pruning operation
prune_percent: f64,
/// The map storing cache entries
map: DashMap<Path, CacheEntry>,
/// Provides timestamps for updating the hit time of each cache entry
time_provider: Arc<dyn TimeProvider>,
}
impl WeightScale<Path, CacheEntry> for CacheEntryScale {
fn weight(&self, key: &Path, value: &CacheEntry) -> usize {
key.as_ref().len() + value.size()
impl Cache {
/// Create a new cache with a given capacity and prune percent
fn new(capacity: usize, prune_percent: f64, time_provider: Arc<dyn TimeProvider>) -> Self {
Self {
capacity,
used: AtomicUsize::new(0),
prune_percent,
map: DashMap::new(),
time_provider,
}
}
/// Get an entry in the cache or `None` if there is not an entry
///
/// This updates the hit time of the entry and returns a cloned copy of the entry state so that
/// the reference into the map is dropped
fn get(&self, path: &Path) -> Option<CacheEntryState> {
let entry = self.map.get(path)?;
if entry.is_success() {
entry
.hit_time
.store(self.time_provider.now().timestamp_nanos(), Ordering::SeqCst);
}
Some(entry.state.clone())
}
/// Check if an entry in the cache is in the process of being fetched or if it was already fetched
/// successfully
///
/// This does not update the hit time of the entry
fn path_already_fetched(&self, path: &Path) -> bool {
self.map.get(path).is_some()
}
/// Insert a `Fetching` entry to the cache along with the shared future for polling the value
/// being fetched
fn set_fetching(&self, path: &Path, fut: SharedCacheValueFuture) {
let entry = CacheEntry {
state: CacheEntryState::Fetching(fut),
hit_time: AtomicI64::new(self.time_provider.now().timestamp_nanos()),
};
let additional = entry.size();
self.map.insert(path.clone(), entry);
self.used.fetch_add(additional, Ordering::SeqCst);
}
/// Update a `Fetching` entry to a `Success` entry in the cache
fn set_success(&self, path: &Path, value: Arc<CacheValue>) -> Result<(), anyhow::Error> {
match self.map.entry(path.clone()) {
Entry::Occupied(mut o) => {
let entry = o.get_mut();
if !entry.is_fetching() {
// NOTE(trevor): the only other state is Success, so bailing here just
// means that we leave the entry alone, and since objects in the store are
// treated as immutable, this should be okay.
bail!("attempted to store value in non-fetching cache entry");
}
entry.state = CacheEntryState::Success(value);
entry
.hit_time
.store(self.time_provider.now().timestamp_nanos(), Ordering::SeqCst);
// TODO(trevor): what if size is greater than cache capacity?
let additional = entry.size();
self.used.fetch_add(additional, Ordering::SeqCst);
Ok(())
}
Entry::Vacant(_) => bail!("attempted to set success state on an empty cache entry"),
}
}
/// Remove an entry from the cache, as well as its associated size from the used capacity
fn remove(&self, path: &Path) {
let Some((_, entry)) = self.map.remove(path) else {
return;
};
self.used.fetch_sub(entry.state.size(), Ordering::SeqCst);
}
/// Prune least recently hit entries from the cache
///
/// This is a no-op if the `used` amount on the cache is not >= its `capacity`
fn prune(&self) {
let used = self.used.load(Ordering::SeqCst);
if used < self.capacity {
return;
}
let n_to_prune = (self.map.len() as f64 * self.prune_percent).floor() as usize;
// use a BinaryHeap to determine the cut-off time; entries that were
// last hit before that time will be pruned:
let mut prune_heap = BinaryHeap::with_capacity(n_to_prune);
for map_ref in self.map.iter() {
let hit_time = map_ref.value().hit_time.load(Ordering::SeqCst);
let size = map_ref.value().size();
let path = map_ref.key().as_ref();
if prune_heap.len() < n_to_prune {
// if the heap isn't full yet, throw this item on:
prune_heap.push(PruneHeapItem {
hit_time,
path_ref: path.into(),
size,
});
} else if hit_time < prune_heap.peek().map(|item| item.hit_time).unwrap() {
// otherwise, the heap is at its capacity, so only push if the hit_time
// in question is older than the top of the heap (after pop'ing the top
// of the heap to make room)
prune_heap.pop();
prune_heap.push(PruneHeapItem {
path_ref: path.into(),
hit_time,
size,
});
}
}
// track the total size of entries that get freed:
let mut freed = 0;
// drop entries with hit times before the cut-off:
for item in prune_heap {
self.map.remove(&Path::from(item.path_ref.as_ref()));
freed += item.size;
}
// update used mem size with freed amount:
self.used.fetch_sub(freed, Ordering::SeqCst);
}
}
/// An item that stores what is needed for pruning [`CacheEntry`]s
#[derive(Debug, Eq)]
struct PruneHeapItem {
/// Reference to the entry's `Path` key
path_ref: Arc<str>,
/// Entry's hit time for comparison and heap insertion
hit_time: i64,
/// Entry size used to calculate the amount of memory freed after a prune
size: usize,
}
impl PartialEq for PruneHeapItem {
fn eq(&self, other: &Self) -> bool {
self.hit_time.eq(&other.hit_time)
}
}
impl PartialOrd for PruneHeapItem {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.hit_time.cmp(&other.hit_time))
}
}
impl Ord for PruneHeapItem {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.hit_time.cmp(&other.hit_time)
}
}
@@ -168,63 +397,26 @@ impl WeightScale<Path, CacheEntry> for CacheEntryScale {
const STORE_NAME: &str = "mem_cached_object_store";
/// An object store with an associated cache that can serve GET-style requests using the cache
///
/// The least-recently used (LRU) entries will be evicted when new entries are inserted, if the
/// new entry would exceed the cache's memory capacity
#[derive(Debug)]
pub struct MemCachedObjectStore {
/// An inner object store for which items will be cached
inner: Arc<dyn ObjectStore>,
/// A weighted LRU cache for storing the objects associated with a given path in memory
// NOTE(trevor): this uses a mutex as the CLruCache type needs &mut self for its get method, so
// we always need an exclusive lock on the cache. If this creates a performance bottleneck then
// we will need to look for alternatives.
//
// A Tokio mutex is used to prevent blocking the thread while waiting for a lock, and so that
// the lock can be held accross await points.
cache: Arc<Mutex<CLruCache<Path, CacheEntry, RandomState, CacheEntryScale>>>,
cache: Arc<Cache>,
}
impl MemCachedObjectStore {
/// Create a new [`MemCachedObjectStore`] with the given memory capacity
fn new(inner: Arc<dyn ObjectStore>, memory_capacity: usize) -> Self {
let cache = CLruCache::with_config(
CLruCacheConfig::new(NonZeroUsize::new(memory_capacity).unwrap())
.with_scale(CacheEntryScale),
);
/// Create a new [`MemCachedObjectStore`]
fn new(
inner: Arc<dyn ObjectStore>,
memory_capacity: usize,
time_provider: Arc<dyn TimeProvider>,
prune_percent: f64,
) -> Self {
Self {
inner,
cache: Arc::new(Mutex::new(cache)),
cache: Arc::new(Cache::new(memory_capacity, prune_percent, time_provider)),
}
}
/// Get an entry in the cache if it contains a successful fetch result, or `None` otherwise
///
/// This requires `&mut self` as the underlying method on the cache requires a mutable reference
/// in order to update the recency of the entry in the cache
async fn get_cache_value(&self, path: &Path) -> Option<Arc<CacheValue>> {
self.cache
.lock()
.await
.get(path)
.and_then(|entry| match entry {
CacheEntry::Fetching
| CacheEntry::Failed
| CacheEntry::Deleted
| CacheEntry::TooLarge => None,
CacheEntry::Success(v) => Some(Arc::clone(v)),
})
}
/// Set the state of a cache entry to `Deleted`, since we cannot remove elements from the
/// cache directly.
async fn delete_cache_value(&self, path: &Path) {
let _ = self
.cache
.lock()
.await
.put_with_weight(path.clone(), CacheEntry::Deleted);
}
}
impl std::fmt::Display for MemCachedObjectStore {
@@ -242,7 +434,7 @@ impl std::fmt::Display for MemCachedObjectStore {
/// from the inner store.
#[async_trait]
impl ObjectStore for MemCachedObjectStore {
async fn put(&self, location: &Path, bytes: PutPayload) -> ObjectStoreResult<PutResult> {
async fn put(&self, location: &Path, bytes: PutPayload) -> object_store::Result<PutResult> {
self.inner.put(location, bytes).await
}
@@ -251,11 +443,14 @@ impl ObjectStore for MemCachedObjectStore {
location: &Path,
bytes: PutPayload,
opts: PutOptions,
) -> ObjectStoreResult<PutResult> {
) -> object_store::Result<PutResult> {
self.inner.put_opts(location, bytes, opts).await
}
async fn put_multipart(&self, location: &Path) -> ObjectStoreResult<Box<dyn MultipartUpload>> {
async fn put_multipart(
&self,
location: &Path,
) -> object_store::Result<Box<dyn MultipartUpload>> {
self.inner.put_multipart(location).await
}
@@ -263,14 +458,15 @@ impl ObjectStore for MemCachedObjectStore {
&self,
location: &Path,
opts: PutMultipartOpts,
) -> ObjectStoreResult<Box<dyn MultipartUpload>> {
) -> object_store::Result<Box<dyn MultipartUpload>> {
self.inner.put_multipart_opts(location, opts).await
}
/// Get an object from the object store. If this object is cached, then it will not make a request
/// to the inner object store.
async fn get(&self, location: &Path) -> ObjectStoreResult<GetResult> {
if let Some(v) = self.get_cache_value(location).await {
async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
if let Some(state) = self.cache.get(location) {
let v = state.value().await?;
Ok(GetResult {
payload: GetResultPayload::Stream(
futures::stream::iter([Ok(v.data.clone())]).boxed(),
@@ -284,13 +480,17 @@ impl ObjectStore for MemCachedObjectStore {
}
}
async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult<GetResult> {
async fn get_opts(
&self,
location: &Path,
options: GetOptions,
) -> object_store::Result<GetResult> {
// NOTE(trevor): this could probably be supported through the cache if we need it via the
// ObjectMeta stored in the cache. For now this is conservative:
self.inner.get_opts(location, options).await
}
async fn get_range(&self, location: &Path, range: Range<usize>) -> ObjectStoreResult<Bytes> {
async fn get_range(&self, location: &Path, range: Range<usize>) -> object_store::Result<Bytes> {
Ok(self
.get_ranges(location, &[range])
.await?
@@ -305,8 +505,9 @@ impl ObjectStore for MemCachedObjectStore {
&self,
location: &Path,
ranges: &[Range<usize>],
) -> ObjectStoreResult<Vec<Bytes>> {
if let Some(v) = self.get_cache_value(location).await {
) -> object_store::Result<Vec<Bytes>> {
if let Some(state) = self.cache.get(location) {
let v = state.value().await?;
ranges
.iter()
.map(|range| {
@@ -339,8 +540,9 @@ impl ObjectStore for MemCachedObjectStore {
}
}
async fn head(&self, location: &Path) -> ObjectStoreResult<ObjectMeta> {
if let Some(v) = self.get_cache_value(location).await {
async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
if let Some(state) = self.cache.get(location) {
let v = state.value().await?;
Ok(v.meta.clone())
} else {
self.inner.head(location).await
@@ -348,22 +550,22 @@ impl ObjectStore for MemCachedObjectStore {
}
/// Delete an object on object store, but also remove it from the cache.
async fn delete(&self, location: &Path) -> ObjectStoreResult<()> {
async fn delete(&self, location: &Path) -> object_store::Result<()> {
let result = self.inner.delete(location).await?;
self.delete_cache_value(location).await;
self.cache.remove(location);
Ok(result)
}
fn delete_stream<'a>(
&'a self,
locations: BoxStream<'a, ObjectStoreResult<Path>>,
) -> BoxStream<'a, ObjectStoreResult<Path>> {
locations: BoxStream<'a, object_store::Result<Path>>,
) -> BoxStream<'a, object_store::Result<Path>> {
locations
.and_then(|_| futures::future::err(Error::NotImplemented))
.boxed()
}
fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, ObjectStoreResult<ObjectMeta>> {
fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
self.inner.list(prefix)
}
@@ -371,27 +573,27 @@ impl ObjectStore for MemCachedObjectStore {
&self,
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'_, ObjectStoreResult<ObjectMeta>> {
) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
self.inner.list_with_offset(prefix, offset)
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> ObjectStoreResult<ListResult> {
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
self.inner.list_with_delimiter(prefix).await
}
async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
self.inner.copy(from, to).await
}
async fn rename(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
self.inner.rename(from, to).await
}
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
self.inner.copy_if_not_exists(from, to).await
}
async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
self.inner.rename_if_not_exists(from, to).await
}
}
@@ -409,61 +611,41 @@ fn background_cache_request_handler(
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
while let Some(CacheRequest { path, notifier }) = rx.recv().await {
// clone the path before acquiring the lock:
let path_cloned = path.clone();
// Check that the cache does not already contain an entry for the provide path, or that
// it is not already in the process of fetching the given path:
let mut cache_lock = mem_store.cache.lock().await;
if cache_lock
.get(&path)
.is_some_and(|entry| entry.is_fetching() || entry.is_success())
{
// We assume that objects on object store are immutable, so we can skip objects that
// we have already fetched:
if mem_store.cache.path_already_fetched(&path) {
continue;
}
// Create a future that will go and fetch the cache value from the store:
let path_cloned = path.clone();
let store_cloned = Arc::clone(&mem_store.inner);
let fut = async move {
CacheValue::fetch(store_cloned, path_cloned)
.await
.map(Arc::new)
.map_err(|e| Arc::new(e) as _)
}
.boxed()
.shared();
// Put a `Fetching` state in the entry to prevent concurrent requests to the same path:
let _ = cache_lock.put_with_weight(path_cloned, CacheEntry::Fetching);
// Drop the lock before spawning the task below
drop(cache_lock);
mem_store.cache.set_fetching(&path, fut.clone());
let mem_store_captured = Arc::clone(&mem_store);
tokio::spawn(async move {
let cache_insertion_result = match mem_store_captured.inner.get(&path).await {
Ok(result) => {
let meta = result.meta.clone();
match result.bytes().await {
Ok(data) => mem_store_captured.cache.lock().await.put_with_weight(
path,
CacheEntry::Success(Arc::new(CacheValue { data, meta })),
),
Err(error) => {
error!(%error, "failed to retrieve payload from object store get result");
mem_store_captured
.cache
.lock()
.await
.put_with_weight(path, CacheEntry::Failed)
}
}
match fut.await {
Ok(value) => {
if let Err(error) = mem_store_captured.cache.set_success(&path, value) {
// NOTE(trevor): this would be an error if A) it tried to insert on an already
// successful entry, or B) it tried to insert on an empty entry, in either case
// we do not need to remove the entry to clear a fetching state, as in the
// other failure modes below...
warn!(%error, "failed to set the success state on the cache");
};
}
Err(error) => {
error!(%error, "failed to fulfill cache request with object store");
mem_store_captured
.cache
.lock()
.await
.put_with_weight(path, CacheEntry::Failed)
mem_store_captured.cache.remove(&path);
}
};
// If an entry would not fit in the cache at all, the put_with_weight method returns
// it as an Err from above, and we would not have cleared the `Fetching` entry, so
// we need to do that here:
if let Err((k, _)) = cache_insertion_result {
mem_store_captured
.cache
.lock()
.await
.put_with_weight(k, CacheEntry::TooLarge)
.expect("cache capacity is too small");
}
// notify that the cache request has been fulfilled:
let _ = notifier.send(());
});
@@ -476,31 +658,32 @@ fn background_cache_request_handler(
// TODO(trevor): the interval could be configurable
fn background_cache_pruner(mem_store: Arc<MemCachedObjectStore>) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(60));
let mut interval = tokio::time::interval(Duration::from_millis(10));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
interval.tick().await;
mem_store.cache.lock().await.retain(|_, entry| entry.keep());
mem_store.cache.prune();
}
})
}
#[cfg(test)]
mod tests {
use std::{ops::Range, sync::Arc};
use std::{ops::Range, sync::Arc, time::Duration};
use arrow::datatypes::ToByteSlice;
use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
use hashbrown::HashMap;
use iox_time::{MockProvider, Time, TimeProvider};
use object_store::{
memory::InMemory, path::Path, GetOptions, GetResult, ListResult, MultipartUpload,
ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult,
};
use parking_lot::RwLock;
use pretty_assertions::assert_eq;
use tokio::sync::Notify;
use crate::parquet_cache::{
create_cached_obj_store_and_oracle, test_cached_obj_store_and_oracle, CacheRequest,
@@ -526,8 +709,12 @@ mod tests {
async fn hit_cache_instead_of_object_store() {
// set up the inner test object store and then wrap it with the mem cached store:
let inner_store = Arc::new(TestObjectStore::new(Arc::new(InMemory::new())));
let (cached_store, oracle) =
test_cached_obj_store_and_oracle(Arc::clone(&inner_store) as _);
let time_provider: Arc<dyn TimeProvider> =
Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
let (cached_store, oracle) = test_cached_obj_store_and_oracle(
Arc::clone(&inner_store) as _,
Arc::clone(&time_provider),
);
// PUT a payload into the object store through the outer mem cached store:
let path = Path::from("0.parquet");
let payload = b"hello world";
@@ -564,12 +751,19 @@ mod tests {
#[tokio::test]
async fn cache_evicts_lru_when_full() {
let inner_store = Arc::new(TestObjectStore::new(Arc::new(InMemory::new())));
let cache_capacity_bytes = 32;
let (cached_store, oracle) =
create_cached_obj_store_and_oracle(Arc::clone(&inner_store) as _, cache_capacity_bytes);
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
// these are magic numbers that will make it so the third entry exceeds the cache capacity:
let cache_capacity_bytes = 60;
let cache_prune_percent = 0.4;
let (cached_store, oracle) = create_cached_obj_store_and_oracle(
Arc::clone(&inner_store) as _,
Arc::clone(&time_provider) as _,
cache_capacity_bytes,
cache_prune_percent,
);
// PUT an entry into the store:
let path_1 = Path::from("0.parquet"); // 9 bytes for path
let payload_1 = b"Janeway"; // 7 bytes for payload
let path_1 = Path::from("0.parquet");
let payload_1 = b"Janeway";
cached_store
.put(&path_1, PutPayload::from_static(payload_1))
.await
@@ -583,6 +777,9 @@ mod tests {
assert_eq!(1, inner_store.total_get_request_count());
assert_eq!(1, inner_store.get_request_count(&path_1));
// update time:
time_provider.set(Time::from_timestamp_nanos(1));
// GET the entry to check it's there and was retrieved from the cache, i.e., that the request
// counts do not change:
assert_payload_at_equals!(cached_store, payload_1, path_1);
@@ -590,15 +787,18 @@ mod tests {
assert_eq!(1, inner_store.get_request_count(&path_1));
// PUT a second entry into the store:
let path_2 = Path::from("1.parquet"); // 9 bytes for path
let payload_2 = b"Paris"; // 5 bytes for payload
let path_2 = Path::from("1.parquet");
let payload_2 = b"Paris";
cached_store
.put(&path_2, PutPayload::from_static(payload_2))
.await
.unwrap();
// update time:
time_provider.set(Time::from_timestamp_nanos(2));
// cache the second entry and wait for it to complete, this will not evict the first entry
// as both can fit in the cache whose capacity is 32 bytes:
// as both can fit in the cache:
let (cache_request, notifier_rx) = CacheRequest::create(path_2.clone());
oracle.register(cache_request);
let _ = notifier_rx.await;
@@ -607,6 +807,9 @@ mod tests {
assert_eq!(1, inner_store.get_request_count(&path_1));
assert_eq!(1, inner_store.get_request_count(&path_2));
// update time:
time_provider.set(Time::from_timestamp_nanos(3));
// GET the second entry and assert that it was retrieved from the cache, i.e., that the
// request counts do not change:
assert_payload_at_equals!(cached_store, payload_2, path_2);
@@ -614,22 +817,29 @@ mod tests {
assert_eq!(1, inner_store.get_request_count(&path_1));
assert_eq!(1, inner_store.get_request_count(&path_2));
// update time:
time_provider.set(Time::from_timestamp_nanos(4));
// GET the first entry again and assert that it was retrieved from the cache as before. This
// will also update the LRU so that the first entry (janeway) was used more recently than the
// second entry (paris):
// will also update the hit count so that the first entry (janeway) was used more recently
// than the second entry (paris):
assert_payload_at_equals!(cached_store, payload_1, path_1);
assert_eq!(2, inner_store.total_get_request_count());
assert_eq!(1, inner_store.get_request_count(&path_1));
// PUT a third entry into the store:
let path_3 = Path::from("2.parquet"); // 9 bytes for the path
let payload_3 = b"Neelix"; // 6 bytes for the payload
let path_3 = Path::from("2.parquet");
let payload_3 = b"Neelix";
cached_store
.put(&path_3, PutPayload::from_static(payload_3))
.await
.unwrap();
// cache the third entry and wait for it to complete, this will evict paris from the cache
// as the LRU entry:
// update time:
time_provider.set(Time::from_timestamp_nanos(5));
// cache the third entry and wait for it to complete, this will push the cache past its
// capacity:
let (cache_request, notifier_rx) = CacheRequest::create(path_3.clone());
oracle.register(cache_request);
let _ = notifier_rx.await;
@@ -639,6 +849,9 @@ mod tests {
assert_eq!(1, inner_store.get_request_count(&path_2));
assert_eq!(1, inner_store.get_request_count(&path_3));
// update time:
time_provider.set(Time::from_timestamp_nanos(6));
// GET the new entry from the store, and check that it was served by the cache:
assert_payload_at_equals!(cached_store, payload_3, path_3);
assert_eq!(3, inner_store.total_get_request_count());
@@ -646,6 +859,9 @@ mod tests {
assert_eq!(1, inner_store.get_request_count(&path_2));
assert_eq!(1, inner_store.get_request_count(&path_3));
// allow some time for pruning:
tokio::time::sleep(Duration::from_millis(100)).await;
// GET paris from the cached store, this will not be served by the cache, because paris was
// evicted by neelix:
assert_payload_at_equals!(cached_store, payload_2, path_2);
@@ -655,12 +871,69 @@ mod tests {
assert_eq!(1, inner_store.get_request_count(&path_3));
}
#[tokio::test]
async fn cache_hit_while_fetching() {
// Create a test store with a barrier:
let to_store_notify = Arc::new(Notify::new());
let from_store_notify = Arc::new(Notify::new());
let inner_store = Arc::new(
TestObjectStore::new(Arc::new(InMemory::new()))
.with_notifies(Arc::clone(&to_store_notify), Arc::clone(&from_store_notify)),
);
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
let (cached_store, oracle) = test_cached_obj_store_and_oracle(
Arc::clone(&inner_store) as _,
Arc::clone(&time_provider) as _,
);
// PUT an entry into the store:
let path = Path::from("0.parquet");
let payload = b"Picard";
cached_store
.put(&path, PutPayload::from_static(payload))
.await
.unwrap();
// cache the entry, but don't wait on it until below in spawned task:
let (cache_request, notifier_rx) = CacheRequest::create(path.clone());
oracle.register(cache_request);
// we are in the middle of a get request, i.e., the cache entry is "fetching"
// once this call to notified wakes:
let _ = from_store_notify.notified().await;
// spawn a thread to wake the in-flight get request initiated by the cache oracle
// after we have started a get request below, such that the get request below hits
// the cache while the entry is still "fetching" state:
let h = tokio::spawn(async move {
to_store_notify.notify_one();
let _ = notifier_rx.await;
});
// make the request to the store, which hits the cache in the "fetching" state
// since we haven't made the call to notify the store to continue yet:
assert_payload_at_equals!(cached_store, payload, path);
// drive the task to completion to ensure that the cache request has been fulfilled:
h.await.unwrap();
// there should only have been one request made, i.e., from the cache oracle:
assert_eq!(1, inner_store.total_get_request_count());
assert_eq!(1, inner_store.get_request_count(&path));
// make another request to the store, to be sure that it is in the cache:
assert_payload_at_equals!(cached_store, payload, path);
assert_eq!(1, inner_store.total_get_request_count());
assert_eq!(1, inner_store.get_request_count(&path));
}
type RequestCounter = RwLock<HashMap<Path, usize>>;
#[derive(Debug)]
struct TestObjectStore {
inner: Arc<dyn ObjectStore>,
get: RequestCounter,
notifies: Option<(Arc<Notify>, Arc<Notify>)>,
}
impl TestObjectStore {
@@ -668,9 +941,15 @@ mod tests {
Self {
inner,
get: Default::default(),
notifies: None,
}
}
fn with_notifies(mut self, inbound: Arc<Notify>, outbound: Arc<Notify>) -> Self {
self.notifies = Some((inbound, outbound));
self
}
fn total_get_request_count(&self) -> usize {
self.get.read().iter().map(|(_, size)| size).sum()
}
@@ -686,13 +965,6 @@ mod tests {
}
}
/// [`MemCachedObjectStore`] implements most [`ObjectStore`] methods as a pass-through, since
/// caching is decided externally. The exception is `delete`, which will have the entry removed
/// from the cache if the delete to the object store was successful.
///
/// GET-style methods will first check the cache for the object at the given path, before forwarding
/// to the inner [`ObjectStore`]. They do not, however, populate the cache after data has been fetched
/// from the inner store.
#[async_trait]
impl ObjectStore for TestObjectStore {
async fn put(&self, location: &Path, bytes: PutPayload) -> object_store::Result<PutResult> {
@@ -725,6 +997,10 @@ mod tests {
async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
*self.get.write().entry(location.clone()).or_insert(0) += 1;
if let Some((inbound, outbound)) = &self.notifies {
outbound.notify_one();
inbound.notified().await;
}
self.inner.get(location).await
}


@@ -570,12 +570,13 @@ mod tests {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn writes_data_to_wal_and_is_queryable() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let (object_store, parquet_cache) = test_cached_obj_store_and_oracle(object_store);
let time_provider: Arc<dyn TimeProvider> =
Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
let (object_store, parquet_cache) =
test_cached_obj_store_and_oracle(object_store, Arc::clone(&time_provider));
let persister = Arc::new(Persister::new(Arc::clone(&object_store), "test_host"));
let catalog = persister.load_or_create_catalog().await.unwrap();
let last_cache = LastCacheProvider::new_from_catalog(&catalog.clone_inner()).unwrap();
let time_provider: Arc<dyn TimeProvider> =
Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
let write_buffer = WriteBufferImpl::new(
Arc::clone(&persister),
Arc::new(catalog),
@@ -1655,9 +1656,10 @@ mod tests {
object_store: Arc<dyn ObjectStore>,
wal_config: WalConfig,
) -> (WriteBufferImpl, IOxSessionContext) {
let (object_store, parquet_cache) = test_cached_obj_store_and_oracle(object_store);
let persister = Arc::new(Persister::new(Arc::clone(&object_store), "test_host"));
let time_provider: Arc<dyn TimeProvider> = Arc::new(MockProvider::new(start));
let (object_store, parquet_cache) =
test_cached_obj_store_and_oracle(object_store, Arc::clone(&time_provider));
let persister = Arc::new(Persister::new(Arc::clone(&object_store), "test_host"));
let catalog = persister.load_or_create_catalog().await.unwrap();
let last_cache = LastCacheProvider::new_from_catalog(&catalog.clone_inner()).unwrap();
let wbuf = WriteBufferImpl::new(