refactor: PartitionData carries parent IDs

This commit changes the PartitionData buffer structure to carry the IDs
of all of its parents: the table, namespace, and shard. Previously only
the table and shard IDs were carried.
pull/24376/head
Dom Dwyer 2022-09-29 15:07:03 +02:00
parent 0287a58118
commit 726b1d1d3b
8 changed files with 78 additions and 17 deletions
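For orientation, the sketch below shows the shape PartitionData ends up with after this change. It is a simplified, self-contained illustration only, not the actual ingester type: the real ID newtypes live in the data_types crate, and the real PartitionData also carries the partition key, table name, buffered batches, and tombstone state elided here.

```rust
// Simplified sketch of the post-change shape; not the actual ingester type.
// The real ShardId / NamespaceId / TableId newtypes come from `data_types`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ShardId(i64);
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct NamespaceId(i64);
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct TableId(i64);

/// The buffer now records all three parent IDs.
#[derive(Debug)]
struct PartitionData {
    shard_id: ShardId,
    namespace_id: NamespaceId, // newly carried parent ID
    table_id: TableId,
    // partition key, table name, buffered data, etc. elided
}

impl PartitionData {
    /// Return the namespace this partition is a part of.
    fn namespace_id(&self) -> NamespaceId {
        self.namespace_id
    }
}

fn main() {
    let p = PartitionData {
        shard_id: ShardId(1),
        namespace_id: NamespaceId(42),
        table_id: TableId(1),
    };
    assert_eq!(p.namespace_id(), NamespaceId(42));
}
```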

@ -80,7 +80,7 @@ pub struct NamespaceId(i64);
#[allow(missing_docs)]
impl NamespaceId {
pub fn new(v: i64) -> Self {
pub const fn new(v: i64) -> Self {
Self(v)
}
pub fn get(&self) -> i64 {

@ -265,6 +265,7 @@ impl NamespaceData {
info.table_id,
table_name,
self.shard_id,
self.namespace_id,
info.tombstone_max_sequence_number,
Arc::clone(&self.partition_provider),
))));

@ -3,7 +3,9 @@
use std::sync::Arc;
use arrow::record_batch::RecordBatch;
use data_types::{PartitionId, PartitionKey, SequenceNumber, ShardId, TableId, Tombstone};
use data_types::{
NamespaceId, PartitionId, PartitionKey, SequenceNumber, ShardId, TableId, Tombstone,
};
use iox_query::exec::Executor;
use mutable_batch::MutableBatch;
use schema::selection::Selection;
@ -138,8 +140,9 @@ pub struct PartitionData {
/// The string partition key for this partition.
partition_key: PartitionKey,
/// The shard and table IDs for this partition.
/// The shard, namespace & table IDs for this partition.
shard_id: ShardId,
namespace_id: NamespaceId,
table_id: TableId,
/// The name of the table this partition is part of.
table_name: Arc<str>,
@ -157,6 +160,7 @@ impl PartitionData {
id: PartitionId,
partition_key: PartitionKey,
shard_id: ShardId,
namespace_id: NamespaceId,
table_id: TableId,
table_name: Arc<str>,
max_persisted_sequence_number: Option<SequenceNumber>,
@ -165,6 +169,7 @@ impl PartitionData {
id,
partition_key,
shard_id,
namespace_id,
table_id,
table_name,
data: Default::default(),
@ -337,6 +342,11 @@ impl PartitionData {
pub fn partition_key(&self) -> &PartitionKey {
&self.partition_key
}
/// Return the [`NamespaceId`] this partition is a part of.
pub fn namespace_id(&self) -> NamespaceId {
self.namespace_id
}
}
#[cfg(test)]
@ -353,6 +363,7 @@ mod tests {
PartitionId::new(1),
"bananas".into(),
ShardId::new(1),
NamespaceId::new(42),
TableId::new(1),
"foo".into(),
None,
@ -399,6 +410,7 @@ mod tests {
PartitionId::new(p_id),
"bananas".into(),
ShardId::new(s_id),
NamespaceId::new(42),
TableId::new(t_id),
"restaurant".into(),
None,

@ -1,7 +1,9 @@
use std::{collections::HashMap, sync::Arc};
use async_trait::async_trait;
use data_types::{Partition, PartitionId, PartitionKey, SequenceNumber, ShardId, TableId};
use data_types::{
NamespaceId, Partition, PartitionId, PartitionKey, SequenceNumber, ShardId, TableId,
};
use observability_deps::tracing::debug;
use parking_lot::Mutex;
@ -150,6 +152,7 @@ where
&self,
partition_key: PartitionKey,
shard_id: ShardId,
namespace_id: NamespaceId,
table_id: TableId,
table_name: Arc<str>,
) -> PartitionData {
@ -165,6 +168,7 @@ where
cached.partition_id,
key,
shard_id,
namespace_id,
table_id,
table_name,
cached.max_sequence_number,
@ -175,7 +179,7 @@ where
// Otherwise delegate to the catalog / inner impl.
self.inner
.get_partition(partition_key, shard_id, table_id, table_name)
.get_partition(partition_key, shard_id, namespace_id, table_id, table_name)
.await
}
}
@ -189,7 +193,8 @@ mod tests {
const PARTITION_KEY: &str = "bananas";
const PARTITION_ID: PartitionId = PartitionId::new(42);
const SHARD_ID: ShardId = ShardId::new(1);
const TABLE_ID: TableId = TableId::new(2);
const NAMESPACE_ID: NamespaceId = NamespaceId::new(2);
const TABLE_ID: TableId = TableId::new(3);
const TABLE_NAME: &str = "platanos";
#[tokio::test]
@ -198,6 +203,7 @@ mod tests {
PARTITION_ID,
PARTITION_KEY.into(),
SHARD_ID,
NAMESPACE_ID,
TABLE_ID,
TABLE_NAME.into(),
None,
@ -206,7 +212,13 @@ mod tests {
let cache = PartitionCache::new(inner, []);
let got = cache
.get_partition(PARTITION_KEY.into(), SHARD_ID, TABLE_ID, TABLE_NAME.into())
.get_partition(
PARTITION_KEY.into(),
SHARD_ID,
NAMESPACE_ID,
TABLE_ID,
TABLE_NAME.into(),
)
.await;
assert_eq!(got.id(), PARTITION_ID);
@ -237,6 +249,7 @@ mod tests {
.get_partition(
callers_partition_key.clone(),
SHARD_ID,
NAMESPACE_ID,
TABLE_ID,
TABLE_NAME.into(),
)
@ -270,6 +283,7 @@ mod tests {
other_key_id,
PARTITION_KEY.into(),
SHARD_ID,
NAMESPACE_ID,
TABLE_ID,
TABLE_NAME.into(),
None,
@ -287,7 +301,13 @@ mod tests {
let cache = PartitionCache::new(inner, [partition]);
let got = cache
.get_partition(other_key.clone(), SHARD_ID, TABLE_ID, TABLE_NAME.into())
.get_partition(
other_key.clone(),
SHARD_ID,
NAMESPACE_ID,
TABLE_ID,
TABLE_NAME.into(),
)
.await;
assert_eq!(got.id(), other_key_id);
@ -305,6 +325,7 @@ mod tests {
PARTITION_ID,
PARTITION_KEY.into(),
SHARD_ID,
NAMESPACE_ID,
other_table,
TABLE_NAME.into(),
None,
@ -325,6 +346,7 @@ mod tests {
.get_partition(
PARTITION_KEY.into(),
SHARD_ID,
NAMESPACE_ID,
other_table,
TABLE_NAME.into(),
)
@ -345,6 +367,7 @@ mod tests {
PARTITION_ID,
PARTITION_KEY.into(),
other_shard,
NAMESPACE_ID,
TABLE_ID,
TABLE_NAME.into(),
None,
@ -365,6 +388,7 @@ mod tests {
.get_partition(
PARTITION_KEY.into(),
other_shard,
NAMESPACE_ID,
TABLE_ID,
TABLE_NAME.into(),
)

@ -5,7 +5,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use backoff::{Backoff, BackoffConfig};
use data_types::{Partition, PartitionKey, ShardId, TableId};
use data_types::{NamespaceId, Partition, PartitionKey, ShardId, TableId};
use iox_catalog::interface::Catalog;
use observability_deps::tracing::debug;
@ -53,6 +53,7 @@ impl PartitionProvider for CatalogPartitionResolver {
&self,
partition_key: PartitionKey,
shard_id: ShardId,
namespace_id: NamespaceId,
table_id: TableId,
table_name: Arc<str>,
) -> PartitionData {
@ -74,6 +75,7 @@ impl PartitionProvider for CatalogPartitionResolver {
// definitely has no other refs.
partition_key,
shard_id,
namespace_id,
table_id,
table_name,
p.persisted_sequence_number,
@ -98,7 +100,7 @@ mod tests {
let catalog: Arc<dyn Catalog> =
Arc::new(iox_catalog::mem::MemCatalog::new(Arc::clone(&metrics)));
let (shard_id, table_id) = {
let (shard_id, namespace_id, table_id) = {
let mut repos = catalog.repositories().await;
let t = repos.topics().create_or_get("platanos").await.unwrap();
let q = repos.query_pools().create_or_get("platanos").await.unwrap();
@ -125,7 +127,7 @@ mod tests {
.await
.unwrap();
(shard.id, table.id)
(shard.id, ns.id, table.id)
};
let callers_partition_key = PartitionKey::from(PARTITION_KEY);
@ -135,10 +137,12 @@ mod tests {
.get_partition(
callers_partition_key.clone(),
shard_id,
namespace_id,
table_id,
Arc::clone(&table_name),
)
.await;
assert_eq!(got.namespace_id(), namespace_id);
assert_eq!(*got.table_name(), *table_name);
assert_eq!(got.max_persisted_sequence_number(), None);
assert!(got.partition_key.ptr_eq(&callers_partition_key));

@ -3,7 +3,7 @@
use std::{collections::HashMap, sync::Arc};
use async_trait::async_trait;
use data_types::{PartitionKey, ShardId, TableId};
use data_types::{NamespaceId, PartitionKey, ShardId, TableId};
use parking_lot::Mutex;
use crate::data::partition::PartitionData;
@ -53,6 +53,7 @@ impl PartitionProvider for MockPartitionProvider {
&self,
partition_key: PartitionKey,
shard_id: ShardId,
namespace_id: NamespaceId,
table_id: TableId,
table_name: Arc<str>,
) -> PartitionData {
@ -64,6 +65,7 @@ impl PartitionProvider for MockPartitionProvider {
panic!("no partition data for mock ({partition_key:?}, {shard_id:?}, {table_id:?})")
});
assert_eq!(p.namespace_id(), namespace_id);
assert_eq!(*p.table_name(), *table_name);
p
}

@ -1,7 +1,7 @@
use std::{fmt::Debug, sync::Arc};
use async_trait::async_trait;
use data_types::{PartitionKey, ShardId, TableId};
use data_types::{NamespaceId, PartitionKey, ShardId, TableId};
use crate::data::partition::PartitionData;
@ -18,6 +18,7 @@ pub trait PartitionProvider: Send + Sync + Debug {
&self,
partition_key: PartitionKey,
shard_id: ShardId,
namespace_id: NamespaceId,
table_id: TableId,
table_name: Arc<str>,
) -> PartitionData;
@ -32,11 +33,12 @@ where
&self,
partition_key: PartitionKey,
shard_id: ShardId,
namespace_id: NamespaceId,
table_id: TableId,
table_name: Arc<str>,
) -> PartitionData {
(**self)
.get_partition(partition_key, shard_id, table_id, table_name)
.get_partition(partition_key, shard_id, namespace_id, table_id, table_name)
.await
}
}
@ -55,6 +57,7 @@ mod tests {
async fn test_arc_impl() {
let key = PartitionKey::from("bananas");
let shard_id = ShardId::new(42);
let namespace_id = NamespaceId::new(1234);
let table_id = TableId::new(24);
let table_name = "platanos".into();
let partition = PartitionId::new(4242);
@ -62,6 +65,7 @@ mod tests {
partition,
"bananas".into(),
shard_id,
namespace_id,
table_id,
Arc::clone(&table_name),
None,
@ -70,9 +74,16 @@ mod tests {
let mock = Arc::new(MockPartitionProvider::default().with_partition(key.clone(), data));
let got = mock
.get_partition(key, shard_id, table_id, Arc::clone(&table_name))
.get_partition(
key,
shard_id,
namespace_id,
table_id,
Arc::clone(&table_name),
)
.await;
assert_eq!(got.id(), partition);
assert_eq!(got.namespace_id(), namespace_id);
assert_eq!(*got.table_name(), *table_name);
}
}
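
Read together, the changes to this file leave the partition resolver trait with a five-argument lookup. The following is a reconstructed sketch of that shape using placeholder types (the real PartitionKey, ID newtypes, and PartitionData come from data_types and the ingester's partition module, and the blanket Arc impl shown in the hunks above is omitted):

```rust
use std::{fmt::Debug, sync::Arc};

use async_trait::async_trait;

// Placeholder types for the sketch; the real definitions live elsewhere.
#[derive(Debug, Clone, Copy)]
pub struct ShardId(i64);
#[derive(Debug, Clone, Copy)]
pub struct NamespaceId(i64);
#[derive(Debug, Clone, Copy)]
pub struct TableId(i64);
#[derive(Debug, Clone)]
pub struct PartitionKey(String);
#[derive(Debug, Default)]
pub struct PartitionData {} // buffered state elided

/// Resolvers are now handed the namespace ID alongside the shard and
/// table IDs when asked to produce a PartitionData buffer.
#[async_trait]
pub trait PartitionProvider: Send + Sync + Debug {
    async fn get_partition(
        &self,
        partition_key: PartitionKey,
        shard_id: ShardId,
        namespace_id: NamespaceId, // new parameter threaded through every impl
        table_id: TableId,
        table_name: Arc<str>,
    ) -> PartitionData;
}
```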

@ -2,7 +2,9 @@
use std::{collections::BTreeMap, sync::Arc};
use data_types::{DeletePredicate, PartitionKey, SequenceNumber, ShardId, TableId, Timestamp};
use data_types::{
DeletePredicate, NamespaceId, PartitionKey, SequenceNumber, ShardId, TableId, Timestamp,
};
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
use mutable_batch::MutableBatch;
@ -20,8 +22,10 @@ pub(crate) struct TableData {
table_id: TableId,
table_name: Arc<str>,
/// The catalog ID of the shard this table is being populated from.
/// The catalog ID of the shard & namespace this table is being populated
/// from.
shard_id: ShardId,
namespace_id: NamespaceId,
// the max sequence number for a tombstone associated with this table
tombstone_max_sequence_number: Option<SequenceNumber>,
@ -49,6 +53,7 @@ impl TableData {
table_id: TableId,
table_name: &str,
shard_id: ShardId,
namespace_id: NamespaceId,
tombstone_max_sequence_number: Option<SequenceNumber>,
partition_provider: Arc<dyn PartitionProvider>,
) -> Self {
@ -56,6 +61,7 @@ impl TableData {
table_id,
table_name: table_name.into(),
shard_id,
namespace_id,
tombstone_max_sequence_number,
partition_data: Default::default(),
partition_provider,
@ -94,6 +100,7 @@ impl TableData {
.get_partition(
partition_key.clone(),
self.shard_id,
self.namespace_id,
self.table_id,
Arc::clone(&self.table_name),
)