docs: Fix comment wrapping while reading through

pull/24376/head
Carol (Nichols || Goulding) 2022-05-25 10:26:23 -04:00
parent 6cc767efcc
commit 66823522f3
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
2 changed files with 52 additions and 33 deletions

View File

@ -1,30 +1,35 @@
//! Main data structure, see [`Cache`].
use std::{collections::HashMap, hash::Hash, sync::Arc};
use super::{backend::CacheBackend, loader::Loader};
use futures::{
future::{BoxFuture, Shared},
FutureExt, TryFutureExt,
};
use observability_deps::tracing::debug;
use parking_lot::Mutex;
use std::{collections::HashMap, hash::Hash, sync::Arc};
use tokio::{
sync::oneshot::{error::RecvError, Sender},
task::JoinHandle,
};
use super::{backend::CacheBackend, loader::Loader};
/// High-level cache implementation.
///
/// # Concurrency
/// Multiple cache requests for different keys can run at the same time. When data is requested for the same key the
/// underlying loader will only be polled once, even when the requests are made while the loader is still running.
///
/// Multiple cache requests for different keys can run at the same time. When data is requested for
/// the same key the underlying loader will only be polled once, even when the requests are made
/// while the loader is still running.
///
/// # Cancellation
/// Canceling a [`get`](Self::get) request will NOT cancel the underlying loader. The data will still be cached.
///
/// Canceling a [`get`](Self::get) request will NOT cancel the underlying loader. The data will
/// still be cached.
///
/// # Panic
/// If the underlying loader panics, all currently running [`get`](Self::get) requests will panic. The data will NOT be cached.
///
/// If the underlying loader panics, all currently running [`get`](Self::get) requests will panic.
/// The data will NOT be cached.
#[derive(Debug)]
pub struct Cache<K, V>
where
@ -57,7 +62,8 @@ where
/// Get value from cache.
pub async fn get(&self, k: K) -> V {
// place state locking into its own scope so it doesn't leak into the generator (async function)
// place state locking into its own scope so it doesn't leak into the generator (async
// function)
let receiver = {
let mut state = self.state.lock();
@ -83,7 +89,8 @@ where
let tag = state.tag_counter;
state.tag_counter += 1;
// need to wrap the query into a tokio task so that it doesn't get cancelled when this very request is canceled
// need to wrap the query into a tokio task so that it doesn't get cancelled when
// this very request is cancelled
let state_captured = Arc::clone(&self.state);
let loader = Arc::clone(&self.loader);
let k_captured = k.clone();
@ -93,7 +100,8 @@ where
let k_for_loader = k_captured.clone();
// execute the loader
// If we panic here then `tx` will be dropped and the receivers will be notified.
// If we panic here then `tx` will be dropped and the receivers will be
// notified.
let v = loader.load(k_for_loader).await;
// remove "running" state and store result
@ -104,14 +112,16 @@ where
Some(running_query) if running_query.tag == tag => {
state.running_queries.remove(&k_captured);
// this very query is in charge of the key, so store in in the underlying cache
// this very query is in charge of the key, so store in in the
// underlying cache
state.cached_entries.set(k_captured, v.clone());
true
}
_ => {
// This query is actually not really running any longer but got shut down, e.g. due
// to side loading. Do NOT store the generated value in the underlying cache.
// This query is actually not really running any longer but got
// shut down, e.g. due to side loading. Do NOT store the
// generated value in the underlying cache.
false
}
@ -119,8 +129,9 @@ where
};
if !was_running {
// value was side-loaded, so we cannot populate `v`. Instead block this execution branch and
// wait for `rx_set` to deliver the side-loaded result.
// value was side-loaded, so we cannot populate `v`. Instead block this
// execution branch and wait for `rx_set` to deliver the side-loaded
// result.
loop {
tokio::task::yield_now().await;
}
@ -134,8 +145,9 @@ where
maybe_v = rx_set.fuse() => {
match maybe_v {
Ok(v) => {
// data get side-loaded via `Cache::set`. In this case, we do NOT modify the state
// because there would be a lock-gap. The `set` function will do that for us instead.
// data get side-loaded via `Cache::set`. In this case, we do
// NOT modify the state because there would be a lock-gap. The
// `set` function will do that for us instead.
v
}
Err(_) => {
@ -187,9 +199,10 @@ where
// it's OK when the receiver side is gone (likely panicked)
running_query.set.send(v.clone()).ok();
// When we side-load data into the running task, the task does NOT modify the backend, so we have to do
// that. The reason for not letting the task feed the side-loaded data back into `cached_entries` is that we
// would need to drop the state lock here before the task could acquire it, leading to a lock gap.
// When we side-load data into the running task, the task does NOT modify the
// backend, so we have to do that. The reason for not letting the task feed the
// side-loaded data back into `cached_entries` is that we would need to drop the
// state lock here before the task could acquire it, leading to a lock gap.
Some(running_query.join_handle)
} else {
None
@ -215,9 +228,10 @@ where
{
fn drop(&mut self) {
for (_k, running_query) in self.state.lock().running_queries.drain() {
// It's unlikely that anyone is still using the shared receiver at this point, because Cache::get borrow
// the self. If it is still in use, aborting the task will cancel the contained future which in turn will
// drop the sender of the oneshot channel. The receivers will be notified.
// It's unlikely that anyone is still using the shared receiver at this point, because
// `Cache::get` borrows the `self`. If it is still in use, aborting the task will
// cancel the contained future which in turn will drop the sender of the oneshot
// channel. The receivers will be notified.
running_query.join_handle.abort();
}
}
@ -226,11 +240,12 @@ where
/// A [`tokio::sync::oneshot::Receiver`] that can be cloned.
///
/// The types are:
/// - `Arc<Mutex<V>>`: Ensures that we can clone `V` without requiring `V: Sync`. At the same time the reference to `V`
/// (i.e. the `Arc`) must be cloneable for `Shared`
///
/// - `Arc<Mutex<V>>`: Ensures that we can clone `V` without requiring `V: Sync`. At the same time
/// the reference to `V` (i.e. the `Arc`) must be cloneable for `Shared`
/// - `Arc<RecvError>`: Is required because `RecvError` is not `Clone` but `Shared` requires that.
/// - `BoxFuture`: The transformation from `Result<V, RecvError>` to `Result<Arc<Mutex<V>>, Arc<RecvError>>` results in
/// a kinda messy type and we wanna erase that.
/// - `BoxFuture`: The transformation from `Result<V, RecvError>` to `Result<Arc<Mutex<V>>,
/// Arc<RecvError>>` results in a kinda messy type and we wanna erase that.
/// - `Shared`: Allow the receiver to be cloned and be awaited from multiple places.
type SharedReceiver<V> = Shared<BoxFuture<'static, Result<Arc<Mutex<V>>, Arc<RecvError>>>>;
@ -483,14 +498,16 @@ mod tests {
impl TestLoader {
/// Panic when loading value for `k`.
///
/// If this is used together with [`block`](Self::block), the panic will occur AFTER blocking.
/// If this is used together with [`block`](Self::block), the panic will occur AFTER
/// blocking.
fn panic_once(&self, k: u8) {
self.panic.lock().insert(k);
}
/// Block all [`load`](Self::load) requests until [`unblock`](Self::unblock) is called.
///
/// If this is used together with [`panic_once`](Self::panic_once), the panic will occur AFTER blocking.
/// If this is used together with [`panic_once`](Self::panic_once), the panic will occur
/// AFTER blocking.
fn block(&self) {
let mut blocked = self.blocked.lock();
assert!(blocked.is_none());
@ -523,7 +540,8 @@ mod tests {
async fn load(&self, k: u8) -> String {
self.loaded.lock().push(k);
// need to capture the cloned notify handle, otherwise the lock guard leaks into the generator
// need to capture the cloned notify handle, otherwise the lock guard leaks into the
// generator
let maybe_block = self.blocked.lock().clone();
if let Some(block) = maybe_block {
block.notified().await;

View File

@ -94,9 +94,10 @@ pub enum ChunkStorage {
/// Chunk representation for the querier.
///
/// These chunks are usually created on-demand. The querier cache system does not really have a notion of chunks (rather
/// it knows about parquet files, local FS caches, ingester data, cached read buffers) but we need to combine all that
/// knowledge into chunk objects because this is what the query engine (DataFusion and InfluxRPC) expect.
/// These chunks are usually created on-demand. The querier cache system does not really have a
/// notion of chunks (rather it knows about parquet files, local FS caches, ingester data, cached
/// read buffers) but we need to combine all that knowledge into chunk objects because this is what
/// the query engine (DataFusion and InfluxRPC) expect.
#[derive(Debug)]
pub struct QuerierChunk {
/// How the data is currently structured / available for query.