diff --git a/querier/src/cache/mod.rs b/querier/src/cache/mod.rs
index fa70536f9f..7d0129cc2e 100644
--- a/querier/src/cache/mod.rs
+++ b/querier/src/cache/mod.rs
@@ -115,7 +115,7 @@ impl CatalogCache {
         let read_buffer_cache = ReadBufferCache::new(
             backoff_config,
             Arc::clone(&time_provider),
-            &metric_registry,
+            metric_registry,
             Arc::clone(&ram_pool),
         );

diff --git a/querier/src/cache/read_buffer.rs b/querier/src/cache/read_buffer.rs
index e95d795cd5..72c5203c04 100644
--- a/querier/src/cache/read_buffer.rs
+++ b/querier/src/cache/read_buffer.rs
@@ -16,7 +16,7 @@ use datafusion::physical_plan::SendableRecordBatchStream;
 use futures::StreamExt;
 use iox_time::TimeProvider;
 use parquet_file::{chunk::DecodedParquetFile, storage::ParquetStorage};
-use read_buffer::RBChunk;
+use read_buffer::{ChunkMetrics, RBChunk};
 use snafu::{ResultExt, Snafu};
 use std::{collections::HashMap, mem, sync::Arc};

@@ -43,12 +43,14 @@ impl ReadBufferCache {
     pub fn new(
         backoff_config: BackoffConfig,
         time_provider: Arc<dyn TimeProvider>,
-        metric_registry: &metric::Registry,
+        metric_registry: Arc<metric::Registry>,
         ram_pool: Arc<ResourcePool<RamSize>>,
     ) -> Self {
+        let metric_registry_captured = Arc::clone(&metric_registry);
         let loader = Box::new(FunctionLoader::new(
             move |_parquet_file_id, extra_fetch_info: ExtraFetchInfo| {
                 let backoff_config = backoff_config.clone();
+                let metric_registry = Arc::clone(&metric_registry_captured);

                 async move {
                     let rb_chunk = Backoff::new(&backoff_config)
@@ -63,6 +65,7 @@ impl ReadBufferCache {
                                 decoded_parquet_file_for_load,
                                 table_name_for_load,
                                 store_for_load,
+                                &metric_registry,
                             )
                             .await
                         }
@@ -79,7 +82,7 @@ impl ReadBufferCache {
             loader,
             CACHE_ID,
             Arc::clone(&time_provider),
-            metric_registry,
+            &metric_registry,
         ));

         // add to memory pool
@@ -137,10 +140,11 @@ async fn load_from_parquet_file(
     decoded_parquet_file: Arc<DecodedParquetFile>,
     table_name: Arc<str>,
     store: ParquetStorage,
+    metric_registry: &metric::Registry,
 ) -> Result<RBChunk, LoadError> {
     let record_batch_stream =
         record_batches_stream(decoded_parquet_file, store).context(ReadingFromStorageSnafu)?;
-    read_buffer_chunk_from_stream(table_name, record_batch_stream)
+    read_buffer_chunk_from_stream(table_name, record_batch_stream, metric_registry)
         .await
         .context(BuildingChunkSnafu)
 }
@@ -170,10 +174,15 @@ enum RBChunkError {
 async fn read_buffer_chunk_from_stream(
     table_name: Arc<str>,
     mut stream: SendableRecordBatchStream,
+    metric_registry: &metric::Registry,
 ) -> Result<RBChunk, RBChunkError> {
     let schema = stream.schema();

-    let mut builder = read_buffer::RBChunkBuilder::new(table_name.as_ref(), schema);
+    // create "global" metric object, so that we don't blow up prometheus w/ too many metrics
+    let metrics = ChunkMetrics::new(metric_registry, "iox_shared");
+
+    let mut builder =
+        read_buffer::RBChunkBuilder::new(table_name.as_ref(), schema).with_metrics(metrics);

     while let Some(record_batch) = stream.next().await {
         builder
@@ -192,7 +201,7 @@ mod tests {
     use arrow_util::assert_batches_eq;
     use datafusion_util::stream_from_batches;
     use iox_tests::util::{TestCatalog, TestPartition};
-    use metric::{Attributes, Metric, U64Counter};
+    use metric::{Attributes, CumulativeGauge, Metric, U64Counter};
     use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
     use read_buffer::Predicate;
     use schema::selection::Selection;
@@ -202,7 +211,7 @@ mod tests {
         ReadBufferCache::new(
             BackoffConfig::default(),
             catalog.time_provider(),
-            &catalog.metric_registry(),
+            catalog.metric_registry(),
             test_ram_pool(),
         )
     }
@@ -304,7 +313,7 @@ mod tests {
         let cache = ReadBufferCache::new(
             BackoffConfig::default(),
             catalog.time_provider(),
-            &catalog.metric_registry(),
+            catalog.metric_registry(),
             ram_pool,
         );

@@ -407,7 +416,9 @@ mod tests {

         let stream = stream_from_batches(batches);

-        let rb = read_buffer_chunk_from_stream("cpu".into(), stream)
+        let metric_registry = metric::Registry::new();
+
+        let rb = read_buffer_chunk_from_stream("cpu".into(), stream, &metric_registry)
             .await
             .unwrap();

@@ -427,4 +438,32 @@ mod tests {

         assert_batches_eq!(expected, &rb_batches);
     }
+
+    #[tokio::test]
+    async fn test_rb_metrics() {
+        let (catalog, partition) = make_catalog().await;
+
+        let test_parquet_file = partition.create_parquet_file("table1 foo=1 11").await;
+        let parquet_file = test_parquet_file.parquet_file;
+        let decoded = Arc::new(DecodedParquetFile::new(parquet_file));
+        let storage = ParquetStorage::new(Arc::clone(&catalog.object_store));
+
+        let cache = make_cache(&catalog);
+
+        let _rb = cache
+            .get(Arc::clone(&decoded), "table1".into(), storage.clone())
+            .await;
+
+        let g: Metric<CumulativeGauge> = catalog
+            .metric_registry
+            .get_instrument("read_buffer_row_group_total")
+            .unwrap();
+        let v = g
+            .get_observer(&Attributes::from(&[("db_name", "iox_shared")]))
+            .unwrap()
+            .fetch();
+
+        // Load is only called once
+        assert_eq!(v, 1);
+    }
 }
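
Note on the loader change above: the chunk builder now needs a metric registry inside a `move` closure that the cache invokes repeatedly, which is why the constructor switches from `&metric::Registry` to `Arc<metric::Registry>` and the registry handle is cloned twice (once outside the closure as `metric_registry_captured`, once per invocation). The standalone sketch below illustrates that capture pattern; `Registry`, `make_loader`, and `load` are stand-in names for illustration only, not iox APIs.

use std::sync::Arc;

// Stand-in for `metric::Registry`; the real type lives in the iox `metric` crate.
#[derive(Debug, Default)]
struct Registry;

// Stand-in for the load step; in the diff this is `load_from_parquet_file`,
// which builds a `ChunkMetrics` from the registry it is handed.
fn load(_registry: &Registry) {}

// The cache loader must be a `move` closure that can run many times, so it can
// only capture owned handles, not a `&Registry` borrowed from the caller.
fn make_loader(metric_registry: Arc<Registry>) -> impl Fn(u64) {
    // First clone: the closure gets its own handle while the caller keeps one
    // (the diff names this `metric_registry_captured`).
    let metric_registry_captured = Arc::clone(&metric_registry);

    move |_parquet_file_id| {
        // Second clone: hand an owned handle to the load step for this invocation
        // without giving up the handle captured by the closure.
        let metric_registry = Arc::clone(&metric_registry_captured);
        load(&metric_registry);
    }
}

fn main() {
    let registry = Arc::new(Registry::default());
    let loader = make_loader(Arc::clone(&registry));

    loader(1);
    loader(2);

    // Only the caller's handle and the closure's captured handle remain.
    assert_eq!(Arc::strong_count(&registry), 2);
}

Because every load path passes the same registry into `ChunkMetrics::new(metric_registry, "iox_shared")`, all read-buffer chunks report into one shared set of metrics rather than one series per chunk, which is what the `test_rb_metrics` test at the end of the diff verifies via the `read_buffer_row_group_total` gauge.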