Merge pull request #6121 from influxdata/dom/deferred-namespace-name
perf(ingester): deferred namespace name discovery
pull/24376/head
commit
1b322a6a6e
|
@ -25,6 +25,7 @@ use write_summary::ShardProgress;
|
|||
|
||||
use crate::{
|
||||
compact::{compact_persisting_batch, CompactedStream},
|
||||
handler::NAMESPACE_NAME_PRE_FETCH,
|
||||
lifecycle::LifecycleHandle,
|
||||
};
|
||||
|
||||
|
@ -36,7 +37,11 @@ pub(crate) mod table;
|
|||
|
||||
pub(crate) use sequence_range::*;
|
||||
|
||||
use self::{partition::resolver::PartitionProvider, shard::ShardData};
|
||||
use self::{
|
||||
namespace::name_resolver::{NamespaceNameProvider, NamespaceNameResolver},
|
||||
partition::resolver::PartitionProvider,
|
||||
shard::ShardData,
|
||||
};
|
||||
|
||||
#[cfg(test)]
|
||||
mod triggers;
|
||||
|
@ -127,6 +132,13 @@ impl IngesterData {
|
|||
},
|
||||
);
|
||||
|
||||
let namespace_name_provider: Arc<dyn NamespaceNameProvider> =
|
||||
Arc::new(NamespaceNameResolver::new(
|
||||
NAMESPACE_NAME_PRE_FETCH,
|
||||
Arc::clone(&catalog),
|
||||
backoff_config.clone(),
|
||||
));
|
||||
|
||||
let shards = shards
|
||||
.into_iter()
|
||||
.map(|(id, index)| {
|
||||
|
@ -135,6 +147,7 @@ impl IngesterData {
|
|||
ShardData::new(
|
||||
index,
|
||||
id,
|
||||
Arc::clone(&namespace_name_provider),
|
||||
Arc::clone(&partition_provider),
|
||||
Arc::clone(&metrics),
|
||||
),
|
||||
|
@ -256,9 +269,14 @@ impl Persister for IngesterData {
|
|||
let namespace = self
|
||||
.shards
|
||||
.get(&shard_id)
|
||||
.and_then(|s| s.namespace_by_id(namespace_id))
|
||||
.and_then(|s| s.namespace(namespace_id))
|
||||
.unwrap_or_else(|| panic!("namespace {namespace_id} not in shard {shard_id} state"));
|
||||
|
||||
// Begin resolving the load-deferred name concurrently if it is not
|
||||
// already available.
|
||||
let namespace_name = namespace.namespace_name();
|
||||
namespace_name.prefetch_now();
|
||||
|
||||
// Assert the namespace ID matches the index key.
|
||||
assert_eq!(namespace.namespace_id(), namespace_id);
|
||||
|
||||
|
@ -384,7 +402,7 @@ impl Persister for IngesterData {
|
|||
creation_timestamp: SystemProvider::new().now(),
|
||||
shard_id,
|
||||
namespace_id,
|
||||
namespace_name: Arc::clone(&**namespace.namespace_name()),
|
||||
namespace_name: Arc::clone(&*namespace.namespace_name().get().await),
|
||||
table_id,
|
||||
table_name: Arc::clone(&*table_name),
|
||||
partition_id,
|
||||
|
@ -621,6 +639,7 @@ mod tests {
|
|||
use super::*;
|
||||
use crate::{
|
||||
data::{namespace::NamespaceData, partition::resolver::CatalogPartitionResolver},
|
||||
deferred_load::DeferredLoad,
|
||||
lifecycle::{LifecycleConfig, LifecycleManager},
|
||||
};
|
||||
|
||||
|
@ -803,7 +822,7 @@ mod tests {
|
|||
|
||||
let (table_id, partition_id) = {
|
||||
let sd = data.shards.get(&shard1.id).unwrap();
|
||||
let n = sd.namespace(&"foo".into()).unwrap();
|
||||
let n = sd.namespace(namespace.id).unwrap();
|
||||
let mem_table = n.table_data(&"mem".into()).unwrap();
|
||||
assert!(n.table_data(&"mem".into()).is_some());
|
||||
let p = mem_table
|
||||
|
@ -955,7 +974,7 @@ mod tests {
|
|||
assert_progress(&data, shard_index, expected_progress).await;
|
||||
|
||||
let sd = data.shards.get(&shard1.id).unwrap();
|
||||
let n = sd.namespace(&"foo".into()).unwrap();
|
||||
let n = sd.namespace(namespace.id).unwrap();
|
||||
let partition_id;
|
||||
let table_id;
|
||||
{
|
||||
|
@ -1033,7 +1052,7 @@ mod tests {
|
|||
let cached_sort_key = data
|
||||
.shard(shard1.id)
|
||||
.unwrap()
|
||||
.namespace_by_id(namespace.id)
|
||||
.namespace(namespace.id)
|
||||
.unwrap()
|
||||
.table_id(table_id)
|
||||
.unwrap()
|
||||
|
@ -1213,7 +1232,7 @@ mod tests {
|
|||
|
||||
// Get the namespace
|
||||
let sd = data.shards.get(&shard1.id).unwrap();
|
||||
let n = sd.namespace(&"foo".into()).unwrap();
|
||||
let n = sd.namespace(namespace.id).unwrap();
|
||||
|
||||
let expected_progress = ShardProgress::new().with_buffered(SequenceNumber::new(1));
|
||||
assert_progress(&data, shard_index, expected_progress).await;
|
||||
|
@ -1383,7 +1402,7 @@ mod tests {
|
|||
|
||||
let data = NamespaceData::new(
|
||||
namespace.id,
|
||||
"foo".into(),
|
||||
DeferredLoad::new(Duration::from_millis(1), async { "foo".into() }),
|
||||
shard.id,
|
||||
partition_provider,
|
||||
&metrics,
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
//! Namespace level data buffer structures.
|
||||
|
||||
pub(crate) mod name_resolver;
|
||||
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use data_types::{NamespaceId, SequenceNumber, ShardId, TableId};
|
||||
|
@ -15,7 +17,7 @@ use super::{
|
|||
partition::resolver::PartitionProvider,
|
||||
table::{TableData, TableName},
|
||||
};
|
||||
use crate::{data::DmlApplyAction, lifecycle::LifecycleHandle};
|
||||
use crate::{data::DmlApplyAction, deferred_load::DeferredLoad, lifecycle::LifecycleHandle};
|
||||
|
||||
/// A double-referenced map where [`TableData`] can be looked up by name, or ID.
|
||||
#[derive(Debug, Default)]
|
||||
|
@ -78,7 +80,7 @@ impl std::fmt::Display for NamespaceName {
|
|||
#[derive(Debug)]
|
||||
pub(crate) struct NamespaceData {
|
||||
namespace_id: NamespaceId,
|
||||
namespace_name: NamespaceName,
|
||||
namespace_name: DeferredLoad<NamespaceName>,
|
||||
|
||||
/// The catalog ID of the shard this namespace is being populated from.
|
||||
shard_id: ShardId,
|
||||
|
@ -142,7 +144,7 @@ impl NamespaceData {
|
|||
/// Initialize new tables with default partition template of daily
|
||||
pub(super) fn new(
|
||||
namespace_id: NamespaceId,
|
||||
namespace_name: NamespaceName,
|
||||
namespace_name: DeferredLoad<NamespaceName>,
|
||||
shard_id: ShardId,
|
||||
partition_provider: Arc<dyn PartitionProvider>,
|
||||
metrics: &metric::Registry,
|
||||
|
@ -308,7 +310,7 @@ impl NamespaceData {
|
|||
}
|
||||
|
||||
/// Returns the [`NamespaceName`] for this namespace.
|
||||
pub(crate) fn namespace_name(&self) -> &NamespaceName {
|
||||
pub(crate) fn namespace_name(&self) -> &DeferredLoad<NamespaceName> {
|
||||
&self.namespace_name
|
||||
}
|
||||
}
|
||||
|
@ -348,13 +350,14 @@ impl<'a> Drop for ScopedSequenceNumber<'a> {
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use data_types::{PartitionId, PartitionKey, ShardIndex};
|
||||
use metric::{Attributes, Metric};
|
||||
|
||||
use crate::{
|
||||
data::partition::{resolver::MockPartitionProvider, PartitionData, SortKeyState},
|
||||
deferred_load,
|
||||
lifecycle::mock_handle::MockLifecycleHandle,
|
||||
test_util::{make_write_op, TEST_TABLE},
|
||||
};
|
||||
|
@ -389,14 +392,18 @@ mod tests {
|
|||
|
||||
let ns = NamespaceData::new(
|
||||
NAMESPACE_ID,
|
||||
NAMESPACE_NAME.into(),
|
||||
DeferredLoad::new(Duration::from_millis(1), async { NAMESPACE_NAME.into() }),
|
||||
SHARD_ID,
|
||||
partition_provider,
|
||||
&metrics,
|
||||
);
|
||||
|
||||
// Assert the namespace name was stored
|
||||
assert_eq!(ns.namespace_name().to_string(), NAMESPACE_NAME);
|
||||
let name = ns.namespace_name().to_string();
|
||||
assert!(
|
||||
(name == NAMESPACE_NAME) || (name == deferred_load::UNRESOLVED_DISPLAY_STRING),
|
||||
"unexpected namespace name: {name}"
|
||||
);
|
||||
|
||||
// Assert the namespace does not contain the test data
|
||||
assert!(ns.table_data(&TABLE_NAME.into()).is_none());
|
||||
|
@ -430,5 +437,10 @@ mod tests {
|
|||
.expect("failed to get observer")
|
||||
.fetch();
|
||||
assert_eq!(tables, 1);
|
||||
|
||||
// Ensure the deferred namespace name is loaded.
|
||||
let name = ns.namespace_name().get().await;
|
||||
assert_eq!(&**name, NAMESPACE_NAME);
|
||||
assert_eq!(ns.namespace_name().to_string(), NAMESPACE_NAME);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,138 @@
|
|||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use backoff::{Backoff, BackoffConfig};
|
||||
use data_types::NamespaceId;
|
||||
use iox_catalog::interface::Catalog;
|
||||
|
||||
use crate::deferred_load::DeferredLoad;
|
||||
|
||||
use super::NamespaceName;
|
||||
|
||||
pub(crate) trait NamespaceNameProvider: Send + Sync + std::fmt::Debug {
|
||||
fn for_namespace(&self, id: NamespaceId) -> DeferredLoad<NamespaceName>;
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct NamespaceNameResolver {
|
||||
max_smear: Duration,
|
||||
catalog: Arc<dyn Catalog>,
|
||||
backoff_config: BackoffConfig,
|
||||
}
|
||||
|
||||
impl NamespaceNameResolver {
|
||||
pub(crate) fn new(
|
||||
max_smear: Duration,
|
||||
catalog: Arc<dyn Catalog>,
|
||||
backoff_config: BackoffConfig,
|
||||
) -> Self {
|
||||
Self {
|
||||
max_smear,
|
||||
catalog,
|
||||
backoff_config,
|
||||
}
|
||||
}
|
||||
|
||||
/// Fetch the [`NamespaceName`] from the [`Catalog`] for specified
|
||||
/// `namespace_id`, retrying endlessly when errors occur.
|
||||
pub(crate) async fn fetch(
|
||||
namespace_id: NamespaceId,
|
||||
catalog: Arc<dyn Catalog>,
|
||||
backoff_config: BackoffConfig,
|
||||
) -> NamespaceName {
|
||||
Backoff::new(&backoff_config)
|
||||
.retry_all_errors("fetch namespace name", || async {
|
||||
let s = catalog
|
||||
.repositories()
|
||||
.await
|
||||
.namespaces()
|
||||
.get_by_id(namespace_id)
|
||||
.await?
|
||||
.expect("resolving namespace name for non-existent namespace id")
|
||||
.name
|
||||
.into();
|
||||
|
||||
Result::<_, iox_catalog::interface::Error>::Ok(s)
|
||||
})
|
||||
.await
|
||||
.expect("retry forever")
|
||||
}
|
||||
}
|
||||
|
||||
impl NamespaceNameProvider for NamespaceNameResolver {
|
||||
fn for_namespace(&self, id: NamespaceId) -> DeferredLoad<NamespaceName> {
|
||||
DeferredLoad::new(
|
||||
self.max_smear,
|
||||
Self::fetch(id, Arc::clone(&self.catalog), self.backoff_config.clone()),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
pub(crate) mod mock {
    use super::*;

    /// A test-only [`NamespaceNameProvider`] that resolves every namespace ID
    /// to the same, pre-configured name.
    #[derive(Debug)]
    pub(crate) struct MockNamespaceNameProvider {
        /// The name returned for all lookups.
        name: NamespaceName,
    }

    impl MockNamespaceNameProvider {
        /// Create a mock that always resolves to `name`.
        pub(crate) fn new(name: impl Into<NamespaceName>) -> Self {
            let name = name.into();
            Self { name }
        }
    }

    impl Default for MockNamespaceNameProvider {
        /// A mock that resolves every namespace to `"bananas"`.
        fn default() -> Self {
            Self::new("bananas")
        }
    }

    impl NamespaceNameProvider for MockNamespaceNameProvider {
        /// Ignore the ID and hand back a [`DeferredLoad`] of the fixed name.
        fn for_namespace(&self, _id: NamespaceId) -> DeferredLoad<NamespaceName> {
            let fixed = self.name.clone();
            DeferredLoad::new(Duration::from_secs(1), async move { fixed })
        }
    }
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use data_types::ShardIndex;
    use test_helpers::timeout::FutureTimeout;

    use crate::test_util::populate_catalog;

    use super::*;

    const SHARD_INDEX: ShardIndex = ShardIndex::new(24);
    const TABLE_NAME: &str = "bananas";
    const NAMESPACE_NAME: &str = "platanos";

    /// The resolver must return the name stored in the catalog for a
    /// namespace ID.
    #[tokio::test]
    async fn test_fetch() {
        let metrics = Arc::new(metric::Registry::default());
        let catalog: Arc<dyn Catalog> =
            Arc::new(iox_catalog::mem::MemCatalog::new(Arc::clone(&metrics)));

        // Seed the catalog with the shard / namespace / table; only the
        // namespace ID is needed here.
        let (_shard_id, ns_id, _table_id) =
            populate_catalog(&*catalog, SHARD_INDEX, NAMESPACE_NAME, TABLE_NAME).await;

        let resolver = NamespaceNameResolver::new(
            Duration::from_secs(10),
            Arc::clone(&catalog),
            BackoffConfig::default(),
        );

        // Demanding the value must complete well inside the timeout.
        let deferred = resolver.for_namespace(ns_id);
        let got = deferred
            .get()
            .with_timeout_panic(Duration::from_secs(5))
            .await;

        assert_eq!(&**got, NAMESPACE_NAME);
    }
}
|
|
@ -7,7 +7,7 @@ use data_types::PartitionId;
|
|||
use iox_catalog::interface::Catalog;
|
||||
use schema::sort::SortKey;
|
||||
|
||||
/// A resolver of [`SortKey`] from the catalog for a given partition.
|
||||
/// A resolver of [`SortKey`] from the catalog for a given [`PartitionId`].
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct SortKeyResolver {
|
||||
partition_id: PartitionId,
|
||||
|
|
|
@ -5,44 +5,15 @@ use std::sync::Arc;
|
|||
use data_types::{NamespaceId, ShardId, ShardIndex};
|
||||
use dml::DmlOperation;
|
||||
use metric::U64Counter;
|
||||
use parking_lot::RwLock;
|
||||
use write_summary::ShardProgress;
|
||||
|
||||
use super::{
|
||||
namespace::{NamespaceData, NamespaceName},
|
||||
namespace::{name_resolver::NamespaceNameProvider, NamespaceData},
|
||||
partition::resolver::PartitionProvider,
|
||||
DmlApplyAction,
|
||||
};
|
||||
use crate::{arcmap::ArcMap, lifecycle::LifecycleHandle};
|
||||
|
||||
/// A double-referenced map where [`NamespaceData`] can be looked up by name, or
|
||||
/// ID.
|
||||
#[derive(Debug, Default)]
|
||||
struct DoubleRef {
|
||||
// TODO(4880): this can be removed when IDs are sent over the wire.
|
||||
by_name: ArcMap<NamespaceName, NamespaceData>,
|
||||
by_id: ArcMap<NamespaceId, NamespaceData>,
|
||||
}
|
||||
|
||||
impl DoubleRef {
|
||||
fn insert(&mut self, name: NamespaceName, ns: NamespaceData) -> Arc<NamespaceData> {
|
||||
let id = ns.namespace_id();
|
||||
|
||||
let ns = Arc::new(ns);
|
||||
self.by_name.insert(name, Arc::clone(&ns));
|
||||
self.by_id.insert(id, Arc::clone(&ns));
|
||||
ns
|
||||
}
|
||||
|
||||
fn by_name(&self, name: &NamespaceName) -> Option<Arc<NamespaceData>> {
|
||||
self.by_name.get(name)
|
||||
}
|
||||
|
||||
fn by_id(&self, id: NamespaceId) -> Option<Arc<NamespaceData>> {
|
||||
self.by_id.get(&id)
|
||||
}
|
||||
}
|
||||
|
||||
/// Data of a Shard
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct ShardData {
|
||||
|
@ -57,8 +28,17 @@ pub(crate) struct ShardData {
|
|||
/// [`PartitionData`]: super::partition::PartitionData
|
||||
partition_provider: Arc<dyn PartitionProvider>,
|
||||
|
||||
// New namespaces can come in at any time so we need to be able to add new ones
|
||||
namespaces: RwLock<DoubleRef>,
|
||||
/// A set of namespaces this [`ShardData`] instance has processed
|
||||
/// [`DmlOperation`]'s for.
|
||||
///
|
||||
/// The [`NamespaceNameProvider`] acts as a [`DeferredLoad`] constructor to
|
||||
/// resolve the [`NamespaceName`] for new [`NamespaceData`] out of the hot
|
||||
/// path.
|
||||
///
|
||||
/// [`DeferredLoad`]: crate::deferred_load::DeferredLoad
|
||||
/// [`NamespaceName`]: data_types::NamespaceName
|
||||
namespaces: ArcMap<NamespaceId, NamespaceData>,
|
||||
namespace_name_resolver: Arc<dyn NamespaceNameProvider>,
|
||||
|
||||
metrics: Arc<metric::Registry>,
|
||||
namespace_count: U64Counter,
|
||||
|
@ -69,6 +49,7 @@ impl ShardData {
|
|||
pub(crate) fn new(
|
||||
shard_index: ShardIndex,
|
||||
shard_id: ShardId,
|
||||
namespace_name_resolver: Arc<dyn NamespaceNameProvider>,
|
||||
partition_provider: Arc<dyn PartitionProvider>,
|
||||
metrics: Arc<metric::Registry>,
|
||||
) -> Self {
|
||||
|
@ -83,28 +64,33 @@ impl ShardData {
|
|||
shard_index,
|
||||
shard_id,
|
||||
namespaces: Default::default(),
|
||||
namespace_name_resolver,
|
||||
metrics,
|
||||
partition_provider,
|
||||
namespace_count,
|
||||
}
|
||||
}
|
||||
|
||||
/// Store the write or delete in the shard. Deletes will
|
||||
/// be written into the catalog before getting stored in the buffer.
|
||||
/// Any writes that create new IOx partitions will have those records
|
||||
/// created in the catalog before putting into the buffer.
|
||||
/// Buffer the provided [`DmlOperation`] into the ingester state.
|
||||
pub(super) async fn buffer_operation(
|
||||
&self,
|
||||
dml_operation: DmlOperation,
|
||||
lifecycle_handle: &dyn LifecycleHandle,
|
||||
) -> Result<DmlApplyAction, super::Error> {
|
||||
let namespace_data = match self.namespace(&NamespaceName::from(dml_operation.namespace())) {
|
||||
Some(d) => d,
|
||||
None => {
|
||||
self.insert_namespace(dml_operation.namespace(), dml_operation.namespace_id())
|
||||
.await?
|
||||
}
|
||||
};
|
||||
let namespace_id = dml_operation.namespace_id();
|
||||
let namespace_data = self.namespaces.get_or_insert_with(&namespace_id, || {
|
||||
// Increase the metric that records the number of namespaces
|
||||
// buffered in this ingester instance.
|
||||
self.namespace_count.inc(1);
|
||||
|
||||
Arc::new(NamespaceData::new(
|
||||
namespace_id,
|
||||
self.namespace_name_resolver.for_namespace(namespace_id),
|
||||
self.shard_id,
|
||||
Arc::clone(&self.partition_provider),
|
||||
&self.metrics,
|
||||
))
|
||||
});
|
||||
|
||||
namespace_data
|
||||
.buffer_operation(dml_operation, lifecycle_handle)
|
||||
|
@ -112,55 +98,13 @@ impl ShardData {
|
|||
}
|
||||
|
||||
/// Gets the namespace data out of the map
|
||||
pub(crate) fn namespace(&self, namespace: &NamespaceName) -> Option<Arc<NamespaceData>> {
|
||||
let n = self.namespaces.read();
|
||||
n.by_name(namespace)
|
||||
}
|
||||
|
||||
/// Gets the namespace data out of the map
|
||||
pub(crate) fn namespace_by_id(&self, namespace_id: NamespaceId) -> Option<Arc<NamespaceData>> {
|
||||
// TODO: this should be the default once IDs are pushed over the wire.
|
||||
//
|
||||
// At which point the map should be indexed by IDs, instead of namespace
|
||||
// names.
|
||||
let n = self.namespaces.read();
|
||||
n.by_id(namespace_id)
|
||||
}
|
||||
|
||||
/// Retrieves the namespace from the catalog and initializes an empty buffer, or
|
||||
/// retrieves the buffer if some other caller gets it first
|
||||
async fn insert_namespace(
|
||||
&self,
|
||||
namespace: &str,
|
||||
namespace_id: NamespaceId,
|
||||
) -> Result<Arc<NamespaceData>, super::Error> {
|
||||
let ns_name = NamespaceName::from(namespace);
|
||||
|
||||
let mut n = self.namespaces.write();
|
||||
|
||||
Ok(match n.by_name(&ns_name) {
|
||||
Some(v) => v,
|
||||
None => {
|
||||
self.namespace_count.inc(1);
|
||||
|
||||
// Insert the table and then return a ref to it.
|
||||
n.insert(
|
||||
ns_name.clone(),
|
||||
NamespaceData::new(
|
||||
namespace_id,
|
||||
ns_name,
|
||||
self.shard_id,
|
||||
Arc::clone(&self.partition_provider),
|
||||
&self.metrics,
|
||||
),
|
||||
)
|
||||
}
|
||||
})
|
||||
pub(crate) fn namespace(&self, namespace_id: NamespaceId) -> Option<Arc<NamespaceData>> {
|
||||
self.namespaces.get(&namespace_id)
|
||||
}
|
||||
|
||||
/// Return the progress of this shard
|
||||
pub(super) async fn progress(&self) -> ShardProgress {
|
||||
let namespaces: Vec<_> = self.namespaces.read().by_id.values();
|
||||
let namespaces: Vec<_> = self.namespaces.values();
|
||||
|
||||
let mut progress = ShardProgress::new();
|
||||
|
||||
|
@ -184,7 +128,10 @@ mod tests {
|
|||
use metric::{Attributes, Metric};
|
||||
|
||||
use crate::{
|
||||
data::partition::{resolver::MockPartitionProvider, PartitionData, SortKeyState},
|
||||
data::{
|
||||
namespace::name_resolver::mock::MockNamespaceNameProvider,
|
||||
partition::{resolver::MockPartitionProvider, PartitionData, SortKeyState},
|
||||
},
|
||||
lifecycle::mock_handle::MockLifecycleHandle,
|
||||
test_util::{make_write_op, TEST_TABLE},
|
||||
};
|
||||
|
@ -199,7 +146,7 @@ mod tests {
|
|||
const NAMESPACE_ID: NamespaceId = NamespaceId::new(42);
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_shard_double_ref() {
|
||||
async fn test_shard_init_namespace() {
|
||||
let metrics = Arc::new(metric::Registry::default());
|
||||
|
||||
// Configure the mock partition provider to return a partition for this
|
||||
|
@ -220,13 +167,13 @@ mod tests {
|
|||
let shard = ShardData::new(
|
||||
SHARD_INDEX,
|
||||
SHARD_ID,
|
||||
Arc::new(MockNamespaceNameProvider::new(NAMESPACE_NAME)),
|
||||
partition_provider,
|
||||
Arc::clone(&metrics),
|
||||
);
|
||||
|
||||
// Assert the namespace does not contain the test data
|
||||
assert!(shard.namespace(&NAMESPACE_NAME.into()).is_none());
|
||||
assert!(shard.namespace_by_id(NAMESPACE_ID).is_none());
|
||||
assert!(shard.namespace(NAMESPACE_ID).is_none());
|
||||
|
||||
// Write some test data
|
||||
shard
|
||||
|
@ -245,9 +192,7 @@ mod tests {
|
|||
.await
|
||||
.expect("buffer op should succeed");
|
||||
|
||||
// Both forms of referencing the table should succeed
|
||||
assert!(shard.namespace(&NAMESPACE_NAME.into()).is_some());
|
||||
assert!(shard.namespace_by_id(NAMESPACE_ID).is_some());
|
||||
assert!(shard.namespace(NAMESPACE_ID).is_some());
|
||||
|
||||
// And the table counter metric should increase
|
||||
let tables = metrics
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
//! Generic deferred execution of arbitrary [`Future`]'s.
|
||||
|
||||
use std::{sync::Arc, time::Duration};
|
||||
use std::{fmt::Display, sync::Arc, time::Duration};
|
||||
|
||||
use futures::Future;
|
||||
use observability_deps::tracing::*;
|
||||
|
@ -14,6 +14,11 @@ use tokio::{
|
|||
task::JoinHandle,
|
||||
};
|
||||
|
||||
/// [`UNRESOLVED_DISPLAY_STRING`] defines the string shown when invoking the
|
||||
/// [`Display`] implementation on a [`DeferredLoad`] that has not yet resolved
|
||||
/// the deferred value.
|
||||
pub(crate) const UNRESOLVED_DISPLAY_STRING: &str = "<unresolved>";
|
||||
|
||||
/// The states of a [`DeferredLoad`] instance.
|
||||
#[derive(Debug)]
|
||||
enum State<T> {
|
||||
|
@ -68,6 +73,86 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
impl<T> Display for DeferredLoad<T>
|
||||
where
|
||||
T: Display,
|
||||
{
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self.value.lock().as_ref().unwrap() {
|
||||
State::Unresolved(_) | State::Loading(_) => f.write_str(UNRESOLVED_DISPLAY_STRING),
|
||||
State::Resolved(v) => v.fmt(f),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> DeferredLoad<T> {
|
||||
/// Provide a hint to the [`DeferredLoad`] that the value will be used soon.
|
||||
///
|
||||
/// This allows the value to be materialised in the background, in parallel
|
||||
/// while the caller is executing code that will eventually call
|
||||
/// [`Self::get()`].
|
||||
pub(crate) fn prefetch_now(&self) {
|
||||
let mut state = self.value.lock();
|
||||
|
||||
// If the value has already resolved, this call is a NOP.
|
||||
if let Some(State::Resolved(_)) = &*state {
|
||||
return;
|
||||
}
|
||||
|
||||
// Potentially transition the state, discarding the waker.
|
||||
let (_waker, new_state) = self.get_load_waker(state.take().unwrap());
|
||||
*state = Some(new_state);
|
||||
}
|
||||
|
||||
/// Potentially transition `state`, returning the [`Notify`] that will be
|
||||
/// signalled when loading the value completes, and the (potentially
|
||||
/// changed) state.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This method panics if `state` is [`State::Resolved`].
|
||||
fn get_load_waker(&self, state: State<T>) -> (Arc<Notify>, State<T>) {
|
||||
let waker = match state {
|
||||
// This caller is the first to demand the value - wake the
|
||||
// background task, initialise the notification mechanism and
|
||||
// wait for the task to complete.
|
||||
State::Unresolved(task_waker) => {
|
||||
// Wake the running background task, ignoring any send error
|
||||
// as the background task may have concurrently woken up due
|
||||
// to the sleep timer and stopped listening on the waker
|
||||
// channel.
|
||||
let _ = task_waker.send(());
|
||||
|
||||
// Replace the state with a notification for this thread
|
||||
// (and others that call get()) to wait on for the
|
||||
// concurrent fetch to complete.
|
||||
Arc::new(Notify::default())
|
||||
}
|
||||
|
||||
// If the value is already being fetched, wait for the fetch to
|
||||
// complete.
|
||||
State::Loading(waker) => waker,
|
||||
|
||||
// This was checked above before take()-ing the state.
|
||||
State::Resolved(_) => unreachable!(),
|
||||
};
|
||||
|
||||
// Ensure any subsequent callers can subscribe to the completion
|
||||
// event by transitioning to the loading state.
|
||||
let state = State::Loading(Arc::clone(&waker));
|
||||
|
||||
// Whenever there is a waker for the caller, the background task
|
||||
// MUST be running.
|
||||
//
|
||||
// This check happens before the state lock is released, ensuring
|
||||
// the background task doesn't concurrently finish (it would be
|
||||
// blocked waiting to update the state).
|
||||
assert!(!self.handle.is_finished());
|
||||
|
||||
(waker, state)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> DeferredLoad<T>
|
||||
where
|
||||
T: Send + Sync + 'static,
|
||||
|
@ -179,42 +264,8 @@ where
|
|||
// If execution reaches here, this call will have to wait for the
|
||||
// value to be resolved, and potentially must wake the background
|
||||
// task to do so.
|
||||
let waker = match state.take().unwrap() {
|
||||
// This caller is the first to demand the value - wake the
|
||||
// background task, initialise the notification mechanism and
|
||||
// wait for the task to complete.
|
||||
State::Unresolved(task_waker) => {
|
||||
// Wake the running background task, ignoring any send error
|
||||
// as the background task may have concurrently woken up due
|
||||
// to the sleep timer and stopped listening on the waker
|
||||
// channel.
|
||||
let _ = task_waker.send(());
|
||||
|
||||
// Replace the state with a notification for this thread
|
||||
// (and others that call get()) to wait on for the
|
||||
// concurrent fetch to complete.
|
||||
Arc::new(Notify::default())
|
||||
}
|
||||
|
||||
// If the value is already being fetched, wait for the fetch to
|
||||
// complete.
|
||||
State::Loading(waker) => waker,
|
||||
|
||||
// This was checked above before take()-ing the state.
|
||||
State::Resolved(_) => unreachable!(),
|
||||
};
|
||||
|
||||
// Ensure any subsequent callers can subscribe to the completion
|
||||
// event.
|
||||
*state = Some(State::Loading(Arc::clone(&waker)));
|
||||
|
||||
// Whenever there is a waker for the caller, the background task
|
||||
// MUST be running.
|
||||
//
|
||||
// This check happens before the state lock is released, ensuring
|
||||
// the background task doesn't concurrently finish (it would be
|
||||
// blocked waiting to update the state).
|
||||
assert!(!self.handle.is_finished());
|
||||
let (waker, new_state) = self.get_load_waker(state.take().unwrap());
|
||||
*state = Some(new_state);
|
||||
|
||||
waker
|
||||
};
|
||||
|
@ -371,4 +422,91 @@ mod tests {
|
|||
fut.as_mut().with_timeout_panic(TIMEOUT).await;
|
||||
assert_eq!(fut.as_mut().take_output(), Some(42));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_display() {
|
||||
// This channel is used to block the background task from completing
|
||||
// after the above channel has signalled it has begun.
|
||||
let (allow_complete, can_complete) = oneshot::channel();
|
||||
|
||||
// Configure the background load to fire (practically) immediately but
|
||||
// block waiting for rx to be unblocked.
|
||||
let d = Arc::new(DeferredLoad::new(Duration::from_millis(1), async {
|
||||
// Wait for the test thread to issue the demand call and unblock
|
||||
// this fn.
|
||||
can_complete.await.expect("sender died");
|
||||
42
|
||||
}));
|
||||
|
||||
assert_eq!("<unresolved>", d.to_string());
|
||||
|
||||
// Issue a demand call
|
||||
let fut = future::maybe_done(d.get());
|
||||
pin_mut!(fut);
|
||||
assert_eq!(fut.as_mut().take_output(), None);
|
||||
|
||||
assert_eq!("<unresolved>", d.to_string());
|
||||
|
||||
// Unblock the background task.
|
||||
allow_complete.send(()).expect("background task died");
|
||||
|
||||
// And await the demand call
|
||||
fut.as_mut().await;
|
||||
assert_eq!(fut.as_mut().take_output(), Some(42));
|
||||
|
||||
// And assert Display is delegated to the resolved value
|
||||
assert_eq!("42", d.to_string());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_prefetch_concurrent_demand() {
|
||||
// This channel is used to signal the background load has begun.
|
||||
let (signal_start, started) = oneshot::channel();
|
||||
|
||||
// This channel is used to block the background task from completing
|
||||
// after the above channel has signalled it has begun.
|
||||
let (allow_complete, can_complete) = oneshot::channel();
|
||||
|
||||
// Configure the background load to fire (practically) immediately but
|
||||
// block waiting for rx to be unblocked.
|
||||
//
|
||||
// This allows the current thread time to issue a demand and wait on the
|
||||
// result before the background load completes.
|
||||
let d = Arc::new(DeferredLoad::new(LONG_LONG_TIME, async {
|
||||
// Signal the background task has begun.
|
||||
signal_start.send(()).expect("test task died");
|
||||
// Wait for the test thread to issue the demand call and unblock
|
||||
// this fn.
|
||||
can_complete.await.expect("sender died");
|
||||
42
|
||||
}));
|
||||
|
||||
d.prefetch_now();
|
||||
|
||||
// Wait for the background task to begin.
|
||||
started
|
||||
.with_timeout_panic(Duration::from_secs(5))
|
||||
.await
|
||||
.expect("background task died");
|
||||
|
||||
// Issue a demand call
|
||||
let fut = future::maybe_done(d.get());
|
||||
pin_mut!(fut);
|
||||
assert_eq!(fut.as_mut().take_output(), None);
|
||||
|
||||
// Unblock the background task.
|
||||
allow_complete.send(()).expect("background task died");
|
||||
|
||||
// And await the demand call
|
||||
fut.as_mut().await;
|
||||
assert_eq!(fut.as_mut().take_output(), Some(42));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_prefetch_already_loaded() {
|
||||
let d = Arc::new(DeferredLoad::new(LONG_LONG_TIME, async { 42 }));
|
||||
|
||||
let _ = d.get().with_timeout_panic(TIMEOUT).await;
|
||||
d.prefetch_now();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -46,7 +46,16 @@ use crate::{
|
|||
///
|
||||
/// [`PartitionData`]: crate::data::partition::PartitionData
|
||||
/// [`SortKey`]: schema::sort::SortKey
|
||||
const SORT_KEY_PRE_FETCH: Duration = Duration::from_secs(30);
|
||||
pub(crate) const SORT_KEY_PRE_FETCH: Duration = Duration::from_secs(60);
|
||||
|
||||
/// The maximum duration of time between initialising the
|
||||
/// [`NamespaceData`] in response to observing an operation for a namespace, and
|
||||
/// fetching the string identifier for it in the background via a
|
||||
/// [`DeferredLoad`].
|
||||
///
|
||||
/// [`NamespaceData`]: crate::data::namespace::NamespaceData
|
||||
/// [`DeferredLoad`]: crate::deferred_load::DeferredLoad
|
||||
pub(crate) const NAMESPACE_NAME_PRE_FETCH: Duration = Duration::from_secs(60);
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
#[allow(missing_copy_implementations, missing_docs)]
|
||||
|
|
|
@ -263,7 +263,7 @@ pub async fn prepare_data_to_querier(
|
|||
let mut found_namespace = false;
|
||||
|
||||
for (shard_id, shard_data) in ingest_data.shards() {
|
||||
let namespace_data = match shard_data.namespace_by_id(request.namespace_id) {
|
||||
let namespace_data = match shard_data.namespace(request.namespace_id) {
|
||||
Some(namespace_data) => {
|
||||
trace!(
|
||||
shard_id=%shard_id.get(),
|
||||
|
|
Loading…
Reference in New Issue