refactor: port TTL backend to policy framework (#5396)
* refactor: port TTL backend to policy framework Note that this is "just" a port, it does NOT change how TTL works. This will be done in #5318. Helps with #5320. * fix: ensure inner backend is empty * test: add some smoke testpull/24376/head
parent
b2caf54b3a
commit
0ccefa0d0c
|
@ -5940,6 +5940,7 @@ dependencies = [
|
|||
"hex",
|
||||
"indexmap",
|
||||
"libc",
|
||||
"lock_api",
|
||||
"log",
|
||||
"md-5",
|
||||
"memchr",
|
||||
|
@ -5948,6 +5949,7 @@ dependencies = [
|
|||
"num-traits",
|
||||
"object_store",
|
||||
"once_cell",
|
||||
"parking_lot 0.12.1",
|
||||
"parquet",
|
||||
"predicates",
|
||||
"prost 0.11.0",
|
||||
|
|
|
@ -9,7 +9,7 @@ futures = "0.3"
|
|||
iox_time = { path = "../iox_time" }
|
||||
metric = { path = "../metric" }
|
||||
observability_deps = { path = "../observability_deps" }
|
||||
parking_lot = "0.12"
|
||||
parking_lot = { version = "0.12", features = ["arc_lock"] }
|
||||
pdatastructs = { version = "0.7", default-features = false, features = ["fixedbitset"] }
|
||||
tokio = { version = "1.20", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] }
|
||||
trace = { path = "../trace"}
|
||||
|
|
|
@ -7,7 +7,6 @@ pub mod lru;
|
|||
pub mod policy;
|
||||
pub mod resource_consumption;
|
||||
pub mod shared;
|
||||
pub mod ttl;
|
||||
|
||||
#[cfg(test)]
|
||||
mod test_util;
|
||||
|
|
|
@ -5,13 +5,17 @@ use std::{
|
|||
collections::VecDeque,
|
||||
fmt::Debug,
|
||||
hash::Hash,
|
||||
marker::PhantomData,
|
||||
ops::Deref,
|
||||
sync::{Arc, Weak},
|
||||
};
|
||||
|
||||
use parking_lot::{Mutex, ReentrantMutex};
|
||||
use parking_lot::{lock_api::ArcMutexGuard, Mutex, RawMutex, ReentrantMutex};
|
||||
|
||||
use super::CacheBackend;
|
||||
|
||||
pub mod ttl;
|
||||
|
||||
/// Convenience macro to easily follow the borrow/lock chain of [`StrongSharedInner`].
|
||||
///
|
||||
/// This cannot just be a method because we cannot return references to local variables.
|
||||
|
@ -266,6 +270,19 @@ where
|
|||
lock_inner!(mut guard = self.inner);
|
||||
guard.subscribers.push(Box::new(subscriber));
|
||||
}
|
||||
|
||||
/// Provide temporary read-only access to the underlying backend.
|
||||
///
|
||||
/// This is mostly useful for debugging and testing.
|
||||
pub fn inner_ref(&mut self) -> InnerBackendRef<'_, K, V> {
|
||||
// NOTE: We deliberately use a mutable reference here to prevent users from using `<Self as CacheBackend>` while
|
||||
// we hold a lock to the underlying backend.
|
||||
lock_inner!(guard = self.inner);
|
||||
InnerBackendRef {
|
||||
inner: guard.inner.lock_arc(),
|
||||
_phantom: PhantomData::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<K, V> CacheBackend for PolicyBackend<K, V>
|
||||
|
@ -476,6 +493,15 @@ where
|
|||
})
|
||||
}
|
||||
|
||||
/// Ensure that backends is empty and panic otherwise.
|
||||
///
|
||||
/// This is mostly useful during initialization.
|
||||
pub fn ensure_empty() -> Self {
|
||||
Self::from_fn(|backend| {
|
||||
assert!(backend.is_empty(), "inner backend is not empty");
|
||||
})
|
||||
}
|
||||
|
||||
/// Execute this change request.
|
||||
fn eval(self, backend: &mut dyn CacheBackend<K = K, V = V>) {
|
||||
(self.fun)(backend)
|
||||
|
@ -558,6 +584,47 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
/// Read-only ref to the inner backend of [`PolicyBackend`] for debugging.
|
||||
pub struct InnerBackendRef<'a, K, V>
|
||||
where
|
||||
K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
|
||||
V: Clone + Debug + Send + 'static,
|
||||
{
|
||||
inner: ArcMutexGuard<RawMutex, Box<dyn CacheBackend<K = K, V = V>>>,
|
||||
_phantom: PhantomData<&'a mut ()>,
|
||||
}
|
||||
|
||||
// Workaround for <https://github.com/rust-lang/rust/issues/100573>.
|
||||
impl<'a, K, V> Drop for InnerBackendRef<'a, K, V>
|
||||
where
|
||||
K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
|
||||
V: Clone + Debug + Send + 'static,
|
||||
{
|
||||
fn drop(&mut self) {}
|
||||
}
|
||||
|
||||
impl<'a, K, V> Debug for InnerBackendRef<'a, K, V>
|
||||
where
|
||||
K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
|
||||
V: Clone + Debug + Send + 'static,
|
||||
{
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("InnerBackendRef").finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, K, V> Deref for InnerBackendRef<'a, K, V>
|
||||
where
|
||||
K: Clone + Eq + Hash + Ord + Debug + Send + 'static,
|
||||
V: Clone + Debug + Send + 'static,
|
||||
{
|
||||
type Target = dyn CacheBackend<K = K, V = V>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.inner.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::{collections::HashMap, sync::Barrier, thread::JoinHandle};
|
||||
|
|
|
@ -1,10 +1,12 @@
|
|||
//! Time-to-live handling.
|
||||
use std::{any::Any, fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc, time::Duration};
|
||||
use std::{fmt::Debug, hash::Hash, marker::PhantomData, sync::Arc, time::Duration};
|
||||
|
||||
use iox_time::{Time, TimeProvider};
|
||||
use metric::U64Counter;
|
||||
|
||||
use super::{addressable_heap::AddressableHeap, CacheBackend};
|
||||
use crate::backend::addressable_heap::AddressableHeap;
|
||||
|
||||
use super::{CallbackHandle, ChangeRequest, Subscriber};
|
||||
|
||||
/// Interface to provide TTL (time to live) data for a key-value pair.
|
||||
pub trait TtlProvider: std::fmt::Debug + Send + Sync + 'static {
|
||||
|
@ -104,45 +106,36 @@ impl<K, V> TtlProvider for OptionalValueTtlProvider<K, V> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Cache backend that implements Time To Life.
|
||||
/// Cache policy that implements Time To Life.
|
||||
///
|
||||
/// # Cache Eviction
|
||||
/// Every method ([`get`](CacheBackend::get), [`set`](CacheBackend::set), [`remove`](CacheBackend::remove)) causes the
|
||||
/// Every method ([`get`](Subscriber::get), [`set`](Subscriber::set), [`remove`](Subscriber::remove)) causes the
|
||||
/// cache to check for expired keys. This may lead to certain delays, esp. when dropping the contained values takes a
|
||||
/// long time.
|
||||
#[derive(Debug)]
|
||||
pub struct TtlBackend<K, V>
|
||||
pub struct TtlPolicy<K, V>
|
||||
where
|
||||
K: Clone + Eq + Debug + Hash + Ord + Send + 'static,
|
||||
V: Clone + Debug + Send + 'static,
|
||||
{
|
||||
inner_backend: Box<dyn CacheBackend<K = K, V = V>>,
|
||||
ttl_provider: Arc<dyn TtlProvider<K = K, V = V>>,
|
||||
time_provider: Arc<dyn TimeProvider>,
|
||||
expiration: AddressableHeap<K, (), Time>,
|
||||
metric_expired: U64Counter,
|
||||
}
|
||||
|
||||
impl<K, V> TtlBackend<K, V>
|
||||
impl<K, V> TtlPolicy<K, V>
|
||||
where
|
||||
K: Clone + Eq + Debug + Hash + Ord + Send + 'static,
|
||||
V: Clone + Debug + Send + 'static,
|
||||
{
|
||||
/// Create new backend w/o any known keys.
|
||||
///
|
||||
/// The inner backend MUST NOT contain any data at this point, otherwise we will not track any TTLs for these entries.
|
||||
///
|
||||
/// # Panic
|
||||
/// If the inner backend is not empty.
|
||||
/// Create new TTL policy.
|
||||
pub fn new(
|
||||
inner_backend: Box<dyn CacheBackend<K = K, V = V>>,
|
||||
ttl_provider: Arc<dyn TtlProvider<K = K, V = V>>,
|
||||
time_provider: Arc<dyn TimeProvider>,
|
||||
name: &'static str,
|
||||
metric_registry: &metric::Registry,
|
||||
) -> Self {
|
||||
assert!(inner_backend.is_empty(), "inner backend is not empty");
|
||||
|
||||
) -> impl FnOnce(CallbackHandle<K, V>) -> Self {
|
||||
let metric_expired = metric_registry
|
||||
.register_metric::<U64Counter>(
|
||||
"cache_ttl_expired",
|
||||
|
@ -150,16 +143,21 @@ where
|
|||
)
|
||||
.recorder(&[("name", name)]);
|
||||
|
||||
Self {
|
||||
inner_backend,
|
||||
ttl_provider,
|
||||
time_provider,
|
||||
expiration: Default::default(),
|
||||
metric_expired,
|
||||
|mut callback_handle| {
|
||||
callback_handle.init_requests(vec![ChangeRequest::ensure_empty()]);
|
||||
|
||||
Self {
|
||||
ttl_provider,
|
||||
time_provider,
|
||||
expiration: Default::default(),
|
||||
metric_expired,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn evict_expired(&mut self, now: Time) {
|
||||
fn evict_expired(&mut self, now: Time) -> Vec<ChangeRequest<K, V>> {
|
||||
let mut requests = vec![];
|
||||
|
||||
while self
|
||||
.expiration
|
||||
.peek()
|
||||
|
@ -168,24 +166,14 @@ where
|
|||
{
|
||||
let (k, _, _t) = self.expiration.pop().unwrap();
|
||||
self.metric_expired.inc(1);
|
||||
self.inner_backend.remove(&k);
|
||||
requests.push(ChangeRequest::remove(k));
|
||||
}
|
||||
}
|
||||
|
||||
/// Reference to inner backend.
|
||||
#[allow(dead_code)]
|
||||
pub fn inner_backend(&self) -> &dyn CacheBackend<K = K, V = V> {
|
||||
self.inner_backend.as_ref()
|
||||
}
|
||||
|
||||
/// Reference to TTL provider.
|
||||
#[allow(dead_code)]
|
||||
pub fn ttl_provider(&self) -> &Arc<dyn TtlProvider<K = K, V = V>> {
|
||||
&self.ttl_provider
|
||||
requests
|
||||
}
|
||||
}
|
||||
|
||||
impl<K, V> CacheBackend for TtlBackend<K, V>
|
||||
impl<K, V> Subscriber for TtlPolicy<K, V>
|
||||
where
|
||||
K: Clone + Eq + Debug + Hash + Ord + Send + 'static,
|
||||
V: Clone + Debug + Send + 'static,
|
||||
|
@ -193,18 +181,18 @@ where
|
|||
type K = K;
|
||||
type V = V;
|
||||
|
||||
fn get(&mut self, k: &Self::K) -> Option<Self::V> {
|
||||
self.evict_expired(self.time_provider.now());
|
||||
|
||||
self.inner_backend.get(k)
|
||||
fn get(&mut self, _k: &Self::K) -> Vec<ChangeRequest<Self::K, Self::V>> {
|
||||
self.evict_expired(self.time_provider.now())
|
||||
}
|
||||
|
||||
fn set(&mut self, k: Self::K, v: Self::V) {
|
||||
fn set(&mut self, k: Self::K, v: Self::V) -> Vec<ChangeRequest<Self::K, Self::V>> {
|
||||
let now = self.time_provider.now();
|
||||
self.evict_expired(now);
|
||||
let mut requests = self.evict_expired(now);
|
||||
|
||||
let should_store = if let Some(ttl) = self.ttl_provider.expires_in(&k, &v) {
|
||||
let should_store = !ttl.is_zero();
|
||||
if let Some(ttl) = self.ttl_provider.expires_in(&k, &v) {
|
||||
if ttl.is_zero() {
|
||||
requests.push(ChangeRequest::remove(k.clone()));
|
||||
}
|
||||
|
||||
match now.checked_add(ttl) {
|
||||
Some(t) => {
|
||||
|
@ -215,33 +203,17 @@ where
|
|||
self.expiration.remove(&k);
|
||||
}
|
||||
}
|
||||
|
||||
should_store
|
||||
} else {
|
||||
// Still need to ensure that any current expiration is disabled
|
||||
self.expiration.remove(&k);
|
||||
|
||||
true
|
||||
};
|
||||
|
||||
if should_store {
|
||||
self.inner_backend.set(k, v);
|
||||
}
|
||||
requests
|
||||
}
|
||||
|
||||
fn remove(&mut self, k: &Self::K) {
|
||||
self.evict_expired(self.time_provider.now());
|
||||
|
||||
self.inner_backend.remove(k);
|
||||
fn remove(&mut self, k: &Self::K) -> Vec<ChangeRequest<Self::K, Self::V>> {
|
||||
self.expiration.remove(k);
|
||||
}
|
||||
|
||||
fn is_empty(&self) -> bool {
|
||||
self.inner_backend.is_empty()
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self as &dyn Any
|
||||
self.evict_expired(self.time_provider.now())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -253,6 +225,8 @@ mod tests {
|
|||
use metric::{Observation, RawReporter};
|
||||
use parking_lot::Mutex;
|
||||
|
||||
use crate::backend::{policy::PolicyBackend, CacheBackend};
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
|
@ -271,17 +245,33 @@ mod tests {
|
|||
}
|
||||
|
||||
#[test]
|
||||
fn test_expires_single() {
|
||||
#[should_panic(expected = "inner backend is not empty")]
|
||||
fn test_panic_inner_not_empty() {
|
||||
let ttl_provider = Arc::new(TestTtlProvider::new());
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let metric_registry = metric::Registry::new();
|
||||
let mut backend = TtlBackend::new(
|
||||
Box::new(HashMap::<u8, String>::new()),
|
||||
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::<u8, String>::new()));
|
||||
let constructor = TtlPolicy::new(
|
||||
Arc::clone(&ttl_provider) as _,
|
||||
Arc::clone(&time_provider) as _,
|
||||
"my_cache",
|
||||
&metric_registry,
|
||||
);
|
||||
backend.add_policy(|mut handle| {
|
||||
handle.init_requests(vec![ChangeRequest::set(1, String::from("foo"))]);
|
||||
constructor(handle)
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_expires_single() {
|
||||
let TestState {
|
||||
mut backend,
|
||||
metric_registry,
|
||||
ttl_provider,
|
||||
time_provider,
|
||||
} = TestState::new();
|
||||
|
||||
assert_eq!(get_expired_metric(&metric_registry), 0);
|
||||
|
||||
|
@ -304,13 +294,13 @@ mod tests {
|
|||
|
||||
// init time provider at MAX!
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MAX));
|
||||
let mut backend = TtlBackend::new(
|
||||
Box::new(HashMap::<u8, String>::new()),
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::<u8, String>::new()));
|
||||
backend.add_policy(TtlPolicy::new(
|
||||
Arc::clone(&ttl_provider) as _,
|
||||
Arc::clone(&time_provider) as _,
|
||||
"my_cache",
|
||||
&metric_registry,
|
||||
);
|
||||
));
|
||||
|
||||
ttl_provider.set_expires_in(1, String::from("a"), Some(Duration::MAX));
|
||||
backend.set(1, String::from("a"));
|
||||
|
@ -319,16 +309,12 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_never_expire() {
|
||||
let ttl_provider = Arc::new(TestTtlProvider::new());
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let metric_registry = metric::Registry::new();
|
||||
let mut backend = TtlBackend::new(
|
||||
Box::new(HashMap::<u8, String>::new()),
|
||||
Arc::clone(&ttl_provider) as _,
|
||||
Arc::clone(&time_provider) as _,
|
||||
"my_cache",
|
||||
&metric_registry,
|
||||
);
|
||||
let TestState {
|
||||
mut backend,
|
||||
ttl_provider,
|
||||
time_provider,
|
||||
..
|
||||
} = TestState::new();
|
||||
|
||||
ttl_provider.set_expires_in(1, String::from("a"), None);
|
||||
backend.set(1, String::from("a"));
|
||||
|
@ -340,16 +326,12 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_expiration_uses_key_and_value() {
|
||||
let ttl_provider = Arc::new(TestTtlProvider::new());
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let metric_registry = metric::Registry::new();
|
||||
let mut backend = TtlBackend::new(
|
||||
Box::new(HashMap::<u8, String>::new()),
|
||||
Arc::clone(&ttl_provider) as _,
|
||||
Arc::clone(&time_provider) as _,
|
||||
"my_cache",
|
||||
&metric_registry,
|
||||
);
|
||||
let TestState {
|
||||
mut backend,
|
||||
ttl_provider,
|
||||
time_provider,
|
||||
..
|
||||
} = TestState::new();
|
||||
|
||||
ttl_provider.set_expires_in(1, String::from("a"), Some(Duration::from_secs(1)));
|
||||
ttl_provider.set_expires_in(1, String::from("b"), Some(Duration::from_secs(4)));
|
||||
|
@ -362,16 +344,12 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_override_with_different_expiration() {
|
||||
let ttl_provider = Arc::new(TestTtlProvider::new());
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let metric_registry = metric::Registry::new();
|
||||
let mut backend = TtlBackend::new(
|
||||
Box::new(HashMap::<u8, String>::new()),
|
||||
Arc::clone(&ttl_provider) as _,
|
||||
Arc::clone(&time_provider) as _,
|
||||
"my_cache",
|
||||
&metric_registry,
|
||||
);
|
||||
let TestState {
|
||||
mut backend,
|
||||
ttl_provider,
|
||||
time_provider,
|
||||
..
|
||||
} = TestState::new();
|
||||
|
||||
ttl_provider.set_expires_in(1, String::from("a"), Some(Duration::from_secs(1)));
|
||||
backend.set(1, String::from("a"));
|
||||
|
@ -386,16 +364,12 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_override_with_no_expiration() {
|
||||
let ttl_provider = Arc::new(TestTtlProvider::new());
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let metric_registry = metric::Registry::new();
|
||||
let mut backend = TtlBackend::new(
|
||||
Box::new(HashMap::<u8, String>::new()),
|
||||
Arc::clone(&ttl_provider) as _,
|
||||
Arc::clone(&time_provider) as _,
|
||||
"my_cache",
|
||||
&metric_registry,
|
||||
);
|
||||
let TestState {
|
||||
mut backend,
|
||||
ttl_provider,
|
||||
time_provider,
|
||||
..
|
||||
} = TestState::new();
|
||||
|
||||
ttl_provider.set_expires_in(1, String::from("a"), Some(Duration::from_secs(1)));
|
||||
backend.set(1, String::from("a"));
|
||||
|
@ -415,13 +389,13 @@ mod tests {
|
|||
|
||||
// init time provider at nearly MAX!
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MAX - Duration::from_secs(2)));
|
||||
let mut backend = TtlBackend::new(
|
||||
Box::new(HashMap::<u8, String>::new()),
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::<u8, String>::new()));
|
||||
backend.add_policy(TtlPolicy::new(
|
||||
Arc::clone(&ttl_provider) as _,
|
||||
Arc::clone(&time_provider) as _,
|
||||
"my_cache",
|
||||
&metric_registry,
|
||||
);
|
||||
));
|
||||
|
||||
ttl_provider.set_expires_in(1, String::from("a"), Some(Duration::from_secs(1)));
|
||||
backend.set(1, String::from("a"));
|
||||
|
@ -436,16 +410,12 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_readd_with_different_expiration() {
|
||||
let ttl_provider = Arc::new(TestTtlProvider::new());
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let metric_registry = metric::Registry::new();
|
||||
let mut backend = TtlBackend::new(
|
||||
Box::new(HashMap::<u8, String>::new()),
|
||||
Arc::clone(&ttl_provider) as _,
|
||||
Arc::clone(&time_provider) as _,
|
||||
"my_cache",
|
||||
&metric_registry,
|
||||
);
|
||||
let TestState {
|
||||
mut backend,
|
||||
ttl_provider,
|
||||
time_provider,
|
||||
..
|
||||
} = TestState::new();
|
||||
|
||||
ttl_provider.set_expires_in(1, String::from("a"), Some(Duration::from_secs(1)));
|
||||
backend.set(1, String::from("a"));
|
||||
|
@ -461,16 +431,12 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_readd_with_no_expiration() {
|
||||
let ttl_provider = Arc::new(TestTtlProvider::new());
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let metric_registry = metric::Registry::new();
|
||||
let mut backend = TtlBackend::new(
|
||||
Box::new(HashMap::<u8, String>::new()),
|
||||
Arc::clone(&ttl_provider) as _,
|
||||
Arc::clone(&time_provider) as _,
|
||||
"my_cache",
|
||||
&metric_registry,
|
||||
);
|
||||
let TestState {
|
||||
mut backend,
|
||||
ttl_provider,
|
||||
time_provider,
|
||||
..
|
||||
} = TestState::new();
|
||||
|
||||
ttl_provider.set_expires_in(1, String::from("a"), Some(Duration::from_secs(1)));
|
||||
backend.set(1, String::from("a"));
|
||||
|
@ -486,16 +452,12 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_update_cleans_multiple_keys() {
|
||||
let ttl_provider = Arc::new(TestTtlProvider::new());
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let metric_registry = metric::Registry::new();
|
||||
let mut backend = TtlBackend::new(
|
||||
Box::new(HashMap::<u8, String>::new()),
|
||||
Arc::clone(&ttl_provider) as _,
|
||||
Arc::clone(&time_provider) as _,
|
||||
"my_cache",
|
||||
&metric_registry,
|
||||
);
|
||||
let TestState {
|
||||
mut backend,
|
||||
ttl_provider,
|
||||
time_provider,
|
||||
..
|
||||
} = TestState::new();
|
||||
|
||||
ttl_provider.set_expires_in(1, String::from("a"), Some(Duration::from_secs(1)));
|
||||
ttl_provider.set_expires_in(2, String::from("b"), Some(Duration::from_secs(2)));
|
||||
|
@ -512,15 +474,19 @@ mod tests {
|
|||
|
||||
time_provider.inc(Duration::from_secs(2));
|
||||
assert_eq!(backend.get(&1), None);
|
||||
let inner_backend = backend
|
||||
.inner_backend()
|
||||
.as_any()
|
||||
.downcast_ref::<HashMap<u8, String>>()
|
||||
.unwrap();
|
||||
assert!(!inner_backend.contains_key(&1));
|
||||
assert!(!inner_backend.contains_key(&2));
|
||||
assert!(!inner_backend.contains_key(&3));
|
||||
assert!(inner_backend.contains_key(&4));
|
||||
|
||||
{
|
||||
let inner_ref = backend.inner_ref();
|
||||
let inner_backend = inner_ref
|
||||
.as_any()
|
||||
.downcast_ref::<HashMap<u8, String>>()
|
||||
.unwrap();
|
||||
assert!(!inner_backend.contains_key(&1));
|
||||
assert!(!inner_backend.contains_key(&2));
|
||||
assert!(!inner_backend.contains_key(&3));
|
||||
assert!(inner_backend.contains_key(&4));
|
||||
}
|
||||
|
||||
assert_eq!(backend.get(&2), None);
|
||||
assert_eq!(backend.get(&3), None);
|
||||
assert_eq!(backend.get(&4), Some(String::from("d")));
|
||||
|
@ -528,16 +494,12 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_remove_expired_key() {
|
||||
let ttl_provider = Arc::new(TestTtlProvider::new());
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let metric_registry = metric::Registry::new();
|
||||
let mut backend = TtlBackend::new(
|
||||
Box::new(HashMap::<u8, String>::new()),
|
||||
Arc::clone(&ttl_provider) as _,
|
||||
Arc::clone(&time_provider) as _,
|
||||
"my_cache",
|
||||
&metric_registry,
|
||||
);
|
||||
let TestState {
|
||||
mut backend,
|
||||
ttl_provider,
|
||||
time_provider,
|
||||
..
|
||||
} = TestState::new();
|
||||
|
||||
ttl_provider.set_expires_in(1, String::from("a"), Some(Duration::from_secs(1)));
|
||||
backend.set(1, String::from("a"));
|
||||
|
@ -550,16 +512,12 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_expire_removed_key() {
|
||||
let ttl_provider = Arc::new(TestTtlProvider::new());
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let metric_registry = metric::Registry::new();
|
||||
let mut backend = TtlBackend::new(
|
||||
Box::new(HashMap::<u8, String>::new()),
|
||||
Arc::clone(&ttl_provider) as _,
|
||||
Arc::clone(&time_provider) as _,
|
||||
"my_cache",
|
||||
&metric_registry,
|
||||
);
|
||||
let TestState {
|
||||
mut backend,
|
||||
ttl_provider,
|
||||
time_provider,
|
||||
..
|
||||
} = TestState::new();
|
||||
|
||||
ttl_provider.set_expires_in(1, String::from("a"), Some(Duration::from_secs(1)));
|
||||
ttl_provider.set_expires_in(2, String::from("b"), Some(Duration::from_secs(2)));
|
||||
|
@ -574,60 +532,36 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_expire_immediately() {
|
||||
let ttl_provider = Arc::new(TestTtlProvider::new());
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let metric_registry = metric::Registry::new();
|
||||
let mut backend = TtlBackend::new(
|
||||
Box::new(HashMap::<u8, String>::new()),
|
||||
Arc::clone(&ttl_provider) as _,
|
||||
Arc::clone(&time_provider) as _,
|
||||
"my_cache",
|
||||
&metric_registry,
|
||||
);
|
||||
let TestState {
|
||||
mut backend,
|
||||
ttl_provider,
|
||||
..
|
||||
} = TestState::new();
|
||||
|
||||
ttl_provider.set_expires_in(1, String::from("a"), Some(Duration::from_secs(0)));
|
||||
backend.set(1, String::from("a"));
|
||||
|
||||
let inner_backend = backend
|
||||
.inner_backend()
|
||||
.as_any()
|
||||
.downcast_ref::<HashMap<u8, String>>()
|
||||
.unwrap();
|
||||
assert!(inner_backend.is_empty());
|
||||
assert!(backend.is_empty());
|
||||
|
||||
assert_eq!(backend.get(&1), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic(expected = "inner backend is not empty")]
|
||||
fn test_panic_inner_not_empty() {
|
||||
let ttl_provider = Arc::new(TestTtlProvider::new());
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let metric_registry = metric::Registry::new();
|
||||
TtlBackend::new(
|
||||
Box::new(HashMap::<u8, String>::from([(1, String::from("a"))])),
|
||||
Arc::clone(&ttl_provider) as _,
|
||||
Arc::clone(&time_provider) as _,
|
||||
"my_cache",
|
||||
&metric_registry,
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_generic() {
|
||||
fn test_generic_backend() {
|
||||
use crate::backend::test_util::test_generic;
|
||||
|
||||
test_generic(|| {
|
||||
let ttl_provider = Arc::new(NeverTtlProvider::default());
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let metric_registry = metric::Registry::new();
|
||||
TtlBackend::new(
|
||||
Box::new(HashMap::<u8, String>::new()),
|
||||
ttl_provider,
|
||||
time_provider,
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::<u8, String>::new()));
|
||||
backend.add_policy(TtlPolicy::new(
|
||||
Arc::clone(&ttl_provider) as _,
|
||||
Arc::clone(&time_provider) as _,
|
||||
"my_cache",
|
||||
&metric_registry,
|
||||
)
|
||||
));
|
||||
backend
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -661,6 +595,36 @@ mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
struct TestState {
|
||||
backend: PolicyBackend<u8, String>,
|
||||
metric_registry: metric::Registry,
|
||||
ttl_provider: Arc<TestTtlProvider>,
|
||||
time_provider: Arc<MockProvider>,
|
||||
}
|
||||
|
||||
impl TestState {
|
||||
fn new() -> Self {
|
||||
let ttl_provider = Arc::new(TestTtlProvider::new());
|
||||
let time_provider = Arc::new(MockProvider::new(Time::MIN));
|
||||
let metric_registry = metric::Registry::new();
|
||||
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::<u8, String>::new()));
|
||||
backend.add_policy(TtlPolicy::new(
|
||||
Arc::clone(&ttl_provider) as _,
|
||||
Arc::clone(&time_provider) as _,
|
||||
"my_cache",
|
||||
&metric_registry,
|
||||
));
|
||||
|
||||
Self {
|
||||
backend,
|
||||
metric_registry,
|
||||
ttl_provider,
|
||||
time_provider,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn get_expired_metric(metric_registry: &metric::Registry) -> u64 {
|
||||
let mut reporter = RawReporter::default();
|
||||
metric_registry.report(&mut reporter);
|
|
@ -4,9 +4,12 @@ use backoff::{Backoff, BackoffConfig};
|
|||
use cache_system::{
|
||||
backend::{
|
||||
lru::{LruBackend, ResourcePool},
|
||||
policy::{
|
||||
ttl::{OptionalValueTtlProvider, TtlPolicy},
|
||||
PolicyBackend,
|
||||
},
|
||||
resource_consumption::FunctionEstimator,
|
||||
shared::SharedBackend,
|
||||
ttl::{OptionalValueTtlProvider, TtlBackend},
|
||||
},
|
||||
cache::{driver::CacheDriver, metrics::CacheWithMetrics, Cache},
|
||||
loader::{metrics::MetricsLoader, FunctionLoader},
|
||||
|
@ -100,8 +103,8 @@ impl NamespaceCache {
|
|||
testing,
|
||||
));
|
||||
|
||||
let backend = Box::new(TtlBackend::new(
|
||||
Box::new(HashMap::new()),
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()));
|
||||
backend.add_policy(TtlPolicy::new(
|
||||
Arc::new(OptionalValueTtlProvider::new(
|
||||
Some(TTL_NON_EXISTING),
|
||||
Some(TTL_EXISTING),
|
||||
|
@ -113,7 +116,7 @@ impl NamespaceCache {
|
|||
|
||||
// add to memory pool
|
||||
let backend = Box::new(LruBackend::new(
|
||||
backend as _,
|
||||
Box::new(backend),
|
||||
Arc::clone(&ram_pool),
|
||||
CACHE_ID,
|
||||
Arc::new(FunctionEstimator::new(
|
||||
|
|
|
@ -4,8 +4,11 @@ use backoff::{Backoff, BackoffConfig};
|
|||
use cache_system::{
|
||||
backend::{
|
||||
lru::{LruBackend, ResourcePool},
|
||||
policy::{
|
||||
ttl::{TtlPolicy, TtlProvider},
|
||||
PolicyBackend,
|
||||
},
|
||||
resource_consumption::FunctionEstimator,
|
||||
ttl::{TtlBackend, TtlProvider},
|
||||
},
|
||||
cache::{driver::CacheDriver, metrics::CacheWithMetrics, Cache},
|
||||
loader::{metrics::MetricsLoader, FunctionLoader},
|
||||
|
@ -79,16 +82,15 @@ impl ProcessedTombstonesCache {
|
|||
testing,
|
||||
));
|
||||
|
||||
let backend = Box::new(HashMap::new());
|
||||
let backend = Box::new(TtlBackend::new(
|
||||
backend,
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()));
|
||||
backend.add_policy(TtlPolicy::new(
|
||||
Arc::new(KeepExistsForever {}),
|
||||
Arc::clone(&time_provider),
|
||||
CACHE_ID,
|
||||
metric_registry,
|
||||
));
|
||||
let backend = Box::new(LruBackend::new(
|
||||
backend,
|
||||
Box::new(backend),
|
||||
ram_pool,
|
||||
CACHE_ID,
|
||||
Arc::new(FunctionEstimator::new(|k, v| {
|
||||
|
|
|
@ -4,8 +4,11 @@ use backoff::{Backoff, BackoffConfig};
|
|||
use cache_system::{
|
||||
backend::{
|
||||
lru::{LruBackend, ResourcePool},
|
||||
policy::{
|
||||
ttl::{OptionalValueTtlProvider, TtlPolicy},
|
||||
PolicyBackend,
|
||||
},
|
||||
resource_consumption::FunctionEstimator,
|
||||
ttl::{OptionalValueTtlProvider, TtlBackend},
|
||||
},
|
||||
cache::{driver::CacheDriver, metrics::CacheWithMetrics, Cache},
|
||||
loader::{metrics::MetricsLoader, FunctionLoader},
|
||||
|
@ -76,8 +79,8 @@ impl TableCache {
|
|||
testing,
|
||||
));
|
||||
|
||||
let backend = Box::new(TtlBackend::new(
|
||||
Box::new(HashMap::new()),
|
||||
let mut backend = PolicyBackend::new(Box::new(HashMap::new()));
|
||||
backend.add_policy(TtlPolicy::new(
|
||||
Arc::new(OptionalValueTtlProvider::new(Some(TTL_NON_EXISTING), None)),
|
||||
Arc::clone(&time_provider),
|
||||
CACHE_ID,
|
||||
|
@ -86,7 +89,7 @@ impl TableCache {
|
|||
|
||||
// add to memory pool
|
||||
let backend = Box::new(LruBackend::new(
|
||||
backend,
|
||||
Box::new(backend),
|
||||
Arc::clone(&ram_pool),
|
||||
CACHE_ID,
|
||||
Arc::new(FunctionEstimator::new(|k, v: &Option<Arc<CachedTable>>| {
|
||||
|
|
|
@ -36,6 +36,7 @@ hashbrown = { version = "0.12", features = ["ahash", "inline-more", "raw"] }
|
|||
hex = { version = "0.4", features = ["alloc", "std"] }
|
||||
indexmap = { version = "1", default-features = false, features = ["std"] }
|
||||
libc = { version = "0.2", features = ["extra_traits", "std"] }
|
||||
lock_api = { version = "0.4", default-features = false, features = ["arc_lock"] }
|
||||
log = { version = "0.4", default-features = false, features = ["std"] }
|
||||
md-5 = { version = "0.10", features = ["std"] }
|
||||
memchr = { version = "2", features = ["std"] }
|
||||
|
@ -44,6 +45,7 @@ num-integer = { version = "0.1", default-features = false, features = ["i128", "
|
|||
num-traits = { version = "0.2", features = ["i128", "libm", "std"] }
|
||||
object_store = { git = "https://github.com/tustvold/arrow-rs.git", rev = "0f9d902aae0c0ce07e3f81d06d621b5189ad9240", default-features = false, features = ["aws", "azure", "azure_core", "azure_storage", "azure_storage_blobs", "base64", "gcp", "quick-xml", "rand", "reqwest", "ring", "rustls-pemfile", "serde", "serde_json"] }
|
||||
once_cell = { version = "1", features = ["alloc", "parking_lot", "parking_lot_core", "race", "std"] }
|
||||
parking_lot = { version = "0.12", features = ["arc_lock"] }
|
||||
parquet = { version = "20", features = ["arrow", "async", "base64", "brotli", "experimental", "flate2", "futures", "lz4", "snap", "tokio", "zstd"] }
|
||||
predicates = { version = "2", features = ["diff", "difflib", "float-cmp", "normalize-line-endings", "regex"] }
|
||||
prost = { version = "0.11", features = ["prost-derive", "std"] }
|
||||
|
@ -97,12 +99,14 @@ heck = { version = "0.4", features = ["unicode", "unicode-segmentation"] }
|
|||
hex = { version = "0.4", features = ["alloc", "std"] }
|
||||
indexmap = { version = "1", default-features = false, features = ["std"] }
|
||||
libc = { version = "0.2", features = ["extra_traits", "std"] }
|
||||
lock_api = { version = "0.4", default-features = false, features = ["arc_lock"] }
|
||||
log = { version = "0.4", default-features = false, features = ["std"] }
|
||||
md-5 = { version = "0.10", features = ["std"] }
|
||||
memchr = { version = "2", features = ["std"] }
|
||||
nom = { version = "7", features = ["alloc", "std"] }
|
||||
num-traits = { version = "0.2", features = ["i128", "libm", "std"] }
|
||||
once_cell = { version = "1", features = ["alloc", "parking_lot", "parking_lot_core", "race", "std"] }
|
||||
parking_lot = { version = "0.12", features = ["arc_lock"] }
|
||||
prost = { version = "0.11", features = ["prost-derive", "std"] }
|
||||
prost-types = { version = "0.11", features = ["std"] }
|
||||
rand = { version = "0.8", features = ["alloc", "getrandom", "libc", "rand_chacha", "rand_hc", "small_rng", "std", "std_rng"] }
|
||||
|
|
Loading…
Reference in New Issue