Merge pull request #5826 from influxdata/dom/table-name-type

refactor: use TableName, not Arc<str>
pull/24376/head
Dom 2022-10-11 09:54:29 +01:00 committed by GitHub
commit 2b8958fc03
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 122 additions and 73 deletions

View File

@ -34,6 +34,7 @@ pub mod table;
use self::{
partition::{resolver::PartitionProvider, PartitionStatus},
shard::ShardData,
table::TableName,
};
#[cfg(test)]
@ -449,16 +450,17 @@ impl Persister for IngesterData {
.record(file_size as u64);
// and remove the persisted data from memory
let table_name = TableName::from(&partition_info.table_name);
namespace
.mark_persisted(
&partition_info.table_name,
&table_name,
&partition_info.partition.partition_key,
iox_metadata.max_sequence_number,
)
.await;
debug!(
?partition_id,
table_name=%partition_info.table_name,
%table_name,
partition_key=%partition_info.partition.partition_key,
max_sequence_number=%iox_metadata.max_sequence_number.get(),
"marked partition as persisted"
@ -816,8 +818,8 @@ mod tests {
let (table_id, partition_id) = {
let sd = data.shards.get(&shard1.id).unwrap();
let n = sd.namespace(&"foo".into()).unwrap();
let mem_table = n.table_data("mem").unwrap();
assert!(n.table_data("mem").is_some());
let mem_table = n.table_data(&"mem".into()).unwrap();
assert!(n.table_data(&"mem".into()).is_some());
let mem_table = mem_table.write().await;
let p = mem_table
.get_partition_by_key(&"1970-01-01".into())
@ -961,8 +963,8 @@ mod tests {
let partition_id;
let table_id;
{
let mem_table = n.table_data("mem").unwrap();
assert!(n.table_data("cpu").is_some());
let mem_table = n.table_data(&"mem".into()).unwrap();
assert!(n.table_data(&"cpu".into()).is_some());
let mem_table = mem_table.write().await;
table_id = mem_table.table_id();
@ -1077,7 +1079,7 @@ mod tests {
.unwrap();
assert_eq!(partition_info.partition.sort_key, vec!["time"]);
let mem_table = n.table_data("mem").unwrap();
let mem_table = n.table_data(&"mem".into()).unwrap();
let mem_table = mem_table.read().await;
// verify that the parquet_max_sequence_number got updated
@ -1372,7 +1374,7 @@ mod tests {
.await
.unwrap();
{
let table_data = data.table_data("mem").unwrap();
let table_data = data.table_data(&"mem".into()).unwrap();
let table = table_data.read().await;
let p = table.get_partition_by_key(&"1970-01-01".into()).unwrap();
assert_eq!(
@ -1388,7 +1390,7 @@ mod tests {
.await
.unwrap();
let table_data = data.table_data("mem").unwrap();
let table_data = data.table_data(&"mem".into()).unwrap();
let table = table_data.read().await;
let partition = table.get_partition_by_key(&"1970-01-01".into()).unwrap();
assert_eq!(
@ -1481,7 +1483,7 @@ mod tests {
.unwrap()
.namespace(&namespace.name.clone().into())
.unwrap()
.table_data("mem")
.table_data(&"mem".into())
.unwrap()
.read()
.await
@ -1513,7 +1515,7 @@ mod tests {
.unwrap()
.namespace(&namespace.name.into())
.unwrap()
.table_data("mem")
.table_data(&"mem".into())
.unwrap()
.read()
.await

View File

@ -13,20 +13,23 @@ use write_summary::ShardProgress;
#[cfg(test)]
use super::triggers::TestTriggers;
use super::{partition::resolver::PartitionProvider, table::TableData};
use super::{
partition::resolver::PartitionProvider,
table::{TableData, TableName},
};
use crate::lifecycle::LifecycleHandle;
/// A double-referenced map where [`TableData`] can be looked up by name, or ID.
#[derive(Debug, Default)]
struct DoubleRef {
// TODO(4880): this can be removed when IDs are sent over the wire.
by_name: HashMap<Arc<str>, Arc<tokio::sync::RwLock<TableData>>>,
by_name: HashMap<TableName, Arc<tokio::sync::RwLock<TableData>>>,
by_id: HashMap<TableId, Arc<tokio::sync::RwLock<TableData>>>,
}
impl DoubleRef {
fn insert(&mut self, t: TableData) -> Arc<tokio::sync::RwLock<TableData>> {
let name = Arc::clone(t.table_name());
let name = t.table_name().clone();
let id = t.table_id();
let t = Arc::new(tokio::sync::RwLock::new(t));
@ -35,7 +38,7 @@ impl DoubleRef {
t
}
fn by_name(&self, name: &str) -> Option<Arc<tokio::sync::RwLock<TableData>>> {
fn by_name(&self, name: &TableName) -> Option<Arc<tokio::sync::RwLock<TableData>>> {
self.by_name.get(name).map(Arc::clone)
}
@ -196,6 +199,7 @@ impl NamespaceData {
.clone();
for (t, b) in write.into_tables() {
let t = TableName::from(t);
let table_data = match self.table_data(&t) {
Some(t) => t,
None => self.insert_table(&t, catalog).await?,
@ -221,10 +225,13 @@ impl NamespaceData {
Ok(pause_writes)
}
DmlOperation::Delete(delete) => {
let table_name = delete.table_name().context(super::TableNotPresentSnafu)?;
let table_data = match self.table_data(table_name) {
let table_name = delete
.table_name()
.context(super::TableNotPresentSnafu)?
.into();
let table_data = match self.table_data(&table_name) {
Some(t) => t,
None => self.insert_table(table_name, catalog).await?,
None => self.insert_table(&table_name, catalog).await?,
};
let mut table_data = table_data.write().await;
@ -244,7 +251,7 @@ impl NamespaceData {
#[cfg(test)] // Only used in tests
pub(crate) async fn snapshot(
&self,
table_name: &str,
table_name: &TableName,
partition_key: &PartitionKey,
) -> Option<(
Vec<Arc<super::partition::SnapshotBatch>>,
@ -270,7 +277,7 @@ impl NamespaceData {
#[cfg(test)] // Only used in tests
pub(crate) async fn snapshot_to_persisting(
&self,
table_name: &str,
table_name: &TableName,
partition_key: &PartitionKey,
) -> Option<Arc<super::partition::PersistingBatch>> {
if let Some(table_data) = self.table_data(table_name) {
@ -287,7 +294,7 @@ impl NamespaceData {
/// Gets the buffered table data
pub(crate) fn table_data(
&self,
table_name: &str,
table_name: &TableName,
) -> Option<Arc<tokio::sync::RwLock<TableData>>> {
let t = self.tables.read();
t.by_name(table_name)
@ -305,7 +312,7 @@ impl NamespaceData {
/// Inserts the table or returns it if it happens to be inserted by some other thread
async fn insert_table(
&self,
table_name: &str,
table_name: &TableName,
catalog: &Arc<dyn Catalog>,
) -> Result<Arc<tokio::sync::RwLock<TableData>>, super::Error> {
let mut repos = catalog.repositories().await;
@ -314,7 +321,9 @@ impl NamespaceData {
.get_table_persist_info(self.shard_id, self.namespace_id, table_name)
.await
.context(super::CatalogSnafu)?
.context(super::TableNotFoundSnafu { table_name })?;
.ok_or_else(|| super::Error::TableNotFound {
table_name: table_name.to_string(),
})?;
let mut t = self.tables.write();
@ -326,7 +335,7 @@ impl NamespaceData {
// Insert the table and then return a ref to it.
t.insert(TableData::new(
info.table_id,
table_name,
table_name.clone(),
self.shard_id,
self.namespace_id,
info.tombstone_max_sequence_number,
@ -341,7 +350,7 @@ impl NamespaceData {
/// data buffer.
pub(super) async fn mark_persisted(
&self,
table_name: &str,
table_name: &TableName,
partition_key: &PartitionKey,
sequence_number: SequenceNumber,
) {
@ -479,7 +488,7 @@ mod tests {
assert_eq!(&**ns.namespace_name(), NAMESPACE_NAME);
// Assert the namespace does not contain the test data
assert!(ns.table_data(TABLE_NAME).is_none());
assert!(ns.table_data(&TABLE_NAME.into()).is_none());
assert!(ns.table_id(table_id).is_none());
// Write some test data
@ -499,7 +508,7 @@ mod tests {
.expect("buffer op should succeed");
// Both forms of referencing the table should succeed
assert!(ns.table_data(TABLE_NAME).is_some());
assert!(ns.table_data(&TABLE_NAME.into()).is_some());
assert!(ns.table_id(table_id).is_some());
// And the table counter metric should increase

View File

@ -19,6 +19,8 @@ use self::{
};
use crate::{data::query_dedup::query, query::QueryableBatch};
use super::table::TableName;
mod buffer;
pub mod resolver;
@ -180,7 +182,7 @@ pub struct PartitionData {
namespace_id: NamespaceId,
table_id: TableId,
/// The name of the table this partition is part of.
table_name: Arc<str>,
table_name: TableName,
pub(super) data: DataBuffer,
@ -198,7 +200,7 @@ impl PartitionData {
shard_id: ShardId,
namespace_id: NamespaceId,
table_id: TableId,
table_name: Arc<str>,
table_name: TableName,
sort_key: SortKeyState,
max_persisted_sequence_number: Option<SequenceNumber>,
) -> Self {

View File

@ -9,6 +9,8 @@ use snafu::ResultExt;
use uuid::Uuid;
use write_summary::ShardProgress;
use crate::data::table::TableName;
use super::{PersistingBatch, QueryableBatch, SnapshotBatch};
/// Data of an IOx partition split into batches
@ -109,7 +111,7 @@ impl DataBuffer {
/// Both buffer and snapshots will be empty after this
pub(super) fn snapshot_to_queryable_batch(
&mut self,
table_name: &Arc<str>,
table_name: &TableName,
partition_id: PartitionId,
tombstone: Option<Tombstone>,
) -> Option<QueryableBatch> {
@ -129,7 +131,7 @@ impl DataBuffer {
None
} else {
Some(QueryableBatch::new(
Arc::clone(table_name),
table_name.clone(),
partition_id,
data,
tombstones,
@ -164,7 +166,7 @@ impl DataBuffer {
shard_id: ShardId,
table_id: TableId,
partition_id: PartitionId,
table_name: &Arc<str>,
table_name: &TableName,
) -> Option<Arc<PersistingBatch>> {
if self.persisting.is_some() {
panic!("Unable to snapshot while persisting. This is an unexpected state.")

View File

@ -9,7 +9,10 @@ use iox_catalog::interface::Catalog;
use observability_deps::tracing::debug;
use parking_lot::Mutex;
use crate::data::partition::{resolver::DeferredSortKey, PartitionData, SortKeyState};
use crate::data::{
partition::{resolver::DeferredSortKey, PartitionData, SortKeyState},
table::TableName,
};
use super::r#trait::PartitionProvider;
@ -189,7 +192,7 @@ where
shard_id: ShardId,
namespace_id: NamespaceId,
table_id: TableId,
table_name: Arc<str>,
table_name: TableName,
) -> PartitionData {
// Use the cached PartitionKey rather than the caller's partition_key,
// preferring to reuse the already-shared Arc<str> in the cache.

View File

@ -9,7 +9,10 @@ use data_types::{NamespaceId, Partition, PartitionKey, ShardId, TableId};
use iox_catalog::interface::Catalog;
use observability_deps::tracing::debug;
use crate::data::partition::{PartitionData, SortKeyState};
use crate::data::{
partition::{PartitionData, SortKeyState},
table::TableName,
};
use super::r#trait::PartitionProvider;
@ -55,7 +58,7 @@ impl PartitionProvider for CatalogPartitionResolver {
shard_id: ShardId,
namespace_id: NamespaceId,
table_id: TableId,
table_name: Arc<str>,
table_name: TableName,
) -> PartitionData {
debug!(
%partition_key,
@ -132,7 +135,7 @@ mod tests {
};
let callers_partition_key = PartitionKey::from(PARTITION_KEY);
let table_name = TABLE_NAME.into();
let table_name = TableName::from(TABLE_NAME);
let resolver = CatalogPartitionResolver::new(Arc::clone(&catalog));
let got = resolver
.get_partition(
@ -140,7 +143,7 @@ mod tests {
shard_id,
namespace_id,
table_id,
Arc::clone(&table_name),
table_name.clone(),
)
.await;
assert_eq!(got.namespace_id(), namespace_id);

View File

@ -1,12 +1,12 @@
//! A mock [`PartitionProvider`] to inject [`PartitionData`] for tests.
use std::{collections::HashMap, sync::Arc};
use std::collections::HashMap;
use async_trait::async_trait;
use data_types::{NamespaceId, PartitionKey, ShardId, TableId};
use parking_lot::Mutex;
use crate::data::partition::PartitionData;
use crate::data::{partition::PartitionData, table::TableName};
use super::r#trait::PartitionProvider;
@ -58,7 +58,7 @@ impl PartitionProvider for MockPartitionProvider {
shard_id: ShardId,
namespace_id: NamespaceId,
table_id: TableId,
table_name: Arc<str>,
table_name: TableName,
) -> PartitionData {
let p = self
.partitions

View File

@ -3,7 +3,7 @@ use std::{fmt::Debug, sync::Arc};
use async_trait::async_trait;
use data_types::{NamespaceId, PartitionKey, ShardId, TableId};
use crate::data::partition::PartitionData;
use crate::data::{partition::PartitionData, table::TableName};
/// An infallible resolver of [`PartitionData`] for the specified shard, table,
/// and partition key, returning an initialised [`PartitionData`] buffer for it.
@ -20,7 +20,7 @@ pub trait PartitionProvider: Send + Sync + Debug {
shard_id: ShardId,
namespace_id: NamespaceId,
table_id: TableId,
table_name: Arc<str>,
table_name: TableName,
) -> PartitionData;
}
@ -35,7 +35,7 @@ where
shard_id: ShardId,
namespace_id: NamespaceId,
table_id: TableId,
table_name: Arc<str>,
table_name: TableName,
) -> PartitionData {
(**self)
.get_partition(partition_key, shard_id, namespace_id, table_id, table_name)
@ -59,7 +59,7 @@ mod tests {
let shard_id = ShardId::new(42);
let namespace_id = NamespaceId::new(1234);
let table_id = TableId::new(24);
let table_name = "platanos".into();
let table_name = TableName::from("platanos");
let partition = PartitionId::new(4242);
let data = PartitionData::new(
partition,
@ -67,7 +67,7 @@ mod tests {
shard_id,
namespace_id,
table_id,
Arc::clone(&table_name),
table_name.clone(),
SortKeyState::Provided(None),
None,
);
@ -75,13 +75,7 @@ mod tests {
let mock = Arc::new(MockPartitionProvider::default().with_partition(data));
let got = mock
.get_partition(
key,
shard_id,
namespace_id,
table_id,
Arc::clone(&table_name),
)
.get_partition(key, shard_id, namespace_id, table_id, table_name.clone())
.await;
assert_eq!(got.partition_id(), partition);
assert_eq!(got.namespace_id(), namespace_id);

View File

@ -50,11 +50,40 @@ impl DoubleRef {
}
}
/// The name of a table, used as its string identifier.
///
/// Backed by a reference-counted string, so cloning is cheap and never
/// copies the underlying text.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TableName(Arc<str>);

impl<T> From<T> for TableName
where
    T: AsRef<str>,
{
    /// Builds a [`TableName`] by copying the string content of `v` into a
    /// new shared allocation.
    fn from(v: T) -> Self {
        let name: &str = v.as_ref();
        Self(name.into())
    }
}

impl std::fmt::Display for TableName {
    /// Formats the name by delegating to the wrapped string's own
    /// `Display` implementation.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        std::fmt::Display::fmt(&self.0, f)
    }
}

impl std::ops::Deref for TableName {
    type Target = str;

    /// Borrows the name as a plain string slice.
    fn deref(&self) -> &Self::Target {
        self.0.as_ref()
    }
}
/// Data of a Table in a given Namespace that belongs to a given Shard
#[derive(Debug)]
pub(crate) struct TableData {
table_id: TableId,
table_name: Arc<str>,
table_name: TableName,
/// The catalog ID of the shard & namespace this table is being populated
/// from.
@ -85,7 +114,7 @@ impl TableData {
/// for the first time.
pub(super) fn new(
table_id: TableId,
table_name: &str,
table_name: TableName,
shard_id: ShardId,
namespace_id: NamespaceId,
tombstone_max_sequence_number: Option<SequenceNumber>,
@ -93,7 +122,7 @@ impl TableData {
) -> Self {
Self {
table_id,
table_name: table_name.into(),
table_name,
shard_id,
namespace_id,
tombstone_max_sequence_number,
@ -137,7 +166,7 @@ impl TableData {
self.shard_id,
self.namespace_id,
self.table_id,
Arc::clone(&self.table_name),
self.table_name.clone(),
)
.await;
// Add the double-referenced partition to the map.
@ -276,7 +305,7 @@ impl TableData {
}
/// Returns the name of this table.
pub(crate) fn table_name(&self) -> &Arc<str> {
pub(crate) fn table_name(&self) -> &TableName {
&self.table_name
}
}
@ -335,7 +364,7 @@ mod tests {
let mut table = TableData::new(
table_id,
TABLE_NAME,
TABLE_NAME.into(),
shard_id,
ns_id,
None,
@ -395,7 +424,7 @@ mod tests {
let mut table = TableData::new(
table_id,
TABLE_NAME,
TABLE_NAME.into(),
shard_id,
ns_id,
None,

View File

@ -445,7 +445,7 @@ mod tests {
use write_buffer::mock::{MockBufferForReading, MockBufferSharedState};
use super::*;
use crate::data::partition::SnapshotBatch;
use crate::data::{partition::SnapshotBatch, table::TableName};
#[tokio::test]
async fn read_from_write_buffer_write_to_mutable_buffer() {
@ -513,13 +513,15 @@ mod tests {
// data in there from both writes.
tokio::time::timeout(Duration::from_secs(2), async {
let ns_name = ingester.namespace.name.into();
let table_name = TableName::from("a");
loop {
let mut has_measurement = false;
if let Some(data) = ingester.ingester.data.shard(ingester.shard.id) {
if let Some(data) = data.namespace(&ns_name) {
// verify there's data in the buffer
if let Some((b, _)) = data.snapshot("a", &"1970-01-01".into()).await {
if let Some((b, _)) = data.snapshot(&table_name, &"1970-01-01".into()).await
{
if let Some(b) = b.first() {
if b.data.num_rows() > 0 {
has_measurement = true;
@ -755,13 +757,15 @@ mod tests {
// data in there
tokio::time::timeout(Duration::from_secs(1), async move {
let ns_name = namespace.name.into();
let table_name = TableName::from("cpu");
loop {
let mut has_measurement = false;
if let Some(data) = ingester.data.shard(shard.id) {
if let Some(data) = data.namespace(&ns_name) {
// verify there's data in the buffer
if let Some((b, _)) = data.snapshot("cpu", &"1970-01-01".into()).await {
if let Some((b, _)) = data.snapshot(&table_name, &"1970-01-01".into()).await
{
if let Some(b) = b.first() {
custom_batch_verification(b);

View File

@ -12,8 +12,8 @@ use snafu::{ensure, Snafu};
use crate::{
data::{
namespace::NamespaceName, partition::UnpersistedPartitionData, IngesterData,
IngesterQueryPartition, IngesterQueryResponse,
namespace::NamespaceName, partition::UnpersistedPartitionData, table::TableName,
IngesterData, IngesterQueryPartition, IngesterQueryResponse,
},
query::QueryableBatch,
};
@ -69,7 +69,8 @@ pub async fn prepare_data_to_querier(
}
};
let table_data = match namespace_data.table_data(&request.table) {
let table_name = TableName::from(&request.table);
let table_data = match namespace_data.table_data(&table_name) {
Some(table_data) => {
debug!(table_name=%request.table, "found table");
table_data

View File

@ -28,7 +28,7 @@ use predicate::{
use schema::{merge::merge_record_batch_schemas, selection::Selection, sort::SortKey, Schema};
use snafu::{ResultExt, Snafu};
use crate::data::partition::SnapshotBatch;
use crate::data::{partition::SnapshotBatch, table::TableName};
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Snafu)]
@ -60,7 +60,7 @@ pub(crate) struct QueryableBatch {
pub(crate) delete_predicates: Vec<Arc<DeletePredicate>>,
/// This is needed to return a reference for a trait function
pub(crate) table_name: Arc<str>,
pub(crate) table_name: TableName,
/// Partition ID
pub(crate) partition_id: PartitionId,
@ -69,7 +69,7 @@ pub(crate) struct QueryableBatch {
impl QueryableBatch {
/// Initialize a QueryableBatch
pub(crate) fn new(
table_name: Arc<str>,
table_name: TableName,
partition_id: PartitionId,
data: Vec<Arc<SnapshotBatch>>,
deletes: Vec<Tombstone>,

View File

@ -657,7 +657,7 @@ pub(crate) async fn make_ingester_data(two_partitions: bool, loc: DataLocation)
.unwrap()
.namespace(&TEST_NAMESPACE.into())
.unwrap()
.snapshot_to_persisting(TEST_TABLE, &PartitionKey::from(TEST_PARTITION_1))
.snapshot_to_persisting(&TEST_TABLE.into(), &PartitionKey::from(TEST_PARTITION_1))
.await;
} else if loc.contains(DataLocation::SNAPSHOT) {
// move partition 1 data to snapshot
@ -666,7 +666,7 @@ pub(crate) async fn make_ingester_data(two_partitions: bool, loc: DataLocation)
.unwrap()
.namespace(&TEST_NAMESPACE.into())
.unwrap()
.snapshot(TEST_TABLE, &PartitionKey::from(TEST_PARTITION_1))
.snapshot(&TEST_TABLE.into(), &PartitionKey::from(TEST_PARTITION_1))
.await;
}
@ -826,7 +826,7 @@ async fn make_one_partition_with_tombstones(
.unwrap()
.namespace(&TEST_NAMESPACE.into())
.unwrap()
.snapshot_to_persisting(TEST_TABLE, &PartitionKey::from(TEST_PARTITION_1))
.snapshot_to_persisting(&TEST_TABLE.into(), &PartitionKey::from(TEST_PARTITION_1))
.await;
} else if loc.contains(DataLocation::SNAPSHOT) {
// move partition 1 data to snapshot
@ -835,7 +835,7 @@ async fn make_one_partition_with_tombstones(
.unwrap()
.namespace(&TEST_NAMESPACE.into())
.unwrap()
.snapshot(TEST_TABLE, &PartitionKey::from(TEST_PARTITION_1))
.snapshot(&TEST_TABLE.into(), &PartitionKey::from(TEST_PARTITION_1))
.await;
}