From f0f74bae02db14d9b5c849eead77c29f37689b3b Mon Sep 17 00:00:00 2001
From: "Carol (Nichols || Goulding)"
Date: Wed, 12 Apr 2023 14:32:50 -0400
Subject: [PATCH] fix: Treat empty ingester info differently than not having
 ingester info

When pre-warming the catalog cache before the ingester responses have
returned, we don't have any ingester parquet file counts. This is
different than asking the ingesters for the parquet file counts and not
getting any. So keep the Option to be able to treat "not present"
differently from "present but empty".
---
 querier/src/cache/parquet_file.rs | 96 ++++++++++++++++++-------------
 querier/src/table/mod.rs          | 59 ++++++++++++++++++-
 2 files changed, 113 insertions(+), 42 deletions(-)

diff --git a/querier/src/cache/parquet_file.rs b/querier/src/cache/parquet_file.rs
index b140c76b50..d9c61f0426 100644
--- a/querier/src/cache/parquet_file.rs
+++ b/querier/src/cache/parquet_file.rs
@@ -32,7 +32,7 @@ pub enum Error {
     },
 }

-type IngesterCounts = Arc<Vec<(Uuid, u64)>>;
+type IngesterCounts = Option<Arc<Vec<(Uuid, u64)>>>;

 /// Holds catalog information about a parquet file
 #[derive(Debug)]
@@ -76,17 +76,19 @@ impl CachedParquetFiles {
         // Note size_of_val is the size of the Arc
         // https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=ae8fee8b4f7f5f013dc01ea1fda165da

-        // size of the Arc+(HashMap) itself
+        // size of the Arc+(Option+HashMap) itself
         mem::size_of_val(self) +
         // Vec overhead
         mem::size_of_val(self.files.as_ref()) +
         // size of the underlying parquet files
         self.files.iter().map(|f| f.size()).sum::<usize>() +
         // hashmap data
-        std::mem::size_of_val(self.persisted_file_counts_from_ingesters.as_ref()) +
             self.persisted_file_counts_from_ingesters
                 .as_ref()
-                .capacity() * mem::size_of::<(Uuid, u64)>()
+                .map(|map| {
+                    std::mem::size_of_val(map.as_ref()) +
+                        map.capacity() * mem::size_of::<(Uuid, u64)>()
+                }).unwrap_or_default()
     }
 }
