diff --git a/.circleci/config.yml b/.circleci/config.yml
index f4b52aabfa..a0463521cd 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -450,7 +450,7 @@ jobs:
             RUST_VERSION="$(sed -E -ne 's/channel = "(.*)"/\1/p' rust-toolchain.toml)"
 
             docker buildx build \
-              --build-arg FEATURES="aws,gcp,azure,jemalloc_replacing_malloc,tokio_console" \
+              --build-arg FEATURES="aws,gcp,azure,jemalloc_replacing_malloc,tokio_console,pprof" \
               --build-arg RUST_VERSION="$RUST_VERSION" \
               --build-arg RUSTFLAGS="$RUSTFLAGS" \
               --progress plain \
diff --git a/Cargo.lock b/Cargo.lock
index a764deaeba..fdfc49e8f0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -587,6 +587,22 @@ dependencies = [
  "tokio",
 ]
 
+[[package]]
+name = "cache_system"
+version = "0.1.0"
+dependencies = [
+ "async-trait",
+ "criterion",
+ "futures",
+ "iox_time",
+ "observability_deps",
+ "parking_lot 0.12.0",
+ "proptest",
+ "rand",
+ "tokio",
+ "workspace-hack",
+]
+
 [[package]]
 name = "cast"
 version = "0.2.7"
@@ -4097,8 +4113,8 @@ dependencies = [
  "assert_matches",
  "async-trait",
  "backoff 0.1.0",
+ "cache_system",
  "client_util",
- "criterion",
  "data_types",
  "datafusion 0.1.0",
  "datafusion_util",
@@ -4116,7 +4132,6 @@ dependencies = [
  "parquet_file",
  "pin-project",
  "predicate",
- "proptest",
  "query",
  "rand",
  "schema",
@@ -6729,9 +6744,9 @@ dependencies = [
 
 [[package]]
 name = "zstd-safe"
-version = "5.0.1+zstd.1.5.2"
+version = "5.0.2+zstd.1.5.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7c12659121420dd6365c5c3de4901f97145b79651fb1d25814020ed2ed0585ae"
+checksum = "1d2a5585e04f9eea4b2a3d1eca508c4dee9592a89ef6f450c11719da0726f4db"
 dependencies = [
  "libc",
  "zstd-sys",
diff --git a/Cargo.toml b/Cargo.toml
index a01213ba02..9fd0f04505 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -3,6 +3,7 @@
 members = [
     "arrow_util",
     "backoff",
+    "cache_system",
     "clap_blocks",
     "client_util",
     "compactor",
diff --git a/cache_system/Cargo.toml b/cache_system/Cargo.toml
new file mode 100644
index 0000000000..1264c139bf
--- /dev/null
+++ b/cache_system/Cargo.toml
@@ -0,0 +1,22 @@
+[package]
+name = "cache_system"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+async-trait = "0.1.53"
+futures = "0.3"
+iox_time = { path = "../iox_time" }
+observability_deps = { path = "../observability_deps" }
+parking_lot = "0.12"
+tokio = { version = "1.18", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] }
+workspace-hack = { path = "../workspace-hack"}
+
+[dev-dependencies]
+criterion = "0.3"
+proptest = { version = "1", default_features = false, features = ["std"] }
+rand = "0.8.3"
+
+[[bench]]
+name = "addressable_heap"
+harness = false
diff --git a/querier/benches/addressable_heap.rs b/cache_system/benches/addressable_heap.rs
similarity index 99%
rename from querier/benches/addressable_heap.rs
rename to cache_system/benches/addressable_heap.rs
index 5656eb1aa3..0100647675 100644
--- a/querier/benches/addressable_heap.rs
+++ b/cache_system/benches/addressable_heap.rs
@@ -1,10 +1,10 @@
 use std::mem::size_of;
 
+use cache_system::backend::addressable_heap::AddressableHeap;
 use criterion::{
     criterion_group, criterion_main, measurement::WallTime, AxisScale, BatchSize, BenchmarkGroup,
     BenchmarkId, Criterion, PlotConfiguration, SamplingMode,
 };
-use querier::AddressableHeap;
 use rand::{prelude::SliceRandom, thread_rng, Rng};
 
 /// Payload (`V`) for testing.
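For orientation before the backend sources below, here is a minimal sketch of the `AddressableHeap` API that the benchmark and property tests exercise. The `insert` return shape follows the test simulator in this patch; the `pop` signature is an assumption and is not spelled out in this excerpt:

```rust
use cache_system::backend::addressable_heap::AddressableHeap;

fn main() {
    // K = addressable key, V = payload, O = order key that defines heap order
    // (the LRU backend uses K = cache key, V = consumption, O = Time).
    let mut heap: AddressableHeap<u8, String, i8> = AddressableHeap::new();
    assert!(heap.is_empty());

    heap.insert(1, String::from("a"), 10);
    heap.insert(2, String::from("b"), 5);

    // Re-inserting an existing key replaces it and returns the old (value, order) pair.
    assert_eq!(
        heap.insert(1, String::from("a2"), 20),
        Some((String::from("a"), 10)),
    );

    // Entries come out ordered by O: key 2 has the smallest order (5).
    // (This pop shape is assumed; it is not shown in this diff.)
    assert_eq!(heap.pop(), Some((2, String::from("b"), 5)));
    assert!(!heap.is_empty());
}
```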
diff --git a/querier/src/cache_system/backend/addressable_heap.rs b/cache_system/src/backend/addressable_heap.rs
similarity index 95%
rename from querier/src/cache_system/backend/addressable_heap.rs
rename to cache_system/src/backend/addressable_heap.rs
index bb12a31efc..36acc4eb56 100644
--- a/querier/src/cache_system/backend/addressable_heap.rs
+++ b/cache_system/src/backend/addressable_heap.rs
@@ -1,3 +1,4 @@
+//! Implementation of an [`AddressableHeap`].
 use std::{
     collections::{HashMap, VecDeque},
     hash::Hash,
@@ -44,6 +45,14 @@ where
         }
     }
 
+    /// Check if the heap is empty.
+    pub fn is_empty(&self) -> bool {
+        let res1 = self.key_to_order_and_value.is_empty();
+        let res2 = self.queue.is_empty();
+        assert_eq!(res1, res2, "data structures out of sync");
+        res1
+    }
+
     /// Insert element.
     ///
     /// If the element (compared by `K`) already exists, it will be returned.
@@ -396,6 +405,10 @@ mod tests {
             Self { inner: Vec::new() }
         }
 
+        fn is_empty(&self) -> bool {
+            self.inner.is_empty()
+        }
+
         fn insert(&mut self, k: u8, v: String, o: i8) -> Option<(String, i8)> {
            let res = self.remove(&k);
            self.inner.push((k, v, o));
@@ -441,6 +454,7 @@ mod tests {
 
     #[derive(Debug, Clone)]
     enum Action {
+        IsEmpty,
         Insert { k: u8, v: String, o: i8 },
         Peek,
         Pop,
@@ -451,6 +465,7 @@ mod tests {
     // Use a hand-rolled strategy instead of `proptest-derive`, because the latter one is quite a heavy dependency.
     fn action() -> impl Strategy<Value = Action> {
         prop_oneof![
+            Just(Action::IsEmpty),
             (any::<u8>(), ".*", any::<i8>()).prop_map(|(k, v, o)| Action::Insert { k, v, o }),
             Just(Action::Peek),
             Just(Action::Pop),
@@ -467,6 +482,11 @@ mod tests {
 
         for action in actions {
             match action {
+                Action::IsEmpty => {
+                    let res1 = heap.is_empty();
+                    let res2 = sim.is_empty();
+                    assert_eq!(res1, res2);
+                }
                 Action::Insert{k, v, o} => {
                     let res1 = heap.insert(k, v.clone(), o);
                     let res2 = sim.insert(k, v, o);
diff --git a/querier/src/cache_system/backend/dual.rs b/cache_system/src/backend/dual.rs
similarity index 98%
rename from querier/src/cache_system/backend/dual.rs
rename to cache_system/src/backend/dual.rs
index 02ef66db8b..d112903166 100644
--- a/querier/src/cache_system/backend/dual.rs
+++ b/cache_system/src/backend/dual.rs
@@ -1,3 +1,4 @@
+//! Cross-populates two caches.
 use std::{any::Any, fmt::Debug, hash::Hash, sync::Arc};
 
 use parking_lot::Mutex;
@@ -310,7 +311,7 @@ mod tests {
 
     #[test]
     fn test_generic1() {
-        use crate::cache_system::backend::test_util::test_generic;
+        use crate::backend::test_util::test_generic;
 
         test_generic(|| {
            let backend1 = Box::new(HashMap::<u8, String>::new());
@@ -328,7 +329,7 @@ mod tests {
 
     #[test]
     fn test_generic2() {
-        use crate::cache_system::backend::test_util::test_generic;
+        use crate::backend::test_util::test_generic;
 
         test_generic(|| {
            let backend1 = Box::new(HashMap::<String, u8>::new());
diff --git a/querier/src/cache_system/backend/hash_map.rs b/cache_system/src/backend/hash_map.rs
similarity index 89%
rename from querier/src/cache_system/backend/hash_map.rs
rename to cache_system/src/backend/hash_map.rs
index 0ccf0e6c69..cb3c3028da 100644
--- a/querier/src/cache_system/backend/hash_map.rs
+++ b/cache_system/src/backend/hash_map.rs
@@ -1,3 +1,4 @@
+//! Implements [`CacheBackend`] for [`HashMap`].
 use std::{
     any::Any,
     collections::HashMap,
@@ -43,7 +44,7 @@ mod tests {
 
     #[test]
     fn test_generic() {
-        use crate::cache_system::backend::test_util::test_generic;
+        use crate::backend::test_util::test_generic;
 
         test_generic(HashMap::new);
     }
diff --git a/cache_system/src/backend/lru.rs b/cache_system/src/backend/lru.rs
new file mode 100644
index 0000000000..b89c31cfd4
--- /dev/null
+++ b/cache_system/src/backend/lru.rs
@@ -0,0 +1,1199 @@
+//! LRU (Least Recently Used) cache system.
+//!
+//! # Usage
+//!
+//! ```
+//! use std::{
+//!     collections::HashMap,
+//!     ops::{Add, Sub},
+//!     sync::Arc,
+//! };
+//! use iox_time::SystemProvider;
+//! use cache_system::backend::{
+//!     CacheBackend,
+//!     lru::{LruBackend, ResourcePool},
+//!     resource_consumption::{Resource, ResourceEstimator},
+//! };
+//!
+//! // first we implement a strongly-typed RAM size measurement
+//! #[derive(Debug, Clone, Copy, PartialEq, PartialOrd)]
+//! struct RamSize(usize);
+//!
+//! impl Resource for RamSize {
+//!     fn zero() -> Self {
+//!         Self(0)
+//!     }
+//! }
+//!
+//! impl Add for RamSize {
+//!     type Output = Self;
+//!
+//!     fn add(self, rhs: Self) -> Self::Output {
+//!         Self(self.0.checked_add(rhs.0).expect("overflow"))
+//!     }
+//! }
+//!
+//! impl Sub for RamSize {
+//!     type Output = Self;
+//!
+//!     fn sub(self, rhs: Self) -> Self::Output {
+//!         Self(self.0.checked_sub(rhs.0).expect("underflow"))
+//!     }
+//! }
+//!
+//! // a time provider is required to determine the age of entries
+//! let time_provider = Arc::new(SystemProvider::new());
+//!
+//! // set up a memory pool
+//! let limit = RamSize(50);
+//! let pool = Arc::new(ResourcePool::new(limit, time_provider));
+//!
+//! // set up first pool user: a u64->String map
+//! #[derive(Debug)]
+//! struct Provider1 {}
+//!
+//! impl ResourceEstimator for Provider1 {
+//!     type K = u64;
+//!     type V = String;
+//!     type S = RamSize;
+//!
+//!     fn consumption(&self, _k: &Self::K, v: &Self::V) -> Self::S {
+//!         RamSize(8) + RamSize(v.capacity())
+//!     }
+//! }
+//! let mut backend1 = LruBackend::new(
+//!     Box::new(HashMap::new()),
+//!     Arc::clone(&pool),
+//!     String::from("id1"),
+//!     Arc::new(Provider1{}),
+//! );
+//!
+//! // add some data
+//! backend1.set(1, String::from("some_entry"));
+//! backend1.set(2, String::from("another_entry"));
+//! assert_eq!(pool.current(), RamSize(39));
+//!
+//! // only test first one
+//! assert!(backend1.get(&1).is_some());
+//!
+//! // fill up pool
+//! backend1.set(3, String::from("this_will_evict_data"));
+//! assert!(backend1.get(&1).is_some());
+//! assert!(backend1.get(&2).is_none());
+//! assert!(backend1.get(&3).is_some());
+//! assert_eq!(pool.current(), RamSize(46));
+//!
+//! // set up second pool user with totally different types: a u8->Vec<u8> map
+//! #[derive(Debug)]
+//! struct Provider2 {}
+//!
+//! impl ResourceEstimator for Provider2 {
+//!     type K = u8;
+//!     type V = Vec<u8>;
+//!     type S = RamSize;
+//!
+//!     fn consumption(&self, _k: &Self::K, v: &Self::V) -> Self::S {
+//!         RamSize(1) + RamSize(v.capacity())
+//!     }
+//! }
+//! let mut backend2 = LruBackend::new(
+//!     Box::new(HashMap::new()),
+//!     Arc::clone(&pool),
+//!     String::from("id2"),
+//!     Arc::new(Provider2{}),
+//! );
+//!
+//! // eviction works for all pool members
+//! backend2.set(1, vec![1, 2, 3, 4]);
+//! assert!(backend1.get(&1).is_none());
+//! assert!(backend1.get(&2).is_none());
+//! assert!(backend1.get(&3).is_some());
+//! assert!(backend2.get(&1).is_some());
+//! assert_eq!(pool.current(), RamSize(33));
+//! ```
+//!
+//! # Internals
+//! Here we describe the internals of the LRU cache system.
+//!
+//! ## Requirements
+//! To understand the construction, we first must understand what the LRU system tries to achieve:
+//!
+//! - **Single Pool:** Have a single resource pool for multiple LRU backends.
+//! - **Eviction Cascade:** Adding data to any of the backends (or modifying an existing entry) should check if there
+//!   is enough space left in the pool. If not, we must remove the least recently used entries over all backends
+//!   (including the one that just got a new entry) until there is enough space.
+//!
+//! This has the following consequences:
+//!
+//! - **Cyclic Structure:** The LRU backends communicate with the pool, but the pool also needs to communicate with
+//!   all the backends. This creates some form of cyclic data structure.
+//! - **Type Erasure:** The pool is only specific to the resource type, not the key and value types of the
+//!   participating backends. So at some place we need to perform type erasure.
+//!
+//! ## Data Structures
+//!
+//! ```text
+//!               .~~~~~~~~~~~~~~.            .~~~~~~~~~~~~~~~~~~~.
+//! ------------->: ResourcePool :--(mutex)-->: ResourcePoolInner :-------------------------------------+
+//!               :              :            :                   :                                      |
+//!               .~~~~~~~~~~~~~~.            .~~~~~~~~~~~~~~~~~~~.                                      |
+//!                  ^                                                                                   |
+//!                  |                                                                                   |
+//!                (arc)                                                                                 |
+//!                  |                                                                                   |
+//!                  |                                                                                   |
+//!                  |   .~~~~~~~~~~~~~~~~~.     .~~~~~~~~~~~~~~~~~~~~~.        .~~~~~~~~~~~~~~~~~.     |
+//!                  |   : LruBackendInner :<----: PoolMemberGuardImpl :<-(dyn)-: PoolMemberGuard :     |
+//!                  |   :                 :     :                     :        :                 :     |
+//!                  |   .~~~~~~~~~~~~~~~~~.     .~~~~~~~~~~~~~~~~~~~~~.        .~~~~~~~~~~~~~~~~~.     |
+//!                  |            ^                         ^                            ^              |
+//!                  |            |                         |                            |              |
+//!                  |            |                         +-------------+--------------+              |
+//!                  |            |                                  (call lock)                        |
+//!                  |            |                         +-------------+--------------+              |
+//!                  |         (mutex)                      |                            |              |
+//!   .~~~~~~~~~~~~. |            |                 .~~~~~~~~~~~~~~~~.            .~~~~~~~~~~~~.        |
+//! ->: LruBackend :-+          (arc)               : PoolMemberImpl :            : PoolMember :<-------+
+//!   :            : |            |                 :                :            :            :        |
+//!   :            :-+------------+-----------------:                :<---(dyn)---:            :        |
+//!   .~~~~~~~~~~~~. |                              .~~~~~~~~~~~~~~~~.            .~~~~~~~~~~~~.        |
+//!                  |                                                                                   |
+//!                  |                                                                                   |
+//!                  |                                                                                   |
+//!                  |                                                                                   |
+//!                  |   .~~~~~~~~~~~~~~~~~.     .~~~~~~~~~~~~~~~~~~~~~.        .~~~~~~~~~~~~~~~~~.     |
+//!                  |   : LruBackendInner :<----: PoolMemberGuardImpl :<-(dyn)-: PoolMemberGuard :     |
+//!                  |   :                 :     :                     :        :                 :     |
+//!                  |   .~~~~~~~~~~~~~~~~~.     .~~~~~~~~~~~~~~~~~~~~~.        .~~~~~~~~~~~~~~~~~.     |
+//!                  |            ^                         ^                            ^              |
+//!                  |            |                         |                            |              |
+//!                  |            |                         +-------------+--------------+              |
+//!                  |            |                                  (call lock)                        |
+//!                  |            |                         +-------------+--------------+              |
+//!                  |         (mutex)                      |                            |              |
+//!   .~~~~~~~~~~~~. |            |                 .~~~~~~~~~~~~~~~~.            .~~~~~~~~~~~~.        |
+//! ->: LruBackend :-+          (arc)               : PoolMemberImpl :            : PoolMember :<-------+
+//!   :            :              |                 :                :            :            :
+//!   :            :--------------+-----------------:                :<---(dyn)---:            :
+//!   .~~~~~~~~~~~~.                                .~~~~~~~~~~~~~~~~.            .~~~~~~~~~~~~.
+//! ```
+//!
+//! ## State
+//! State is held in the following structures:
+//!
+//! - `LruBackendInner`: Holds the actual user-provided backend ([`CacheBackend`]) as well as an [`AddressableHeap`]
+//!   to remember when entries were last used.
+//! - `ResourcePoolInner`: Holds a reference to all pool members as well as the current consumption.
+//!
+//! All other structures and traits "only" act as glue.
+//!
+//! ## Locking
+//! What and how we lock depends on the operation.
+//!
+//! Note that all locks are bare mutexes; there are no read-write locks. "Only read" is not really an important use
+//! case since even `get` requires updating the "last used" timestamp of the corresponding entry.
+//!
+//! ### Get
+//! For [`get`](CacheBackend::get) we only need to update the "last used" timestamp for the affected entry. No
+//! pool-wide operations are required. We just lock [`LruBackendInner`] and perform the read operation on the inner
+//! backend and the modification of the "last used" timestamp.
+//!
+//! ### Remove
+//! For [`remove`](CacheBackend::remove) the pool usage can only decrease, so other backends are never affected. We
+//! first lock [`ResourcePoolInner`], then [`LruBackendInner`], and then perform the modification on both.
+//!
+//! ### Set
+//! [`set`](CacheBackend::set) is the most complex operation and requires a bit of a lock dance:
+//!
+//! 1. Lock [`ResourcePoolInner`]
+//! 2. Lock [`LruBackendInner`]
+//! 3. Check if the entry already exists and remove it.
+//! 4. Drop lock of [`LruBackendInner`] so that the pool can use it to free up space.
+//! 5. Request to add more data to the pool:
+//!    1. Check if we need to free up space; if not, we can proceed directly to step 6.
+//!    2. Lock all pool members ([`PoolMember::lock`] which ultimately locks [`LruBackendInner`])
+//!    3. Loop:
+//!       1. Ask pool members if they have anything to free.
+//!       2. Pick the least recently used entry and free it.
+//!    4. Drop locks of [`LruBackendInner`]
+//! 6. Lock [`LruBackendInner`] and store the new entry
+//! 7. Drop lock of [`LruBackendInner`] and [`ResourcePoolInner`]
+//!
+//! The global locks in step 5.2 are required so that the reads in step 5.3.1 and the resulting actions in step 5.3.2
+//! are consistent. Otherwise an interleaved `get` request might invalidate the results.
+use std::{
+    any::Any,
+    collections::{btree_map::Entry, BTreeMap},
+    fmt::Debug,
+    hash::Hash,
+    ops::Deref,
+    sync::Arc,
+};
+
+use iox_time::{Time, TimeProvider};
+use parking_lot::{Mutex, MutexGuard};
+
+use super::{
+    addressable_heap::AddressableHeap,
+    resource_consumption::{Resource, ResourceEstimator},
+    CacheBackend,
+};
+
+/// Inner state of [`ResourcePool`] which is always behind a mutex.
+#[derive(Debug)]
+struct ResourcePoolInner<S>
+where
+    S: Resource,
+{
+    /// Resource limit.
+    limit: S,
+
+    /// Current resource usage.
+    current: S,
+
+    /// Members (= backends) that use this pool.
+    members: BTreeMap<String, Box<dyn PoolMember<S = S>>>,
+}
+
+impl<S> ResourcePoolInner<S>
+where
+    S: Resource,
+{
+    /// Register new pool member.
+    ///
+    /// # Panic
+    /// Panics when a member with the specified ID is already registered.
+    fn register_member(&mut self, id: String, member: Box<dyn PoolMember<S = S>>) {
+        match self.members.entry(id) {
+            Entry::Vacant(v) => {
+                v.insert(member);
+            }
+            Entry::Occupied(o) => {
+                panic!("Member '{}' already registered", o.key());
+            }
+        }
+    }
+
+    /// Unregister pool member.
+    ///
+    /// # Panic
+    /// Panics when the member with the specified ID is unknown (or was already unregistered).
+    fn unregister_member(&mut self, id: &str) {
+        assert!(self.members.remove(id).is_some(), "Member '{}' unknown", id);
+    }
+
+    /// Add used resource to the pool.
+    fn add(&mut self, s: S) {
+        self.current = self.current + s;
+
+        if self.current > self.limit {
+            // lock all members
+            let mut members: Vec<_> = self.members.values().map(|member| member.lock()).collect();
+
+            // evict data until we are below the limit
+            while self.current > self.limit {
+                let mut options: Vec<_> = members
+                    .iter_mut()
+                    .filter_map(|member| member.could_remove().map(|t| (t, member)))
+                    .collect();
+                options.sort_by_key(|(t, _member)| *t);
+
+                let (_t, member) = options.first_mut().expect("accounting out of sync");
+                let s = member.remove_oldest();
+                self.current = self.current - s;
+            }
+        }
+    }
+
+    /// Remove used resource from pool.
+    fn remove(&mut self, s: S) {
+        self.current = self.current - s;
+    }
+}
+
+/// Resource pool.
+///
+/// This can be used with [`LruBackend`].
+#[derive(Debug)]
+pub struct ResourcePool<S>
+where
+    S: Resource,
+{
+    inner: Mutex<ResourcePoolInner<S>>,
+    time_provider: Arc<dyn TimeProvider>,
+}
+
+impl<S> ResourcePool<S>
+where
+    S: Resource,
+{
+    /// Creates new empty resource pool with given limit.
+    pub fn new(limit: S, time_provider: Arc<dyn TimeProvider>) -> Self {
+        Self {
+            inner: Mutex::new(ResourcePoolInner {
+                limit,
+                current: S::zero(),
+                members: BTreeMap::new(),
+            }),
+            time_provider,
+        }
+    }
+
+    /// Get current pool usage.
+    pub fn current(&self) -> S {
+        self.inner.lock().current
+    }
+}
+
+/// Inner state of [`LruBackend`].
+///
+/// This is used by [`LruBackend`] directly but also by [`PoolMemberImpl`] to add it to a
+/// [`ResourcePool`]/[`ResourcePoolInner`].
+#[derive(Debug)]
+struct LruBackendInner<K, V, S>
+where
+    K: Clone + Eq + Debug + Hash + Ord + Send + 'static,
+    V: Clone + Debug + Send + 'static,
+    S: Resource,
+{
+    inner_backend: Box<dyn CacheBackend<K = K, V = V>>,
+    last_used: AddressableHeap<K, S, Time>,
+}
+
+/// [Cache backend](CacheBackend) that wraps another backend and limits its resource usage.
+#[derive(Debug)]
+pub struct LruBackend<K, V, S>
+where
+    K: Clone + Eq + Debug + Hash + Ord + Send + 'static,
+    V: Clone + Debug + Send + 'static,
+    S: Resource,
+{
+    id: String,
+    inner: Arc<Mutex<LruBackendInner<K, V, S>>>,
+    pool: Arc<ResourcePool<S>>,
+    resource_estimator: Arc<dyn ResourceEstimator<K = K, V = V, S = S>>,
+}
+
+impl<K, V, S> LruBackend<K, V, S>
+where
+    K: Clone + Eq + Debug + Hash + Ord + Send + 'static,
+    V: Clone + Debug + Send + 'static,
+    S: Resource,
+{
+    /// Create new backend w/o any known keys.
+    ///
+    /// The inner backend MUST NOT contain any data at this point, otherwise we will not track any resource
+    /// consumption for these entries.
+    ///
+    /// # Panic
+    /// - Panics if the given ID is already used within the given pool.
+    /// - Panics if the inner backend is not empty.
+    pub fn new(
+        inner_backend: Box<dyn CacheBackend<K = K, V = V>>,
+        pool: Arc<ResourcePool<S>>,
+        id: String,
+        resource_estimator: Arc<dyn ResourceEstimator<K = K, V = V, S = S>>,
+    ) -> Self {
+        assert!(inner_backend.is_empty(), "inner backend is not empty");
+
+        let inner = Arc::new(Mutex::new(LruBackendInner {
+            inner_backend,
+            last_used: AddressableHeap::new(),
+        }));
+
+        pool.inner.lock().register_member(
+            id.clone(),
+            Box::new(PoolMemberImpl {
+                inner: Arc::clone(&inner),
+            }),
+        );
+
+        Self {
+            id,
+            inner,
+            pool,
+            resource_estimator,
+        }
+    }
+
+    /// Get underlying / inner backend.
+    pub fn inner_backend(&self) -> LruBackendInnerBackendHandle<'_, K, V, S> {
+        LruBackendInnerBackendHandle {
+            inner: self.inner.lock(),
+        }
+    }
+}
+
+impl<K, V, S> Drop for LruBackend<K, V, S>
+where
+    K: Clone + Eq + Debug + Hash + Ord + Send + 'static,
+    V: Clone + Debug + Send + 'static,
+    S: Resource,
+{
+    fn drop(&mut self) {
+        self.pool.inner.lock().unregister_member(&self.id);
+    }
+}
+
+impl<K, V, S> CacheBackend for LruBackend<K, V, S>
+where
+    K: Clone + Eq + Debug + Hash + Ord + Send + 'static,
+    V: Clone + Debug + Send + 'static,
+    S: Resource,
+{
+    type K = K;
+    type V = V;
+
+    fn get(&mut self, k: &Self::K) -> Option<Self::V> {
+        let mut inner = self.inner.lock();
+
+        match inner.inner_backend.get(k) {
+            Some(v) => {
+                // update "last used"
+                let now = self.pool.time_provider.now();
+                let (consumption, _last_used) = inner
+                    .last_used
+                    .remove(k)
+                    .expect("backend and last-used table out of sync");
+                inner.last_used.insert(k.clone(), consumption, now);
+
+                Some(v)
+            }
+            None => None,
+        }
+    }
+
+    fn set(&mut self, k: Self::K, v: Self::V) {
+        // determine all attributes before getting any locks
+        let consumption = self.resource_estimator.consumption(&k, &v);
+        let now = self.pool.time_provider.now();
+
+        // get locks
+        let mut pool = self.pool.inner.lock();
+
+        // check for oversized entries
+        if consumption > pool.limit {
+            return;
+        }
+
+        // maybe clean from pool
+        {
+            let mut inner = self.inner.lock();
+            if let Some((consumption, _last_used)) = inner.last_used.remove(&k) {
+                pool.remove(consumption);
+            }
+        }
+
+        // pool-wide operation
+        // Since this may call back to this very backend to remove entries, we MUST NOT hold an inner lock at this
+        // point.
+        pool.add(consumption);
+
+        // add new entry to inner backend AFTER adding it to the pool, so we are never overcommitting resources
+        let mut inner = self.inner.lock();
+        inner.inner_backend.set(k.clone(), v);
+        inner.last_used.insert(k, consumption, now);
+    }
+
+    fn remove(&mut self, k: &Self::K) {
+        let mut pool = self.pool.inner.lock();
+        let mut inner = self.inner.lock();
+
+        inner.inner_backend.remove(k);
+        if let Some((consumption, _last_used)) = inner.last_used.remove(k) {
+            pool.remove(consumption);
+        }
+    }
+
+    fn is_empty(&self) -> bool {
+        self.inner.lock().last_used.is_empty()
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self as &dyn Any
+    }
+}
+
+/// A member of a [`ResourcePool`]/[`ResourcePoolInner`].
+///
+/// Must be [locked](Self::lock) to gain access.
+///
+/// The only implementation of this is [`PoolMemberImpl`]. This indirection is required to erase `K` and `V` from the
+/// specific backend so we can stick it into the generic pool.
+trait PoolMember: Debug + Send + 'static {
+    /// Resource type.
+    type S;
+
+    /// Lock pool member.
+    fn lock(&self) -> Box<dyn PoolMemberGuard<S = Self::S> + '_>;
+}
+
+/// The only implementation of [`PoolMember`].
+///
+/// In contrast to the trait, this still contains `K` and `V`.
+#[derive(Debug)]
+pub struct PoolMemberImpl<K, V, S>
+where
+    K: Clone + Eq + Debug + Hash + Ord + Send + 'static,
+    V: Clone + Debug + Send + 'static,
+    S: Resource,
+{
+    inner: Arc<Mutex<LruBackendInner<K, V, S>>>,
+}
+
+impl<K, V, S> PoolMember for PoolMemberImpl<K, V, S>
+where
+    K: Clone + Eq + Debug + Hash + Ord + Send + 'static,
+    V: Clone + Debug + Send + 'static,
+    S: Resource,
+{
+    type S = S;
+
+    fn lock(&self) -> Box<dyn PoolMemberGuard<S = S> + '_> {
+        Box::new(PoolMemberGuardImpl {
+            inner: self.inner.lock(),
+        })
+    }
+}
+
+/// Locked [`ResourcePool`]/[`ResourcePoolInner`] member.
+///
+/// The only implementation of this is [`PoolMemberGuardImpl`]. This indirection is required to erase `K` and `V` from
+/// the specific backend so we can stick it into the generic pool.
+trait PoolMemberGuard: Debug {
+    /// Resource type.
+    type S;
+
+    /// Check if this member has anything that could be removed. If so, return the "last used" timestamp of the
+    /// oldest entry.
+    fn could_remove(&self) -> Option<Time>
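To make the `PoolMember`/`PoolMemberGuard` indirection concrete, here is a self-contained sketch of the same type-erasure pattern. This is not code from this patch: the `Member`/`MemberGuard` names are hypothetical, and `usize` stands in for the resource type `S`. Only `S` survives in the trait object; `K` and `V` stay hidden inside the impl, which is what lets `ResourcePoolInner` keep heterogeneous backends in one `BTreeMap<String, Box<dyn PoolMember<S = S>>>`.

```rust
use std::{collections::HashMap, fmt::Debug, sync::Arc};

use parking_lot::{Mutex, MutexGuard};

/// Type-erased pool member: only the resource type `S` is visible.
trait Member: Debug + Send + 'static {
    type S;

    fn lock(&self) -> Box<dyn MemberGuard<S = Self::S> + '_>;
}

/// A locked member that the pool can query without knowing `K`/`V`.
trait MemberGuard: Debug {
    type S;

    fn consumption(&self) -> Self::S;
}

#[derive(Debug)]
struct MemberImpl<K, V>
where
    K: Debug + Send + 'static,
    V: Debug + Send + 'static,
{
    inner: Arc<Mutex<HashMap<K, V>>>,
}

impl<K, V> Member for MemberImpl<K, V>
where
    K: Debug + Send + 'static,
    V: Debug + Send + 'static,
{
    type S = usize;

    fn lock(&self) -> Box<dyn MemberGuard<S = usize> + '_> {
        Box::new(MemberGuardImpl {
            inner: self.inner.lock(),
        })
    }
}

#[derive(Debug)]
struct MemberGuardImpl<'a, K, V>
where
    K: Debug + Send + 'static,
    V: Debug + Send + 'static,
{
    inner: MutexGuard<'a, HashMap<K, V>>,
}

impl<'a, K, V> MemberGuard for MemberGuardImpl<'a, K, V>
where
    K: Debug + Send + 'static,
    V: Debug + Send + 'static,
{
    type S = usize;

    // Stand-in for `could_remove`/`remove_oldest`: just report the entry count.
    fn consumption(&self) -> usize {
        self.inner.len()
    }
}

fn main() {
    // Two members with completely different key/value types share one
    // homogeneous collection, as the pool does with its registered backends.
    let m1 = MemberImpl {
        inner: Arc::new(Mutex::new(HashMap::<u64, String>::new())),
    };
    let m2 = MemberImpl {
        inner: Arc::new(Mutex::new(HashMap::<u8, Vec<u8>>::new())),
    };
    let members: Vec<Box<dyn Member<S = usize>>> = vec![Box::new(m1), Box::new(m2)];

    // "Lock all pool members" as in step 5.2 of the set() lock dance, then query them.
    let guards: Vec<_> = members.iter().map(|m| m.lock()).collect();
    let total: usize = guards.iter().map(|g| g.consumption()).sum();
    assert_eq!(total, 0);
}
```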