Merge pull request #6132 from influxdata/dom/deferred-table-name

perf(ingester): deferred table name discovery
pull/24376/head
kodiakhq[bot] 2022-11-14 18:35:35 +00:00 committed by GitHub
commit d0b3ac1cef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 413 additions and 163 deletions

View File

@ -45,6 +45,7 @@ use self::{
namespace::name_resolver::{NamespaceNameProvider, NamespaceNameResolver},
partition::resolver::{CatalogPartitionResolver, PartitionCache, PartitionProvider},
shard::ShardData,
table::name_resolver::{TableNameProvider, TableNameResolver},
};
#[cfg(test)]
@ -66,6 +67,15 @@ const SORT_KEY_PRE_FETCH: Duration = Duration::from_secs(30);
/// [`DeferredLoad`]: crate::deferred_load::DeferredLoad
pub(crate) const NAMESPACE_NAME_PRE_FETCH: Duration = Duration::from_secs(60);
/// The maximum duration of time between observing and initialising the
/// [`TableData`] in response to observing an operation for a table, and
/// fetching the string identifier for it in the background via a
/// [`DeferredLoad`].
///
/// [`TableData`]: crate::data::table::TableData
/// [`DeferredLoad`]: crate::deferred_load::DeferredLoad
pub const TABLE_NAME_PRE_FETCH: Duration = Duration::from_secs(60);
#[derive(Debug, Snafu)]
#[allow(missing_copy_implementations, missing_docs)]
pub enum Error {
@ -195,6 +205,13 @@ impl IngesterData {
backoff_config.clone(),
));
// Initialise the deferred table name resolver.
let table_name_provider: Arc<dyn TableNameProvider> = Arc::new(TableNameResolver::new(
TABLE_NAME_PRE_FETCH,
Arc::clone(&catalog),
backoff_config.clone(),
));
let shards = shards
.into_iter()
.map(|(id, index)| {
@ -204,6 +221,7 @@ impl IngesterData {
index,
id,
Arc::clone(&namespace_name_provider),
Arc::clone(&table_name_provider),
Arc::clone(&partition_provider),
Arc::clone(&metrics),
),
@ -336,7 +354,7 @@ impl Persister for IngesterData {
// Assert the namespace ID matches the index key.
assert_eq!(namespace.namespace_id(), namespace_id);
let table_data = namespace.table_id(table_id).unwrap_or_else(|| {
let table_data = namespace.table(table_id).unwrap_or_else(|| {
panic!("table {table_id} in namespace {namespace_id} not in shard {shard_id} state")
});
// Assert various properties of the table to ensure the index is
@ -344,7 +362,11 @@ impl Persister for IngesterData {
assert_eq!(table_data.shard_id(), shard_id);
assert_eq!(table_data.namespace_id(), namespace_id);
assert_eq!(table_data.table_id(), table_id);
let table_name = table_data.table_name().clone();
// Begin resolving the load-deferred name concurrently if it is not
// already available.
let table_name = Arc::clone(table_data.table_name());
table_name.prefetch_now();
let partition = table_data.get_partition(partition_id).unwrap_or_else(|| {
panic!(
@ -368,7 +390,7 @@ impl Persister for IngesterData {
assert_eq!(guard.shard_id(), shard_id);
assert_eq!(guard.namespace_id(), namespace_id);
assert_eq!(guard.table_id(), table_id);
assert_eq!(*guard.table_name(), table_name);
assert!(Arc::ptr_eq(guard.table_name(), &table_name));
partition_key = guard.partition_key().clone();
sort_key = guard.sort_key().clone();
@ -439,6 +461,10 @@ impl Persister for IngesterData {
}
};
// At this point, the table name is necessary, so demand it be resolved
// if it is not yet available.
let table_name = table_name.get().await;
// Prepare the plan for CPU intensive work of compaction, de-duplication and sorting
let CompactedStream {
stream: record_stream,
@ -694,7 +720,10 @@ mod tests {
use super::*;
use crate::{
data::{namespace::NamespaceData, partition::resolver::CatalogPartitionResolver},
data::{
namespace::NamespaceData, partition::resolver::CatalogPartitionResolver,
table::name_resolver::mock::MockTableNameProvider,
},
deferred_load::DeferredLoad,
lifecycle::{LifecycleConfig, LifecycleManager},
};
@ -888,17 +917,26 @@ mod tests {
// limits)
assert_matches!(action, DmlApplyAction::Applied(false));
let (table_id, partition_id) = {
let table_id = catalog
.repositories()
.await
.tables()
.get_by_namespace_and_name(namespace.id, "mem")
.await
.unwrap()
.unwrap()
.id;
let partition_id = {
let sd = data.shards.get(&shard1.id).unwrap();
let n = sd.namespace(namespace.id).unwrap();
let mem_table = n.table_data(&"mem".into()).unwrap();
assert!(n.table_data(&"mem".into()).is_some());
let mem_table = n.table(table_id).unwrap();
let p = mem_table
.get_partition_by_key(&"1970-01-01".into())
.unwrap()
.lock()
.partition_id();
(mem_table.table_id(), p)
p
};
data.persist(shard1.id, namespace.id, table_id, partition_id)
@ -1047,15 +1085,21 @@ mod tests {
.with_buffered(SequenceNumber::new(2));
assert_progress(&data, shard_index, expected_progress).await;
let table_id = catalog
.repositories()
.await
.tables()
.get_by_namespace_and_name(namespace.id, "mem")
.await
.unwrap()
.unwrap()
.id;
let sd = data.shards.get(&shard1.id).unwrap();
let n = sd.namespace(namespace.id).unwrap();
let partition_id;
let table_id;
{
let mem_table = n.table_data(&"mem".into()).unwrap();
assert!(n.table_data(&"cpu".into()).is_some());
table_id = mem_table.table_id();
let mem_table = n.table(table_id).unwrap();
let p = mem_table
.get_partition_by_key(&"1970-01-01".into())
@ -1128,7 +1172,7 @@ mod tests {
.unwrap()
.namespace(namespace.id)
.unwrap()
.table_id(table_id)
.table(table_id)
.unwrap()
.get_partition(partition_id)
.unwrap()
@ -1181,7 +1225,7 @@ mod tests {
// Only the < 500 KB bucket has a count
assert_eq!(buckets_with_counts, &[500 * 1024]);
let mem_table = n.table_data(&"mem".into()).unwrap();
let mem_table = n.table(table_id).unwrap();
// verify that the parquet_max_sequence_number got updated
assert_eq!(
@ -1483,6 +1527,7 @@ mod tests {
let data = NamespaceData::new(
namespace.id,
DeferredLoad::new(Duration::from_millis(1), async { "foo".into() }),
Arc::new(MockTableNameProvider::new(table.name)),
shard.id,
partition_provider,
&metrics,
@ -1496,7 +1541,7 @@ mod tests {
.await
.unwrap();
{
let table = data.table_data(&"mem".into()).unwrap();
let table = data.table(table.id).unwrap();
assert!(table
.partitions()
.into_iter()
@ -1517,7 +1562,7 @@ mod tests {
.await
.unwrap();
let table = data.table_data(&"mem".into()).unwrap();
let table = data.table(table.id).unwrap();
let partition = table.get_partition_by_key(&"1970-01-01".into()).unwrap();
assert_eq!(
partition.lock().sequence_number_range().inclusive_min(),

View File

@ -2,7 +2,7 @@
pub(crate) mod name_resolver;
use std::{collections::HashMap, sync::Arc};
use std::sync::Arc;
use data_types::{NamespaceId, SequenceNumber, ShardId, TableId};
use dml::DmlOperation;
@ -15,37 +15,11 @@ use write_summary::ShardProgress;
use super::triggers::TestTriggers;
use super::{
partition::resolver::PartitionProvider,
table::{TableData, TableName},
table::{name_resolver::TableNameProvider, TableData},
};
use crate::{
arcmap::ArcMap, data::DmlApplyAction, deferred_load::DeferredLoad, lifecycle::LifecycleHandle,
};
use crate::{data::DmlApplyAction, deferred_load::DeferredLoad, lifecycle::LifecycleHandle};
/// A double-referenced map where [`TableData`] can be looked up by name, or ID.
#[derive(Debug, Default)]
struct DoubleRef {
// TODO(4880): this can be removed when IDs are sent over the wire.
by_name: HashMap<TableName, Arc<TableData>>,
by_id: HashMap<TableId, Arc<TableData>>,
}
impl DoubleRef {
fn insert(&mut self, t: TableData) -> Arc<TableData> {
let name = t.table_name().clone();
let id = t.table_id();
let t = Arc::new(t);
self.by_name.insert(name, Arc::clone(&t));
self.by_id.insert(id, Arc::clone(&t));
t
}
fn by_name(&self, name: &TableName) -> Option<Arc<TableData>> {
self.by_name.get(name).map(Arc::clone)
}
fn by_id(&self, id: TableId) -> Option<Arc<TableData>> {
self.by_id.get(&id).map(Arc::clone)
}
}
/// The string name / identifier of a Namespace.
///
@ -85,7 +59,17 @@ pub(crate) struct NamespaceData {
/// The catalog ID of the shard this namespace is being populated from.
shard_id: ShardId,
tables: RwLock<DoubleRef>,
/// A set of tables this [`NamespaceData`] instance has processed
/// [`DmlOperation`]'s for.
///
/// The [`TableNameProvider`] acts as a [`DeferredLoad`] constructor to
/// resolve the [`TableName`] for new [`TableData`] out of the hot path.
///
/// [`TableName`]: crate::data::table::TableName
tables: ArcMap<TableId, TableData>,
table_name_resolver: Arc<dyn TableNameProvider>,
/// The count of tables initialised in this Ingester so far, across all
/// shards / namespaces.
table_count: U64Counter,
/// The resolver of `(shard_id, table_id, partition_key)` to
@ -145,6 +129,7 @@ impl NamespaceData {
pub(super) fn new(
namespace_id: NamespaceId,
namespace_name: DeferredLoad<NamespaceName>,
table_name_resolver: Arc<dyn TableNameProvider>,
shard_id: ShardId,
partition_provider: Arc<dyn PartitionProvider>,
metrics: &metric::Registry,
@ -161,6 +146,7 @@ impl NamespaceData {
namespace_name,
shard_id,
tables: Default::default(),
table_name_resolver,
table_count,
buffering_sequence_number: RwLock::new(None),
partition_provider,
@ -198,12 +184,19 @@ impl NamespaceData {
// Extract the partition key derived by the router.
let partition_key = write.partition_key().clone();
for (table_name, table_id, b) in write.into_tables() {
let table_name = TableName::from(table_name);
let table_data = match self.table_data(&table_name) {
Some(t) => t,
None => self.insert_table(table_name, table_id).await?,
};
for (_table_name, table_id, b) in write.into_tables() {
// Grab a reference to the table data, or insert a new
// TableData for it.
let table_data = self.tables.get_or_insert_with(&table_id, || {
self.table_count.inc(1);
Arc::new(TableData::new(
table_id,
self.table_name_resolver.for_table(table_id),
self.shard_id,
self.namespace_id,
Arc::clone(&self.partition_provider),
))
});
let action = table_data
.buffer_table_write(
@ -246,45 +239,14 @@ impl NamespaceData {
}
}
/// Return the specified [`TableData`] if it exists.
pub(crate) fn table_data(&self, table_name: &TableName) -> Option<Arc<TableData>> {
let t = self.tables.read();
t.by_name(table_name)
}
/// Return the table data by ID.
pub(crate) fn table_id(&self, table_id: TableId) -> Option<Arc<TableData>> {
let t = self.tables.read();
t.by_id(table_id)
}
/// Inserts the table or returns it if it happens to be inserted by some other thread
async fn insert_table(
&self,
table_name: TableName,
table_id: TableId,
) -> Result<Arc<TableData>, super::Error> {
let mut t = self.tables.write();
Ok(match t.by_name(&table_name) {
Some(v) => v,
None => {
self.table_count.inc(1);
// Insert the table and then return a ref to it.
t.insert(TableData::new(
table_id,
table_name,
self.shard_id,
self.namespace_id,
Arc::clone(&self.partition_provider),
))
}
})
pub(crate) fn table(&self, table_id: TableId) -> Option<Arc<TableData>> {
self.tables.get(&table_id)
}
/// Return progress from this Namespace
pub(super) async fn progress(&self) -> ShardProgress {
let tables: Vec<_> = self.tables.read().by_id.values().map(Arc::clone).collect();
let tables: Vec<_> = self.tables.values();
// Consolidate progress across partitions.
let mut progress = ShardProgress::new()
@ -356,7 +318,10 @@ mod tests {
use metric::{Attributes, Metric};
use crate::{
data::partition::{resolver::MockPartitionProvider, PartitionData, SortKeyState},
data::{
partition::{resolver::MockPartitionProvider, PartitionData, SortKeyState},
table::{name_resolver::mock::MockTableNameProvider, TableName},
},
deferred_load,
lifecycle::mock_handle::MockLifecycleHandle,
test_util::{make_write_op, TEST_TABLE},
@ -372,7 +337,7 @@ mod tests {
const NAMESPACE_ID: NamespaceId = NamespaceId::new(42);
#[tokio::test]
async fn test_namespace_double_ref() {
async fn test_namespace_init_table() {
let metrics = Arc::new(metric::Registry::default());
// Configure the mock partition provider to return a partition for this
@ -384,7 +349,9 @@ mod tests {
SHARD_ID,
NAMESPACE_ID,
TABLE_ID,
TABLE_NAME.into(),
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from(TABLE_NAME)
})),
SortKeyState::Provided(None),
None,
),
@ -393,6 +360,7 @@ mod tests {
let ns = NamespaceData::new(
NAMESPACE_ID,
DeferredLoad::new(Duration::from_millis(1), async { NAMESPACE_NAME.into() }),
Arc::new(MockTableNameProvider::new(TABLE_NAME)),
SHARD_ID,
partition_provider,
&metrics,
@ -406,8 +374,7 @@ mod tests {
);
// Assert the namespace does not contain the test data
assert!(ns.table_data(&TABLE_NAME.into()).is_none());
assert!(ns.table_id(TABLE_ID).is_none());
assert!(ns.table(TABLE_ID).is_none());
// Write some test data
ns.buffer_operation(
@ -425,9 +392,8 @@ mod tests {
.await
.expect("buffer op should succeed");
// Both forms of referencing the table should succeed
assert!(ns.table_data(&TABLE_NAME.into()).is_some());
assert!(ns.table_id(TABLE_ID).is_some());
// Referencing the table should succeed
assert!(ns.table(TABLE_ID).is_some());
// And the table counter metric should increase
let tables = metrics

View File

@ -8,6 +8,8 @@ use crate::deferred_load::DeferredLoad;
use super::NamespaceName;
/// An abstract provider of a [`DeferredLoad`] configured to fetch the
/// [`NamespaceName`] of the specified [`NamespaceId`].
pub(crate) trait NamespaceNameProvider: Send + Sync + std::fmt::Debug {
fn for_namespace(&self, id: NamespaceId) -> DeferredLoad<NamespaceName>;
}

View File

@ -73,8 +73,9 @@ pub struct PartitionData {
shard_id: ShardId,
namespace_id: NamespaceId,
table_id: TableId,
/// The name of the table this partition is part of.
table_name: TableName,
/// The name of the table this partition is part of, potentially unresolved
/// / deferred.
table_name: Arc<DeferredLoad<TableName>>,
/// A buffer for incoming writes.
buffer: DataBuffer,
@ -96,7 +97,7 @@ impl PartitionData {
shard_id: ShardId,
namespace_id: NamespaceId,
table_id: TableId,
table_name: TableName,
table_name: Arc<DeferredLoad<TableName>>,
sort_key: SortKeyState,
max_persisted_sequence_number: Option<SequenceNumber>,
) -> Self {
@ -385,7 +386,7 @@ impl PartitionData {
/// Return the name of the table this [`PartitionData`] is buffering writes
/// for.
pub(crate) fn table_name(&self) -> &TableName {
pub(crate) fn table_name(&self) -> &Arc<DeferredLoad<TableName>> {
&self.table_name
}
@ -466,7 +467,9 @@ mod tests {
ShardId::new(2),
NamespaceId::new(3),
TableId::new(4),
TABLE_NAME.clone(),
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TABLE_NAME.clone()
})),
SortKeyState::Provided(None),
None,
);
@ -585,7 +588,9 @@ mod tests {
ShardId::new(2),
NamespaceId::new(3),
TableId::new(4),
TABLE_NAME.clone(),
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TABLE_NAME.clone()
})),
SortKeyState::Provided(None),
None,
);
@ -780,7 +785,9 @@ mod tests {
ShardId::new(2),
NamespaceId::new(3),
TableId::new(4),
TABLE_NAME.clone(),
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TABLE_NAME.clone()
})),
SortKeyState::Provided(None),
None,
);
@ -893,7 +900,9 @@ mod tests {
ShardId::new(1),
NamespaceId::new(42),
TableId::new(1),
"platanos".into(),
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from("platanos")
})),
starting_state,
None,
);
@ -949,7 +958,9 @@ mod tests {
ShardId::new(1),
NamespaceId::new(42),
TableId::new(1),
"platanos".into(),
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from("platanos")
})),
starting_state,
None,
);
@ -971,7 +982,9 @@ mod tests {
ShardId::new(2),
NamespaceId::new(3),
TableId::new(4),
TABLE_NAME.clone(),
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TABLE_NAME.clone()
})),
SortKeyState::Provided(None),
None,
);
@ -992,7 +1005,9 @@ mod tests {
ShardId::new(2),
NamespaceId::new(3),
TableId::new(4),
TABLE_NAME.clone(),
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TABLE_NAME.clone()
})),
SortKeyState::Provided(None),
None,
);
@ -1008,7 +1023,9 @@ mod tests {
ShardId::new(2),
NamespaceId::new(3),
TableId::new(4),
TABLE_NAME.clone(),
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TABLE_NAME.clone()
})),
SortKeyState::Provided(None),
None,
);
@ -1025,7 +1042,9 @@ mod tests {
ShardId::new(2),
NamespaceId::new(3),
TableId::new(4),
TABLE_NAME.clone(),
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TABLE_NAME.clone()
})),
SortKeyState::Provided(None),
None,
);
@ -1050,7 +1069,9 @@ mod tests {
ShardId::new(2),
NamespaceId::new(3),
TableId::new(4),
TABLE_NAME.clone(),
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TABLE_NAME.clone()
})),
SortKeyState::Provided(None),
None,
);
@ -1078,7 +1099,9 @@ mod tests {
ShardId::new(2),
NamespaceId::new(3),
TableId::new(4),
TABLE_NAME.clone(),
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TABLE_NAME.clone()
})),
SortKeyState::Provided(None),
None,
);
@ -1112,7 +1135,9 @@ mod tests {
ShardId::new(2),
NamespaceId::new(3),
TableId::new(4),
TABLE_NAME.clone(),
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TABLE_NAME.clone()
})),
SortKeyState::Provided(None),
None,
);
@ -1152,7 +1177,9 @@ mod tests {
ShardId::new(2),
NamespaceId::new(3),
TableId::new(4),
TABLE_NAME.clone(),
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TABLE_NAME.clone()
})),
SortKeyState::Provided(None),
Some(SequenceNumber::new(42)),
);
@ -1180,7 +1207,9 @@ mod tests {
ShardId::new(2),
NamespaceId::new(3),
TableId::new(4),
TABLE_NAME.clone(),
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TABLE_NAME.clone()
})),
SortKeyState::Provided(None),
Some(SequenceNumber::new(42)),
);
@ -1201,15 +1230,17 @@ mod tests {
// Ensure an empty PartitionData does not panic due to constructing an empty
// QueryAdaptor.
#[test]
fn test_empty_partition_no_queryadaptor_panic() {
#[tokio::test]
async fn test_empty_partition_no_queryadaptor_panic() {
let mut p = PartitionData::new(
PARTITION_ID,
PARTITION_KEY.clone(),
ShardId::new(2),
NamespaceId::new(3),
TableId::new(4),
TABLE_NAME.clone(),
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TABLE_NAME.clone()
})),
SortKeyState::Provided(None),
Some(SequenceNumber::new(42)),
);

View File

@ -195,7 +195,7 @@ where
shard_id: ShardId,
namespace_id: NamespaceId,
table_id: TableId,
table_name: TableName,
table_name: Arc<DeferredLoad<TableName>>,
) -> PartitionData {
// Use the cached PartitionKey instead of the caller's partition_key,
// instead preferring to reuse the already-shared Arc<str> in the cache.
@ -277,7 +277,9 @@ mod tests {
SHARD_ID,
NAMESPACE_ID,
TABLE_ID,
TABLE_NAME.into(),
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from(TABLE_NAME)
})),
SortKeyState::Provided(None),
None,
);
@ -290,14 +292,16 @@ mod tests {
SHARD_ID,
NAMESPACE_ID,
TABLE_ID,
TABLE_NAME.into(),
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from(TABLE_NAME)
})),
)
.await;
assert_eq!(got.partition_id(), PARTITION_ID);
assert_eq!(got.shard_id(), SHARD_ID);
assert_eq!(got.table_id(), TABLE_ID);
assert_eq!(got.table_name(), TABLE_NAME);
assert_eq!(&**got.table_name().get().await, TABLE_NAME);
assert!(cache.inner.is_empty());
}
@ -324,14 +328,16 @@ mod tests {
SHARD_ID,
NAMESPACE_ID,
TABLE_ID,
TABLE_NAME.into(),
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from(TABLE_NAME)
})),
)
.await;
assert_eq!(got.partition_id(), PARTITION_ID);
assert_eq!(got.shard_id(), SHARD_ID);
assert_eq!(got.table_id(), TABLE_ID);
assert_eq!(got.table_name(), TABLE_NAME);
assert_eq!(&**got.table_name().get().await, TABLE_NAME);
assert_eq!(*got.partition_key(), PartitionKey::from(PARTITION_KEY));
// The cache should have been cleaned up as it was consumed.
@ -356,7 +362,9 @@ mod tests {
SHARD_ID,
NAMESPACE_ID,
TABLE_ID,
TABLE_NAME.into(),
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from(TABLE_NAME)
})),
SortKeyState::Provided(None),
None,
));
@ -377,14 +385,16 @@ mod tests {
SHARD_ID,
NAMESPACE_ID,
TABLE_ID,
TABLE_NAME.into(),
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from(TABLE_NAME)
})),
)
.await;
assert_eq!(got.partition_id(), other_key_id);
assert_eq!(got.shard_id(), SHARD_ID);
assert_eq!(got.table_id(), TABLE_ID);
assert_eq!(got.table_name(), TABLE_NAME);
assert_eq!(&**got.table_name().get().await, TABLE_NAME);
}
#[tokio::test]
@ -396,7 +406,9 @@ mod tests {
SHARD_ID,
NAMESPACE_ID,
other_table,
TABLE_NAME.into(),
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from(TABLE_NAME)
})),
SortKeyState::Provided(None),
None,
));
@ -417,14 +429,16 @@ mod tests {
SHARD_ID,
NAMESPACE_ID,
other_table,
TABLE_NAME.into(),
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from(TABLE_NAME)
})),
)
.await;
assert_eq!(got.partition_id(), PARTITION_ID);
assert_eq!(got.shard_id(), SHARD_ID);
assert_eq!(got.table_id(), other_table);
assert_eq!(got.table_name(), TABLE_NAME);
assert_eq!(&**got.table_name().get().await, TABLE_NAME);
}
#[tokio::test]
@ -436,7 +450,9 @@ mod tests {
other_shard,
NAMESPACE_ID,
TABLE_ID,
TABLE_NAME.into(),
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from(TABLE_NAME)
})),
SortKeyState::Provided(None),
None,
));
@ -457,13 +473,15 @@ mod tests {
other_shard,
NAMESPACE_ID,
TABLE_ID,
TABLE_NAME.into(),
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from(TABLE_NAME)
})),
)
.await;
assert_eq!(got.partition_id(), PARTITION_ID);
assert_eq!(got.shard_id(), other_shard);
assert_eq!(got.table_id(), TABLE_ID);
assert_eq!(got.table_name(), TABLE_NAME);
assert_eq!(&**got.table_name().get().await, TABLE_NAME);
}
}

