feat: wire up RB metrics for querier chunks

pull/24376/head
Marco Neumann 2022-06-07 15:31:49 +02:00
parent 5d98988c9f
commit 4509e3db57
2 changed files with 49 additions and 10 deletions


@@ -115,7 +115,7 @@ impl CatalogCache {
         let read_buffer_cache = ReadBufferCache::new(
             backoff_config,
             Arc::clone(&time_provider),
-            &metric_registry,
+            metric_registry,
             Arc::clone(&ram_pool),
         );
 
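The call-site change above follows from the new constructor signature in the second file: `ReadBufferCache::new` now takes an owned `Arc<metric::Registry>` instead of `&metric::Registry`, because the loader closure it builds must keep the registry alive past the caller's borrow. A minimal std-only sketch of that ownership pattern (`Cache` and the `String` payload are hypothetical stand-ins for the real types):

use std::sync::Arc;

// `String` stands in for `metric::Registry`; `Cache` for `ReadBufferCache`.
struct Cache {
    registry: Arc<String>,
}

impl Cache {
    // Taking `Arc<T>` by value lets the constructor stash a handle in
    // long-lived state; a `&T` borrow would be tied to the caller's lifetime.
    fn new(registry: Arc<String>) -> Self {
        Self { registry }
    }
}

fn main() {
    let registry = Arc::new(String::from("registry"));
    let cache = Cache::new(Arc::clone(&registry)); // caller keeps its own handle
    assert_eq!(*cache.registry, *registry);
}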

@@ -16,7 +16,7 @@ use datafusion::physical_plan::SendableRecordBatchStream;
 use futures::StreamExt;
 use iox_time::TimeProvider;
 use parquet_file::{chunk::DecodedParquetFile, storage::ParquetStorage};
-use read_buffer::RBChunk;
+use read_buffer::{ChunkMetrics, RBChunk};
 use snafu::{ResultExt, Snafu};
 use std::{collections::HashMap, mem, sync::Arc};
@@ -43,12 +43,14 @@ impl ReadBufferCache {
     pub fn new(
         backoff_config: BackoffConfig,
         time_provider: Arc<dyn TimeProvider>,
-        metric_registry: &metric::Registry,
+        metric_registry: Arc<metric::Registry>,
         ram_pool: Arc<ResourcePool<RamSize>>,
     ) -> Self {
+        let metric_registry_captured = Arc::clone(&metric_registry);
         let loader = Box::new(FunctionLoader::new(
             move |_parquet_file_id, extra_fetch_info: ExtraFetchInfo| {
                 let backoff_config = backoff_config.clone();
+                let metric_registry = Arc::clone(&metric_registry_captured);
 
                 async move {
                     let rb_chunk = Backoff::new(&backoff_config)
@@ -63,6 +65,7 @@ impl ReadBufferCache {
                                 decoded_parquet_file_for_load,
                                 table_name_for_load,
                                 store_for_load,
+                                &metric_registry,
                             )
                             .await
                         }
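
The two `Arc::clone` calls added above form a common Rust capture pattern: clone once so the `move` closure owns its own handle (`metric_registry_captured`), then clone again on every invocation so each `async move` future can take ownership without consuming the closure's copy. A std-only sketch of the same pattern (`make_loader` and the `String` payload are stand-ins):

use std::{future::Future, pin::Pin, sync::Arc};

// Returns a loader that can be called many times; each call produces a
// future that owns its own Arc handle.
fn make_loader(registry: Arc<String>) -> impl Fn() -> Pin<Box<dyn Future<Output = usize> + Send>> {
    let registry_captured = Arc::clone(&registry); // owned by the closure
    move || {
        let registry = Arc::clone(&registry_captured); // one handle per call
        let fut: Pin<Box<dyn Future<Output = usize> + Send>> =
            Box::pin(async move { registry.len() });
        fut
    }
}

fn main() {
    let loader = make_loader(Arc::new(String::from("registry")));
    let _first = loader();
    let _second = loader(); // still works: the closure kept its own handle
}
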
@@ -79,7 +82,7 @@ impl ReadBufferCache {
             loader,
             CACHE_ID,
             Arc::clone(&time_provider),
-            metric_registry,
+            &metric_registry,
         ));
 
         // add to memory pool
@@ -137,10 +140,11 @@ async fn load_from_parquet_file(
     decoded_parquet_file: Arc<DecodedParquetFile>,
     table_name: Arc<str>,
     store: ParquetStorage,
+    metric_registry: &metric::Registry,
 ) -> Result<RBChunk, LoadError> {
     let record_batch_stream =
         record_batches_stream(decoded_parquet_file, store).context(ReadingFromStorageSnafu)?;
 
-    read_buffer_chunk_from_stream(table_name, record_batch_stream)
+    read_buffer_chunk_from_stream(table_name, record_batch_stream, metric_registry)
         .await
         .context(BuildingChunkSnafu)
 }
@@ -170,10 +174,15 @@ enum RBChunkError {
 async fn read_buffer_chunk_from_stream(
     table_name: Arc<str>,
     mut stream: SendableRecordBatchStream,
+    metric_registry: &metric::Registry,
 ) -> Result<RBChunk, RBChunkError> {
     let schema = stream.schema();
-    let mut builder = read_buffer::RBChunkBuilder::new(table_name.as_ref(), schema);
+
+    // create "global" metric object, so that we don't blow up prometheus w/ too many metrics
+    let metrics = ChunkMetrics::new(metric_registry, "iox_shared");
+    let mut builder =
+        read_buffer::RBChunkBuilder::new(table_name.as_ref(), schema).with_metrics(metrics);
 
     while let Some(record_batch) = stream.next().await {
         builder
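
The "global" comment above concerns metric cardinality: Prometheus materializes one time series per distinct label set, so labeling each chunk's `ChunkMetrics` with its own table or chunk identity would mint a new series per chunk. Fixing the `db_name` label to the constant `"iox_shared"` makes every chunk report into the same series. A std-only illustration, using a `HashMap` as a hypothetical stand-in for the registry's series table:

use std::collections::HashMap;

fn main() {
    // One entry per distinct label value, like Prometheus time series.
    let mut per_table: HashMap<String, u64> = HashMap::new();
    let mut shared: HashMap<String, u64> = HashMap::new();

    for i in 0..1_000 {
        // Per-table labels: cardinality grows with the number of tables.
        *per_table.entry(format!("table_{i}")).or_insert(0) += 1;
        // Constant label, as in `ChunkMetrics::new(registry, "iox_shared")`:
        // all chunks share a single series.
        *shared.entry("iox_shared".to_string()).or_insert(0) += 1;
    }

    assert_eq!(per_table.len(), 1_000);
    assert_eq!(shared.len(), 1);
}
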
@@ -192,7 +201,7 @@ mod tests {
     use arrow_util::assert_batches_eq;
     use datafusion_util::stream_from_batches;
     use iox_tests::util::{TestCatalog, TestPartition};
-    use metric::{Attributes, Metric, U64Counter};
+    use metric::{Attributes, CumulativeGauge, Metric, U64Counter};
     use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
     use read_buffer::Predicate;
     use schema::selection::Selection;
@@ -202,7 +211,7 @@ mod tests {
         ReadBufferCache::new(
             BackoffConfig::default(),
             catalog.time_provider(),
-            &catalog.metric_registry(),
+            catalog.metric_registry(),
             test_ram_pool(),
         )
     }
@@ -304,7 +313,7 @@ mod tests {
         let cache = ReadBufferCache::new(
             BackoffConfig::default(),
             catalog.time_provider(),
-            &catalog.metric_registry(),
+            catalog.metric_registry(),
             ram_pool,
         );
 
@@ -407,7 +416,9 @@ mod tests {
         let stream = stream_from_batches(batches);
 
-        let rb = read_buffer_chunk_from_stream("cpu".into(), stream)
+        let metric_registry = metric::Registry::new();
+
+        let rb = read_buffer_chunk_from_stream("cpu".into(), stream, &metric_registry)
             .await
             .unwrap();
@@ -427,4 +438,32 @@ mod tests {
         assert_batches_eq!(expected, &rb_batches);
     }
 
+    #[tokio::test]
+    async fn test_rb_metrics() {
+        let (catalog, partition) = make_catalog().await;
+
+        let test_parquet_file = partition.create_parquet_file("table1 foo=1 11").await;
+        let parquet_file = test_parquet_file.parquet_file;
+        let decoded = Arc::new(DecodedParquetFile::new(parquet_file));
+
+        let storage = ParquetStorage::new(Arc::clone(&catalog.object_store));
+        let cache = make_cache(&catalog);
+
+        let _rb = cache
+            .get(Arc::clone(&decoded), "table1".into(), storage.clone())
+            .await;
+
+        let g: Metric<CumulativeGauge> = catalog
+            .metric_registry
+            .get_instrument("read_buffer_row_group_total")
+            .unwrap();
+        let v = g
+            .get_observer(&Attributes::from(&[("db_name", "iox_shared")]))
+            .unwrap()
+            .fetch();
+
+        // Load is only called once
+        assert_eq!(v, 1);
+    }
 }