diff --git a/cache_system/src/backend/policy/lru.rs b/cache_system/src/backend/policy/lru.rs
index 226205bec5..eab2359485 100644
--- a/cache_system/src/backend/policy/lru.rs
+++ b/cache_system/src/backend/policy/lru.rs
@@ -190,14 +190,16 @@
 //!            |                  |            |
 //!            V (mutex)                       |
 //!    .~~~~~~~~~~~~~~~.    .~~~~~~~~~~~.      |      .~~~~~~~~~~~~~~~~.           .~~~~~~~~~~~~.
-//! -->: PolicyBackend :--->: LruPolicy :    (arc)    : PoolMemberImpl :           : PoolMember :
+//! -->: PolicyBackend :--->: LruPolicy :      |      : PoolMemberImpl :           : PoolMember :
 //!    :               :    :           :      |      :                :           :            :
-//!    :               :    :           :------+------:                :<--(dyn)---:            :
+//!    :               :    :           :      +------:                :<--(dyn)---:            :
 //!    .~~~~~~~~~~~~~~~.    .~~~~~~~~~~~.             .~~~~~~~~~~~~~~~~.           .~~~~~~~~~~~~.
-//!                             |                                                            ^
-//!                           (arc)                                                          |
-//!                             |                                                            |
-//!                             V                                                            |
+//!                             |     |                                              ^     ^
+//!                             |     |                                              |     |
+//!                             |     +--------------------------------------(arc)-----+     |
+//!                           (arc)                                                          |
+//!                             |                                                         (weak)
+//!                             V                                                            |
 //!                        .~~~~~~~~~~~~~~.                                        .~~~~~~~~~~~~~.
 //! ---------------------->: ResourcePool :-----+-------(arc)--------------------->: SharedState :
 //!                        :              :     |                                  :             :
@@ -300,7 +302,7 @@ use std::{
     collections::{btree_map::Entry, BTreeMap},
     fmt::Debug,
     hash::Hash,
-    sync::Arc,
+    sync::{Arc, Weak},
 };
 
 use iox_time::Time;
@@ -362,12 +364,35 @@ where
     current: Mutex<S>,
 
     /// Members (= backends) that use this pool.
-    members: Mutex<BTreeMap<&'static str, Box<dyn PoolMember<S = S>>>>,
+    members: Mutex<BTreeMap<&'static str, Weak<dyn PoolMember<S = S>>>>,
 
     /// Notification when [`current`](Self::current) as changed.
     change_notify: Notify,
 }
 
+impl<S> SharedState<S>
+where
+    S: Resource,
+{
+    /// Get current members.
+    ///
+    /// This also performs a clean-up.
+    fn members(&self) -> BTreeMap<&'static str, Arc<dyn PoolMember<S = S>>> {
+        let mut members = self.members.lock();
+        let mut out = BTreeMap::new();
+
+        members.retain(|id, member| match member.upgrade() {
+            Some(member) => {
+                out.insert(*id, member);
+                true
+            }
+            None => false,
+        });
+
+        out
+    }
+}
+
 /// Resource pool.
 ///
 /// This can be used with [`LruPolicy`].
@@ -455,29 +480,23 @@ where
     ///
     /// # Panic
     /// Panics when a member with the specific ID is already registered.
-    fn register_member(&self, id: &'static str, member: Box<dyn PoolMember<S = S>>) {
+    fn register_member(&self, id: &'static str, member: Weak<dyn PoolMember<S = S>>) {
         let mut members = self.shared.members.lock();
 
         match members.entry(id) {
             Entry::Vacant(v) => {
                 v.insert(member);
             }
-            Entry::Occupied(o) => {
-                panic!("Member '{}' already registered", o.key());
+            Entry::Occupied(mut o) => {
+                if o.get().strong_count() > 0 {
+                    panic!("Member '{}' already registered", o.key());
+                } else {
+                    *o.get_mut() = member;
+                }
             }
         }
     }
 
-    /// Unregister pool member.
-    ///
-    /// # Panic
-    /// Panics when the member with the specified ID is unknown (or was already unregistered).
-    fn unregister_member(&self, id: &str) {
-        let mut members = self.shared.members.lock();
-
-        assert!(members.remove(id).is_some(), "Member '{id}' unknown");
-    }
-
     /// Add used resource from pool.
     fn add(&self, s: S) {
         let mut current = self.shared.current.lock();
@@ -518,21 +537,15 @@ where
     V: Clone + Debug + Send + 'static,
     S: Resource,
 {
-    /// Pool member ID.
-    id: &'static str,
-
     /// Link to central resource pool.
     pool: Arc<ResourcePool<S>>,
 
+    /// Pool member
+    member: Arc<PoolMemberImpl<K, V, S>>,
+
     /// Resource estimator that is used for new (via [`SET`](Subscriber::set)) entries.
     resource_estimator: Arc<dyn ResourceEstimator<K = K, V = V, S = S>>,
 
-    /// Tracks when an element was used last.
-    ///
-    /// This is shared with [`PoolMemberImpl`], because [`clean_up_loop`] uses it via [`PoolMember::could_remove`] to
-    /// select victims for LRU evictions.
-    last_used: Arc<Mutex<AddressableHeap<K, S, Time>>>,
-
     /// Count number of elements within this specific pool member.
     metric_count: U64Gauge,
 
@@ -584,23 +597,19 @@ where
         move |mut callback_handle| {
             callback_handle.execute_requests(vec![ChangeRequest::ensure_empty()]);
 
-            let last_used = Arc::new(Mutex::new(AddressableHeap::new()));
-
-            pool.register_member(
+            let member = Arc::new(PoolMemberImpl {
                 id,
-                Box::new(PoolMemberImpl {
-                    id,
-                    last_used: Arc::clone(&last_used),
-                    metric_evicted,
-                    callback_handle: Mutex::new(callback_handle),
-                }),
-            );
+                last_used: Mutex::new(AddressableHeap::new()),
+                metric_evicted,
+                callback_handle: Mutex::new(callback_handle),
+            });
+
+            pool.register_member(id, Arc::downgrade(&member) as _);
 
             Self {
-                id,
                 pool,
+                member,
                 resource_estimator,
-                last_used,
                 metric_count,
                 metric_usage,
             }
@@ -615,7 +624,15 @@ where
     S: Resource,
 {
     fn drop(&mut self) {
-        self.pool.unregister_member(self.id);
+        let size_total = {
+            let mut guard = self.member.last_used.lock();
+            let mut accu = S::zero();
+            while let Some((_k, s, _t)) = guard.pop() {
+                accu = accu + s;
+            }
+            accu
+        };
+        self.pool.remove(size_total);
     }
 }
 
@@ -630,7 +647,7 @@ where
     fn get(&mut self, k: &Self::K, now: Time) -> Vec<ChangeRequest<'static, Self::K, Self::V>> {
         trace!(?k, now = now.timestamp(), "LRU get",);
 
-        let mut last_used = self.last_used.lock();
+        let mut last_used = self.member.last_used.lock();
 
         // update "last used"
         last_used.update_order(k, now);
@@ -659,7 +676,7 @@ where
         }
 
         {
-            let mut last_used = self.last_used.lock();
+            let mut last_used = self.member.last_used.lock();
 
             // maybe clean from pool
             if let Some((consumption, last_used_t_previously)) = last_used.remove(k) {
@@ -686,7 +703,7 @@ where
     fn remove(&mut self, k: &Self::K, now: Time) -> Vec<ChangeRequest<'static, Self::K, Self::V>> {
         trace!(?k, now = now.timestamp(), "LRU remove",);
 
-        let mut last_used = self.last_used.lock();
+        let mut last_used = self.member.last_used.lock();
 
         if let Some((consumption, _last_used)) = last_used.remove(k) {
             self.pool.remove(consumption);
@@ -702,7 +719,7 @@
 ///
 /// The only implementation of this is [`PoolMemberImpl`]. This indirection is required to erase `K` and `V` from specific
 /// backend so we can stick it into the generic pool.
-trait PoolMember: Debug + Send + 'static {
+trait PoolMember: Debug + Send + Sync + 'static {
     /// Resource type.
     type S;
 
@@ -739,7 +756,7 @@ where
     /// Tracks usage of the last used elements.
    ///
     /// See documentation of [`callback_handle`](Self::callback_handle) for a reasoning about locking.
-    last_used: Arc<Mutex<AddressableHeap<K, S, Time>>>,
+    last_used: Mutex<AddressableHeap<K, S, Time>>,
 
     /// Handle to call back into the [`PolicyBackend`] to evict data.
     ///
@@ -839,10 +856,9 @@ async fn clean_up_loop<S>(
             }
         }
 
-        // hold member lock
-        // this is OK since this is only modified when new members are added. The members itself do NOT interact with
-        // this value.
-        let members = shared.members.lock();
+        // receive members
+        // Do NOT hold the member lock during the deletion later because this can lead to deadlocks during shutdown.
+        let members = shared.members();
 
         // select victims
         let mut victims: BTreeMap<&'static str, Vec<Box<dyn Any + Send>>> = Default::default();
@@ -866,9 +882,7 @@
         }
 
         for (id, keys) in victims {
-            let member = members
-                .get(id)
-                .expect("did not drop the lock in the meantime");
+            let member = members.get(id).expect("did get this ID from this map");
             member.remove_keys(keys);
         }
     }
@@ -956,15 +970,21 @@ mod tests {
             "id",
             Arc::clone(&resource_estimator) as _,
         ));
+        backend1.set(String::from("a"), 1usize);
+        assert_eq!(pool.current(), TestSize(1));
 
         // drop the backend so re-registering the same ID ("id") MUST NOT panic
         drop(backend1);
+        assert_eq!(pool.current(), TestSize(0));
+
         let mut backend2 = PolicyBackend::hashmap_backed(time_provider);
         backend2.add_policy(LruPolicy::new(
             Arc::clone(&pool),
             "id",
             Arc::clone(&resource_estimator) as _,
         ));
+        backend2.set(String::from("a"), 2usize);
+        assert_eq!(pool.current(), TestSize(2));
     }
 
     #[tokio::test]
@@ -1606,9 +1626,18 @@ mod tests {
         });
     }
 
-    /// Regression test for .
     #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
     async fn test_deadlock() {
+        // Regression test for .
+        test_deadlock_inner(Duration::from_secs(1)).await;
+
+        // Regression test for
+        for _ in 0..100 {
+            test_deadlock_inner(Duration::from_millis(1)).await;
+        }
+    }
+
+    async fn test_deadlock_inner(test_duration: Duration) {
        #[derive(Debug)]
        struct OneSizeProvider {}
 
@@ -1662,7 +1691,7 @@
             }
         });
 
-        tokio::time::sleep(Duration::from_secs(1)).await;
+        tokio::time::sleep(test_duration).await;
 
         worker1.abort();
         worker2.abort();
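For orientation, here is a minimal, self-contained sketch of the registration pattern this patch switches to. It uses hypothetical `Registry`/`Member` types and `std::sync::Mutex` rather than the crate's actual `ResourcePool`/`PoolMember` machinery, but the shape is the same: the pool keeps only `Weak` references, so dropping a member implicitly unregisters it, re-registering an ID whose member is gone must not panic, and `members()` upgrades live entries and prunes dead ones instead of requiring the registry lock to be held while the members are used.

```rust
use std::collections::{btree_map::Entry, BTreeMap};
use std::sync::{Arc, Mutex, Weak};

/// Stand-in for the type-erased pool member; the real trait also erases the
/// backend's key/value types.
trait Member: Send + Sync {}

/// Stand-in for the pool's shared state.
#[derive(Default)]
struct Registry {
    members: Mutex<BTreeMap<&'static str, Weak<dyn Member>>>,
}

impl Registry {
    /// Register a member; a slot whose member was already dropped may be reused.
    fn register(&self, id: &'static str, member: Weak<dyn Member>) {
        let mut members = self.members.lock().unwrap();
        match members.entry(id) {
            Entry::Vacant(v) => {
                v.insert(member);
            }
            Entry::Occupied(mut o) => {
                // A dead `Weak` means the previous member is gone, so the slot
                // may be reused; a live one is a programming error.
                if o.get().strong_count() > 0 {
                    panic!("Member '{id}' already registered");
                } else {
                    *o.get_mut() = member;
                }
            }
        }
    }

    /// Upgrade live members and prune dead entries; the registry lock is
    /// released before the caller works with the returned `Arc`s.
    fn members(&self) -> Vec<Arc<dyn Member>> {
        let mut members = self.members.lock().unwrap();
        let mut out = Vec::new();
        members.retain(|_, weak| match weak.upgrade() {
            Some(member) => {
                out.push(member);
                true
            }
            None => false,
        });
        out
    }
}

struct Backend;

impl Member for Backend {}

fn main() {
    let registry = Registry::default();

    let member: Arc<dyn Member> = Arc::new(Backend);
    registry.register("id", Arc::downgrade(&member));
    assert_eq!(registry.members().len(), 1);

    // Dropping the member implicitly unregisters it; no explicit
    // `unregister_member` call is needed.
    drop(member);
    assert_eq!(registry.members().len(), 0);

    // Re-registering the same ID after the member was dropped must not panic.
    let member2: Arc<dyn Member> = Arc::new(Backend);
    registry.register("id", Arc::downgrade(&member2));
    assert_eq!(registry.members().len(), 1);
}
```

The `Weak` indirection is what lets the patch delete `unregister_member` entirely and stop holding the member lock while victims are evicted, which is the situation the added shutdown-deadlock regression test exercises.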