feat: introduce parquet caching in query path (#25937)

* feat: introduce parquet caching in query path

This commit scans the parquet files that will be used in query to check
if they can be cached. There are three conditions to satisfy,
  - not cached already
  - cache has enough space
  - file times overlap with the cache policy times

closes: https://github.com/influxdata/influxdb/issues/25906

* refactor: rename env var
fix/persist_create_database
praveen-influx 2025-01-30 21:16:37 +00:00 committed by GitHub
parent 05da40fa9b
commit 56ca85ef8e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 347 additions and 24 deletions

View File

@ -295,6 +295,16 @@ pub struct Config {
)]
pub disable_parquet_mem_cache: bool,
/// The duration from `now` to check if parquet files pulled in query path requires caching
/// Enter as a human-readable time, e.g., "5h", "3d"
#[clap(
long = "parquet-mem-cache-query-path-duration",
env = "INFLUXDB3_PARQUET_MEM_CACHE_QUERY_PATH_DURATION",
default_value = "5h",
action
)]
pub parquet_mem_cache_query_path_duration: humantime::Duration,
/// The interval on which to evict expired entries from the Last-N-Value cache, expressed as a
/// human-readable time, e.g., "20s", "1m", "1h".
#[clap(
@ -458,6 +468,7 @@ pub async fn command(config: Config) -> Result<()> {
Arc::clone(&time_provider) as _,
Arc::clone(&metrics),
config.parquet_mem_cache_size.as_num_bytes(),
config.parquet_mem_cache_query_path_duration.into(),
config.parquet_mem_cache_prune_percentage.into(),
config.parquet_mem_cache_prune_interval.into(),
);

View File

@ -7,6 +7,7 @@ license.workspace = true
[dependencies]
# Core Crates
data_types.workspace = true
iox_time.workspace = true
metric.workspace = true
observability_deps.workspace = true

View File

