refactor: ref TableData by name & ID

Changes the NamespaceData to hold a map of table name -> TableData, and
table ID -> TableData simultaneously.

This allows for cheap lookups when the caller holds an ID, and is part
of preparatory work to transition away from using string names in the
ingester for tables.

This commit also switches from a BTreeMap to a HashMap as the backing
collection, as maintaining key ordering doesn't appear to be necessary.
pull/24376/head
Dom Dwyer 2022-09-30 14:10:02 +02:00
parent 75178e4591
commit 9c0e4e98c4
2 changed files with 143 additions and 25 deletions

View File

@ -1,11 +1,8 @@
//! Namespace level data buffer structures.
use std::{
collections::{btree_map::Entry, BTreeMap},
sync::Arc,
};
use std::{collections::HashMap, sync::Arc};
use data_types::{NamespaceId, PartitionKey, SequenceNumber, ShardId};
use data_types::{NamespaceId, PartitionKey, SequenceNumber, ShardId, TableId};
use dml::DmlOperation;
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
@ -16,12 +13,38 @@ use write_summary::ShardProgress;
#[cfg(test)]
use super::triggers::TestTriggers;
use super::{
partition::{resolver::PartitionProvider, PersistingBatch},
table::TableData,
};
use super::{partition::resolver::PartitionProvider, table::TableData};
use crate::lifecycle::LifecycleHandle;
/// A double-referenced map where [`TableData`] can be looked up by name, or ID.
///
/// Both maps hold an [`Arc`] to the same underlying [`TableData`] instance, so
/// a lookup through either key observes the same table state.
#[derive(Debug, Default)]
struct DoubleRef {
    // TODO(4880): this can be removed when IDs are sent over the wire.
    by_name: HashMap<Arc<str>, Arc<tokio::sync::RwLock<TableData>>>,
    by_id: HashMap<TableId, Arc<tokio::sync::RwLock<TableData>>>,
}

impl DoubleRef {
    /// Insert `t`, indexing it by both its name and its ID, and return a
    /// reference to the newly-stored instance.
    ///
    /// # Panics
    ///
    /// Panics if the name or ID of `t` is already present in the map -
    /// silently replacing one entry but not the other would leave the two
    /// maps referencing different [`TableData`] instances.
    fn insert(&mut self, t: TableData) -> Arc<tokio::sync::RwLock<TableData>> {
        let name = Arc::clone(t.table_name());
        let id = t.table_id();

        let t = Arc::new(tokio::sync::RwLock::new(t));

        // Invariant: each key maps to exactly one TableData, and both maps
        // always index the same set of instances.
        assert!(self.by_name.insert(name, Arc::clone(&t)).is_none());
        assert!(self.by_id.insert(id, Arc::clone(&t)).is_none());
        t
    }

    /// Look up a table by name, returning a cheap ref-counted handle if found.
    fn by_name(&self, name: &str) -> Option<Arc<tokio::sync::RwLock<TableData>>> {
        self.by_name.get(name).map(Arc::clone)
    }

