diff --git a/cache_system/src/driver.rs b/cache_system/src/driver.rs
index 2ee34880d5..740af806a4 100644
--- a/cache_system/src/driver.rs
+++ b/cache_system/src/driver.rs
@@ -1,30 +1,35 @@
 //! Main data structure, see [`Cache`].
 
-use std::{collections::HashMap, hash::Hash, sync::Arc};
+use super::{backend::CacheBackend, loader::Loader};
 use futures::{
     future::{BoxFuture, Shared},
     FutureExt, TryFutureExt,
 };
 use observability_deps::tracing::debug;
 use parking_lot::Mutex;
+use std::{collections::HashMap, hash::Hash, sync::Arc};
 use tokio::{
     sync::oneshot::{error::RecvError, Sender},
     task::JoinHandle,
 };
 
-use super::{backend::CacheBackend, loader::Loader};
-
 /// High-level cache implementation.
 ///
 /// # Concurrency
-/// Multiple cache requests for different keys can run at the same time. When data is requested for the same key the
-/// underlying loader will only be polled once, even when the requests are made while the loader is still running.
+///
+/// Multiple cache requests for different keys can run at the same time. When data is requested for
+/// the same key the underlying loader will only be polled once, even when the requests are made
+/// while the loader is still running.
 ///
 /// # Cancellation
-/// Canceling a [`get`](Self::get) request will NOT cancel the underlying loader. The data will still be cached.
+///
+/// Canceling a [`get`](Self::get) request will NOT cancel the underlying loader. The data will
+/// still be cached.
 ///
 /// # Panic
-/// If the underlying loader panics, all currently running [`get`](Self::get) requests will panic. The data will NOT be cached.
+///
+/// If the underlying loader panics, all currently running [`get`](Self::get) requests will panic.
+/// The data will NOT be cached.
 #[derive(Debug)]
 pub struct Cache<K, V>
 where
@@ -57,7 +62,8 @@ where
 
     /// Get value from cache.
     pub async fn get(&self, k: K) -> V {
-        // place state locking into its own scope so it doesn't leak into the generator (async function)
+        // place state locking into its own scope so it doesn't leak into the generator (async
+        // function)
         let receiver = {
             let mut state = self.state.lock();
 
@@ -83,7 +89,8 @@ where
                 let tag = state.tag_counter;
                 state.tag_counter += 1;
 
-                // need to wrap the query into a tokio task so that it doesn't get cancelled when this very request is canceled
+                // need to wrap the query into a tokio task so that it doesn't get cancelled when
+                // this very request is cancelled
                 let state_captured = Arc::clone(&self.state);
                 let loader = Arc::clone(&self.loader);
                 let k_captured = k.clone();
@@ -93,7 +100,8 @@ where
                     let k_for_loader = k_captured.clone();
 
                     // execute the loader
-                    // If we panic here then `tx` will be dropped and the receivers will be notified.
+                    // If we panic here then `tx` will be dropped and the receivers will be
+                    // notified.
                     let v = loader.load(k_for_loader).await;
 
                     // remove "running" state and store result
@@ -104,14 +112,16 @@ where
 
                        Some(running_query) if running_query.tag == tag => {
                            state.running_queries.remove(&k_captured);
 
-                            // this very query is in charge of the key, so store in in the underlying cache
+                            // this very query is in charge of the key, so store it in the
+                            // underlying cache
                            state.cached_entries.set(k_captured, v.clone());
 
                            true
                        }
                        _ => {
-                            // This query is actually not really running any longer but got shut down, e.g. due
-                            // to side loading. Do NOT store the generated value in the underlying cache.
+                            // This query is actually not really running any longer but got
+                            // shut down, e.g. due to side loading. Do NOT store the
+                            // generated value in the underlying cache.
                            false
                        }
@@ -119,8 +129,9 @@ where
                    };
 
                    if !was_running {
-                        // value was side-loaded, so we cannot populate `v`. Instead block this execution branch and
-                        // wait for `rx_set` to deliver the side-loaded result.
+                        // value was side-loaded, so we cannot populate `v`. Instead block this
+                        // execution branch and wait for `rx_set` to deliver the side-loaded
+                        // result.
                        loop {
                            tokio::task::yield_now().await;
                        }
@@ -134,8 +145,9 @@ where
                    maybe_v = rx_set.fuse() => {
                        match maybe_v {
                            Ok(v) => {
-                                // data get side-loaded via `Cache::set`. In this case, we do NOT modify the state
-                                // because there would be a lock-gap. The `set` function will do that for us instead.
+                                // data got side-loaded via `Cache::set`. In this case, we do
+                                // NOT modify the state because there would be a lock-gap. The
+                                // `set` function will do that for us instead.
                                v
                            }
                            Err(_) => {
@@ -187,9 +199,10 @@ where
             // it's OK when the receiver side is gone (likely panicked)
             running_query.set.send(v.clone()).ok();
 
-            // When we side-load data into the running task, the task does NOT modify the backend, so we have to do
-            // that. The reason for not letting the task feed the side-loaded data back into `cached_entries` is that we
-            // would need to drop the state lock here before the task could acquire it, leading to a lock gap.
+            // When we side-load data into the running task, the task does NOT modify the
+            // backend, so we have to do that. The reason for not letting the task feed the
+            // side-loaded data back into `cached_entries` is that we would need to drop the
+            // state lock here before the task could acquire it, leading to a lock gap.
             Some(running_query.join_handle)
         } else {
             None
@@ -215,9 +228,10 @@ where
 {
     fn drop(&mut self) {
         for (_k, running_query) in self.state.lock().running_queries.drain() {
-            // It's unlikely that anyone is still using the shared receiver at this point, because Cache::get borrow
-            // the self. If it is still in use, aborting the task will cancel the contained future which in turn will
-            // drop the sender of the oneshot channel. The receivers will be notified.
+            // It's unlikely that anyone is still using the shared receiver at this point, because
+            // `Cache::get` borrows `self`. If it is still in use, aborting the task will
+            // cancel the contained future which in turn will drop the sender of the oneshot
+            // channel. The receivers will be notified.
             running_query.join_handle.abort();
         }
     }
@@ -226,11 +240,12 @@ where
 
 /// A [`tokio::sync::oneshot::Receiver`] that can be cloned.
 ///
 /// The types are:
-/// - `Arc<V>`: Ensures that we can clone `V` without requiring `V: Sync`. At the same time the reference to `V`
-///   (i.e. the `Arc`) must be cloneable for `Shared`
+///
+/// - `Arc<V>`: Ensures that we can clone `V` without requiring `V: Sync`. At the same time
+///   the reference to `V` (i.e. the `Arc`) must be cloneable for `Shared`
 /// - `Arc<RecvError>`: Is required because `RecvError` is not `Clone` but `Shared` requires that.
-/// - `BoxFuture`: The transformation from `Result<V, RecvError>` to `Result<Arc<V>, Arc<RecvError>>` results in
-///   a kinda messy type and we wanna erase that.
+/// - `BoxFuture`: The transformation from `Result<V, RecvError>` to `Result<Arc<V>,
+///   Arc<RecvError>>` results in a kinda messy type and we wanna erase that.
 /// - `Shared`: Allow the receiver to be cloned and be awaited from multiple places.
 type SharedReceiver<V> = Shared<BoxFuture<'static, Result<Arc<V>, Arc<RecvError>>>>;
@@ -483,14 +498,16 @@ mod tests {
     impl TestLoader {
         /// Panic when loading value for `k`.
         ///
-        /// If this is used together with [`block`](Self::block), the panic will occur AFTER blocking.
+        /// If this is used together with [`block`](Self::block), the panic will occur AFTER
+        /// blocking.
         fn panic_once(&self, k: u8) {
             self.panic.lock().insert(k);
         }
 
         /// Block all [`load`](Self::load) requests until [`unblock`](Self::unblock) is called.
         ///
-        /// If this is used together with [`panic_once`](Self::panic_once), the panic will occur AFTER blocking.
+        /// If this is used together with [`panic_once`](Self::panic_once), the panic will occur
+        /// AFTER blocking.
         fn block(&self) {
             let mut blocked = self.blocked.lock();
             assert!(blocked.is_none());
@@ -523,7 +540,8 @@ mod tests {
         async fn load(&self, k: u8) -> String {
             self.loaded.lock().push(k);
 
-            // need to capture the cloned notify handle, otherwise the lock guard leaks into the generator
+            // need to capture the cloned notify handle, otherwise the lock guard leaks into the
+            // generator
             let maybe_block = self.blocked.lock().clone();
             if let Some(block) = maybe_block {
                 block.notified().await;
diff --git a/querier/src/chunk/mod.rs b/querier/src/chunk/mod.rs
index b6f4ded819..3965bf36b2 100644
--- a/querier/src/chunk/mod.rs
+++ b/querier/src/chunk/mod.rs
@@ -94,9 +94,10 @@ pub enum ChunkStorage {
 
 /// Chunk representation for the querier.
 ///
-/// These chunks are usually created on-demand. The querier cache system does not really have a notion of chunks (rather
-/// it knows about parquet files, local FS caches, ingester data, cached read buffers) but we need to combine all that
-/// knowledge into chunk objects because this is what the query engine (DataFusion and InfluxRPC) expect.
+/// These chunks are usually created on-demand. The querier cache system does not really have a
+/// notion of chunks (rather it knows about parquet files, local FS caches, ingester data, cached
+/// read buffers) but we need to combine all that knowledge into chunk objects because this is what
+/// the query engines (DataFusion and InfluxRPC) expect.
 #[derive(Debug)]
 pub struct QuerierChunk {
     /// How the data is currently structured / available for query.
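Note (not part of the patch): the `Cancellation` section and the `SharedReceiver` alias above combine into one compact pattern that can be studied in isolation. Below is a minimal, self-contained sketch of it; `spawn_shared` and `expensive_load` are hypothetical names for illustration, not code from this repository, and it assumes the `tokio` (with the `full` feature) and `futures` crates. The loader runs inside `tokio::spawn`, so dropping callers never cancels it, and the oneshot receiver becomes cloneable by mapping value and error into `Arc`s, boxing the resulting future, and wrapping it in `Shared`:

```rust
use std::sync::Arc;

use futures::{
    future::{BoxFuture, Shared},
    FutureExt, TryFutureExt,
};
use tokio::sync::oneshot::{self, error::RecvError};

/// Cloneable oneshot receiver, mirroring the `SharedReceiver` alias in the diff.
type SharedReceiver<V> = Shared<BoxFuture<'static, Result<Arc<V>, Arc<RecvError>>>>;

/// Run `load` in a spawned task and hand back a receiver that any number of
/// callers can clone and await. Dropping callers does NOT cancel the load.
fn spawn_shared<V, F>(load: F) -> SharedReceiver<V>
where
    V: Send + 'static,
    F: std::future::Future<Output = V> + Send + 'static,
{
    let (tx, rx) = oneshot::channel();
    tokio::spawn(async move {
        // if `load` panics, `tx` is dropped and every receiver sees `RecvError`
        let v = load.await;
        // it's OK when all receivers are gone
        tx.send(v).ok();
    });
    // `Arc` makes value and error cloneable, `boxed` erases the messy
    // combinator type, `shared` allows awaiting from multiple places
    rx.map_ok(Arc::new).map_err(Arc::new).boxed().shared()
}

async fn expensive_load(k: u8) -> String {
    format!("value for {k}")
}

#[tokio::main]
async fn main() {
    let recv = spawn_shared(expensive_load(42));
    let recv2 = recv.clone();

    // cancelling one awaiter does not abort the spawned load
    drop(recv);

    let v = recv2.await.expect("loader neither panicked nor was aborted");
    assert_eq!(*v, "value for 42");
}
```

This also matches the `Panic` section above: a panic in the loader drops the sender, so every clone of the receiver resolves to an error instead of hanging.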
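Similarly, the `Concurrency` guarantee (the loader is polled at most once per key, no matter how many concurrent `get`s arrive) is a single-flight pattern. The sketch below is a deliberately simplified model under the hypothetical name `SingleFlight`: unlike the driver in this patch it has no tag counter, never removes finished entries, does not support side-loading via `set`, and dropping all callers would cancel the load, which is exactly what the spawned task in the real driver prevents:

```rust
use std::{collections::HashMap, hash::Hash, sync::Arc};

use futures::{
    future::{BoxFuture, Shared},
    FutureExt,
};
use parking_lot::Mutex;

/// Hypothetical single-flight map: concurrent callers asking for the same key
/// all await one shared future, so the loader runs at most once per key.
struct SingleFlight<K, V>
where
    K: Clone + Eq + Hash,
    V: Clone + Send + 'static,
{
    inflight: Mutex<HashMap<K, Shared<BoxFuture<'static, V>>>>,
}

impl<K, V> SingleFlight<K, V>
where
    K: Clone + Eq + Hash,
    V: Clone + Send + 'static,
{
    fn new() -> Self {
        Self {
            inflight: Mutex::new(HashMap::new()),
        }
    }

    async fn get<F>(&self, k: K, load: F) -> V
    where
        F: std::future::Future<Output = V> + Send + 'static,
    {
        // keep the lock in its own scope so the guard doesn't live across `.await`
        let fut = {
            let mut inflight = self.inflight.lock();
            if let Some(fut) = inflight.get(&k) {
                // a load for this key is already running: share its result
                fut.clone()
            } else {
                let fut = load.boxed().shared();
                inflight.insert(k, fut.clone());
                fut
            }
        };
        fut.await
    }
}

#[tokio::main]
async fn main() {
    use std::sync::atomic::{AtomicUsize, Ordering};
    static CALLS: AtomicUsize = AtomicUsize::new(0);

    let sf: SingleFlight<u8, String> = SingleFlight::new();
    let load = |k: u8| async move {
        CALLS.fetch_add(1, Ordering::SeqCst);
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        format!("value for {k}")
    };

    // two concurrent requests for the same key share a single load
    let (a, b) = tokio::join!(sf.get(1, load(1)), sf.get(1, load(1)));
    assert_eq!(a, b);
    assert_eq!(CALLS.load(Ordering::SeqCst), 1);
}
```

The tight lock scope in `get` mirrors the "place state locking into its own scope" comment in the patch: a `parking_lot` guard must not be held across an `.await`, both for correctness and because the generated future would otherwise not be `Send`.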