revert: batch partition catalog requests in querier (#8269) (#8283)

Panics in prod.

This reverts commit 0c347e8e64.
Marco Neumann · 2023-07-20 11:42:40 +02:00 · committed by GitHub
parent 0994f54af4
commit d3432198b6
3 changed files with 109 additions and 370 deletions

Changed file 1 of 3:

@@ -8,11 +8,7 @@ use cache_system::{
         PolicyBackend,
     },
     cache::{driver::CacheDriver, metrics::CacheWithMetrics, Cache},
-    loader::{
-        batch::{BatchLoader, BatchLoaderFlusher, BatchLoaderFlusherExt},
-        metrics::MetricsLoader,
-        FunctionLoader,
-    },
+    loader::{metrics::MetricsLoader, FunctionLoader},
     resource_consumption::FunctionEstimator,
 };
 use data_types::{
@@ -20,17 +16,17 @@ use data_types::{
     ColumnId, Partition, PartitionId, TransitionPartitionId,
 };
 use datafusion::scalar::ScalarValue;
-use iox_catalog::{interface::Catalog, partition_lookup_batch};
+use iox_catalog::{interface::Catalog, partition_lookup};
 use iox_query::chunk_statistics::{ColumnRange, ColumnRanges};
 use iox_time::TimeProvider;
 use observability_deps::tracing::debug;
 use schema::sort::SortKey;
 use std::{
-    collections::{hash_map::Entry, HashMap, HashSet},
+    collections::{HashMap, HashSet},
     mem::{size_of, size_of_val},
     sync::Arc,
 };
-use trace::span::{Span, SpanRecorder};
+use trace::span::Span;

 use super::{namespace::CachedTable, ram::RamSize};
@@ -50,7 +46,6 @@ type CacheT = Box<
 pub struct PartitionCache {
     cache: CacheT,
     remove_if_handle: RemoveIfHandle<PartitionId, Option<CachedPartition>>,
-    flusher: Arc<dyn BatchLoaderFlusher>,
 }

 impl PartitionCache {
@@ -63,61 +58,24 @@ impl PartitionCache {
         ram_pool: Arc<ResourcePool<RamSize>>,
         testing: bool,
     ) -> Self {
-        let loader = FunctionLoader::new(
-            move |partition_ids: Vec<PartitionId>, cached_tables: Vec<Arc<CachedTable>>| {
-                // sanity checks
-                assert_eq!(partition_ids.len(), cached_tables.len());
-                assert!(!partition_ids.is_empty());
-                let cached_table = Arc::clone(&cached_tables[0]);
-                assert!(cached_tables.iter().all(|t| Arc::ptr_eq(t, &cached_table)));
+        let loader =
+            FunctionLoader::new(move |partition_id: PartitionId, extra: Arc<CachedTable>| {
                 let catalog = Arc::clone(&catalog);
                 let backoff_config = backoff_config.clone();

                 async move {
-                    // prepare output buffer
-                    let mut out = (0..partition_ids.len()).map(|_| None).collect::<Vec<_>>();
-                    let mut out_map =
-                        HashMap::<PartitionId, usize>::with_capacity(partition_ids.len());
-                    for (idx, id) in partition_ids.iter().enumerate() {
-                        match out_map.entry(*id) {
-                            Entry::Occupied(_) => unreachable!("cache system requested same partition from loader concurrently, this should have been prevented by the CacheDriver"),
-                            Entry::Vacant(v) => {
-                                v.insert(idx);
-                            }
-                        }
-                    }
-
-                    // build `&[&TransitionPartitionId]` for batch catalog request
-                    let ids = partition_ids
-                        .iter()
-                        .copied()
-                        .map(TransitionPartitionId::Deprecated)
-                        .collect::<Vec<_>>();
-                    let ids = ids.iter().collect::<Vec<_>>();
-
-                    // fetch catalog data
-                    let partitions = Backoff::new(&backoff_config)
+                    let partition = Backoff::new(&backoff_config)
                         .retry_all_errors("get partition_key", || async {
                             let mut repos = catalog.repositories().await;
-                            partition_lookup_batch(repos.as_mut(), &ids).await
+                            let id = TransitionPartitionId::Deprecated(partition_id);
+                            partition_lookup(repos.as_mut(), &id).await
                         })
                         .await
-                        .expect("retry forever");
-
-                    // build output
-                    for p in partitions {
-                        let p = CachedPartition::new(p, &cached_table);
-                        let idx = out_map[&p.id];
-                        out[idx] = Some(p);
-                    }
-                    out
+                        .expect("retry forever")?;
+                    Some(CachedPartition::new(partition, &extra))
                 }
-            },
-        );
-        let loader = Arc::new(BatchLoader::new(loader));
-        let flusher = Arc::clone(&loader);
+            });
         let loader = Arc::new(MetricsLoader::new(
             loader,
             CACHE_ID,
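
The removed batch loader above pre-allocates one output slot per requested partition id and maps the catalog's batch results back to request order by id. Below is a minimal, std-only sketch of that re-alignment idiom, kept outside any IOx types; align_by_id and the String payload are hypothetical stand-ins, and the expect marks where an id the caller never requested would panic, analogous to the out_map[&p.id] indexing in the removed code (the commit message does not say that this is the production panic).

use std::collections::HashMap;

/// Align batch results (arbitrary order, possibly missing entries) back to the
/// order of the original requests; ids the backend did not return stay `None`.
fn align_by_id(requested: &[i64], results: Vec<(i64, String)>) -> Vec<Option<String>> {
    // one output slot per request, in request order
    let mut out: Vec<Option<String>> = vec![None; requested.len()];

    // id -> position of the request that asked for it
    let index: HashMap<i64, usize> = requested
        .iter()
        .copied()
        .enumerate()
        .map(|(pos, id)| (id, pos))
        .collect();

    for (id, value) in results {
        // panics if the backend returns an id that was never requested
        let pos = *index.get(&id).expect("backend returned unrequested id");
        out[pos] = Some(value);
    }
    out
}

fn main() {
    let requested = [10, 20, 30];
    let results = vec![(30, "c".to_string()), (10, "a".to_string())];
    assert_eq!(
        align_by_id(&requested, results),
        vec![Some("a".to_string()), None, Some("c".to_string())]
    );
    println!("ok");
}
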
@@ -153,79 +111,51 @@ impl PartitionCache {
         Self {
             cache,
             remove_if_handle,
-            flusher,
         }
     }

     /// Get cached partition.
     ///
-    /// The result only contains existing partitions. The order is undefined.
-    ///
     /// Expire partition if the cached sort key does NOT cover the given set of columns.
     pub async fn get(
         &self,
         cached_table: Arc<CachedTable>,
-        partitions: Vec<PartitionRequest>,
+        partition_id: PartitionId,
+        sort_key_should_cover: &[ColumnId],
         span: Option<Span>,
-    ) -> Vec<CachedPartition> {
-        let span_recorder = SpanRecorder::new(span);
-
-        let futures = partitions
-            .into_iter()
-            .map(
-                |PartitionRequest {
-                     partition_id,
-                     sort_key_should_cover,
-                 }| {
-                    let cached_table = Arc::clone(&cached_table);
-                    let span = span_recorder.child_span("single partition cache lookup");
-
-                    self.remove_if_handle.remove_if_and_get(
-                        &self.cache,
-                        partition_id,
-                        move |cached_partition| {
-                            let invalidates = if let Some(sort_key) =
-                                &cached_partition.and_then(|p| p.sort_key)
-                            {
-                                sort_key_should_cover
-                                    .iter()
-                                    .any(|col| !sort_key.column_set.contains(col))
-                            } else {
-                                // no sort key at all => need to update if there is anything to cover
-                                !sort_key_should_cover.is_empty()
-                            };
-
-                            if invalidates {
-                                debug!(
-                                    partition_id = partition_id.get(),
-                                    "invalidate partition cache",
-                                );
-                            }
-
-                            invalidates
-                        },
-                        (cached_table, span),
-                    )
-                },
-            )
-            .collect();
-
-        let res = self.flusher.auto_flush(futures).await;
-        res.into_iter().flatten().collect()
+    ) -> Option<CachedPartition> {
+        self.remove_if_handle
+            .remove_if_and_get(
+                &self.cache,
+                partition_id,
+                |cached_partition| {
+                    let invalidates =
+                        if let Some(sort_key) = &cached_partition.and_then(|p| p.sort_key) {
+                            sort_key_should_cover
+                                .iter()
+                                .any(|col| !sort_key.column_set.contains(col))
+                        } else {
+                            // no sort key at all => need to update if there is anything to cover
+                            !sort_key_should_cover.is_empty()
+                        };
+
+                    if invalidates {
+                        debug!(
+                            partition_id = partition_id.get(),
+                            "invalidate partition cache",
+                        );
+                    }
+
+                    invalidates
+                },
+                (cached_table, span),
+            )
+            .await
     }
 }

-/// Request for [`PartitionCache::get`].
-#[derive(Debug)]
-pub struct PartitionRequest {
-    pub partition_id: PartitionId,
-    pub sort_key_should_cover: Vec<ColumnId>,
-}
-
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct CachedPartition {
-    pub id: PartitionId,
     pub sort_key: Option<Arc<PartitionSortKey>>,
     pub column_ranges: ColumnRanges,
 }
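
The closure restored in the hunk above boils down to a small coverage predicate: refresh the cached partition when some requested column is missing from the cached sort key, or when there is no cached sort key at all but columns were requested. A minimal, std-only sketch of that check (needs_refresh is a hypothetical name, and plain i64 stands in for ColumnId):

use std::collections::HashSet;

/// True when the cached entry must be refreshed for the requested columns.
fn needs_refresh(cached_sort_key_columns: Option<&HashSet<i64>>, should_cover: &[i64]) -> bool {
    match cached_sort_key_columns {
        Some(columns) => should_cover.iter().any(|col| !columns.contains(col)),
        // no sort key at all => refresh if there is anything to cover
        None => !should_cover.is_empty(),
    }
}

fn main() {
    let cached: HashSet<i64> = [1, 2].into_iter().collect();
    assert!(!needs_refresh(Some(&cached), &[1])); // covered => keep cached entry
    assert!(needs_refresh(Some(&cached), &[1, 3])); // column 3 not covered => expire
    assert!(needs_refresh(None, &[1])); // no sort key, columns requested => expire
    assert!(!needs_refresh(None, &[])); // nothing requested => keep
    println!("ok");
}
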
@@ -301,7 +231,6 @@ impl CachedPartition {
         column_ranges.shrink_to_fit();

         Self {
-            id: partition.id,
             sort_key,
             column_ranges: Arc::new(column_ranges),
         }
@@ -369,7 +298,6 @@ mod tests {
     use crate::cache::{
         ram::test_util::test_ram_pool, test_util::assert_catalog_access_metric_count,
     };
-    use async_trait::async_trait;
     use data_types::{partition_template::TablePartitionTemplateOverride, ColumnType};
     use generated_types::influxdata::iox::partition_template::v1::{
         template_part::Part, PartitionTemplate, TemplatePart,
@@ -420,7 +348,7 @@ mod tests {
         );

         let sort_key1a = cache
-            .get_one(Arc::clone(&cached_table), p1.id, &Vec::new(), None)
+            .get(Arc::clone(&cached_table), p1.id, &Vec::new(), None)
             .await
             .unwrap()
             .sort_key;
@@ -432,26 +360,18 @@
                 column_order: [c1.column.id, c2.column.id].into(),
             }
         );
-        assert_catalog_access_metric_count(
-            &catalog.metric_registry,
-            "partition_get_by_id_batch",
-            1,
-        );
+        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 1);

         let sort_key2 = cache
-            .get_one(Arc::clone(&cached_table), p2.id, &Vec::new(), None)
+            .get(Arc::clone(&cached_table), p2.id, &Vec::new(), None)
             .await
             .unwrap()
             .sort_key;
         assert_eq!(sort_key2, None);
-        assert_catalog_access_metric_count(
-            &catalog.metric_registry,
-            "partition_get_by_id_batch",
-            2,
-        );
+        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 2);

         let sort_key1b = cache
-            .get_one(Arc::clone(&cached_table), p1.id, &Vec::new(), None)
+            .get(Arc::clone(&cached_table), p1.id, &Vec::new(), None)
             .await
             .unwrap()
             .sort_key;
@@ -459,16 +379,12 @@
             sort_key1a.as_ref().unwrap(),
             sort_key1b.as_ref().unwrap()
         ));
-        assert_catalog_access_metric_count(
-            &catalog.metric_registry,
-            "partition_get_by_id_batch",
-            2,
-        );
+        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 2);

         // non-existing partition
         for _ in 0..2 {
             let res = cache
-                .get_one(
+                .get(
                     Arc::clone(&cached_table),
                     PartitionId::new(i64::MAX),
                     &Vec::new(),
@@ -476,11 +392,7 @@
                 )
                 .await;
             assert_eq!(res, None);
-            assert_catalog_access_metric_count(
-                &catalog.metric_registry,
-                "partition_get_by_id_batch",
-                3,
-            );
+            assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 3);
         }
     }
@@ -549,7 +461,7 @@
         );

         let ranges1a = cache
-            .get_one(Arc::clone(&cached_table), p1.id, &[], None)
+            .get(Arc::clone(&cached_table), p1.id, &[], None)
             .await
             .unwrap()
             .column_ranges;
@@ -576,14 +488,10 @@
             &ranges1a.get("tag1").unwrap().min_value,
             &ranges1a.get("tag1").unwrap().max_value,
         ));
-        assert_catalog_access_metric_count(
-            &catalog.metric_registry,
-            "partition_get_by_id_batch",
-            1,
-        );
+        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 1);

         let ranges2 = cache
-            .get_one(Arc::clone(&cached_table), p2.id, &[], None)
+            .get(Arc::clone(&cached_table), p2.id, &[], None)
             .await
             .unwrap()
             .column_ranges;
@@ -597,14 +505,10 @@
                 }
             ),]),
         );
-        assert_catalog_access_metric_count(
-            &catalog.metric_registry,
-            "partition_get_by_id_batch",
-            2,
-        );
+        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 2);

         let ranges3 = cache
-            .get_one(Arc::clone(&cached_table), p3.id, &[], None)
+            .get(Arc::clone(&cached_table), p3.id, &[], None)
             .await
             .unwrap()
             .column_ranges;
@@ -627,14 +531,10 @@
                 ),
             ]),
         );
-        assert_catalog_access_metric_count(
-            &catalog.metric_registry,
-            "partition_get_by_id_batch",
-            3,
-        );
+        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 3);

         let ranges4 = cache
-            .get_one(Arc::clone(&cached_table), p4.id, &[], None)
+            .get(Arc::clone(&cached_table), p4.id, &[], None)
             .await
             .unwrap()
             .column_ranges;
@@ -657,14 +557,10 @@
                 ),
             ]),
         );
-        assert_catalog_access_metric_count(
-            &catalog.metric_registry,
-            "partition_get_by_id_batch",
-            4,
-        );
+        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 4);

         let ranges5 = cache
-            .get_one(Arc::clone(&cached_table), p5.id, &[], None)
+            .get(Arc::clone(&cached_table), p5.id, &[], None)
             .await
             .unwrap()
             .column_ranges;
@@ -678,28 +574,20 @@
                 }
             ),]),
         );
-        assert_catalog_access_metric_count(
-            &catalog.metric_registry,
-            "partition_get_by_id_batch",
-            5,
-        );
+        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 5);

         let ranges1b = cache
-            .get_one(Arc::clone(&cached_table), p1.id, &[], None)
+            .get(Arc::clone(&cached_table), p1.id, &[], None)
             .await
             .unwrap()
             .column_ranges;
         assert!(Arc::ptr_eq(&ranges1a, &ranges1b));
-        assert_catalog_access_metric_count(
-            &catalog.metric_registry,
-            "partition_get_by_id_batch",
-            5,
-        );
+        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 5);

         // non-existing partition
         for _ in 0..2 {
             let res = cache
-                .get_one(
+                .get(
                     Arc::clone(&cached_table),
                     PartitionId::new(i64::MAX),
                     &[],
@@ -707,11 +595,7 @@
                 )
                 .await;
             assert_eq!(res, None);
-            assert_catalog_access_metric_count(
-                &catalog.metric_registry,
-                "partition_get_by_id_batch",
-                6,
-            );
+            assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 6);
         }
     }
@@ -751,43 +635,31 @@
         );

         let sort_key = cache
-            .get_one(Arc::clone(&cached_table), p_id, &[], None)
+            .get(Arc::clone(&cached_table), p_id, &[], None)
             .await
             .unwrap()
             .sort_key;
         assert_eq!(sort_key, None,);
-        assert_catalog_access_metric_count(
-            &catalog.metric_registry,
-            "partition_get_by_id_batch",
-            1,
-        );
+        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 1);

         // requesting nothing will not expire
         assert!(p_sort_key.is_none());
         let sort_key = cache
-            .get_one(Arc::clone(&cached_table), p_id, &[], None)
+            .get(Arc::clone(&cached_table), p_id, &[], None)
             .await
             .unwrap()
             .sort_key;
         assert_eq!(sort_key, None,);
-        assert_catalog_access_metric_count(
-            &catalog.metric_registry,
-            "partition_get_by_id_batch",
-            1,
-        );
+        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 1);

         // but requesting something will expire
         let sort_key = cache
-            .get_one(Arc::clone(&cached_table), p_id, &[c1.column.id], None)
+            .get(Arc::clone(&cached_table), p_id, &[c1.column.id], None)
             .await
             .unwrap()
             .sort_key;
         assert_eq!(sort_key, None,);
-        assert_catalog_access_metric_count(
-            &catalog.metric_registry,
-            "partition_get_by_id_batch",
-            2,
-        );
+        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 2);

         // set sort key
         let p = p
@@ -796,12 +668,11 @@
                 c2.column.name.as_str(),
             ]))
             .await;
-        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 1);

         // expire & fetch
         let p_sort_key = p.partition.sort_key();
         let sort_key = cache
-            .get_one(Arc::clone(&cached_table), p_id, &[c1.column.id], None)
+            .get(Arc::clone(&cached_table), p_id, &[c1.column.id], None)
             .await
             .unwrap()
             .sort_key;
@@ -813,11 +684,7 @@
                 column_order: [c1.column.id, c2.column.id].into(),
             }
         );
-        assert_catalog_access_metric_count(
-            &catalog.metric_registry,
-            "partition_get_by_id_batch",
-            3,
-        );
+        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 4);

         // subsets and the full key don't expire
         for should_cover in [
@@ -827,7 +694,7 @@
             vec![c1.column.id, c2.column.id],
         ] {
             let sort_key_2 = cache
-                .get_one(Arc::clone(&cached_table), p_id, &should_cover, None)
+                .get(Arc::clone(&cached_table), p_id, &should_cover, None)
                 .await
                 .unwrap()
                 .sort_key;
@@ -835,17 +702,13 @@
                 sort_key.as_ref().unwrap(),
                 sort_key_2.as_ref().unwrap()
             ));
-            assert_catalog_access_metric_count(
-                &catalog.metric_registry,
-                "partition_get_by_id_batch",
-                3,
-            );
+            assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 4);
         }

         // unknown columns expire
         let c3 = t.create_column("x", ColumnType::Tag).await;
         let sort_key_2 = cache
-            .get_one(
+            .get(
                 Arc::clone(&cached_table),
                 p_id,
                 &[c1.column.id, c3.column.id],
@@ -859,109 +722,10 @@
             sort_key_2.as_ref().unwrap()
         ));
         assert_eq!(sort_key, sort_key_2);
-        assert_catalog_access_metric_count(
-            &catalog.metric_registry,
-            "partition_get_by_id_batch",
-            4,
-        );
+        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 5);
     }

-    #[tokio::test]
-    async fn test_multi_get() {
-        let catalog = TestCatalog::new();
-        let ns = catalog.create_namespace_1hr_retention("ns").await;
-        let t = ns.create_table("table").await;
-        let p1 = t.create_partition("k1").await.partition.clone();
-        let p2 = t.create_partition("k2").await.partition.clone();
-        let cached_table = Arc::new(CachedTable {
-            id: t.table.id,
-            schema: schema(),
-            column_id_map: HashMap::default(),
-            column_id_map_rev: HashMap::default(),
-            primary_key_column_ids: [].into(),
-            partition_template: TablePartitionTemplateOverride::default(),
-        });
-
-        let cache = PartitionCache::new(
-            catalog.catalog(),
-            BackoffConfig::default(),
-            catalog.time_provider(),
-            &catalog.metric_registry(),
-            test_ram_pool(),
-            true,
-        );
-
-        let mut res = cache
-            .get(
-                Arc::clone(&cached_table),
-                vec![
-                    PartitionRequest {
-                        partition_id: p1.id,
-                        sort_key_should_cover: vec![],
-                    },
-                    PartitionRequest {
-                        partition_id: p2.id,
-                        sort_key_should_cover: vec![],
-                    },
-                    PartitionRequest {
-                        partition_id: p1.id,
-                        sort_key_should_cover: vec![],
-                    },
-                    PartitionRequest {
-                        partition_id: PartitionId::new(i64::MAX),
-                        sort_key_should_cover: vec![],
-                    },
-                ],
-                None,
-            )
-            .await;
-        res.sort_by_key(|p| p.id);
-
-        let ids = res.iter().map(|p| p.id).collect::<Vec<_>>();
-        assert_eq!(ids, vec![p1.id, p1.id, p2.id]);
-
-        assert_catalog_access_metric_count(
-            &catalog.metric_registry,
-            "partition_get_by_id_batch",
-            1,
-        );
-    }
-
     fn schema() -> Schema {
         SchemaBuilder::new().build().unwrap()
     }
-
-    /// Extension methods for simpler testing.
-    #[async_trait]
-    trait PartitionCacheExt {
-        async fn get_one(
-            &self,
-            cached_table: Arc<CachedTable>,
-            partition_id: PartitionId,
-            sort_key_should_cover: &[ColumnId],
-            span: Option<Span>,
-        ) -> Option<CachedPartition>;
-    }
-
-    #[async_trait]
-    impl PartitionCacheExt for PartitionCache {
-        async fn get_one(
-            &self,
-            cached_table: Arc<CachedTable>,
-            partition_id: PartitionId,
-            sort_key_should_cover: &[ColumnId],
-            span: Option<Span>,
-        ) -> Option<CachedPartition> {
-            self.get(
-                cached_table,
-                vec![PartitionRequest {
-                    partition_id,
-                    sort_key_should_cover: sort_key_should_cover.to_vec(),
-                }],
-                span,
-            )
-            .await
-            .into_iter()
-            .next()
-        }
-    }
 }

Changed file 2 of 3:

@@ -106,7 +106,6 @@ pub mod tests {
     use crate::cache::{
         namespace::{CachedNamespace, CachedTable},
-        partition::PartitionRequest,
         CatalogCache,
     };
@@ -250,15 +249,11 @@
             .partition()
             .get(
                 Arc::clone(&self.cached_table),
-                vec![PartitionRequest {
-                    partition_id: self.parquet_file.partition_id,
-                    sort_key_should_cover: vec![],
-                }],
+                self.parquet_file.partition_id,
+                &[],
                 None,
             )
             .await
-            .into_iter()
-            .next()
             .unwrap();
         let cached_partitions =
             HashMap::from([(self.parquet_file.partition_id, cached_partition)]);

Changed file 3 of 3:

@@ -1,19 +1,17 @@
 use self::query_access::QuerierTableChunkPruner;
 use crate::{
-    cache::{
-        namespace::CachedTable,
-        partition::{CachedPartition, PartitionRequest},
-    },
+    cache::{namespace::CachedTable, partition::CachedPartition},
     ingester::{self, IngesterPartition},
     parquet::ChunkAdapter,
-    IngesterConnection,
+    IngesterConnection, CONCURRENT_CHUNK_CREATION_JOBS,
 };
 use data_types::{ColumnId, NamespaceId, ParquetFile, PartitionId, TableId};
 use datafusion::error::DataFusionError;
-use futures::join;
+use futures::{join, StreamExt};
 use iox_query::{provider, provider::ChunkPruner, QueryChunk};
 use observability_deps::tracing::{debug, trace};
 use predicate::Predicate;
+use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};
 use schema::Schema;
 use snafu::{ResultExt, Snafu};
 use std::{
@@ -347,26 +345,33 @@ impl QuerierTable {
                 .extend(f.column_set.iter().copied().filter(|id| pk.contains(id)));
         }

-        // batch request all partitions
-        let requests = should_cover
-            .into_iter()
-            .map(|(id, cover)| PartitionRequest {
-                partition_id: id,
-                sort_key_should_cover: cover.into_iter().collect(),
-            })
-            .collect();
-        let partitions = self
-            .chunk_adapter
-            .catalog_cache()
-            .partition()
-            .get(
-                Arc::clone(cached_table),
-                requests,
-                span_recorder.child_span("fetch partitions"),
-            )
-            .await;
-
-        partitions.into_iter().map(|p| (p.id, p)).collect()
+        // shuffle order to even catalog load, because cache hits/misses might be correlated w/ the order of the
+        // partitions.
+        //
+        // Note that we sort before shuffling to achieve a deterministic pseudo-random order
+        let mut partitions = should_cover.into_iter().collect::<Vec<_>>();
+        let mut rng = StdRng::seed_from_u64(cached_table.id.get() as u64);
+        partitions.sort_by(|(a_p_id, _a_cols), (b_p_id, _b_cols)| a_p_id.cmp(b_p_id));
+        partitions.shuffle(&mut rng);
+
+        futures::stream::iter(partitions)
+            .map(|(p_id, cover)| {
+                let catalog_cache = self.chunk_adapter.catalog_cache();
+                let span = span_recorder.child_span("fetch partition");
+                async move {
+                    let cover = cover.into_iter().collect::<Vec<_>>();
+                    let cached_partition = catalog_cache
+                        .partition()
+                        .get(Arc::clone(cached_table), p_id, &cover, span)
+                        .await;
+                    cached_partition.map(|p| (p_id, p))
+                }
+            })
+            .buffer_unordered(CONCURRENT_CHUNK_CREATION_JOBS)
+            .filter_map(|x| async move { x })
+            .collect::<HashMap<_, _>>()
+            .await
     }

     /// Get a chunk pruner that can be used to prune chunks retrieved via [`chunks`](Self::chunks)
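
The hunk above restores per-partition cache lookups driven with bounded concurrency: the work list is sorted, shuffled with a seeded RNG so that catalog load is spread while the order stays deterministic, and then fetched through buffer_unordered. A self-contained sketch of that pattern follows; it assumes the futures, rand, and tokio crates as dependencies, and fetch_partition plus the CONCURRENCY constant are hypothetical stand-ins for the catalog-cache lookup and CONCURRENT_CHUNK_CREATION_JOBS.

use futures::StreamExt;
use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};
use std::collections::HashMap;

const CONCURRENCY: usize = 4;

// Stand-in for a per-partition cache/catalog lookup; `None` models a partition
// that does not exist (and is therefore dropped from the result map).
async fn fetch_partition(id: i64) -> Option<(i64, String)> {
    (id % 2 == 0).then(|| (id, format!("partition-{id}")))
}

#[tokio::main]
async fn main() {
    let mut ids: Vec<i64> = vec![5, 1, 4, 2, 3];

    // sort first, then shuffle with a fixed seed => deterministic pseudo-random order
    ids.sort();
    let mut rng = StdRng::seed_from_u64(42);
    ids.shuffle(&mut rng);

    let fetched: HashMap<i64, String> = futures::stream::iter(ids)
        .map(|id| async move { fetch_partition(id).await })
        .buffer_unordered(CONCURRENCY) // at most CONCURRENCY lookups in flight
        .filter_map(|x| async move { x })
        .collect()
        .await;

    println!("{fetched:?}");
}
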
@@ -886,22 +891,12 @@
         let chunks = querier_table.chunks().await.unwrap();
         assert_eq!(chunks.len(), 5);
-        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 4);
-        assert_catalog_access_metric_count(
-            &catalog.metric_registry,
-            "partition_get_by_id_batch",
-            1,
-        );
+        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 6);
         assert_cache_access_metric_count(&catalog.metric_registry, "partition", 2);

         let chunks = querier_table.chunks().await.unwrap();
         assert_eq!(chunks.len(), 5);
-        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 4);
-        assert_catalog_access_metric_count(
-            &catalog.metric_registry,
-            "partition_get_by_id_batch",
-            1,
-        );
+        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 6);
         assert_cache_access_metric_count(&catalog.metric_registry, "partition", 4);

         partition_2
@@ -909,22 +904,12 @@
                 TestParquetFileBuilder::default().with_line_protocol("table,tag1=a foo=1,bar=1 11"),
             )
             .await;
-        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 5);
-        assert_catalog_access_metric_count(
-            &catalog.metric_registry,
-            "partition_get_by_id_batch",
-            1,
-        );
+        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 7);

         // file not visible yet
         let chunks = querier_table.chunks().await.unwrap();
         assert_eq!(chunks.len(), 5);
-        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 5);
-        assert_catalog_access_metric_count(
-            &catalog.metric_registry,
-            "partition_get_by_id_batch",
-            1,
-        );
+        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 7);
         assert_cache_access_metric_count(&catalog.metric_registry, "partition", 6);

         // change ingester ID => invalidates cache
@@ -933,12 +918,7 @@
             .with_ingester_partition(ingester_partition_builder.build());
         let chunks = querier_table.chunks().await.unwrap();
         assert_eq!(chunks.len(), 6);
-        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 5);
-        assert_catalog_access_metric_count(
-            &catalog.metric_registry,
-            "partition_get_by_id_batch",
-            2,
-        );
+        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 8);
         assert_cache_access_metric_count(&catalog.metric_registry, "partition", 8);
     }