feat: memory-cached object store for parquet files (#25377)

Part of #25347 

This sets up a new implementation of an in-memory parquet file cache in the `influxdb3_write` crate in the `parquet_cache.rs` module.

This module introduces the following types:
* `MemCachedObjectStore` - a wrapper around an `Arc<dyn ObjectStore>` that can serve GET-style requests to the store from an in-memory cache
* `ParquetCacheOracle` - an interface (trait) that can accept requests to create new cache entries in the cache used by the `MemCachedObjectStore`
* `MemCacheOracle` - implementation of the `ParquetCacheOracle` trait

## `MemCachedObjectStore`

This takes inspiration from the [`MemCacheObjectStore` type](1eaa4ed5ea/object_store_mem_cache/src/store.rs (L205-L213)) in core, but has some different semantics around its implementation of the `ObjectStore` trait, and uses a different cache implementation.

The reason for wrapping the object store is to ensure that any GET-style request made for a given object is served by the cache, e.g., the metadata requests made by DataFusion during query planning.

The internal cache comes from the [`clru` crate](https://crates.io/crates/clru), which provides a least-recently used (LRU) cache implementation that supports weighted entries. The cache is initialized with a capacity, and each entry is assigned a weight on insert that represents how much of the allotted capacity it takes up. If there isn't enough room for a new entry on insert, the least-recently used entries are evicted to make room.

### Limitations of `clru`

The `clru` crate conveniently gives us an LRU eviction policy, but its API places some limitations on the system (see the sketch after this list):
* gets to the cache require an `&mut` reference, which means that the cache needs to be behind a `Mutex`. If this slows down requests through the object store, then we may need to explore alternatives.
* we may want more sophisticated eviction policies than a straight LRU, e.g., to favour certain tables over others, or files that represent recent data over those that represent old data.
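
To make the weighted-capacity behaviour and the `&mut` constraint concrete, here is a minimal sketch of the `clru` API. It uses a toy string cache rather than the actual `CacheEntry` types in this PR, and assumes the weighted-LRU semantics described above:

```rust
use std::num::NonZeroUsize;

use clru::{CLruCache, CLruCacheConfig, WeightScale};

/// Weighs entries by the byte length of their values, analogous to the
/// `CacheEntryScale` introduced in this PR.
struct StrScale;

impl WeightScale<String, String> for StrScale {
    fn weight(&self, _key: &String, value: &String) -> usize {
        value.len()
    }
}

fn main() {
    // The capacity is a total weight budget in bytes, not an entry count:
    let mut cache = CLruCache::with_config(
        CLruCacheConfig::new(NonZeroUsize::new(16).unwrap()).with_scale(StrScale),
    );

    cache.put_with_weight("a".to_string(), "hello".to_string()).unwrap(); // weight 5
    cache.put_with_weight("b".to_string(), "world".to_string()).unwrap(); // weight 5

    // A third entry would push the total weight past the 16-byte budget, so
    // the least-recently used entry ("a") is evicted to make room:
    cache.put_with_weight("c".to_string(), "stardust!".to_string()).unwrap(); // weight 9
    assert!(cache.get("a").is_none());
    assert!(cache.get("c").is_some());

    // `get` takes `&mut self` because it updates entry recency; this is why
    // the shared cache in `MemCachedObjectStore` sits behind a tokio `Mutex`.
}
```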

## `ParquetCacheOracle` / `MemCacheOracle`

The cache oracle is responsible for handling cache requests, i.e., requests to fetch an item and store it in the cache. In this PR, the oracle runs a background task to handle these requests. I defined this as a trait/struct pair since the implementation may look different in Pro vs. OSS.
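
The intended call flow, sketched after the `hit_cache_instead_of_object_store` test added in this PR (the path and payload are illustrative):

```rust
use std::sync::Arc;

use influxdb3_write::parquet_cache::{create_cached_obj_store_and_oracle, CacheRequest};
use object_store::{memory::InMemory, path::Path, ObjectStore, PutPayload};

#[tokio::main]
async fn main() {
    // Wrap any object store with the in-memory cache; the capacity is in
    // bytes (1 GiB here, matching the hard-coded value in `serve`):
    let inner: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let (store, oracle) = create_cached_obj_store_and_oracle(inner, 1024 * 1024 * 1024);

    // Persist an object through the wrapped store:
    let path = Path::from("0.parquet");
    store
        .put(&path, PutPayload::from_static(b"hello"))
        .await
        .unwrap();

    // Register a cache request with the oracle, then wait on the returned
    // receiver for the background task to fetch the object into the cache:
    let (request, notifier_rx) = CacheRequest::create(path.clone());
    oracle.register(request);
    let _ = notifier_rx.await;

    // GET-style requests for this path are now served from memory:
    let bytes = store.get(&path).await.unwrap().bytes().await.unwrap();
    assert_eq!(b"hello".as_slice(), &bytes[..]);
}
```
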
Trevor Hilton 2024-09-24 10:58:15 -04:00 committed by GitHub
parent c1a5e1b5fd
commit 9c71b3ce25
11 changed files with 894 additions and 108 deletions

Cargo.lock (generated)

@ -966,6 +966,12 @@ dependencies = [
"workspace-hack",
]
[[package]]
name = "clru"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cbd0f76e066e64fdc5631e3bb46381254deab9ef1158292f27c8c57e3bf3fe59"
[[package]]
name = "colorchoice"
version = "1.0.2"
@ -2818,11 +2824,13 @@ dependencies = [
"byteorder",
"bytes",
"chrono",
"clru",
"crc32fast",
"crossbeam-channel",
"data_types",
"datafusion",
"datafusion_util",
"futures",
"futures-util",
"hashbrown 0.14.5",
"hex",


@ -51,11 +51,13 @@ byteorder = "1.3.4"
bytes = "1.5"
chrono = "0.4"
clap = { version = "4", features = ["derive", "env", "string"] }
clru = "0.6.2"
crc32fast = "1.2.0"
crossbeam-channel = "0.5.11"
csv = "1.3.0"
datafusion = { git = "https://github.com/influxdata/arrow-datafusion.git", rev = "5de0c3577fd30dcf9213f428222a29efae789807" }
datafusion-proto = { git = "https://github.com/influxdata/arrow-datafusion.git", rev = "5de0c3577fd30dcf9213f428222a29efae789807" }
csv = "1.3.0"
dashmap = "6.1.0"
dotenvy = "0.15.7"
flate2 = "1.0.27"
futures = "0.3.28"


@ -16,7 +16,8 @@ use influxdb3_server::{
};
use influxdb3_wal::{Gen1Duration, WalConfig};
use influxdb3_write::{
last_cache::LastCacheProvider, persister::Persister, write_buffer::WriteBufferImpl, WriteBuffer,
last_cache::LastCacheProvider, parquet_cache::create_cached_obj_store_and_oracle,
persister::Persister, write_buffer::WriteBufferImpl, WriteBuffer,
};
use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig};
use iox_time::SystemProvider;
@ -258,6 +259,10 @@ pub async fn command(config: Config) -> Result<()> {
let object_store: Arc<DynObjectStore> =
make_object_store(&config.object_store_config).map_err(Error::ObjectStoreParsing)?;
// TODO(trevor): make this configurable/optional:
let cache_capacity = 1024 * 1024 * 1024;
let (object_store, parquet_cache) =
create_cached_obj_store_and_oracle(object_store, cache_capacity);
let trace_exporter = config.tracing_config.build()?;
@ -334,6 +339,7 @@ pub async fn command(config: Config) -> Result<()> {
Arc::<SystemProvider>::clone(&time_provider),
Arc::clone(&exec),
wal_config,
parquet_cache,
)
.await
.map_err(|e| Error::WriteBufferInit(e.into()))?,


@ -230,6 +230,7 @@ mod tests {
use influxdb3_catalog::catalog::Catalog;
use influxdb3_wal::WalConfig;
use influxdb3_write::last_cache::LastCacheProvider;
use influxdb3_write::parquet_cache::test_cached_obj_store_and_oracle;
use influxdb3_write::persister::Persister;
use influxdb3_write::WriteBuffer;
use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig};
@ -744,6 +745,7 @@ mod tests {
let common_state =
crate::CommonServerState::new(Arc::clone(&metrics), None, trace_header_parser).unwrap();
let object_store: Arc<DynObjectStore> = Arc::new(object_store::memory::InMemory::new());
let (object_store, parquet_cache) = test_cached_obj_store_and_oracle(object_store);
let parquet_store =
ParquetStorage::new(Arc::clone(&object_store), StorageId::from("influxdb3"));
let exec = Arc::new(Executor::new_with_config_and_executor(
@ -771,6 +773,7 @@ mod tests {
Arc::<MockProvider>::clone(&time_provider),
Arc::clone(&exec),
WalConfig::test_config(),
parquet_cache,
)
.await
.unwrap(),


@ -591,8 +591,8 @@ mod tests {
use influxdb3_catalog::catalog::Catalog;
use influxdb3_wal::{Gen1Duration, WalConfig};
use influxdb3_write::{
last_cache::LastCacheProvider, persister::Persister, write_buffer::WriteBufferImpl,
WriteBuffer,
last_cache::LastCacheProvider, parquet_cache::test_cached_obj_store_and_oracle,
persister::Persister, write_buffer::WriteBufferImpl, WriteBuffer,
};
use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig};
use iox_time::{MockProvider, Time};
@ -630,9 +630,10 @@ mod tests {
// Set up QueryExecutor
let object_store: Arc<dyn ObjectStore> =
Arc::new(LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap());
let (object_store, parquet_cache) = test_cached_obj_store_and_oracle(object_store);
let persister = Arc::new(Persister::new(Arc::clone(&object_store), "test_host"));
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
let executor = make_exec(object_store);
let executor = make_exec(Arc::clone(&object_store));
let host_id = Arc::from("dummy-host-id");
let instance_id = Arc::from("instance-id");
let write_buffer: Arc<dyn WriteBuffer> = Arc::new(
@ -648,6 +649,7 @@ mod tests {
flush_interval: Duration::from_millis(10),
snapshot_size: 1,
},
parquet_cache,
)
.await
.unwrap(),


@ -29,9 +29,11 @@ async-trait.workspace = true
byteorder.workspace = true
bytes.workspace = true
chrono.workspace = true
clru.workspace = true
crc32fast.workspace = true
crossbeam-channel.workspace = true
datafusion.workspace = true
futures.workspace = true
futures-util.workspace = true
hashbrown.workspace = true
hex.workspace = true


@ -1567,6 +1567,7 @@ mod tests {
use crate::{
last_cache::{KeyValue, LastCacheProvider, Predicate, DEFAULT_CACHE_TTL},
parquet_cache::test_cached_obj_store_and_oracle,
persister::Persister,
write_buffer::WriteBufferImpl,
Bufferer, LastCacheManager, Precision,
@ -1582,6 +1583,7 @@ mod tests {
async fn setup_write_buffer() -> WriteBufferImpl {
let obj_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let (obj_store, parquet_cache) = test_cached_obj_store_and_oracle(obj_store);
let persister = Arc::new(Persister::new(obj_store, "test_host"));
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
let host_id = Arc::from("dummy-host-id");
@ -1593,6 +1595,7 @@ mod tests {
time_provider,
crate::test_help::make_exec(),
WalConfig::test_config(),
parquet_cache,
)
.await
.unwrap()


@ -7,6 +7,7 @@
pub mod cache;
pub mod chunk;
pub mod last_cache;
pub mod parquet_cache;
pub mod paths;
pub mod persister;
pub mod write_buffer;


@ -0,0 +1,806 @@
//! An in-memory cache of Parquet files that are persisted to object storage
use std::{
fmt::Debug, hash::RandomState, num::NonZeroUsize, ops::Range, sync::Arc, time::Duration,
};
use async_trait::async_trait;
use bytes::Bytes;
use clru::{CLruCache, CLruCacheConfig, WeightScale};
use futures::{StreamExt, TryStreamExt};
use futures_util::stream::BoxStream;
use object_store::{
path::Path, Error, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload,
ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult,
Result as ObjectStoreResult,
};
use observability_deps::tracing::{error, info};
use tokio::sync::{
mpsc::{channel, Receiver, Sender},
oneshot, Mutex,
};
/// A request to fetch an item at the given `path` from an object store
///
/// Contains a notifier used to signal the caller that registered the cache request once the
/// item has been cached successfully (or the cache request failed in some way)
pub struct CacheRequest {
path: Path,
notifier: oneshot::Sender<()>,
}
impl CacheRequest {
/// Create a new [`CacheRequest`] along with a receiver to catch the notify message when
/// the cache request has been fulfilled.
pub fn create(path: Path) -> (Self, oneshot::Receiver<()>) {
let (notifier, receiver) = oneshot::channel();
(Self { path, notifier }, receiver)
}
}
/// An interface for interacting with a Parquet Cache by registering [`CacheRequest`]s to it.
pub trait ParquetCacheOracle: Send + Sync + Debug {
/// Register a cache request with the oracle
fn register(&self, cache_request: CacheRequest);
}
/// Concrete implementation of the [`ParquetCacheOracle`]
///
/// This implementation forwards all registered requests over a channel to a background task
/// that fulfills them.
#[derive(Debug, Clone)]
pub struct MemCacheOracle {
cache_request_tx: Sender<CacheRequest>,
}
// TODO(trevor): make this configurable with reasonable default
const CACHE_REQUEST_BUFFER_SIZE: usize = 1_000_000;
impl MemCacheOracle {
/// Create a new [`MemCacheOracle`]
///
/// This spawns two background tasks:
/// * one to handle registered [`CacheRequest`]s
/// * one to prune deleted and unneeded cache entries on an interval
// TODO(trevor): this should be more configurable, e.g., channel size, prune interval
fn new(mem_cached_store: Arc<MemCachedObjectStore>) -> Self {
let (cache_request_tx, cache_request_rx) = channel(CACHE_REQUEST_BUFFER_SIZE);
background_cache_request_handler(Arc::clone(&mem_cached_store), cache_request_rx);
background_cache_pruner(mem_cached_store);
Self { cache_request_tx }
}
}
impl ParquetCacheOracle for MemCacheOracle {
fn register(&self, request: CacheRequest) {
let tx = self.cache_request_tx.clone();
tokio::spawn(async move {
if let Err(error) = tx.send(request).await {
error!(%error, "error registering cache request");
};
});
}
}
/// Helper function for creation of a [`MemCachedObjectStore`] and [`MemCacheOracle`]
/// that returns them as their `Arc<dyn _>` equivalent.
pub fn create_cached_obj_store_and_oracle(
object_store: Arc<dyn ObjectStore>,
cache_capacity: usize,
) -> (Arc<dyn ObjectStore>, Arc<dyn ParquetCacheOracle>) {
let store = Arc::new(MemCachedObjectStore::new(object_store, cache_capacity));
let oracle = Arc::new(MemCacheOracle::new(Arc::clone(&store)));
(store, oracle)
}
/// Create a test cached object store with a cache capacity of 1GB
pub fn test_cached_obj_store_and_oracle(
object_store: Arc<dyn ObjectStore>,
) -> (Arc<dyn ObjectStore>, Arc<dyn ParquetCacheOracle>) {
create_cached_obj_store_and_oracle(object_store, 1024 * 1024 * 1024)
}
/// An entry in the cache, containing the actual bytes as well as object store metadata
#[derive(Debug)]
struct CacheValue {
data: Bytes,
meta: ObjectMeta,
}
impl CacheValue {
/// Get the size of the cache value's memory footprint in bytes
fn size(&self) -> usize {
// TODO(trevor): could also calculate the size of the metadata...
self.data.len()
}
}
/// The state of a cache entry
#[derive(Debug)]
enum CacheEntry {
/// The cache entry is being fetched from object store
Fetching,
/// The cache entry was successfully fetched and is stored in the cache as a [`CacheValue`]
Success(Arc<CacheValue>),
/// The request to the object store failed
Failed,
/// The cache entry was deleted
Deleted,
/// The object is too large for the cache
TooLarge,
}
impl CacheEntry {
/// Get the size of the cache entry in bytes
fn size(&self) -> usize {
match self {
CacheEntry::Fetching => 0,
CacheEntry::Success(v) => v.size(),
CacheEntry::Failed => 0,
CacheEntry::Deleted => 0,
CacheEntry::TooLarge => 0,
}
}
fn is_fetching(&self) -> bool {
matches!(self, CacheEntry::Fetching)
}
fn is_success(&self) -> bool {
matches!(self, CacheEntry::Success(_))
}
fn keep(&self) -> bool {
self.is_fetching() || self.is_success()
}
}
/// Implements the [`WeightScale`] trait to determine a [`CacheEntry`]'s size on insertion to
/// the cache
#[derive(Debug)]
struct CacheEntryScale;
impl WeightScale<Path, CacheEntry> for CacheEntryScale {
fn weight(&self, key: &Path, value: &CacheEntry) -> usize {
key.as_ref().len() + value.size()
}
}
/// Placeholder store name used when formatting object store errors
const STORE_NAME: &str = "mem_cached_object_store";
/// An object store with an associated cache that can serve GET-style requests using the cache
///
/// The least-recently used (LRU) entries will be evicted when new entries are inserted, if the
/// new entry would exceed the cache's memory capacity
#[derive(Debug)]
pub struct MemCachedObjectStore {
/// An inner object store for which items will be cached
inner: Arc<dyn ObjectStore>,
/// A weighted LRU cache for storing the objects associated with a given path in memory
// NOTE(trevor): this uses a mutex as the CLruCache type needs &mut self for its get method, so
// we always need an exclusive lock on the cache. If this creates a performance bottleneck then
// we will need to look for alternatives.
//
// A Tokio mutex is used to prevent blocking the thread while waiting for a lock, and so that
// the lock can be held across await points.
cache: Arc<Mutex<CLruCache<Path, CacheEntry, RandomState, CacheEntryScale>>>,
}
impl MemCachedObjectStore {
/// Create a new [`MemCachedObjectStore`] with the given memory capacity
fn new(inner: Arc<dyn ObjectStore>, memory_capacity: usize) -> Self {
let cache = CLruCache::with_config(
CLruCacheConfig::new(NonZeroUsize::new(memory_capacity).unwrap())
.with_scale(CacheEntryScale),
);
Self {
inner,
cache: Arc::new(Mutex::new(cache)),
}
}
/// Get an entry in the cache if it contains a successful fetch result, or `None` otherwise
///
/// This takes the cache lock, as the underlying `get` method on the cache requires a mutable
/// reference in order to update the recency of the entry in the cache
async fn get_cache_value(&self, path: &Path) -> Option<Arc<CacheValue>> {
self.cache
.lock()
.await
.get(path)
.and_then(|entry| match entry {
CacheEntry::Fetching
| CacheEntry::Failed
| CacheEntry::Deleted
| CacheEntry::TooLarge => None,
CacheEntry::Success(v) => Some(Arc::clone(v)),
})
}
/// Set the state of a cache entry to `Deleted`, since we cannot remove elements from the
/// cache directly.
async fn delete_cache_value(&self, path: &Path) {
let _ = self
.cache
.lock()
.await
.put_with_weight(path.clone(), CacheEntry::Deleted);
}
}
impl std::fmt::Display for MemCachedObjectStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "MemCachedObjectStore({})", self.inner)
}
}
/// [`MemCachedObjectStore`] implements most [`ObjectStore`] methods as a pass-through, since
/// caching is decided externally. The exception is `delete`, which will have the entry removed
/// from the cache if the delete to the object store was successful.
///
/// GET-style methods will first check the cache for the object at the given path, before forwarding
/// to the inner [`ObjectStore`]. They do not, however, populate the cache after data has been fetched
/// from the inner store.
#[async_trait]
impl ObjectStore for MemCachedObjectStore {
async fn put(&self, location: &Path, bytes: PutPayload) -> ObjectStoreResult<PutResult> {
self.inner.put(location, bytes).await
}
async fn put_opts(
&self,
location: &Path,
bytes: PutPayload,
opts: PutOptions,
) -> ObjectStoreResult<PutResult> {
self.inner.put_opts(location, bytes, opts).await
}
async fn put_multipart(&self, location: &Path) -> ObjectStoreResult<Box<dyn MultipartUpload>> {
self.inner.put_multipart(location).await
}
async fn put_multipart_opts(
&self,
location: &Path,
opts: PutMultipartOpts,
) -> ObjectStoreResult<Box<dyn MultipartUpload>> {
self.inner.put_multipart_opts(location, opts).await
}
/// Get an object from the object store. If this object is cached, then it will not make a request
/// to the inner object store.
async fn get(&self, location: &Path) -> ObjectStoreResult<GetResult> {
if let Some(v) = self.get_cache_value(location).await {
Ok(GetResult {
payload: GetResultPayload::Stream(
futures::stream::iter([Ok(v.data.clone())]).boxed(),
),
meta: v.meta.clone(),
range: 0..v.data.len(),
attributes: Default::default(),
})
} else {
self.inner.get(location).await
}
}
async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult<GetResult> {
// NOTE(trevor): this could probably be supported through the cache if we need it via the
// ObjectMeta stored in the cache. For now this is conservative:
self.inner.get_opts(location, options).await
}
async fn get_range(&self, location: &Path, range: Range<usize>) -> ObjectStoreResult<Bytes> {
Ok(self
.get_ranges(location, &[range])
.await?
.into_iter()
.next()
.expect("requested one range"))
}
/// This request is used by DataFusion when requesting metadata for Parquet files, so we need
/// to use the cache to prevent excess network calls during query planning.
async fn get_ranges(
&self,
location: &Path,
ranges: &[Range<usize>],
) -> ObjectStoreResult<Vec<Bytes>> {
if let Some(v) = self.get_cache_value(location).await {
ranges
.iter()
.map(|range| {
if range.end > v.data.len() {
return Err(Error::Generic {
store: STORE_NAME,
source: format!(
"Range end ({}) out of bounds, object size is {}",
range.end,
v.data.len()
)
.into(),
});
}
if range.start > range.end {
return Err(Error::Generic {
store: STORE_NAME,
source: format!(
"Range end ({}) is before range start ({})",
range.end, range.start
)
.into(),
});
}
Ok(v.data.slice(range.clone()))
})
.collect()
} else {
self.inner.get_ranges(location, ranges).await
}
}
async fn head(&self, location: &Path) -> ObjectStoreResult<ObjectMeta> {
if let Some(v) = self.get_cache_value(location).await {
Ok(v.meta.clone())
} else {
self.inner.head(location).await
}
}
/// Delete an object on object store, but also remove it from the cache.
async fn delete(&self, location: &Path) -> ObjectStoreResult<()> {
let result = self.inner.delete(location).await?;
self.delete_cache_value(location).await;
Ok(result)
}
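/// Deleting a stream of objects is not supported: each path in the stream resolves to a
/// `NotImplemented` error rather than being forwarded to the inner store.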
fn delete_stream<'a>(
&'a self,
locations: BoxStream<'a, ObjectStoreResult<Path>>,
) -> BoxStream<'a, ObjectStoreResult<Path>> {
locations
.and_then(|_| futures::future::err(Error::NotImplemented))
.boxed()
}
fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, ObjectStoreResult<ObjectMeta>> {
self.inner.list(prefix)
}
fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'_, ObjectStoreResult<ObjectMeta>> {
self.inner.list_with_offset(prefix, offset)
}
async fn list_with_delimiter(&self, prefix: Option<&Path>) -> ObjectStoreResult<ListResult> {
self.inner.list_with_delimiter(prefix).await
}
async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
self.inner.copy(from, to).await
}
async fn rename(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
self.inner.rename(from, to).await
}
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
self.inner.copy_if_not_exists(from, to).await
}
async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
self.inner.rename_if_not_exists(from, to).await
}
}
/// Handle [`CacheRequest`]s in a background task
///
/// This waits on the given `Receiver` for new cache requests to be registered, i.e., via the oracle.
/// If a cache request is received for an entry that has not already been fetched successfully, or
one that is in the process of being fetched, then this will spawn a separate background task to
/// fetch the object from object store and update the cache. This is so that cache requests can be
/// handled in parallel.
fn background_cache_request_handler(
mem_store: Arc<MemCachedObjectStore>,
mut rx: Receiver<CacheRequest>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
while let Some(CacheRequest { path, notifier }) = rx.recv().await {
// clone the path before acquiring the lock:
let path_cloned = path.clone();
// Check that the cache does not already contain an entry for the provided path, or that
// it is not already in the process of fetching the given path:
let mut cache_lock = mem_store.cache.lock().await;
if cache_lock
.get(&path)
.is_some_and(|entry| entry.is_fetching() || entry.is_success())
{
continue;
}
// Put a `Fetching` state in the entry to prevent concurrent requests to the same path:
let _ = cache_lock.put_with_weight(path_cloned, CacheEntry::Fetching);
// Drop the lock before spawning the task below
drop(cache_lock);
let mem_store_captured = Arc::clone(&mem_store);
tokio::spawn(async move {
let cache_insertion_result = match mem_store_captured.inner.get(&path).await {
Ok(result) => {
let meta = result.meta.clone();
match result.bytes().await {
Ok(data) => mem_store_captured.cache.lock().await.put_with_weight(
path,
CacheEntry::Success(Arc::new(CacheValue { data, meta })),
),
Err(error) => {
error!(%error, "failed to retrieve payload from object store get result");
mem_store_captured
.cache
.lock()
.await
.put_with_weight(path, CacheEntry::Failed)
}
}
}
Err(error) => {
error!(%error, "failed to fulfill cache request with object store");
mem_store_captured
.cache
.lock()
.await
.put_with_weight(path, CacheEntry::Failed)
}
};
// If an entry would not fit in the cache at all, the put_with_weight method returns
// it as an Err from above, and we would not have cleared the `Fetching` entry, so
// we need to do that here:
if let Err((k, _)) = cache_insertion_result {
mem_store_captured
.cache
.lock()
.await
.put_with_weight(k, CacheEntry::TooLarge)
.expect("cache capacity is too small");
}
// notify that the cache request has been fulfilled:
let _ = notifier.send(());
});
}
info!("cache request handler closed");
})
}
/// A background task for pruning unneeded entries in the cache
// TODO(trevor): the interval could be configurable
fn background_cache_pruner(mem_store: Arc<MemCachedObjectStore>) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(60));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
interval.tick().await;
mem_store.cache.lock().await.retain(|_, entry| entry.keep());
}
})
}
#[cfg(test)]
mod tests {
use std::{ops::Range, sync::Arc};
use arrow::datatypes::ToByteSlice;
use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
use hashbrown::HashMap;
use object_store::{
memory::InMemory, path::Path, GetOptions, GetResult, ListResult, MultipartUpload,
ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult,
};
use parking_lot::RwLock;
use pretty_assertions::assert_eq;
use crate::parquet_cache::{
create_cached_obj_store_and_oracle, test_cached_obj_store_and_oracle, CacheRequest,
};
macro_rules! assert_payload_at_equals {
($store:ident, $expected:ident, $path:ident) => {
assert_eq!(
$expected,
$store
.get(&$path)
.await
.unwrap()
.bytes()
.await
.unwrap()
.to_byte_slice()
)
};
}
#[tokio::test]
async fn hit_cache_instead_of_object_store() {
// set up the inner test object store and then wrap it with the mem cached store:
let inner_store = Arc::new(TestObjectStore::new(Arc::new(InMemory::new())));
let (cached_store, oracle) =
test_cached_obj_store_and_oracle(Arc::clone(&inner_store) as _);
// PUT a payload into the object store through the outer mem cached store:
let path = Path::from("0.parquet");
let payload = b"hello world";
cached_store
.put(&path, PutPayload::from_static(payload))
.await
.unwrap();
// GET the payload from the object store before caching:
assert_payload_at_equals!(cached_store, payload, path);
assert_eq!(1, inner_store.total_get_request_count());
assert_eq!(1, inner_store.get_request_count(&path));
// cache the entry:
let (cache_request, notifier_rx) = CacheRequest::create(path.clone());
oracle.register(cache_request);
// wait for cache notify:
let _ = notifier_rx.await;
// another request to inner store should have been made:
assert_eq!(2, inner_store.total_get_request_count());
assert_eq!(2, inner_store.get_request_count(&path));
// get the payload from the outer store again:
assert_payload_at_equals!(cached_store, payload, path);
// should hit the cache this time, so the inner store should not have been hit, and counts
// should therefore be same as previous:
assert_eq!(2, inner_store.total_get_request_count());
assert_eq!(2, inner_store.get_request_count(&path));
}
#[tokio::test]
async fn cache_evicts_lru_when_full() {
let inner_store = Arc::new(TestObjectStore::new(Arc::new(InMemory::new())));
let cache_capacity_bytes = 32;
let (cached_store, oracle) =
create_cached_obj_store_and_oracle(Arc::clone(&inner_store) as _, cache_capacity_bytes);
// PUT an entry into the store:
let path_1 = Path::from("0.parquet"); // 9 bytes for path
let payload_1 = b"Janeway"; // 7 bytes for payload
cached_store
.put(&path_1, PutPayload::from_static(payload_1))
.await
.unwrap();
// cache the entry and wait for it to complete:
let (cache_request, notifier_rx) = CacheRequest::create(path_1.clone());
oracle.register(cache_request);
let _ = notifier_rx.await;
// there will have been one get request made by the cache oracle:
assert_eq!(1, inner_store.total_get_request_count());
assert_eq!(1, inner_store.get_request_count(&path_1));
// GET the entry to check it's there and was retrieved from the cache, i.e., that the request
// counts do not change:
assert_payload_at_equals!(cached_store, payload_1, path_1);
assert_eq!(1, inner_store.total_get_request_count());
assert_eq!(1, inner_store.get_request_count(&path_1));
// PUT a second entry into the store:
let path_2 = Path::from("1.parquet"); // 9 bytes for path
let payload_2 = b"Paris"; // 5 bytes for payload
cached_store
.put(&path_2, PutPayload::from_static(payload_2))
.await
.unwrap();
// cache the second entry and wait for it to complete, this will not evict the first entry
// as both can fit in the cache whose capacity is 32 bytes:
let (cache_request, notifier_rx) = CacheRequest::create(path_2.clone());
oracle.register(cache_request);
let _ = notifier_rx.await;
// will have another request for the second path to the inner store, by the oracle:
assert_eq!(2, inner_store.total_get_request_count());
assert_eq!(1, inner_store.get_request_count(&path_1));
assert_eq!(1, inner_store.get_request_count(&path_2));
// GET the second entry and assert that it was retrieved from the cache, i.e., that the
// request counts do not change:
assert_payload_at_equals!(cached_store, payload_2, path_2);
assert_eq!(2, inner_store.total_get_request_count());
assert_eq!(1, inner_store.get_request_count(&path_1));
assert_eq!(1, inner_store.get_request_count(&path_2));
// GET the first entry again and assert that it was retrieved from the cache as before. This
// will also update the LRU so that the first entry (janeway) was used more recently than the
// second entry (paris):
assert_payload_at_equals!(cached_store, payload_1, path_1);
assert_eq!(2, inner_store.total_get_request_count());
assert_eq!(1, inner_store.get_request_count(&path_1));
// PUT a third entry into the store:
let path_3 = Path::from("2.parquet"); // 9 bytes for the path
let payload_3 = b"Neelix"; // 6 bytes for the payload
cached_store
.put(&path_3, PutPayload::from_static(payload_3))
.await
.unwrap();
// cache the third entry and wait for it to complete, this will evict paris from the cache
// as the LRU entry:
let (cache_request, notifier_rx) = CacheRequest::create(path_3.clone());
oracle.register(cache_request);
let _ = notifier_rx.await;
// will now have another request for the third path to the inner store, by the oracle:
assert_eq!(3, inner_store.total_get_request_count());
assert_eq!(1, inner_store.get_request_count(&path_1));
assert_eq!(1, inner_store.get_request_count(&path_2));
assert_eq!(1, inner_store.get_request_count(&path_3));
// GET the new entry from the store, and check that it was served by the cache:
assert_payload_at_equals!(cached_store, payload_3, path_3);
assert_eq!(3, inner_store.total_get_request_count());
assert_eq!(1, inner_store.get_request_count(&path_1));
assert_eq!(1, inner_store.get_request_count(&path_2));
assert_eq!(1, inner_store.get_request_count(&path_3));
// GET paris from the cached store, this will not be served by the cache, because paris was
// evicted by neelix:
assert_payload_at_equals!(cached_store, payload_2, path_2);
assert_eq!(4, inner_store.total_get_request_count());
assert_eq!(1, inner_store.get_request_count(&path_1));
assert_eq!(2, inner_store.get_request_count(&path_2));
assert_eq!(1, inner_store.get_request_count(&path_3));
}
type RequestCounter = RwLock<HashMap<Path, usize>>;
#[derive(Debug)]
struct TestObjectStore {
inner: Arc<dyn ObjectStore>,
get: RequestCounter,
}
impl TestObjectStore {
fn new(inner: Arc<dyn ObjectStore>) -> Self {
Self {
inner,
get: Default::default(),
}
}
fn total_get_request_count(&self) -> usize {
self.get.read().iter().map(|(_, size)| size).sum()
}
fn get_request_count(&self, path: &Path) -> usize {
self.get.read().get(path).copied().unwrap_or(0)
}
}
impl std::fmt::Display for TestObjectStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "TestObjectStore({})", self.inner)
}
}
/// A wrapper around an inner [`ObjectStore`] that counts the `get` requests made through it,
/// so tests can assert whether a request was served by the cache or forwarded to the inner
/// store. All other methods are a pass-through to the inner store.
#[async_trait]
impl ObjectStore for TestObjectStore {
async fn put(&self, location: &Path, bytes: PutPayload) -> object_store::Result<PutResult> {
self.inner.put(location, bytes).await
}
async fn put_opts(
&self,
location: &Path,
bytes: PutPayload,
opts: PutOptions,
) -> object_store::Result<PutResult> {
self.inner.put_opts(location, bytes, opts).await
}
async fn put_multipart(
&self,
location: &Path,
) -> object_store::Result<Box<dyn MultipartUpload>> {
self.inner.put_multipart(location).await
}
async fn put_multipart_opts(
&self,
location: &Path,
opts: PutMultipartOpts,
) -> object_store::Result<Box<dyn MultipartUpload>> {
self.inner.put_multipart_opts(location, opts).await
}
async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
*self.get.write().entry(location.clone()).or_insert(0) += 1;
self.inner.get(location).await
}
async fn get_opts(
&self,
location: &Path,
options: GetOptions,
) -> object_store::Result<GetResult> {
self.inner.get_opts(location, options).await
}
async fn get_range(
&self,
location: &Path,
range: Range<usize>,
) -> object_store::Result<Bytes> {
self.inner.get_range(location, range).await
}
async fn get_ranges(
&self,
location: &Path,
ranges: &[Range<usize>],
) -> object_store::Result<Vec<Bytes>> {
self.inner.get_ranges(location, ranges).await
}
async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
self.inner.head(location).await
}
async fn delete(&self, location: &Path) -> object_store::Result<()> {
self.inner.delete(location).await
}
fn delete_stream<'a>(
&'a self,
locations: BoxStream<'a, object_store::Result<Path>>,
) -> BoxStream<'a, object_store::Result<Path>> {
self.inner.delete_stream(locations)
}
fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
self.inner.list(prefix)
}
fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
self.inner.list_with_offset(prefix, offset)
}
async fn list_with_delimiter(
&self,
prefix: Option<&Path>,
) -> object_store::Result<ListResult> {
self.inner.list_with_delimiter(prefix).await
}
async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
self.inner.copy(from, to).await
}
async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
self.inner.rename(from, to).await
}
async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
self.inner.copy_if_not_exists(from, to).await
}
async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
self.inner.rename_if_not_exists(from, to).await
}
}
}


@ -5,9 +5,9 @@ pub mod queryable_buffer;
mod table_buffer;
pub(crate) mod validator;
use crate::cache::ParquetCache;
use crate::chunk::ParquetChunk;
use crate::last_cache::{self, CreateCacheArguments, LastCacheProvider};
use crate::parquet_cache::ParquetCacheOracle;
use crate::persister::Persister;
use crate::write_buffer::persisted_files::PersistedFiles;
use crate::write_buffer::queryable_buffer::QueryableBuffer;
@ -22,7 +22,6 @@ use datafusion::catalog::Session;
use datafusion::common::DataFusionError;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::SendableRecordBatchStream;
use influxdb3_catalog::catalog::Catalog;
use influxdb3_wal::object_store::WalObjectStore;
use influxdb3_wal::CatalogOp::CreateLastCache;
@ -106,7 +105,10 @@ pub struct WriteRequest<'a> {
pub struct WriteBufferImpl {
catalog: Arc<Catalog>,
persister: Arc<Persister>,
parquet_cache: Arc<ParquetCache>,
// NOTE(trevor): the parquet cache interface may be used to register other cache
// requests from the write buffer, e.g., during query...
#[allow(dead_code)]
parquet_cache: Arc<dyn ParquetCacheOracle>,
persisted_files: Arc<PersistedFiles>,
buffer: Arc<QueryableBuffer>,
wal_config: WalConfig,
@ -126,6 +128,7 @@ impl WriteBufferImpl {
time_provider: Arc<dyn TimeProvider>,
executor: Arc<iox_query::exec::Executor>,
wal_config: WalConfig,
parquet_cache: Arc<dyn ParquetCacheOracle>,
) -> Result<Self> {
// load snapshots and replay the wal into the in memory buffer
let persisted_snapshots = persister
@ -159,6 +162,7 @@ impl WriteBufferImpl {
Arc::clone(&persister),
Arc::clone(&last_cache),
Arc::clone(&persisted_files),
Arc::clone(&parquet_cache),
));
// create the wal instance, which will replay into the queryable buffer and start
@ -175,7 +179,7 @@ impl WriteBufferImpl {
Ok(Self {
catalog,
parquet_cache: Arc::new(ParquetCache::new(&persister.mem_pool)),
parquet_cache,
persister,
wal_config,
wal,
@ -329,94 +333,8 @@ impl WriteBufferImpl {
chunks.push(Arc::new(parquet_chunk));
}
// Get any cached files and add them to the query
// This is mostly the same as above, but we change the object store to
// point to the in memory cache
for parquet_file in self
.parquet_cache
.get_parquet_files(database_name, table_name)
{
let partition_key = data_types::PartitionKey::from(parquet_file.path.clone());
let partition_id = data_types::partition::TransitionPartitionId::new(
data_types::TableId::new(0),
&partition_key,
);
let chunk_stats = create_chunk_statistics(
Some(parquet_file.row_count as usize),
&table_schema,
Some(parquet_file.timestamp_min_max()),
&NoColumnRanges,
);
let location = ObjPath::from(parquet_file.path.clone());
let parquet_exec = ParquetExecInput {
object_store_url: self.persister.object_store_url().clone(),
object_meta: ObjectMeta {
location,
last_modified: Default::default(),
size: parquet_file.size_bytes as usize,
e_tag: None,
version: None,
},
object_store: Arc::clone(&self.parquet_cache.object_store()),
};
let parquet_chunk = ParquetChunk {
schema: table_schema.clone(),
stats: Arc::new(chunk_stats),
partition_id,
sort_key: None,
id: ChunkId::new(),
chunk_order: ChunkOrder::new(chunk_order),
parquet_exec,
};
chunk_order += 1;
chunks.push(Arc::new(parquet_chunk));
}
Ok(chunks)
}
pub async fn cache_parquet(
&self,
db_name: &str,
table_name: &str,
min_time: i64,
max_time: i64,
records: SendableRecordBatchStream,
) -> Result<(), Error> {
Ok(self
.parquet_cache
.persist_parquet_file(db_name, table_name, min_time, max_time, records, None)
.await?)
}
pub async fn update_parquet(
&self,
db_name: &str,
table_name: &str,
min_time: i64,
max_time: i64,
path: ObjPath,
records: SendableRecordBatchStream,
) -> Result<(), Error> {
Ok(self
.parquet_cache
.persist_parquet_file(db_name, table_name, min_time, max_time, records, Some(path))
.await?)
}
pub async fn remove_parquet(&self, path: ObjPath) -> Result<(), Error> {
Ok(self.parquet_cache.remove_parquet_file(path).await?)
}
pub async fn purge_cache(&self) -> Result<(), Error> {
Ok(self.parquet_cache.purge_cache().await?)
}
}
pub fn parquet_chunk_from_file(
@ -607,6 +525,7 @@ impl WriteBuffer for WriteBufferImpl {}
#[allow(clippy::await_holding_lock)]
mod tests {
use super::*;
use crate::parquet_cache::test_cached_obj_store_and_oracle;
use crate::paths::{CatalogFilePath, SnapshotInfoFilePath};
use crate::persister::Persister;
use crate::PersistedSnapshot;
@ -651,6 +570,7 @@ mod tests {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn writes_data_to_wal_and_is_queryable() {
let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let (object_store, parquet_cache) = test_cached_obj_store_and_oracle(object_store);
let persister = Arc::new(Persister::new(Arc::clone(&object_store), "test_host"));
let catalog = persister.load_or_create_catalog().await.unwrap();
let last_cache = LastCacheProvider::new_from_catalog(&catalog.clone_inner()).unwrap();
@ -663,6 +583,7 @@ mod tests {
Arc::clone(&time_provider),
crate::test_help::make_exec(),
WalConfig::test_config(),
Arc::clone(&parquet_cache),
)
.await
.unwrap();
@ -741,6 +662,7 @@ mod tests {
flush_interval: Duration::from_millis(50),
snapshot_size: 100,
},
Arc::clone(&parquet_cache),
)
.await
.unwrap();
@ -751,9 +673,10 @@ mod tests {
#[tokio::test]
async fn last_cache_create_and_delete_is_durable() {
let obj_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let (wbuf, _ctx) = setup(
Time::from_timestamp_nanos(0),
Arc::new(InMemory::new()),
Arc::clone(&obj_store),
WalConfig {
gen1_duration: Gen1Duration::new_1m(),
max_write_buffer_size: 100,
@ -795,6 +718,7 @@ mod tests {
flush_interval: Duration::from_millis(10),
snapshot_size: 1,
},
Arc::clone(&wbuf.parquet_cache),
)
.await
.unwrap();
@ -832,6 +756,7 @@ mod tests {
flush_interval: Duration::from_millis(10),
snapshot_size: 1,
},
Arc::clone(&wbuf.parquet_cache),
)
.await
.unwrap();
@ -888,6 +813,7 @@ mod tests {
flush_interval: Duration::from_millis(10),
snapshot_size: 1,
},
Arc::clone(&wbuf.parquet_cache),
)
.await
.unwrap();
@ -1041,6 +967,7 @@ mod tests {
flush_interval: Duration::from_millis(10),
snapshot_size: 2,
},
Arc::clone(&write_buffer.parquet_cache),
)
.await
.unwrap();
@ -1728,6 +1655,7 @@ mod tests {
object_store: Arc<dyn ObjectStore>,
wal_config: WalConfig,
) -> (WriteBufferImpl, IOxSessionContext) {
let (object_store, parquet_cache) = test_cached_obj_store_and_oracle(object_store);
let persister = Arc::new(Persister::new(Arc::clone(&object_store), "test_host"));
let time_provider: Arc<dyn TimeProvider> = Arc::new(MockProvider::new(start));
let catalog = persister.load_or_create_catalog().await.unwrap();
@ -1739,6 +1667,7 @@ mod tests {
Arc::clone(&time_provider),
crate::test_help::make_exec(),
wal_config,
parquet_cache,
)
.await
.unwrap();


@ -1,5 +1,6 @@
use crate::chunk::BufferChunk;
use crate::last_cache::LastCacheProvider;
use crate::parquet_cache::{CacheRequest, ParquetCacheOracle};
use crate::paths::ParquetFilePath;
use crate::persister::Persister;
use crate::write_buffer::persisted_files::PersistedFiles;
@ -21,6 +22,7 @@ use iox_query::chunk_statistics::{create_chunk_statistics, NoColumnRanges};
use iox_query::exec::Executor;
use iox_query::frontend::reorg::ReorgPlanner;
use iox_query::QueryChunk;
use object_store::path::Path;
use observability_deps::tracing::{error, info};
use parking_lot::RwLock;
use parquet::format::FileMetaData;
@ -40,6 +42,7 @@ pub struct QueryableBuffer {
persister: Arc<Persister>,
persisted_files: Arc<PersistedFiles>,
buffer: Arc<RwLock<BufferState>>,
parquet_cache: Arc<dyn ParquetCacheOracle>,
/// Sends a notification to this watch channel whenever a snapshot info is persisted
persisted_snapshot_notify_rx: tokio::sync::watch::Receiver<Option<PersistedSnapshot>>,
persisted_snapshot_notify_tx: tokio::sync::watch::Sender<Option<PersistedSnapshot>>,
@ -52,6 +55,7 @@ impl QueryableBuffer {
persister: Arc<Persister>,
last_cache_provider: Arc<LastCacheProvider>,
persisted_files: Arc<PersistedFiles>,
parquet_cache: Arc<dyn ParquetCacheOracle>,
) -> Self {
let buffer = Arc::new(RwLock::new(BufferState::new(Arc::clone(&catalog))));
let (persisted_snapshot_notify_tx, persisted_snapshot_notify_rx) =
@ -63,6 +67,7 @@ impl QueryableBuffer {
persister,
persisted_files,
buffer,
parquet_cache,
persisted_snapshot_notify_rx,
persisted_snapshot_notify_tx,
}
@ -192,6 +197,7 @@ impl QueryableBuffer {
let buffer = Arc::clone(&self.buffer);
let catalog = Arc::clone(&self.catalog);
let notify_snapshot_tx = self.persisted_snapshot_notify_tx.clone();
let parquet_cache = Arc::clone(&self.parquet_cache);
tokio::spawn(async move {
// persist the catalog if it has been updated
@ -233,6 +239,7 @@ impl QueryableBuffer {
wal_file_number,
catalog.sequence_number(),
);
let mut cache_notifiers = vec![];
for persist_job in persist_jobs {
let path = persist_job.path.to_string();
let database_name = Arc::clone(&persist_job.database_name);
@ -241,9 +248,14 @@ impl QueryableBuffer {
let min_time = persist_job.timestamp_min_max.min;
let max_time = persist_job.timestamp_min_max.max;
let (size_bytes, meta) =
sort_dedupe_persist(persist_job, Arc::clone(&persister), Arc::clone(&executor))
.await;
let (size_bytes, meta, cache_notifier) = sort_dedupe_persist(
persist_job,
Arc::clone(&persister),
Arc::clone(&executor),
Arc::clone(&parquet_cache),
)
.await;
cache_notifiers.push(cache_notifier);
persisted_snapshot.add_parquet_file(
database_name,
table_name,
@ -276,15 +288,23 @@ impl QueryableBuffer {
}
}
// clear out the write buffer and add all the persisted files to the persisted files list
let mut buffer = buffer.write();
for (_, table_map) in buffer.db_to_table.iter_mut() {
for (_, table_buffer) in table_map.iter_mut() {
table_buffer.clear_snapshots();
// clear out the write buffer and add all the persisted files to the persisted files
// on a background task to ensure that the cache has been populated before we clear
// the buffer
tokio::spawn(async move {
// wait on the cache updates to complete:
for notifier in cache_notifiers {
let _ = notifier.await;
}
let mut buffer = buffer.write();
for (_, table_map) in buffer.db_to_table.iter_mut() {
for (_, table_buffer) in table_map.iter_mut() {
table_buffer.clear_snapshots();
}
}
}
persisted_files.add_persisted_snapshot_files(persisted_snapshot);
persisted_files.add_persisted_snapshot_files(persisted_snapshot);
});
let _ = sender.send(snapshot_details);
});
@ -433,7 +453,8 @@ async fn sort_dedupe_persist(
persist_job: PersistJob,
persister: Arc<Persister>,
executor: Arc<Executor>,
) -> (u64, FileMetaData) {
parquet_cache: Arc<dyn ParquetCacheOracle>,
) -> (u64, FileMetaData, oneshot::Receiver<()>) {
// Dedupe and sort using the COMPACT query built into
// iox_query
let row_count = persist_job.batch.num_rows();
@ -495,7 +516,10 @@ async fn sort_dedupe_persist(
{
Ok((size_bytes, meta)) => {
info!("Persisted parquet file: {}", persist_job.path.to_string());
return (size_bytes, meta);
let (cache_request, cache_notify_rx) =
CacheRequest::create(Path::from(persist_job.path.to_string()));
parquet_cache.register(cache_request);
return (size_bytes, meta, cache_notify_rx);
}
Err(e) => {
error!(