refactor: improve consistent access under "remove if" (#5693)
* refactor: improve consistent access under "remove if" With all the concurrency introduced in #5668, we should be a bit more careful with our "remove if" handling, esp. if a removal is triggered while a load is running concurrently. This change introduces as `remove_if_and_get` helper that ensures this and the querier over to use it. The parquet file and tombstone caches required a bit of a larger change because there the invalidation and the actual GET were kinda separate. We had this separation for the other caches as well at some point and decided that this easily leads to API misuse, so I took this opportunity to "fix" the parquet file and tombstone cache as well. * docs: improvepull/24376/head
parent
e0ad5e4c20
commit
fced536ebd
|
|
@ -3,7 +3,10 @@ use metric::U64Counter;
|
|||
use parking_lot::Mutex;
|
||||
use std::{fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc};
|
||||
|
||||
use super::{CacheBackend, CallbackHandle, ChangeRequest, Subscriber};
|
||||
use crate::{
|
||||
backend::policy::{CacheBackend, CallbackHandle, ChangeRequest, Subscriber},
|
||||
cache::{Cache, CacheGetStatus},
|
||||
};
|
||||
|
||||
/// Allows explicitly removing entries from the cache.
|
||||
#[derive(Debug, Clone)]
|
||||
|
|
@ -96,7 +99,7 @@ where
|
|||
/// held (and thus the inner backend can't be concurrently accessed
|
||||
pub fn remove_if<P>(&self, k: &K, predicate: P) -> bool
|
||||
where
|
||||
P: Fn(V) -> bool + Send,
|
||||
P: FnOnce(V) -> bool,
|
||||
{
|
||||
let mut guard = self.callback_handle.lock();
|
||||
let handle = match guard.as_mut() {
|
||||
|
|
@ -120,6 +123,65 @@ where
|
|||
|
||||
removed
|
||||
}
|
||||
|
||||
/// Performs [`remove_if`](Self::remove_if) and [`GET`](Cache::get) in one go.
|
||||
///
|
||||
/// Ensures that these two actions interact correctly.
|
||||
///
|
||||
/// # Forward process
|
||||
/// This function only works if cache values evolve in one direction. This is that the predicate can only flip from
|
||||
/// `true` to `false` over time (i.e. it detects an outdated value and then an up-to-date value), NOT the other way
|
||||
/// around (i.e. data cannot get outdated under the same predicate).
|
||||
pub async fn remove_if_and_get<P, C, GetExtra>(
|
||||
&self,
|
||||
cache: &C,
|
||||
k: K,
|
||||
predicate: P,
|
||||
extra: GetExtra,
|
||||
) -> V
|
||||
where
|
||||
P: Fn(V) -> bool + Send,
|
||||
C: Cache<K = K, V = V, GetExtra = GetExtra>,
|
||||
GetExtra: Clone + Send,
|
||||
{
|
||||
let mut removed = self.remove_if(&k, &predicate);
|
||||
|
||||
loop {
|
||||
// avoid some `Sync` bounds
|
||||
let k_for_get = k.clone();
|
||||
let extra_for_get = extra.clone();
|
||||
let (v, status) = cache.get_with_status(k_for_get, extra_for_get).await;
|
||||
|
||||
match status {
|
||||
CacheGetStatus::Hit => {
|
||||
// key existed and no other process loaded it => safe to use
|
||||
return v;
|
||||
}
|
||||
CacheGetStatus::Miss => {
|
||||
// key didn't exist and we loaded it => safe to use
|
||||
return v;
|
||||
}
|
||||
CacheGetStatus::MissAlreadyLoading => {
|
||||
if removed {
|
||||
// key was outdated but there was some loading in process, this may have overlapped with our check
|
||||
// so our check might have been incomplete => need to re-check
|
||||
removed = self.remove_if(&k, &predicate);
|
||||
if removed {
|
||||
// removed again, so cannot use our result
|
||||
continue;
|
||||
} else {
|
||||
// didn't remove => safe to use
|
||||
return v;
|
||||
}
|
||||
} else {
|
||||
// there was a load action in process but the key was already up-to-date, so it's OK to use the new
|
||||
// data as well (forward process)
|
||||
return v;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
|
|||
|
|
@ -167,25 +167,30 @@ impl NamespaceCache {
|
|||
should_cover: &[(&str, &HashSet<ColumnId>)],
|
||||
span: Option<Span>,
|
||||
) -> Option<Arc<CachedNamespace>> {
|
||||
self.remove_if_handle.remove_if(&name, |cached_namespace| {
|
||||
if let Some(namespace) = cached_namespace.as_ref() {
|
||||
should_cover.iter().any(|(table_name, columns)| {
|
||||
if let Some(table) = namespace.tables.get(*table_name) {
|
||||
columns
|
||||
.iter()
|
||||
.any(|col| !table.column_id_map.contains_key(col))
|
||||
self.remove_if_handle
|
||||
.remove_if_and_get(
|
||||
&self.cache,
|
||||
name,
|
||||
|cached_namespace| {
|
||||
if let Some(namespace) = cached_namespace.as_ref() {
|
||||
should_cover.iter().any(|(table_name, columns)| {
|
||||
if let Some(table) = namespace.tables.get(*table_name) {
|
||||
columns
|
||||
.iter()
|
||||
.any(|col| !table.column_id_map.contains_key(col))
|
||||
} else {
|
||||
// table unknown => need to update
|
||||
true
|
||||
}
|
||||
})
|
||||
} else {
|
||||
// table unknown => need to update
|
||||
true
|
||||
// namespace unknown => need to update if should cover anything
|
||||
!should_cover.is_empty()
|
||||
}
|
||||
})
|
||||
} else {
|
||||
// namespace unknown => need to update if should cover anything
|
||||
!should_cover.is_empty()
|
||||
}
|
||||
});
|
||||
|
||||
self.cache.get(name, ((), span)).await
|
||||
},
|
||||
((), span),
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -177,16 +177,8 @@ impl ParquetFileCache {
|
|||
}
|
||||
|
||||
/// Get list of cached parquet files, by table id
|
||||
pub async fn get(&self, table_id: TableId, span: Option<Span>) -> Arc<CachedParquetFiles> {
|
||||
self.cache.get(table_id, ((), span)).await
|
||||
}
|
||||
|
||||
/// Mark the entry for table_id as expired (and needs a refresh)
|
||||
#[cfg(test)]
|
||||
pub fn expire(&self, table_id: TableId) {
|
||||
self.remove_if_handle.remove_if(&table_id, |_| true);
|
||||
}
|
||||
|
||||
///
|
||||
/// # Expiration
|
||||
/// Clear the parquet file cache if the cache does not contain any
|
||||
/// files that have the specified `max_parquet_sequence_number`.
|
||||
///
|
||||
|
|
@ -203,38 +195,50 @@ impl ParquetFileCache {
|
|||
/// If a `max_parquet_sequence_number` is supplied that is not in
|
||||
/// our cache, it means the ingester has written new data to the
|
||||
/// catalog and the cache is out of date.
|
||||
pub fn expire_on_newly_persisted_files(
|
||||
pub async fn get(
|
||||
&self,
|
||||
table_id: TableId,
|
||||
max_parquet_sequence_number: Option<SequenceNumber>,
|
||||
) -> bool {
|
||||
if let Some(max_parquet_sequence_number) = max_parquet_sequence_number {
|
||||
// check backend cache to see if the maximum sequence
|
||||
// number desired is less than what we know about
|
||||
self.remove_if_handle.remove_if(&table_id, |cached_file| {
|
||||
let max_cached = cached_file.max_parquet_sequence_number();
|
||||
span: Option<Span>,
|
||||
) -> Arc<CachedParquetFiles> {
|
||||
self.remove_if_handle
|
||||
.remove_if_and_get(
|
||||
&self.cache,
|
||||
table_id,
|
||||
|cached_file| {
|
||||
if let Some(max_parquet_sequence_number) = max_parquet_sequence_number {
|
||||
let max_cached = cached_file.max_parquet_sequence_number();
|
||||
|
||||
let expire = if let Some(max_cached) = max_cached {
|
||||
max_cached < max_parquet_sequence_number
|
||||
} else {
|
||||
// a max sequence was provided but there were no
|
||||
// files in the cache. Means we need to refresh
|
||||
true
|
||||
};
|
||||
let expire = if let Some(max_cached) = max_cached {
|
||||
max_cached < max_parquet_sequence_number
|
||||
} else {
|
||||
// a max sequence was provided but there were no
|
||||
// files in the cache. Means we need to refresh
|
||||
true
|
||||
};
|
||||
|
||||
debug!(
|
||||
expire,
|
||||
?max_cached,
|
||||
max_parquet_sequence_number = max_parquet_sequence_number.get(),
|
||||
table_id = table_id.get(),
|
||||
"expire parquet file cache",
|
||||
);
|
||||
debug!(
|
||||
expire,
|
||||
?max_cached,
|
||||
max_parquet_sequence_number = max_parquet_sequence_number.get(),
|
||||
table_id = table_id.get(),
|
||||
"expire parquet file cache",
|
||||
);
|
||||
|
||||
expire
|
||||
})
|
||||
} else {
|
||||
false
|
||||
}
|
||||
expire
|
||||
} else {
|
||||
false
|
||||
}
|
||||
},
|
||||
((), span),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Mark the entry for table_id as expired (and needs a refresh)
|
||||
#[cfg(test)]
|
||||
pub fn expire(&self, table_id: TableId) {
|
||||
self.remove_if_handle.remove_if(&table_id, |_| true);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -262,7 +266,7 @@ mod tests {
|
|||
let tfile = partition.create_parquet_file(builder).await;
|
||||
|
||||
let cache = make_cache(&catalog);
|
||||
let cached_files = cache.get(table.table.id, None).await.vec();
|
||||
let cached_files = cache.get(table.table.id, None, None).await.vec();
|
||||
|
||||
assert_eq!(cached_files.len(), 1);
|
||||
let expected_parquet_file = &tfile.parquet_file;
|
||||
|
|
@ -270,7 +274,7 @@ mod tests {
|
|||
|
||||
// validate a second request doens't result in a catalog request
|
||||
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
|
||||
cache.get(table.table.id, None).await;
|
||||
cache.get(table.table.id, None, None).await;
|
||||
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
|
||||
}
|
||||
|
||||
|
|
@ -289,12 +293,12 @@ mod tests {
|
|||
|
||||
let cache = make_cache(&catalog);
|
||||
|
||||
let cached_files = cache.get(table1.table.id, None).await.vec();
|
||||
let cached_files = cache.get(table1.table.id, None, None).await.vec();
|
||||
assert_eq!(cached_files.len(), 1);
|
||||
let expected_parquet_file = &tfile1.parquet_file;
|
||||
assert_eq!(cached_files[0].as_ref(), expected_parquet_file);
|
||||
|
||||
let cached_files = cache.get(table2.table.id, None).await.vec();
|
||||
let cached_files = cache.get(table2.table.id, None, None).await.vec();
|
||||
assert_eq!(cached_files.len(), 1);
|
||||
let expected_parquet_file = &tfile2.parquet_file;
|
||||
assert_eq!(cached_files[0].as_ref(), expected_parquet_file);
|
||||
|
|
@ -310,7 +314,7 @@ mod tests {
|
|||
let different_catalog = TestCatalog::new();
|
||||
let cache = make_cache(&different_catalog);
|
||||
|
||||
let cached_files = cache.get(table.table.id, None).await.vec();
|
||||
let cached_files = cache.get(table.table.id, None, None).await.vec();
|
||||
assert!(cached_files.is_empty());
|
||||
}
|
||||
|
||||
|
|
@ -326,14 +330,14 @@ mod tests {
|
|||
assert!(single_file_size < two_file_size);
|
||||
|
||||
let cache = make_cache(&catalog);
|
||||
let cached_files = cache.get(table_id, None).await;
|
||||
let cached_files = cache.get(table_id, None, None).await;
|
||||
assert_eq!(cached_files.size(), single_file_size);
|
||||
|
||||
// add a second file, and force the cache to find it
|
||||
let builder = TestParquetFileBuilder::default().with_line_protocol(TABLE1_LINE_PROTOCOL);
|
||||
partition.create_parquet_file(builder).await;
|
||||
cache.expire(table_id);
|
||||
let cached_files = cache.get(table_id, None).await;
|
||||
let cached_files = cache.get(table_id, None, None).await;
|
||||
assert_eq!(cached_files.size(), two_file_size);
|
||||
}
|
||||
|
||||
|
|
@ -362,25 +366,26 @@ mod tests {
|
|||
let cache = make_cache(&catalog);
|
||||
let table_id = table.table.id;
|
||||
assert_eq!(
|
||||
cache.get(table_id, None).await.ids(),
|
||||
cache.get(table_id, None, None).await.ids(),
|
||||
ids(&[&tfile1_2, &tfile1_3])
|
||||
);
|
||||
|
||||
// simulate request with sequence number 2
|
||||
// should not expire anything
|
||||
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
|
||||
cache.expire_on_newly_persisted_files(table_id, Some(sequence_number_2));
|
||||
assert_eq!(
|
||||
cache.get(table_id, None).await.ids(),
|
||||
cache
|
||||
.get(table_id, Some(sequence_number_2), None)
|
||||
.await
|
||||
.ids(),
|
||||
ids(&[&tfile1_2, &tfile1_3])
|
||||
);
|
||||
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
|
||||
|
||||
// simulate request with no sequence number
|
||||
// should not expire anything
|
||||
cache.expire_on_newly_persisted_files(table_id, None);
|
||||
assert_eq!(
|
||||
cache.get(table_id, None).await.ids(),
|
||||
cache.get(table_id, None, None).await.ids(),
|
||||
ids(&[&tfile1_2, &tfile1_3])
|
||||
);
|
||||
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
|
||||
|
|
@ -394,15 +399,17 @@ mod tests {
|
|||
let tfile1_10 = partition.create_parquet_file(builder).await;
|
||||
// cache doesn't have tfile1_10
|
||||
assert_eq!(
|
||||
cache.get(table_id, None).await.ids(),
|
||||
cache.get(table_id, None, None).await.ids(),
|
||||
ids(&[&tfile1_2, &tfile1_3])
|
||||
);
|
||||
|
||||
// new request includes sequence 10 and causes a cache refresh
|
||||
cache.expire_on_newly_persisted_files(table_id, Some(sequence_number_10));
|
||||
// now cache has tfile!_10 (yay!)
|
||||
assert_eq!(
|
||||
cache.get(table_id, None).await.ids(),
|
||||
cache
|
||||
.get(table_id, Some(sequence_number_10), None)
|
||||
.await
|
||||
.ids(),
|
||||
ids(&[&tfile1_2, &tfile1_3, &tfile1_10])
|
||||
);
|
||||
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 2);
|
||||
|
|
@ -415,16 +422,15 @@ mod tests {
|
|||
let table_id = table.table.id;
|
||||
|
||||
// no parquet files, sould be none
|
||||
assert!(cache.get(table_id, None).await.files.is_empty());
|
||||
assert!(cache.get(table_id, None, None).await.files.is_empty());
|
||||
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
|
||||
|
||||
// second request should be cached
|
||||
assert!(cache.get(table_id, None).await.files.is_empty());
|
||||
assert!(cache.get(table_id, None, None).await.files.is_empty());
|
||||
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
|
||||
|
||||
// Calls to expire if there is no known persisted file, should still be cached
|
||||
cache.expire_on_newly_persisted_files(table_id, None);
|
||||
assert!(cache.get(table_id, None).await.files.is_empty());
|
||||
assert!(cache.get(table_id, None, None).await.files.is_empty());
|
||||
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
|
||||
|
||||
// make a new parquet file
|
||||
|
|
@ -437,12 +443,17 @@ mod tests {
|
|||
let tfile = partition.create_parquet_file(builder).await;
|
||||
|
||||
// cache is stale
|
||||
assert!(cache.get(table_id, None).await.files.is_empty());
|
||||
assert!(cache.get(table_id, None, None).await.files.is_empty());
|
||||
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
|
||||
|
||||
// Now call to expire with knowledge of new file, will cause a cache refresh
|
||||
cache.expire_on_newly_persisted_files(table_id, Some(sequence_number_1));
|
||||
assert_eq!(cache.get(table_id, None).await.ids(), ids(&[&tfile]));
|
||||
assert_eq!(
|
||||
cache
|
||||
.get(table_id, Some(sequence_number_1), None)
|
||||
.await
|
||||
.ids(),
|
||||
ids(&[&tfile])
|
||||
);
|
||||
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 2);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -121,16 +121,21 @@ impl PartitionCache {
|
|||
span: Option<Span>,
|
||||
) -> Arc<Option<SortKey>> {
|
||||
self.remove_if_handle
|
||||
.remove_if(&partition_id, |cached_partition| {
|
||||
if let Some(sort_key) = cached_partition.sort_key.as_ref().as_ref() {
|
||||
should_cover.iter().any(|col| !sort_key.contains(col))
|
||||
} else {
|
||||
// no sort key at all => need to update if there is anything to cover
|
||||
!should_cover.is_empty()
|
||||
}
|
||||
});
|
||||
|
||||
self.cache.get(partition_id, ((), span)).await.sort_key
|
||||
.remove_if_and_get(
|
||||
&self.cache,
|
||||
partition_id,
|
||||
|cached_partition| {
|
||||
if let Some(sort_key) = cached_partition.sort_key.as_ref().as_ref() {
|
||||
should_cover.iter().any(|col| !sort_key.contains(col))
|
||||
} else {
|
||||
// no sort key at all => need to update if there is anything to cover
|
||||
!should_cover.is_empty()
|
||||
}
|
||||
},
|
||||
((), span),
|
||||
)
|
||||
.await
|
||||
.sort_key
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -155,16 +155,8 @@ impl TombstoneCache {
|
|||
}
|
||||
|
||||
/// Get list of cached tombstones, by table id
|
||||
pub async fn get(&self, table_id: TableId, span: Option<Span>) -> CachedTombstones {
|
||||
self.cache.get(table_id, ((), span)).await
|
||||
}
|
||||
|
||||
/// Mark the entry for table_id as expired / needs a refresh
|
||||
#[cfg(test)]
|
||||
pub fn expire(&self, table_id: TableId) {
|
||||
self.remove_if_handle.remove_if(&table_id, |_| true);
|
||||
}
|
||||
|
||||
///
|
||||
/// # Expiration
|
||||
/// Clear the tombstone cache if it doesn't contain any tombstones
|
||||
/// that have the specified `max_tombstone_sequence_number`.
|
||||
///
|
||||
|
|
@ -181,28 +173,40 @@ impl TombstoneCache {
|
|||
/// If a `max_tombstone_sequence_number` is supplied that is not in
|
||||
/// our cache, it means the ingester has written new data to the
|
||||
/// catalog and the cache is out of date.
|
||||
pub fn expire_on_newly_persisted_files(
|
||||
pub async fn get(
|
||||
&self,
|
||||
table_id: TableId,
|
||||
max_tombstone_sequence_number: Option<SequenceNumber>,
|
||||
) -> bool {
|
||||
if let Some(max_tombstone_sequence_number) = max_tombstone_sequence_number {
|
||||
// check backend cache to see if the maximum sequence
|
||||
// number desired is less than what we know about
|
||||
self.remove_if_handle.remove_if(&table_id, |cached_file| {
|
||||
let max_cached = cached_file.max_tombstone_sequence_number();
|
||||
span: Option<Span>,
|
||||
) -> CachedTombstones {
|
||||
self.remove_if_handle
|
||||
.remove_if_and_get(
|
||||
&self.cache,
|
||||
table_id,
|
||||
|cached_file| {
|
||||
if let Some(max_tombstone_sequence_number) = max_tombstone_sequence_number {
|
||||
let max_cached = cached_file.max_tombstone_sequence_number();
|
||||
|
||||
if let Some(max_cached) = max_cached {
|
||||
max_cached < max_tombstone_sequence_number
|
||||
} else {
|
||||
// a max sequence was provided but there were no
|
||||
// files in the cache. Means we need to refresh
|
||||
true
|
||||
}
|
||||
})
|
||||
} else {
|
||||
false
|
||||
}
|
||||
if let Some(max_cached) = max_cached {
|
||||
max_cached < max_tombstone_sequence_number
|
||||
} else {
|
||||
// a max sequence was provided but there were no
|
||||
// files in the cache. Means we need to refresh
|
||||
true
|
||||
}
|
||||
} else {
|
||||
false
|
||||
}
|
||||
},
|
||||
((), span),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Mark the entry for table_id as expired / needs a refresh
|
||||
#[cfg(test)]
|
||||
pub fn expire(&self, table_id: TableId) {
|
||||
self.remove_if_handle.remove_if(&table_id, |_| true);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -232,14 +236,14 @@ mod tests {
|
|||
let tombstone1 = table_and_shard.create_tombstone(7, 1, 100, "foo=1").await;
|
||||
|
||||
let cache = make_cache(&catalog);
|
||||
let cached_tombstones = cache.get(table_id, None).await.to_vec();
|
||||
let cached_tombstones = cache.get(table_id, None, None).await.to_vec();
|
||||
|
||||
assert_eq!(cached_tombstones.len(), 1);
|
||||
assert_eq!(cached_tombstones[0].as_ref(), &tombstone1.tombstone);
|
||||
|
||||
// validate a second request doens't result in a catalog request
|
||||
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
|
||||
cache.get(table_id, None).await;
|
||||
cache.get(table_id, None, None).await;
|
||||
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
|
||||
}
|
||||
|
||||
|
|
@ -262,11 +266,11 @@ mod tests {
|
|||
let table_id2 = table2.table.id;
|
||||
let tombstone2 = table_and_shard2.create_tombstone(8, 1, 100, "foo=1").await;
|
||||
|
||||
let cached_tombstones = cache.get(table_id1, None).await.to_vec();
|
||||
let cached_tombstones = cache.get(table_id1, None, None).await.to_vec();
|
||||
assert_eq!(cached_tombstones.len(), 1);
|
||||
assert_eq!(cached_tombstones[0].as_ref(), &tombstone1.tombstone);
|
||||
|
||||
let cached_tombstones = cache.get(table_id2, None).await.to_vec();
|
||||
let cached_tombstones = cache.get(table_id2, None, None).await.to_vec();
|
||||
assert_eq!(cached_tombstones.len(), 1);
|
||||
assert_eq!(cached_tombstones[0].as_ref(), &tombstone2.tombstone);
|
||||
}
|
||||
|
|
@ -291,7 +295,7 @@ mod tests {
|
|||
// Create tombstone 1
|
||||
table_and_shard.create_tombstone(7, 1, 100, "foo=1").await;
|
||||
|
||||
let cached_tombstones = cache.get(table_id, None).await;
|
||||
let cached_tombstones = cache.get(table_id, None, None).await;
|
||||
assert_eq!(cached_tombstones.to_vec().len(), 1);
|
||||
assert_eq!(cached_tombstones.size(), single_tombstone_size);
|
||||
|
||||
|
|
@ -299,7 +303,7 @@ mod tests {
|
|||
table_and_shard.create_tombstone(8, 1, 100, "foo=1").await;
|
||||
|
||||
cache.expire(table_id);
|
||||
let cached_tombstones = cache.get(table_id, None).await;
|
||||
let cached_tombstones = cache.get(table_id, None, None).await;
|
||||
assert_eq!(cached_tombstones.to_vec().len(), 2);
|
||||
assert_eq!(cached_tombstones.size(), two_tombstone_size);
|
||||
}
|
||||
|
|
@ -310,7 +314,7 @@ mod tests {
|
|||
let cache = make_cache(&catalog);
|
||||
|
||||
let made_up_table = TableId::new(1337);
|
||||
let cached_tombstones = cache.get(made_up_table, None).await.to_vec();
|
||||
let cached_tombstones = cache.get(made_up_table, None, None).await.to_vec();
|
||||
assert!(cached_tombstones.is_empty());
|
||||
}
|
||||
|
||||
|
|
@ -343,19 +347,26 @@ mod tests {
|
|||
.await
|
||||
.tombstone
|
||||
.id;
|
||||
assert_ids(&cache.get(table_id, None).await, &[tombstone1, tombstone2]);
|
||||
assert_ids(
|
||||
&cache.get(table_id, None, None).await,
|
||||
&[tombstone1, tombstone2],
|
||||
);
|
||||
|
||||
// simulate request with no sequence number
|
||||
// should not expire anything
|
||||
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
|
||||
cache.expire_on_newly_persisted_files(table_id, None);
|
||||
assert_ids(&cache.get(table_id, None).await, &[tombstone1, tombstone2]);
|
||||
assert_ids(
|
||||
&cache.get(table_id, None, None).await,
|
||||
&[tombstone1, tombstone2],
|
||||
);
|
||||
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
|
||||
|
||||
// simulate request with sequence number 2
|
||||
// should not expire anything
|
||||
cache.expire_on_newly_persisted_files(table_id, Some(sequence_number_2));
|
||||
assert_ids(&cache.get(table_id, None).await, &[tombstone1, tombstone2]);
|
||||
assert_ids(
|
||||
&cache.get(table_id, Some(sequence_number_2), None).await,
|
||||
&[tombstone1, tombstone2],
|
||||
);
|
||||
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
|
||||
|
||||
// add a new tombstone (at sequence 10)
|
||||
|
|
@ -365,14 +376,16 @@ mod tests {
|
|||
.tombstone
|
||||
.id;
|
||||
|
||||
// cache is stale,
|
||||
assert_ids(&cache.get(table_id, None).await, &[tombstone1, tombstone2]);
|
||||
// cache is stale,
|
||||
assert_ids(
|
||||
&cache.get(table_id, None, None).await,
|
||||
&[tombstone1, tombstone2],
|
||||
);
|
||||
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
|
||||
|
||||
// new request includes sequence 10 and causes a cache refresh
|
||||
cache.expire_on_newly_persisted_files(table_id, Some(sequence_number_10));
|
||||
assert_ids(
|
||||
&cache.get(table_id, None).await,
|
||||
&cache.get(table_id, Some(sequence_number_10), None).await,
|
||||
&[tombstone1, tombstone2, tombstone10],
|
||||
);
|
||||
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 2);
|
||||
|
|
@ -392,16 +405,15 @@ mod tests {
|
|||
let cache = make_cache(&catalog);
|
||||
|
||||
// no tombstones for the table, cached
|
||||
assert!(cache.get(table_id, None).await.tombstones.is_empty());
|
||||
assert!(cache.get(table_id, None, None).await.tombstones.is_empty());
|
||||
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
|
||||
|
||||
// second request to should be cached
|
||||
assert!(cache.get(table_id, None).await.tombstones.is_empty());
|
||||
assert!(cache.get(table_id, None, None).await.tombstones.is_empty());
|
||||
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
|
||||
|
||||
// calls to expire if there are no new known tombstones should not still be cached
|
||||
cache.expire_on_newly_persisted_files(table_id, None);
|
||||
assert!(cache.get(table_id, None).await.tombstones.is_empty());
|
||||
assert!(cache.get(table_id, None, None).await.tombstones.is_empty());
|
||||
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
|
||||
|
||||
// Create a tombstone
|
||||
|
|
@ -412,12 +424,14 @@ mod tests {
|
|||
.id;
|
||||
|
||||
// cache is stale
|
||||
assert!(cache.get(table_id, None).await.tombstones.is_empty());
|
||||
assert!(cache.get(table_id, None, None).await.tombstones.is_empty());
|
||||
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
|
||||
|
||||
// Now call to expire with knowledge of new tombstone, will cause a cache refresh
|
||||
cache.expire_on_newly_persisted_files(table_id, Some(sequence_number_1));
|
||||
assert_ids(&cache.get(table_id, None).await, &[tombstone1]);
|
||||
assert_ids(
|
||||
&cache.get(table_id, Some(sequence_number_1), None).await,
|
||||
&[tombstone1],
|
||||
);
|
||||
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 2);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -210,10 +210,12 @@ impl QuerierTable {
|
|||
self.ingester_partitions(predicate, span_recorder.child_span("ingester partitions")),
|
||||
catalog_cache.parquet_file().get(
|
||||
self.id(),
|
||||
None,
|
||||
span_recorder.child_span("cache GET parquet_file (pre-warm")
|
||||
),
|
||||
catalog_cache.tombstone().get(
|
||||
self.id(),
|
||||
None,
|
||||
span_recorder.child_span("cache GET tombstone (pre-warm)")
|
||||
),
|
||||
);
|
||||
|
|
@ -221,9 +223,15 @@ impl QuerierTable {
|
|||
// handle errors / cache refresh
|
||||
let partitions = partitions?;
|
||||
|
||||
// figure out if the ingester has created new parquet files or
|
||||
// tombstones the querier doens't yet know about
|
||||
self.validate_caches(&partitions);
|
||||
// determine max parquet sequence number for cache invalidation
|
||||
let max_parquet_sequence_number = partitions
|
||||
.iter()
|
||||
.flat_map(|p| p.parquet_max_sequence_number())
|
||||
.max();
|
||||
let max_tombstone_sequence_number = partitions
|
||||
.iter()
|
||||
.flat_map(|p| p.tombstone_max_sequence_number())
|
||||
.max();
|
||||
|
||||
debug!(
|
||||
namespace=%self.namespace_name,
|
||||
|
|
@ -233,14 +241,19 @@ impl QuerierTable {
|
|||
);
|
||||
|
||||
// Now fetch the actual contents of the catalog we need
|
||||
// NB: Pass max parquet/tombstone sequence numbers to `get`
|
||||
// to ensure cache is refreshed if we learned about new files/tombstones.
|
||||
let (parquet_files, tombstones) = join!(
|
||||
catalog_cache.parquet_file().get(
|
||||
self.id(),
|
||||
max_parquet_sequence_number,
|
||||
span_recorder.child_span("cache GET parquet_file")
|
||||
),
|
||||
catalog_cache
|
||||
.tombstone()
|
||||
.get(self.id(), span_recorder.child_span("cache GET tombstone"))
|
||||
catalog_cache.tombstone().get(
|
||||
self.id(),
|
||||
max_tombstone_sequence_number,
|
||||
span_recorder.child_span("cache GET tombstone")
|
||||
)
|
||||
);
|
||||
|
||||
let columns: HashSet<ColumnId> = parquet_files
|
||||
|
|
@ -460,43 +473,6 @@ impl QuerierTable {
|
|||
Ok(partitions)
|
||||
}
|
||||
|
||||
/// Handles invalidating parquet and tombstone caches if the
|
||||
/// responses from the ingesters refer to newer parquet data or
|
||||
/// tombstone data than is in the cache.
|
||||
fn validate_caches(&self, partitions: &[IngesterPartition]) {
|
||||
// figure out if the ingester has created new parquet files or
|
||||
// tombstones the querier doens't yet know about
|
||||
let catalog_cache = self.chunk_adapter.catalog_cache();
|
||||
|
||||
let max_parquet_sequence_number = partitions
|
||||
.iter()
|
||||
.flat_map(|p| p.parquet_max_sequence_number())
|
||||
.max();
|
||||
|
||||
let parquet_cache_outdated = catalog_cache
|
||||
.parquet_file()
|
||||
.expire_on_newly_persisted_files(self.id, max_parquet_sequence_number);
|
||||
|
||||
let max_tombstone_sequence_number = partitions
|
||||
.iter()
|
||||
.flat_map(|p| p.tombstone_max_sequence_number())
|
||||
.max();
|
||||
|
||||
let tombstone_cache_outdated = catalog_cache
|
||||
.tombstone()
|
||||
.expire_on_newly_persisted_files(self.id, max_tombstone_sequence_number);
|
||||
|
||||
debug!(
|
||||
namespace=%self.namespace_name,
|
||||
table_name=%self.table_name(),
|
||||
parquet_cache_outdated,
|
||||
tombstone_cache_outdated,
|
||||
?max_parquet_sequence_number,
|
||||
?max_tombstone_sequence_number,
|
||||
"caches validated"
|
||||
);
|
||||
}
|
||||
|
||||
/// clear the parquet file cache
|
||||
#[cfg(test)]
|
||||
fn clear_parquet_cache(&self) {
|
||||
|
|
|
|||
Loading…
Reference in New Issue