From 0ccefa0d0c479e0f55f345e36f2328f9b3de4d52 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Mon, 15 Aug 2022 16:48:16 +0000 Subject: [PATCH] refactor: port TTL backend to policy framework (#5396) * refactor: port TTL backend to policy framework Note that this is "just" a port, it does NOT change how TTL works. This will be done in #5318. Helps with #5320. * fix: ensure inner backend is empty * test: add some smoke test --- Cargo.lock | 2 + cache_system/Cargo.toml | 2 +- cache_system/src/backend/mod.rs | 1 - cache_system/src/backend/policy/mod.rs | 69 +++- cache_system/src/backend/{ => policy}/ttl.rs | 384 +++++++++---------- querier/src/cache/namespace.rs | 11 +- querier/src/cache/processed_tombstones.rs | 12 +- querier/src/cache/table.rs | 11 +- workspace-hack/Cargo.toml | 4 + 9 files changed, 270 insertions(+), 226 deletions(-) rename cache_system/src/backend/{ => policy}/ttl.rs (65%) diff --git a/Cargo.lock b/Cargo.lock index 3f139a6042..e6b389aaa7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5940,6 +5940,7 @@ dependencies = [ "hex", "indexmap", "libc", + "lock_api", "log", "md-5", "memchr", @@ -5948,6 +5949,7 @@ dependencies = [ "num-traits", "object_store", "once_cell", + "parking_lot 0.12.1", "parquet", "predicates", "prost 0.11.0", diff --git a/cache_system/Cargo.toml b/cache_system/Cargo.toml index 41c19c6c82..3b026beaef 100644 --- a/cache_system/Cargo.toml +++ b/cache_system/Cargo.toml @@ -9,7 +9,7 @@ futures = "0.3" iox_time = { path = "../iox_time" } metric = { path = "../metric" } observability_deps = { path = "../observability_deps" } -parking_lot = "0.12" +parking_lot = { version = "0.12", features = ["arc_lock"] } pdatastructs = { version = "0.7", default-features = false, features = ["fixedbitset"] } tokio = { version = "1.20", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] } trace = { path = "../trace"} diff --git a/cache_system/src/backend/mod.rs b/cache_system/src/backend/mod.rs index 5723618a5b..676073c9d6 100644 --- a/cache_system/src/backend/mod.rs +++ b/cache_system/src/backend/mod.rs @@ -7,7 +7,6 @@ pub mod lru; pub mod policy; pub mod resource_consumption; pub mod shared; -pub mod ttl; #[cfg(test)] mod test_util; diff --git a/cache_system/src/backend/policy/mod.rs b/cache_system/src/backend/policy/mod.rs index ed2490d8b3..40d60d2bad 100644 --- a/cache_system/src/backend/policy/mod.rs +++ b/cache_system/src/backend/policy/mod.rs @@ -5,13 +5,17 @@ use std::{ collections::VecDeque, fmt::Debug, hash::Hash, + marker::PhantomData, + ops::Deref, sync::{Arc, Weak}, }; -use parking_lot::{Mutex, ReentrantMutex}; +use parking_lot::{lock_api::ArcMutexGuard, Mutex, RawMutex, ReentrantMutex}; use super::CacheBackend; +pub mod ttl; + /// Convenience macro to easily follow the borrow/lock chain of [`StrongSharedInner`]. /// /// This cannot just be a method because we cannot return references to local variables. @@ -266,6 +270,19 @@ where lock_inner!(mut guard = self.inner); guard.subscribers.push(Box::new(subscriber)); } + + /// Provide temporary read-only access to the underlying backend. + /// + /// This is mostly useful for debugging and testing. + pub fn inner_ref(&mut self) -> InnerBackendRef<'_, K, V> { + // NOTE: We deliberately use a mutable reference here to prevent users from using `` while + // we hold a lock to the underlying backend. + lock_inner!(guard = self.inner); + InnerBackendRef { + inner: guard.inner.lock_arc(), + _phantom: PhantomData::default(), + } + } } impl CacheBackend for PolicyBackend @@ -476,6 +493,15 @@ where }) } + /// Ensure that backends is empty and panic otherwise. + /// + /// This is mostly useful during initialization. + pub fn ensure_empty() -> Self { + Self::from_fn(|backend| { + assert!(backend.is_empty(), "inner backend is not empty"); + }) + } + /// Execute this change request. fn eval(self, backend: &mut dyn CacheBackend) { (self.fun)(backend) @@ -558,6 +584,47 @@ where } } +/// Read-only ref to the inner backend of [`PolicyBackend`] for debugging. +pub struct InnerBackendRef<'a, K, V> +where + K: Clone + Eq + Hash + Ord + Debug + Send + 'static, + V: Clone + Debug + Send + 'static, +{ + inner: ArcMutexGuard>>, + _phantom: PhantomData<&'a mut ()>, +} + +// Workaround for . +impl<'a, K, V> Drop for InnerBackendRef<'a, K, V> +where + K: Clone + Eq + Hash + Ord + Debug + Send + 'static, + V: Clone + Debug + Send + 'static, +{ + fn drop(&mut self) {} +} + +impl<'a, K, V> Debug for InnerBackendRef<'a, K, V> +where + K: Clone + Eq + Hash + Ord + Debug + Send + 'static, + V: Clone + Debug + Send + 'static, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("InnerBackendRef").finish_non_exhaustive() + } +} + +impl<'a, K, V> Deref for InnerBackendRef<'a, K, V> +where + K: Clone + Eq + Hash + Ord + Debug + Send + 'static, + V: Clone + Debug + Send + 'static, +{ + type Target = dyn CacheBackend; + + fn deref(&self) -> &Self::Target { + self.inner.as_ref() + } +} + #[cfg(test)] mod tests { use std::{collections::HashMap, sync::Barrier, thread::JoinHandle}; diff --git a/cache_system/src/backend/ttl.rs b/cache_system/src/backend/policy/ttl.rs similarity index 65% rename from cache_system/src/backend/ttl.rs rename to cache_system/src/backend/policy/ttl.rs index 01668d8cc4..86c6275412 100644 --- a/cache_system/src/backend/ttl.rs +++ b/cache_system/src/backend/policy/ttl.rs @@ -1,10 +1,12 @@ //! Time-to-live handling. -use std::{any::Any, fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc, time::Duration}; +use std::{fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc, time::Duration}; use iox_time::{Time, TimeProvider}; use metric::U64Counter; -use super::{addressable_heap::AddressableHeap, CacheBackend}; +use crate::backend::addressable_heap::AddressableHeap; + +use super::{CallbackHandle, ChangeRequest, Subscriber}; /// Interface to provide TTL (time to live) data for a key-value pair. pub trait TtlProvider: std::fmt::Debug + Send + Sync + 'static { @@ -104,45 +106,36 @@ impl TtlProvider for OptionalValueTtlProvider { } } -/// Cache backend that implements Time To Life. +/// Cache policy that implements Time To Life. /// /// # Cache Eviction -/// Every method ([`get`](CacheBackend::get), [`set`](CacheBackend::set), [`remove`](CacheBackend::remove)) causes the +/// Every method ([`get`](Subscriber::get), [`set`](Subscriber::set), [`remove`](Subscriber::remove)) causes the /// cache to check for expired keys. This may lead to certain delays, esp. when dropping the contained values takes a /// long time. #[derive(Debug)] -pub struct TtlBackend +pub struct TtlPolicy where K: Clone + Eq + Debug + Hash + Ord + Send + 'static, V: Clone + Debug + Send + 'static, { - inner_backend: Box>, ttl_provider: Arc>, time_provider: Arc, expiration: AddressableHeap, metric_expired: U64Counter, } -impl TtlBackend +impl TtlPolicy where K: Clone + Eq + Debug + Hash + Ord + Send + 'static, V: Clone + Debug + Send + 'static, { - /// Create new backend w/o any known keys. - /// - /// The inner backend MUST NOT contain any data at this point, otherwise we will not track any TTLs for these entries. - /// - /// # Panic - /// If the inner backend is not empty. + /// Create new TTL policy. pub fn new( - inner_backend: Box>, ttl_provider: Arc>, time_provider: Arc, name: &'static str, metric_registry: &metric::Registry, - ) -> Self { - assert!(inner_backend.is_empty(), "inner backend is not empty"); - + ) -> impl FnOnce(CallbackHandle) -> Self { let metric_expired = metric_registry .register_metric::( "cache_ttl_expired", @@ -150,16 +143,21 @@ where ) .recorder(&[("name", name)]); - Self { - inner_backend, - ttl_provider, - time_provider, - expiration: Default::default(), - metric_expired, + |mut callback_handle| { + callback_handle.init_requests(vec![ChangeRequest::ensure_empty()]); + + Self { + ttl_provider, + time_provider, + expiration: Default::default(), + metric_expired, + } } } - fn evict_expired(&mut self, now: Time) { + fn evict_expired(&mut self, now: Time) -> Vec> { + let mut requests = vec![]; + while self .expiration .peek() @@ -168,24 +166,14 @@ where { let (k, _, _t) = self.expiration.pop().unwrap(); self.metric_expired.inc(1); - self.inner_backend.remove(&k); + requests.push(ChangeRequest::remove(k)); } - } - /// Reference to inner backend. - #[allow(dead_code)] - pub fn inner_backend(&self) -> &dyn CacheBackend { - self.inner_backend.as_ref() - } - - /// Reference to TTL provider. - #[allow(dead_code)] - pub fn ttl_provider(&self) -> &Arc> { - &self.ttl_provider + requests } } -impl CacheBackend for TtlBackend +impl Subscriber for TtlPolicy where K: Clone + Eq + Debug + Hash + Ord + Send + 'static, V: Clone + Debug + Send + 'static, @@ -193,18 +181,18 @@ where type K = K; type V = V; - fn get(&mut self, k: &Self::K) -> Option { - self.evict_expired(self.time_provider.now()); - - self.inner_backend.get(k) + fn get(&mut self, _k: &Self::K) -> Vec> { + self.evict_expired(self.time_provider.now()) } - fn set(&mut self, k: Self::K, v: Self::V) { + fn set(&mut self, k: Self::K, v: Self::V) -> Vec> { let now = self.time_provider.now(); - self.evict_expired(now); + let mut requests = self.evict_expired(now); - let should_store = if let Some(ttl) = self.ttl_provider.expires_in(&k, &v) { - let should_store = !ttl.is_zero(); + if let Some(ttl) = self.ttl_provider.expires_in(&k, &v) { + if ttl.is_zero() { + requests.push(ChangeRequest::remove(k.clone())); + } match now.checked_add(ttl) { Some(t) => { @@ -215,33 +203,17 @@ where self.expiration.remove(&k); } } - - should_store } else { // Still need to ensure that any current expiration is disabled self.expiration.remove(&k); - - true }; - if should_store { - self.inner_backend.set(k, v); - } + requests } - fn remove(&mut self, k: &Self::K) { - self.evict_expired(self.time_provider.now()); - - self.inner_backend.remove(k); + fn remove(&mut self, k: &Self::K) -> Vec> { self.expiration.remove(k); - } - - fn is_empty(&self) -> bool { - self.inner_backend.is_empty() - } - - fn as_any(&self) -> &dyn Any { - self as &dyn Any + self.evict_expired(self.time_provider.now()) } } @@ -253,6 +225,8 @@ mod tests { use metric::{Observation, RawReporter}; use parking_lot::Mutex; + use crate::backend::{policy::PolicyBackend, CacheBackend}; + use super::*; #[test] @@ -271,17 +245,33 @@ mod tests { } #[test] - fn test_expires_single() { + #[should_panic(expected = "inner backend is not empty")] + fn test_panic_inner_not_empty() { let ttl_provider = Arc::new(TestTtlProvider::new()); - let time_provider = Arc::new(MockProvider::new(Time::MIN)); let metric_registry = metric::Registry::new(); - let mut backend = TtlBackend::new( - Box::new(HashMap::::new()), + + let time_provider = Arc::new(MockProvider::new(Time::MIN)); + let mut backend = PolicyBackend::new(Box::new(HashMap::::new())); + let constructor = TtlPolicy::new( Arc::clone(&ttl_provider) as _, Arc::clone(&time_provider) as _, "my_cache", &metric_registry, ); + backend.add_policy(|mut handle| { + handle.init_requests(vec![ChangeRequest::set(1, String::from("foo"))]); + constructor(handle) + }); + } + + #[test] + fn test_expires_single() { + let TestState { + mut backend, + metric_registry, + ttl_provider, + time_provider, + } = TestState::new(); assert_eq!(get_expired_metric(&metric_registry), 0); @@ -304,13 +294,13 @@ mod tests { // init time provider at MAX! let time_provider = Arc::new(MockProvider::new(Time::MAX)); - let mut backend = TtlBackend::new( - Box::new(HashMap::::new()), + let mut backend = PolicyBackend::new(Box::new(HashMap::::new())); + backend.add_policy(TtlPolicy::new( Arc::clone(&ttl_provider) as _, Arc::clone(&time_provider) as _, "my_cache", &metric_registry, - ); + )); ttl_provider.set_expires_in(1, String::from("a"), Some(Duration::MAX)); backend.set(1, String::from("a")); @@ -319,16 +309,12 @@ mod tests { #[test] fn test_never_expire() { - let ttl_provider = Arc::new(TestTtlProvider::new()); - let time_provider = Arc::new(MockProvider::new(Time::MIN)); - let metric_registry = metric::Registry::new(); - let mut backend = TtlBackend::new( - Box::new(HashMap::::new()), - Arc::clone(&ttl_provider) as _, - Arc::clone(&time_provider) as _, - "my_cache", - &metric_registry, - ); + let TestState { + mut backend, + ttl_provider, + time_provider, + .. + } = TestState::new(); ttl_provider.set_expires_in(1, String::from("a"), None); backend.set(1, String::from("a")); @@ -340,16 +326,12 @@ mod tests { #[test] fn test_expiration_uses_key_and_value() { - let ttl_provider = Arc::new(TestTtlProvider::new()); - let time_provider = Arc::new(MockProvider::new(Time::MIN)); - let metric_registry = metric::Registry::new(); - let mut backend = TtlBackend::new( - Box::new(HashMap::::new()), - Arc::clone(&ttl_provider) as _, - Arc::clone(&time_provider) as _, - "my_cache", - &metric_registry, - ); + let TestState { + mut backend, + ttl_provider, + time_provider, + .. + } = TestState::new(); ttl_provider.set_expires_in(1, String::from("a"), Some(Duration::from_secs(1))); ttl_provider.set_expires_in(1, String::from("b"), Some(Duration::from_secs(4))); @@ -362,16 +344,12 @@ mod tests { #[test] fn test_override_with_different_expiration() { - let ttl_provider = Arc::new(TestTtlProvider::new()); - let time_provider = Arc::new(MockProvider::new(Time::MIN)); - let metric_registry = metric::Registry::new(); - let mut backend = TtlBackend::new( - Box::new(HashMap::::new()), - Arc::clone(&ttl_provider) as _, - Arc::clone(&time_provider) as _, - "my_cache", - &metric_registry, - ); + let TestState { + mut backend, + ttl_provider, + time_provider, + .. + } = TestState::new(); ttl_provider.set_expires_in(1, String::from("a"), Some(Duration::from_secs(1))); backend.set(1, String::from("a")); @@ -386,16 +364,12 @@ mod tests { #[test] fn test_override_with_no_expiration() { - let ttl_provider = Arc::new(TestTtlProvider::new()); - let time_provider = Arc::new(MockProvider::new(Time::MIN)); - let metric_registry = metric::Registry::new(); - let mut backend = TtlBackend::new( - Box::new(HashMap::::new()), - Arc::clone(&ttl_provider) as _, - Arc::clone(&time_provider) as _, - "my_cache", - &metric_registry, - ); + let TestState { + mut backend, + ttl_provider, + time_provider, + .. + } = TestState::new(); ttl_provider.set_expires_in(1, String::from("a"), Some(Duration::from_secs(1))); backend.set(1, String::from("a")); @@ -415,13 +389,13 @@ mod tests { // init time provider at nearly MAX! let time_provider = Arc::new(MockProvider::new(Time::MAX - Duration::from_secs(2))); - let mut backend = TtlBackend::new( - Box::new(HashMap::::new()), + let mut backend = PolicyBackend::new(Box::new(HashMap::::new())); + backend.add_policy(TtlPolicy::new( Arc::clone(&ttl_provider) as _, Arc::clone(&time_provider) as _, "my_cache", &metric_registry, - ); + )); ttl_provider.set_expires_in(1, String::from("a"), Some(Duration::from_secs(1))); backend.set(1, String::from("a")); @@ -436,16 +410,12 @@ mod tests { #[test] fn test_readd_with_different_expiration() { - let ttl_provider = Arc::new(TestTtlProvider::new()); - let time_provider = Arc::new(MockProvider::new(Time::MIN)); - let metric_registry = metric::Registry::new(); - let mut backend = TtlBackend::new( - Box::new(HashMap::::new()), - Arc::clone(&ttl_provider) as _, - Arc::clone(&time_provider) as _, - "my_cache", - &metric_registry, - ); + let TestState { + mut backend, + ttl_provider, + time_provider, + .. + } = TestState::new(); ttl_provider.set_expires_in(1, String::from("a"), Some(Duration::from_secs(1))); backend.set(1, String::from("a")); @@ -461,16 +431,12 @@ mod tests { #[test] fn test_readd_with_no_expiration() { - let ttl_provider = Arc::new(TestTtlProvider::new()); - let time_provider = Arc::new(MockProvider::new(Time::MIN)); - let metric_registry = metric::Registry::new(); - let mut backend = TtlBackend::new( - Box::new(HashMap::::new()), - Arc::clone(&ttl_provider) as _, - Arc::clone(&time_provider) as _, - "my_cache", - &metric_registry, - ); + let TestState { + mut backend, + ttl_provider, + time_provider, + .. + } = TestState::new(); ttl_provider.set_expires_in(1, String::from("a"), Some(Duration::from_secs(1))); backend.set(1, String::from("a")); @@ -486,16 +452,12 @@ mod tests { #[test] fn test_update_cleans_multiple_keys() { - let ttl_provider = Arc::new(TestTtlProvider::new()); - let time_provider = Arc::new(MockProvider::new(Time::MIN)); - let metric_registry = metric::Registry::new(); - let mut backend = TtlBackend::new( - Box::new(HashMap::::new()), - Arc::clone(&ttl_provider) as _, - Arc::clone(&time_provider) as _, - "my_cache", - &metric_registry, - ); + let TestState { + mut backend, + ttl_provider, + time_provider, + .. + } = TestState::new(); ttl_provider.set_expires_in(1, String::from("a"), Some(Duration::from_secs(1))); ttl_provider.set_expires_in(2, String::from("b"), Some(Duration::from_secs(2))); @@ -512,15 +474,19 @@ mod tests { time_provider.inc(Duration::from_secs(2)); assert_eq!(backend.get(&1), None); - let inner_backend = backend - .inner_backend() - .as_any() - .downcast_ref::>() - .unwrap(); - assert!(!inner_backend.contains_key(&1)); - assert!(!inner_backend.contains_key(&2)); - assert!(!inner_backend.contains_key(&3)); - assert!(inner_backend.contains_key(&4)); + + { + let inner_ref = backend.inner_ref(); + let inner_backend = inner_ref + .as_any() + .downcast_ref::>() + .unwrap(); + assert!(!inner_backend.contains_key(&1)); + assert!(!inner_backend.contains_key(&2)); + assert!(!inner_backend.contains_key(&3)); + assert!(inner_backend.contains_key(&4)); + } + assert_eq!(backend.get(&2), None); assert_eq!(backend.get(&3), None); assert_eq!(backend.get(&4), Some(String::from("d"))); @@ -528,16 +494,12 @@ mod tests { #[test] fn test_remove_expired_key() { - let ttl_provider = Arc::new(TestTtlProvider::new()); - let time_provider = Arc::new(MockProvider::new(Time::MIN)); - let metric_registry = metric::Registry::new(); - let mut backend = TtlBackend::new( - Box::new(HashMap::::new()), - Arc::clone(&ttl_provider) as _, - Arc::clone(&time_provider) as _, - "my_cache", - &metric_registry, - ); + let TestState { + mut backend, + ttl_provider, + time_provider, + .. + } = TestState::new(); ttl_provider.set_expires_in(1, String::from("a"), Some(Duration::from_secs(1))); backend.set(1, String::from("a")); @@ -550,16 +512,12 @@ mod tests { #[test] fn test_expire_removed_key() { - let ttl_provider = Arc::new(TestTtlProvider::new()); - let time_provider = Arc::new(MockProvider::new(Time::MIN)); - let metric_registry = metric::Registry::new(); - let mut backend = TtlBackend::new( - Box::new(HashMap::::new()), - Arc::clone(&ttl_provider) as _, - Arc::clone(&time_provider) as _, - "my_cache", - &metric_registry, - ); + let TestState { + mut backend, + ttl_provider, + time_provider, + .. + } = TestState::new(); ttl_provider.set_expires_in(1, String::from("a"), Some(Duration::from_secs(1))); ttl_provider.set_expires_in(2, String::from("b"), Some(Duration::from_secs(2))); @@ -574,60 +532,36 @@ mod tests { #[test] fn test_expire_immediately() { - let ttl_provider = Arc::new(TestTtlProvider::new()); - let time_provider = Arc::new(MockProvider::new(Time::MIN)); - let metric_registry = metric::Registry::new(); - let mut backend = TtlBackend::new( - Box::new(HashMap::::new()), - Arc::clone(&ttl_provider) as _, - Arc::clone(&time_provider) as _, - "my_cache", - &metric_registry, - ); + let TestState { + mut backend, + ttl_provider, + .. + } = TestState::new(); ttl_provider.set_expires_in(1, String::from("a"), Some(Duration::from_secs(0))); backend.set(1, String::from("a")); - let inner_backend = backend - .inner_backend() - .as_any() - .downcast_ref::>() - .unwrap(); - assert!(inner_backend.is_empty()); + assert!(backend.is_empty()); assert_eq!(backend.get(&1), None); } #[test] - #[should_panic(expected = "inner backend is not empty")] - fn test_panic_inner_not_empty() { - let ttl_provider = Arc::new(TestTtlProvider::new()); - let time_provider = Arc::new(MockProvider::new(Time::MIN)); - let metric_registry = metric::Registry::new(); - TtlBackend::new( - Box::new(HashMap::::from([(1, String::from("a"))])), - Arc::clone(&ttl_provider) as _, - Arc::clone(&time_provider) as _, - "my_cache", - &metric_registry, - ); - } - - #[test] - fn test_generic() { + fn test_generic_backend() { use crate::backend::test_util::test_generic; test_generic(|| { let ttl_provider = Arc::new(NeverTtlProvider::default()); let time_provider = Arc::new(MockProvider::new(Time::MIN)); let metric_registry = metric::Registry::new(); - TtlBackend::new( - Box::new(HashMap::::new()), - ttl_provider, - time_provider, + let mut backend = PolicyBackend::new(Box::new(HashMap::::new())); + backend.add_policy(TtlPolicy::new( + Arc::clone(&ttl_provider) as _, + Arc::clone(&time_provider) as _, "my_cache", &metric_registry, - ) + )); + backend }); } @@ -661,6 +595,36 @@ mod tests { } } + struct TestState { + backend: PolicyBackend, + metric_registry: metric::Registry, + ttl_provider: Arc, + time_provider: Arc, + } + + impl TestState { + fn new() -> Self { + let ttl_provider = Arc::new(TestTtlProvider::new()); + let time_provider = Arc::new(MockProvider::new(Time::MIN)); + let metric_registry = metric::Registry::new(); + + let mut backend = PolicyBackend::new(Box::new(HashMap::::new())); + backend.add_policy(TtlPolicy::new( + Arc::clone(&ttl_provider) as _, + Arc::clone(&time_provider) as _, + "my_cache", + &metric_registry, + )); + + Self { + backend, + metric_registry, + ttl_provider, + time_provider, + } + } + } + fn get_expired_metric(metric_registry: &metric::Registry) -> u64 { let mut reporter = RawReporter::default(); metric_registry.report(&mut reporter); diff --git a/querier/src/cache/namespace.rs b/querier/src/cache/namespace.rs index d5309d4544..d4dae8270f 100644 --- a/querier/src/cache/namespace.rs +++ b/querier/src/cache/namespace.rs @@ -4,9 +4,12 @@ use backoff::{Backoff, BackoffConfig}; use cache_system::{ backend::{ lru::{LruBackend, ResourcePool}, + policy::{ + ttl::{OptionalValueTtlProvider, TtlPolicy}, + PolicyBackend, + }, resource_consumption::FunctionEstimator, shared::SharedBackend, - ttl::{OptionalValueTtlProvider, TtlBackend}, }, cache::{driver::CacheDriver, metrics::CacheWithMetrics, Cache}, loader::{metrics::MetricsLoader, FunctionLoader}, @@ -100,8 +103,8 @@ impl NamespaceCache { testing, )); - let backend = Box::new(TtlBackend::new( - Box::new(HashMap::new()), + let mut backend = PolicyBackend::new(Box::new(HashMap::new())); + backend.add_policy(TtlPolicy::new( Arc::new(OptionalValueTtlProvider::new( Some(TTL_NON_EXISTING), Some(TTL_EXISTING), @@ -113,7 +116,7 @@ impl NamespaceCache { // add to memory pool let backend = Box::new(LruBackend::new( - backend as _, + Box::new(backend), Arc::clone(&ram_pool), CACHE_ID, Arc::new(FunctionEstimator::new( diff --git a/querier/src/cache/processed_tombstones.rs b/querier/src/cache/processed_tombstones.rs index 5bb3a0e501..8a9c7dfcff 100644 --- a/querier/src/cache/processed_tombstones.rs +++ b/querier/src/cache/processed_tombstones.rs @@ -4,8 +4,11 @@ use backoff::{Backoff, BackoffConfig}; use cache_system::{ backend::{ lru::{LruBackend, ResourcePool}, + policy::{ + ttl::{TtlPolicy, TtlProvider}, + PolicyBackend, + }, resource_consumption::FunctionEstimator, - ttl::{TtlBackend, TtlProvider}, }, cache::{driver::CacheDriver, metrics::CacheWithMetrics, Cache}, loader::{metrics::MetricsLoader, FunctionLoader}, @@ -79,16 +82,15 @@ impl ProcessedTombstonesCache { testing, )); - let backend = Box::new(HashMap::new()); - let backend = Box::new(TtlBackend::new( - backend, + let mut backend = PolicyBackend::new(Box::new(HashMap::new())); + backend.add_policy(TtlPolicy::new( Arc::new(KeepExistsForever {}), Arc::clone(&time_provider), CACHE_ID, metric_registry, )); let backend = Box::new(LruBackend::new( - backend, + Box::new(backend), ram_pool, CACHE_ID, Arc::new(FunctionEstimator::new(|k, v| { diff --git a/querier/src/cache/table.rs b/querier/src/cache/table.rs index 1fbb00af67..cc2d3b8347 100644 --- a/querier/src/cache/table.rs +++ b/querier/src/cache/table.rs @@ -4,8 +4,11 @@ use backoff::{Backoff, BackoffConfig}; use cache_system::{ backend::{ lru::{LruBackend, ResourcePool}, + policy::{ + ttl::{OptionalValueTtlProvider, TtlPolicy}, + PolicyBackend, + }, resource_consumption::FunctionEstimator, - ttl::{OptionalValueTtlProvider, TtlBackend}, }, cache::{driver::CacheDriver, metrics::CacheWithMetrics, Cache}, loader::{metrics::MetricsLoader, FunctionLoader}, @@ -76,8 +79,8 @@ impl TableCache { testing, )); - let backend = Box::new(TtlBackend::new( - Box::new(HashMap::new()), + let mut backend = PolicyBackend::new(Box::new(HashMap::new())); + backend.add_policy(TtlPolicy::new( Arc::new(OptionalValueTtlProvider::new(Some(TTL_NON_EXISTING), None)), Arc::clone(&time_provider), CACHE_ID, @@ -86,7 +89,7 @@ impl TableCache { // add to memory pool let backend = Box::new(LruBackend::new( - backend, + Box::new(backend), Arc::clone(&ram_pool), CACHE_ID, Arc::new(FunctionEstimator::new(|k, v: &Option>| { diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index ba02faa831..48403e17b3 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -36,6 +36,7 @@ hashbrown = { version = "0.12", features = ["ahash", "inline-more", "raw"] } hex = { version = "0.4", features = ["alloc", "std"] } indexmap = { version = "1", default-features = false, features = ["std"] } libc = { version = "0.2", features = ["extra_traits", "std"] } +lock_api = { version = "0.4", default-features = false, features = ["arc_lock"] } log = { version = "0.4", default-features = false, features = ["std"] } md-5 = { version = "0.10", features = ["std"] } memchr = { version = "2", features = ["std"] } @@ -44,6 +45,7 @@ num-integer = { version = "0.1", default-features = false, features = ["i128", " num-traits = { version = "0.2", features = ["i128", "libm", "std"] } object_store = { git = "https://github.com/tustvold/arrow-rs.git", rev = "0f9d902aae0c0ce07e3f81d06d621b5189ad9240", default-features = false, features = ["aws", "azure", "azure_core", "azure_storage", "azure_storage_blobs", "base64", "gcp", "quick-xml", "rand", "reqwest", "ring", "rustls-pemfile", "serde", "serde_json"] } once_cell = { version = "1", features = ["alloc", "parking_lot", "parking_lot_core", "race", "std"] } +parking_lot = { version = "0.12", features = ["arc_lock"] } parquet = { version = "20", features = ["arrow", "async", "base64", "brotli", "experimental", "flate2", "futures", "lz4", "snap", "tokio", "zstd"] } predicates = { version = "2", features = ["diff", "difflib", "float-cmp", "normalize-line-endings", "regex"] } prost = { version = "0.11", features = ["prost-derive", "std"] } @@ -97,12 +99,14 @@ heck = { version = "0.4", features = ["unicode", "unicode-segmentation"] } hex = { version = "0.4", features = ["alloc", "std"] } indexmap = { version = "1", default-features = false, features = ["std"] } libc = { version = "0.2", features = ["extra_traits", "std"] } +lock_api = { version = "0.4", default-features = false, features = ["arc_lock"] } log = { version = "0.4", default-features = false, features = ["std"] } md-5 = { version = "0.10", features = ["std"] } memchr = { version = "2", features = ["std"] } nom = { version = "7", features = ["alloc", "std"] } num-traits = { version = "0.2", features = ["i128", "libm", "std"] } once_cell = { version = "1", features = ["alloc", "parking_lot", "parking_lot_core", "race", "std"] } +parking_lot = { version = "0.12", features = ["arc_lock"] } prost = { version = "0.11", features = ["prost-derive", "std"] } prost-types = { version = "0.11", features = ["std"] } rand = { version = "0.8", features = ["alloc", "getrandom", "libc", "rand_chacha", "rand_hc", "small_rng", "std", "std_rng"] }