feat: batch partition catalog requests in querier (take 2) (#8299)
* feat: batch partition catalog requests in querier

  This is mostly wiring that builds on top of the other PRs linked to #8089. I think we could eventually make the batching code nicer by adding better wrappers / helpers, but let's do that if we have other batched caches and this pattern proves to be useful.

  Closes #8089.

* test: extend `test_multi_get`
* test: regression test for #8286
* fix: prevent auto-flush CPU looping
* fix: panic when loading different tables at the same time

---------

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
parent b2179e2f8b
commit 748e66731c
@@ -105,8 +105,6 @@ where
    L: Loader<K = Vec<K>, Extra = Vec<Extra>, V = Vec<V>>,
{
    async fn flush(&self) {
        trace!("flushing batch loader");

        let pending: Vec<_> = {
            let mut pending = self.inner.pending.lock();
            std::mem::take(pending.as_mut())

@@ -115,6 +113,8 @@ where
        if pending.is_empty() {
            return;
        }
        trace!(n_pending = pending.len(), "flush batch loader",);

        let job_id = self.inner.job_id_counter.fetch_add(1, Ordering::SeqCst);
        let handle_recv = CancellationSafeFutureReceiver::default();

@@ -221,6 +221,15 @@ where

            if !pending.is_empty() {
                self.flush().await;

                // prevent hot-looping:
                // It seems that in some cases the underlying loader is ready but the data is not yet available via
                // the cache driver. This is likely due to the signalling system within the cache driver that prevents
                // cancellation, but also allows side-loading and at the same time prevents the same key from being
                // loaded multiple times. Tokio doesn't know that this method is basically a wait loop, so we yield
                // back to the tokio worker to allow it to make some progress. Since flush+load take some time anyway,
                // this yield is not performance critical overall.
                tokio::task::yield_now().await;
            }

            futures = pending;
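For context, the comment above describes a wait loop that has to cooperate with the tokio scheduler. A minimal, standalone sketch of that pattern (not part of this diff; `wait_until` and `data_ready` are hypothetical stand-ins):

    /// Illustrative only: poll a condition without starving the worker thread.
    async fn wait_until(mut data_ready: impl FnMut() -> bool) {
        while !data_ready() {
            // Hand control back to the tokio scheduler so other tasks
            // (e.g. the one producing the data) can make progress.
            tokio::task::yield_now().await;
        }
    }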
@@ -8,7 +8,11 @@ use cache_system::{
        PolicyBackend,
    },
    cache::{driver::CacheDriver, metrics::CacheWithMetrics, Cache},
    loader::{metrics::MetricsLoader, FunctionLoader},
    loader::{
        batch::{BatchLoader, BatchLoaderFlusher, BatchLoaderFlusherExt},
        metrics::MetricsLoader,
        FunctionLoader,
    },
    resource_consumption::FunctionEstimator,
};
use data_types::{

@@ -16,17 +20,17 @@ use data_types::{
    ColumnId, Partition, PartitionId, TransitionPartitionId,
};
use datafusion::scalar::ScalarValue;
use iox_catalog::{interface::Catalog, partition_lookup};
use iox_catalog::{interface::Catalog, partition_lookup_batch};
use iox_query::chunk_statistics::{ColumnRange, ColumnRanges};
use iox_time::TimeProvider;
use observability_deps::tracing::debug;
use schema::sort::SortKey;
use std::{
    collections::{HashMap, HashSet},
    collections::{hash_map::Entry, HashMap, HashSet},
    mem::{size_of, size_of_val},
    sync::Arc,
};
use trace::span::Span;
use trace::span::{Span, SpanRecorder};

use super::{namespace::CachedTable, ram::RamSize};
@@ -46,6 +50,7 @@ type CacheT = Box<
pub struct PartitionCache {
    cache: CacheT,
    remove_if_handle: RemoveIfHandle<PartitionId, Option<CachedPartition>>,
    flusher: Arc<dyn BatchLoaderFlusher>,
}

impl PartitionCache {

@@ -58,24 +63,59 @@ impl PartitionCache {
        ram_pool: Arc<ResourcePool<RamSize>>,
        testing: bool,
    ) -> Self {
        let loader =
            FunctionLoader::new(move |partition_id: PartitionId, extra: Arc<CachedTable>| {
        let loader = FunctionLoader::new(
            move |partition_ids: Vec<PartitionId>, cached_tables: Vec<Arc<CachedTable>>| {
                // sanity checks
                assert_eq!(partition_ids.len(), cached_tables.len());

                let catalog = Arc::clone(&catalog);
                let backoff_config = backoff_config.clone();

                async move {
                    let partition = Backoff::new(&backoff_config)
                    // prepare output buffer
                    let mut out = (0..partition_ids.len()).map(|_| None).collect::<Vec<_>>();
                    let mut out_map =
                        HashMap::<PartitionId, usize>::with_capacity(partition_ids.len());
                    for (idx, id) in partition_ids.iter().enumerate() {
                        match out_map.entry(*id) {
                            Entry::Occupied(_) => unreachable!("cache system requested same partition from loader concurrently, this should have been prevented by the CacheDriver"),
                            Entry::Vacant(v) => {
                                v.insert(idx);
                            }
                        }
                    }

                    // build `&[&TransitionPartitionId]` for batch catalog request
                    let ids = partition_ids
                        .iter()
                        .copied()
                        .map(TransitionPartitionId::Deprecated)
                        .collect::<Vec<_>>();
                    let ids = ids.iter().collect::<Vec<_>>();

                    // fetch catalog data
                    let partitions = Backoff::new(&backoff_config)
                        .retry_all_errors("get partition_key", || async {
                            let mut repos = catalog.repositories().await;
                            let id = TransitionPartitionId::Deprecated(partition_id);
                            partition_lookup(repos.as_mut(), &id).await
                            partition_lookup_batch(repos.as_mut(), &ids).await
                        })
                        .await
                        .expect("retry forever")?;
                        .expect("retry forever");

                    Some(CachedPartition::new(partition, &extra))
                    // build output
                    for p in partitions {
                        let idx = out_map[&p.id];
                        let cached_table = &cached_tables[idx];
                        let p = CachedPartition::new(p, cached_table);
                        out[idx] = Some(p);
                    }

                    out
                }
            });
            },
        );
        let loader = Arc::new(BatchLoader::new(loader));
        let flusher = Arc::clone(&loader);
        let loader = Arc::new(MetricsLoader::new(
            loader,
            CACHE_ID,
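The loader above records the position of each requested partition id so that the batched catalog response, which may omit missing partitions, can be written back into the right output slot. A self-contained sketch of that index-map pattern, using only std types (`reorder`, `Row`, and plain `i64` ids are illustrative stand-ins, not the real catalog types):

    use std::collections::HashMap;

    /// Illustrative only: restore request order after a batched lookup that may
    /// return rows in arbitrary order and omit missing ids entirely.
    fn reorder<Row>(ids: &[i64], fetched: Vec<(i64, Row)>) -> Vec<Option<Row>> {
        // remember where each requested id sits in the request
        let positions: HashMap<i64, usize> =
            ids.iter().enumerate().map(|(idx, id)| (*id, idx)).collect();

        // pre-fill with `None` so ids that were not found simply stay empty
        let mut out: Vec<Option<Row>> = ids.iter().map(|_| None).collect();
        for (id, row) in fetched {
            if let Some(idx) = positions.get(&id) {
                out[*idx] = Some(row);
            }
        }
        out
    }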
@@ -111,51 +151,79 @@ impl PartitionCache {
        Self {
            cache,
            remove_if_handle,
            flusher,
        }
    }

    /// Get cached partition.
    ///
    /// The result only contains existing partitions. The order is undefined.
    ///
    /// Expire partition if the cached sort key does NOT cover the given set of columns.
    pub async fn get(
        &self,
        cached_table: Arc<CachedTable>,
        partition_id: PartitionId,
        sort_key_should_cover: &[ColumnId],
        partitions: Vec<PartitionRequest>,
        span: Option<Span>,
    ) -> Option<CachedPartition> {
        self.remove_if_handle
            .remove_if_and_get(
                &self.cache,
                partition_id,
                |cached_partition| {
                    let invalidates =
                        if let Some(sort_key) = &cached_partition.and_then(|p| p.sort_key) {
                            sort_key_should_cover
                                .iter()
                                .any(|col| !sort_key.column_set.contains(col))
                        } else {
                            // no sort key at all => need to update if there is anything to cover
                            !sort_key_should_cover.is_empty()
                        };
    ) -> Vec<CachedPartition> {
        let span_recorder = SpanRecorder::new(span);

                    if invalidates {
                        debug!(
                            partition_id = partition_id.get(),
                            "invalidate partition cache",
                        );
                    }
        let futures = partitions
            .into_iter()
            .map(
                |PartitionRequest {
                     partition_id,
                     sort_key_should_cover,
                 }| {
                    let cached_table = Arc::clone(&cached_table);
                    let span = span_recorder.child_span("single partition cache lookup");

                    invalidates
                    self.remove_if_handle.remove_if_and_get(
                        &self.cache,
                        partition_id,
                        move |cached_partition| {
                            let invalidates = if let Some(sort_key) =
                                &cached_partition.and_then(|p| p.sort_key)
                            {
                                sort_key_should_cover
                                    .iter()
                                    .any(|col| !sort_key.column_set.contains(col))
                            } else {
                                // no sort key at all => need to update if there is anything to cover
                                !sort_key_should_cover.is_empty()
                            };

                            if invalidates {
                                debug!(
                                    partition_id = partition_id.get(),
                                    "invalidate partition cache",
                                );
                            }

                            invalidates
                        },
                        (cached_table, span),
                    )
                },
                (cached_table, span),
            )
            .await
            .collect();

        let res = self.flusher.auto_flush(futures).await;

        res.into_iter().flatten().collect()
    }
}

/// Request for [`PartitionCache::get`].
#[derive(Debug)]
pub struct PartitionRequest {
    pub partition_id: PartitionId,
    pub sort_key_should_cover: Vec<ColumnId>,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CachedPartition {
    pub id: PartitionId,
    pub sort_key: Option<Arc<PartitionSortKey>>,
    pub column_ranges: ColumnRanges,
}
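A hedged usage sketch of the reworked API above (the wrapping `fetch_partitions` helper is hypothetical; `PartitionCache::get`, `PartitionRequest`, and `CachedPartition` are the types introduced in this diff):

    use std::sync::Arc;

    /// Illustrative only: drive the batched lookup from a list of partition ids.
    async fn fetch_partitions(
        cache: &PartitionCache,
        cached_table: Arc<CachedTable>,
        partition_ids: &[PartitionId],
    ) -> Vec<CachedPartition> {
        let requests = partition_ids
            .iter()
            .map(|&partition_id| PartitionRequest {
                partition_id,
                // empty => never invalidates the cached sort key
                sort_key_should_cover: vec![],
            })
            .collect();

        // all cache misses within this call are flushed as a single batched
        // catalog request; non-existing partitions are simply absent
        cache.get(cached_table, requests, None).await
    }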
@@ -231,6 +299,7 @@ impl CachedPartition {
        column_ranges.shrink_to_fit();

        Self {
            id: partition.id,
            sort_key,
            column_ranges: Arc::new(column_ranges),
        }
@@ -298,12 +367,15 @@ mod tests {
    use crate::cache::{
        ram::test_util::test_ram_pool, test_util::assert_catalog_access_metric_count,
    };
    use async_trait::async_trait;
    use data_types::{partition_template::TablePartitionTemplateOverride, ColumnType};
    use futures::StreamExt;
    use generated_types::influxdata::iox::partition_template::v1::{
        template_part::Part, PartitionTemplate, TemplatePart,
    };
    use iox_tests::TestCatalog;
    use iox_tests::{TestCatalog, TestNamespace};
    use schema::{Schema, SchemaBuilder};
    use tokio::sync::Barrier;

    #[tokio::test]
    async fn test_sort_key() {
@@ -348,7 +420,7 @@ mod tests {
        );

        let sort_key1a = cache
            .get(Arc::clone(&cached_table), p1.id, &Vec::new(), None)
            .get_one(Arc::clone(&cached_table), p1.id, &Vec::new(), None)
            .await
            .unwrap()
            .sort_key;

@@ -360,18 +432,26 @@ mod tests {
                column_order: [c1.column.id, c2.column.id].into(),
            }
        );
        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 1);
        assert_catalog_access_metric_count(
            &catalog.metric_registry,
            "partition_get_by_id_batch",
            1,
        );

        let sort_key2 = cache
            .get(Arc::clone(&cached_table), p2.id, &Vec::new(), None)
            .get_one(Arc::clone(&cached_table), p2.id, &Vec::new(), None)
            .await
            .unwrap()
            .sort_key;
        assert_eq!(sort_key2, None);
        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 2);
        assert_catalog_access_metric_count(
            &catalog.metric_registry,
            "partition_get_by_id_batch",
            2,
        );

        let sort_key1b = cache
            .get(Arc::clone(&cached_table), p1.id, &Vec::new(), None)
            .get_one(Arc::clone(&cached_table), p1.id, &Vec::new(), None)
            .await
            .unwrap()
            .sort_key;

@@ -379,12 +459,16 @@ mod tests {
            sort_key1a.as_ref().unwrap(),
            sort_key1b.as_ref().unwrap()
        ));
        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 2);
        assert_catalog_access_metric_count(
            &catalog.metric_registry,
            "partition_get_by_id_batch",
            2,
        );

        // non-existing partition
        for _ in 0..2 {
            let res = cache
                .get(
                .get_one(
                    Arc::clone(&cached_table),
                    PartitionId::new(i64::MAX),
                    &Vec::new(),

@@ -392,7 +476,11 @@ mod tests {
                )
                .await;
            assert_eq!(res, None);
            assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 3);
            assert_catalog_access_metric_count(
                &catalog.metric_registry,
                "partition_get_by_id_batch",
                3,
            );
        }
    }


@@ -461,7 +549,7 @@ mod tests {
        );

        let ranges1a = cache
            .get(Arc::clone(&cached_table), p1.id, &[], None)
            .get_one(Arc::clone(&cached_table), p1.id, &[], None)
            .await
            .unwrap()
            .column_ranges;

@@ -488,10 +576,14 @@ mod tests {
            &ranges1a.get("tag1").unwrap().min_value,
            &ranges1a.get("tag1").unwrap().max_value,
        ));
        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 1);
        assert_catalog_access_metric_count(
            &catalog.metric_registry,
            "partition_get_by_id_batch",
            1,
        );

        let ranges2 = cache
            .get(Arc::clone(&cached_table), p2.id, &[], None)
            .get_one(Arc::clone(&cached_table), p2.id, &[], None)
            .await
            .unwrap()
            .column_ranges;

@@ -505,10 +597,14 @@ mod tests {
                }
            ),]),
        );
        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 2);
        assert_catalog_access_metric_count(
            &catalog.metric_registry,
            "partition_get_by_id_batch",
            2,
        );

        let ranges3 = cache
            .get(Arc::clone(&cached_table), p3.id, &[], None)
            .get_one(Arc::clone(&cached_table), p3.id, &[], None)
            .await
            .unwrap()
            .column_ranges;

@@ -531,10 +627,14 @@ mod tests {
                ),
            ]),
        );
        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 3);
        assert_catalog_access_metric_count(
            &catalog.metric_registry,
            "partition_get_by_id_batch",
            3,
        );

        let ranges4 = cache
            .get(Arc::clone(&cached_table), p4.id, &[], None)
            .get_one(Arc::clone(&cached_table), p4.id, &[], None)
            .await
            .unwrap()
            .column_ranges;

@@ -557,10 +657,14 @@ mod tests {
                ),
            ]),
        );
        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 4);
        assert_catalog_access_metric_count(
            &catalog.metric_registry,
            "partition_get_by_id_batch",
            4,
        );

        let ranges5 = cache
            .get(Arc::clone(&cached_table), p5.id, &[], None)
            .get_one(Arc::clone(&cached_table), p5.id, &[], None)
            .await
            .unwrap()
            .column_ranges;

@@ -574,20 +678,28 @@ mod tests {
                }
            ),]),
        );
        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 5);
        assert_catalog_access_metric_count(
            &catalog.metric_registry,
            "partition_get_by_id_batch",
            5,
        );

        let ranges1b = cache
            .get(Arc::clone(&cached_table), p1.id, &[], None)
            .get_one(Arc::clone(&cached_table), p1.id, &[], None)
            .await
            .unwrap()
            .column_ranges;
        assert!(Arc::ptr_eq(&ranges1a, &ranges1b));
        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 5);
        assert_catalog_access_metric_count(
            &catalog.metric_registry,
            "partition_get_by_id_batch",
            5,
        );

        // non-existing partition
        for _ in 0..2 {
            let res = cache
                .get(
                .get_one(
                    Arc::clone(&cached_table),
                    PartitionId::new(i64::MAX),
                    &[],

@@ -595,7 +707,11 @@ mod tests {
                )
                .await;
            assert_eq!(res, None);
            assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 6);
            assert_catalog_access_metric_count(
                &catalog.metric_registry,
                "partition_get_by_id_batch",
                6,
            );
        }
    }


@@ -635,31 +751,43 @@ mod tests {
        );

        let sort_key = cache
            .get(Arc::clone(&cached_table), p_id, &[], None)
            .get_one(Arc::clone(&cached_table), p_id, &[], None)
            .await
            .unwrap()
            .sort_key;
        assert_eq!(sort_key, None,);
        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 1);
        assert_catalog_access_metric_count(
            &catalog.metric_registry,
            "partition_get_by_id_batch",
            1,
        );

        // requesting nothing will not expire
        assert!(p_sort_key.is_none());
        let sort_key = cache
            .get(Arc::clone(&cached_table), p_id, &[], None)
            .get_one(Arc::clone(&cached_table), p_id, &[], None)
            .await
            .unwrap()
            .sort_key;
        assert_eq!(sort_key, None,);
        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 1);
        assert_catalog_access_metric_count(
            &catalog.metric_registry,
            "partition_get_by_id_batch",
            1,
        );

        // but requesting something will expire
        let sort_key = cache
            .get(Arc::clone(&cached_table), p_id, &[c1.column.id], None)
            .get_one(Arc::clone(&cached_table), p_id, &[c1.column.id], None)
            .await
            .unwrap()
            .sort_key;
        assert_eq!(sort_key, None,);
        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 2);
        assert_catalog_access_metric_count(
            &catalog.metric_registry,
            "partition_get_by_id_batch",
            2,
        );

        // set sort key
        let p = p

@@ -668,11 +796,12 @@ mod tests {
            c2.column.name.as_str(),
        ]))
        .await;
        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 1);

        // expire & fetch
        let p_sort_key = p.partition.sort_key();
        let sort_key = cache
            .get(Arc::clone(&cached_table), p_id, &[c1.column.id], None)
            .get_one(Arc::clone(&cached_table), p_id, &[c1.column.id], None)
            .await
            .unwrap()
            .sort_key;

@@ -684,7 +813,11 @@ mod tests {
                column_order: [c1.column.id, c2.column.id].into(),
            }
        );
        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 4);
        assert_catalog_access_metric_count(
            &catalog.metric_registry,
            "partition_get_by_id_batch",
            3,
        );

        // subsets and the full key don't expire
        for should_cover in [

@@ -694,7 +827,7 @@ mod tests {
            vec![c1.column.id, c2.column.id],
        ] {
            let sort_key_2 = cache
                .get(Arc::clone(&cached_table), p_id, &should_cover, None)
                .get_one(Arc::clone(&cached_table), p_id, &should_cover, None)
                .await
                .unwrap()
                .sort_key;

@@ -702,13 +835,17 @@ mod tests {
                sort_key.as_ref().unwrap(),
                sort_key_2.as_ref().unwrap()
            ));
            assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 4);
            assert_catalog_access_metric_count(
                &catalog.metric_registry,
                "partition_get_by_id_batch",
                3,
            );
        }

        // unknown columns expire
        let c3 = t.create_column("x", ColumnType::Tag).await;
        let sort_key_2 = cache
            .get(
            .get_one(
                Arc::clone(&cached_table),
                p_id,
                &[c1.column.id, c3.column.id],

@@ -722,10 +859,259 @@ mod tests {
            sort_key_2.as_ref().unwrap()
        ));
        assert_eq!(sort_key, sort_key_2);
        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 5);
        assert_catalog_access_metric_count(
            &catalog.metric_registry,
            "partition_get_by_id_batch",
            4,
        );
    }

    #[tokio::test]
    async fn test_multi_get() {
        let catalog = TestCatalog::new();

        let ns = catalog.create_namespace_1hr_retention("ns").await;
        let t = ns.create_table("table").await;
        let p1 = t.create_partition("k1").await.partition.clone();
        let p2 = t.create_partition("k2").await.partition.clone();
        let cached_table = Arc::new(CachedTable {
            id: t.table.id,
            schema: schema(),
            column_id_map: HashMap::default(),
            column_id_map_rev: HashMap::default(),
            primary_key_column_ids: [].into(),
            partition_template: TablePartitionTemplateOverride::default(),
        });

        let cache = PartitionCache::new(
            catalog.catalog(),
            BackoffConfig::default(),
            catalog.time_provider(),
            &catalog.metric_registry(),
            test_ram_pool(),
            true,
        );

        let mut res = cache
            .get(
                Arc::clone(&cached_table),
                vec![
                    PartitionRequest {
                        partition_id: p1.id,
                        sort_key_should_cover: vec![],
                    },
                    PartitionRequest {
                        partition_id: p2.id,
                        sort_key_should_cover: vec![],
                    },
                    PartitionRequest {
                        partition_id: p1.id,
                        sort_key_should_cover: vec![],
                    },
                    PartitionRequest {
                        // requesting non-existing partitions is fine, they just don't appear in the output
                        partition_id: PartitionId::new(i64::MAX),
                        sort_key_should_cover: vec![],
                    },
                ],
                None,
            )
            .await;
        res.sort_by_key(|p| p.id);
        let ids = res.iter().map(|p| p.id).collect::<Vec<_>>();
        assert_eq!(ids, vec![p1.id, p1.id, p2.id]);
        assert_catalog_access_metric_count(
            &catalog.metric_registry,
            "partition_get_by_id_batch",
            1,
        );

        // empty get
        let res = cache.get(Arc::clone(&cached_table), vec![], None).await;
        assert_eq!(res, vec![]);
    }

    /// This is a regression test for <https://github.com/influxdata/influxdb_iox/issues/8286>.
    ///
    /// The issue happened when requests for multiple (different) tables were made concurrently. The root cause was the
    /// wrong assumption that when flushing the batched-up requests, there would only be a single table in the flushed set.
    ///
    /// To trigger this, we need at least 2 tokio threads.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_multi_table_concurrent_get() {
        // In most cases, the issue triggers on the first run. However, let's be sure and try multiple times.
        for _ in 0..10 {
            test_multi_table_concurrent_get_inner().await;
        }
    }

    /// Actual implementation of [`test_multi_table_concurrent_get`] that is tried multiple times.
    async fn test_multi_table_concurrent_get_inner() {
        let catalog = TestCatalog::new();

        // prepare catalog state for two tables
        let ns = catalog.create_namespace_1hr_retention("ns").await;
        let state_1 = ConcurrencyTestState::prepare(&ns, "t1").await;
        let state_2 = ConcurrencyTestState::prepare(&ns, "t2").await;

        // sanity checks for test setup
        assert!(!Arc::ptr_eq(&state_1.cached_table, &state_2.cached_table));
        assert_ne!(state_1.cached_table.id, state_2.cached_table.id);
        assert_ne!(state_1.c_id, state_2.c_id);
        assert_ne!(state_1.partitions, state_2.partitions);

        let cache = Arc::new(PartitionCache::new(
            catalog.catalog(),
            BackoffConfig::default(),
            catalog.time_provider(),
            &catalog.metric_registry(),
            test_ram_pool(),
            true,
        ));

        // use a barrier to make sure that both tokio tasks are running at the same time
        let barrier = Arc::new(Barrier::new(2));

        // set up first tokio task
        let barrier_captured = Arc::clone(&barrier);
        let cache_captured = Arc::clone(&cache);
        let handle_1 = tokio::spawn(async move {
            barrier_captured.wait().await;

            // When running quickly, both tasks will end up on the same tokio worker and will run in sequence. It seems
            // that tokio tries to avoid costly work-stealing. However, we can trick tokio into actually running both
            // tasks concurrently with a bit more async work: a simple sleep.
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;

            state_1.run(cache_captured).await;
        });

        // set up the 2nd tokio task in the same manner as the first one (but for the other table)
        let barrier_captured = Arc::clone(&barrier);
        let cache_captured = Arc::clone(&cache);
        let handle_2 = tokio::spawn(async move {
            barrier_captured.wait().await;
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            state_2.run(cache_captured).await;
        });

        handle_1.await.unwrap();
        handle_2.await.unwrap();
    }

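The regression test above leans on a barrier-plus-sleep trick to force genuine concurrency on a 2-worker runtime. Distilled into a standalone sketch (illustrative only; the printed message stands in for the real cache calls):

    use std::{sync::Arc, time::Duration};
    use tokio::sync::Barrier;

    #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
    async fn main() {
        let barrier = Arc::new(Barrier::new(2));
        let handles: Vec<_> = (0..2)
            .map(|i| {
                let barrier = Arc::clone(&barrier);
                tokio::spawn(async move {
                    // rendezvous first, then yield via a short sleep so the
                    // scheduler actually spreads the tasks across workers
                    barrier.wait().await;
                    tokio::time::sleep(Duration::from_millis(10)).await;
                    println!("task {i} running");
                })
            })
            .collect();
        for handle in handles {
            handle.await.unwrap();
        }
    }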
    /// Building block for a single table within the [`test_multi_table_concurrent_get`] test.
    struct ConcurrencyTestState {
        /// Cached table that is used for [`PartitionCache::get`].
        cached_table: Arc<CachedTable>,

        /// ID of the only column within that table.
        c_id: ColumnId,

        /// Partitions within that table.
        partitions: Vec<PartitionId>,
    }

    impl ConcurrencyTestState {
        /// Prepare catalog state.
        async fn prepare(ns: &Arc<TestNamespace>, name: &str) -> Self {
            let t = ns.create_table(name).await;
            let c = t.create_column("time", ColumnType::Time).await;
            let cached_table = Arc::new(CachedTable {
                id: t.table.id,
                schema: schema(),
                column_id_map: HashMap::from([(c.column.id, Arc::from(c.column.name.clone()))]),
                column_id_map_rev: HashMap::from([(Arc::from(c.column.name.clone()), c.column.id)]),
                primary_key_column_ids: [c.column.id].into(),
                partition_template: TablePartitionTemplateOverride::default(),
            });
            const N_PARTITIONS: usize = 20;
            let mut partitions = futures::stream::iter(0..N_PARTITIONS)
                .then(|i| {
                    let t = Arc::clone(&t);
                    async move {
                        t.create_partition_with_sort_key(&format!("p{i}"), &["time"])
                            .await
                            .partition
                            .id
                    }
                })
                .collect::<Vec<_>>()
                .await;
            partitions.sort();

            Self {
                cached_table,
                c_id: c.column.id,
                partitions,
            }
        }

        /// Perform the actual [`PartitionCache::get`] call and run some basic sanity checks on the result.
        async fn run(self, cache: Arc<PartitionCache>) {
            let Self {
                cached_table,
                c_id,
                partitions,
            } = self;

            let mut results = cache
                .get(
                    cached_table,
                    partitions
                        .iter()
                        .map(|p| PartitionRequest {
                            partition_id: *p,
                            sort_key_should_cover: vec![],
                        })
                        .collect(),
                    None,
                )
                .await;
            results.sort_by_key(|p| p.id);
            let partitions_res = results.iter().map(|p| p.id).collect::<Vec<_>>();
            assert_eq!(partitions, partitions_res);
            assert!(results
                .iter()
                .all(|p| p.sort_key.as_ref().unwrap().column_set == HashSet::from([c_id])));
        }
    }

    fn schema() -> Schema {
        SchemaBuilder::new().build().unwrap()
    }

    /// Extension methods for simpler testing.
    #[async_trait]
    trait PartitionCacheExt {
        async fn get_one(
            &self,
            cached_table: Arc<CachedTable>,
            partition_id: PartitionId,
            sort_key_should_cover: &[ColumnId],
            span: Option<Span>,
        ) -> Option<CachedPartition>;
    }

    #[async_trait]
    impl PartitionCacheExt for PartitionCache {
        async fn get_one(
            &self,
            cached_table: Arc<CachedTable>,
            partition_id: PartitionId,
            sort_key_should_cover: &[ColumnId],
            span: Option<Span>,
        ) -> Option<CachedPartition> {
            self.get(
                cached_table,
                vec![PartitionRequest {
                    partition_id,
                    sort_key_should_cover: sort_key_should_cover.to_vec(),
                }],
                span,
            )
            .await
            .into_iter()
            .next()
        }
    }
}

@@ -106,6 +106,7 @@ pub mod tests {

    use crate::cache::{
        namespace::{CachedNamespace, CachedTable},
        partition::PartitionRequest,
        CatalogCache,
    };

@@ -249,11 +250,15 @@ pub mod tests {
            .partition()
            .get(
                Arc::clone(&self.cached_table),
                self.parquet_file.partition_id,
                &[],
                vec![PartitionRequest {
                    partition_id: self.parquet_file.partition_id,
                    sort_key_should_cover: vec![],
                }],
                None,
            )
            .await
            .into_iter()
            .next()
            .unwrap();
        let cached_partitions =
            HashMap::from([(self.parquet_file.partition_id, cached_partition)]);
@@ -1,17 +1,19 @@
use self::query_access::QuerierTableChunkPruner;
use crate::{
    cache::{namespace::CachedTable, partition::CachedPartition},
    cache::{
        namespace::CachedTable,
        partition::{CachedPartition, PartitionRequest},
    },
    ingester::{self, IngesterPartition},
    parquet::ChunkAdapter,
    IngesterConnection, CONCURRENT_CHUNK_CREATION_JOBS,
    IngesterConnection,
};
use data_types::{ColumnId, NamespaceId, ParquetFile, PartitionId, TableId};
use datafusion::error::DataFusionError;
use futures::{join, StreamExt};
use futures::join;
use iox_query::{provider, provider::ChunkPruner, QueryChunk};
use observability_deps::tracing::{debug, trace};
use predicate::Predicate;
use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};
use schema::Schema;
use snafu::{ResultExt, Snafu};
use std::{

@@ -345,33 +347,26 @@ impl QuerierTable {
                .extend(f.column_set.iter().copied().filter(|id| pk.contains(id)));
        }

        // shuffle order to even catalog load, because cache hits/misses might be correlated w/ the order of the
        // partitions.
        //
        // Note that we sort before shuffling to achieve a deterministic pseudo-random order
        let mut partitions = should_cover.into_iter().collect::<Vec<_>>();
        let mut rng = StdRng::seed_from_u64(cached_table.id.get() as u64);
        partitions.sort_by(|(a_p_id, _a_cols), (b_p_id, _b_cols)| a_p_id.cmp(b_p_id));
        partitions.shuffle(&mut rng);

        futures::stream::iter(partitions)
            .map(|(p_id, cover)| {
                let catalog_cache = self.chunk_adapter.catalog_cache();
                let span = span_recorder.child_span("fetch partition");

                async move {
                    let cover = cover.into_iter().collect::<Vec<_>>();
                    let cached_partition = catalog_cache
                        .partition()
                        .get(Arc::clone(cached_table), p_id, &cover, span)
                        .await;
                    cached_partition.map(|p| (p_id, p))
                }
        // batch request all partitions
        let requests = should_cover
            .into_iter()
            .map(|(id, cover)| PartitionRequest {
                partition_id: id,
                sort_key_should_cover: cover.into_iter().collect(),
            })
            .buffer_unordered(CONCURRENT_CHUNK_CREATION_JOBS)
            .filter_map(|x| async move { x })
            .collect::<HashMap<_, _>>()
            .await
            .collect();
        let partitions = self
            .chunk_adapter
            .catalog_cache()
            .partition()
            .get(
                Arc::clone(cached_table),
                requests,
                span_recorder.child_span("fetch partitions"),
            )
            .await;

        partitions.into_iter().map(|p| (p.id, p)).collect()
    }

    /// Get a chunk pruner that can be used to prune chunks retrieved via [`chunks`](Self::chunks)
@@ -891,12 +886,22 @@ mod tests {

        let chunks = querier_table.chunks().await.unwrap();
        assert_eq!(chunks.len(), 5);
        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 6);
        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 4);
        assert_catalog_access_metric_count(
            &catalog.metric_registry,
            "partition_get_by_id_batch",
            1,
        );
        assert_cache_access_metric_count(&catalog.metric_registry, "partition", 2);

        let chunks = querier_table.chunks().await.unwrap();
        assert_eq!(chunks.len(), 5);
        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 6);
        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 4);
        assert_catalog_access_metric_count(
            &catalog.metric_registry,
            "partition_get_by_id_batch",
            1,
        );
        assert_cache_access_metric_count(&catalog.metric_registry, "partition", 4);

        partition_2

@@ -904,12 +909,22 @@ mod tests {
                TestParquetFileBuilder::default().with_line_protocol("table,tag1=a foo=1,bar=1 11"),
            )
            .await;
        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 7);
        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 5);
        assert_catalog_access_metric_count(
            &catalog.metric_registry,
            "partition_get_by_id_batch",
            1,
        );

        // file not visible yet
        let chunks = querier_table.chunks().await.unwrap();
        assert_eq!(chunks.len(), 5);
        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 7);
        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 5);
        assert_catalog_access_metric_count(
            &catalog.metric_registry,
            "partition_get_by_id_batch",
            1,
        );
        assert_cache_access_metric_count(&catalog.metric_registry, "partition", 6);

        // change ingester ID => invalidates cache

@@ -918,7 +933,12 @@ mod tests {
            .with_ingester_partition(ingester_partition_builder.build());
        let chunks = querier_table.chunks().await.unwrap();
        assert_eq!(chunks.len(), 6);
        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 8);
        assert_catalog_access_metric_count(&catalog.metric_registry, "partition_get_by_id", 5);
        assert_catalog_access_metric_count(
            &catalog.metric_registry,
            "partition_get_by_id_batch",
            2,
        );
        assert_cache_access_metric_count(&catalog.metric_registry, "partition", 8);
    }