From d3432198b6273aab0465a9ec65ad58ec006c803c Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Thu, 20 Jul 2023 11:42:40 +0200 Subject: [PATCH] revert: batch partition catalog requests in querier (#8269) (#8283) Panics in prod. This reverts commit 0c347e8e640e331d6e677369bf37de8e385a321d. --- querier/src/cache/partition.rs | 380 +++++++-------------------------- querier/src/parquet/mod.rs | 9 +- querier/src/table/mod.rs | 90 +++----- 3 files changed, 109 insertions(+), 370 deletions(-) diff --git a/querier/src/cache/partition.rs b/querier/src/cache/partition.rs index 0e79fd26a3..fd5fa52cd3 100644 --- a/querier/src/cache/partition.rs +++ b/querier/src/cache/partition.rs @@ -8,11 +8,7 @@ use cache_system::{ PolicyBackend, }, cache::{driver::CacheDriver, metrics::CacheWithMetrics, Cache}, - loader::{ - batch::{BatchLoader, BatchLoaderFlusher, BatchLoaderFlusherExt}, - metrics::MetricsLoader, - FunctionLoader, - }, + loader::{metrics::MetricsLoader, FunctionLoader}, resource_consumption::FunctionEstimator, }; use data_types::{ @@ -20,17 +16,17 @@ use data_types::{ ColumnId, Partition, PartitionId, TransitionPartitionId, }; use datafusion::scalar::ScalarValue; -use iox_catalog::{interface::Catalog, partition_lookup_batch}; +use iox_catalog::{interface::Catalog, partition_lookup}; use iox_query::chunk_statistics::{ColumnRange, ColumnRanges}; use iox_time::TimeProvider; use observability_deps::tracing::debug; use schema::sort::SortKey; use std::{ - collections::{hash_map::Entry, HashMap, HashSet}, + collections::{HashMap, HashSet}, mem::{size_of, size_of_val}, sync::Arc, }; -use trace::span::{Span, SpanRecorder}; +use trace::span::Span; use super::{namespace::CachedTable, ram::RamSize}; @@ -50,7 +46,6 @@ type CacheT = Box< pub struct PartitionCache { cache: CacheT, remove_if_handle: RemoveIfHandle>, - flusher: Arc, } impl PartitionCache { @@ -63,61 +58,24 @@ impl PartitionCache { ram_pool: Arc>, testing: bool, ) -> Self { - let loader = FunctionLoader::new( - move |partition_ids: Vec, cached_tables: Vec>| { - // sanity checks - assert_eq!(partition_ids.len(), cached_tables.len()); - assert!(!partition_ids.is_empty()); - let cached_table = Arc::clone(&cached_tables[0]); - assert!(cached_tables.iter().all(|t| Arc::ptr_eq(t, &cached_table))); - + let loader = + FunctionLoader::new(move |partition_id: PartitionId, extra: Arc| { let catalog = Arc::clone(&catalog); let backoff_config = backoff_config.clone(); async move { - // prepare output buffer - let mut out = (0..partition_ids.len()).map(|_| None).collect::>(); - let mut out_map = - HashMap::::with_capacity(partition_ids.len()); - for (idx, id) in partition_ids.iter().enumerate() { - match out_map.entry(*id) { - Entry::Occupied(_) => unreachable!("cache system requested same partition from loader concurrently, this should have been prevented by the CacheDriver"), - Entry::Vacant(v) => { - v.insert(idx); - } - } - } - - // build `&[&TransitionPartitionId]` for batch catalog request - let ids = partition_ids - .iter() - .copied() - .map(TransitionPartitionId::Deprecated) - .collect::>(); - let ids = ids.iter().collect::>(); - - // fetch catalog data - let partitions = Backoff::new(&backoff_config) + let partition = Backoff::new(&backoff_config) .retry_all_errors("get partition_key", || async { let mut repos = catalog.repositories().await; - partition_lookup_batch(repos.as_mut(), &ids).await + let id = TransitionPartitionId::Deprecated(partition_id); + partition_lookup(repos.as_mut(), &id).await }) .await - .expect("retry forever"); + 
.expect("retry forever")?; - // build output - for p in partitions { - let p = CachedPartition::new(p, &cached_table); - let idx = out_map[&p.id]; - out[idx] = Some(p); - } - - out + Some(CachedPartition::new(partition, &extra)) } - }, - ); - let loader = Arc::new(BatchLoader::new(loader)); - let flusher = Arc::clone(&loader); + }); let loader = Arc::new(MetricsLoader::new( loader, CACHE_ID, @@ -153,79 +111,51 @@ impl PartitionCache { Self { cache, remove_if_handle, - flusher, } } /// Get cached partition. /// - /// The result only contains existing partitions. The order is undefined. - /// /// Expire partition if the cached sort key does NOT cover the given set of columns. pub async fn get( &self, cached_table: Arc, - partitions: Vec, + partition_id: PartitionId, + sort_key_should_cover: &[ColumnId], span: Option, - ) -> Vec { - let span_recorder = SpanRecorder::new(span); + ) -> Option { + self.remove_if_handle + .remove_if_and_get( + &self.cache, + partition_id, + |cached_partition| { + let invalidates = + if let Some(sort_key) = &cached_partition.and_then(|p| p.sort_key) { + sort_key_should_cover + .iter() + .any(|col| !sort_key.column_set.contains(col)) + } else { + // no sort key at all => need to update if there is anything to cover + !sort_key_should_cover.is_empty() + }; - let futures = partitions - .into_iter() - .map( - |PartitionRequest { - partition_id, - sort_key_should_cover, - }| { - let cached_table = Arc::clone(&cached_table); - let span = span_recorder.child_span("single partition cache lookup"); + if invalidates { + debug!( + partition_id = partition_id.get(), + "invalidate partition cache", + ); + } - self.remove_if_handle.remove_if_and_get( - &self.cache, - partition_id, - move |cached_partition| { - let invalidates = if let Some(sort_key) = - &cached_partition.and_then(|p| p.sort_key) - { - sort_key_should_cover - .iter() - .any(|col| !sort_key.column_set.contains(col)) - } else { - // no sort key at all => need to update if there is anything to cover - !sort_key_should_cover.is_empty() - }; - - if invalidates { - debug!( - partition_id = partition_id.get(), - "invalidate partition cache", - ); - } - - invalidates - }, - (cached_table, span), - ) + invalidates }, + (cached_table, span), ) - .collect(); - - let res = self.flusher.auto_flush(futures).await; - - res.into_iter().flatten().collect() + .await } } -/// Request for [`PartitionCache::get`]. 
-#[derive(Debug)] -pub struct PartitionRequest { - pub partition_id: PartitionId, - pub sort_key_should_cover: Vec, -} - #[derive(Debug, Clone, PartialEq, Eq)] pub struct CachedPartition { - pub id: PartitionId, pub sort_key: Option>, pub column_ranges: ColumnRanges, } @@ -301,7 +231,6 @@ impl CachedPartition { column_ranges.shrink_to_fit(); Self { - id: partition.id, sort_key, column_ranges: Arc::new(column_ranges), } @@ -369,7 +298,6 @@ mod tests { use crate::cache::{ ram::test_util::test_ram_pool, test_util::assert_catalog_access_metric_count, }; - use async_trait::async_trait; use data_types::{partition_template::TablePartitionTemplateOverride, ColumnType}; use generated_types::influxdata::iox::partition_template::v1::{ template_part::Part, PartitionTemplate, TemplatePart, @@ -420,7 +348,7 @@ mod tests { ); let sort_key1a = cache - .get_one(Arc::clone(&cached_table), p1.id, &Vec::new(), None) + .get(Arc::clone(&cached_table), p1.id, &Vec::new(), None) .await .unwrap() .sort_key; @@ -432,26 +360,18 @@ mod tests { column_order: [c1.column.id, c2.column.id].into(), } ); - assert_catalog_access_metric_count( - &catalog.metric_registry, - "partition_get_by_id_batch", - 1, - ); + assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 1); let sort_key2 = cache - .get_one(Arc::clone(&cached_table), p2.id, &Vec::new(), None) + .get(Arc::clone(&cached_table), p2.id, &Vec::new(), None) .await .unwrap() .sort_key; assert_eq!(sort_key2, None); - assert_catalog_access_metric_count( - &catalog.metric_registry, - "partition_get_by_id_batch", - 2, - ); + assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 2); let sort_key1b = cache - .get_one(Arc::clone(&cached_table), p1.id, &Vec::new(), None) + .get(Arc::clone(&cached_table), p1.id, &Vec::new(), None) .await .unwrap() .sort_key; @@ -459,16 +379,12 @@ mod tests { sort_key1a.as_ref().unwrap(), sort_key1b.as_ref().unwrap() )); - assert_catalog_access_metric_count( - &catalog.metric_registry, - "partition_get_by_id_batch", - 2, - ); + assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 2); // non-existing partition for _ in 0..2 { let res = cache - .get_one( + .get( Arc::clone(&cached_table), PartitionId::new(i64::MAX), &Vec::new(), @@ -476,11 +392,7 @@ mod tests { ) .await; assert_eq!(res, None); - assert_catalog_access_metric_count( - &catalog.metric_registry, - "partition_get_by_id_batch", - 3, - ); + assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 3); } } @@ -549,7 +461,7 @@ mod tests { ); let ranges1a = cache - .get_one(Arc::clone(&cached_table), p1.id, &[], None) + .get(Arc::clone(&cached_table), p1.id, &[], None) .await .unwrap() .column_ranges; @@ -576,14 +488,10 @@ mod tests { &ranges1a.get("tag1").unwrap().min_value, &ranges1a.get("tag1").unwrap().max_value, )); - assert_catalog_access_metric_count( - &catalog.metric_registry, - "partition_get_by_id_batch", - 1, - ); + assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 1); let ranges2 = cache - .get_one(Arc::clone(&cached_table), p2.id, &[], None) + .get(Arc::clone(&cached_table), p2.id, &[], None) .await .unwrap() .column_ranges; @@ -597,14 +505,10 @@ mod tests { } ),]), ); - assert_catalog_access_metric_count( - &catalog.metric_registry, - "partition_get_by_id_batch", - 2, - ); + assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 2); let ranges3 = cache - .get_one(Arc::clone(&cached_table), p3.id, 
&[], None) + .get(Arc::clone(&cached_table), p3.id, &[], None) .await .unwrap() .column_ranges; @@ -627,14 +531,10 @@ mod tests { ), ]), ); - assert_catalog_access_metric_count( - &catalog.metric_registry, - "partition_get_by_id_batch", - 3, - ); + assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 3); let ranges4 = cache - .get_one(Arc::clone(&cached_table), p4.id, &[], None) + .get(Arc::clone(&cached_table), p4.id, &[], None) .await .unwrap() .column_ranges; @@ -657,14 +557,10 @@ mod tests { ), ]), ); - assert_catalog_access_metric_count( - &catalog.metric_registry, - "partition_get_by_id_batch", - 4, - ); + assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 4); let ranges5 = cache - .get_one(Arc::clone(&cached_table), p5.id, &[], None) + .get(Arc::clone(&cached_table), p5.id, &[], None) .await .unwrap() .column_ranges; @@ -678,28 +574,20 @@ mod tests { } ),]), ); - assert_catalog_access_metric_count( - &catalog.metric_registry, - "partition_get_by_id_batch", - 5, - ); + assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 5); let ranges1b = cache - .get_one(Arc::clone(&cached_table), p1.id, &[], None) + .get(Arc::clone(&cached_table), p1.id, &[], None) .await .unwrap() .column_ranges; assert!(Arc::ptr_eq(&ranges1a, &ranges1b)); - assert_catalog_access_metric_count( - &catalog.metric_registry, - "partition_get_by_id_batch", - 5, - ); + assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 5); // non-existing partition for _ in 0..2 { let res = cache - .get_one( + .get( Arc::clone(&cached_table), PartitionId::new(i64::MAX), &[], @@ -707,11 +595,7 @@ mod tests { ) .await; assert_eq!(res, None); - assert_catalog_access_metric_count( - &catalog.metric_registry, - "partition_get_by_id_batch", - 6, - ); + assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 6); } } @@ -751,43 +635,31 @@ mod tests { ); let sort_key = cache - .get_one(Arc::clone(&cached_table), p_id, &[], None) + .get(Arc::clone(&cached_table), p_id, &[], None) .await .unwrap() .sort_key; assert_eq!(sort_key, None,); - assert_catalog_access_metric_count( - &catalog.metric_registry, - "partition_get_by_id_batch", - 1, - ); + assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 1); // requesting nother will not expire assert!(p_sort_key.is_none()); let sort_key = cache - .get_one(Arc::clone(&cached_table), p_id, &[], None) + .get(Arc::clone(&cached_table), p_id, &[], None) .await .unwrap() .sort_key; assert_eq!(sort_key, None,); - assert_catalog_access_metric_count( - &catalog.metric_registry, - "partition_get_by_id_batch", - 1, - ); + assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 1); // but requesting something will expire let sort_key = cache - .get_one(Arc::clone(&cached_table), p_id, &[c1.column.id], None) + .get(Arc::clone(&cached_table), p_id, &[c1.column.id], None) .await .unwrap() .sort_key; assert_eq!(sort_key, None,); - assert_catalog_access_metric_count( - &catalog.metric_registry, - "partition_get_by_id_batch", - 2, - ); + assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 2); // set sort key let p = p @@ -796,12 +668,11 @@ mod tests { c2.column.name.as_str(), ])) .await; - assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 1); // expire & fetch let p_sort_key = p.partition.sort_key(); let sort_key = cache - 
.get_one(Arc::clone(&cached_table), p_id, &[c1.column.id], None) + .get(Arc::clone(&cached_table), p_id, &[c1.column.id], None) .await .unwrap() .sort_key; @@ -813,11 +684,7 @@ mod tests { column_order: [c1.column.id, c2.column.id].into(), } ); - assert_catalog_access_metric_count( - &catalog.metric_registry, - "partition_get_by_id_batch", - 3, - ); + assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 4); // subsets and the full key don't expire for should_cover in [ @@ -827,7 +694,7 @@ mod tests { vec![c1.column.id, c2.column.id], ] { let sort_key_2 = cache - .get_one(Arc::clone(&cached_table), p_id, &should_cover, None) + .get(Arc::clone(&cached_table), p_id, &should_cover, None) .await .unwrap() .sort_key; @@ -835,17 +702,13 @@ mod tests { sort_key.as_ref().unwrap(), sort_key_2.as_ref().unwrap() )); - assert_catalog_access_metric_count( - &catalog.metric_registry, - "partition_get_by_id_batch", - 3, - ); + assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 4); } // unknown columns expire let c3 = t.create_column("x", ColumnType::Tag).await; let sort_key_2 = cache - .get_one( + .get( Arc::clone(&cached_table), p_id, &[c1.column.id, c3.column.id], @@ -859,109 +722,10 @@ mod tests { sort_key_2.as_ref().unwrap() )); assert_eq!(sort_key, sort_key_2); - assert_catalog_access_metric_count( - &catalog.metric_registry, - "partition_get_by_id_batch", - 4, - ); - } - - #[tokio::test] - async fn test_multi_get() { - let catalog = TestCatalog::new(); - - let ns = catalog.create_namespace_1hr_retention("ns").await; - let t = ns.create_table("table").await; - let p1 = t.create_partition("k1").await.partition.clone(); - let p2 = t.create_partition("k2").await.partition.clone(); - let cached_table = Arc::new(CachedTable { - id: t.table.id, - schema: schema(), - column_id_map: HashMap::default(), - column_id_map_rev: HashMap::default(), - primary_key_column_ids: [].into(), - partition_template: TablePartitionTemplateOverride::default(), - }); - - let cache = PartitionCache::new( - catalog.catalog(), - BackoffConfig::default(), - catalog.time_provider(), - &catalog.metric_registry(), - test_ram_pool(), - true, - ); - - let mut res = cache - .get( - Arc::clone(&cached_table), - vec![ - PartitionRequest { - partition_id: p1.id, - sort_key_should_cover: vec![], - }, - PartitionRequest { - partition_id: p2.id, - sort_key_should_cover: vec![], - }, - PartitionRequest { - partition_id: p1.id, - sort_key_should_cover: vec![], - }, - PartitionRequest { - partition_id: PartitionId::new(i64::MAX), - sort_key_should_cover: vec![], - }, - ], - None, - ) - .await; - res.sort_by_key(|p| p.id); - let ids = res.iter().map(|p| p.id).collect::>(); - assert_eq!(ids, vec![p1.id, p1.id, p2.id]); - assert_catalog_access_metric_count( - &catalog.metric_registry, - "partition_get_by_id_batch", - 1, - ); + assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 5); } fn schema() -> Schema { SchemaBuilder::new().build().unwrap() } - - /// Extension methods for simpler testing. 
- #[async_trait] - trait PartitionCacheExt { - async fn get_one( - &self, - cached_table: Arc, - partition_id: PartitionId, - sort_key_should_cover: &[ColumnId], - span: Option, - ) -> Option; - } - - #[async_trait] - impl PartitionCacheExt for PartitionCache { - async fn get_one( - &self, - cached_table: Arc, - partition_id: PartitionId, - sort_key_should_cover: &[ColumnId], - span: Option, - ) -> Option { - self.get( - cached_table, - vec![PartitionRequest { - partition_id, - sort_key_should_cover: sort_key_should_cover.to_vec(), - }], - span, - ) - .await - .into_iter() - .next() - } - } } diff --git a/querier/src/parquet/mod.rs b/querier/src/parquet/mod.rs index c3794a82dc..fe75fc3064 100644 --- a/querier/src/parquet/mod.rs +++ b/querier/src/parquet/mod.rs @@ -106,7 +106,6 @@ pub mod tests { use crate::cache::{ namespace::{CachedNamespace, CachedTable}, - partition::PartitionRequest, CatalogCache, }; @@ -250,15 +249,11 @@ pub mod tests { .partition() .get( Arc::clone(&self.cached_table), - vec![PartitionRequest { - partition_id: self.parquet_file.partition_id, - sort_key_should_cover: vec![], - }], + self.parquet_file.partition_id, + &[], None, ) .await - .into_iter() - .next() .unwrap(); let cached_partitions = HashMap::from([(self.parquet_file.partition_id, cached_partition)]); diff --git a/querier/src/table/mod.rs b/querier/src/table/mod.rs index 52750ec47e..add7d855b9 100644 --- a/querier/src/table/mod.rs +++ b/querier/src/table/mod.rs @@ -1,19 +1,17 @@ use self::query_access::QuerierTableChunkPruner; use crate::{ - cache::{ - namespace::CachedTable, - partition::{CachedPartition, PartitionRequest}, - }, + cache::{namespace::CachedTable, partition::CachedPartition}, ingester::{self, IngesterPartition}, parquet::ChunkAdapter, - IngesterConnection, + IngesterConnection, CONCURRENT_CHUNK_CREATION_JOBS, }; use data_types::{ColumnId, NamespaceId, ParquetFile, PartitionId, TableId}; use datafusion::error::DataFusionError; -use futures::join; +use futures::{join, StreamExt}; use iox_query::{provider, provider::ChunkPruner, QueryChunk}; use observability_deps::tracing::{debug, trace}; use predicate::Predicate; +use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng}; use schema::Schema; use snafu::{ResultExt, Snafu}; use std::{ @@ -347,26 +345,33 @@ impl QuerierTable { .extend(f.column_set.iter().copied().filter(|id| pk.contains(id))); } - // batch request all partitions - let requests = should_cover - .into_iter() - .map(|(id, cover)| PartitionRequest { - partition_id: id, - sort_key_should_cover: cover.into_iter().collect(), - }) - .collect(); - let partitions = self - .chunk_adapter - .catalog_cache() - .partition() - .get( - Arc::clone(cached_table), - requests, - span_recorder.child_span("fetch partitions"), - ) - .await; + // shuffle order to even catalog load, because cache hits/misses might be correlated w/ the order of the + // partitions. 
+ // + // Note that we sort before shuffling to achieve a deterministic pseudo-random order + let mut partitions = should_cover.into_iter().collect::>(); + let mut rng = StdRng::seed_from_u64(cached_table.id.get() as u64); + partitions.sort_by(|(a_p_id, _a_cols), (b_p_id, _b_cols)| a_p_id.cmp(b_p_id)); + partitions.shuffle(&mut rng); - partitions.into_iter().map(|p| (p.id, p)).collect() + futures::stream::iter(partitions) + .map(|(p_id, cover)| { + let catalog_cache = self.chunk_adapter.catalog_cache(); + let span = span_recorder.child_span("fetch partition"); + + async move { + let cover = cover.into_iter().collect::>(); + let cached_partition = catalog_cache + .partition() + .get(Arc::clone(cached_table), p_id, &cover, span) + .await; + cached_partition.map(|p| (p_id, p)) + } + }) + .buffer_unordered(CONCURRENT_CHUNK_CREATION_JOBS) + .filter_map(|x| async move { x }) + .collect::>() + .await } /// Get a chunk pruner that can be used to prune chunks retrieved via [`chunks`](Self::chunks) @@ -886,22 +891,12 @@ mod tests { let chunks = querier_table.chunks().await.unwrap(); assert_eq!(chunks.len(), 5); - assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 4); - assert_catalog_access_metric_count( - &catalog.metric_registry, - "partition_get_by_id_batch", - 1, - ); + assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 6); assert_cache_access_metric_count(&catalog.metric_registry, "partition", 2); let chunks = querier_table.chunks().await.unwrap(); assert_eq!(chunks.len(), 5); - assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 4); - assert_catalog_access_metric_count( - &catalog.metric_registry, - "partition_get_by_id_batch", - 1, - ); + assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 6); assert_cache_access_metric_count(&catalog.metric_registry, "partition", 4); partition_2 @@ -909,22 +904,12 @@ mod tests { TestParquetFileBuilder::default().with_line_protocol("table,tag1=a foo=1,bar=1 11"), ) .await; - assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 5); - assert_catalog_access_metric_count( - &catalog.metric_registry, - "partition_get_by_id_batch", - 1, - ); + assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 7); // file not visible yet let chunks = querier_table.chunks().await.unwrap(); assert_eq!(chunks.len(), 5); - assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 5); - assert_catalog_access_metric_count( - &catalog.metric_registry, - "partition_get_by_id_batch", - 1, - ); + assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 7); assert_cache_access_metric_count(&catalog.metric_registry, "partition", 6); // change inster ID => invalidates cache @@ -933,12 +918,7 @@ mod tests { .with_ingester_partition(ingester_partition_builder.build()); let chunks = querier_table.chunks().await.unwrap(); assert_eq!(chunks.len(), 6); - assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 5); - assert_catalog_access_metric_count( - &catalog.metric_registry, - "partition_get_by_id_batch", - 2, - ); + assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 8); assert_cache_access_metric_count(&catalog.metric_registry, "partition", 8); }
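
The code restored in querier/src/table/mod.rs orders the partitions deterministically (sort by partition id, then shuffle with an RNG seeded from the table id, so catalog load is spread while the order stays reproducible) and then fetches each partition with bounded concurrency via buffer_unordered. What follows is a minimal standalone sketch of that pattern, not the patched file itself; lookup_partition, the plain i64 PartitionId alias, and MAX_JOBS are illustrative stand-ins for the querier's catalog-cache lookup and CONCURRENT_CHUNK_CREATION_JOBS.

// Sketch only: mirrors the sort-then-seeded-shuffle plus bounded-concurrency
// lookup pattern of the reverted-to code, using stand-in names and types.
use futures::StreamExt;
use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};
use std::collections::HashMap;

type PartitionId = i64;

// Stand-in for CONCURRENT_CHUNK_CREATION_JOBS.
const MAX_JOBS: usize = 4;

// Hypothetical per-partition lookup; the real code consults the partition
// cache and returns None for partitions that do not exist.
async fn lookup_partition(id: PartitionId) -> Option<String> {
    Some(format!("partition-{id}"))
}

async fn fetch_partitions(
    table_id: i64,
    mut ids: Vec<PartitionId>,
) -> HashMap<PartitionId, String> {
    // Sort first so the shuffle below is deterministic even though the ids
    // typically come out of an unordered map.
    ids.sort_unstable();

    // Seed the RNG from the table id: pseudo-random order to even out catalog
    // load, but reproducible for a given table.
    let mut rng = StdRng::seed_from_u64(table_id as u64);
    ids.shuffle(&mut rng);

    // Run at most MAX_JOBS lookups concurrently and drop missing partitions.
    futures::stream::iter(ids)
        .map(|id| async move { lookup_partition(id).await.map(|p| (id, p)) })
        .buffer_unordered(MAX_JOBS)
        .filter_map(|x| async move { x })
        .collect::<HashMap<_, _>>()
        .await
}

fn main() {
    let out = futures::executor::block_on(fetch_partitions(42, vec![3, 1, 2]));
    assert_eq!(out.len(), 3);
}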