From d807941804b845ab1f9c2814f1fcbb919ab0ae00 Mon Sep 17 00:00:00 2001
From: Marko Mikulicic
Date: Thu, 12 May 2022 23:25:22 +0200
Subject: [PATCH 1/8] chore: Re-enable pprof feature (#4587)

---
 .circleci/config.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.circleci/config.yml b/.circleci/config.yml
index f4b52aabfa..a0463521cd 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -450,7 +450,7 @@ jobs:
       RUST_VERSION="$(sed -E -ne 's/channel = "(.*)"/\1/p' rust-toolchain.toml)"
 
       docker buildx build \
-        --build-arg FEATURES="aws,gcp,azure,jemalloc_replacing_malloc,tokio_console" \
+        --build-arg FEATURES="aws,gcp,azure,jemalloc_replacing_malloc,tokio_console,pprof" \
         --build-arg RUST_VERSION="$RUST_VERSION" \
         --build-arg RUSTFLAGS="$RUSTFLAGS" \
         --progress plain \

From 24020403ba36deeff20a8b9b6e1acf0fd4dcb5a0 Mon Sep 17 00:00:00 2001
From: Marko Mikulicic
Date: Thu, 12 May 2022 23:45:13 +0200
Subject: [PATCH 2/8] fix: Remove pprof feature; it fails to build (#4588)

---
 .circleci/config.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.circleci/config.yml b/.circleci/config.yml
index a0463521cd..f4b52aabfa 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -450,7 +450,7 @@ jobs:
       RUST_VERSION="$(sed -E -ne 's/channel = "(.*)"/\1/p' rust-toolchain.toml)"
 
      docker buildx build \
-        --build-arg FEATURES="aws,gcp,azure,jemalloc_replacing_malloc,tokio_console,pprof" \
+        --build-arg FEATURES="aws,gcp,azure,jemalloc_replacing_malloc,tokio_console" \
         --build-arg RUST_VERSION="$RUST_VERSION" \
         --build-arg RUSTFLAGS="$RUSTFLAGS" \
         --progress plain \

From 4434ec6836119377312aacdac12957fc999d5ef8 Mon Sep 17 00:00:00 2001
From: Nga Tran
Date: Thu, 12 May 2022 18:40:29 -0400
Subject: [PATCH 3/8] chore: convert some debug to trace to reduce noise (#4589)

---
 query/src/provider/deduplicate/algo.rs       | 6 +++---
 query_tests/cases/in/several_chunks.expected | 5 +++--
 query_tests/cases/in/several_chunks.sql      | 5 +++--
 3 files changed, 9 insertions(+), 7 deletions(-)

diff --git a/query/src/provider/deduplicate/algo.rs b/query/src/provider/deduplicate/algo.rs
index 12160921e8..8d9aa2d6f2 100644
--- a/query/src/provider/deduplicate/algo.rs
+++ b/query/src/provider/deduplicate/algo.rs
@@ -72,7 +72,7 @@ impl RecordBatchDeduplicator {
         };
 
         let mut dupe_ranges = self.compute_ranges(&batch)?;
-        debug!("Finish computing range");
+        trace!("Finish computing range");
 
         // The last partition may span batches so we can't emit it
         // until we have seen the next batch (or we are at end of
@@ -80,7 +80,7 @@
         let last_range = dupe_ranges.ranges.pop();
 
         let output_record_batch = self.output_from_ranges(&batch, &dupe_ranges)?;
-        debug!(
+        trace!(
             num_rows = output_record_batch.num_rows(),
             "Rows of output_record_batch"
         );
@@ -91,7 +91,7 @@
             let last_batch = Self::slice_record_batch(&batch, last_range.start, len)?;
             self.last_batch = Some(last_batch);
         }
-        debug!("done pushing record batch into the indexer");
+        trace!("done pushing record batch into the indexer");
 
         Ok(output_record_batch)
     }
diff --git a/query_tests/cases/in/several_chunks.expected b/query_tests/cases/in/several_chunks.expected
index 0cff25fc39..5c421a68a6 100644
--- a/query_tests/cases/in/several_chunks.expected
+++ b/query_tests/cases/in/several_chunks.expected
@@ -1,5 +1,5 @@
 -- Test Setup: ManyFieldsSeveralChunks
--- SQL: SELECT * from h2o order by temp, other_temp, time;
+-- SQL: SELECT * from h2o;
+-- Results After Sorting
 +---------+------------+-------+------+--------------------------------+
 | city    | other_temp | state | temp | time                           |
 +---------+------------+-------+------+--------------------------------+
 |               | IOxReadFilterNode: table_name=h2o, chunks=2 predicate=Predicate                                                     |
 |               |                                                                                                                     |
 +---------------+---------------------------------------------------------------------------------------------------------------------+
--- SQL: select temp, other_temp, time from h2o order by 1, 2;
+-- SQL: select temp, other_temp, time from h2o;
+-- Results After Sorting
 +------+------------+--------------------------------+
 | temp | other_temp | time                           |
 +------+------------+--------------------------------+
diff --git a/query_tests/cases/in/several_chunks.sql b/query_tests/cases/in/several_chunks.sql
index 89b4770bd0..8eac702232 100644
--- a/query_tests/cases/in/several_chunks.sql
+++ b/query_tests/cases/in/several_chunks.sql
@@ -2,7 +2,7 @@
 -- validate we have access to information schema for listing system tables
 -- IOX_COMPARE: sorted
-SELECT * from h2o order by temp, other_temp, time;
+SELECT * from h2o;
 -- Plan will look like:
 -- . Two chunks (one parquet and one from ingester) neither overlap nor contain duplicate
 --    --> scan in one scan node
@@ -10,5 +10,6 @@ SELECT * from h2o;
 EXPLAIN SELECT * from h2o;
 
 -- Only select fields and time
-select temp, other_temp, time from h2o order by 1, 2;
+-- IOX_COMPARE: sorted
+select temp, other_temp, time from h2o;
 EXPLAIN select temp, other_temp, time from h2o;

From 335374fed549bb01f01401bea7fbd9c737d59c89 Mon Sep 17 00:00:00 2001
From: Marko Mikulicic
Date: Fri, 13 May 2022 01:33:28 +0200
Subject: [PATCH 4/8] chore: Re-enable pprof feature (#4587) (#4590)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
---
 .circleci/config.yml   | 2 +-
 ioxd_common/Cargo.toml | 5 ++++-
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/.circleci/config.yml b/.circleci/config.yml
index f4b52aabfa..a0463521cd 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -450,7 +450,7 @@ jobs:
       RUST_VERSION="$(sed -E -ne 's/channel = "(.*)"/\1/p' rust-toolchain.toml)"
 
       docker buildx build \
-        --build-arg FEATURES="aws,gcp,azure,jemalloc_replacing_malloc,tokio_console" \
+        --build-arg FEATURES="aws,gcp,azure,jemalloc_replacing_malloc,tokio_console,pprof" \
         --build-arg RUST_VERSION="$RUST_VERSION" \
         --build-arg RUSTFLAGS="$RUSTFLAGS" \
         --progress plain \
diff --git a/ioxd_common/Cargo.toml b/ioxd_common/Cargo.toml
index 50c297dc22..55059c5813 100644
--- a/ioxd_common/Cargo.toml
+++ b/ioxd_common/Cargo.toml
@@ -15,7 +15,10 @@ metric = { path = "../metric" }
 metric_exporters = { path = "../metric_exporters" }
 mutable_batch_lp = { path = "../mutable_batch_lp" }
 observability_deps = { path = "../observability_deps" }
-pprof = { version = "0.9", default-features = false, features = ["flamegraph", "prost-codec"], optional = true }
+# NOTE: we may not notice that we need the "backtrace-rs" feature if we also build with the heappy feature, which depends on backtrace-rs.
+# (honestly I thought that cargo dependencies were isolated on a per-crate basis, so I'm a bit surprised that pprof accidentally builds
+# successfully just because another crate happens to depend on backtrace-rs)
+pprof = { version = "0.9", default-features = false, features = ["flamegraph", "prost-codec", "backtrace-rs"], optional = true }
 predicate = { path = "../predicate" }
 service_grpc_testing = { path = "../service_grpc_testing" }
 trace = { path = "../trace" }

From d15abc5e2dfcf1e80894a8402bd4bf0489255728 Mon Sep 17 00:00:00 2001
From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com>
Date: Fri, 13 May 2022 07:27:24 +0000
Subject: [PATCH 5/8] chore(deps): Bump zstd-safe from 5.0.1+zstd.1.5.2 to
 5.0.2+zstd.1.5.2 (#4591)

Bumps [zstd-safe](https://github.com/gyscos/zstd-rs) from 5.0.1+zstd.1.5.2 to 5.0.2+zstd.1.5.2.
- [Release notes](https://github.com/gyscos/zstd-rs/releases)
- [Commits](https://github.com/gyscos/zstd-rs/commits)

---
updated-dependencies:
- dependency-name: zstd-safe
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot]
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
---
 Cargo.lock | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 6efd8f2938..91a0c83b34 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6729,9 +6729,9 @@ dependencies = [
 
 [[package]]
 name = "zstd-safe"
-version = "5.0.1+zstd.1.5.2"
+version = "5.0.2+zstd.1.5.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7c12659121420dd6365c5c3de4901f97145b79651fb1d25814020ed2ed0585ae"
+checksum = "1d2a5585e04f9eea4b2a3d1eca508c4dee9592a89ef6f450c11719da0726f4db"
 dependencies = [
  "libc",
  "zstd-sys",

From 31f3d988aeb357fe3ca5f08bc4edd53adef4aba5 Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Fri, 13 May 2022 09:34:59 +0200
Subject: [PATCH 6/8] feat: LRU cache infrastructure (#4189)

* feat: `AddressableHeap::is_empty`

* feat: add type-safe size trait

* feat: LRU cache infrastructure

* fix: typos

Co-authored-by: Andrew Lamb

* fix: update after rebase

* docs: explain test code

* test: ensure that values are dropped from LRU cache

* test: ensure that backends are dropped from LRU cache

* docs: explain where LRU state is stored

* docs: explain high-level LRU usage

* refactor: "memory (size)" => "resource (consumption)"

This should make the reasoning more generic and easier to understand.

Co-authored-by: Andrew Lamb
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
---
 .../cache_system/backend/addressable_heap.rs |   19 +
 querier/src/cache_system/backend/lru.rs      | 1199 +++++++++++++++++
 querier/src/cache_system/backend/mod.rs      |    2 +
 .../backend/resource_consumption.rs          |   30 +
 querier/src/lib.rs                           |    5 +
 5 files changed, 1255 insertions(+)
 create mode 100644 querier/src/cache_system/backend/lru.rs
 create mode 100644 querier/src/cache_system/backend/resource_consumption.rs

diff --git a/querier/src/cache_system/backend/addressable_heap.rs b/querier/src/cache_system/backend/addressable_heap.rs
index bb12a31efc..e8afd72dc8 100644
--- a/querier/src/cache_system/backend/addressable_heap.rs
+++ b/querier/src/cache_system/backend/addressable_heap.rs
@@ -44,6 +44,14 @@ where
         }
     }
 
+    /// Check if the heap is empty.
+    pub fn is_empty(&self) -> bool {
+        let res1 = self.key_to_order_and_value.is_empty();
+        let res2 = self.queue.is_empty();
+        assert_eq!(res1, res2, "data structures out of sync");
+        res1
+    }
+
     /// Insert element.
     ///
     /// If the element (compared by `K`) already exists, it will be returned.
@@ -396,6 +404,10 @@ mod tests {
             Self { inner: Vec::new() }
         }
 
+        fn is_empty(&self) -> bool {
+            self.inner.is_empty()
+        }
+
         fn insert(&mut self, k: u8, v: String, o: i8) -> Option<(String, i8)> {
             let res = self.remove(&k);
             self.inner.push((k, v, o));
@@ -441,6 +453,7 @@
     #[derive(Debug, Clone)]
     enum Action {
+        IsEmpty,
         Insert { k: u8, v: String, o: i8 },
         Peek,
         Pop,
@@ -451,6 +464,7 @@
     // Use a hand-rolled strategy instead of `proptest-derive`, because the latter one is quite a heavy dependency.
     fn action() -> impl Strategy<Value = Action> {
         prop_oneof![
+            Just(Action::IsEmpty),
             (any::<u8>(), ".*", any::<i8>()).prop_map(|(k, v, o)| Action::Insert { k, v, o }),
             Just(Action::Peek),
             Just(Action::Pop),
@@ -467,6 +481,11 @@
         for action in actions {
             match action {
+                Action::IsEmpty => {
+                    let res1 = heap.is_empty();
+                    let res2 = sim.is_empty();
+                    assert_eq!(res1, res2);
+                }
                 Action::Insert{k, v, o} => {
                     let res1 = heap.insert(k, v.clone(), o);
                     let res2 = sim.insert(k, v, o);
diff --git a/querier/src/cache_system/backend/lru.rs b/querier/src/cache_system/backend/lru.rs
new file mode 100644
index 0000000000..7db5032d4b
--- /dev/null
+++ b/querier/src/cache_system/backend/lru.rs
@@ -0,0 +1,1199 @@
+//! LRU (Least Recently Used) cache system.
+//!
+//! # Usage
+//!
+//! ```
+//! use std::{
+//!     collections::HashMap,
+//!     ops::{Add, Sub},
+//!     sync::Arc,
+//! };
+//! use iox_time::SystemProvider;
+//! use querier::{
+//!     CacheBackend,
+//!     lru::{LruBackend, ResourcePool},
+//!     resource_consumption::{Resource, ResourceEstimator},
+//! };
+//!
+//! // first we implement a strongly-typed RAM size measurement
+//! #[derive(Debug, Clone, Copy, PartialEq, PartialOrd)]
+//! struct RamSize(usize);
+//!
+//! impl Resource for RamSize {
+//!     fn zero() -> Self {
+//!         Self(0)
+//!     }
+//! }
+//!
+//! impl Add for RamSize {
+//!     type Output = Self;
+//!
+//!     fn add(self, rhs: Self) -> Self::Output {
+//!         Self(self.0.checked_add(rhs.0).expect("overflow"))
+//!     }
+//! }
+//!
+//! impl Sub for RamSize {
+//!     type Output = Self;
+//!
+//!     fn sub(self, rhs: Self) -> Self::Output {
+//!         Self(self.0.checked_sub(rhs.0).expect("underflow"))
+//!     }
+//! }
+//!
+//! // a time provider is required to determine the age of entries
+//! let time_provider = Arc::new(SystemProvider::new());
+//!
+//! // set up a memory pool
+//! let limit = RamSize(50);
+//! let pool = Arc::new(ResourcePool::new(limit, time_provider));
+//!
+//! // set up first pool user: a u64->String map
+//! #[derive(Debug)]
+//! struct Provider1 {}
+//!
+//! impl ResourceEstimator for Provider1 {
+//!     type K = u64;
+//!     type V = String;
+//!     type S = RamSize;
+//!
+//!     fn consumption(&self, _k: &Self::K, v: &Self::V) -> Self::S {
+//!         RamSize(8) + RamSize(v.capacity())
+//!     }
+//! }
+//! let mut backend1 = LruBackend::new(
+//!     Box::new(HashMap::new()),
+//!     Arc::clone(&pool),
+//!     String::from("id1"),
+//!     Arc::new(Provider1{}),
+//! );
+//!
+//! // add some data
+//! backend1.set(1, String::from("some_entry"));
+//! backend1.set(2, String::from("another_entry"));
+//! assert_eq!(pool.current(), RamSize(39));
+//!
+//! // only test first one
+//! assert!(backend1.get(&1).is_some());
+//!
+//! // fill up pool
+//! backend1.set(3, String::from("this_will_evict_data"));
+//! assert!(backend1.get(&1).is_some());
+//! assert!(backend1.get(&2).is_none());
+//! assert!(backend1.get(&3).is_some());
+//! assert_eq!(pool.current(), RamSize(46));
+//!
+//! // set up second pool user with totally different types: a u8->Vec<u8> map
+//! #[derive(Debug)]
+//! struct Provider2 {}
+//!
+//! impl ResourceEstimator for Provider2 {
+//!     type K = u8;
+//!     type V = Vec<u8>;
+//!     type S = RamSize;
+//!
+//!     fn consumption(&self, _k: &Self::K, v: &Self::V) -> Self::S {
+//!         RamSize(1) + RamSize(v.capacity())
+//!     }
+//! }
+//! let mut backend2 = LruBackend::new(
+//!     Box::new(HashMap::new()),
+//!     Arc::clone(&pool),
+//!     String::from("id2"),
+//!     Arc::new(Provider2{}),
+//! );
+//!
+//! // eviction works for all pool members
+//! backend2.set(1, vec![1, 2, 3, 4]);
+//! assert!(backend1.get(&1).is_none());
+//! assert!(backend1.get(&2).is_none());
+//! assert!(backend1.get(&3).is_some());
+//! assert!(backend2.get(&1).is_some());
+//! assert_eq!(pool.current(), RamSize(33));
+//! ```
+//!
+//! # Internals
+//! Here we describe the internals of the LRU cache system.
+//!
+//! ## Requirements
+//! To understand the construction, we first must understand what the LRU system tries to achieve:
+//!
+//! - **Single Pool:** Have a single resource pool for multiple LRU backends.
+//! - **Eviction Cascade:** Adding data to any of the backends (or modifying an existing entry) should check if there is
+//!   enough space left in the LRU backend. If not, we must remove the least recently used entries over all backends
+//!   (including the one that just got a new entry) until there is enough space.
+//!
+//! This has the following consequences:
+//!
+//! - **Cyclic Structure:** The LRU backends communicate with the pool, but the pool also needs to communicate with
+//!   all the backends. This creates some form of cyclic data structure.
+//! - **Type Erasure:** The pool is only specific to the resource type, not the key and value types of the
+//!   participating backends. So at some place we need to perform type erasure.
+//!
+//! ## Data Structures
+//!
+//! ```text
+//!                 .~~~~~~~~~~~~~~.            .~~~~~~~~~~~~~~~~~~~.
+//!  -------------->: ResourcePool :--(mutex)-->: ResourcePoolInner :--------------------------------------+
+//!                 :              :            :                   :                                      |
+//!                 .~~~~~~~~~~~~~~.            .~~~~~~~~~~~~~~~~~~~.                                      |
+//!                        ^                                                                               |
+//!                        |                                                                               |
+//!                      (arc)                                                                             |
+//!                        |                                                                               |
+//!                        |                                                                               |
+//!                        |   .~~~~~~~~~~~~~~~~~.   .~~~~~~~~~~~~~~~~~~~~~.        .~~~~~~~~~~~~~~~~~.   |
+//!                        |   : LruBackendInner :<--: PoolMemberGuardImpl :<-(dyn)-: PoolMemberGuard :   |
+//!                        |   :                 :   :                     :        :                 :   |
+//!                        |   .~~~~~~~~~~~~~~~~~.   .~~~~~~~~~~~~~~~~~~~~~.        .~~~~~~~~~~~~~~~~~.   |
+//!                        |            ^                       ^                            ^            |
+//!                        |            |                       |                            |            |
+//!                        |            |                       +-------------+--------------+            |
+//!                        |            |                                (call lock)                      |
+//!                        |            |                       +-------------+--------------+            |
+//!                        |    (mutex) |                                                                 |
+//!   .~~~~~~~~~~~~.       |            |              .~~~~~~~~~~~~~~~~~.            .~~~~~~~~~~~~.      |
+//!  ->: LruBackend :------+ (arc)                     : PoolMemberImpl  :            : PoolMember :<-----+
+//!   :            :       |           |               :                 :            :            :      |
+//!   :            :-------------------+---------------:                 :<---(dyn)---:            :      |
+//!   .~~~~~~~~~~~~.       |                           .~~~~~~~~~~~~~~~~~.            .~~~~~~~~~~~~.      |
+//!                        |                                                                              |
+//!                        |                                                                              |
+//!                        |                                                                              |
+//!                        |                                                                              |
+//!                        |   .~~~~~~~~~~~~~~~~~.   .~~~~~~~~~~~~~~~~~~~~~.        .~~~~~~~~~~~~~~~~~.   |
+//!                        |   : LruBackendInner :<--: PoolMemberGuardImpl :<-(dyn)-: PoolMemberGuard :   |
+//!                        |   :                 :   :                     :        :                 :   |
+//!                        |   .~~~~~~~~~~~~~~~~~.   .~~~~~~~~~~~~~~~~~~~~~.        .~~~~~~~~~~~~~~~~~.   |
+//!                        |            ^                       ^                            ^            |
+//!                        |            |                       |                            |            |
+//!                        |            |                       +-------------+--------------+            |
+//!                        |            |                                (call lock)                      |
+//!                        |            |                       +-------------+--------------+            |
+//!                             (mutex) |                                                                 |
+//!   .~~~~~~~~~~~~.                    |              .~~~~~~~~~~~~~~~~~.            .~~~~~~~~~~~~.      |
+//!  ->: LruBackend :------+ (arc)                     : PoolMemberImpl  :            : PoolMember :<-----+
+//!   :            :                    |              :                 :            :            :
+//!   :            :-------------------+---------------:                 :<---(dyn)---:            :
+//!   .~~~~~~~~~~~~.                                   .~~~~~~~~~~~~~~~~~.            .~~~~~~~~~~~~.
+//! ```
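+//!
+//! To see the type-erasure requirement in isolation, here is a minimal, self-contained sketch. It is illustrative
+//! only -- the names (`Member`, `Backend`) are made up and stand in for the real `PoolMember`/`LruBackend` machinery:
+//!
+//! ```
+//! // The pool only needs to know the resource type (`u64` here); the key and
+//! // value types are erased behind the trait object, so backends with totally
+//! // different key/value types can be stored in a single collection.
+//! trait Member: std::fmt::Debug {
+//!     type S;
+//!     fn oldest(&self) -> Option<Self::S>;
+//! }
+//!
+//! #[derive(Debug)]
+//! struct Backend<K, V>(Vec<(K, V, u64)>);
+//!
+//! impl<K: std::fmt::Debug, V: std::fmt::Debug> Member for Backend<K, V> {
+//!     type S = u64;
+//!     fn oldest(&self) -> Option<u64> {
+//!         self.0.first().map(|(_k, _v, s)| *s)
+//!     }
+//! }
+//!
+//! // A `Backend<u8, &str>` and a `Backend<String, Vec<u8>>` fit into one vector:
+//! let members: Vec<Box<dyn Member<S = u64>>> = vec![
+//!     Box::new(Backend(vec![(1u8, "a", 4u64)])),
+//!     Box::new(Backend(vec![(String::from("k"), vec![0u8], 2u64)])),
+//! ];
+//! assert_eq!(members[0].oldest(), Some(4));
+//! ```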
+//!
+//! ## State
+//! State is held in the following structures:
+//!
+//! - `LruBackendInner`: Holds the actual user-provided backend ([`CacheBackend`]) as well as an [`AddressableHeap`] to
+//!   memorize when entries were used for the last time.
+//! - `ResourcePoolInner`: Holds a reference to all pool members as well as the current consumption.
+//!
+//! All other structures and traits "only" act as glue.
+//!
+//! ## Locking
+//! What and how we lock depends on the operation.
+//!
+//! Note that all locks are bare mutexes; there are no read-write locks. "Only read" is not really an important use
+//! case since even `get` requires updating the "last used" timestamp of the corresponding entry.
+//!
+//! ### Get
+//! For [`get`](CacheBackend::get) we only need to update the "last used" timestamp for the affected entry. No
+//! pool-wide operations are required. We just lock [`LruBackendInner`] and perform the read operation of the inner
+//! backend and the modification of the "last used" timestamp.
+//!
+//! ### Remove
+//! For [`remove`](CacheBackend::remove) the pool usage can only decrease, so other backends are never affected. We
+//! first lock [`ResourcePoolInner`], then [`LruBackendInner`] and then perform the modification on both.
+//!
+//! ### Set
+//! [`set`](CacheBackend::set) is the most complex operation and requires a bit of a lock dance:
+//!
+//! 1. Lock [`ResourcePoolInner`]
+//! 2. Lock [`LruBackendInner`]
+//! 3. Check if the entry already exists and remove it.
+//! 4. Drop lock of [`LruBackendInner`] so that the pool can use it to free up space.
+//! 5. Request to add more data to the pool:
+//!    1. Check if we need to free up space, otherwise we can already proceed to step 6.
+//!    2. Lock all pool members ([`PoolMember::lock`] which ultimately locks [`LruBackendInner`])
+//!    3. Loop:
+//!       1. Ask pool members if they have anything to free.
+//!       2. Pick least recently used result and free it
+//!    4. Drop locks of [`LruBackendInner`]
+//! 6. Lock [`LruBackendInner`] and insert the new entry together with its "last used" timestamp.
+//! 7. Drop lock of [`LruBackendInner`] and [`ResourcePoolInner`]
+//!
+//! The global locks in step 5.2 are required so that the reads in step 5.3.1 and the resulting actions in step 5.3.2
+//! are consistent. Otherwise an interleaved `get` request might invalidate the results.
+use std::{
+    any::Any,
+    collections::{btree_map::Entry, BTreeMap},
+    fmt::Debug,
+    hash::Hash,
+    ops::Deref,
+    sync::Arc,
+};
+
+use iox_time::{Time, TimeProvider};
+use parking_lot::{Mutex, MutexGuard};
+
+use super::{
+    addressable_heap::AddressableHeap,
+    resource_consumption::{Resource, ResourceEstimator},
+    CacheBackend,
+};
+
+/// Inner state of [`ResourcePool`] which is always behind a mutex.
+#[derive(Debug)]
+struct ResourcePoolInner<S>
+where
+    S: Resource,
+{
+    /// Resource limit.
+    limit: S,
+
+    /// Current resource usage.
+    current: S,
+
+    /// Members (= backends) that use this pool.
+    members: BTreeMap<String, Box<dyn PoolMember<S = S>>>,
+}
+
+impl<S> ResourcePoolInner<S>
+where
+    S: Resource,
+{
+    /// Register new pool member.
+    ///
+    /// # Panic
+    /// Panics when a member with the specified ID is already registered.
+    fn register_member(&mut self, id: String, member: Box<dyn PoolMember<S = S>>) {
+        match self.members.entry(id) {
+            Entry::Vacant(v) => {
+                v.insert(member);
+            }
+            Entry::Occupied(o) => {
+                panic!("Member '{}' already registered", o.key());
+            }
+        }
+    }
+
+    /// Unregister pool member.
+    ///
+    /// # Panic
+    /// Panics when the member with the specified ID is unknown (or was already unregistered).
+    fn unregister_member(&mut self, id: &str) {
+        assert!(self.members.remove(id).is_some(), "Member '{}' unknown", id);
+    }
+
+    /// Add used resource to pool.
+    fn add(&mut self, s: S) {
+        self.current = self.current + s;
+
+        if self.current > self.limit {
+            // lock all members
+            let mut members: Vec<_> = self.members.values().map(|member| member.lock()).collect();
+
+            // evict data until we are below the limit
+            while self.current > self.limit {
+                let mut options: Vec<_> = members
+                    .iter_mut()
+                    .filter_map(|member| member.could_remove().map(|t| (t, member)))
+                    .collect();
+                options.sort_by_key(|(t, _member)| *t);
+
+                let (_t, member) = options.first_mut().expect("accounting out of sync");
+                let s = member.remove_oldest();
+                self.current = self.current - s;
+            }
+        }
+    }
+
+    /// Remove used resource from pool.
+    fn remove(&mut self, s: S) {
+        self.current = self.current - s;
+    }
+}
+
+/// Resource pool.
+///
+/// This can be used with [`LruBackend`].
+#[derive(Debug)]
+pub struct ResourcePool<S>
+where
+    S: Resource,
+{
+    inner: Mutex<ResourcePoolInner<S>>,
+    time_provider: Arc<dyn TimeProvider>,
+}
+
+impl<S> ResourcePool<S>
+where
+    S: Resource,
+{
+    /// Creates new empty resource pool with given limit.
+    pub fn new(limit: S, time_provider: Arc<dyn TimeProvider>) -> Self {
+        Self {
+            inner: Mutex::new(ResourcePoolInner {
+                limit,
+                current: S::zero(),
+                members: BTreeMap::new(),
+            }),
+            time_provider,
+        }
+    }
+
+    /// Get current pool usage.
+    pub fn current(&self) -> S {
+        self.inner.lock().current
+    }
+}
+
+/// Inner state of [`LruBackend`].
+///
+/// This is used by [`LruBackend`] directly but also by [`PoolMemberImpl`] to add it to a [`ResourcePool`]/[`ResourcePoolInner`].
+#[derive(Debug)]
+struct LruBackendInner<K, V, S>
+where
+    K: Clone + Eq + Debug + Hash + Ord + Send + 'static,
+    V: Clone + Debug + Send + 'static,
+    S: Resource,
+{
+    inner_backend: Box<dyn CacheBackend<K = K, V = V>>,
+    last_used: AddressableHeap<K, S, Time>,
+}
+
+/// [Cache backend](CacheBackend) that wraps another backend and limits its resource usage.
+#[derive(Debug)]
+pub struct LruBackend<K, V, S>
+where
+    K: Clone + Eq + Debug + Hash + Ord + Send + 'static,
+    V: Clone + Debug + Send + 'static,
+    S: Resource,
+{
+    id: String,
+    inner: Arc<Mutex<LruBackendInner<K, V, S>>>,
+    pool: Arc<ResourcePool<S>>,
+    resource_estimator: Arc<dyn ResourceEstimator<K = K, V = V, S = S>>,
+}
+
+impl<K, V, S> LruBackend<K, V, S>
+where
+    K: Clone + Eq + Debug + Hash + Ord + Send + 'static,
+    V: Clone + Debug + Send + 'static,
+    S: Resource,
+{
+    /// Create new backend w/o any known keys.
+    ///
+    /// The inner backend MUST NOT contain any data at this point, otherwise we will not track any resource consumption
+    /// for these entries.
+    ///
+    /// # Panic
+    /// - Panics if the given ID is already used within the given pool.
+    /// - Panics if the inner backend is not empty.
+    pub fn new(
+        inner_backend: Box<dyn CacheBackend<K = K, V = V>>,
+        pool: Arc<ResourcePool<S>>,
+        id: String,
+        resource_estimator: Arc<dyn ResourceEstimator<K = K, V = V, S = S>>,
+    ) -> Self {
+        assert!(inner_backend.is_empty(), "inner backend is not empty");
+
+        let inner = Arc::new(Mutex::new(LruBackendInner {
+            inner_backend,
+            last_used: AddressableHeap::new(),
+        }));
+
+        pool.inner.lock().register_member(
+            id.clone(),
+            Box::new(PoolMemberImpl {
+                inner: Arc::clone(&inner),
+            }),
+        );
+
+        Self {
+            id,
+            inner,
+            pool,
+            resource_estimator,
+        }
+    }
+
+    /// Get underlying / inner backend.
+    pub fn inner_backend(&self) -> LruBackendInnerBackendHandle<'_, K, V, S> {
+        LruBackendInnerBackendHandle {
+            inner: self.inner.lock(),
+        }
+    }
+}
+
+impl<K, V, S> Drop for LruBackend<K, V, S>
+where
+    K: Clone + Eq + Debug + Hash + Ord + Send + 'static,
+    V: Clone + Debug + Send + 'static,
+    S: Resource,
+{
+    fn drop(&mut self) {
+        self.pool.inner.lock().unregister_member(&self.id);
+    }
+}
+
+impl<K, V, S> CacheBackend for LruBackend<K, V, S>
+where
+    K: Clone + Eq + Debug + Hash + Ord + Send + 'static,
+    V: Clone + Debug + Send + 'static,
+    S: Resource,
+{
+    type K = K;
+    type V = V;
+
+    fn get(&mut self, k: &Self::K) -> Option<Self::V> {
+        let mut inner = self.inner.lock();
+
+        match inner.inner_backend.get(k) {
+            Some(v) => {
+                // update "last used"
+                let now = self.pool.time_provider.now();
+                let (consumption, _last_used) = inner
+                    .last_used
+                    .remove(k)
+                    .expect("backend and last-used table out of sync");
+                inner.last_used.insert(k.clone(), consumption, now);
+
+                Some(v)
+            }
+            None => None,
+        }
+    }
+
+    fn set(&mut self, k: Self::K, v: Self::V) {
+        // determine all attributes before getting any locks
+        let consumption = self.resource_estimator.consumption(&k, &v);
+        let now = self.pool.time_provider.now();
+
+        // get locks
+        let mut pool = self.pool.inner.lock();
+
+        // check for oversized entries
+        if consumption > pool.limit {
+            return;
+        }
+
+        // maybe clean from pool
+        {
+            let mut inner = self.inner.lock();
+            if let Some((consumption, _last_used)) = inner.last_used.remove(&k) {
+                pool.remove(consumption);
+            }
+        }
+
+        // pool-wide operation
+        // Since this may call back to this very backend to remove entries, we MUST NOT hold an inner lock at this point.
+        pool.add(consumption);
+
+        // add new entry to inner backend AFTER adding it to the pool, so we are never overcommitting resources.
+        let mut inner = self.inner.lock();
+        inner.inner_backend.set(k.clone(), v);
+        inner.last_used.insert(k, consumption, now);
+    }
+
+    fn remove(&mut self, k: &Self::K) {
+        let mut pool = self.pool.inner.lock();
+        let mut inner = self.inner.lock();
+
+        inner.inner_backend.remove(k);
+        if let Some((consumption, _last_used)) = inner.last_used.remove(k) {
+            pool.remove(consumption);
+        }
+    }
+
+    fn is_empty(&self) -> bool {
+        self.inner.lock().last_used.is_empty()
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self as &dyn Any
+    }
+}
+
+/// A member of a [`ResourcePool`]/[`ResourcePoolInner`].
+///
+/// Must be [locked](Self::lock) to gain access.
+///
+/// The only implementation of this is [`PoolMemberImpl`]. This indirection is required to erase `K` and `V` from the
+/// specific backend so we can stick it into the generic pool.
+trait PoolMember: Debug + Send + 'static {
+    /// Resource type.
+    type S;
+
+    /// Lock pool member.
+    fn lock(&self) -> Box<dyn PoolMemberGuard<S = Self::S> + '_>;
+}
+
+/// The only implementation of [`PoolMember`].
+///
+/// In contrast to the trait, this still contains `K` and `V`.
+#[derive(Debug)]
+pub struct PoolMemberImpl<K, V, S>
+where
+    K: Clone + Eq + Debug + Hash + Ord + Send + 'static,
+    V: Clone + Debug + Send + 'static,
+    S: Resource,
+{
+    inner: Arc<Mutex<LruBackendInner<K, V, S>>>,
+}
+
+impl<K, V, S> PoolMember for PoolMemberImpl<K, V, S>
+where
+    K: Clone + Eq + Debug + Hash + Ord + Send + 'static,
+    V: Clone + Debug + Send + 'static,
+    S: Resource,
+{
+    type S = S;
+
+    fn lock(&self) -> Box<dyn PoolMemberGuard<S = Self::S> + '_> {
+        Box::new(PoolMemberGuardImpl {
+            inner: self.inner.lock(),
+        })
+    }
+}
+
+/// Locked [`ResourcePool`]/[`ResourcePoolInner`] member.
+///
+/// The only implementation of this is [`PoolMemberGuardImpl`].
+/// This indirection is required to erase `K` and `V` from the specific backend so we can stick it into the generic pool.
+trait PoolMemberGuard: Debug {
+    /// Resource type.
+    type S;
+
+    /// Check if this member has anything that could be removed. If so, return the "last used" timestamp of the oldest
+    /// entry.
+    fn could_remove(&self) -> Option