feat: Implement ReadBufferCache::new

pull/24376/head
Carol (Nichols || Goulding) 2022-05-25 11:46:30 -04:00
parent ab9010d9a6
commit 25b8260b72
4 changed files with 96 additions and 0 deletions

Cargo.lock (generated)

@@ -3948,6 +3948,7 @@ dependencies = [
 "pin-project",
 "predicate",
 "rand",
+"read_buffer",
 "schema",
 "service_common",
 "service_grpc_schema",

querier/Cargo.toml

@@ -25,6 +25,7 @@ pin-project = "1.0"
 predicate = { path = "../predicate" }
 iox_query = { path = "../iox_query" }
 rand = "0.8.3"
+read_buffer = { path = "../read_buffer" }
 service_common = { path = "../service_common" }
 service_grpc_schema = { path = "../service_grpc_schema" }
 schema = { path = "../schema" }

querier/src/cache/mod.rs

@@ -15,6 +15,7 @@ pub mod parquet_file;
 pub mod partition;
 pub mod processed_tombstones;
 mod ram;
+pub mod read_buffer;
 pub mod table;
 pub mod tombstones;

querier/src/cache/read_buffer.rs (new file)

@@ -0,0 +1,93 @@
//! Cache Parquet file data in Read Buffer chunks.

use super::ram::RamSize;
use cache_system::{
    backend::{
        lru::{LruBackend, ResourcePool},
        resource_consumption::FunctionEstimator,
        shared::SharedBackend,
    },
    driver::Cache,
    loader::{metrics::MetricsLoader, FunctionLoader},
};
use data_types::ParquetFileId;
use iox_time::TimeProvider;
use read_buffer::RBChunk;
use std::{collections::HashMap, mem, sync::Arc};

const CACHE_ID: &str = "read_buffer";

/// Cache for parquet file data decoded into read buffer chunks
#[derive(Debug)]
pub struct ReadBufferCache {
    _cache: Cache<ParquetFileId, Arc<RBChunk>>,

    /// Handle that allows clearing existing cache entries
    _backend: SharedBackend<ParquetFileId, Arc<RBChunk>>,
}

impl ReadBufferCache {
    /// Create a new empty cache.
    pub fn new(
        time_provider: Arc<dyn TimeProvider>,
        metric_registry: &metric::Registry,
        ram_pool: Arc<ResourcePool<RamSize>>,
    ) -> Self {
        // Decoding a parquet file into an `RBChunk` is left for a
        // follow-on change; for now the loader panics on a cache miss.
        let loader = Box::new(FunctionLoader::new(
            move |_parquet_file_id: ParquetFileId| async move {
                unimplemented!();
            },
        ));

        // Wrap the loader so cache hits and misses are recorded as metrics.
        let loader = Arc::new(MetricsLoader::new(
            loader,
            CACHE_ID,
            Arc::clone(&time_provider),
            metric_registry,
        ));

        // add to memory pool
        let backend = Box::new(LruBackend::new(
            Box::new(HashMap::new()),
            Arc::clone(&ram_pool),
            CACHE_ID,
            Arc::new(FunctionEstimator::new(
                // Estimate RAM usage as the size of the key, the `Arc`
                // itself, and the chunk's own memory use.
                |k: &ParquetFileId, v: &Arc<RBChunk>| {
                    RamSize(mem::size_of_val(k) + mem::size_of_val(v) + v.size())
                },
            )),
        ));

        // get a direct handle so we can clear out entries as needed
        let _backend = SharedBackend::new(backend);

        let _cache = Cache::new(loader, Box::new(_backend.clone()));

        Self { _cache, _backend }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::ram::test_util::test_ram_pool;
    use iox_tests::util::TestCatalog;

    fn make_cache(catalog: &TestCatalog) -> ReadBufferCache {
        ReadBufferCache::new(
            catalog.time_provider(),
            &catalog.metric_registry(),
            test_ram_pool(),
        )
    }

    async fn make_catalog() -> Arc<TestCatalog> {
        TestCatalog::new()
    }

    #[tokio::test]
    async fn test_rb_chunks() {
        let catalog = make_catalog().await;
        let _cache = make_cache(&catalog);
    }
}
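
For orientation, here is a hedged sketch of how a follow-on change might expose this cache once the loader actually decodes parquet data instead of panicking. The `get` method below is hypothetical and not part of this commit (both fields are underscore-prefixed precisely because nothing reads from the cache yet), and it assumes `cache_system::driver::Cache` exposes an async `get` that returns the loaded value, as its `Cache<ParquetFileId, Arc<RBChunk>>` type suggests:

impl ReadBufferCache {
    /// Hypothetical accessor, not part of this commit: fetch the read
    /// buffer chunk for the given parquet file, decoding and caching it
    /// on a miss.
    pub async fn get(&self, parquet_file_id: ParquetFileId) -> Arc<RBChunk> {
        // A miss runs the `FunctionLoader` (wrapped in `MetricsLoader`,
        // so hits and misses are counted), and the LRU backend charges
        // the new entry against the shared RAM pool, evicting older
        // entries if the pool goes over its limit.
        self._cache.get(parquet_file_id).await
    }
}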