View File

@ -9,9 +9,12 @@ use data_types::{NamespaceId, Partition, PartitionKey, ShardId, TableId};
use iox_catalog::interface::Catalog;
use observability_deps::tracing::debug;
use crate::data::{
partition::{PartitionData, SortKeyState},
table::TableName,
use crate::{
data::{
partition::{PartitionData, SortKeyState},
table::TableName,
},
deferred_load::DeferredLoad,
};
use super::r#trait::PartitionProvider;
@ -58,7 +61,7 @@ impl PartitionProvider for CatalogPartitionResolver {
shard_id: ShardId,
namespace_id: NamespaceId,
table_id: TableId,
table_name: TableName,
table_name: Arc<DeferredLoad<TableName>>,
) -> PartitionData {
debug!(
%partition_key,
@ -89,7 +92,7 @@ impl PartitionProvider for CatalogPartitionResolver {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
use assert_matches::assert_matches;
use data_types::ShardIndex;
@ -139,9 +142,15 @@ mod tests {
shard_id,
namespace_id,
table_id,
table_name.clone(),
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from(TABLE_NAME)
})),
)
.await;
// Ensure the table name is available.
let _ = got.table_name().get().await;
assert_eq!(got.namespace_id(), namespace_id);
assert_eq!(got.table_name().to_string(), table_name.to_string());
assert_matches!(got.sort_key(), SortKeyState::Provided(None));

View File

@ -1,12 +1,15 @@
//! A mock [`PartitionProvider`] to inject [`PartitionData`] for tests.
use std::collections::HashMap;
use std::{collections::HashMap, sync::Arc};
use async_trait::async_trait;
use data_types::{NamespaceId, PartitionKey, ShardId, TableId};
use parking_lot::Mutex;
use crate::data::{partition::PartitionData, table::TableName};
use crate::{
data::{partition::PartitionData, table::TableName},
deferred_load::DeferredLoad,
};
use super::r#trait::PartitionProvider;
@ -58,7 +61,7 @@ impl PartitionProvider for MockPartitionProvider {
shard_id: ShardId,
namespace_id: NamespaceId,
table_id: TableId,
table_name: TableName,
table_name: Arc<DeferredLoad<TableName>>,
) -> PartitionData {
let p = self
.partitions

View File

@ -3,7 +3,10 @@ use std::{fmt::Debug, sync::Arc};
use async_trait::async_trait;
use data_types::{NamespaceId, PartitionKey, ShardId, TableId};
use crate::data::{partition::PartitionData, table::TableName};
use crate::{
data::{partition::PartitionData, table::TableName},
deferred_load::DeferredLoad,
};
/// An infallible resolver of [`PartitionData`] for the specified shard, table,
/// and partition key, returning an initialised [`PartitionData`] buffer for it.
@ -20,7 +23,7 @@ pub(crate) trait PartitionProvider: Send + Sync + Debug {
shard_id: ShardId,
namespace_id: NamespaceId,
table_id: TableId,
table_name: TableName,
table_name: Arc<DeferredLoad<TableName>>,
) -> PartitionData;
}
@ -35,7 +38,7 @@ where
shard_id: ShardId,
namespace_id: NamespaceId,
table_id: TableId,
table_name: TableName,
table_name: Arc<DeferredLoad<TableName>>,
) -> PartitionData {
(**self)
.get_partition(partition_key, shard_id, namespace_id, table_id, table_name)
@ -45,7 +48,7 @@ where
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
use data_types::PartitionId;
@ -59,7 +62,9 @@ mod tests {
let shard_id = ShardId::new(42);
let namespace_id = NamespaceId::new(1234);
let table_id = TableId::new(24);
let table_name = TableName::from("platanos");
let table_name = Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from("platanos")
}));
let partition = PartitionId::new(4242);
let data = PartitionData::new(
partition,
@ -67,7 +72,7 @@ mod tests {
shard_id,
namespace_id,
table_id,
table_name.clone(),
Arc::clone(&table_name),
SortKeyState::Provided(None),
None,
);
@ -75,7 +80,13 @@ mod tests {
let mock = Arc::new(MockPartitionProvider::default().with_partition(data));
let got = mock
.get_partition(key, shard_id, namespace_id, table_id, table_name.clone())
.get_partition(
key,
shard_id,
namespace_id,
table_id,
Arc::clone(&table_name),
)
.await;
assert_eq!(got.partition_id(), partition);
assert_eq!(got.namespace_id(), namespace_id);

View File

@ -10,6 +10,7 @@ use write_summary::ShardProgress;
use super::{
namespace::{name_resolver::NamespaceNameProvider, NamespaceData},
partition::resolver::PartitionProvider,
table::name_resolver::TableNameProvider,
DmlApplyAction,
};
use crate::{arcmap::ArcMap, lifecycle::LifecycleHandle};
@ -39,6 +40,12 @@ pub(crate) struct ShardData {
/// [`NamespaceName`]: data_types::NamespaceName
namespaces: ArcMap<NamespaceId, NamespaceData>,
namespace_name_resolver: Arc<dyn NamespaceNameProvider>,
/// The [`TableName`] provider used by [`NamespaceData`] to initialise a
/// [`TableData`].
///
/// [`TableName`]: crate::data::table::TableName
/// [`TableData`]: crate::data::table::TableData
table_name_resolver: Arc<dyn TableNameProvider>,
metrics: Arc<metric::Registry>,
namespace_count: U64Counter,
@ -50,6 +57,7 @@ impl ShardData {
shard_index: ShardIndex,
shard_id: ShardId,
namespace_name_resolver: Arc<dyn NamespaceNameProvider>,
table_name_resolver: Arc<dyn TableNameProvider>,
partition_provider: Arc<dyn PartitionProvider>,
metrics: Arc<metric::Registry>,
) -> Self {
@ -65,6 +73,7 @@ impl ShardData {
shard_id,
namespaces: Default::default(),
namespace_name_resolver,
table_name_resolver,
metrics,
partition_provider,
namespace_count,
@ -86,6 +95,7 @@ impl ShardData {
Arc::new(NamespaceData::new(
namespace_id,
self.namespace_name_resolver.for_namespace(namespace_id),
Arc::clone(&self.table_name_resolver),
self.shard_id,
Arc::clone(&self.partition_provider),
&self.metrics,
@ -122,7 +132,7 @@ impl ShardData {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
use data_types::{PartitionId, PartitionKey, ShardIndex, TableId};
use metric::{Attributes, Metric};
@ -131,7 +141,9 @@ mod tests {
data::{
namespace::name_resolver::mock::MockNamespaceNameProvider,
partition::{resolver::MockPartitionProvider, PartitionData, SortKeyState},
table::{name_resolver::mock::MockTableNameProvider, TableName},
},
deferred_load::DeferredLoad,
lifecycle::mock_handle::MockLifecycleHandle,
test_util::{make_write_op, TEST_TABLE},
};
@ -158,7 +170,9 @@ mod tests {
SHARD_ID,
NAMESPACE_ID,
TABLE_ID,
TABLE_NAME.into(),
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from(TABLE_NAME)
})),
SortKeyState::Provided(None),
None,
),
@ -168,6 +182,7 @@ mod tests {
SHARD_INDEX,
SHARD_ID,
Arc::new(MockNamespaceNameProvider::new(NAMESPACE_NAME)),
Arc::new(MockTableNameProvider::new(TABLE_NAME)),
partition_provider,
Arc::clone(&metrics),
);

View File

@ -1,5 +1,7 @@
//! Table level data buffer structures.
pub(crate) mod name_resolver;
use std::sync::Arc;
use data_types::{NamespaceId, PartitionId, PartitionKey, SequenceNumber, ShardId, TableId};
@ -11,7 +13,7 @@ use super::{
partition::{resolver::PartitionProvider, BufferError, PartitionData},
DmlApplyAction,
};
use crate::{arcmap::ArcMap, lifecycle::LifecycleHandle};
use crate::{arcmap::ArcMap, deferred_load::DeferredLoad, lifecycle::LifecycleHandle};
/// A double-referenced map where [`PartitionData`] can be looked up by
/// [`PartitionKey`], or ID.
@ -91,7 +93,7 @@ impl PartialEq<str> for TableName {
#[derive(Debug)]
pub(crate) struct TableData {
table_id: TableId,
table_name: TableName,
table_name: Arc<DeferredLoad<TableName>>,
/// The catalog ID of the shard & namespace this table is being populated
/// from.
@ -119,14 +121,14 @@ impl TableData {
/// for the first time.
pub(super) fn new(
table_id: TableId,
table_name: TableName,
table_name: DeferredLoad<TableName>,
shard_id: ShardId,
namespace_id: NamespaceId,
partition_provider: Arc<dyn PartitionProvider>,
) -> Self {
Self {
table_id,
table_name,
table_name: Arc::new(table_name),
shard_id,
namespace_id,
partition_data: Default::default(),
@ -154,7 +156,7 @@ impl TableData {
self.shard_id,
self.namespace_id,
self.table_id,
self.table_name.clone(),
Arc::clone(&self.table_name),
)
.await;
// Add the double-referenced partition to the map.
@ -242,7 +244,7 @@ impl TableData {
}
/// Returns the name of this table.
pub(crate) fn table_name(&self) -> &TableName {
pub(crate) fn table_name(&self) -> &Arc<DeferredLoad<TableName>> {
&self.table_name
}
@ -259,7 +261,7 @@ impl TableData {
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
use assert_matches::assert_matches;
use data_types::PartitionId;
@ -295,7 +297,9 @@ mod tests {
SHARD_ID,
NAMESPACE_ID,
TABLE_ID,
TABLE_NAME.into(),
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from(TABLE_NAME)
})),
SortKeyState::Provided(None),
None,
),
@ -303,7 +307,9 @@ mod tests {
let table = TableData::new(
TABLE_ID,
TABLE_NAME.into(),
DeferredLoad::new(Duration::from_secs(1), async {
TableName::from(TABLE_NAME)
}),
SHARD_ID,
NAMESPACE_ID,
partition_provider,
@ -354,7 +360,9 @@ mod tests {
SHARD_ID,
NAMESPACE_ID,
TABLE_ID,
TABLE_NAME.into(),
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
TableName::from(TABLE_NAME)
})),
SortKeyState::Provided(None),
None,
),
@ -362,7 +370,9 @@ mod tests {
let table = TableData::new(
TABLE_ID,
TABLE_NAME.into(),
DeferredLoad::new(Duration::from_secs(1), async {
TableName::from(TABLE_NAME)
}),
SHARD_ID,
NAMESPACE_ID,
partition_provider,

View File

@ -0,0 +1,140 @@
use std::{sync::Arc, time::Duration};
use backoff::{Backoff, BackoffConfig};
use data_types::TableId;
use iox_catalog::interface::Catalog;
use crate::deferred_load::DeferredLoad;
use super::TableName;
/// An abstract provider of a [`DeferredLoad`] configured to fetch the
/// [`TableName`] of the specified [`TableId`].
pub(crate) trait TableNameProvider: Send + Sync + std::fmt::Debug {
fn for_table(&self, id: TableId) -> DeferredLoad<TableName>;
}
#[derive(Debug)]
pub(crate) struct TableNameResolver {
max_smear: Duration,
catalog: Arc<dyn Catalog>,
backoff_config: BackoffConfig,
}
impl TableNameResolver {
pub(crate) fn new(
max_smear: Duration,
catalog: Arc<dyn Catalog>,
backoff_config: BackoffConfig,
) -> Self {
Self {
max_smear,
catalog,
backoff_config,
}
}
/// Fetch the [`TableName`] from the [`Catalog`] for specified
/// `table_id`, retrying endlessly when errors occur.
pub(crate) async fn fetch(
table_id: TableId,
catalog: Arc<dyn Catalog>,
backoff_config: BackoffConfig,
) -> TableName {
Backoff::new(&backoff_config)
.retry_all_errors("fetch table name", || async {
let s = catalog
.repositories()
.await
.tables()
.get_by_id(table_id)
.await?
.expect("resolving table name for non-existent table id")
.name
.into();
Result::<_, iox_catalog::interface::Error>::Ok(s)
})
.await
.expect("retry forever")
}
}
impl TableNameProvider for TableNameResolver {
fn for_table(&self, id: TableId) -> DeferredLoad<TableName> {
DeferredLoad::new(
self.max_smear,
Self::fetch(id, Arc::clone(&self.catalog), self.backoff_config.clone()),
)
}
}
#[cfg(test)]
pub(crate) mod mock {
use super::*;
#[derive(Debug)]
pub(crate) struct MockTableNameProvider {
name: TableName,
}
impl MockTableNameProvider {
pub(crate) fn new(name: impl Into<TableName>) -> Self {
Self { name: name.into() }
}
}
impl Default for MockTableNameProvider {
fn default() -> Self {
Self::new("bananas")
}
}
impl TableNameProvider for MockTableNameProvider {
fn for_table(&self, _id: TableId) -> DeferredLoad<TableName> {
let name = self.name.clone();
DeferredLoad::new(Duration::from_secs(1), async { name })
}
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use data_types::ShardIndex;
use test_helpers::timeout::FutureTimeout;
use crate::test_util::populate_catalog;
use super::*;
const SHARD_INDEX: ShardIndex = ShardIndex::new(24);
const TABLE_NAME: &str = "bananas";
const NAMESPACE_NAME: &str = "platanos";
#[tokio::test]
async fn test_fetch() {
let metrics = Arc::new(metric::Registry::default());
let backoff_config = BackoffConfig::default();
let catalog: Arc<dyn Catalog> =
Arc::new(iox_catalog::mem::MemCatalog::new(Arc::clone(&metrics)));
// Populate the catalog with the shard / namespace / table
let (_shard_id, _ns_id, table_id) =
populate_catalog(&*catalog, SHARD_INDEX, NAMESPACE_NAME, TABLE_NAME).await;
let fetcher = Arc::new(TableNameResolver::new(
Duration::from_secs(10),
Arc::clone(&catalog),
backoff_config.clone(),
));
let got = fetcher
.for_table(table_id)
.get()
.with_timeout_panic(Duration::from_secs(5))
.await;
assert_eq!(&**got, TABLE_NAME);
}
}

View File

@ -278,7 +278,7 @@ pub async fn prepare_data_to_querier(
}
};
if let Some(table_data) = namespace_data.table_id(request.table_id) {
if let Some(table_data) = namespace_data.table(request.table_id) {
trace!(
shard_id=%shard_id.get(),
namespace_id=%request.namespace_id,