refactor: single entry point for partition cache (#8093)

For #8089 I would like to request each partition only once. Since
internally we store both the sort key and the column ranges in one cache
value anyway, there is no reason to offer two different methods to look
them up.

This only changes the `PartitionCache` interface. The actual lookups are
still separate, but will be changed in a follow-up.
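
As a rough before/after sketch of what a call site looks like (illustrative only: it assumes the `PartitionCache`, `CachedTable`, `CachedPartition`, `PartitionSortKey`, `ColumnRanges`, `ColumnId`, `PartitionId`, and `Span` types used in the diff below; the `lookup` helper itself is hypothetical):

```rust
use std::sync::Arc;

// Hypothetical helper, only to contrast the old and new cache interface.
async fn lookup(
    cache: &PartitionCache,
    cached_table: Arc<CachedTable>,
    partition_id: PartitionId,
    sort_key_should_cover: &[ColumnId],
    span: Option<Span>,
) -> (Option<Arc<PartitionSortKey>>, Option<ColumnRanges>) {
    // Old interface: two lookups per partition.
    //   cache.sort_key(Arc::clone(&cached_table), partition_id, sort_key_should_cover, span).await
    //   cache.column_ranges(Arc::clone(&cached_table), partition_id, span).await
    //
    // New interface: a single `get` returns the whole cached value and callers
    // pick out the fields they need.
    let cached: Option<CachedPartition> = cache
        .get(cached_table, partition_id, sort_key_should_cover, span)
        .await;
    (
        cached.as_ref().and_then(|p| p.sort_key.clone()),
        cached.map(|p| p.column_ranges),
    )
}
```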
Marco Neumann 2023-06-27 18:22:13 +02:00 committed by GitHub
parent 9d8b620cd2
commit 9775e150b2
3 changed files with 78 additions and 123 deletions


@@ -117,54 +117,40 @@ impl PartitionCache {
}
}
/// Get sort key
/// Get cached partition.
///
/// Expire partition if the cached sort key does NOT cover the given set of columns.
pub async fn sort_key(
pub async fn get(
&self,
cached_table: Arc<CachedTable>,
partition_id: PartitionId,
should_cover: &[ColumnId],
sort_key_should_cover: &[ColumnId],
span: Option<Span>,
) -> Option<Arc<PartitionSortKey>> {
) -> Option<CachedPartition> {
self.remove_if_handle
.remove_if_and_get(
&self.cache,
partition_id,
|cached_partition| {
if let Some(sort_key) = &cached_partition.and_then(|p| p.sort_key) {
should_cover
sort_key_should_cover
.iter()
.any(|col| !sort_key.column_set.contains(col))
} else {
// no sort key at all => need to update if there is anything to cover
!should_cover.is_empty()
!sort_key_should_cover.is_empty()
}
},
(cached_table, span),
)
.await
.and_then(|p| p.sort_key)
}
/// Get known column ranges.
pub async fn column_ranges(
&self,
cached_table: Arc<CachedTable>,
partition_id: PartitionId,
span: Option<Span>,
) -> Option<ColumnRanges> {
self.cache
.get(partition_id, (cached_table, span))
.await
.map(|p| p.column_ranges)
}
}
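
For reference, the expiration check in `get` above amounts to the following predicate (a minimal sketch, not part of this diff: `needs_refresh` is a hypothetical name, `ColumnId` is the crate's column ID type, and a plain `HashSet<ColumnId>` stands in for the sort key's `column_set`):

```rust
use std::collections::HashSet;

// Refresh the cached partition iff the cached sort key does not cover every
// column the caller asks about.
fn needs_refresh(
    cached_sort_key_columns: Option<&HashSet<ColumnId>>,
    sort_key_should_cover: &[ColumnId],
) -> bool {
    match cached_sort_key_columns {
        // a sort key is cached: refresh if any requested column is missing from it
        Some(column_set) => sort_key_should_cover
            .iter()
            .any(|col| !column_set.contains(col)),
        // no sort key at all: refresh only if there is anything to cover
        None => !sort_key_should_cover.is_empty(),
    }
}
```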
#[derive(Debug, Clone)]
struct CachedPartition {
sort_key: Option<Arc<PartitionSortKey>>,
column_ranges: ColumnRanges,
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CachedPartition {
pub sort_key: Option<Arc<PartitionSortKey>>,
pub column_ranges: ColumnRanges,
}
impl CachedPartition {
@@ -353,8 +339,10 @@ mod tests {
);
let sort_key1a = cache
.sort_key(Arc::clone(&cached_table), p1.id, &Vec::new(), None)
.await;
.get(Arc::clone(&cached_table), p1.id, &Vec::new(), None)
.await
.unwrap()
.sort_key;
assert_eq!(
sort_key1a.as_ref().unwrap().as_ref(),
&PartitionSortKey {
@@ -366,14 +354,18 @@
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 1);
let sort_key2 = cache
.sort_key(Arc::clone(&cached_table), p2.id, &Vec::new(), None)
.await;
.get(Arc::clone(&cached_table), p2.id, &Vec::new(), None)
.await
.unwrap()
.sort_key;
assert_eq!(sort_key2, None);
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 2);
let sort_key1b = cache
.sort_key(Arc::clone(&cached_table), p1.id, &Vec::new(), None)
.await;
.get(Arc::clone(&cached_table), p1.id, &Vec::new(), None)
.await
.unwrap()
.sort_key;
assert!(Arc::ptr_eq(
sort_key1a.as_ref().unwrap(),
sort_key1b.as_ref().unwrap()
@@ -383,7 +375,7 @@
// non-existing partition
for _ in 0..2 {
let res = cache
.sort_key(
.get(
Arc::clone(&cached_table),
PartitionId::new(i64::MAX),
&Vec::new(),
@@ -460,9 +452,10 @@
);
let ranges1a = cache
.column_ranges(Arc::clone(&cached_table), p1.id, None)
.get(Arc::clone(&cached_table), p1.id, &[], None)
.await
.unwrap();
.unwrap()
.column_ranges;
assert_eq!(
ranges1a.as_ref(),
&HashMap::from([
@@ -489,9 +482,10 @@
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 1);
let ranges2 = cache
.column_ranges(Arc::clone(&cached_table), p2.id, None)
.get(Arc::clone(&cached_table), p2.id, &[], None)
.await
.unwrap();
.unwrap()
.column_ranges;
assert_eq!(
ranges2.as_ref(),
&HashMap::from([(
@@ -505,9 +499,10 @@
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 2);
let ranges3 = cache
.column_ranges(Arc::clone(&cached_table), p3.id, None)
.get(Arc::clone(&cached_table), p3.id, &[], None)
.await
.unwrap();
.unwrap()
.column_ranges;
assert_eq!(
ranges3.as_ref(),
&HashMap::from([
@@ -530,9 +525,10 @@
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 3);
let ranges4 = cache
.column_ranges(Arc::clone(&cached_table), p4.id, None)
.get(Arc::clone(&cached_table), p4.id, &[], None)
.await
.unwrap();
.unwrap()
.column_ranges;
assert_eq!(
ranges4.as_ref(),
&HashMap::from([
@@ -555,9 +551,10 @@
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 4);
let ranges5 = cache
.column_ranges(Arc::clone(&cached_table), p5.id, None)
.get(Arc::clone(&cached_table), p5.id, &[], None)
.await
.unwrap();
.unwrap()
.column_ranges;
assert_eq!(
ranges5.as_ref(),
&HashMap::from([(
@@ -571,86 +568,28 @@
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 5);
let ranges1b = cache
.column_ranges(Arc::clone(&cached_table), p1.id, None)
.get(Arc::clone(&cached_table), p1.id, &[], None)
.await
.unwrap();
.unwrap()
.column_ranges;
assert!(Arc::ptr_eq(&ranges1a, &ranges1b));
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 5);
// non-existing partition
for _ in 0..2 {
let res = cache
.column_ranges(Arc::clone(&cached_table), PartitionId::new(i64::MAX), None)
.get(
Arc::clone(&cached_table),
PartitionId::new(i64::MAX),
&[],
None,
)
.await;
assert_eq!(res, None);
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 6);
}
}
#[tokio::test]
async fn test_cache_sharing() {
let catalog = TestCatalog::new();
let ns = catalog.create_namespace_1hr_retention("ns").await;
let t = ns.create_table("table").await;
let c1 = t.create_column("tag", ColumnType::Tag).await;
let c2 = t.create_column("time", ColumnType::Time).await;
let p1 = t
.create_partition_with_sort_key("k1", &["tag", "time"])
.await
.partition
.clone();
let p2 = t.create_partition("k2").await.partition.clone();
let p3 = t.create_partition("k3").await.partition.clone();
let cached_table = Arc::new(CachedTable {
id: t.table.id,
schema: schema(),
column_id_map: HashMap::from([
(c1.column.id, Arc::from(c1.column.name.clone())),
(c2.column.id, Arc::from(c2.column.name.clone())),
]),
column_id_map_rev: HashMap::from([
(Arc::from(c1.column.name.clone()), c1.column.id),
(Arc::from(c2.column.name.clone()), c2.column.id),
]),
primary_key_column_ids: [c1.column.id, c2.column.id].into(),
partition_template: TablePartitionTemplateOverride::default(),
});
let cache = PartitionCache::new(
catalog.catalog(),
BackoffConfig::default(),
catalog.time_provider(),
&catalog.metric_registry(),
test_ram_pool(),
true,
);
cache
.sort_key(Arc::clone(&cached_table), p3.id, &Vec::new(), None)
.await;
cache
.column_ranges(Arc::clone(&cached_table), p3.id, None)
.await;
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 1);
cache
.sort_key(Arc::clone(&cached_table), p2.id, &Vec::new(), None)
.await;
cache
.column_ranges(Arc::clone(&cached_table), p2.id, None)
.await;
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 2);
cache
.column_ranges(Arc::clone(&cached_table), p1.id, None)
.await;
cache
.sort_key(Arc::clone(&cached_table), p1.id, &Vec::new(), None)
.await;
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 3);
}
#[tokio::test]
async fn test_expiration() {
let catalog = TestCatalog::new();
@@ -687,23 +626,29 @@
);
let sort_key = cache
.sort_key(Arc::clone(&cached_table), p_id, &Vec::new(), None)
.await;
.get(Arc::clone(&cached_table), p_id, &[], None)
.await
.unwrap()
.sort_key;
assert_eq!(sort_key, None,);
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 1);
// requesting nothing will not expire
assert!(p_sort_key.is_none());
let sort_key = cache
.sort_key(Arc::clone(&cached_table), p_id, &Vec::new(), None)
.await;
.get(Arc::clone(&cached_table), p_id, &[], None)
.await
.unwrap()
.sort_key;
assert_eq!(sort_key, None,);
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 1);
// but requesting something will expire
let sort_key = cache
.sort_key(Arc::clone(&cached_table), p_id, &[c1.column.id], None)
.await;
.get(Arc::clone(&cached_table), p_id, &[c1.column.id], None)
.await
.unwrap()
.sort_key;
assert_eq!(sort_key, None,);
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 2);
@@ -718,8 +663,10 @@
// expire & fetch
let p_sort_key = p.partition.sort_key();
let sort_key = cache
.sort_key(Arc::clone(&cached_table), p_id, &[c1.column.id], None)
.await;
.get(Arc::clone(&cached_table), p_id, &[c1.column.id], None)
.await
.unwrap()
.sort_key;
assert_eq!(
sort_key.as_ref().unwrap().as_ref(),
&PartitionSortKey {
@@ -738,8 +685,10 @@
vec![c1.column.id, c2.column.id],
] {
let sort_key_2 = cache
.sort_key(Arc::clone(&cached_table), p_id, &should_cover, None)
.await;
.get(Arc::clone(&cached_table), p_id, &should_cover, None)
.await
.unwrap()
.sort_key;
assert!(Arc::ptr_eq(
sort_key.as_ref().unwrap(),
sort_key_2.as_ref().unwrap()
@@ -750,13 +699,15 @@
// unknown columns expire
let c3 = t.create_column("x", ColumnType::Tag).await;
let sort_key_2 = cache
.sort_key(
.get(
Arc::clone(&cached_table),
p_id,
&[c1.column.id, c3.column.id],
None,
)
.await;
.await
.unwrap()
.sort_key;
assert!(!Arc::ptr_eq(
sort_key.as_ref().unwrap(),
sort_key_2.as_ref().unwrap()


@@ -662,8 +662,9 @@ impl IngesterStreamDecoder {
// the chunk pruning will not be as efficient though.
let ranges = catalog_cache
.partition()
.column_ranges(Arc::clone(cached_table), p.partition_id, span)
.get(Arc::clone(cached_table), p.partition_id, &[], span)
.await
.map(|p| p.column_ranges)
.unwrap_or_default();
(p, ranges)
}


@@ -91,8 +91,9 @@ impl ChunkAdapter {
async move {
let ranges = catalog_cache
.partition()
.column_ranges(Arc::clone(cached_table), p, span)
.get(Arc::clone(cached_table), p, &[], span)
.await
.map(|p| p.column_ranges)
.unwrap_or_default();
(p, ranges)
}
@@ -217,13 +218,15 @@ impl ChunkAdapter {
let partition_sort_key = self
.catalog_cache
.partition()
.sort_key(
.get(
Arc::clone(&cached_table),
parquet_file.partition_id,
&relevant_pk_columns,
span_recorder.child_span("cache GET partition sort key"),
)
.await
.expect("partition should be set when a parquet file exists")
.sort_key
.expect("partition sort key should be set when a parquet file exists");
// NOTE: Because we've looked up the sort key AFTER the namespace schema, it may contain columns for which we