@ -15,6 +15,7 @@ use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use dashmap::{DashMap, Entry};
use data_types::{TimestampMinMax, TimestampRange};
use futures::{
future::{BoxFuture, Shared},
stream::BoxStream,
@ -76,11 +77,14 @@ pub struct ImmediateCacheRequest {
pub struct EventualCacheRequest {
path: Path,
notifier: oneshot::Sender<()>,
file_timestamp_min_max: Option<TimestampMinMax>,
}
impl EventualCacheRequest {
fn get_path_and_notifier(self) -> (Path, oneshot::Sender<()>) {
(self.path, self.notifier)
fn get_path_and_notifier_and_timestamp(
self,
) -> (Path, oneshot::Sender<()>, Option<TimestampMinMax>) {
(self.path, self.notifier, self.file_timestamp_min_max)
}
}
@ -108,10 +112,17 @@ impl CacheRequest {
// Create a new [`CacheRequest::Eventual`] for eventually loading the path passed in, by doing
// a GET request from object store for the path. Since it is async operation, the receiver is
// passed back to notify once the data is loaded into the cache
pub fn create_eventual_mode_cache_request(path: Path) -> (Self, oneshot::Receiver<()>) {
pub fn create_eventual_mode_cache_request(
path: Path,
file_timestamp_min_max: Option<TimestampMinMax>,
) -> (Self, oneshot::Receiver<()>) {
let (notifier, receiver) = oneshot::channel();
(
Self::Eventual(EventualCacheRequest { path, notifier }),
Self::Eventual(EventualCacheRequest {
path,
notifier,
file_timestamp_min_max,
}),
receiver,
)
}
@ -122,7 +133,11 @@ impl CacheRequest {
path,
parquet_data: _,
}) => path,
CacheRequest::Eventual(EventualCacheRequest { path, notifier: _ }) => path,
CacheRequest::Eventual(EventualCacheRequest {
path,
notifier: _,
file_timestamp_min_max: _,
}) => path,
}
}
}
@ -206,7 +221,7 @@ impl ParquetCacheOracle for MemCacheOracle {
tokio::spawn(async move {
if let Err(error) = tx.send(eventual_cache_req).await {
error!(%error, "error registering cache request");
};
}
});
} else {
let _ = eventual_cache_req.notifier.send(());
@ -227,6 +242,7 @@ pub fn create_cached_obj_store_and_oracle(
time_provider: Arc<dyn TimeProvider>,
metric_registry: Arc<Registry>,
cache_capacity: usize,
query_cache_duration: Duration,
prune_percent: f64,
prune_interval: Duration,
) -> (Arc<dyn ObjectStore>, Arc<dyn ParquetCacheOracle>) {
@ -236,6 +252,7 @@ pub fn create_cached_obj_store_and_oracle(
inner: object_store,
memory_capacity: cache_capacity,
prune_percent,
query_cache_duration,
}));
let oracle = Arc::new(MemCacheOracle::new(Arc::clone(&store), prune_interval));
(store, oracle)
@ -252,6 +269,7 @@ pub fn test_cached_obj_store_and_oracle(
time_provider,
metric_registry,
1024 * 1024 * 1024,
Duration::from_millis(1000),
0.1,
Duration::from_millis(10),
)
@ -373,6 +391,7 @@ struct Cache {
access_metrics: AccessMetrics,
/// Track metrics for observing the size of the cache
size_metrics: SizeMetrics,
query_cache_duration: Duration,
}
impl Cache {
@ -382,6 +401,7 @@ impl Cache {
prune_percent: f64,
time_provider: Arc<dyn TimeProvider>,
metric_registry: Arc<Registry>,
query_cache_duration: Duration,
) -> Self {
Self {
capacity,
@ -391,6 +411,7 @@ impl Cache {
time_provider,
access_metrics: AccessMetrics::new(&metric_registry),
size_metrics: SizeMetrics::new(&metric_registry),
query_cache_duration,
}
}
@ -589,6 +610,7 @@ pub struct MemCachedObjectStoreArgs {
pub inner: Arc<dyn ObjectStore>,
pub memory_capacity: usize,
pub prune_percent: f64,
pub query_cache_duration: Duration,
}
impl MemCachedObjectStore {
@ -600,6 +622,7 @@ impl MemCachedObjectStore {
inner,
memory_capacity,
prune_percent,
query_cache_duration,
}: MemCachedObjectStoreArgs,
) -> Self {
Self {
@ -607,8 +630,9 @@ impl MemCachedObjectStore {
cache: Arc::new(Cache::new(
memory_capacity,
prune_percent,
time_provider,
Arc::clone(&time_provider),
metric_registry,
query_cache_duration,
)),
}
}
@ -806,7 +830,16 @@ fn background_cache_request_handler(
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
while let Some(cache_request) = rx.recv().await {
let (path, notifier) = cache_request.get_path_and_notifier();
let (path, notifier, file_timestamp_min_max) =
cache_request.get_path_and_notifier_and_timestamp();
if !should_request_be_cached(file_timestamp_min_max, &mem_store.cache) {
debug!(?path, ">>> not caching parquet file path");
let _ = notifier.send(());
continue;
}
debug!(?path, ">>> caching parquet file path");
// Create a future that will go and fetch the cache value from the store:
let path_cloned = path.clone();
let store_cloned = Arc::clone(&mem_store.inner);
@ -837,7 +870,8 @@ fn background_cache_request_handler(
mem_store_captured.cache.remove(&path);
}
};
// notify that the cache request has been fulfilled:
// notify that the cache request has been fulfilled, if receiver has been
// dropped this error is not bubbled up
let _ = notifier.send(());
});
}
@ -845,6 +879,35 @@ fn background_cache_request_handler(
})
}
fn should_request_be_cached(
file_timestamp_min_max: Option<TimestampMinMax>,
cache: &Cache,
) -> bool {
// If there's a timestamp range, check if there's capacity to add these
// files. These are currently expected to come through from query path
// which could be fetching older file. Check it's within allowed interval
// before adding to cache
file_timestamp_min_max
.map(|file_timestamp_min_max| {
if cache.used.load(Ordering::SeqCst) < cache.capacity {
let end = cache.time_provider.now();
let start = end - cache.query_cache_duration;
let allowed_time_range =
TimestampRange::new(start.timestamp_nanos(), end.timestamp_nanos());
debug!(
?file_timestamp_min_max,
?allowed_time_range,
">>> parquet file timestamp min max to cache"
);
file_timestamp_min_max.overlaps(allowed_time_range)
} else {
false
}
})
.unwrap_or(true)
}
/// A background task for pruning un-needed entries in the cache
fn background_cache_pruner(
mem_store: Arc<MemCachedObjectStore>,
@ -852,6 +915,7 @@ fn background_cache_pruner(
interval_duration: Duration,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
debug!(">>> test: background cache pruning running");
let mut interval = tokio::time::interval(interval_duration);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
@ -869,6 +933,7 @@ pub(crate) mod tests {
use arrow::datatypes::ToByteSlice;
use bytes::Bytes;
use data_types::TimestampMinMax;
use influxdb3_test_helpers::object_store::{
RequestCountedObjectStore, SynchronizedObjectStore,
};
@ -882,7 +947,8 @@ pub(crate) mod tests {
use crate::parquet_cache::{
create_cached_obj_store_and_oracle,
metrics::{CACHE_ACCESS_NAME, CACHE_SIZE_BYTES_NAME, CACHE_SIZE_N_FILES_NAME},
test_cached_obj_store_and_oracle, CacheRequest, ParquetFileDataToCache,
should_request_be_cached, test_cached_obj_store_and_oracle, Cache, CacheRequest,
ParquetFileDataToCache,
};
macro_rules! assert_payload_at_equals {
@ -926,7 +992,7 @@ pub(crate) mod tests {
// cache the entry:
let (cache_request, notifier_rx) =
CacheRequest::create_eventual_mode_cache_request(path.clone());
CacheRequest::create_eventual_mode_cache_request(path.clone(), None);
oracle.register(cache_request);
// wait for cache notify:
@ -1038,8 +1104,10 @@ pub(crate) mod tests {
oracle.register(immediate_cache_request);
// Now try to cache 1st path eventually
let (cache_request, notifier_rx) =
CacheRequest::create_eventual_mode_cache_request(path_1_eventually_cached.clone());
let (cache_request, notifier_rx) = CacheRequest::create_eventual_mode_cache_request(
path_1_eventually_cached.clone(),
None,
);
oracle.register(cache_request);
// Oracle should've fulfilled the immediate cache request
@ -1074,8 +1142,10 @@ pub(crate) mod tests {
);
// Now try to cache 1st path again eventually
let (cache_request, notifier_rx) =
CacheRequest::create_eventual_mode_cache_request(path_1_eventually_cached.clone());
let (cache_request, notifier_rx) = CacheRequest::create_eventual_mode_cache_request(
path_1_eventually_cached.clone(),
None,
);
oracle.register(cache_request);
// should resolve immediately as path is already in cache
let _ = notifier_rx.await;
@ -1087,8 +1157,10 @@ pub(crate) mod tests {
);
// Try caching 2nd path (previously immediately cached)
let (cache_request, notifier_rx) =
CacheRequest::create_eventual_mode_cache_request(path_2_immediately_cached.clone());
let (cache_request, notifier_rx) = CacheRequest::create_eventual_mode_cache_request(
path_2_immediately_cached.clone(),
None,
);
oracle.register(cache_request);
// should resolve immediately as path is already in cache
let _ = notifier_rx.await;
@ -1113,6 +1185,7 @@ pub(crate) mod tests {
Arc::clone(&time_provider) as _,
Default::default(),
cache_capacity_bytes,
Duration::from_millis(10),
cache_prune_percent,
cache_prune_interval,
);
@ -1127,7 +1200,7 @@ pub(crate) mod tests {
// cache the entry and wait for it to complete:
let (cache_request, notifier_rx) =
CacheRequest::create_eventual_mode_cache_request(path_1.clone());
CacheRequest::create_eventual_mode_cache_request(path_1.clone(), None);
oracle.register(cache_request);
let _ = notifier_rx.await;
// there will have been one get request made by the cache oracle:
@ -1155,7 +1228,7 @@ pub(crate) mod tests {
// cache the second entry and wait for it to complete, this will not evict the first entry
// as both can fit in the cache:
let (cache_request, notifier_rx) =
CacheRequest::create_eventual_mode_cache_request(path_2.clone());
CacheRequest::create_eventual_mode_cache_request(path_2.clone(), None);
oracle.register(cache_request);
let _ = notifier_rx.await;
// will have another request for the second path to the inner store, by the oracle:
@ -1195,7 +1268,7 @@ pub(crate) mod tests {
// cache the third entry and wait for it to complete, this will push the cache past its
// capacity:
let (cache_request, notifier_rx) =
CacheRequest::create_eventual_mode_cache_request(path_3.clone());
CacheRequest::create_eventual_mode_cache_request(path_3.clone(), None);
oracle.register(cache_request);
let _ = notifier_rx.await;
// will now have another request for the third path to the inner store, by the oracle:
@ -1251,7 +1324,7 @@ pub(crate) mod tests {
// cache the entry, but don't wait on it until below in spawned task:
let (cache_request, notifier_rx) =
CacheRequest::create_eventual_mode_cache_request(path.clone());
CacheRequest::create_eventual_mode_cache_request(path.clone(), None);
oracle.register(cache_request);
// we are in the middle of a get request, i.e., the cache entry is "fetching"
@ -1413,7 +1486,7 @@ pub(crate) mod tests {
// have the cache oracle cache the object:
let (cache_request, notifier_rx) =
CacheRequest::create_eventual_mode_cache_request(path.clone());
CacheRequest::create_eventual_mode_cache_request(path.clone(), None);
oracle.register(cache_request);
// we are in the middle of a get request, i.e., the cache entry is "fetching" once this
@ -1459,4 +1532,58 @@ pub(crate) mod tests {
// removing the entry should bring the cache sizes back to zero:
metric_verifier.assert_size(0, 0);
}
#[test_log::test(test)]
fn test_should_request_be_cached_partial_overlap_of_file_time() {
let time_provider: Arc<dyn TimeProvider> =
Arc::new(MockProvider::new(Time::from_timestamp_nanos(100)));
let max_size_bytes = 100;
let cache = Cache::new(
max_size_bytes,
0.1,
Arc::clone(&time_provider),
Arc::new(Registry::new()),
Duration::from_nanos(100),
);
let file_timestamp_min_max = Some(TimestampMinMax::new(0, 100));
let should_cache = should_request_be_cached(file_timestamp_min_max, &cache);
assert!(should_cache);
}
#[test_log::test(test)]
fn test_should_request_be_cached_no_overlap_of_file_time() {
let time_provider: Arc<dyn TimeProvider> =
Arc::new(MockProvider::new(Time::from_timestamp_nanos(1000)));
let max_size_bytes = 100;
let cache = Cache::new(
max_size_bytes,
0.1,
Arc::clone(&time_provider),
Arc::new(Registry::new()),
Duration::from_nanos(100),
);
let file_timestamp_min_max = Some(TimestampMinMax::new(0, 100));
let should_cache = should_request_be_cached(file_timestamp_min_max, &cache);
assert!(!should_cache);
}
#[test_log::test(test)]
fn test_should_request_be_cached_no_timestamp_set() {
let time_provider: Arc<dyn TimeProvider> =
Arc::new(MockProvider::new(Time::from_timestamp_nanos(1000)));
let max_size_bytes = 100;
let cache = Cache::new(
max_size_bytes,
0.1,
Arc::clone(&time_provider),
Arc::new(Registry::new()),
Duration::from_nanos(100),
);
let file_timestamp_min_max = Some(TimestampMinMax::new(0, 100));
let should_cache = should_request_be_cached(file_timestamp_min_max, &cache);
assert!(!should_cache);
}
}

View File

@ -5,6 +5,7 @@ pub mod persisted_files;
pub mod queryable_buffer;
mod table_buffer;
pub(crate) use table_buffer::INDEX_HASH_SEED;
use tokio::sync::{oneshot, watch::Receiver};
pub mod validator;
use crate::persister::Persister;
@ -24,9 +25,12 @@ use data_types::{
use datafusion::catalog::Session;
use datafusion::common::DataFusionError;
use datafusion::datasource::object_store::ObjectStoreUrl;
use influxdb3_cache::distinct_cache::{self, CreateDistinctCacheArgs, DistinctCacheProvider};
use influxdb3_cache::last_cache::{self, LastCacheProvider};
use influxdb3_cache::parquet_cache::ParquetCacheOracle;
use influxdb3_cache::{
distinct_cache::{self, CreateDistinctCacheArgs, DistinctCacheProvider},
parquet_cache::CacheRequest,
};
use influxdb3_catalog::catalog::{Catalog, DatabaseSchema, TableDefinition};
use influxdb3_id::{ColumnId, DbId, TableId};
use influxdb3_wal::FieldDataType;
@ -52,7 +56,6 @@ use schema::Schema;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tokio::sync::watch::Receiver;
#[derive(Debug, Error)]
pub enum Error {
@ -357,6 +360,10 @@ impl WriteBufferImpl {
}
let mut chunk_order = chunks.len() as i64;
// Although this sends a cache request, it does not mean all these
// files will be cached. This depends on parquet cache's capacity
// and whether these files are recent enough
self.cache_parquet_files(&parquet_files);
for parquet_file in parquet_files {
let parquet_chunk = parquet_chunk_from_file(
@ -375,6 +382,29 @@ impl WriteBufferImpl {
Ok(chunks)
}
fn cache_parquet_files(&self, parquet_files: &[ParquetFile]) {
if let Some(parquet_cache) = &self.parquet_cache {
let all_cache_notifiers: Vec<oneshot::Receiver<()>> = parquet_files
.iter()
.map(|file| {
// When datafusion tries to fetch this file we'll have cache in "Fetching" state.
// There is a slim chance that this request hasn't been processed yet, then we
// could incur extra GET req to populate the cache. Having a transparent
// cache might be handy for this case.
let (cache_req, receiver) = CacheRequest::create_eventual_mode_cache_request(
ObjPath::from(file.path.as_str()),
Some(file.timestamp_min_max()),
);
parquet_cache.register(cache_req);
receiver
})
.collect();
// there's no explicit await on these receivers - we're only letting parquet cache know
// this file can be cached if it meets cache's policy.
debug!(len = ?all_cache_notifiers.len(), ">>> num parquet file cache requests created");
}
}
#[cfg(test)]
fn get_table_chunks_from_buffer_only(
&self,
@ -2937,6 +2967,160 @@ mod tests {
);
}
// 2 threads are used here as we drop the write buffer in this test and the test
// relies on parquet cache being able to work with eventual mode cache requests.
// When write buffer is dropped the default background_cache_handler loop gets
// stuck waiting for new messages in the channel. So, using another thread works
// around the problem - another way may have been to have some shutdown hook
// but that'd require further changes. For now, this work around should suffice
// if finer grained control is necessary, shutdown hook can be explored.
#[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 2))]
async fn test_query_path_parquet_cache() {
let inner_store = Arc::new(RequestCountedObjectStore::new(Arc::new(InMemory::new())));
let (write_buffer, ctx, _) = setup_cache_optional(
Time::from_timestamp_nanos(100),
Arc::clone(&inner_store) as _,
WalConfig {
gen1_duration: Gen1Duration::new_1m(),
max_write_buffer_size: 100,
flush_interval: Duration::from_millis(10),
snapshot_size: 1,
},
false,
)
.await;
let db_name = "test_db";
// perform writes over time to generate WAL files and some snapshots
for i in 1..=3 {
let _ = write_buffer
.write_lp(
NamespaceName::new(db_name).unwrap(),
"temp,warehouse=us-east,room=01a,device=10001 reading=36\n\
temp,warehouse=us-east,room=01b,device=10002 reading=29\n\
temp,warehouse=us-east,room=02a,device=30003 reading=33\n\
",
Time::from_timestamp_nanos(i * 1_000_000_000),
false,
Precision::Nanosecond,
false,
)
.await
.unwrap();
}
// Wait for snapshot to be created, once this is done, then the parquet has been persisted
verify_snapshot_count(1, &write_buffer.persister).await;
// get the path for the created parquet file
let persisted_files = write_buffer
.persisted_files()
.get_files(DbId::from(0), TableId::from(0));
assert_eq!(1, persisted_files.len());
let path = ObjPath::from(persisted_files[0].path.as_str());
let batches = write_buffer
.get_record_batches_unchecked(db_name, "temp", &ctx)
.await;
assert_batches_sorted_eq!(
[
"+--------+---------+------+----------------------+-----------+",
"| device | reading | room | time | warehouse |",
"+--------+---------+------+----------------------+-----------+",
"| 10001 | 36.0 | 01a | 1970-01-01T00:00:01Z | us-east |",
"| 10001 | 36.0 | 01a | 1970-01-01T00:00:02Z | us-east |",
"| 10001 | 36.0 | 01a | 1970-01-01T00:00:03Z | us-east |",
"| 10002 | 29.0 | 01b | 1970-01-01T00:00:01Z | us-east |",
"| 10002 | 29.0 | 01b | 1970-01-01T00:00:02Z | us-east |",
"| 10002 | 29.0 | 01b | 1970-01-01T00:00:03Z | us-east |",
"| 30003 | 33.0 | 02a | 1970-01-01T00:00:01Z | us-east |",
"| 30003 | 33.0 | 02a | 1970-01-01T00:00:02Z | us-east |",
"| 30003 | 33.0 | 02a | 1970-01-01T00:00:03Z | us-east |",
"+--------+---------+------+----------------------+-----------+",
],
&batches
);
// at this point everything should've been snapshotted
drop(write_buffer);
debug!(">>> test: stopped");
// nothing in the cache at this point and not in buffer
let (write_buffer, ctx, _) = setup_cache_optional(
// move the time
Time::from_timestamp_nanos(2_000_000_000),
Arc::clone(&inner_store) as _,
WalConfig {
gen1_duration: Gen1Duration::new_1m(),
max_write_buffer_size: 100,
flush_interval: Duration::from_millis(10),
snapshot_size: 1,
},
true,
)
.await;
debug!(">>> test: restarted");
// nothing in query buffer
let batches =
get_table_batches_from_query_buffer(&write_buffer, db_name, "temp", &ctx).await;
assert_batches_sorted_eq!(["++", "++",], &batches);
// we need to get everything from OS and cache them
let batches = write_buffer
.get_record_batches_unchecked(db_name, "temp", &ctx)
.await;
assert_batches_sorted_eq!(
[
"+--------+---------+------+----------------------+-----------+",
"| device | reading | room | time | warehouse |",
"+--------+---------+------+----------------------+-----------+",
"| 10001 | 36.0 | 01a | 1970-01-01T00:00:01Z | us-east |",
"| 10001 | 36.0 | 01a | 1970-01-01T00:00:02Z | us-east |",
"| 10001 | 36.0 | 01a | 1970-01-01T00:00:03Z | us-east |",
"| 10002 | 29.0 | 01b | 1970-01-01T00:00:01Z | us-east |",
"| 10002 | 29.0 | 01b | 1970-01-01T00:00:02Z | us-east |",
"| 10002 | 29.0 | 01b | 1970-01-01T00:00:03Z | us-east |",
"| 30003 | 33.0 | 02a | 1970-01-01T00:00:01Z | us-east |",
"| 30003 | 33.0 | 02a | 1970-01-01T00:00:02Z | us-east |",
"| 30003 | 33.0 | 02a | 1970-01-01T00:00:03Z | us-east |",
"+--------+---------+------+----------------------+-----------+",
],
&batches
);
assert!(inner_store.total_read_request_count(&path) > 0);
let expected_req_counts = inner_store.total_read_request_count(&path);
let batches = write_buffer
.get_record_batches_unchecked(db_name, "temp", &ctx)
.await;
assert_batches_sorted_eq!(
[
"+--------+---------+------+----------------------+-----------+",
"| device | reading | room | time | warehouse |",
"+--------+---------+------+----------------------+-----------+",
"| 10001 | 36.0 | 01a | 1970-01-01T00:00:01Z | us-east |",
"| 10001 | 36.0 | 01a | 1970-01-01T00:00:02Z | us-east |",
"| 10001 | 36.0 | 01a | 1970-01-01T00:00:03Z | us-east |",
"| 10002 | 29.0 | 01b | 1970-01-01T00:00:01Z | us-east |",
"| 10002 | 29.0 | 01b | 1970-01-01T00:00:02Z | us-east |",
"| 10002 | 29.0 | 01b | 1970-01-01T00:00:03Z | us-east |",
"| 30003 | 33.0 | 02a | 1970-01-01T00:00:01Z | us-east |",
"| 30003 | 33.0 | 02a | 1970-01-01T00:00:02Z | us-east |",
"| 30003 | 33.0 | 02a | 1970-01-01T00:00:03Z | us-east |",
"+--------+---------+------+----------------------+-----------+",
],
&batches
);
// everything came from cache, all counts should stay the same as
// above
assert_eq!(
expected_req_counts,
inner_store.total_read_request_count(&path)
);
}
struct TestWrite<LP> {
lp: LP,
time_seconds: i64,