diff --git a/querier/src/cache/parquet_file.rs b/querier/src/cache/parquet_file.rs
index b140c76b50..d9c61f0426 100644
--- a/querier/src/cache/parquet_file.rs
+++ b/querier/src/cache/parquet_file.rs
@@ -32,7 +32,7 @@ pub enum Error {
     },
 }
 
-type IngesterCounts = Arc<Vec<(Uuid, u64)>>;
+type IngesterCounts = Option<Arc<Vec<(Uuid, u64)>>>;
 
 /// Holds catalog information about a parquet file
 #[derive(Debug)]
@@ -76,17 +76,19 @@ impl CachedParquetFiles {
         // Note size_of_val is the size of the Arc
         // https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=ae8fee8b4f7f5f013dc01ea1fda165da
 
-        // size of the Arc+(HashMap) itself
+        // size of the Arc+(Option+HashMap) itself
         mem::size_of_val(self) +
             // Vec overhead
             mem::size_of_val(self.files.as_ref()) +
             // size of the underlying parquet files
             self.files.iter().map(|f| f.size()).sum::<usize>() +
             // hashmap data
-            std::mem::size_of_val(self.persisted_file_counts_from_ingesters.as_ref()) +
             self.persisted_file_counts_from_ingesters
                 .as_ref()
-                .capacity() * mem::size_of::<(Uuid, u64)>()
+                .map(|map| {
+                    std::mem::size_of_val(map.as_ref()) +
+                        map.capacity() * mem::size_of::<(Uuid, u64)>()
+                }).unwrap_or_default()
     }
 }
 
@@ -127,7 +129,7 @@ impl ParquetFileCache {
             async move {
                 Backoff::new(&backoff_config)
                     .retry_all_errors("get parquet_files", || {
-                        let extra = Arc::clone(&extra);
+                        let extra = extra.clone();
                         async {
                             // TODO refreshing all parquet files for the
                             // entire table is likely to be quite wasteful
@@ -207,30 +209,38 @@ impl ParquetFileCache {
     pub async fn get(
         &self,
         table_id: TableId,
-        persisted_file_counts_by_ingester_uuid: HashMap<Uuid, u64>,
+        persisted_file_counts_by_ingester_uuid: Option<HashMap<Uuid, u64>>,
         span: Option<Span>,
    ) -> Arc<CachedParquetFiles> {
-        let mut entries = persisted_file_counts_by_ingester_uuid
-            .into_iter()
-            .collect::<Vec<_>>();
-        entries.sort();
-        entries.shrink_to_fit();
-
-        let persisted_file_counts_by_ingester_uuid = Arc::new(entries);
+        let persisted_file_counts_by_ingester_uuid =
+            persisted_file_counts_by_ingester_uuid.map(|map| {
+                let mut entries = map.into_iter().collect::<Vec<_>>();
+                entries.sort();
+                entries.shrink_to_fit();
+                Arc::new(entries)
+            });
 
         let persisted_file_counts_by_ingester_uuid_captured =
-            Arc::clone(&persisted_file_counts_by_ingester_uuid);
+            persisted_file_counts_by_ingester_uuid.clone();
 
         self.remove_if_handle
             .remove_if_and_get(
                 &self.cache,
                 table_id,
                 |cached_file| {
-                    // If there's new or different information about the ingesters or the
-                    // number of files they've persisted, we need to refresh.
-                    different(
-                        cached_file.persisted_file_counts_from_ingesters.as_ref(),
-                        &persisted_file_counts_by_ingester_uuid_captured,
-                    )
+                    if let Some(ingester_counts) = &persisted_file_counts_by_ingester_uuid_captured
+                    {
+                        // If there's new or different information about the ingesters or the
+                        // number of files they've persisted, we need to refresh.
+                        different(
+                            cached_file
+                                .persisted_file_counts_from_ingesters
+                                .as_ref()
+                                .map(|x| x.as_ref().as_ref()),
+                            ingester_counts,
+                        )
+                    } else {
+                        false
+                    }
                 },
                 (persisted_file_counts_by_ingester_uuid, span),
             )
@@ -244,10 +254,10 @@
     }
 }
 
-fn different(stored_counts: &[(Uuid, u64)], ingester_counts: &[(Uuid, u64)]) -> bool {
+fn different(stored_counts: Option<&[(Uuid, u64)]>, ingester_counts: &[(Uuid, u64)]) -> bool {
     // If we have some information stored for this table,
-    if !stored_counts.is_empty() {
-        ingester_counts != stored_counts
+    if let Some(stored) = stored_counts {
+        ingester_counts != stored
     } else {
         // Otherwise, we've never seen ingester file counts for this table.
         // If the hashmap we got is empty, then we still haven't gotten any information, so we
@@ -278,7 +288,7 @@ mod tests {
         let tfile = partition.create_parquet_file(builder).await;
 
         let cache = make_cache(&catalog);
-        let cached_files = cache.get(table.table.id, HashMap::new(), None).await.vec();
+        let cached_files = cache.get(table.table.id, None, None).await.vec();
         assert_eq!(cached_files.len(), 1);
 
         let expected_parquet_file = &tfile.parquet_file;
@@ -286,7 +296,7 @@
 
         // validate a second request doesn't result in a catalog request
         assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
-        cache.get(table.table.id, HashMap::new(), None).await;
+        cache.get(table.table.id, None, None).await;
         assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
     }
 
@@ -305,12 +315,12 @@
         let cache = make_cache(&catalog);
 
-        let cached_files = cache.get(table1.table.id, HashMap::new(), None).await.vec();
+        let cached_files = cache.get(table1.table.id, None, None).await.vec();
         assert_eq!(cached_files.len(), 1);
         let expected_parquet_file = &tfile1.parquet_file;
         assert_eq!(cached_files[0].as_ref(), expected_parquet_file);
 
-        let cached_files = cache.get(table2.table.id, HashMap::new(), None).await.vec();
+        let cached_files = cache.get(table2.table.id, None, None).await.vec();
         assert_eq!(cached_files.len(), 1);
         let expected_parquet_file = &tfile2.parquet_file;
         assert_eq!(cached_files[0].as_ref(), expected_parquet_file);
 
@@ -326,7 +336,7 @@
         let different_catalog = TestCatalog::new();
         let cache = make_cache(&different_catalog);
 
-        let cached_files = cache.get(table.table.id, HashMap::new(), None).await.vec();
+        let cached_files = cache.get(table.table.id, None, None).await.vec();
         assert!(cached_files.is_empty());
     }
 
@@ -337,19 +347,19 @@
         partition.create_parquet_file(builder).await;
         let table_id = table.table.id;
 
-        let single_file_size = 256;
-        let two_file_size = 448;
+        let single_file_size = 232;
+        let two_file_size = 424;
         assert!(single_file_size < two_file_size);
 
         let cache = make_cache(&catalog);
-        let cached_files = cache.get(table_id, HashMap::new(), None).await;
+        let cached_files = cache.get(table_id, None, None).await;
         assert_eq!(cached_files.size(), single_file_size);
 
         // add a second file, and force the cache to find it
         let builder = TestParquetFileBuilder::default().with_line_protocol(TABLE1_LINE_PROTOCOL);
         partition.create_parquet_file(builder).await;
         cache.expire(table_id);
-        let cached_files = cache.get(table_id, HashMap::new(), None).await;
+        let cached_files = cache.get(table_id, None, None).await;
         assert_eq!(cached_files.size(), two_file_size);
     }
 
@@ -361,35 +371,41 @@
         let cache = make_cache(&catalog);
 
         // No metadata: make one request that should be cached
-        cache.get(table_id, HashMap::new(), None).await;
+        cache.get(table_id, None, None).await;
         assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
-        cache.get(table_id, HashMap::new(), None).await;
+        cache.get(table_id, None, None).await;
         assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
 
         // Empty metadata: make one request, should still be cached
-        cache.get(table_id, HashMap::new(), None).await;
+        cache.get(table_id, Some(HashMap::new()), None).await;
         assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
 
         // See a new UUID: refresh the cache
-        cache.get(table_id, HashMap::from([(uuid, 3)]), None).await;
+        cache
+            .get(table_id, Some(HashMap::from([(uuid, 3)])), None)
+            .await;
         assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 2);
 
         // See the same UUID with the same count: should still be cached
-        cache.get(table_id, HashMap::from([(uuid, 3)]), None).await;
+        cache
+            .get(table_id, Some(HashMap::from([(uuid, 3)])), None)
+            .await;
         assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 2);
 
         // See the same UUID with a different count: refresh the cache
-        cache.get(table_id, HashMap::from([(uuid, 4)]), None).await;
+        cache
+            .get(table_id, Some(HashMap::from([(uuid, 4)])), None)
+            .await;
         assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 3);
 
         // Empty metadata again: still use the cache
-        cache.get(table_id, HashMap::new(), None).await;
+        cache.get(table_id, Some(HashMap::new()), None).await;
         assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 4);
 
         // See a new UUID and not the old one: refresh the cache
         let new_uuid = Uuid::new_v4();
         cache
-            .get(table_id, HashMap::from([(new_uuid, 1)]), None)
+            .get(table_id, Some(HashMap::from([(new_uuid, 1)])), None)
             .await;
         assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 5);
     }
diff --git a/querier/src/table/mod.rs b/querier/src/table/mod.rs
index ac6aca1b44..a5fb1255c2 100644
--- a/querier/src/table/mod.rs
+++ b/querier/src/table/mod.rs
@@ -219,7 +219,7 @@ impl QuerierTable {
             ),
             catalog_cache.parquet_file().get(
                 self.id(),
-                HashMap::new(),
+                None,
                 span_recorder.child_span("cache GET parquet_file (pre-warm")
             ),
             catalog_cache.tombstone().get(
@@ -260,7 +260,7 @@ impl QuerierTable {
         let (parquet_files, tombstones) = join!(
             catalog_cache.parquet_file().get(
                 self.id(),
-                persisted_file_counts_by_ingester_uuid,
+                Some(persisted_file_counts_by_ingester_uuid),
                 span_recorder.child_span("cache GET parquet_file"),
             ),
             catalog_cache.tombstone().get(
@@ -822,6 +822,61 @@ mod tests {
         assert_batches_eq!(&expected, &batches);
     }
 
+    #[tokio::test]
+    async fn test_parquet_cache_refresh() {
+        maybe_start_logging();
+        let catalog = TestCatalog::new();
+        let ns = catalog.create_namespace_1hr_retention("ns").await;
+        let table = ns.create_table("table1").await;
+        let shard = ns.create_shard(1).await;
+        let partition = table.with_shard(&shard).create_partition("k").await;
+        let schema = make_schema(&table).await;
+
+        let builder =
+            IngesterPartitionBuilder::new(schema, &shard, &partition).with_lp(["table foo=1 1"]);
+
+        // Parquet file with max sequence number 2
+        let pf_builder = TestParquetFileBuilder::default()
+            .with_line_protocol("table1 foo=1 11")
+            .with_max_seq(2);
+        partition.create_parquet_file(pf_builder).await;
+
+        let ingester_partition =
+            builder.build_with_max_parquet_sequence_number(Some(SequenceNumber::new(2)));
+
+        let querier_table = TestQuerierTable::new(&catalog, &table)
+            .await
+            .with_ingester_partition(ingester_partition);
+
+        // Expect 2 chunks: one for ingester, and one from parquet file
+        let chunks = querier_table.chunks().await.unwrap();
+        assert_eq!(chunks.len(), 2);
+
+        // Now, make a second parquet file with max sequence number 3
+        let pf_builder = TestParquetFileBuilder::default()
+            .with_line_protocol("table1 foo=1 22")
+            .with_max_seq(3);
+        partition.create_parquet_file(pf_builder).await;
+
+        // With the same ingester response, still expect 2 chunks: one
+        // for ingester, and one from parquet file
+        let chunks = querier_table.chunks().await.unwrap();
+        assert_eq!(chunks.len(), 2);
+
+        // update the ingester response to return a new max parquet
+        // sequence number that includes the new file (3)
+        let ingester_partition =
+            builder.build_with_max_parquet_sequence_number(Some(SequenceNumber::new(3)));
+
+        let querier_table = querier_table
+            .clear_ingester_partitions()
+            .with_ingester_partition(ingester_partition);
+
+        // expect the second file is found, resulting in three chunks
+        let chunks = querier_table.chunks().await.unwrap();
+        assert_eq!(chunks.len(), 3);
+    }
+
     #[tokio::test]
     async fn test_tombstone_cache_refresh() {
         maybe_start_logging();
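
For readers skimming the patch, here is a minimal, self-contained sketch of the refresh decision this diff introduces. It is not the crate's actual module: `u128` stands in for `uuid::Uuid`, `normalize` is a hypothetical helper mirroring the sort-then-compare step in `get`, and the `else` body of `different` (`!ingester_counts.is_empty()`) is inferred from the hunk's trailing comment and the metric counts the tests assert, since the hunk ends before that line.

```rust
use std::collections::HashMap;

/// Stand-in for `uuid::Uuid` so this sketch compiles without extra crates.
type Uuid = u128;

/// Mirrors the patched `different`: `stored_counts` is `None` when the cache
/// entry has never recorded ingester counts for this table.
fn different(stored_counts: Option<&[(Uuid, u64)]>, ingester_counts: &[(Uuid, u64)]) -> bool {
    if let Some(stored) = stored_counts {
        // Any change in the ingester set or per-ingester file counts
        // invalidates the cached file list.
        ingester_counts != stored
    } else {
        // Never seen counts before: refresh only once ingesters report some.
        // (Inferred from the diff's comment and test expectations.)
        !ingester_counts.is_empty()
    }
}

/// Hypothetical helper mirroring how `get` normalizes the optional `HashMap`
/// before comparison: sort into a `Vec` so slice equality does not depend on
/// hash iteration order.
fn normalize(map: Option<HashMap<Uuid, u64>>) -> Option<Vec<(Uuid, u64)>> {
    map.map(|m| {
        let mut entries: Vec<_> = m.into_iter().collect();
        entries.sort();
        entries.shrink_to_fit();
        entries
    })
}

fn main() {
    let a: Uuid = 1;
    // Never stored + empty report: keep the cached entry
    // (the test asserts the catalog metric does not increase here).
    assert!(!different(None, &[]));
    // Never stored + first real report: refresh.
    assert!(different(None, &[(a, 3)]));
    // Same counts as stored: cached.
    assert!(!different(Some(&[(a, 3)][..]), &[(a, 3)]));
    // Count changed for a known ingester: refresh.
    assert!(different(Some(&[(a, 3)][..]), &[(a, 4)]));

    // Hash order is irrelevant once normalized.
    let n = normalize(Some(HashMap::from([(a, 3), (2, 5)]))).unwrap();
    assert_eq!(n, vec![(1, 3), (2, 5)]);
}
```

The sort-then-compare normalization is what lets the cache compare an incoming `HashMap` snapshot against the stored `Arc<Vec<(Uuid, u64)>>` with plain slice equality; note that the closure in `remove_if_and_get` only calls `different` when the caller actually supplied counts, so a `None` request can never force a refresh.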
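Similarly, a sketch of the new memory accounting in `CachedParquetFiles::size()` for the now-optional counts vector (same `Uuid` stand-in; the struct is reduced to the one field the patch changes):

```rust
use std::mem;
use std::sync::Arc;

type Uuid = u128; // stand-in for `uuid::Uuid`, as above

// The patched alias: `None` when no ingester counts were ever provided.
type IngesterCounts = Option<Arc<Vec<(Uuid, u64)>>>;

struct Cached {
    persisted_file_counts_from_ingesters: IngesterCounts,
}

impl Cached {
    /// Bytes attributed to the counts vector: the `Vec` header plus its heap
    /// capacity when present, zero when `None` — the `map`/`unwrap_or_default`
    /// shape taken from the diff.
    fn counts_size(&self) -> usize {
        self.persisted_file_counts_from_ingesters
            .as_ref()
            .map(|v| {
                mem::size_of_val(v.as_ref()) + v.capacity() * mem::size_of::<(Uuid, u64)>()
            })
            .unwrap_or_default()
    }
}

fn main() {
    // Absent counts contribute nothing.
    let none = Cached { persisted_file_counts_from_ingesters: None };
    assert_eq!(none.counts_size(), 0);

    // Present counts pay for the Vec header (ptr/len/cap) plus each entry.
    let some = Cached {
        persisted_file_counts_from_ingesters: Some(Arc::new(vec![(1, 3)])),
    };
    assert_eq!(
        some.counts_size(),
        mem::size_of::<Vec<(Uuid, u64)>>() + mem::size_of::<(Uuid, u64)>()
    );
}
```

This appears to be why the size-test constants drop by exactly 24 bytes (256→232, 448→424): a `None` entry no longer pays for an empty `Vec` header, while `Option<Arc<…>>` itself stays pointer-sized thanks to the niche optimization, leaving `mem::size_of_val(self)` unchanged.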