    /// Look up a table by its catalog ID.
    #[cfg(test)]
    fn by_id(&self, id: TableId) -> Option<Arc<tokio::sync::RwLock<TableData>>> {
        self.by_id.get(&id).map(Arc::clone)
    }
}
/// Data of a Namespace that belongs to a given Shard
#[derive(Debug)]
pub(crate) struct NamespaceData {
@ -30,7 +53,7 @@ pub(crate) struct NamespaceData {
/// The catalog ID of the shard this namespace is being populated from.
shard_id: ShardId,
tables: RwLock<BTreeMap<String, Arc<tokio::sync::RwLock<TableData>>>>,
tables: RwLock<DoubleRef>,
table_count: U64Counter,
/// The resolver of `(shard_id, table_id, partition_key)` to
@ -198,7 +221,7 @@ impl NamespaceData {
partition_key: &PartitionKey,
) -> Option<(
Vec<Arc<super::partition::SnapshotBatch>>,
Option<Arc<PersistingBatch>>,
Option<Arc<super::partition::PersistingBatch>>,
)> {
if let Some(t) = self.table_data(table_name) {
let mut t = t.write().await;
@ -221,7 +244,7 @@ impl NamespaceData {
&self,
table_name: &str,
partition_key: &PartitionKey,
) -> Option<Arc<PersistingBatch>> {
) -> Option<Arc<super::partition::PersistingBatch>> {
if let Some(table_data) = self.table_data(table_name) {
let mut table_data = table_data.write().await;
@ -240,7 +263,17 @@ impl NamespaceData {
table_name: &str,
) -> Option<Arc<tokio::sync::RwLock<TableData>>> {
let t = self.tables.read();
t.get(table_name).cloned()
t.by_name(table_name)
}
/// Return the table data by ID.
#[cfg(test)]
pub(crate) fn table_id(
&self,
table_id: TableId,
) -> Option<Arc<tokio::sync::RwLock<TableData>>> {
let t = self.tables.read();
t.by_id(table_id)
}
/// Inserts the table or returns it if it happens to be inserted by some other thread
@ -259,23 +292,22 @@ impl NamespaceData {
let mut t = self.tables.write();
let data = match t.entry(table_name.to_string()) {
Entry::Vacant(v) => {
let v = v.insert(Arc::new(tokio::sync::RwLock::new(TableData::new(
Ok(match t.by_name(table_name) {
Some(v) => v,
None => {
self.table_count.inc(1);
// Insert the table and then return a ref to it.
t.insert(TableData::new(
info.table_id,
table_name,
self.shard_id,
self.namespace_id,
info.tombstone_max_sequence_number,
Arc::clone(&self.partition_provider),
))));
self.table_count.inc(1);
Arc::clone(v)
))
}
Entry::Occupied(v) => Arc::clone(v.get()),
};
Ok(data)
})
}
/// Walks down the table and partition and clears the persisting batch. The sequence number is
@ -299,7 +331,7 @@ impl NamespaceData {
/// Return progress from this Namespace
pub(super) async fn progress(&self) -> ShardProgress {
let tables: Vec<_> = self.tables.read().values().map(Arc::clone).collect();
let tables: Vec<_> = self.tables.read().by_id.values().map(Arc::clone).collect();
// Consolidate progress across partitions.
let mut progress = ShardProgress::new()
@ -357,3 +389,84 @@ impl<'a> Drop for ScopedSequenceNumber<'a> {
*buffering_sequence_number = None;
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use data_types::{PartitionId, ShardIndex};
use metric::{Attributes, Metric};
use crate::{
data::partition::{resolver::MockPartitionProvider, PartitionData},
lifecycle::mock_handle::MockLifecycleHandle,
test_util::{make_write_op, populate_catalog},
};
use super::*;
// Fixed identifiers shared by the fixtures below.
const SHARD_INDEX: ShardIndex = ShardIndex::new(24);
const TABLE_NAME: &str = "bananas";
const NAMESPACE_NAME: &str = "platanos";
/// Buffers a single write into a [`NamespaceData`] and asserts the resulting
/// [`TableData`] is resolvable both by table name and by table ID, and that
/// the "ingester_tables_total" metric is incremented exactly once.
#[tokio::test]
async fn test_namespace_double_ref() {
let metrics = Arc::new(metric::Registry::default());
let catalog: Arc<dyn Catalog> =
Arc::new(iox_catalog::mem::MemCatalog::new(Arc::clone(&metrics)));
let exec = Executor::new(1);
// Populate the catalog with the shard / namespace / table
let (shard_id, ns_id, table_id) =
populate_catalog(&*catalog, SHARD_INDEX, NAMESPACE_NAME, TABLE_NAME).await;
// Configure the mock partition provider to return a partition for this
// table ID.
let partition_provider = Arc::new(MockPartitionProvider::default().with_partition(
PartitionData::new(
PartitionId::new(0),
PartitionKey::from("banana-split"),
shard_id,
ns_id,
table_id,
TABLE_NAME.into(),
None,
),
));
let ns = NamespaceData::new(ns_id, shard_id, partition_provider, &*metrics);
// Assert the namespace does not contain the test data
assert!(ns.table_data(TABLE_NAME).is_none());
assert!(ns.table_id(table_id).is_none());
// Write some test data
ns.buffer_operation(
DmlOperation::Write(make_write_op(
&PartitionKey::from("banana-split"),
SHARD_INDEX,
NAMESPACE_NAME,
0,
r#"bananas,city=Medford day="sun",temp=55 22"#,
)),
&catalog,
&MockLifecycleHandle::default(),
&exec,
)
.await
.expect("buffer op should succeed");
// Both forms of referencing the table should succeed
assert!(ns.table_data(TABLE_NAME).is_some());
assert!(ns.table_id(table_id).is_some());
// And the table counter metric should increase
let tables = metrics
.get_instrument::<Metric<U64Counter>>("ingester_tables_total")
.expect("failed to read metric")
.get_observer(&Attributes::from([]))
.expect("failed to get observer")
.fetch();
assert_eq!(tables, 1);
}
}

View File

@ -210,10 +210,15 @@ impl TableData {
})
}
#[cfg(test)]
/// Returns the catalog ID of this table (NOTE: the original doc said
/// "partition", which appears to be a copy-paste error - this is a table ID).
pub(super) fn table_id(&self) -> TableId {
self.table_id
}
/// Returns the name of this table as a cheaply-cloneable [`Arc`] reference.
pub(crate) fn table_name(&self) -> &Arc<str> {
&self.table_name
}
}
#[cfg(test)]