@@ -127,7 +129,7 @@ impl ParquetFileCache {
                 async move {
                     Backoff::new(&backoff_config)
                         .retry_all_errors("get parquet_files", || {
-                            let extra = Arc::clone(&extra);
+                            let extra = extra.clone();
                             async {
                                 // TODO refreshing all parquet files for the
                                 // entire table is likely to be quite wasteful
@@ -207,30 +209,38 @@ impl ParquetFileCache {
     pub async fn get(
         &self,
         table_id: TableId,
-        persisted_file_counts_by_ingester_uuid: HashMap<Uuid, u64>,
+        persisted_file_counts_by_ingester_uuid: Option<HashMap<Uuid, u64>>,
         span: Option<Span>,
     ) -> Arc<CachedParquetFiles> {
-        let mut entries = persisted_file_counts_by_ingester_uuid
-            .into_iter()
-            .collect::<Vec<_>>();
-        entries.sort();
-        entries.shrink_to_fit();
-
-        let persisted_file_counts_by_ingester_uuid = Arc::new(entries);
+        let persisted_file_counts_by_ingester_uuid =
+            persisted_file_counts_by_ingester_uuid.map(|map| {
+                let mut entries = map.into_iter().collect::<Vec<_>>();
+                entries.sort();
+                entries.shrink_to_fit();
+                Arc::new(entries)
+            });
         let persisted_file_counts_by_ingester_uuid_captured =
-            Arc::clone(&persisted_file_counts_by_ingester_uuid);
+            persisted_file_counts_by_ingester_uuid.clone();

         self.remove_if_handle
             .remove_if_and_get(
                 &self.cache,
                 table_id,
                 |cached_file| {
-                    // If there's new or different information about the ingesters or the
-                    // number of files they've persisted, we need to refresh.
-                    different(
-                        cached_file.persisted_file_counts_from_ingesters.as_ref(),
-                        &persisted_file_counts_by_ingester_uuid_captured,
-                    )
+                    if let Some(ingester_counts) = &persisted_file_counts_by_ingester_uuid_captured
+                    {
+                        // If there's new or different information about the ingesters or the
+                        // number of files they've persisted, we need to refresh.
+                        different(
+                            cached_file
+                                .persisted_file_counts_from_ingesters
+                                .as_ref()
+                                .map(|x| x.as_ref().as_ref()),
+                            ingester_counts,
+                        )
+                    } else {
+                        false
+                    }
                 },
                 (persisted_file_counts_by_ingester_uuid, span),
             )
@@ -244,10 +254,10 @@ impl ParquetFileCache {
     }
 }

-fn different(stored_counts: &[(Uuid, u64)], ingester_counts: &[(Uuid, u64)]) -> bool {
+fn different(stored_counts: Option<&[(Uuid, u64)]>, ingester_counts: &[(Uuid, u64)]) -> bool {
     // If we have some information stored for this table,
-    if !stored_counts.is_empty() {
-        ingester_counts != stored_counts
+    if let Some(stored) = stored_counts {
+        ingester_counts != stored
     } else {
         // Otherwise, we've never seen ingester file counts for this table.
         // If the hashmap we got is empty, then we still haven't gotten any information, so we
@@ -278,7 +288,7 @@ mod tests {
         let tfile = partition.create_parquet_file(builder).await;

         let cache = make_cache(&catalog);
-        let cached_files = cache.get(table.table.id, HashMap::new(), None).await.vec();
+        let cached_files = cache.get(table.table.id, None, None).await.vec();

         assert_eq!(cached_files.len(), 1);
         let expected_parquet_file = &tfile.parquet_file;
@@ -286,7 +296,7 @@ mod tests {

         // validate a second request doesn't result in a catalog request
         assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
-        cache.get(table.table.id, HashMap::new(), None).await;
+        cache.get(table.table.id, None, None).await;
         assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
     }

@@ -305,12 +315,12 @@ mod tests {

         let cache = make_cache(&catalog);

-        let cached_files = cache.get(table1.table.id, HashMap::new(), None).await.vec();
+        let cached_files = cache.get(table1.table.id, None, None).await.vec();
         assert_eq!(cached_files.len(), 1);
         let expected_parquet_file = &tfile1.parquet_file;
         assert_eq!(cached_files[0].as_ref(), expected_parquet_file);

-        let cached_files = cache.get(table2.table.id, HashMap::new(), None).await.vec();
+        let cached_files = cache.get(table2.table.id, None, None).await.vec();
         assert_eq!(cached_files.len(), 1);
         let expected_parquet_file = &tfile2.parquet_file;
         assert_eq!(cached_files[0].as_ref(), expected_parquet_file);
@@ -326,7 +336,7 @@ mod tests {

         let different_catalog = TestCatalog::new();
         let cache = make_cache(&different_catalog);
-        let cached_files = cache.get(table.table.id, HashMap::new(), None).await.vec();
+        let cached_files = cache.get(table.table.id, None, None).await.vec();
         assert!(cached_files.is_empty());
     }

@@ -337,19 +347,19 @@ mod tests {
         partition.create_parquet_file(builder).await;
         let table_id = table.table.id;

-        let single_file_size = 256;
-        let two_file_size = 448;
+        let single_file_size = 232;
+        let two_file_size = 424;
         assert!(single_file_size < two_file_size);

         let cache = make_cache(&catalog);
-        let cached_files = cache.get(table_id, HashMap::new(), None).await;
+        let cached_files = cache.get(table_id, None, None).await;
         assert_eq!(cached_files.size(), single_file_size);

         // add a second file, and force the cache to find it
         let builder = TestParquetFileBuilder::default().with_line_protocol(TABLE1_LINE_PROTOCOL);
         partition.create_parquet_file(builder).await;
         cache.expire(table_id);
-        let cached_files = cache.get(table_id, HashMap::new(), None).await;
+        let cached_files = cache.get(table_id, None, None).await;
         assert_eq!(cached_files.size(), two_file_size);
     }

@@ -361,35 +371,41 @@ mod tests {
         let cache = make_cache(&catalog);

         // No metadata: make one request that should be cached
-        cache.get(table_id, HashMap::new(), None).await;
+        cache.get(table_id, None, None).await;
         assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
-        cache.get(table_id, HashMap::new(), None).await;
+        cache.get(table_id, None, None).await;
         assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);

         // Empty metadata: make one request, should still be cached
-        cache.get(table_id, HashMap::new(), None).await;
+        cache.get(table_id, Some(HashMap::new()), None).await;
         assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);

         // See a new UUID: refresh the cache
-        cache.get(table_id, HashMap::from([(uuid, 3)]), None).await;
+        cache
+            .get(table_id, Some(HashMap::from([(uuid, 3)])), None)
+            .await;
         assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 2);

         // See the same UUID with the same count: should still be cached
-        cache.get(table_id, HashMap::from([(uuid, 3)]), None).await;
+        cache
+            .get(table_id, Some(HashMap::from([(uuid, 3)])), None)
+            .await;
         assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 2);

         // See the same UUID with a different count: refresh the cache
-        cache.get(table_id, HashMap::from([(uuid, 4)]), None).await;
+        cache
+            .get(table_id, Some(HashMap::from([(uuid, 4)])), None)
+            .await;
         assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 3);

         // Empty metadata again: still use the cache
-        cache.get(table_id, HashMap::new(), None).await;
+        cache.get(table_id, Some(HashMap::new()), None).await;
         assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 4);

         // See a new UUID and not the old one: refresh the cache
         let new_uuid = Uuid::new_v4();
         cache
-            .get(table_id, HashMap::from([(new_uuid, 1)]), None)
+            .get(table_id, Some(HashMap::from([(new_uuid, 1)])), None)
             .await;
         assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 5);
     }
diff --git a/querier/src/table/mod.rs b/querier/src/table/mod.rs
index ac6aca1b44..a5fb1255c2 100644
--- a/querier/src/table/mod.rs
+++ b/querier/src/table/mod.rs
@@ -219,7 +219,7 @@ impl QuerierTable {
             ),
             catalog_cache.parquet_file().get(
                 self.id(),
-                HashMap::new(),
+                None,
                 span_recorder.child_span("cache GET parquet_file (pre-warm")
             ),
             catalog_cache.tombstone().get(
@@ -260,7 +260,7 @@ impl QuerierTable {
         let (parquet_files, tombstones) = join!(
             catalog_cache.parquet_file().get(
                 self.id(),
-                persisted_file_counts_by_ingester_uuid,
+                Some(persisted_file_counts_by_ingester_uuid),
                 span_recorder.child_span("cache GET parquet_file"),
             ),
             catalog_cache.tombstone().get(
@@ -822,6 +822,61 @@ mod tests {
         assert_batches_eq!(&expected, &batches);
     }

+    #[tokio::test]
+    async fn test_parquet_cache_refresh() {
+        maybe_start_logging();
+        let catalog = TestCatalog::new();
+        let ns = catalog.create_namespace_1hr_retention("ns").await;
+        let table = ns.create_table("table1").await;
+        let shard = ns.create_shard(1).await;
+        let partition = table.with_shard(&shard).create_partition("k").await;
+        let schema = make_schema(&table).await;
+
+        let builder =
+            IngesterPartitionBuilder::new(schema, &shard, &partition).with_lp(["table foo=1 1"]);
+
+        // Parquet file between with max sequence number 2
+        let pf_builder = TestParquetFileBuilder::default()
+            .with_line_protocol("table1 foo=1 11")
+            .with_max_seq(2);
+        partition.create_parquet_file(pf_builder).await;
+
+        let ingester_partition =
+            builder.build_with_max_parquet_sequence_number(Some(SequenceNumber::new(2)));
+
+        let querier_table = TestQuerierTable::new(&catalog, &table)
+            .await
+            .with_ingester_partition(ingester_partition);
+
+        // Expect 2 chunks: one for ingester, and one from parquet file
+        let chunks = querier_table.chunks().await.unwrap();
+        assert_eq!(chunks.len(), 2);
+
+        // Now, make a second chunk with max sequence number 3
+        let pf_builder = TestParquetFileBuilder::default()
+            .with_line_protocol("table1 foo=1 22")
+            .with_max_seq(3);
+        partition.create_parquet_file(pf_builder).await;
+
+        // With the same ingester response, still expect 2 chunks: one
+        // for ingester, and one from parquet file
+        let chunks = querier_table.chunks().await.unwrap();
+        assert_eq!(chunks.len(), 2);
+
+        // update the ingester response to return a new max parquet
+        // sequence number that includes the new file (3)
+        let ingester_partition =
+            builder.build_with_max_parquet_sequence_number(Some(SequenceNumber::new(3)));
+
+        let querier_table = querier_table
+            .clear_ingester_partitions()
+            .with_ingester_partition(ingester_partition);
+
+        // expect the second file is found, resulting in three chunks
+        let chunks = querier_table.chunks().await.unwrap();
+        assert_eq!(chunks.len(), 3);
+    }
+
     #[tokio::test]
     async fn test_tombstone_cache_refresh() {
         maybe_start_logging();