From c022ab6786b95439c4ac499d6ffac941d3ed080e Mon Sep 17 00:00:00 2001
From: Dom Dwyer
Date: Mon, 3 Oct 2022 15:19:14 +0200
Subject: [PATCH] feat: deferred partition sort key fetcher

Adds a new DeferredSortKey type that fetches a partition's sort key from
the catalog in the background, or on-demand if it has not yet been
pre-fetched.

From the caller's perspective, little has changed compared to reading it
from the catalog directly - the sort key is always returned when calling
get(), regardless of the mechanism, and retries are handled
transparently. Internally the sort key MAY have been pre-fetched in the
background between the DeferredSortKey being initialised and the call to
get().

The background task waits a (uniformly) random duration of time before
issuing the catalog query to pre-fetch the sort key. This smears the
lookup queries of many DeferredSortKey instances over a large window of
time, allowing a large number of DeferredSortKey to be initialised in a
short period of time without creating an equally large spike in queries
against the catalog over the same period.
---
 Cargo.lock                                    |   1 +
 ingester/Cargo.toml                           |   1 +
 ingester/src/data/partition/resolver/mod.rs   |   3 +
 .../src/data/partition/resolver/sort_key.rs   | 317 ++++++++++++++++++
 4 files changed, 322 insertions(+)
 create mode 100644 ingester/src/data/partition/resolver/sort_key.rs

diff --git a/Cargo.lock b/Cargo.lock
index 488aae235c..cdb9a2bd37 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2206,6 +2206,7 @@ dependencies = [
  "pin-project",
  "predicate",
  "prost 0.11.0",
+ "rand",
  "schema",
  "snafu",
  "test_helpers",
diff --git a/ingester/Cargo.toml b/ingester/Cargo.toml
index beb94c37e9..51e01c3def 100644
--- a/ingester/Cargo.toml
+++ b/ingester/Cargo.toml
@@ -45,6 +45,7 @@ write_buffer = { path = "../write_buffer" }
 write_summary = { path = "../write_summary" }
 tokio-util = { version = "0.7.4" }
 trace = { path = "../trace" }
+rand = "0.8.5"
 
 [dev-dependencies]
 assert_matches = "1.5.0"
diff --git a/ingester/src/data/partition/resolver/mod.rs b/ingester/src/data/partition/resolver/mod.rs
index fcb5e5fb6a..904eb781f5 100644
--- a/ingester/src/data/partition/resolver/mod.rs
+++ b/ingester/src/data/partition/resolver/mod.rs
@@ -11,6 +11,9 @@ pub use r#trait::*;
 mod catalog;
 pub use catalog::*;
 
+mod sort_key;
+pub(crate) use sort_key::*;
+
 #[cfg(test)]
 mod mock;
 #[cfg(test)]
diff --git a/ingester/src/data/partition/resolver/sort_key.rs b/ingester/src/data/partition/resolver/sort_key.rs
new file mode 100644
index 0000000000..c0c5555963
--- /dev/null
+++ b/ingester/src/data/partition/resolver/sort_key.rs
@@ -0,0 +1,317 @@
+//! An optimised resolver of a partition [`SortKey`].
+
+use std::{sync::Arc, time::Duration};
+
+use backoff::{Backoff, BackoffConfig};
+use data_types::PartitionId;
+use iox_catalog::interface::Catalog;
+use parking_lot::Mutex;
+use rand::Rng;
+use schema::sort::SortKey;
+use tokio::task::JoinHandle;
+
+/// The states of a [`DeferredSortKey`] instance.
+#[derive(Debug)]
+enum State {
+    /// The value has not yet been fetched by the background task.
+    Unresolved,
+    /// The value was fetched by the background task and is ready to be consumed.
+    Resolved(Option<SortKey>),
+}
+
+/// A resolver of [`SortKey`] from the catalog for a given partition.
+///
+/// This implementation combines lazy / deferred loading of the [`SortKey`] from
+/// the [`Catalog`], and a background timer that pre-fetches the [`SortKey`]
+/// after some random duration of time. Combined, these behaviours smear the
+/// [`SortKey`] queries across the allowable time range, preventing a large
+/// number of queries from executing when multiple [`SortKey`] are needed in
+/// the system at one point in time.
+///
+/// If the [`DeferredSortKey`] is dropped and the background task is still
+/// incomplete (sleeping / actively fetching the [`SortKey`]) it is aborted
+/// immediately. The background task exits once it has successfully fetched the
+/// [`SortKey`].
+///
+/// # Stale Cached Values
+///
+/// This is effectively a cache that is pre-warmed in the background - this
+/// necessitates that the caller can tolerate, or determine, stale values.
+#[derive(Debug)]
+pub(crate) struct DeferredSortKey {
+    value: Arc<Mutex<State>>,
+    partition_id: PartitionId,
+
+    handle: JoinHandle<()>,
+
+    backoff_config: BackoffConfig,
+    catalog: Arc<dyn Catalog>,
+}
+
+impl DeferredSortKey {
+    /// Construct a [`DeferredSortKey`] instance that fetches the [`SortKey`]
+    /// for the specified `partition_id`.
+    ///
+    /// The background task will wait a uniformly random duration of time
+    /// between `[0, max_smear)` before attempting to pre-fetch the [`SortKey`]
+    /// from `catalog`.
+    pub(crate) fn new(
+        partition_id: PartitionId,
+        max_smear: Duration,
+        catalog: Arc<dyn Catalog>,
+        backoff_config: BackoffConfig,
+    ) -> Self {
+        // Init the value container the background task populates.
+        let value = Arc::new(Mutex::new(State::Unresolved));
+
+        // Select a random duration from a uniform distribution, up to the
+        // configured maximum.
+        let wait_for = rand::thread_rng().gen_range(Duration::ZERO..max_smear);
+
+        // Spawn the background task, sleeping for the random duration of time
+        // before fetching the sort key.
+        let handle = tokio::spawn({
+            let value = Arc::clone(&value);
+            let catalog = Arc::clone(&catalog);
+            let backoff_config = backoff_config.clone();
+            async move {
+                // Sleep for the random duration
+                tokio::time::sleep(wait_for).await;
+                // Fetch the sort key from the catalog
+                let v = fetch(partition_id, &*catalog, &backoff_config).await;
+                // And attempt to store it, unless a concurrent get() call
+                // resolved the value first.
+                let mut state = value.lock();
+                *state = match *state {
+                    State::Unresolved => State::Resolved(v),
+                    State::Resolved(_) => return,
+                };
+            }
+        });
+
+        Self {
+            value,
+            partition_id,
+            handle,
+            backoff_config,
+            catalog,
+        }
+    }
+
+    /// Read the [`SortKey`] for the partition.
+    ///
+    /// If the [`SortKey`] was pre-fetched in the background, it is returned
+    /// immediately. If the [`SortKey`] has not yet been resolved, this call
+    /// blocks while it is read from the [`Catalog`].
+    pub(crate) async fn get(&self) -> Option<SortKey> {
+        {
+            let state = self.value.lock();
+
+            // If there is a resolved value, return it.
+            if let State::Resolved(v) = &*state {
+                return v.clone();
+            }
+        }
+
+        // Otherwise resolve the value immediately, aborting the background
+        // task.
+        let sort_key = fetch(self.partition_id, &*self.catalog, &self.backoff_config).await;
+
+        {
+            let mut state = self.value.lock();
+            self.handle.abort();
+            *state = State::Resolved(sort_key.clone());
+        }
+
+        sort_key
+    }
+}
+
+impl Drop for DeferredSortKey {
+    fn drop(&mut self) {
+        // Attempt to abort the background task, regardless of it having
+        // completed or not.
+        self.handle.abort()
+    }
+}
+
+/// Fetch the [`SortKey`] from the [`Catalog`] for `partition_id`, retrying
+/// endlessly when errors occur.
+async fn fetch(
+    partition_id: PartitionId,
+    catalog: &dyn Catalog,
+    backoff_config: &BackoffConfig,
+) -> Option<SortKey> {
+    Backoff::new(backoff_config)
+        .retry_all_errors("fetch partition sort key", || async {
+            let s = catalog
+                .repositories()
+                .await
+                .partitions()
+                .get_by_id(partition_id)
+                .await?
+                .expect("resolving sort key for non-existent partition")
+                .sort_key();
+
+            Result::<_, iox_catalog::interface::Error>::Ok(s)
+        })
+        .await
+        .expect("retry forever")
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use data_types::ShardIndex;
+    use test_helpers::timeout::FutureTimeout;
+
+    use crate::test_util::populate_catalog;
+
+    use super::*;
+
+    const SHARD_INDEX: ShardIndex = ShardIndex::new(24);
+    const TABLE_NAME: &str = "bananas";
+    const NAMESPACE_NAME: &str = "platanos";
+    const PARTITION_KEY: &str = "platanos";
+
+    // A test that (most likely) exercises the "read on demand" code path.
+    //
+    // The background task is configured to run some time between now, and
+    // 10,000 hours in the future - it most likely doesn't get to complete
+    // before the get() call is issued.
+    //
+    // If this test flakes, it is POSSIBLE but UNLIKELY that the background task
+    // has completed and the get() call reads a pre-fetched value.
+    #[tokio::test]
+    async fn test_read_demand() {
+        let metrics = Arc::new(metric::Registry::default());
+        let backoff_config = BackoffConfig::default();
+        let catalog: Arc<dyn Catalog> =
+            Arc::new(iox_catalog::mem::MemCatalog::new(Arc::clone(&metrics)));
+
+        // Populate the catalog with the shard / namespace / table
+        let (shard_id, _ns_id, table_id) =
+            populate_catalog(&*catalog, SHARD_INDEX, NAMESPACE_NAME, TABLE_NAME).await;
+
+        let partition_id = catalog
+            .repositories()
+            .await
+            .partitions()
+            .create_or_get(PARTITION_KEY.into(), shard_id, table_id)
+            .await
+            .expect("should create")
+            .id;
+
+        // Read the just-created sort key (None)
+        let fetched = DeferredSortKey::new(
+            partition_id,
+            Duration::from_secs(36_000_000),
+            Arc::clone(&catalog),
+            backoff_config.clone(),
+        )
+        .get()
+        .await;
+        assert!(fetched.is_none());
+
+        // Set the sort key
+        let catalog_state = catalog
+            .repositories()
+            .await
+            .partitions()
+            .update_sort_key(partition_id, &["uno", "dos", "bananas"])
+            .await
+            .expect("should update existing partition key");
+
+        // Read the updated sort key
+        let fetched = DeferredSortKey::new(
+            partition_id,
+            Duration::from_secs(10_000),
+            Arc::clone(&catalog),
+            backoff_config,
+        )
+        .get()
+        .await;
+
+        assert!(fetched.is_some());
+        assert_eq!(fetched, catalog_state.sort_key());
+    }
+
+    // A test that deterministically exercises the "background pre-fetch" code path.
+    #[tokio::test]
+    async fn test_read_pre_fetched() {
+        let metrics = Arc::new(metric::Registry::default());
+        let backoff_config = BackoffConfig::default();
+        let catalog: Arc<dyn Catalog> =
+            Arc::new(iox_catalog::mem::MemCatalog::new(Arc::clone(&metrics)));
+
+        // Populate the catalog with the shard / namespace / table
+        let (shard_id, _ns_id, table_id) =
+            populate_catalog(&*catalog, SHARD_INDEX, NAMESPACE_NAME, TABLE_NAME).await;
+
+        let partition_id = catalog
+            .repositories()
+            .await
+            .partitions()
+            .create_or_get(PARTITION_KEY.into(), shard_id, table_id)
+            .await
+            .expect("should create")
+            .id;
+
+        // Read the just-created sort key (None)
+        let fetcher = DeferredSortKey::new(
+            partition_id,
+            Duration::from_nanos(1),
+            Arc::clone(&catalog),
+            backoff_config.clone(),
+        );
+
+        // Spin, waiting for the background task to show as complete.
+        async {
+            loop {
+                if fetcher.handle.is_finished() {
+                    return;
+                }
+
+                tokio::task::yield_now().await
+            }
+        }
+        .with_timeout_panic(Duration::from_secs(5))
+        .await;
+
+        assert!(fetcher.get().await.is_none());
+
+        // Set the sort key
+        let catalog_state = catalog
+            .repositories()
+            .await
+            .partitions()
+            .update_sort_key(partition_id, &["uno", "dos", "bananas"])
+            .await
+            .expect("should update existing partition key");
+
+        // Read the updated sort key
+        let fetcher = DeferredSortKey::new(
+            partition_id,
+            Duration::from_nanos(1),
+            Arc::clone(&catalog),
+            backoff_config.clone(),
+        );
+
+        // Spin, waiting for the background task to show as complete.
+        async {
+            loop {
+                if fetcher.handle.is_finished() {
+                    return;
+                }
+
+                tokio::task::yield_now().await
+            }
+        }
+        .with_timeout_panic(Duration::from_secs(5))
+        .await;
+
+        let fetched = fetcher.get().await;
+        assert!(fetched.is_some());
+        assert_eq!(fetched, catalog_state.sort_key());
+    }
+}
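
For reviewers, an illustrative usage sketch (not part of the diff): it shows how
a caller inside the ingester crate might construct and later read a
DeferredSortKey. The example() wrapper and the 60 second smear window are
hypothetical choices; the DeferredSortKey::new() and get() calls mirror the API
added above.

    use std::{sync::Arc, time::Duration};

    use backoff::BackoffConfig;
    use data_types::PartitionId;
    use iox_catalog::interface::Catalog;
    use schema::sort::SortKey;

    use crate::data::partition::resolver::DeferredSortKey;

    async fn example(
        partition_id: PartitionId,
        catalog: Arc<dyn Catalog>,
        backoff_config: BackoffConfig,
    ) -> Option<SortKey> {
        // Smear the background pre-fetch over the next 60 seconds; the choice
        // of `max_smear` is left to the caller.
        let deferred = DeferredSortKey::new(
            partition_id,
            Duration::from_secs(60),
            catalog,
            backoff_config,
        );

        // Some time later the background task may or may not have completed.
        // Either way get() returns the sort key: from the pre-fetched value if
        // available, otherwise by querying the catalog on demand (with the
        // same transparent retries).
        deferred.get().await
    }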