diff --git a/cache_system/src/backend/policy/remove_if.rs b/cache_system/src/backend/policy/remove_if.rs
index 9eb7625961..5aea47ae08 100644
--- a/cache_system/src/backend/policy/remove_if.rs
+++ b/cache_system/src/backend/policy/remove_if.rs
@@ -3,7 +3,10 @@ use metric::U64Counter;
use parking_lot::Mutex;
use std::{fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc};
-use super::{CacheBackend, CallbackHandle, ChangeRequest, Subscriber};
+use crate::{
+ backend::policy::{CacheBackend, CallbackHandle, ChangeRequest, Subscriber},
+ cache::{Cache, CacheGetStatus},
+};
/// Allows explicitly removing entries from the cache.
#[derive(Debug, Clone)]
@@ -96,7 +99,7 @@ where
/// held (and thus the inner backend can't be concurrently accessed
pub fn remove_if
(&self, k: &K, predicate: P) -> bool
where
- P: Fn(V) -> bool + Send,
+ P: FnOnce(V) -> bool,
{
let mut guard = self.callback_handle.lock();
let handle = match guard.as_mut() {
@@ -120,6 +123,65 @@ where
removed
}
+
+ /// Performs [`remove_if`](Self::remove_if) and [`GET`](Cache::get) in one go.
+ ///
+ /// Ensures that these two actions interact correctly.
+ ///
+ /// # Forward progress
+ /// This function only works if cache values evolve in one direction: the predicate may only flip from
+ /// `true` to `false` over time (i.e. it first detects an outdated value and later an up-to-date one), NOT the
+ /// other way around (i.e. data cannot become outdated again under the same predicate).
+ pub async fn remove_if_and_get
(
+ &self,
+ cache: &C,
+ k: K,
+ predicate: P,
+ extra: GetExtra,
+ ) -> V
+ where
+ P: Fn(V) -> bool + Send,
+ C: Cache,
+ GetExtra: Clone + Send,
+ {
+ let mut removed = self.remove_if(&k, &predicate);
+
+ loop {
+ // avoid some `Sync` bounds
+ let k_for_get = k.clone();
+ let extra_for_get = extra.clone();
+ let (v, status) = cache.get_with_status(k_for_get, extra_for_get).await;
+
+ match status {
+ CacheGetStatus::Hit => {
+ // key existed and no other process loaded it => safe to use
+ return v;
+ }
+ CacheGetStatus::Miss => {
+ // key didn't exist and we loaded it => safe to use
+ return v;
+ }
+ CacheGetStatus::MissAlreadyLoading => {
+ if removed {
+ // key was outdated but a load was already in progress; that load may have overlapped with our
+ // check, so the check might have been incomplete => need to re-check
+ removed = self.remove_if(&k, &predicate);
+ if removed {
+ // removed again, so cannot use our result
+ continue;
+ } else {
+ // didn't remove => safe to use
+ return v;
+ }
+ } else {
+ // there was a load action in progress but the key was already up-to-date, so it is OK to use
+ // the new data as well (forward progress)
+ return v;
+ }
+ }
+ }
+ }
+ }
}
#[cfg(test)]
diff --git a/querier/src/cache/namespace.rs b/querier/src/cache/namespace.rs
index 3e948badcc..d22ef43621 100644
--- a/querier/src/cache/namespace.rs
+++ b/querier/src/cache/namespace.rs
@@ -167,25 +167,30 @@ impl NamespaceCache {
should_cover: &[(&str, &HashSet)],
span: Option,
) -> Option> {
- self.remove_if_handle.remove_if(&name, |cached_namespace| {
- if let Some(namespace) = cached_namespace.as_ref() {
- should_cover.iter().any(|(table_name, columns)| {
- if let Some(table) = namespace.tables.get(*table_name) {
- columns
- .iter()
- .any(|col| !table.column_id_map.contains_key(col))
+ self.remove_if_handle
+ .remove_if_and_get(
+ &self.cache,
+ name,
+ |cached_namespace| {
+ if let Some(namespace) = cached_namespace.as_ref() {
+ should_cover.iter().any(|(table_name, columns)| {
+ if let Some(table) = namespace.tables.get(*table_name) {
+ columns
+ .iter()
+ .any(|col| !table.column_id_map.contains_key(col))
+ } else {
+ // table unknown => need to update
+ true
+ }
+ })
} else {
- // table unknown => need to update
- true
+ // namespace unknown => need to update if should cover anything
+ !should_cover.is_empty()
}
- })
- } else {
- // namespace unknown => need to update if should cover anything
- !should_cover.is_empty()
- }
- });
-
- self.cache.get(name, ((), span)).await
+ },
+ ((), span),
+ )
+ .await
}
}
diff --git a/querier/src/cache/parquet_file.rs b/querier/src/cache/parquet_file.rs
index 79d13422f8..e73acae298 100644
--- a/querier/src/cache/parquet_file.rs
+++ b/querier/src/cache/parquet_file.rs
@@ -177,16 +177,8 @@ impl ParquetFileCache {
}
/// Get list of cached parquet files, by table id
- pub async fn get(&self, table_id: TableId, span: Option) -> Arc {
- self.cache.get(table_id, ((), span)).await
- }
-
- /// Mark the entry for table_id as expired (and needs a refresh)
- #[cfg(test)]
- pub fn expire(&self, table_id: TableId) {
- self.remove_if_handle.remove_if(&table_id, |_| true);
- }
-
+ ///
+ /// # Expiration
/// Clear the parquet file cache if the cache does not contain any
/// files that have the specified `max_parquet_sequence_number`.
///
@@ -203,38 +195,50 @@ impl ParquetFileCache {
/// If a `max_parquet_sequence_number` is supplied that is not in
/// our cache, it means the ingester has written new data to the
/// catalog and the cache is out of date.
- pub fn expire_on_newly_persisted_files(
+ pub async fn get(
&self,
table_id: TableId,
max_parquet_sequence_number: Option,
- ) -> bool {
- if let Some(max_parquet_sequence_number) = max_parquet_sequence_number {
- // check backend cache to see if the maximum sequence
- // number desired is less than what we know about
- self.remove_if_handle.remove_if(&table_id, |cached_file| {
- let max_cached = cached_file.max_parquet_sequence_number();
+ span: Option,
+ ) -> Arc {
+ self.remove_if_handle
+ .remove_if_and_get(
+ &self.cache,
+ table_id,
+ |cached_file| {
+ if let Some(max_parquet_sequence_number) = max_parquet_sequence_number {
+ let max_cached = cached_file.max_parquet_sequence_number();
- let expire = if let Some(max_cached) = max_cached {
- max_cached < max_parquet_sequence_number
- } else {
- // a max sequence was provided but there were no
- // files in the cache. Means we need to refresh
- true
- };
+ let expire = if let Some(max_cached) = max_cached {
+ max_cached < max_parquet_sequence_number
+ } else {
+ // a max sequence was provided but there were no
+ // files in the cache. Means we need to refresh
+ true
+ };
- debug!(
- expire,
- ?max_cached,
- max_parquet_sequence_number = max_parquet_sequence_number.get(),
- table_id = table_id.get(),
- "expire parquet file cache",
- );
+ debug!(
+ expire,
+ ?max_cached,
+ max_parquet_sequence_number = max_parquet_sequence_number.get(),
+ table_id = table_id.get(),
+ "expire parquet file cache",
+ );
- expire
- })
- } else {
- false
- }
+ expire
+ } else {
+ false
+ }
+ },
+ ((), span),
+ )
+ .await
+ }
+
+ /// Mark the entry for table_id as expired (and needs a refresh)
+ #[cfg(test)]
+ pub fn expire(&self, table_id: TableId) {
+ self.remove_if_handle.remove_if(&table_id, |_| true);
}
}
@@ -262,7 +266,7 @@ mod tests {
let tfile = partition.create_parquet_file(builder).await;
let cache = make_cache(&catalog);
- let cached_files = cache.get(table.table.id, None).await.vec();
+ let cached_files = cache.get(table.table.id, None, None).await.vec();
assert_eq!(cached_files.len(), 1);
let expected_parquet_file = &tfile.parquet_file;
@@ -270,7 +274,7 @@ mod tests {
// validate a second request doens't result in a catalog request
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
- cache.get(table.table.id, None).await;
+ cache.get(table.table.id, None, None).await;
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
}
@@ -289,12 +293,12 @@ mod tests {
let cache = make_cache(&catalog);
- let cached_files = cache.get(table1.table.id, None).await.vec();
+ let cached_files = cache.get(table1.table.id, None, None).await.vec();
assert_eq!(cached_files.len(), 1);
let expected_parquet_file = &tfile1.parquet_file;
assert_eq!(cached_files[0].as_ref(), expected_parquet_file);
- let cached_files = cache.get(table2.table.id, None).await.vec();
+ let cached_files = cache.get(table2.table.id, None, None).await.vec();
assert_eq!(cached_files.len(), 1);
let expected_parquet_file = &tfile2.parquet_file;
assert_eq!(cached_files[0].as_ref(), expected_parquet_file);
@@ -310,7 +314,7 @@ mod tests {
let different_catalog = TestCatalog::new();
let cache = make_cache(&different_catalog);
- let cached_files = cache.get(table.table.id, None).await.vec();
+ let cached_files = cache.get(table.table.id, None, None).await.vec();
assert!(cached_files.is_empty());
}
@@ -326,14 +330,14 @@ mod tests {
assert!(single_file_size < two_file_size);
let cache = make_cache(&catalog);
- let cached_files = cache.get(table_id, None).await;
+ let cached_files = cache.get(table_id, None, None).await;
assert_eq!(cached_files.size(), single_file_size);
// add a second file, and force the cache to find it
let builder = TestParquetFileBuilder::default().with_line_protocol(TABLE1_LINE_PROTOCOL);
partition.create_parquet_file(builder).await;
cache.expire(table_id);
- let cached_files = cache.get(table_id, None).await;
+ let cached_files = cache.get(table_id, None, None).await;
assert_eq!(cached_files.size(), two_file_size);
}
@@ -362,25 +366,26 @@ mod tests {
let cache = make_cache(&catalog);
let table_id = table.table.id;
assert_eq!(
- cache.get(table_id, None).await.ids(),
+ cache.get(table_id, None, None).await.ids(),
ids(&[&tfile1_2, &tfile1_3])
);
// simulate request with sequence number 2
// should not expire anything
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
- cache.expire_on_newly_persisted_files(table_id, Some(sequence_number_2));
assert_eq!(
- cache.get(table_id, None).await.ids(),
+ cache
+ .get(table_id, Some(sequence_number_2), None)
+ .await
+ .ids(),
ids(&[&tfile1_2, &tfile1_3])
);
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
// simulate request with no sequence number
// should not expire anything
- cache.expire_on_newly_persisted_files(table_id, None);
assert_eq!(
- cache.get(table_id, None).await.ids(),
+ cache.get(table_id, None, None).await.ids(),
ids(&[&tfile1_2, &tfile1_3])
);
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
@@ -394,15 +399,17 @@ mod tests {
let tfile1_10 = partition.create_parquet_file(builder).await;
// cache doesn't have tfile1_10
assert_eq!(
- cache.get(table_id, None).await.ids(),
+ cache.get(table_id, None, None).await.ids(),
ids(&[&tfile1_2, &tfile1_3])
);
// new request includes sequence 10 and causes a cache refresh
- cache.expire_on_newly_persisted_files(table_id, Some(sequence_number_10));
// now cache has tfile!_10 (yay!)
assert_eq!(
- cache.get(table_id, None).await.ids(),
+ cache
+ .get(table_id, Some(sequence_number_10), None)
+ .await
+ .ids(),
ids(&[&tfile1_2, &tfile1_3, &tfile1_10])
);
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 2);
@@ -415,16 +422,15 @@ mod tests {
let table_id = table.table.id;
// no parquet files, sould be none
- assert!(cache.get(table_id, None).await.files.is_empty());
+ assert!(cache.get(table_id, None, None).await.files.is_empty());
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
// second request should be cached
- assert!(cache.get(table_id, None).await.files.is_empty());
+ assert!(cache.get(table_id, None, None).await.files.is_empty());
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
// Calls to expire if there is no known persisted file, should still be cached
- cache.expire_on_newly_persisted_files(table_id, None);
- assert!(cache.get(table_id, None).await.files.is_empty());
+ assert!(cache.get(table_id, None, None).await.files.is_empty());
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
// make a new parquet file
@@ -437,12 +443,17 @@ mod tests {
let tfile = partition.create_parquet_file(builder).await;
// cache is stale
- assert!(cache.get(table_id, None).await.files.is_empty());
+ assert!(cache.get(table_id, None, None).await.files.is_empty());
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 1);
// Now call to expire with knowledge of new file, will cause a cache refresh
- cache.expire_on_newly_persisted_files(table_id, Some(sequence_number_1));
- assert_eq!(cache.get(table_id, None).await.ids(), ids(&[&tfile]));
+ assert_eq!(
+ cache
+ .get(table_id, Some(sequence_number_1), None)
+ .await
+ .ids(),
+ ids(&[&tfile])
+ );
assert_histogram_metric_count(&catalog.metric_registry, METRIC_NAME, 2);
}
diff --git a/querier/src/cache/partition.rs b/querier/src/cache/partition.rs
index 744ef054c2..de6f0e049b 100644
--- a/querier/src/cache/partition.rs
+++ b/querier/src/cache/partition.rs
@@ -121,16 +121,21 @@ impl PartitionCache {
span: Option,
) -> Arc