test: check parquet cache in the write buffer (#25411)

* test: check parquet cache in the write buffer

Checked that the parquet cache will serve queries when chunks are
requested from the write buffer. The added test also checks for get_range
requests made to the object store, which are typically made by DataFusion
to infer schema for parquet files.

* refactor: make parquet cache optional on write buffer

* test: add test to verify parquet cache function

This makes the parquet cache optional at the write buffer level, and adds
a test verifying that, on a cache hit, the cache intercepts requests and
prevents them from reaching the object store.
pull/25415/head
Trevor Hilton 2024-09-30 14:54:12 -04:00 committed by GitHub
parent 4df7de8b9e
commit a05c3fe87b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 288 additions and 32 deletions

View File

@ -349,7 +349,7 @@ pub async fn command(config: Config) -> Result<()> {
Arc::<SystemProvider>::clone(&time_provider),
Arc::clone(&exec),
wal_config,
parquet_cache,
Some(parquet_cache),
)
.await
.map_err(|e| Error::WriteBufferInit(e.into()))?,

View File

@ -774,7 +774,7 @@ mod tests {
Arc::<MockProvider>::clone(&time_provider),
Arc::clone(&exec),
WalConfig::test_config(),
parquet_cache,
Some(parquet_cache),
)
.await
.unwrap(),

View File

@ -650,7 +650,7 @@ mod tests {
flush_interval: Duration::from_millis(10),
snapshot_size: 1,
},
parquet_cache,
Some(parquet_cache),
)
.await
.unwrap(),

View File

@ -1597,7 +1597,7 @@ mod tests {
time_provider,
crate::test_help::make_exec(),
WalConfig::test_config(),
parquet_cache,
Some(parquet_cache),
)
.await
.unwrap()

View File

