From 9328ba8c4508b8a25529b24b438f7d76a78288df Mon Sep 17 00:00:00 2001
From: "Carol (Nichols || Goulding)"
Date: Fri, 27 May 2022 16:25:35 -0400
Subject: [PATCH] feat: Use new extra loading info to load read buffer chunks
 into cache

---
 parquet_file/src/storage.rs      |   9 ++
 querier/src/cache/read_buffer.rs | 145 +++++++++++++++++++++++++------
 querier/src/chunk/mod.rs         |  10 ++-
 3 files changed, 134 insertions(+), 30 deletions(-)

diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs
index 35c2f29e43..17e0f00950 100644
--- a/parquet_file/src/storage.rs
+++ b/parquet_file/src/storage.rs
@@ -207,6 +207,15 @@ impl ParquetStorage {
             Some(Arc::new(AutoAbortJoinHandle::new(handle))),
         ))
     }
+
+    /// Read all data from the parquet file.
+    pub fn read_all(
+        &self,
+        schema: SchemaRef,
+        meta: &IoxMetadata,
+    ) -> Result<SendableRecordBatchStream, ReadError> {
+        self.read_filter(&Predicate::default(), Selection::All, schema, meta)
+    }
 }
 
 /// Return indices of the schema's fields of the selection columns
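
The new `read_all` is a thin convenience wrapper over the existing `read_filter`: an empty predicate plus `Selection::All` yields every row and column of the file as a `SendableRecordBatchStream`. A caller drains that stream like any other record-batch stream; here is a minimal sketch, assuming the types already in scope in `storage.rs` (the `collect_all` helper itself is hypothetical, not part of this patch):

use futures::TryStreamExt;

// Hypothetical helper: read the whole file and buffer it in memory.
async fn collect_all(
    storage: &ParquetStorage,
    schema: SchemaRef,
    meta: &IoxMetadata,
) -> Result<Vec<RecordBatch>, Box<dyn std::error::Error>> {
    // `read_all` yields a stream of `Result<RecordBatch, _>` items,
    // so `try_collect` stops at the first error.
    let stream = storage.read_all(schema, meta)?;
    Ok(stream.try_collect().await?)
}
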
diff --git a/querier/src/cache/read_buffer.rs b/querier/src/cache/read_buffer.rs
index 59918fa4e9..cc72663764 100644
--- a/querier/src/cache/read_buffer.rs
+++ b/querier/src/cache/read_buffer.rs
@@ -11,21 +11,23 @@ use cache_system::{
     driver::Cache,
     loader::{metrics::MetricsLoader, FunctionLoader},
 };
-use data_types::{ParquetFile, ParquetFileId};
+use data_types::ParquetFileId;
 use datafusion::physical_plan::SendableRecordBatchStream;
 use futures::StreamExt;
 use iox_time::TimeProvider;
-use parquet_file::chunk::DecodedParquetFile;
+use parquet_file::{chunk::DecodedParquetFile, storage::ParquetStorage};
 use read_buffer::RBChunk;
 use snafu::{ResultExt, Snafu};
 use std::{collections::HashMap, mem, sync::Arc};
 
 const CACHE_ID: &str = "read_buffer";
 
+type ExtraFetchInfo = (Arc<DecodedParquetFile>, Arc<str>, ParquetStorage);
+
 /// Cache for parquet file data decoded into read buffer chunks
 #[derive(Debug)]
 pub struct ReadBufferCache {
-    cache: Cache<ParquetFileId, Arc<RBChunk>, ()>,
+    cache: Cache<ParquetFileId, Arc<RBChunk>, ExtraFetchInfo>,
 
     /// Handle that allows clearing entries for existing cache entries
    _backend: SharedBackend<ParquetFileId, Arc<RBChunk>>,
@@ -39,16 +41,24 @@ impl ReadBufferCache {
         ram_pool: Arc<ResourcePool<RamSize>>,
     ) -> Self {
         let loader = Box::new(FunctionLoader::new(
-            move |parquet_file_id: ParquetFileId, _extra| {
+            move |_parquet_file_id, (decoded_parquet_file, table_name, store): ExtraFetchInfo| {
                 let backoff_config = BackoffConfig::default();
 
                 async move {
                     let rb_chunk = Backoff::new(&backoff_config)
-                        .retry_all_errors("get read buffer chunk by parquet file ID", || async {
-                            let parquet_file = parquet_file_by_id(parquet_file_id);
-                            let table_name = parquet_file_table_name(&parquet_file).to_string();
-                            let record_batch_stream = record_batches_stream(&parquet_file);
-                            read_buffer_chunk_from_stream(table_name, record_batch_stream).await
+                        .retry_all_errors("get read buffer chunk from parquet file", || {
+                            let decoded_parquet_file_for_load = Arc::clone(&decoded_parquet_file);
+                            let table_name_for_load = Arc::clone(&table_name);
+                            let store_for_load = store.clone();
+
+                            async {
+                                load_from_parquet_file(
+                                    decoded_parquet_file_for_load,
+                                    table_name_for_load,
+                                    store_for_load,
+                                )
+                                .await
+                            }
                         })
                         .await
                         .expect("retry forever");
@@ -86,23 +96,52 @@ impl ReadBufferCache {
     }
 
     /// Get read buffer chunks from the cache or the Parquet file
-    pub async fn get(&self, decoded_parquet_file: &DecodedParquetFile) -> Arc<RBChunk> {
-        let parquet_file = &decoded_parquet_file.parquet_file;
-
-        self.cache.get(parquet_file.id, ()).await
+    pub async fn get(
+        &self,
+        decoded_parquet_file: Arc<DecodedParquetFile>,
+        table_name: Arc<str>,
+        store: ParquetStorage,
+    ) -> Arc<RBChunk> {
+        self.cache
+            .get(
+                decoded_parquet_file.parquet_file_id(),
+                (decoded_parquet_file, table_name, store),
+            )
+            .await
     }
 }
 
-fn parquet_file_by_id(_parquet_file_id: ParquetFileId) -> ParquetFile {
-    unimplemented!()
+#[derive(Debug, Snafu)]
+enum LoadError {
+    #[snafu(display("Error reading from storage: {}", source))]
+    ReadingFromStorage {
+        source: parquet_file::storage::ReadError,
+    },
+
+    #[snafu(display("Error building read buffer chunk: {}", source))]
+    BuildingChunk { source: RBChunkError },
 }
 
-fn parquet_file_table_name(_parquet_file: &ParquetFile) -> &str {
-    unimplemented!()
+async fn load_from_parquet_file(
+    decoded_parquet_file: Arc<DecodedParquetFile>,
+    table_name: Arc<str>,
+    store: ParquetStorage,
+) -> Result<RBChunk, LoadError> {
+    let record_batch_stream =
+        record_batches_stream(decoded_parquet_file, store).context(ReadingFromStorageSnafu)?;
+    read_buffer_chunk_from_stream(table_name, record_batch_stream)
+        .await
+        .context(BuildingChunkSnafu)
 }
 
-fn record_batches_stream(_parquet_file: &ParquetFile) -> SendableRecordBatchStream {
-    unimplemented!()
+fn record_batches_stream(
+    decoded_parquet_file: Arc<DecodedParquetFile>,
+    store: ParquetStorage,
+) -> Result<SendableRecordBatchStream, parquet_file::storage::ReadError> {
+    let schema = decoded_parquet_file.schema().as_arrow();
+    let iox_metadata = &decoded_parquet_file.iox_metadata;
+
+    store.read_all(schema, iox_metadata)
 }
 
 #[derive(Debug, Snafu)]
@@ -118,12 +157,12 @@ enum RBChunkError {
 }
 
 async fn read_buffer_chunk_from_stream(
-    table_name: String,
+    table_name: Arc<str>,
     mut stream: SendableRecordBatchStream,
 ) -> Result<RBChunk, RBChunkError> {
     let schema = stream.schema();
 
-    let mut builder = read_buffer::RBChunkBuilder::new(table_name, schema);
+    let mut builder = read_buffer::RBChunkBuilder::new(table_name.as_ref(), schema);
 
     while let Some(record_batch) = stream.next().await {
         builder
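
The key move in this file is that `Cache::get` now carries the loading ingredients along with the key: on a hit the `ExtraFetchInfo` tuple is dropped unused, and on a miss it is handed straight to the loader closure, which is what allows the `unimplemented!()` stubs (`parquet_file_by_id`, `parquet_file_table_name`) to be deleted. A toy, synchronous model of that pattern, with illustrative names only (`SketchCache` is not the real `cache_system` API):

use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Arc;

struct SketchCache<K, V, E> {
    entries: HashMap<K, Arc<V>>,
    loader: Box<dyn Fn(K, E) -> V>,
}

impl<K: Hash + Eq + Copy, V, E> SketchCache<K, V, E> {
    fn new(loader: Box<dyn Fn(K, E) -> V>) -> Self {
        Self {
            entries: HashMap::new(),
            loader,
        }
    }

    // On a hit, `extra` is dropped unused; on a miss, it is passed to the
    // loader, so the loader needs no side-channel lookup by key.
    fn get(&mut self, key: K, extra: E) -> Arc<V> {
        if let Some(v) = self.entries.get(&key) {
            return Arc::clone(v);
        }
        let v = Arc::new((self.loader)(key, extra));
        self.entries.insert(key, Arc::clone(&v));
        v
    }
}

fn main() {
    let mut cache = SketchCache::new(Box::new(|id: u64, name: String| format!("{name}:{id}")));
    let first = cache.get(1, "table1".to_string());
    let again = cache.get(1, String::new()); // hit: loader not called, extra dropped
    assert_eq!(first, again);
    assert_eq!(*first, "table1:1");
}
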
@@ -141,7 +180,8 @@ mod tests {
     use arrow::record_batch::RecordBatch;
     use arrow_util::assert_batches_eq;
     use datafusion_util::stream_from_batches;
-    use iox_tests::util::TestCatalog;
+    use iox_tests::util::{TestCatalog, TestPartition};
+    use metric::{Attributes, Metric, U64Counter};
     use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
     use read_buffer::Predicate;
     use schema::selection::Selection;
@@ -154,14 +194,65 @@
         )
     }
 
-    async fn make_catalog() -> Arc<TestCatalog> {
-        TestCatalog::new()
+    async fn make_catalog() -> (Arc<TestCatalog>, Arc<TestPartition>) {
+        let catalog = TestCatalog::new();
+        let ns = catalog.create_namespace("ns").await;
+
+        let table = ns.create_table("table1").await;
+        let sequencer1 = ns.create_sequencer(1).await;
+
+        let partition = table
+            .with_sequencer(&sequencer1)
+            .create_partition("k")
+            .await;
+
+        (catalog, partition)
     }
 
     #[tokio::test]
     async fn test_rb_chunks() {
-        let catalog = make_catalog().await;
-        let _cache = make_cache(&catalog);
+        let (catalog, partition) = make_catalog().await;
+
+        let test_parquet_file = partition.create_parquet_file("table1 foo=1 11").await;
+        let parquet_file = test_parquet_file.parquet_file;
+        let decoded = Arc::new(DecodedParquetFile::new(parquet_file));
+        let storage = ParquetStorage::new(Arc::clone(&catalog.object_store));
+
+        let cache = make_cache(&catalog);
+
+        let rb = cache
+            .get(Arc::clone(&decoded), "table1".into(), storage.clone())
+            .await;
+
+        let rb_batches: Vec<RecordBatch> = rb
+            .read_filter(Predicate::default(), Selection::All, vec![])
+            .unwrap()
+            .collect();
+
+        let expected = [
+            "+-----+--------------------------------+",
+            "| foo | time                           |",
+            "+-----+--------------------------------+",
+            "| 1   | 1970-01-01T00:00:00.000000011Z |",
+            "+-----+--------------------------------+",
+        ];
+
+        assert_batches_eq!(expected, &rb_batches);
+
+        // This should fetch from the cache
+        let _rb_again = cache.get(decoded, "table1".into(), storage).await;
+
+        let m: Metric<U64Counter> = catalog
+            .metric_registry
+            .get_instrument("cache_load_function_calls")
+            .unwrap();
+        let v = m
+            .get_observer(&Attributes::from(&[("name", "read_buffer")]))
+            .unwrap()
+            .fetch();
+
+        // Load is only called once
+        assert_eq!(v, 1);
     }
 
     fn lp_to_record_batch(lp: &str) -> RecordBatch {
@@ -181,7 +272,7 @@ mod tests {
 
         let stream = stream_from_batches(batches);
 
-        let rb = read_buffer_chunk_from_stream("cpu".to_string(), stream)
+        let rb = read_buffer_chunk_from_stream("cpu".into(), stream)
             .await
             .unwrap();
 
diff --git a/querier/src/chunk/mod.rs b/querier/src/chunk/mod.rs
index cb7d4006ce..f593584c1c 100644
--- a/querier/src/chunk/mod.rs
+++ b/querier/src/chunk/mod.rs
@@ -257,7 +257,7 @@ pub struct ChunkAdapter {
     /// Cache
     catalog_cache: Arc<CatalogCache>,
 
-    /// Object store.
+    /// Object store. Wrapper around an Arc; cheap to clone.
     store: ParquetStorage,
 
     /// Metric registry.
@@ -369,7 +369,7 @@ impl ChunkAdapter {
     /// Create read buffer chunk. May be from the cache, may be from the parquet file.
     pub async fn new_rb_chunk(
         &self,
-        decoded_parquet_file: &DecodedParquetFile,
+        decoded_parquet_file: Arc<DecodedParquetFile>,
     ) -> Option<QuerierRBChunk> {
         let parquet_file_id = decoded_parquet_file.parquet_file_id();
         let schema = decoded_parquet_file.schema();
@@ -383,7 +383,11 @@ impl ChunkAdapter {
         let rb_chunk = self
             .catalog_cache()
             .read_buffer()
-            .get(decoded_parquet_file)
+            .get(
+                Arc::clone(&decoded_parquet_file),
+                Arc::clone(&table_name),
+                self.store.clone(),
+            )
             .await;
 
         // Somewhat hacky workaround because of implicit chunk orders, use min sequence number and
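
A detail of the loader worth calling out: `retry_all_errors` invokes its closure once per attempt, and the `async` block it returns takes ownership of its captures, which is why the loader clones the two `Arc`s and the `ParquetStorage` handle before entering the block. A stripped-down model of that pattern follows; `retry_twice` is a stand-in for the real backoff helper, and the example assumes a tokio runtime:

use std::future::Future;
use std::sync::Arc;

// Stand-in for `Backoff::retry_all_errors`: call the closure once per
// attempt; each call must yield a brand-new future.
async fn retry_twice<F, Fut, T, E>(mut attempt: F) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    match attempt().await {
        Ok(v) => Ok(v),
        Err(_) => attempt().await,
    }
}

#[tokio::main]
async fn main() {
    let table_name: Arc<str> = "table1".into();

    let result: Result<usize, ()> = retry_twice(|| {
        // Clone *before* the async block, as the loader above does: the
        // block takes ownership of its captures, and a retry needs fresh
        // copies rather than values consumed by the previous attempt.
        let table_name = Arc::clone(&table_name);
        async move { Ok(table_name.len()) }
    })
    .await;

    assert_eq!(result, Ok(6));
}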