refactor: improve "remove if" cache checks (#5673)
* refactor: use concrete backend type in `ChangeRequest` * refactor: "remove if"-checks shall NOT count as "used" * test: improve docs Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>pull/24376/head
parent
84e8a4ac41
commit
e0ad5e4c20
|
|
@ -17,6 +17,7 @@ use super::{
|
|||
test_util::{TestLoader, TestRefreshDurationProvider},
|
||||
RefreshPolicy,
|
||||
},
|
||||
remove_if::{RemoveIfHandle, RemoveIfPolicy},
|
||||
ttl::{test_util::TestTtlProvider, TtlPolicy},
|
||||
PolicyBackend,
|
||||
};
|
||||
|
|
@ -208,6 +209,37 @@ async fn test_lru_learns_about_ttl_evictions() {
|
|||
assert_eq!(backend.get(&3), Some(String::from("c")));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_remove_if_check_does_not_extend_lifetime() {
|
||||
let TestStateLruAndRemoveIf {
|
||||
mut backend,
|
||||
size_estimator,
|
||||
time_provider,
|
||||
remove_if_handle,
|
||||
..
|
||||
} = TestStateLruAndRemoveIf::new().await;
|
||||
|
||||
size_estimator.mock_size(1, String::from("a"), TestSize(4));
|
||||
size_estimator.mock_size(2, String::from("b"), TestSize(4));
|
||||
size_estimator.mock_size(3, String::from("c"), TestSize(4));
|
||||
|
||||
backend.set(1, String::from("a"));
|
||||
time_provider.inc(Duration::from_secs(1));
|
||||
|
||||
backend.set(2, String::from("b"));
|
||||
time_provider.inc(Duration::from_secs(1));
|
||||
|
||||
// Checking remove_if should not count as a "use" of 1
|
||||
// for the "least recently used" calculation
|
||||
remove_if_handle.remove_if(&1, |_| false);
|
||||
backend.set(3, String::from("c"));
|
||||
|
||||
// adding "c" totals 12 size, but backend has room for only 10
|
||||
// so "least recently used" (in this case 1, not 2) should be removed
|
||||
assert_eq!(backend.get(&1), None);
|
||||
assert!(backend.get(&2).is_some());
|
||||
}
|
||||
|
||||
/// Test setup that integrates the TTL policy with a refresh.
|
||||
struct TestStateTtlAndRefresh {
|
||||
backend: PolicyBackend<u8, String>,
|
||||
|
|
@ -357,6 +389,49 @@ impl TestStateTtlAndLRU {
|
|||
}
|
||||
}
|
||||
|
||||
/// Test setup that integrates the LRU policy with RemoveIf and max size of 10
|
||||
struct TestStateLruAndRemoveIf {
|
||||
backend: PolicyBackend<u8, String>,
|
||||
time_provider: Arc<MockProvider>,
|
||||
size_estimator: Arc<TestSizeEstimator>,
|
||||
remove_if_handle: RemoveIfHandle<u8, String>,
|
||||
}
|
||||
|
||||
impl TestStateLruAndRemoveIf {
|
||||
async fn new() -> Self {
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let metric_registry = Arc::new(metric::Registry::new());
|
||||
let size_estimator = Arc::new(TestSizeEstimator::default());
|
||||
|
||||
let mut backend = PolicyBackend::new(
|
||||
Box::new(HashMap::<u8, String>::new()),
|
||||
Arc::clone(&time_provider) as _,
|
||||
);
|
||||
|
||||
let pool = Arc::new(ResourcePool::new(
|
||||
"my_pool",
|
||||
TestSize(10),
|
||||
Arc::clone(&metric_registry),
|
||||
));
|
||||
backend.add_policy(LruPolicy::new(
|
||||
Arc::clone(&pool),
|
||||
"my_cache",
|
||||
Arc::clone(&size_estimator) as _,
|
||||
));
|
||||
|
||||
let (constructor, remove_if_handle) =
|
||||
RemoveIfPolicy::create_constructor_and_handle("my_cache", &metric_registry);
|
||||
backend.add_policy(constructor);
|
||||
|
||||
Self {
|
||||
backend,
|
||||
time_provider,
|
||||
size_estimator,
|
||||
remove_if_handle,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct TestSizeEstimator {
|
||||
sizes: Mutex<HashMap<(u8, String), TestSize>>,
|
||||
|
|
|
|||
|
|
@ -517,7 +517,7 @@ where
|
|||
/// locked during a single request, so "get + modify" patterns work out of the box without the need to fair interleaving modifications.
|
||||
pub fn from_fn<F>(f: F) -> Self
|
||||
where
|
||||
F: for<'b> FnOnce(&'b mut dyn CacheBackend<K = K, V = V>) + 'a,
|
||||
F: for<'b> FnOnce(&'b mut Recorder<K, V>) + 'a,
|
||||
{
|
||||
Self { fun: Box::new(f) }
|
||||
}
|
||||
|
|
@ -553,14 +553,13 @@ where
|
|||
}
|
||||
|
||||
/// Execute this change request.
|
||||
pub fn eval(self, backend: &mut dyn CacheBackend<K = K, V = V>) {
|
||||
pub fn eval(self, backend: &mut Recorder<K, V>) {
|
||||
(self.fun)(backend)
|
||||
}
|
||||
}
|
||||
|
||||
/// Function captured within [`ChangeRequest`].
|
||||
type ChangeRequestFn<'a, K, V> =
|
||||
Box<dyn for<'b> FnOnce(&'b mut dyn CacheBackend<K = K, V = V>) + 'a>;
|
||||
type ChangeRequestFn<'a, K, V> = Box<dyn for<'b> FnOnce(&'b mut Recorder<K, V>) + 'a>;
|
||||
|
||||
/// Records of interactions with the callback [`CacheBackend`].
|
||||
#[derive(Debug, PartialEq)]
|
||||
|
|
@ -590,7 +589,7 @@ enum Record<K, V> {
|
|||
/// Specialized [`CacheBackend`] that forwards changes and requests to the underlying backend of [`PolicyBackend`] but
|
||||
/// also records all changes into [`Record`]s.
|
||||
#[derive(Debug)]
|
||||
struct Recorder<K, V>
|
||||
pub struct Recorder<K, V>
|
||||
where
|
||||
K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
|
||||
V: Clone + Debug + Send + 'static,
|
||||
|
|
@ -599,6 +598,22 @@ where
|
|||
records: Vec<Record<K, V>>,
|
||||
}
|
||||
|
||||
impl<K, V> Recorder<K, V>
|
||||
where
|
||||
K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
|
||||
V: Clone + Debug + Send + 'static,
|
||||
{
|
||||
/// Perform a [`GET`](CacheBackend::get) request that is NOT seen by other policies.
|
||||
///
|
||||
/// This is helpful if you just want to check the underlying data of a key without treating it as "used".
|
||||
///
|
||||
/// Note that this functionality only exists for [`GET`](CacheBackend::get) requests, not for modifying requests
|
||||
/// like [`SET`](CacheBackend::set) or [`REMOVE`](CacheBackend::remove) since they always require policies to be in-sync.
|
||||
pub fn get_untracked(&mut self, k: &K) -> Option<V> {
|
||||
self.inner.lock().get(k)
|
||||
}
|
||||
}
|
||||
|
||||
impl<K, V> CacheBackend for Recorder<K, V>
|
||||
where
|
||||
K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
|
||||
|
|
@ -895,6 +910,28 @@ mod tests {
|
|||
backend.remove(&String::from("a"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_untracked() {
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()), time_provider);
|
||||
backend.add_policy(create_test_policy(vec![
|
||||
TestStep {
|
||||
condition: TestBackendInteraction::Set {
|
||||
k: String::from("a"),
|
||||
v: 1,
|
||||
},
|
||||
action: TestAction::ChangeRequests(vec![SendableChangeRequest::from_fn(
|
||||
|backend| {
|
||||
assert_eq!(backend.get_untracked(&String::from("a")), Some(1));
|
||||
},
|
||||
)]),
|
||||
},
|
||||
// NO `GET` interaction triggered here!
|
||||
]));
|
||||
|
||||
backend.set(String::from("a"), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_basic_get_set() {
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
|
|
@ -1854,7 +1891,7 @@ mod tests {
|
|||
|
||||
/// Same as [`ChangeRequestFn`] but implements `Send`.
|
||||
type SendableChangeRequestFn =
|
||||
Box<dyn for<'a> FnOnce(&'a mut dyn CacheBackend<K = String, V = usize>) + Send + 'static>;
|
||||
Box<dyn for<'a> FnOnce(&'a mut Recorder<String, usize>) + Send + 'static>;
|
||||
|
||||
/// Same as [`ChangeRequest`] but implements `Send`.
|
||||
struct SendableChangeRequest {
|
||||
|
|
@ -1871,7 +1908,7 @@ mod tests {
|
|||
impl SendableChangeRequest {
|
||||
fn from_fn<F>(f: F) -> Self
|
||||
where
|
||||
F: for<'b> FnOnce(&'b mut dyn CacheBackend<K = String, V = usize>) + Send + 'static,
|
||||
F: for<'b> FnOnce(&'b mut Recorder<String, usize>) + Send + 'static,
|
||||
{
|
||||
Self { fun: Box::new(f) }
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ use tokio_util::sync::CancellationToken;
|
|||
|
||||
use crate::loader::Loader;
|
||||
|
||||
use super::{CallbackHandle, ChangeRequest, Subscriber};
|
||||
use super::{CacheBackend, CallbackHandle, ChangeRequest, Subscriber};
|
||||
|
||||
/// Interface to provide refresh duration for a key-value pair.
|
||||
pub trait RefreshDurationProvider: std::fmt::Debug + Send + Sync + 'static {
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@ use metric::U64Counter;
|
|||
use parking_lot::Mutex;
|
||||
use std::{fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc};
|
||||
|
||||
use super::{CallbackHandle, ChangeRequest, Subscriber};
|
||||
use super::{CacheBackend, CallbackHandle, ChangeRequest, Subscriber};
|
||||
|
||||
/// Allows explicitly removing entries from the cache.
|
||||
#[derive(Debug, Clone)]
|
||||
|
|
@ -109,7 +109,7 @@ where
|
|||
let removed_captured = &mut removed;
|
||||
let k = k.clone();
|
||||
handle.execute_requests(vec![ChangeRequest::from_fn(move |backend| {
|
||||
if let Some(v) = backend.get(&k) {
|
||||
if let Some(v) = backend.get_untracked(&k) {
|
||||
if predicate(v) {
|
||||
metric_removed_by_predicate.inc(1);
|
||||
backend.remove(&k);
|
||||
|
|
|
|||
Loading…
Reference in New Issue