@ -668,7 +668,7 @@ fn background_cache_pruner(mem_store: Arc<MemCachedObjectStore>) -> tokio::task:
}
#[cfg(test)]
mod tests {
pub(crate) mod tests {
use std::{ops::Range, sync::Arc, time::Duration};
use arrow::datatypes::ToByteSlice;
@ -930,17 +930,25 @@ mod tests {
type RequestCounter = RwLock<HashMap<Path, usize>>;
#[derive(Debug)]
struct TestObjectStore {
pub(crate) struct TestObjectStore {
inner: Arc<dyn ObjectStore>,
get: RequestCounter,
get_opts: RequestCounter,
get_range: RequestCounter,
get_ranges: RequestCounter,
head: RequestCounter,
notifies: Option<(Arc<Notify>, Arc<Notify>)>,
}
impl TestObjectStore {
fn new(inner: Arc<dyn ObjectStore>) -> Self {
pub(crate) fn new(inner: Arc<dyn ObjectStore>) -> Self {
Self {
inner,
get: Default::default(),
get_opts: Default::default(),
get_range: Default::default(),
get_ranges: Default::default(),
head: Default::default(),
notifies: None,
}
}
@ -950,13 +958,29 @@ mod tests {
self
}
fn total_get_request_count(&self) -> usize {
pub(crate) fn total_get_request_count(&self) -> usize {
self.get.read().iter().map(|(_, size)| size).sum()
}
fn get_request_count(&self, path: &Path) -> usize {
pub(crate) fn get_request_count(&self, path: &Path) -> usize {
self.get.read().get(path).copied().unwrap_or(0)
}
pub(crate) fn get_opts_request_count(&self, path: &Path) -> usize {
self.get_opts.read().get(path).copied().unwrap_or(0)
}
pub(crate) fn get_range_request_count(&self, path: &Path) -> usize {
self.get_range.read().get(path).copied().unwrap_or(0)
}
pub(crate) fn get_ranges_request_count(&self, path: &Path) -> usize {
self.get_ranges.read().get(path).copied().unwrap_or(0)
}
pub(crate) fn head_request_count(&self, path: &Path) -> usize {
self.head.read().get(path).copied().unwrap_or(0)
}
}
impl std::fmt::Display for TestObjectStore {
@ -1009,6 +1033,7 @@ mod tests {
location: &Path,
options: GetOptions,
) -> object_store::Result<GetResult> {
*self.get_opts.write().entry(location.clone()).or_insert(0) += 1;
self.inner.get_opts(location, options).await
}
@ -1017,6 +1042,7 @@ mod tests {
location: &Path,
range: Range<usize>,
) -> object_store::Result<Bytes> {
*self.get_range.write().entry(location.clone()).or_insert(0) += 1;
self.inner.get_range(location, range).await
}
@ -1025,10 +1051,12 @@ mod tests {
location: &Path,
ranges: &[Range<usize>],
) -> object_store::Result<Vec<Bytes>> {
*self.get_ranges.write().entry(location.clone()).or_insert(0) += 1;
self.inner.get_ranges(location, ranges).await
}
async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
*self.head.write().entry(location.clone()).or_insert(0) += 1;
self.inner.head(location).await
}

View File

@ -108,7 +108,7 @@ pub struct WriteBufferImpl {
// NOTE(trevor): the parquet cache interface may be used to register other cache
// requests from the write buffer, e.g., during query...
#[allow(dead_code)]
parquet_cache: Arc<dyn ParquetCacheOracle>,
parquet_cache: Option<Arc<dyn ParquetCacheOracle>>,
persisted_files: Arc<PersistedFiles>,
buffer: Arc<QueryableBuffer>,
wal_config: WalConfig,
@ -128,7 +128,7 @@ impl WriteBufferImpl {
time_provider: Arc<dyn TimeProvider>,
executor: Arc<iox_query::exec::Executor>,
wal_config: WalConfig,
parquet_cache: Arc<dyn ParquetCacheOracle>,
parquet_cache: Option<Arc<dyn ParquetCacheOracle>>,
) -> Result<Self> {
// load snapshots and replay the wal into the in memory buffer
let persisted_snapshots = persister
@ -162,7 +162,7 @@ impl WriteBufferImpl {
Arc::clone(&persister),
Arc::clone(&last_cache),
Arc::clone(&persisted_files),
Arc::clone(&parquet_cache),
parquet_cache.clone(),
));
// create the wal instance, which will replay into the queryable buffer and start
@ -526,6 +526,7 @@ impl WriteBuffer for WriteBufferImpl {}
mod tests {
use super::*;
use crate::parquet_cache::test_cached_obj_store_and_oracle;
use crate::parquet_cache::tests::TestObjectStore;
use crate::paths::{CatalogFilePath, SnapshotInfoFilePath};
use crate::persister::Persister;
use crate::PersistedSnapshot;
@ -584,7 +585,7 @@ mod tests {
Arc::clone(&time_provider),
crate::test_help::make_exec(),
WalConfig::test_config(),
Arc::clone(&parquet_cache),
Some(Arc::clone(&parquet_cache)),
)
.await
.unwrap();
@ -663,7 +664,7 @@ mod tests {
flush_interval: Duration::from_millis(50),
snapshot_size: 100,
},
Arc::clone(&parquet_cache),
Some(Arc::clone(&parquet_cache)),
)
.await
.unwrap();
@ -719,7 +720,7 @@ mod tests {
flush_interval: Duration::from_millis(10),
snapshot_size: 1,
},
Arc::clone(&wbuf.parquet_cache),
wbuf.parquet_cache.clone(),
)
.await
.unwrap();
@ -757,7 +758,7 @@ mod tests {
flush_interval: Duration::from_millis(10),
snapshot_size: 1,
},
Arc::clone(&wbuf.parquet_cache),
wbuf.parquet_cache.clone(),
)
.await
.unwrap();
@ -814,7 +815,7 @@ mod tests {
flush_interval: Duration::from_millis(10),
snapshot_size: 1,
},
Arc::clone(&wbuf.parquet_cache),
wbuf.parquet_cache.clone(),
)
.await
.unwrap();
@ -968,7 +969,7 @@ mod tests {
flush_interval: Duration::from_millis(10),
snapshot_size: 2,
},
Arc::clone(&write_buffer.parquet_cache),
write_buffer.parquet_cache.clone(),
)
.await
.unwrap();
@ -1575,6 +1576,215 @@ mod tests {
assert_eq!(DbId::next_id().as_u32(), 1);
}
/// Verify that a write buffer configured WITH a parquet cache serves queries
/// for persisted parquet files from the cache: after the cache oracle's single
/// initial `get`, a query must produce no further object store requests for
/// the file.
#[tokio::test]
async fn test_parquet_cache() {
// set up a write buffer using a TestObjectStore so we can spy on requests that get
// through to the object store for parquet files:
let test_store = Arc::new(TestObjectStore::new(Arc::new(InMemory::new())));
let obj_store: Arc<dyn ObjectStore> = Arc::clone(&test_store) as _;
// `true` => wrap the spy store in the parquet cache:
let (wbuf, ctx) = setup_cache_optional(
Time::from_timestamp_nanos(0),
Arc::clone(&obj_store),
WalConfig {
gen1_duration: Gen1Duration::new_1m(),
max_write_buffer_size: 100,
flush_interval: Duration::from_millis(10),
// snapshot after a single WAL period so the writes below persist quickly:
snapshot_size: 1,
},
true,
)
.await;
let db_name = "my_corp";
let tbl_name = "temp";
// make some writes to generate a snapshot:
do_writes(
db_name,
&wbuf,
&[
TestWrite {
lp: format!(
"\
{tbl_name},warehouse=us-east,room=01a,device=10001 reading=36\n\
{tbl_name},warehouse=us-east,room=01b,device=10002 reading=29\n\
{tbl_name},warehouse=us-east,room=02a,device=30003 reading=33\n\
"
),
time_seconds: 1,
},
TestWrite {
lp: format!(
"\
{tbl_name},warehouse=us-east,room=01a,device=10001 reading=37\n\
{tbl_name},warehouse=us-east,room=01b,device=10002 reading=28\n\
{tbl_name},warehouse=us-east,room=02a,device=30003 reading=32\n\
"
),
time_seconds: 2,
},
// This write will trigger the snapshot:
TestWrite {
lp: format!(
"\
{tbl_name},warehouse=us-east,room=01a,device=10001 reading=35\n\
{tbl_name},warehouse=us-east,room=01b,device=10002 reading=24\n\
{tbl_name},warehouse=us-east,room=02a,device=30003 reading=30\n\
"
),
time_seconds: 3,
},
],
)
.await;
// Wait for snapshot to be created, once this is done, then the parquet has been persisted:
verify_snapshot_count(1, &wbuf.persister).await;
// get the path for the created parquet file:
let persisted_files = wbuf.persisted_files().get_files(db_name, tbl_name);
assert_eq!(1, persisted_files.len());
let path = ObjPath::from(persisted_files[0].path.as_str());
// check the number of requests to that path before making a query:
// there should be one get request, made by the cache oracle when it populated
// the cache after persistence; no range/head traffic has happened yet:
assert_eq!(1, test_store.get_request_count(&path));
assert_eq!(0, test_store.get_opts_request_count(&path));
assert_eq!(0, test_store.get_ranges_request_count(&path));
assert_eq!(0, test_store.get_range_request_count(&path));
assert_eq!(0, test_store.head_request_count(&path));
// query the table; this reads the persisted parquet file:
let batches = get_table_batches(&wbuf, db_name, tbl_name, &ctx).await;
assert_batches_sorted_eq!(
[
"+--------+---------+------+----------------------+-----------+",
"| device | reading | room | time | warehouse |",
"+--------+---------+------+----------------------+-----------+",
"| 10001 | 35.0 | 01a | 1970-01-01T00:00:03Z | us-east |",
"| 10001 | 36.0 | 01a | 1970-01-01T00:00:01Z | us-east |",
"| 10001 | 37.0 | 01a | 1970-01-01T00:00:02Z | us-east |",
"| 10002 | 24.0 | 01b | 1970-01-01T00:00:03Z | us-east |",
"| 10002 | 28.0 | 01b | 1970-01-01T00:00:02Z | us-east |",
"| 10002 | 29.0 | 01b | 1970-01-01T00:00:01Z | us-east |",
"| 30003 | 30.0 | 02a | 1970-01-01T00:00:03Z | us-east |",
"| 30003 | 32.0 | 02a | 1970-01-01T00:00:02Z | us-east |",
"| 30003 | 33.0 | 02a | 1970-01-01T00:00:01Z | us-east |",
"+--------+---------+------+----------------------+-----------+",
],
&batches
);
// counts should not change, since requests for this parquet file hit the cache:
assert_eq!(1, test_store.get_request_count(&path));
assert_eq!(0, test_store.get_opts_request_count(&path));
assert_eq!(0, test_store.get_ranges_request_count(&path));
assert_eq!(0, test_store.get_range_request_count(&path));
assert_eq!(0, test_store.head_request_count(&path));
}
/// Verify that a write buffer configured WITHOUT a parquet cache sends
/// requests for persisted parquet files straight to the object store: a query
/// must register `get_range`/`get_ranges` hits on the spy store for the file.
#[tokio::test]
async fn test_no_parquet_cache() {
// set up a write buffer using a TestObjectStore so we can spy on requests that get
// through to the object store for parquet files:
let test_store = Arc::new(TestObjectStore::new(Arc::new(InMemory::new())));
let obj_store: Arc<dyn ObjectStore> = Arc::clone(&test_store) as _;
// `false` => no parquet cache; the spy store is used directly:
let (wbuf, ctx) = setup_cache_optional(
Time::from_timestamp_nanos(0),
Arc::clone(&obj_store),
WalConfig {
gen1_duration: Gen1Duration::new_1m(),
max_write_buffer_size: 100,
flush_interval: Duration::from_millis(10),
// snapshot after a single WAL period so the writes below persist quickly:
snapshot_size: 1,
},
false,
)
.await;
let db_name = "my_corp";
let tbl_name = "temp";
// make some writes to generate a snapshot:
do_writes(
db_name,
&wbuf,
&[
TestWrite {
lp: format!(
"\
{tbl_name},warehouse=us-east,room=01a,device=10001 reading=36\n\
{tbl_name},warehouse=us-east,room=01b,device=10002 reading=29\n\
{tbl_name},warehouse=us-east,room=02a,device=30003 reading=33\n\
"
),
time_seconds: 1,
},
TestWrite {
lp: format!(
"\
{tbl_name},warehouse=us-east,room=01a,device=10001 reading=37\n\
{tbl_name},warehouse=us-east,room=01b,device=10002 reading=28\n\
{tbl_name},warehouse=us-east,room=02a,device=30003 reading=32\n\
"
),
time_seconds: 2,
},
// This write will trigger the snapshot:
TestWrite {
lp: format!(
"\
{tbl_name},warehouse=us-east,room=01a,device=10001 reading=35\n\
{tbl_name},warehouse=us-east,room=01b,device=10002 reading=24\n\
{tbl_name},warehouse=us-east,room=02a,device=30003 reading=30\n\
"
),
time_seconds: 3,
},
],
)
.await;
// Wait for snapshot to be created, once this is done, then the parquet has been persisted:
verify_snapshot_count(1, &wbuf.persister).await;
// get the path for the created parquet file:
let persisted_files = wbuf.persisted_files().get_files(db_name, tbl_name);
assert_eq!(1, persisted_files.len());
let path = ObjPath::from(persisted_files[0].path.as_str());
// check the number of requests to that path before making a query:
// there should be no get or get_range requests since nothing has asked for this file yet
// (with the cache disabled, no oracle `get` happens at persist time either):
assert_eq!(0, test_store.get_request_count(&path));
assert_eq!(0, test_store.get_opts_request_count(&path));
assert_eq!(0, test_store.get_ranges_request_count(&path));
assert_eq!(0, test_store.get_range_request_count(&path));
assert_eq!(0, test_store.head_request_count(&path));
// query the table; this reads the persisted parquet file:
let batches = get_table_batches(&wbuf, db_name, tbl_name, &ctx).await;
assert_batches_sorted_eq!(
[
"+--------+---------+------+----------------------+-----------+",
"| device | reading | room | time | warehouse |",
"+--------+---------+------+----------------------+-----------+",
"| 10001 | 35.0 | 01a | 1970-01-01T00:00:03Z | us-east |",
"| 10001 | 36.0 | 01a | 1970-01-01T00:00:01Z | us-east |",
"| 10001 | 37.0 | 01a | 1970-01-01T00:00:02Z | us-east |",
"| 10002 | 24.0 | 01b | 1970-01-01T00:00:03Z | us-east |",
"| 10002 | 28.0 | 01b | 1970-01-01T00:00:02Z | us-east |",
"| 10002 | 29.0 | 01b | 1970-01-01T00:00:01Z | us-east |",
"| 30003 | 30.0 | 02a | 1970-01-01T00:00:03Z | us-east |",
"| 30003 | 32.0 | 02a | 1970-01-01T00:00:02Z | us-east |",
"| 30003 | 33.0 | 02a | 1970-01-01T00:00:01Z | us-east |",
"+--------+---------+------+----------------------+-----------+",
],
&batches
);
// the range-request counts increase, since with no cache the parquet reads go
// straight to the object store; `get`/`get_opts`/`head` remain untouched.
// NOTE(review): the get_range/get_ranges requests are presumably issued by
// DataFusion when reading the parquet file — confirm against the query path.
assert_eq!(0, test_store.get_request_count(&path));
assert_eq!(0, test_store.get_opts_request_count(&path));
assert_eq!(1, test_store.get_ranges_request_count(&path));
assert_eq!(2, test_store.get_range_request_count(&path));
assert_eq!(0, test_store.head_request_count(&path));
}
struct TestWrite<LP> {
lp: LP,
time_seconds: i64,
@ -1655,10 +1865,24 @@ mod tests {
start: Time,
object_store: Arc<dyn ObjectStore>,
wal_config: WalConfig,
) -> (WriteBufferImpl, IOxSessionContext) {
setup_cache_optional(start, object_store, wal_config, true).await
}
async fn setup_cache_optional(
start: Time,
object_store: Arc<dyn ObjectStore>,
wal_config: WalConfig,
use_cache: bool,
) -> (WriteBufferImpl, IOxSessionContext) {
let time_provider: Arc<dyn TimeProvider> = Arc::new(MockProvider::new(start));
let (object_store, parquet_cache) =
test_cached_obj_store_and_oracle(object_store, Arc::clone(&time_provider));
let (object_store, parquet_cache) = if use_cache {
let (object_store, parquet_cache) =
test_cached_obj_store_and_oracle(object_store, Arc::clone(&time_provider));
(object_store, Some(parquet_cache))
} else {
(object_store, None)
};
let persister = Arc::new(Persister::new(Arc::clone(&object_store), "test_host"));
let catalog = persister.load_or_create_catalog().await.unwrap();
let last_cache = LastCacheProvider::new_from_catalog(&catalog.clone_inner()).unwrap();

View File

@ -42,7 +42,7 @@ pub struct QueryableBuffer {
persister: Arc<Persister>,
persisted_files: Arc<PersistedFiles>,
buffer: Arc<RwLock<BufferState>>,
parquet_cache: Arc<dyn ParquetCacheOracle>,
parquet_cache: Option<Arc<dyn ParquetCacheOracle>>,
/// Sends a notification to this watch channel whenever a snapshot info is persisted
persisted_snapshot_notify_rx: tokio::sync::watch::Receiver<Option<PersistedSnapshot>>,
persisted_snapshot_notify_tx: tokio::sync::watch::Sender<Option<PersistedSnapshot>>,
@ -55,7 +55,7 @@ impl QueryableBuffer {
persister: Arc<Persister>,
last_cache_provider: Arc<LastCacheProvider>,
persisted_files: Arc<PersistedFiles>,
parquet_cache: Arc<dyn ParquetCacheOracle>,
parquet_cache: Option<Arc<dyn ParquetCacheOracle>>,
) -> Self {
let buffer = Arc::new(RwLock::new(BufferState::new(Arc::clone(&catalog))));
let (persisted_snapshot_notify_tx, persisted_snapshot_notify_rx) =
@ -197,7 +197,7 @@ impl QueryableBuffer {
let buffer = Arc::clone(&self.buffer);
let catalog = Arc::clone(&self.catalog);
let notify_snapshot_tx = self.persisted_snapshot_notify_tx.clone();
let parquet_cache = Arc::clone(&self.parquet_cache);
let parquet_cache = self.parquet_cache.clone();
tokio::spawn(async move {
// persist the catalog if it has been updated
@ -252,7 +252,7 @@ impl QueryableBuffer {
persist_job,
Arc::clone(&persister),
Arc::clone(&executor),
Arc::clone(&parquet_cache),
parquet_cache.clone(),
)
.await;
cache_notifiers.push(cache_notifier);
@ -292,8 +292,8 @@ impl QueryableBuffer {
// on a background task to ensure that the cache has been populated before we clear
// the buffer
tokio::spawn(async move {
// wait on the cache updates to complete:
for notifier in cache_notifiers {
// wait on the cache updates to complete if there is a cache:
for notifier in cache_notifiers.into_iter().flatten() {
let _ = notifier.await;
}
let mut buffer = buffer.write();
@ -453,8 +453,8 @@ async fn sort_dedupe_persist(
persist_job: PersistJob,
persister: Arc<Persister>,
executor: Arc<Executor>,
parquet_cache: Arc<dyn ParquetCacheOracle>,
) -> (u64, FileMetaData, oneshot::Receiver<()>) {
parquet_cache: Option<Arc<dyn ParquetCacheOracle>>,
) -> (u64, FileMetaData, Option<oneshot::Receiver<()>>) {
// Dedupe and sort using the COMPACT query built into
// iox_query
let row_count = persist_job.batch.num_rows();
@ -516,10 +516,14 @@ async fn sort_dedupe_persist(
{
Ok((size_bytes, meta)) => {
info!("Persisted parquet file: {}", persist_job.path.to_string());
let (cache_request, cache_notify_rx) =
CacheRequest::create(Path::from(persist_job.path.to_string()));
parquet_cache.register(cache_request);
return (size_bytes, meta, cache_notify_rx);
if let Some(pq) = parquet_cache {
let (cache_request, cache_notify_rx) =
CacheRequest::create(Path::from(persist_job.path.to_string()));
pq.register(cache_request);
return (size_bytes, meta, Some(cache_notify_rx));
} else {
return (size_bytes, meta, None);
}
}
Err(e) => {
error!(