From e0ad5e4c204b6828684bd762cb4bea2edcd005b4 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 20 Sep 2022 13:26:07 +0200 Subject: [PATCH] refactor: improve "remove if" cache checks (#5673) * refactor: use concrete backend type in `ChangeRequest` * refactor: "remove if"-checks shall NOT count as "used" * test: improve docs Co-authored-by: Andrew Lamb Co-authored-by: Andrew Lamb --- .../src/backend/policy/integration_tests.rs | 75 +++++++++++++++++++ cache_system/src/backend/policy/mod.rs | 51 +++++++++++-- cache_system/src/backend/policy/refresh.rs | 2 +- cache_system/src/backend/policy/remove_if.rs | 4 +- 4 files changed, 122 insertions(+), 10 deletions(-) diff --git a/cache_system/src/backend/policy/integration_tests.rs b/cache_system/src/backend/policy/integration_tests.rs index 8f35f92d72..6aab5512cd 100644 --- a/cache_system/src/backend/policy/integration_tests.rs +++ b/cache_system/src/backend/policy/integration_tests.rs @@ -17,6 +17,7 @@ use super::{ test_util::{TestLoader, TestRefreshDurationProvider}, RefreshPolicy, }, + remove_if::{RemoveIfHandle, RemoveIfPolicy}, ttl::{test_util::TestTtlProvider, TtlPolicy}, PolicyBackend, }; @@ -208,6 +209,37 @@ async fn test_lru_learns_about_ttl_evictions() { assert_eq!(backend.get(&3), Some(String::from("c"))); } +#[tokio::test] +async fn test_remove_if_check_does_not_extend_lifetime() { + let TestStateLruAndRemoveIf { + mut backend, + size_estimator, + time_provider, + remove_if_handle, + .. + } = TestStateLruAndRemoveIf::new().await; + + size_estimator.mock_size(1, String::from("a"), TestSize(4)); + size_estimator.mock_size(2, String::from("b"), TestSize(4)); + size_estimator.mock_size(3, String::from("c"), TestSize(4)); + + backend.set(1, String::from("a")); + time_provider.inc(Duration::from_secs(1)); + + backend.set(2, String::from("b")); + time_provider.inc(Duration::from_secs(1)); + + // Checking remove_if should not count as a "use" of 1 + // for the "least recently used" calculation + remove_if_handle.remove_if(&1, |_| false); + backend.set(3, String::from("c")); + + // adding "c" totals 12 size, but backend has room for only 10 + // so "least recently used" (in this case 1, not 2) should be removed + assert_eq!(backend.get(&1), None); + assert!(backend.get(&2).is_some()); +} + /// Test setup that integrates the TTL policy with a refresh. struct TestStateTtlAndRefresh { backend: PolicyBackend, @@ -357,6 +389,49 @@ impl TestStateTtlAndLRU { } } +/// Test setup that integrates the LRU policy with RemoveIf and max size of 10 +struct TestStateLruAndRemoveIf { + backend: PolicyBackend, + time_provider: Arc, + size_estimator: Arc, + remove_if_handle: RemoveIfHandle, +} + +impl TestStateLruAndRemoveIf { + async fn new() -> Self { + let time_provider = Arc::new(MockProvider::new(Time::MIN)); + let metric_registry = Arc::new(metric::Registry::new()); + let size_estimator = Arc::new(TestSizeEstimator::default()); + + let mut backend = PolicyBackend::new( + Box::new(HashMap::::new()), + Arc::clone(&time_provider) as _, + ); + + let pool = Arc::new(ResourcePool::new( + "my_pool", + TestSize(10), + Arc::clone(&metric_registry), + )); + backend.add_policy(LruPolicy::new( + Arc::clone(&pool), + "my_cache", + Arc::clone(&size_estimator) as _, + )); + + let (constructor, remove_if_handle) = + RemoveIfPolicy::create_constructor_and_handle("my_cache", &metric_registry); + backend.add_policy(constructor); + + Self { + backend, + time_provider, + size_estimator, + remove_if_handle, + } + } +} + #[derive(Debug, Default)] struct TestSizeEstimator { sizes: Mutex>, diff --git a/cache_system/src/backend/policy/mod.rs b/cache_system/src/backend/policy/mod.rs index dca074eba0..b686d2be69 100644 --- a/cache_system/src/backend/policy/mod.rs +++ b/cache_system/src/backend/policy/mod.rs @@ -517,7 +517,7 @@ where /// locked during a single request, so "get + modify" patterns work out of the box without the need to fair interleaving modifications. pub fn from_fn(f: F) -> Self where - F: for<'b> FnOnce(&'b mut dyn CacheBackend) + 'a, + F: for<'b> FnOnce(&'b mut Recorder) + 'a, { Self { fun: Box::new(f) } } @@ -553,14 +553,13 @@ where } /// Execute this change request. - pub fn eval(self, backend: &mut dyn CacheBackend) { + pub fn eval(self, backend: &mut Recorder) { (self.fun)(backend) } } /// Function captured within [`ChangeRequest`]. -type ChangeRequestFn<'a, K, V> = - Box FnOnce(&'b mut dyn CacheBackend) + 'a>; +type ChangeRequestFn<'a, K, V> = Box FnOnce(&'b mut Recorder) + 'a>; /// Records of interactions with the callback [`CacheBackend`]. #[derive(Debug, PartialEq)] @@ -590,7 +589,7 @@ enum Record { /// Specialized [`CacheBackend`] that forwards changes and requests to the underlying backend of [`PolicyBackend`] but /// also records all changes into [`Record`]s. #[derive(Debug)] -struct Recorder +pub struct Recorder where K: Clone + Eq + Hash + Ord + Debug + Send + 'static, V: Clone + Debug + Send + 'static, @@ -599,6 +598,22 @@ where records: Vec>, } +impl Recorder +where + K: Clone + Eq + Hash + Ord + Debug + Send + 'static, + V: Clone + Debug + Send + 'static, +{ + /// Perform a [`GET`](CacheBackend::get) request that is NOT seen by other policies. + /// + /// This is helpful if you just want to check the underlying data of a key without treating it as "used". + /// + /// Note that this functionality only exists for [`GET`](CacheBackend::get) requests, not for modifying requests + /// like [`SET`](CacheBackend::set) or [`REMOVE`](CacheBackend::remove) since they always require policies to be in-sync. + pub fn get_untracked(&mut self, k: &K) -> Option { + self.inner.lock().get(k) + } +} + impl CacheBackend for Recorder where K: Clone + Eq + Hash + Ord + Debug + Send + 'static, @@ -895,6 +910,28 @@ mod tests { backend.remove(&String::from("a")); } + #[test] + fn test_get_untracked() { + let time_provider = Arc::new(MockProvider::new(Time::MIN)); + let mut backend = PolicyBackend::new(Box::new(HashMap::new()), time_provider); + backend.add_policy(create_test_policy(vec![ + TestStep { + condition: TestBackendInteraction::Set { + k: String::from("a"), + v: 1, + }, + action: TestAction::ChangeRequests(vec![SendableChangeRequest::from_fn( + |backend| { + assert_eq!(backend.get_untracked(&String::from("a")), Some(1)); + }, + )]), + }, + // NO `GET` interaction triggered here! + ])); + + backend.set(String::from("a"), 1); + } + #[test] fn test_basic_get_set() { let time_provider = Arc::new(MockProvider::new(Time::MIN)); @@ -1854,7 +1891,7 @@ mod tests { /// Same as [`ChangeRequestFn`] but implements `Send`. type SendableChangeRequestFn = - Box FnOnce(&'a mut dyn CacheBackend) + Send + 'static>; + Box FnOnce(&'a mut Recorder) + Send + 'static>; /// Same as [`ChangeRequest`] but implements `Send`. struct SendableChangeRequest { @@ -1871,7 +1908,7 @@ mod tests { impl SendableChangeRequest { fn from_fn(f: F) -> Self where - F: for<'b> FnOnce(&'b mut dyn CacheBackend) + Send + 'static, + F: for<'b> FnOnce(&'b mut Recorder) + Send + 'static, { Self { fun: Box::new(f) } } diff --git a/cache_system/src/backend/policy/refresh.rs b/cache_system/src/backend/policy/refresh.rs index 26bfcf9339..5504f28340 100644 --- a/cache_system/src/backend/policy/refresh.rs +++ b/cache_system/src/backend/policy/refresh.rs @@ -16,7 +16,7 @@ use tokio_util::sync::CancellationToken; use crate::loader::Loader; -use super::{CallbackHandle, ChangeRequest, Subscriber}; +use super::{CacheBackend, CallbackHandle, ChangeRequest, Subscriber}; /// Interface to provide refresh duration for a key-value pair. pub trait RefreshDurationProvider: std::fmt::Debug + Send + Sync + 'static { diff --git a/cache_system/src/backend/policy/remove_if.rs b/cache_system/src/backend/policy/remove_if.rs index 6c54e28dc0..9eb7625961 100644 --- a/cache_system/src/backend/policy/remove_if.rs +++ b/cache_system/src/backend/policy/remove_if.rs @@ -3,7 +3,7 @@ use metric::U64Counter; use parking_lot::Mutex; use std::{fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc}; -use super::{CallbackHandle, ChangeRequest, Subscriber}; +use super::{CacheBackend, CallbackHandle, ChangeRequest, Subscriber}; /// Allows explicitly removing entries from the cache. #[derive(Debug, Clone)] @@ -109,7 +109,7 @@ where let removed_captured = &mut removed; let k = k.clone(); handle.execute_requests(vec![ChangeRequest::from_fn(move |backend| { - if let Some(v) = backend.get(&k) { + if let Some(v) = backend.get_untracked(&k) { if predicate(v) { metric_removed_by_predicate.inc(1); backend.remove(&k);