diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index f030190d5c..60684d44a5 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -80,7 +80,7 @@ pub struct NamespaceId(i64); #[allow(missing_docs)] impl NamespaceId { - pub fn new(v: i64) -> Self { + pub const fn new(v: i64) -> Self { Self(v) } pub fn get(&self) -> i64 { diff --git a/ingester/src/data/namespace.rs b/ingester/src/data/namespace.rs index 51753b2dc7..6a5ddb9581 100644 --- a/ingester/src/data/namespace.rs +++ b/ingester/src/data/namespace.rs @@ -265,6 +265,7 @@ impl NamespaceData { info.table_id, table_name, self.shard_id, + self.namespace_id, info.tombstone_max_sequence_number, Arc::clone(&self.partition_provider), )))); diff --git a/ingester/src/data/partition.rs b/ingester/src/data/partition.rs index 7f98615d92..1ec531fdbc 100644 --- a/ingester/src/data/partition.rs +++ b/ingester/src/data/partition.rs @@ -3,7 +3,9 @@ use std::sync::Arc; use arrow::record_batch::RecordBatch; -use data_types::{PartitionId, PartitionKey, SequenceNumber, ShardId, TableId, Tombstone}; +use data_types::{ + NamespaceId, PartitionId, PartitionKey, SequenceNumber, ShardId, TableId, Tombstone, +}; use iox_query::exec::Executor; use mutable_batch::MutableBatch; use schema::selection::Selection; @@ -138,8 +140,9 @@ pub struct PartitionData { /// The string partition key for this partition. partition_key: PartitionKey, - /// The shard and table IDs for this partition. + /// The shard, namespace & table IDs for this partition. shard_id: ShardId, + namespace_id: NamespaceId, table_id: TableId, /// The name of the table this partition is part of. table_name: Arc, @@ -157,6 +160,7 @@ impl PartitionData { id: PartitionId, partition_key: PartitionKey, shard_id: ShardId, + namespace_id: NamespaceId, table_id: TableId, table_name: Arc, max_persisted_sequence_number: Option, @@ -165,6 +169,7 @@ impl PartitionData { id, partition_key, shard_id, + namespace_id, table_id, table_name, data: Default::default(), @@ -337,6 +342,11 @@ impl PartitionData { pub fn partition_key(&self) -> &PartitionKey { &self.partition_key } + + /// Return the [`NamespaceId`] this partition is a part of. + pub fn namespace_id(&self) -> NamespaceId { + self.namespace_id + } } #[cfg(test)] @@ -353,6 +363,7 @@ mod tests { PartitionId::new(1), "bananas".into(), ShardId::new(1), + NamespaceId::new(42), TableId::new(1), "foo".into(), None, @@ -399,6 +410,7 @@ mod tests { PartitionId::new(p_id), "bananas".into(), ShardId::new(s_id), + NamespaceId::new(42), TableId::new(t_id), "restaurant".into(), None, diff --git a/ingester/src/data/partition/resolver/cache.rs b/ingester/src/data/partition/resolver/cache.rs index d784b0b371..83fddd26de 100644 --- a/ingester/src/data/partition/resolver/cache.rs +++ b/ingester/src/data/partition/resolver/cache.rs @@ -1,7 +1,9 @@ use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; -use data_types::{Partition, PartitionId, PartitionKey, SequenceNumber, ShardId, TableId}; +use data_types::{ + NamespaceId, Partition, PartitionId, PartitionKey, SequenceNumber, ShardId, TableId, +}; use observability_deps::tracing::debug; use parking_lot::Mutex; @@ -150,6 +152,7 @@ where &self, partition_key: PartitionKey, shard_id: ShardId, + namespace_id: NamespaceId, table_id: TableId, table_name: Arc, ) -> PartitionData { @@ -165,6 +168,7 @@ where cached.partition_id, key, shard_id, + namespace_id, table_id, table_name, cached.max_sequence_number, @@ -175,7 +179,7 @@ where // Otherwise delegate to the catalog / inner impl. self.inner - .get_partition(partition_key, shard_id, table_id, table_name) + .get_partition(partition_key, shard_id, namespace_id, table_id, table_name) .await } } @@ -189,7 +193,8 @@ mod tests { const PARTITION_KEY: &str = "bananas"; const PARTITION_ID: PartitionId = PartitionId::new(42); const SHARD_ID: ShardId = ShardId::new(1); - const TABLE_ID: TableId = TableId::new(2); + const NAMESPACE_ID: NamespaceId = NamespaceId::new(2); + const TABLE_ID: TableId = TableId::new(3); const TABLE_NAME: &str = "platanos"; #[tokio::test] @@ -198,6 +203,7 @@ mod tests { PARTITION_ID, PARTITION_KEY.into(), SHARD_ID, + NAMESPACE_ID, TABLE_ID, TABLE_NAME.into(), None, @@ -206,7 +212,13 @@ mod tests { let cache = PartitionCache::new(inner, []); let got = cache - .get_partition(PARTITION_KEY.into(), SHARD_ID, TABLE_ID, TABLE_NAME.into()) + .get_partition( + PARTITION_KEY.into(), + SHARD_ID, + NAMESPACE_ID, + TABLE_ID, + TABLE_NAME.into(), + ) .await; assert_eq!(got.id(), PARTITION_ID); @@ -237,6 +249,7 @@ mod tests { .get_partition( callers_partition_key.clone(), SHARD_ID, + NAMESPACE_ID, TABLE_ID, TABLE_NAME.into(), ) @@ -270,6 +283,7 @@ mod tests { other_key_id, PARTITION_KEY.into(), SHARD_ID, + NAMESPACE_ID, TABLE_ID, TABLE_NAME.into(), None, @@ -287,7 +301,13 @@ mod tests { let cache = PartitionCache::new(inner, [partition]); let got = cache - .get_partition(other_key.clone(), SHARD_ID, TABLE_ID, TABLE_NAME.into()) + .get_partition( + other_key.clone(), + SHARD_ID, + NAMESPACE_ID, + TABLE_ID, + TABLE_NAME.into(), + ) .await; assert_eq!(got.id(), other_key_id); @@ -305,6 +325,7 @@ mod tests { PARTITION_ID, PARTITION_KEY.into(), SHARD_ID, + NAMESPACE_ID, other_table, TABLE_NAME.into(), None, @@ -325,6 +346,7 @@ mod tests { .get_partition( PARTITION_KEY.into(), SHARD_ID, + NAMESPACE_ID, other_table, TABLE_NAME.into(), ) @@ -345,6 +367,7 @@ mod tests { PARTITION_ID, PARTITION_KEY.into(), other_shard, + NAMESPACE_ID, TABLE_ID, TABLE_NAME.into(), None, @@ -365,6 +388,7 @@ mod tests { .get_partition( PARTITION_KEY.into(), other_shard, + NAMESPACE_ID, TABLE_ID, TABLE_NAME.into(), ) diff --git a/ingester/src/data/partition/resolver/catalog.rs b/ingester/src/data/partition/resolver/catalog.rs index 73e81563e9..8035546be6 100644 --- a/ingester/src/data/partition/resolver/catalog.rs +++ b/ingester/src/data/partition/resolver/catalog.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use async_trait::async_trait; use backoff::{Backoff, BackoffConfig}; -use data_types::{Partition, PartitionKey, ShardId, TableId}; +use data_types::{NamespaceId, Partition, PartitionKey, ShardId, TableId}; use iox_catalog::interface::Catalog; use observability_deps::tracing::debug; @@ -53,6 +53,7 @@ impl PartitionProvider for CatalogPartitionResolver { &self, partition_key: PartitionKey, shard_id: ShardId, + namespace_id: NamespaceId, table_id: TableId, table_name: Arc, ) -> PartitionData { @@ -74,6 +75,7 @@ impl PartitionProvider for CatalogPartitionResolver { // definitely has no other refs. partition_key, shard_id, + namespace_id, table_id, table_name, p.persisted_sequence_number, @@ -98,7 +100,7 @@ mod tests { let catalog: Arc = Arc::new(iox_catalog::mem::MemCatalog::new(Arc::clone(&metrics))); - let (shard_id, table_id) = { + let (shard_id, namespace_id, table_id) = { let mut repos = catalog.repositories().await; let t = repos.topics().create_or_get("platanos").await.unwrap(); let q = repos.query_pools().create_or_get("platanos").await.unwrap(); @@ -125,7 +127,7 @@ mod tests { .await .unwrap(); - (shard.id, table.id) + (shard.id, ns.id, table.id) }; let callers_partition_key = PartitionKey::from(PARTITION_KEY); @@ -135,10 +137,12 @@ mod tests { .get_partition( callers_partition_key.clone(), shard_id, + namespace_id, table_id, Arc::clone(&table_name), ) .await; + assert_eq!(got.namespace_id(), namespace_id); assert_eq!(*got.table_name(), *table_name); assert_eq!(got.max_persisted_sequence_number(), None); assert!(got.partition_key.ptr_eq(&callers_partition_key)); diff --git a/ingester/src/data/partition/resolver/mock.rs b/ingester/src/data/partition/resolver/mock.rs index d9e7c55618..f07496e2f5 100644 --- a/ingester/src/data/partition/resolver/mock.rs +++ b/ingester/src/data/partition/resolver/mock.rs @@ -3,7 +3,7 @@ use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; -use data_types::{PartitionKey, ShardId, TableId}; +use data_types::{NamespaceId, PartitionKey, ShardId, TableId}; use parking_lot::Mutex; use crate::data::partition::PartitionData; @@ -53,6 +53,7 @@ impl PartitionProvider for MockPartitionProvider { &self, partition_key: PartitionKey, shard_id: ShardId, + namespace_id: NamespaceId, table_id: TableId, table_name: Arc, ) -> PartitionData { @@ -64,6 +65,7 @@ impl PartitionProvider for MockPartitionProvider { panic!("no partition data for mock ({partition_key:?}, {shard_id:?}, {table_id:?})") }); + assert_eq!(p.namespace_id(), namespace_id); assert_eq!(*p.table_name(), *table_name); p } diff --git a/ingester/src/data/partition/resolver/trait.rs b/ingester/src/data/partition/resolver/trait.rs index 90dac275d7..25dac83869 100644 --- a/ingester/src/data/partition/resolver/trait.rs +++ b/ingester/src/data/partition/resolver/trait.rs @@ -1,7 +1,7 @@ use std::{fmt::Debug, sync::Arc}; use async_trait::async_trait; -use data_types::{PartitionKey, ShardId, TableId}; +use data_types::{NamespaceId, PartitionKey, ShardId, TableId}; use crate::data::partition::PartitionData; @@ -18,6 +18,7 @@ pub trait PartitionProvider: Send + Sync + Debug { &self, partition_key: PartitionKey, shard_id: ShardId, + namespace_id: NamespaceId, table_id: TableId, table_name: Arc, ) -> PartitionData; @@ -32,11 +33,12 @@ where &self, partition_key: PartitionKey, shard_id: ShardId, + namespace_id: NamespaceId, table_id: TableId, table_name: Arc, ) -> PartitionData { (**self) - .get_partition(partition_key, shard_id, table_id, table_name) + .get_partition(partition_key, shard_id, namespace_id, table_id, table_name) .await } } @@ -55,6 +57,7 @@ mod tests { async fn test_arc_impl() { let key = PartitionKey::from("bananas"); let shard_id = ShardId::new(42); + let namespace_id = NamespaceId::new(1234); let table_id = TableId::new(24); let table_name = "platanos".into(); let partition = PartitionId::new(4242); @@ -62,6 +65,7 @@ mod tests { partition, "bananas".into(), shard_id, + namespace_id, table_id, Arc::clone(&table_name), None, @@ -70,9 +74,16 @@ mod tests { let mock = Arc::new(MockPartitionProvider::default().with_partition(key.clone(), data)); let got = mock - .get_partition(key, shard_id, table_id, Arc::clone(&table_name)) + .get_partition( + key, + shard_id, + namespace_id, + table_id, + Arc::clone(&table_name), + ) .await; assert_eq!(got.id(), partition); + assert_eq!(got.namespace_id(), namespace_id); assert_eq!(*got.table_name(), *table_name); } } diff --git a/ingester/src/data/table.rs b/ingester/src/data/table.rs index 196ba563a7..b1e5a3dff0 100644 --- a/ingester/src/data/table.rs +++ b/ingester/src/data/table.rs @@ -2,7 +2,9 @@ use std::{collections::BTreeMap, sync::Arc}; -use data_types::{DeletePredicate, PartitionKey, SequenceNumber, ShardId, TableId, Timestamp}; +use data_types::{ + DeletePredicate, NamespaceId, PartitionKey, SequenceNumber, ShardId, TableId, Timestamp, +}; use iox_catalog::interface::Catalog; use iox_query::exec::Executor; use mutable_batch::MutableBatch; @@ -20,8 +22,10 @@ pub(crate) struct TableData { table_id: TableId, table_name: Arc, - /// The catalog ID of the shard this table is being populated from. + /// The catalog ID of the shard & namespace this table is being populated + /// from. shard_id: ShardId, + namespace_id: NamespaceId, // the max sequence number for a tombstone associated with this table tombstone_max_sequence_number: Option, @@ -49,6 +53,7 @@ impl TableData { table_id: TableId, table_name: &str, shard_id: ShardId, + namespace_id: NamespaceId, tombstone_max_sequence_number: Option, partition_provider: Arc, ) -> Self { @@ -56,6 +61,7 @@ impl TableData { table_id, table_name: table_name.into(), shard_id, + namespace_id, tombstone_max_sequence_number, partition_data: Default::default(), partition_provider, @@ -94,6 +100,7 @@ impl TableData { .get_partition( partition_key.clone(), self.shard_id, + self.namespace_id, self.table_id, Arc::clone(&self.table_name), )