diff --git a/querier/src/cache/partition.rs b/querier/src/cache/partition.rs
index c232b47d83..ea2b5c8ee6 100644
--- a/querier/src/cache/partition.rs
+++ b/querier/src/cache/partition.rs
@@ -117,54 +117,40 @@ impl PartitionCache {
         }
     }
 
-    /// Get sort key
+    /// Get cached partition.
     ///
     /// Expire partition if the cached sort key does NOT cover the given set of columns.
-    pub async fn sort_key(
+    pub async fn get(
         &self,
         cached_table: Arc<CachedTable>,
         partition_id: PartitionId,
-        should_cover: &[ColumnId],
+        sort_key_should_cover: &[ColumnId],
         span: Option<Span>,
-    ) -> Option<Arc<PartitionSortKey>> {
+    ) -> Option<CachedPartition> {
         self.remove_if_handle
             .remove_if_and_get(
                 &self.cache,
                 partition_id,
                 |cached_partition| {
                     if let Some(sort_key) = &cached_partition.and_then(|p| p.sort_key) {
-                        should_cover
+                        sort_key_should_cover
                             .iter()
                             .any(|col| !sort_key.column_set.contains(col))
                     } else {
                         // no sort key at all => need to update if there is anything to cover
-                        !should_cover.is_empty()
+                        !sort_key_should_cover.is_empty()
                     }
                 },
                 (cached_table, span),
             )
             .await
-            .and_then(|p| p.sort_key)
-    }
-
-    /// Get known column ranges.
-    pub async fn column_ranges(
-        &self,
-        cached_table: Arc<CachedTable>,
-        partition_id: PartitionId,
-        span: Option<Span>,
-    ) -> Option<ColumnRanges> {
-        self.cache
-            .get(partition_id, (cached_table, span))
-            .await
-            .map(|p| p.column_ranges)
     }
 }
 
-#[derive(Debug, Clone)]
-struct CachedPartition {
-    sort_key: Option<Arc<PartitionSortKey>>,
-    column_ranges: ColumnRanges,
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct CachedPartition {
+    pub sort_key: Option<Arc<PartitionSortKey>>,
+    pub column_ranges: ColumnRanges,
 }
 
 impl CachedPartition {
@@ -353,8 +339,10 @@ mod tests {
         );
 
         let sort_key1a = cache
-            .sort_key(Arc::clone(&cached_table), p1.id, &Vec::new(), None)
-            .await;
+            .get(Arc::clone(&cached_table), p1.id, &Vec::new(), None)
+            .await
+            .unwrap()
+            .sort_key;
         assert_eq!(
             sort_key1a.as_ref().unwrap().as_ref(),
             &PartitionSortKey {
@@ -366,14 +354,18 @@
         assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 1);
 
         let sort_key2 = cache
-            .sort_key(Arc::clone(&cached_table), p2.id, &Vec::new(), None)
-            .await;
+            .get(Arc::clone(&cached_table), p2.id, &Vec::new(), None)
+            .await
+            .unwrap()
+            .sort_key;
         assert_eq!(sort_key2, None);
         assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 2);
 
         let sort_key1b = cache
-            .sort_key(Arc::clone(&cached_table), p1.id, &Vec::new(), None)
-            .await;
+            .get(Arc::clone(&cached_table), p1.id, &Vec::new(), None)
+            .await
+            .unwrap()
+            .sort_key;
         assert!(Arc::ptr_eq(
             sort_key1a.as_ref().unwrap(),
             sort_key1b.as_ref().unwrap()
@@ -383,7 +375,7 @@
         // non-existing partition
         for _ in 0..2 {
             let res = cache
-                .sort_key(
+                .get(
                     Arc::clone(&cached_table),
                     PartitionId::new(i64::MAX),
                     &Vec::new(),
@@ -460,9 +452,10 @@
         );
 
         let ranges1a = cache
-            .column_ranges(Arc::clone(&cached_table), p1.id, None)
+            .get(Arc::clone(&cached_table), p1.id, &[], None)
             .await
-            .unwrap();
+            .unwrap()
+            .column_ranges;
         assert_eq!(
             ranges1a.as_ref(),
             &HashMap::from([
@@ -489,9 +482,10 @@
         assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 1);
 
         let ranges2 = cache
-            .column_ranges(Arc::clone(&cached_table), p2.id, None)
+            .get(Arc::clone(&cached_table), p2.id, &[], None)
             .await
-            .unwrap();
+            .unwrap()
+            .column_ranges;
         assert_eq!(
             ranges2.as_ref(),
             &HashMap::from([(
@@ -505,9 +499,10 @@
         assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 2);
 
         let ranges3 = cache
-            .column_ranges(Arc::clone(&cached_table), p3.id, None)
+            .get(Arc::clone(&cached_table), p3.id, &[], None)
             .await
-            .unwrap();
+            .unwrap()
+            .column_ranges;
         assert_eq!(
             ranges3.as_ref(),
             &HashMap::from([
@@ -530,9 +525,10 @@
         assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 3);
 
         let ranges4 = cache
-            .column_ranges(Arc::clone(&cached_table), p4.id, None)
+            .get(Arc::clone(&cached_table), p4.id, &[], None)
             .await
-            .unwrap();
+            .unwrap()
+            .column_ranges;
         assert_eq!(
             ranges4.as_ref(),
             &HashMap::from([
@@ -555,9 +551,10 @@
         assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 4);
 
         let ranges5 = cache
-            .column_ranges(Arc::clone(&cached_table), p5.id, None)
+            .get(Arc::clone(&cached_table), p5.id, &[], None)
             .await
-            .unwrap();
+            .unwrap()
+            .column_ranges;
         assert_eq!(
             ranges5.as_ref(),
             &HashMap::from([(
@@ -571,86 +568,28 @@
         assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 5);
 
         let ranges1b = cache
-            .column_ranges(Arc::clone(&cached_table), p1.id, None)
+            .get(Arc::clone(&cached_table), p1.id, &[], None)
             .await
-            .unwrap();
+            .unwrap()
+            .column_ranges;
         assert!(Arc::ptr_eq(&ranges1a, &ranges1b));
         assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 5);
 
         // non-existing partition
         for _ in 0..2 {
             let res = cache
-                .column_ranges(Arc::clone(&cached_table), PartitionId::new(i64::MAX), None)
+                .get(
+                    Arc::clone(&cached_table),
+                    PartitionId::new(i64::MAX),
+                    &[],
+                    None,
+                )
                 .await;
             assert_eq!(res, None);
             assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 6);
         }
     }
 
-    #[tokio::test]
-    async fn test_cache_sharing() {
-        let catalog = TestCatalog::new();
-
-        let ns = catalog.create_namespace_1hr_retention("ns").await;
-        let t = ns.create_table("table").await;
-        let c1 = t.create_column("tag", ColumnType::Tag).await;
-        let c2 = t.create_column("time", ColumnType::Time).await;
-        let p1 = t
-            .create_partition_with_sort_key("k1", &["tag", "time"])
-            .await
-            .partition
-            .clone();
-        let p2 = t.create_partition("k2").await.partition.clone();
-        let p3 = t.create_partition("k3").await.partition.clone();
-        let cached_table = Arc::new(CachedTable {
-            id: t.table.id,
-            schema: schema(),
-            column_id_map: HashMap::from([
-                (c1.column.id, Arc::from(c1.column.name.clone())),
-                (c2.column.id, Arc::from(c2.column.name.clone())),
-            ]),
-            column_id_map_rev: HashMap::from([
-                (Arc::from(c1.column.name.clone()), c1.column.id),
-                (Arc::from(c2.column.name.clone()), c2.column.id),
-            ]),
-            primary_key_column_ids: [c1.column.id, c2.column.id].into(),
-            partition_template: TablePartitionTemplateOverride::default(),
-        });
-
-        let cache = PartitionCache::new(
-            catalog.catalog(),
-            BackoffConfig::default(),
-            catalog.time_provider(),
-            &catalog.metric_registry(),
-            test_ram_pool(),
-            true,
-        );
-
-        cache
-            .sort_key(Arc::clone(&cached_table), p3.id, &Vec::new(), None)
-            .await;
-        cache
-            .column_ranges(Arc::clone(&cached_table), p3.id, None)
-            .await;
-        assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 1);
-
-        cache
-            .sort_key(Arc::clone(&cached_table), p2.id, &Vec::new(), None)
-            .await;
-        cache
-            .column_ranges(Arc::clone(&cached_table), p2.id, None)
-            .await;
-        assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 2);
-
-        cache
-            .column_ranges(Arc::clone(&cached_table), p1.id, None)
-            .await;
-        cache
-            .sort_key(Arc::clone(&cached_table), p1.id, &Vec::new(), None)
-            .await;
-        assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 3);
-    }
-
     #[tokio::test]
     async fn test_expiration() {
         let catalog = TestCatalog::new();
@@ -687,23 +626,29 @@
         );
 
         let sort_key = cache
-            .sort_key(Arc::clone(&cached_table), p_id, &Vec::new(), None)
-            .await;
+            .get(Arc::clone(&cached_table), p_id, &[], None)
+            .await
+            .unwrap()
+            .sort_key;
         assert_eq!(sort_key, None,);
         assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 1);
 
         // requesting nother will not expire
         assert!(p_sort_key.is_none());
         let sort_key = cache
-            .sort_key(Arc::clone(&cached_table), p_id, &Vec::new(), None)
-            .await;
+            .get(Arc::clone(&cached_table), p_id, &[], None)
+            .await
+            .unwrap()
+            .sort_key;
         assert_eq!(sort_key, None,);
         assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 1);
 
         // but requesting something will expire
         let sort_key = cache
-            .sort_key(Arc::clone(&cached_table), p_id, &[c1.column.id], None)
-            .await;
+            .get(Arc::clone(&cached_table), p_id, &[c1.column.id], None)
+            .await
+            .unwrap()
+            .sort_key;
         assert_eq!(sort_key, None,);
         assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 2);
 
@@ -718,8 +663,10 @@
         // expire & fetch
         let p_sort_key = p.partition.sort_key();
         let sort_key = cache
-            .sort_key(Arc::clone(&cached_table), p_id, &[c1.column.id], None)
-            .await;
+            .get(Arc::clone(&cached_table), p_id, &[c1.column.id], None)
+            .await
+            .unwrap()
+            .sort_key;
         assert_eq!(
             sort_key.as_ref().unwrap().as_ref(),
             &PartitionSortKey {
@@ -738,8 +685,10 @@
             vec![c1.column.id, c2.column.id],
         ] {
             let sort_key_2 = cache
-                .sort_key(Arc::clone(&cached_table), p_id, &should_cover, None)
-                .await;
+                .get(Arc::clone(&cached_table), p_id, &should_cover, None)
+                .await
+                .unwrap()
+                .sort_key;
             assert!(Arc::ptr_eq(
                 sort_key.as_ref().unwrap(),
                 sort_key_2.as_ref().unwrap()
@@ -750,13 +699,15 @@
         // unknown columns expire
         let c3 = t.create_column("x", ColumnType::Tag).await;
         let sort_key_2 = cache
-            .sort_key(
+            .get(
                 Arc::clone(&cached_table),
                 p_id,
                 &[c1.column.id, c3.column.id],
                 None,
             )
-            .await;
+            .await
+            .unwrap()
+            .sort_key;
         assert!(!Arc::ptr_eq(
             sort_key.as_ref().unwrap(),
             sort_key_2.as_ref().unwrap()
diff --git a/querier/src/ingester/mod.rs b/querier/src/ingester/mod.rs
index 45963c4ddc..e32530aa15 100644
--- a/querier/src/ingester/mod.rs
+++ b/querier/src/ingester/mod.rs
@@ -662,8 +662,9 @@ impl IngesterStreamDecoder {
                     // the chunk pruning will not be as efficient though.
                     let ranges = catalog_cache
                         .partition()
-                        .column_ranges(Arc::clone(cached_table), p.partition_id, span)
+                        .get(Arc::clone(cached_table), p.partition_id, &[], span)
                         .await
+                        .map(|p| p.column_ranges)
                         .unwrap_or_default();
                     (p, ranges)
                 }
diff --git a/querier/src/parquet/creation.rs b/querier/src/parquet/creation.rs
index 7ccf6cc33f..bffc7cab35 100644
--- a/querier/src/parquet/creation.rs
+++ b/querier/src/parquet/creation.rs
@@ -91,8 +91,9 @@ impl ChunkAdapter {
             async move {
                 let ranges = catalog_cache
                     .partition()
-                    .column_ranges(Arc::clone(cached_table), p, span)
+                    .get(Arc::clone(cached_table), p, &[], span)
                     .await
+                    .map(|p| p.column_ranges)
                     .unwrap_or_default();
                 (p, ranges)
             }
@@ -217,13 +218,15 @@ impl ChunkAdapter {
         let partition_sort_key = self
             .catalog_cache
             .partition()
-            .sort_key(
+            .get(
                 Arc::clone(&cached_table),
                 parquet_file.partition_id,
                 &relevant_pk_columns,
                 span_recorder.child_span("cache GET partition sort key"),
            )
             .await
+            .expect("partition should be set when a parquet file exists")
+            .sort_key
             .expect("partition sort key should be set when a parquet file exists");
 
         // NOTE: Because we've looked up the sort key AFTER the namespace schema, it may contain columns for which we
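
Caller-side summary of the change, for review convenience: the two per-field lookups (`sort_key()` and `column_ranges()`) are folded into a single `get()` that returns the whole `CachedPartition`, so both fields come from one cache entry and one catalog query. The sketch below is a minimal, self-contained approximation: the stub types and the free-standing `get()` function stand in for the real `PartitionCache`, `CachedTable`, and `Span` plumbing and are not the actual IOx signatures.

```rust
// Minimal sketch only: `ColumnId`, `ColumnRanges`, `PartitionSortKey`,
// `CachedPartition`, and the free function `get()` are simplified stand-ins,
// not the real querier types or signatures.
use std::collections::HashMap;
use std::sync::Arc;

type ColumnId = i64;
// The real type maps column name -> range; the range value is elided here.
type ColumnRanges = Arc<HashMap<Arc<str>, ()>>;

#[derive(Debug, Clone)]
struct PartitionSortKey {
    column_set: Vec<ColumnId>,
}

#[derive(Debug, Clone)]
struct CachedPartition {
    sort_key: Option<Arc<PartitionSortKey>>,
    column_ranges: ColumnRanges,
}

// Stand-in for `PartitionCache::get`: one lookup returns the whole cached
// entry (or `None` for an unknown partition).
async fn get(_sort_key_should_cover: &[ColumnId]) -> Option<CachedPartition> {
    Some(CachedPartition {
        sort_key: None,
        column_ranges: Arc::new(HashMap::new()),
    })
}

async fn caller() -> (Option<Arc<PartitionSortKey>>, ColumnRanges) {
    // Before: two cache calls, one per field.
    //   let sort_key = cache.sort_key(table, id, &cover, span).await;
    //   let ranges   = cache.column_ranges(table, id, span).await;
    // After: a single call; both fields are read off the same cache entry.
    let cached = get(&[]).await;
    let sort_key = cached.as_ref().and_then(|p| p.sort_key.clone());
    let ranges = cached.map(|p| p.column_ranges).unwrap_or_default();
    (sort_key, ranges)
}
```

Folding both fields into one entry is presumably also why `test_cache_sharing` could be dropped: with a single accessor there is no longer a pair of methods whose sharing of one catalog fetch needs its own test.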