fix: avoid recursive locking during LRU shutdown (#8395)

* test: regression test for #8378

* fix: avoid recursive locking during LRU shutdown

Fixes the following construct during shutdown:

1. `clean_up_loop` holds `members` lock
2. calls `member.remove_keys`
3. `CallbackHandle::execute_requests` upgrades a weak ref and gets a lock
4. another thread drops the last external reference to the pool member, so
   the upgraded weak ref from (3) is now the last strong ref
5. `CallbackHandle::execute_requests` finishes and drops the pool member
6. dropping that pool member calls `ResourcePool::unregister_member`,
   which takes the same lock we already hold from (1) => deadlock (see the
   sketch below)
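
A minimal, hypothetical sketch of this pattern (the names `Registry`, `Backend`,
and `MemberImpl` are illustrative stand-ins, not the real influxdb_iox types):

```rust
use std::{
    collections::BTreeMap,
    sync::{Arc, Mutex, Weak},
};

/// Shared registry, guarded by a single non-reentrant mutex
/// (pre-fix shape: the map owns its members).
struct Registry {
    members: Mutex<BTreeMap<&'static str, Box<dyn Member>>>,
}

trait Member: Send {
    fn remove_keys(&self);
}

/// The object whose destructor unregisters it from the registry.
struct Backend {
    id: &'static str,
    registry: Arc<Registry>,
}

impl Drop for Backend {
    fn drop(&mut self) {
        // Step (6): re-locks `members`; with a non-reentrant mutex this
        // deadlocks (or panics) when the current thread already holds the
        // lock from step (1).
        self.registry.members.lock().unwrap().remove(self.id);
    }
}

struct MemberImpl {
    backend: Weak<Backend>,
}

impl Member for MemberImpl {
    fn remove_keys(&self) {
        // Step (3): upgrade the weak ref to call back into the backend.
        if let Some(backend) = self.backend.upgrade() {
            // ... evict keys ...
            // Step (4): another thread may drop its `Arc<Backend>` right now,
            // leaving `backend` as the last strong ref.
            // Step (5): `backend` is dropped here, still inside the sweep,
            // so `Backend::drop` (step 6) re-locks `members` => deadlock.
            drop(backend);
        }
    }
}

fn clean_up_sweep(registry: &Registry) {
    // Step (1): hold the `members` lock for the entire sweep.
    let members = registry.members.lock().unwrap();
    for member in members.values() {
        // Step (2): call into the member while the lock is held.
        member.remove_keys();
    }
}
```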

We now avoid modifying `members` during shutdown and only hold a weak ref
there. As a side effect, the `last_used` addressable heap moves around a bit
and is no longer `Arc`ed (see the updated diagram).
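
The shape of the fix, in the same simplified model (hypothetical names; the
real types are in the diff below): the registry only keeps `Weak` handles,
dead entries are pruned whenever the member list is read, and the lock is
released before calling into any member, so a destructor running inside the
sweep can no longer re-lock `members`:

```rust
use std::{
    collections::BTreeMap,
    sync::{Arc, Mutex, Weak},
};

trait Member: Send + Sync {
    fn remove_keys(&self);
}

struct Registry {
    // Post-fix shape: weak refs only; nothing has to unregister itself.
    members: Mutex<BTreeMap<&'static str, Weak<dyn Member>>>,
}

impl Registry {
    fn register(&self, id: &'static str, member: Weak<dyn Member>) {
        self.members.lock().unwrap().insert(id, member);
    }

    /// Snapshot the live members and prune dead entries; the lock is
    /// released as soon as this returns.
    fn members(&self) -> BTreeMap<&'static str, Arc<dyn Member>> {
        let mut members = self.members.lock().unwrap();
        let mut out = BTreeMap::new();
        members.retain(|id, member| match member.upgrade() {
            Some(member) => {
                out.insert(*id, member);
                true
            }
            None => false,
        });
        out
    }
}

fn clean_up_sweep(registry: &Registry) {
    // `members` is only locked inside `members()`; the eviction calls below
    // run without it, so dropping the last strong ref to a member here is
    // harmless.
    for member in registry.members().into_values() {
        member.remove_keys();
    }
}
```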

---------

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Marco Neumann 2023-08-02 16:42:33 +02:00 committed by GitHub
parent 3a2d41df47
commit 3969b40925
1 changed file with 86 additions and 57 deletions


@@ -190,14 +190,16 @@
//! | | |
//! V (mutex) |
//! .~~~~~~~~~~~~~~~. .~~~~~~~~~~~. | .~~~~~~~~~~~~~~~~. .~~~~~~~~~~~~.
//! -->: PolicyBackend :--->: LruPolicy : (arc) : PoolMemberImpl : : PoolMember :
//! -->: PolicyBackend :--->: LruPolicy : | : PoolMemberImpl : : PoolMember :
//! : <K, V> : : <K, V, S> : | : <K, V, S> : : <S> :
//! : : : :------+------: :<--(dyn)---: :
//! : : : : +------: :<--(dyn)---: :
//! .~~~~~~~~~~~~~~~. .~~~~~~~~~~~. .~~~~~~~~~~~~~~~~. .~~~~~~~~~~~~.
//! | ^
//! (arc) |
//! | |
//! V |
//! | | ^ ^
//! | | | |
//! | +--------------------------------------(arc)-----+ |
//! (arc) |
//! | (weak)
//! V |
//! .~~~~~~~~~~~~~~. .~~~~~~~~~~~~~.
//! ---------------------->: ResourcePool :-----+-------(arc)--------------------->: SharedState :
//! : <S> : | : <S> :
@@ -300,7 +302,7 @@ use std::{
collections::{btree_map::Entry, BTreeMap},
fmt::Debug,
hash::Hash,
sync::Arc,
sync::{Arc, Weak},
};
use iox_time::Time;
@@ -362,12 +364,35 @@ where
current: Mutex<MeasuredT<S>>,
/// Members (= backends) that use this pool.
members: Mutex<BTreeMap<&'static str, Box<dyn PoolMember<S = S>>>>,
members: Mutex<BTreeMap<&'static str, Weak<dyn PoolMember<S = S>>>>,
/// Notification when [`current`](Self::current) as changed.
change_notify: Notify,
}
impl<S> SharedState<S>
where
S: Resource,
{
/// Get current members.
///
/// This also performs a clean-up.
fn members(&self) -> BTreeMap<&'static str, Arc<dyn PoolMember<S = S>>> {
let mut members = self.members.lock();
let mut out = BTreeMap::new();
members.retain(|id, member| match member.upgrade() {
Some(member) => {
out.insert(*id, member);
true
}
None => false,
});
out
}
}
/// Resource pool.
///
/// This can be used with [`LruPolicy`].
@@ -455,29 +480,23 @@ where
///
/// # Panic
/// Panics when a member with the specific ID is already registered.
fn register_member(&self, id: &'static str, member: Box<dyn PoolMember<S = S>>) {
fn register_member(&self, id: &'static str, member: Weak<dyn PoolMember<S = S>>) {
let mut members = self.shared.members.lock();
match members.entry(id) {
Entry::Vacant(v) => {
v.insert(member);
}
Entry::Occupied(o) => {
panic!("Member '{}' already registered", o.key());
Entry::Occupied(mut o) => {
if o.get().strong_count() > 0 {
panic!("Member '{}' already registered", o.key());
} else {
*o.get_mut() = member;
}
}
}
}
/// Unregister pool member.
///
/// # Panic
/// Panics when the member with the specified ID is unknown (or was already unregistered).
fn unregister_member(&self, id: &str) {
let mut members = self.shared.members.lock();
assert!(members.remove(id).is_some(), "Member '{id}' unknown");
}
/// Add used resource from pool.
fn add(&self, s: S) {
let mut current = self.shared.current.lock();
@@ -518,21 +537,15 @@ where
V: Clone + Debug + Send + 'static,
S: Resource,
{
/// Pool member ID.
id: &'static str,
/// Link to central resource pool.
pool: Arc<ResourcePool<S>>,
/// Pool member
member: Arc<PoolMemberImpl<K, V, S>>,
/// Resource estimator that is used for new (via [`SET`](Subscriber::set)) entries.
resource_estimator: Arc<dyn ResourceEstimator<K = K, V = V, S = S>>,
/// Tracks when an element was used last.
///
/// This is shared with [`PoolMemberImpl`], because [`clean_up_loop`] uses it via [`PoolMember::could_remove`] to
/// select victims for LRU evictions.
last_used: Arc<Mutex<AddressableHeap<K, S, Time>>>,
/// Count number of elements within this specific pool member.
metric_count: U64Gauge,
@@ -584,23 +597,19 @@ where
move |mut callback_handle| {
callback_handle.execute_requests(vec![ChangeRequest::ensure_empty()]);
let last_used = Arc::new(Mutex::new(AddressableHeap::new()));
pool.register_member(
let member = Arc::new(PoolMemberImpl {
id,
Box::new(PoolMemberImpl {
id,
last_used: Arc::clone(&last_used),
metric_evicted,
callback_handle: Mutex::new(callback_handle),
}),
);
last_used: Mutex::new(AddressableHeap::new()),
metric_evicted,
callback_handle: Mutex::new(callback_handle),
});
pool.register_member(id, Arc::downgrade(&member) as _);
Self {
id,
pool,
member,
resource_estimator,
last_used,
metric_count,
metric_usage,
}
@@ -615,7 +624,15 @@ where
S: Resource,
{
fn drop(&mut self) {
self.pool.unregister_member(self.id);
let size_total = {
let mut guard = self.member.last_used.lock();
let mut accu = S::zero();
while let Some((_k, s, _t)) = guard.pop() {
accu = accu + s;
}
accu
};
self.pool.remove(size_total);
}
}
@@ -630,7 +647,7 @@ where
fn get(&mut self, k: &Self::K, now: Time) -> Vec<ChangeRequest<'static, Self::K, Self::V>> {
trace!(?k, now = now.timestamp(), "LRU get",);
let mut last_used = self.last_used.lock();
let mut last_used = self.member.last_used.lock();
// update "last used"
last_used.update_order(k, now);
@@ -659,7 +676,7 @@ where
}
{
let mut last_used = self.last_used.lock();
let mut last_used = self.member.last_used.lock();
// maybe clean from pool
if let Some((consumption, last_used_t_previously)) = last_used.remove(k) {
@@ -686,7 +703,7 @@ where
fn remove(&mut self, k: &Self::K, now: Time) -> Vec<ChangeRequest<'static, Self::K, Self::V>> {
trace!(?k, now = now.timestamp(), "LRU remove",);
let mut last_used = self.last_used.lock();
let mut last_used = self.member.last_used.lock();
if let Some((consumption, _last_used)) = last_used.remove(k) {
self.pool.remove(consumption);
@@ -702,7 +719,7 @@ where
///
/// The only implementation of this is [`PoolMemberImpl`]. This indirection is required to erase `K` and `V` from specific
/// backend so we can stick it into the generic pool.
trait PoolMember: Debug + Send + 'static {
trait PoolMember: Debug + Send + Sync + 'static {
/// Resource type.
type S;
@@ -739,7 +756,7 @@ where
/// Tracks usage of the last used elements.
///
/// See documentation of [`callback_handle`](Self::callback_handle) for a reasoning about locking.
last_used: Arc<Mutex<AddressableHeap<K, S, Time>>>,
last_used: Mutex<AddressableHeap<K, S, Time>>,
/// Handle to call back into the [`PolicyBackend`] to evict data.
///
@@ -839,10 +856,9 @@ async fn clean_up_loop<S>(
}
}
// hold member lock
// this is OK since this is only modified when new members are added. The members itself do NOT interact with
// this value.
let members = shared.members.lock();
// receive members
// Do NOT hold the member lock during the deletion later because this can lead to deadlocks during shutdown.
let members = shared.members();
// select victims
let mut victims: BTreeMap<&'static str, Vec<Box<dyn Any>>> = Default::default();
@@ -866,9 +882,7 @@ async fn clean_up_loop<S>(
}
for (id, keys) in victims {
let member = members
.get(id)
.expect("did not drop the lock in the meantime");
let member = members.get(id).expect("did get this ID from this map");
member.remove_keys(keys);
}
}
@@ -956,15 +970,21 @@ mod tests {
"id",
Arc::clone(&resource_estimator) as _,
));
backend1.set(String::from("a"), 1usize);
assert_eq!(pool.current(), TestSize(1));
// drop the backend so re-registering the same ID ("id") MUST NOT panic
drop(backend1);
assert_eq!(pool.current(), TestSize(0));
let mut backend2 = PolicyBackend::hashmap_backed(time_provider);
backend2.add_policy(LruPolicy::new(
Arc::clone(&pool),
"id",
Arc::clone(&resource_estimator) as _,
));
backend2.set(String::from("a"), 2usize);
assert_eq!(pool.current(), TestSize(2));
}
#[tokio::test]
@@ -1606,9 +1626,18 @@ mod tests {
});
}
/// Regression test for <https://github.com/influxdata/influxdb_iox/issues/8334>.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_deadlock() {
// Regression test for <https://github.com/influxdata/influxdb_iox/issues/8334>.
test_deadlock_inner(Duration::from_secs(1)).await;
// Regression test for <https://github.com/influxdata/influxdb_iox/issues/8378>
for _ in 0..100 {
test_deadlock_inner(Duration::from_millis(1)).await;
}
}
async fn test_deadlock_inner(test_duration: Duration) {
#[derive(Debug)]
struct OneSizeProvider {}
@@ -1662,7 +1691,7 @@ mod tests {
}
});
tokio::time::sleep(Duration::from_secs(1)).await;
tokio::time::sleep(test_duration).await;
worker1.abort();
worker2.abort();