diff --git a/ingester/src/data.rs b/ingester/src/data.rs
index 0cb9c2d66b..6471a15141 100644
--- a/ingester/src/data.rs
+++ b/ingester/src/data.rs
@@ -25,6 +25,7 @@ use write_summary::ShardProgress;
 
 use crate::{
     compact::{compact_persisting_batch, CompactedStream},
+    handler::NAMESPACE_NAME_PRE_FETCH,
     lifecycle::LifecycleHandle,
 };
 
@@ -36,7 +37,11 @@ pub(crate) mod table;
 
 pub(crate) use sequence_range::*;
 
-use self::{partition::resolver::PartitionProvider, shard::ShardData};
+use self::{
+    namespace::name_resolver::{NamespaceNameProvider, NamespaceNameResolver},
+    partition::resolver::PartitionProvider,
+    shard::ShardData,
+};
 
 #[cfg(test)]
 mod triggers;
@@ -127,6 +132,13 @@ impl IngesterData {
             },
         );
 
+        let namespace_name_provider: Arc<dyn NamespaceNameProvider> =
+            Arc::new(NamespaceNameResolver::new(
+                NAMESPACE_NAME_PRE_FETCH,
+                Arc::clone(&catalog),
+                backoff_config.clone(),
+            ));
+
         let shards = shards
             .into_iter()
             .map(|(id, index)| {
@@ -135,6 +147,7 @@ impl IngesterData {
                     ShardData::new(
                         index,
                         id,
+                        Arc::clone(&namespace_name_provider),
                         Arc::clone(&partition_provider),
                         Arc::clone(&metrics),
                     ),
@@ -256,9 +269,14 @@ impl Persister for IngesterData {
         let namespace = self
             .shards
             .get(&shard_id)
-            .and_then(|s| s.namespace_by_id(namespace_id))
+            .and_then(|s| s.namespace(namespace_id))
             .unwrap_or_else(|| panic!("namespace {namespace_id} not in shard {shard_id} state"));
+
+        // Begin resolving the load-deferred name concurrently if it is not
+        // already available.
         let namespace_name = namespace.namespace_name();
+        namespace_name.prefetch_now();
 
         // Assert the namespace ID matches the index key.
         assert_eq!(namespace.namespace_id(), namespace_id);
@@ -384,7 +402,7 @@ impl Persister for IngesterData {
                 creation_timestamp: SystemProvider::new().now(),
                 shard_id,
                 namespace_id,
-                namespace_name: Arc::clone(&**namespace.namespace_name()),
+                namespace_name: Arc::clone(&*namespace.namespace_name().get().await),
                 table_id,
                 table_name: Arc::clone(&*table_name),
                 partition_id,
@@ -621,6 +639,7 @@ mod tests {
     use super::*;
     use crate::{
         data::{namespace::NamespaceData, partition::resolver::CatalogPartitionResolver},
+        deferred_load::DeferredLoad,
         lifecycle::{LifecycleConfig, LifecycleManager},
     };
@@ -803,7 +822,7 @@ mod tests {
         let (table_id, partition_id) = {
             let sd = data.shards.get(&shard1.id).unwrap();
-            let n = sd.namespace(&"foo".into()).unwrap();
+            let n = sd.namespace(namespace.id).unwrap();
             let mem_table = n.table_data(&"mem".into()).unwrap();
             assert!(n.table_data(&"mem".into()).is_some());
             let p = mem_table
@@ -955,7 +974,7 @@ mod tests {
         assert_progress(&data, shard_index, expected_progress).await;
 
         let sd = data.shards.get(&shard1.id).unwrap();
-        let n = sd.namespace(&"foo".into()).unwrap();
+        let n = sd.namespace(namespace.id).unwrap();
         let partition_id;
         let table_id;
         {
@@ -1033,7 +1052,7 @@ mod tests {
         let cached_sort_key = data
             .shard(shard1.id)
             .unwrap()
-            .namespace_by_id(namespace.id)
+            .namespace(namespace.id)
             .unwrap()
             .table_id(table_id)
             .unwrap()
@@ -1213,7 +1232,7 @@ mod tests {
         // Get the namespace
         let sd = data.shards.get(&shard1.id).unwrap();
-        let n = sd.namespace(&"foo".into()).unwrap();
+        let n = sd.namespace(namespace.id).unwrap();
 
         let expected_progress = ShardProgress::new().with_buffered(SequenceNumber::new(1));
         assert_progress(&data, shard_index, expected_progress).await;
@@ -1383,7 +1402,7 @@ mod tests {
         let data = NamespaceData::new(
             namespace.id,
-            "foo".into(),
+            DeferredLoad::new(Duration::from_millis(1), async { "foo".into() }),
             shard.id,
             partition_provider,
             &metrics,
diff --git a/ingester/src/data/namespace.rs b/ingester/src/data/namespace.rs
index e4d593a82e..d6e54e9771 100644
--- a/ingester/src/data/namespace.rs
+++ b/ingester/src/data/namespace.rs
@@ -1,5 +1,7 @@
 //! Namespace level data buffer structures.
 
+pub(crate) mod name_resolver;
+
 use std::{collections::HashMap, sync::Arc};
 
 use data_types::{NamespaceId, SequenceNumber, ShardId, TableId};
@@ -15,7 +17,7 @@ use super::{
     partition::resolver::PartitionProvider,
     table::{TableData, TableName},
 };
-use crate::{data::DmlApplyAction, lifecycle::LifecycleHandle};
+use crate::{data::DmlApplyAction, deferred_load::DeferredLoad, lifecycle::LifecycleHandle};
 
 /// A double-referenced map where [`TableData`] can be looked up by name, or ID.
 #[derive(Debug, Default)]
@@ -78,7 +80,7 @@ impl std::fmt::Display for NamespaceName {
 #[derive(Debug)]
 pub(crate) struct NamespaceData {
     namespace_id: NamespaceId,
-    namespace_name: NamespaceName,
+    namespace_name: DeferredLoad<NamespaceName>,
 
     /// The catalog ID of the shard this namespace is being populated from.
     shard_id: ShardId,
@@ -142,7 +144,7 @@ impl NamespaceData {
     /// Initialize new tables with default partition template of daily
     pub(super) fn new(
         namespace_id: NamespaceId,
-        namespace_name: NamespaceName,
+        namespace_name: DeferredLoad<NamespaceName>,
         shard_id: ShardId,
         partition_provider: Arc<dyn PartitionProvider>,
         metrics: &metric::Registry,
@@ -308,7 +310,7 @@ impl NamespaceData {
     }
 
     /// Returns the [`NamespaceName`] for this namespace.
-    pub(crate) fn namespace_name(&self) -> &NamespaceName {
+    pub(crate) fn namespace_name(&self) -> &DeferredLoad<NamespaceName> {
         &self.namespace_name
     }
 }
@@ -348,13 +350,14 @@ impl<'a> Drop for ScopedSequenceNumber<'a> {
 
 #[cfg(test)]
 mod tests {
-    use std::sync::Arc;
+    use std::{sync::Arc, time::Duration};
 
     use data_types::{PartitionId, PartitionKey, ShardIndex};
     use metric::{Attributes, Metric};
 
     use crate::{
         data::partition::{resolver::MockPartitionProvider, PartitionData, SortKeyState},
+        deferred_load,
         lifecycle::mock_handle::MockLifecycleHandle,
         test_util::{make_write_op, TEST_TABLE},
     };
@@ -389,14 +392,18 @@ mod tests {
         let ns = NamespaceData::new(
             NAMESPACE_ID,
-            NAMESPACE_NAME.into(),
+            DeferredLoad::new(Duration::from_millis(1), async { NAMESPACE_NAME.into() }),
             SHARD_ID,
             partition_provider,
             &metrics,
         );
 
         // Assert the namespace name was stored
-        assert_eq!(ns.namespace_name().to_string(), NAMESPACE_NAME);
+        let name = ns.namespace_name().to_string();
+        assert!(
+            (name == NAMESPACE_NAME) || (name == deferred_load::UNRESOLVED_DISPLAY_STRING),
+            "unexpected namespace name: {name}"
+        );
 
         // Assert the namespace does not contain the test data
         assert!(ns.table_data(&TABLE_NAME.into()).is_none());
@@ -430,5 +437,10 @@ mod tests {
             .expect("failed to get observer")
             .fetch();
         assert_eq!(tables, 1);
+
+        // Ensure the deferred namespace name is loaded.
+        let name = ns.namespace_name().get().await;
+        assert_eq!(&**name, NAMESPACE_NAME);
+        assert_eq!(ns.namespace_name().to_string(), NAMESPACE_NAME);
     }
 }
diff --git a/ingester/src/data/namespace/name_resolver.rs b/ingester/src/data/namespace/name_resolver.rs
new file mode 100644
index 0000000000..282bc50714
--- /dev/null
+++ b/ingester/src/data/namespace/name_resolver.rs
@@ -0,0 +1,138 @@
+use std::{sync::Arc, time::Duration};
+
+use backoff::{Backoff, BackoffConfig};
+use data_types::NamespaceId;
+use iox_catalog::interface::Catalog;
+
+use crate::deferred_load::DeferredLoad;
+
+use super::NamespaceName;
+
+pub(crate) trait NamespaceNameProvider: Send + Sync + std::fmt::Debug {
+    fn for_namespace(&self, id: NamespaceId) -> DeferredLoad<NamespaceName>;
+}
+
+#[derive(Debug)]
+pub(crate) struct NamespaceNameResolver {
+    max_smear: Duration,
+    catalog: Arc<dyn Catalog>,
+    backoff_config: BackoffConfig,
+}
+
+impl NamespaceNameResolver {
+    pub(crate) fn new(
+        max_smear: Duration,
+        catalog: Arc<dyn Catalog>,
+        backoff_config: BackoffConfig,
+    ) -> Self {
+        Self {
+            max_smear,
+            catalog,
+            backoff_config,
+        }
+    }
+
+    /// Fetch the [`NamespaceName`] from the [`Catalog`] for specified
+    /// `namespace_id`, retrying endlessly when errors occur.
+    pub(crate) async fn fetch(
+        namespace_id: NamespaceId,
+        catalog: Arc<dyn Catalog>,
+        backoff_config: BackoffConfig,
+    ) -> NamespaceName {
+        Backoff::new(&backoff_config)
+            .retry_all_errors("fetch namespace name", || async {
+                let s = catalog
+                    .repositories()
+                    .await
+                    .namespaces()
+                    .get_by_id(namespace_id)
+                    .await?
+                    .expect("resolving namespace name for non-existent namespace id")
+                    .name
+                    .into();
+
+                Result::<_, iox_catalog::interface::Error>::Ok(s)
+            })
+            .await
+            .expect("retry forever")
+    }
+}
+
+impl NamespaceNameProvider for NamespaceNameResolver {
+    fn for_namespace(&self, id: NamespaceId) -> DeferredLoad<NamespaceName> {
+        DeferredLoad::new(
+            self.max_smear,
+            Self::fetch(id, Arc::clone(&self.catalog), self.backoff_config.clone()),
+        )
+    }
+}
+
+#[cfg(test)]
+pub(crate) mod mock {
+    use super::*;
+
+    #[derive(Debug)]
+    pub(crate) struct MockNamespaceNameProvider {
+        name: NamespaceName,
+    }
+
+    impl MockNamespaceNameProvider {
+        pub(crate) fn new(name: impl Into<NamespaceName>) -> Self {
+            Self { name: name.into() }
+        }
+    }
+
+    impl Default for MockNamespaceNameProvider {
+        fn default() -> Self {
+            Self::new("bananas")
+        }
+    }
+
+    impl NamespaceNameProvider for MockNamespaceNameProvider {
+        fn for_namespace(&self, _id: NamespaceId) -> DeferredLoad<NamespaceName> {
+            let name = self.name.clone();
+            DeferredLoad::new(Duration::from_secs(1), async { name })
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use data_types::ShardIndex;
+    use test_helpers::timeout::FutureTimeout;
+
+    use crate::test_util::populate_catalog;
+
+    use super::*;
+
+    const SHARD_INDEX: ShardIndex = ShardIndex::new(24);
+    const TABLE_NAME: &str = "bananas";
+    const NAMESPACE_NAME: &str = "platanos";
+
+    #[tokio::test]
+    async fn test_fetch() {
+        let metrics = Arc::new(metric::Registry::default());
+        let backoff_config = BackoffConfig::default();
+        let catalog: Arc<dyn Catalog> =
+            Arc::new(iox_catalog::mem::MemCatalog::new(Arc::clone(&metrics)));
+
+        // Populate the catalog with the shard / namespace / table
+        let (_shard_id, ns_id, _table_id) =
+            populate_catalog(&*catalog, SHARD_INDEX, NAMESPACE_NAME, TABLE_NAME).await;
+
+        let fetcher = Arc::new(NamespaceNameResolver::new(
+            Duration::from_secs(10),
+            Arc::clone(&catalog),
+            backoff_config.clone(),
+        ));
+
+        let got = fetcher
+            .for_namespace(ns_id)
+            .get()
+            .with_timeout_panic(Duration::from_secs(5))
+            .await;
+        assert_eq!(&**got, NAMESPACE_NAME);
+    }
+}
diff --git a/ingester/src/data/partition/resolver/sort_key.rs b/ingester/src/data/partition/resolver/sort_key.rs
index e792a20064..bd516ca9c2 100644
--- a/ingester/src/data/partition/resolver/sort_key.rs
+++ b/ingester/src/data/partition/resolver/sort_key.rs
@@ -7,7 +7,7 @@ use data_types::PartitionId;
 use iox_catalog::interface::Catalog;
 use schema::sort::SortKey;
 
-/// A resolver of [`SortKey`] from the catalog for a given partition.
+/// A resolver of [`SortKey`] from the catalog for a given [`PartitionId`].
 #[derive(Debug)]
 pub(crate) struct SortKeyResolver {
     partition_id: PartitionId,
diff --git a/ingester/src/data/shard.rs b/ingester/src/data/shard.rs
index 132d9ef34f..184af03da4 100644
--- a/ingester/src/data/shard.rs
+++ b/ingester/src/data/shard.rs
@@ -5,44 +5,15 @@ use std::sync::Arc;
 use data_types::{NamespaceId, ShardId, ShardIndex};
 use dml::DmlOperation;
 use metric::U64Counter;
-use parking_lot::RwLock;
 use write_summary::ShardProgress;
 
 use super::{
-    namespace::{NamespaceData, NamespaceName},
+    namespace::{name_resolver::NamespaceNameProvider, NamespaceData},
     partition::resolver::PartitionProvider,
     DmlApplyAction,
 };
 use crate::{arcmap::ArcMap, lifecycle::LifecycleHandle};
 
-/// A double-referenced map where [`NamespaceData`] can be looked up by name, or
-/// ID.
-#[derive(Debug, Default)]
-struct DoubleRef {
-    // TODO(4880): this can be removed when IDs are sent over the wire.
-    by_name: ArcMap<NamespaceName, NamespaceData>,
-    by_id: ArcMap<NamespaceId, NamespaceData>,
-}
-
-impl DoubleRef {
-    fn insert(&mut self, name: NamespaceName, ns: NamespaceData) -> Arc<NamespaceData> {
-        let id = ns.namespace_id();
-
-        let ns = Arc::new(ns);
-        self.by_name.insert(name, Arc::clone(&ns));
-        self.by_id.insert(id, Arc::clone(&ns));
-        ns
-    }
-
-    fn by_name(&self, name: &NamespaceName) -> Option<Arc<NamespaceData>> {
-        self.by_name.get(name)
-    }
-
-    fn by_id(&self, id: NamespaceId) -> Option<Arc<NamespaceData>> {
-        self.by_id.get(&id)
-    }
-}
-
 /// Data of a Shard
 #[derive(Debug)]
 pub(crate) struct ShardData {
@@ -57,8 +28,17 @@ pub(crate) struct ShardData {
     /// [`PartitionData`]: super::partition::PartitionData
     partition_provider: Arc<dyn PartitionProvider>,
 
-    // New namespaces can come in at any time so we need to be able to add new ones
-    namespaces: RwLock<DoubleRef>,
+    /// A set of namespaces this [`ShardData`] instance has processed
+    /// [`DmlOperation`]'s for.
+    ///
+    /// The [`NamespaceNameProvider`] acts as a [`DeferredLoad`] constructor to
+    /// resolve the [`NamespaceName`] for new [`NamespaceData`] out of the hot
+    /// path.
+    ///
+    /// [`DeferredLoad`]: crate::deferred_load::DeferredLoad
+    /// [`NamespaceName`]: data_types::NamespaceName
+    namespaces: ArcMap<NamespaceId, NamespaceData>,
+    namespace_name_resolver: Arc<dyn NamespaceNameProvider>,
 
     metrics: Arc<metric::Registry>,
     namespace_count: U64Counter,
 }
@@ -69,6 +49,7 @@ impl ShardData {
     pub(crate) fn new(
         shard_index: ShardIndex,
         shard_id: ShardId,
+        namespace_name_resolver: Arc<dyn NamespaceNameProvider>,
         partition_provider: Arc<dyn PartitionProvider>,
         metrics: Arc<metric::Registry>,
     ) -> Self {
@@ -83,28 +64,33 @@ impl ShardData {
             shard_index,
             shard_id,
             namespaces: Default::default(),
+            namespace_name_resolver,
             metrics,
             partition_provider,
             namespace_count,
         }
     }
 
-    /// Store the write or delete in the shard. Deletes will
-    /// be written into the catalog before getting stored in the buffer.
-    /// Any writes that create new IOx partitions will have those records
-    /// created in the catalog before putting into the buffer.
+    /// Buffer the provided [`DmlOperation`] into the ingester state.
     pub(super) async fn buffer_operation(
         &self,
         dml_operation: DmlOperation,
         lifecycle_handle: &dyn LifecycleHandle,
     ) -> Result<DmlApplyAction, super::Error> {
-        let namespace_data = match self.namespace(&NamespaceName::from(dml_operation.namespace())) {
-            Some(d) => d,
-            None => {
-                self.insert_namespace(dml_operation.namespace(), dml_operation.namespace_id())
-                    .await?
-            }
-        };
+        let namespace_id = dml_operation.namespace_id();
+        let namespace_data = self.namespaces.get_or_insert_with(&namespace_id, || {
+            // Increase the metric that records the number of namespaces
+            // buffered in this ingester instance.
+            self.namespace_count.inc(1);
+
+            Arc::new(NamespaceData::new(
+                namespace_id,
+                self.namespace_name_resolver.for_namespace(namespace_id),
+                self.shard_id,
+                Arc::clone(&self.partition_provider),
+                &self.metrics,
+            ))
+        });
 
         namespace_data
             .buffer_operation(dml_operation, lifecycle_handle)
@@ -112,55 +98,13 @@ impl ShardData {
     }
 
     /// Gets the namespace data out of the map
-    pub(crate) fn namespace(&self, namespace: &NamespaceName) -> Option<Arc<NamespaceData>> {
-        let n = self.namespaces.read();
-        n.by_name(namespace)
-    }
-
-    /// Gets the namespace data out of the map
-    pub(crate) fn namespace_by_id(&self, namespace_id: NamespaceId) -> Option<Arc<NamespaceData>> {
-        // TODO: this should be the default once IDs are pushed over the wire.
-        //
-        // At which point the map should be indexed by IDs, instead of namespace
-        // names.
-        let n = self.namespaces.read();
-        n.by_id(namespace_id)
-    }
-
-    /// Retrieves the namespace from the catalog and initializes an empty buffer, or
-    /// retrieves the buffer if some other caller gets it first
-    async fn insert_namespace(
-        &self,
-        namespace: &str,
-        namespace_id: NamespaceId,
-    ) -> Result<Arc<NamespaceData>, super::Error> {
-        let ns_name = NamespaceName::from(namespace);
-
-        let mut n = self.namespaces.write();
-
-        Ok(match n.by_name(&ns_name) {
-            Some(v) => v,
-            None => {
-                self.namespace_count.inc(1);
-
-                // Insert the table and then return a ref to it.
-                n.insert(
-                    ns_name.clone(),
-                    NamespaceData::new(
-                        namespace_id,
-                        ns_name,
-                        self.shard_id,
-                        Arc::clone(&self.partition_provider),
-                        &self.metrics,
-                    ),
-                )
-            }
-        })
+    pub(crate) fn namespace(&self, namespace_id: NamespaceId) -> Option<Arc<NamespaceData>> {
+        self.namespaces.get(&namespace_id)
     }
 
     /// Return the progress of this shard
     pub(super) async fn progress(&self) -> ShardProgress {
-        let namespaces: Vec<_> = self.namespaces.read().by_id.values();
+        let namespaces: Vec<_> = self.namespaces.values();
 
         let mut progress = ShardProgress::new();
@@ -184,7 +128,10 @@ mod tests {
     use metric::{Attributes, Metric};
 
     use crate::{
-        data::partition::{resolver::MockPartitionProvider, PartitionData, SortKeyState},
+        data::{
+            namespace::name_resolver::mock::MockNamespaceNameProvider,
+            partition::{resolver::MockPartitionProvider, PartitionData, SortKeyState},
+        },
         lifecycle::mock_handle::MockLifecycleHandle,
         test_util::{make_write_op, TEST_TABLE},
     };
@@ -199,7 +146,7 @@ mod tests {
     const NAMESPACE_ID: NamespaceId = NamespaceId::new(42);
 
     #[tokio::test]
-    async fn test_shard_double_ref() {
+    async fn test_shard_init_namespace() {
         let metrics = Arc::new(metric::Registry::default());
 
         // Configure the mock partition provider to return a partition for this
@@ -220,13 +167,13 @@ mod tests {
         let shard = ShardData::new(
             SHARD_INDEX,
             SHARD_ID,
+            Arc::new(MockNamespaceNameProvider::new(NAMESPACE_NAME)),
             partition_provider,
             Arc::clone(&metrics),
         );
 
         // Assert the namespace does not contain the test data
-        assert!(shard.namespace(&NAMESPACE_NAME.into()).is_none());
-        assert!(shard.namespace_by_id(NAMESPACE_ID).is_none());
+        assert!(shard.namespace(NAMESPACE_ID).is_none());
 
         // Write some test data
         shard
@@ -245,9 +192,7 @@ mod tests {
             .await
             .expect("buffer op should succeed");
 
-        // Both forms of referencing the table should succeed
-        assert!(shard.namespace(&NAMESPACE_NAME.into()).is_some());
-        assert!(shard.namespace_by_id(NAMESPACE_ID).is_some());
+        assert!(shard.namespace(NAMESPACE_ID).is_some());
 
         // And the table counter metric should increase
         let tables = metrics
diff --git a/ingester/src/deferred_load.rs b/ingester/src/deferred_load.rs
index 539eaefa0a..45526a36ae 100644
--- a/ingester/src/deferred_load.rs
+++ b/ingester/src/deferred_load.rs
@@ -1,6 +1,6 @@
 //! Generic deferred execution of arbitrary [`Future`]'s.
 
-use std::{sync::Arc, time::Duration};
+use std::{fmt::Display, sync::Arc, time::Duration};
 
 use futures::Future;
 use observability_deps::tracing::*;
@@ -14,6 +14,11 @@ use tokio::{
     task::JoinHandle,
 };
 
+/// [`UNRESOLVED_DISPLAY_STRING`] defines the string shown when invoking the
+/// [`Display`] implementation on a [`DeferredLoad`] that has not yet resolved
+/// the deferred value.
+pub(crate) const UNRESOLVED_DISPLAY_STRING: &str = "<unresolved>";
+
 /// The states of a [`DeferredLoad`] instance.
 #[derive(Debug)]
 enum State<T> {
@@ -68,6 +73,86 @@ where
     }
 }
 
+impl<T> Display for DeferredLoad<T>
+where
+    T: Display,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self.value.lock().as_ref().unwrap() {
+            State::Unresolved(_) | State::Loading(_) => f.write_str(UNRESOLVED_DISPLAY_STRING),
+            State::Resolved(v) => v.fmt(f),
+        }
+    }
+}
+
+impl<T> DeferredLoad<T> {
+    /// Provide a hint to the [`DeferredLoad`] that the value will be used soon.
+    ///
+    /// This allows the value to be materialised in the background, in parallel
+    /// while the caller is executing code that will eventually call
+    /// [`Self::get()`].
+    pub(crate) fn prefetch_now(&self) {
+        let mut state = self.value.lock();
+
+        // If the value has already resolved, this call is a NOP.
+        if let Some(State::Resolved(_)) = &*state {
+            return;
+        }
+
+        // Potentially transition the state, discarding the waker.
+        let (_waker, new_state) = self.get_load_waker(state.take().unwrap());
+        *state = Some(new_state);
+    }
+
+    /// Potentially transition `state`, returning the [`Notify`] that will be
+    /// signalled when loading the value completes, and the (potentially
+    /// changed) state.
+    ///
+    /// # Panics
+    ///
+    /// This method panics if `state` is [`State::Resolved`].
+    fn get_load_waker(&self, state: State<T>) -> (Arc<Notify>, State<T>) {
+        let waker = match state {
+            // This caller is the first to demand the value - wake the
+            // background task, initialise the notification mechanism and
+            // wait for the task to complete.
+            State::Unresolved(task_waker) => {
+                // Wake the running background task, ignoring any send error
+                // as the background task may have concurrently woken up due
+                // to the sleep timer and stopped listening on the waker
+                // channel.
+                let _ = task_waker.send(());
+
+                // Replace the state with a notification for this thread
+                // (and others that call get()) to wait on for the
+                // concurrent fetch to complete.
+                Arc::new(Notify::default())
+            }
+
+            // If the value is already being fetched, wait for the fetch to
+            // complete.
+            State::Loading(waker) => waker,
+
+            // This was checked above before take()-ing the state.
+            State::Resolved(_) => unreachable!(),
+        };
+
+        // Ensure any subsequent callers can subscribe to the completion
+        // event by transitioning to the loading state.
+        let state = State::Loading(Arc::clone(&waker));
+
+        // Whenever there is a waker for the caller, the background task
+        // MUST be running.
+        //
+        // This check happens before the state lock is released, ensuring
+        // the background task doesn't concurrently finish (it would be
+        // blocked waiting to update the state).
+        assert!(!self.handle.is_finished());
+
+        (waker, state)
+    }
+}
+
 impl<T> DeferredLoad<T>
 where
     T: Send + Sync + 'static,
@@ -179,42 +264,8 @@ where
             // If execution reaches here, this call will have to wait for the
             // value to be resolved, and potentially must wake the background
             // task to do so.
-            let waker = match state.take().unwrap() {
-                // This caller is the first to demand the value - wake the
-                // background task, initialise the notification mechanism and
-                // wait for the task to complete.
-                State::Unresolved(task_waker) => {
-                    // Wake the running background task, ignoring any send error
-                    // as the background task may have concurrently woken up due
-                    // to the sleep timer and stopped listening on the waker
-                    // channel.
-                    let _ = task_waker.send(());
-
-                    // Replace the state with a notification for this thread
-                    // (and others that call get()) to wait on for the
-                    // concurrent fetch to complete.
-                    Arc::new(Notify::default())
-                }
-
-                // If the value is already being fetched, wait for the fetch to
-                // complete.
-                State::Loading(waker) => waker,
-
-                // This was checked above before take()-ing the state.
-                State::Resolved(_) => unreachable!(),
-            };
-
-            // Ensure any subsequent callers can subscribe to the completion
-            // event.
-            *state = Some(State::Loading(Arc::clone(&waker)));
-
-            // Whenever there is a waker for the caller, the background task
-            // MUST be running.
-            //
-            // This check happens before the state lock is released, ensuring
-            // the background task doesn't concurrently finish (it would be
-            // blocked waiting to update the state).
-            assert!(!self.handle.is_finished());
+            let (waker, new_state) = self.get_load_waker(state.take().unwrap());
+            *state = Some(new_state);
 
             waker
         };
@@ -371,4 +422,91 @@ mod tests {
         fut.as_mut().with_timeout_panic(TIMEOUT).await;
         assert_eq!(fut.as_mut().take_output(), Some(42));
     }
+
+    #[tokio::test]
+    async fn test_display() {
+        // This channel is used to block the background task from completing
+        // after the above channel has signalled it has begun.
+        let (allow_complete, can_complete) = oneshot::channel();
+
+        // Configure the background load to fire (practically) immediately but
+        // block waiting for rx to be unblocked.
+        let d = Arc::new(DeferredLoad::new(Duration::from_millis(1), async {
+            // Wait for the test thread to issue the demand call and unblock
+            // this fn.
+            can_complete.await.expect("sender died");
+            42
+        }));
+
+        assert_eq!("<unresolved>", d.to_string());
+
+        // Issue a demand call
+        let fut = future::maybe_done(d.get());
+        pin_mut!(fut);
+        assert_eq!(fut.as_mut().take_output(), None);
+
+        assert_eq!("<unresolved>", d.to_string());
+
+        // Unblock the background task.
+        allow_complete.send(()).expect("background task died");
+
+        // And await the demand call
+        fut.as_mut().await;
+        assert_eq!(fut.as_mut().take_output(), Some(42));
+
+        // And assert Display is delegated to the resolved value
+        assert_eq!("42", d.to_string());
+    }
+
+    #[tokio::test]
+    async fn test_prefetch_concurrent_demand() {
+        // This channel is used to signal the background load has begun.
+        let (signal_start, started) = oneshot::channel();
+
+        // This channel is used to block the background task from completing
+        // after the above channel has signalled it has begun.
+        let (allow_complete, can_complete) = oneshot::channel();
+
+        // Configure the background load to fire (practically) immediately but
+        // block waiting for rx to be unblocked.
+        //
+        // This allows the current thread time to issue a demand and wait on the
+        // result before the background load completes.
+        let d = Arc::new(DeferredLoad::new(LONG_LONG_TIME, async {
+            // Signal the background task has begun.
+            signal_start.send(()).expect("test task died");
+            // Wait for the test thread to issue the demand call and unblock
+            // this fn.
+            can_complete.await.expect("sender died");
+            42
+        }));
+
+        d.prefetch_now();
+
+        // Wait for the background task to begin.
+        started
+            .with_timeout_panic(Duration::from_secs(5))
+            .await
+            .expect("background task died");
+
+        // Issue a demand call
+        let fut = future::maybe_done(d.get());
+        pin_mut!(fut);
+        assert_eq!(fut.as_mut().take_output(), None);
+
+        // Unblock the background task.
+        allow_complete.send(()).expect("background task died");
+
+        // And await the demand call
+        fut.as_mut().await;
+        assert_eq!(fut.as_mut().take_output(), Some(42));
+    }
+
+    #[tokio::test]
+    async fn test_prefetch_already_loaded() {
+        let d = Arc::new(DeferredLoad::new(LONG_LONG_TIME, async { 42 }));
+
+        let _ = d.get().with_timeout_panic(TIMEOUT).await;
+        d.prefetch_now();
+    }
 }
diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs
index c7f3c10535..2337517b38 100644
--- a/ingester/src/handler.rs
+++ b/ingester/src/handler.rs
@@ -46,7 +46,16 @@ use crate::{
 ///
 /// [`PartitionData`]: crate::data::partition::PartitionData
 /// [`SortKey`]: schema::sort::SortKey
-const SORT_KEY_PRE_FETCH: Duration = Duration::from_secs(30);
+pub(crate) const SORT_KEY_PRE_FETCH: Duration = Duration::from_secs(60);
+
+/// The maximum duration of time between observing and initialising the
+/// [`NamespaceData`] in response to observing an operation for a namespace, and
+/// fetching the string identifier for it in the background via a
+/// [`DeferredLoad`].
+///
+/// [`NamespaceData`]: crate::data::namespace::NamespaceData
+/// [`DeferredLoad`]: crate::deferred_load::DeferredLoad
+pub(crate) const NAMESPACE_NAME_PRE_FETCH: Duration = Duration::from_secs(60);
 
 #[derive(Debug, Snafu)]
 #[allow(missing_copy_implementations, missing_docs)]
diff --git a/ingester/src/querier_handler.rs b/ingester/src/querier_handler.rs
index 8b75c502bb..abb607925e 100644
--- a/ingester/src/querier_handler.rs
+++ b/ingester/src/querier_handler.rs
@@ -263,7 +263,7 @@ pub async fn prepare_data_to_querier(
     let mut found_namespace = false;
     for (shard_id, shard_data) in ingest_data.shards() {
-        let namespace_data = match shard_data.namespace_by_id(request.namespace_id) {
+        let namespace_data = match shard_data.namespace(request.namespace_id) {
             Some(namespace_data) => {
                 trace!(
                     shard_id=%shard_id.get(),
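
Not part of the patch: for readers unfamiliar with the prefetch-then-get shape this change applies to namespace names, below is a minimal, self-contained Rust sketch. It deliberately uses a simplified stand-in type (`Deferred`, backed by `tokio::sync::OnceCell`) rather than the real `DeferredLoad`, and a hypothetical `load_name()` in place of the catalog query performed by `NamespaceNameResolver::fetch`; only the usage pattern mirrors the ingester code above.

```rust
// Assumed dependency: tokio = { version = "1", features = ["macros", "rt-multi-thread", "sync", "time"] }
use std::{sync::Arc, time::Duration};

use tokio::sync::OnceCell;

/// A much-simplified stand-in for the ingester's `DeferredLoad`: the value is
/// produced by an async loader the first time it is needed, and
/// `prefetch_now()` merely starts that work early so a later `get().await`
/// overlaps with other persist-time I/O.
#[derive(Clone)]
struct Deferred {
    cell: Arc<OnceCell<String>>,
}

impl Deferred {
    fn new() -> Self {
        Self {
            cell: Arc::new(OnceCell::new()),
        }
    }

    /// Hint that the value will be needed soon; resolve it in the background.
    fn prefetch_now(&self) {
        let cell = Arc::clone(&self.cell);
        tokio::spawn(async move {
            let _ = cell.get_or_init(load_name).await;
        });
    }

    /// Demand the value, waiting for the loader only if it has not resolved yet.
    async fn get(&self) -> String {
        self.cell.get_or_init(load_name).await.clone()
    }
}

/// Hypothetical placeholder for the catalog lookup the resolver performs.
async fn load_name() -> String {
    tokio::time::sleep(Duration::from_millis(10)).await;
    "platanos".to_string()
}

#[tokio::main]
async fn main() {
    let name = Deferred::new();

    // Mirrors the persist path: hint early, then await only when the name is
    // actually needed for the metadata being written.
    name.prefetch_now();
    // ... other persist work would happen here ...
    println!("namespace name = {}", name.get().await);
}
```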