diff --git a/ingester/src/data.rs b/ingester/src/data.rs index fc25b906d4..9c26a09a20 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -810,7 +810,9 @@ mod tests { let mem_table = n.table_data("mem").unwrap(); assert!(n.table_data("mem").is_some()); let mem_table = mem_table.write().await; - let p = mem_table.partition_data.get(&"1970-01-01".into()).unwrap(); + let p = mem_table + .get_partition_by_key(&"1970-01-01".into()) + .unwrap(); p.partition_id() }; @@ -952,7 +954,9 @@ mod tests { let mem_table = n.table_data("mem").unwrap(); assert!(n.table_data("cpu").is_some()); let mem_table = mem_table.write().await; - let p = mem_table.partition_data.get(&"1970-01-01".into()).unwrap(); + let p = mem_table + .get_partition_by_key(&"1970-01-01".into()) + .unwrap(); table_id = mem_table.table_id(); partition_id = p.partition_id(); @@ -1352,7 +1356,7 @@ mod tests { { let table_data = data.table_data("mem").unwrap(); let table = table_data.read().await; - let p = table.partition_data.get(&"1970-01-01".into()).unwrap(); + let p = table.get_partition_by_key(&"1970-01-01".into()).unwrap(); assert_eq!( p.max_persisted_sequence_number(), Some(SequenceNumber::new(1)) @@ -1368,7 +1372,7 @@ mod tests { let table_data = data.table_data("mem").unwrap(); let table = table_data.read().await; - let partition = table.partition_data.get(&"1970-01-01".into()).unwrap(); + let partition = table.get_partition_by_key(&"1970-01-01".into()).unwrap(); assert_eq!( partition.data.buffer.as_ref().unwrap().min_sequence_number, SequenceNumber::new(2) diff --git a/ingester/src/data/namespace.rs b/ingester/src/data/namespace.rs index 515956f5f4..6c0be9bc6b 100644 --- a/ingester/src/data/namespace.rs +++ b/ingester/src/data/namespace.rs @@ -226,7 +226,7 @@ impl NamespaceData { if let Some(t) = self.table_data(table_name) { let mut t = t.write().await; - return t.partition_data.get_mut(partition_key).map(|p| { + return t.get_partition_by_key_mut(partition_key).map(|p| { p.data .generate_snapshot() .expect("snapshot on mutable batch should never fail"); @@ -249,8 +249,7 @@ impl NamespaceData { let mut table_data = table_data.write().await; return table_data - .partition_data - .get_mut(partition_key) + .get_partition_by_key_mut(partition_key) .and_then(|partition_data| partition_data.snapshot_to_persisting_batch()); } @@ -321,7 +320,7 @@ impl NamespaceData { ) { if let Some(t) = self.table_data(table_name) { let mut t = t.write().await; - let partition = t.partition_data.get_mut(partition_key); + let partition = t.get_partition_by_key_mut(partition_key); if let Some(p) = partition { p.mark_persisted(sequence_number); diff --git a/ingester/src/data/table.rs b/ingester/src/data/table.rs index 9eb05bae27..000d4d1973 100644 --- a/ingester/src/data/table.rs +++ b/ingester/src/data/table.rs @@ -1,9 +1,10 @@ //! Table level data buffer structures. -use std::{collections::BTreeMap, sync::Arc}; +use std::{collections::HashMap, sync::Arc}; use data_types::{ - DeletePredicate, NamespaceId, PartitionKey, SequenceNumber, ShardId, TableId, Timestamp, + DeletePredicate, NamespaceId, PartitionId, PartitionKey, SequenceNumber, ShardId, TableId, + Timestamp, }; use iox_catalog::interface::Catalog; use iox_query::exec::Executor; @@ -16,6 +17,39 @@ use super::partition::{ }; use crate::lifecycle::LifecycleHandle; +/// A double-referenced map where [`PartitionData`] can be looked up by +/// [`PartitionKey`], or ID. +#[derive(Debug, Default)] +struct DoubleRef { + // TODO(4880): this can be removed when IDs are sent over the wire. + by_key: HashMap, + by_id: HashMap, +} + +impl DoubleRef { + fn insert(&mut self, ns: PartitionData) { + let id = ns.partition_id(); + let key = ns.partition_key().clone(); + + assert!(self.by_key.insert(key.clone(), ns).is_none()); + assert!(self.by_id.insert(id, key).is_none()); + } + + #[cfg(test)] + fn by_key(&self, key: &PartitionKey) -> Option<&PartitionData> { + self.by_key.get(key) + } + + fn by_key_mut(&mut self, key: &PartitionKey) -> Option<&mut PartitionData> { + self.by_key.get_mut(key) + } + + fn by_id_mut(&mut self, id: PartitionId) -> Option<&mut PartitionData> { + let key = self.by_id.get(&id)?.clone(); + self.by_key_mut(&key) + } +} + /// Data of a Table in a given Namesapce that belongs to a given Shard #[derive(Debug)] pub(crate) struct TableData { @@ -34,8 +68,8 @@ pub(crate) struct TableData { /// `(key, shard, table)` triplet. partition_provider: Arc, - // Map pf partition key to its data - pub(super) partition_data: BTreeMap, + // Map of partition key to its data + partition_data: DoubleRef, } impl TableData { @@ -71,6 +105,7 @@ impl TableData { /// Return parquet_max_sequence_number pub(super) fn parquet_max_sequence_number(&self) -> Option { self.partition_data + .by_key .values() .map(|p| p.max_persisted_sequence_number()) .max() @@ -92,7 +127,7 @@ impl TableData { partition_key: PartitionKey, lifecycle_handle: &dyn LifecycleHandle, ) -> Result { - let partition_data = match self.partition_data.get_mut(&partition_key) { + let partition_data = match self.partition_data.by_key.get_mut(&partition_key) { Some(p) => p, None => { let p = self @@ -105,12 +140,9 @@ impl TableData { Arc::clone(&self.table_name), ) .await; - // Add the partition to the map. - assert!(self - .partition_data - .insert(partition_key.clone(), p) - .is_none()); - self.partition_data.get_mut(&partition_key).unwrap() + // Add the double-referenced partition to the map. + self.partition_data.insert(p); + self.partition_data.by_key_mut(&partition_key).unwrap() } }; @@ -171,15 +203,42 @@ impl TableData { self.tombstone_max_sequence_number = Some(sequence_number); // modify one partition at a time - for data in self.partition_data.values_mut() { + for data in self.partition_data.by_key.values_mut() { data.buffer_tombstone(executor, tombstone.clone()).await; } Ok(()) } + /// Return the [`PartitionData`] for the specified ID. + #[allow(unused)] + pub(crate) fn get_partition( + &mut self, + partition_id: PartitionId, + ) -> Option<&mut PartitionData> { + self.partition_data.by_id_mut(partition_id) + } + + /// Return the [`PartitionData`] for the specified partition key. + #[cfg(test)] + pub(crate) fn get_partition_by_key( + &self, + partition_key: &PartitionKey, + ) -> Option<&PartitionData> { + self.partition_data.by_key(partition_key) + } + + /// Return the [`PartitionData`] for the specified partition key. + pub(crate) fn get_partition_by_key_mut( + &mut self, + partition_key: &PartitionKey, + ) -> Option<&mut PartitionData> { + self.partition_data.by_key_mut(partition_key) + } + pub(crate) fn unpersisted_partition_data(&self) -> Vec { self.partition_data + .by_key .values() .map(|p| UnpersistedPartitionData { partition_id: p.partition_id(), @@ -204,6 +263,7 @@ impl TableData { }; self.partition_data + .by_key .values() .fold(progress, |progress, partition_data| { progress.combine(partition_data.progress()) @@ -248,6 +308,65 @@ mod tests { const PARTITION_KEY: &str = "platanos"; const PARTITION_ID: PartitionId = PartitionId::new(0); + #[tokio::test] + async fn test_partition_double_ref() { + let metrics = Arc::new(metric::Registry::default()); + let catalog: Arc = + Arc::new(iox_catalog::mem::MemCatalog::new(Arc::clone(&metrics))); + + // Populate the catalog with the shard / namespace / table + let (shard_id, ns_id, table_id) = + populate_catalog(&*catalog, SHARD_INDEX, NAMESPACE_NAME, TABLE_NAME).await; + + // Configure the mock partition provider to return a partition for this + // table ID. + let partition_provider = Arc::new(MockPartitionProvider::default().with_partition( + PartitionData::new( + PARTITION_ID, + PARTITION_KEY.into(), + shard_id, + ns_id, + table_id, + TABLE_NAME.into(), + None, + ), + )); + + let mut table = TableData::new( + table_id, + TABLE_NAME, + shard_id, + ns_id, + None, + partition_provider, + ); + + let batch = lines_to_batches(r#"bananas,bat=man value=24 42"#, 0) + .unwrap() + .remove(TABLE_NAME) + .unwrap(); + + // Assert the table does not contain the test partition + assert!(table.partition_data.by_key(&PARTITION_KEY.into()).is_none()); + assert!(table.partition_data.by_id_mut(PARTITION_ID).is_none()); + + // Write some test data + let pause = table + .buffer_table_write( + SequenceNumber::new(42), + batch, + PARTITION_KEY.into(), + &MockLifecycleHandle::default(), + ) + .await + .expect("buffer op should succeed"); + assert!(!pause); + + // Referencing the partition should succeed + assert!(table.partition_data.by_key(&PARTITION_KEY.into()).is_some()); + assert!(table.partition_data.by_id_mut(PARTITION_ID).is_some()); + } + #[tokio::test] async fn test_bad_write_memory_counting() { let metrics = Arc::new(metric::Registry::default()); @@ -291,7 +410,7 @@ mod tests { let handle = MockLifecycleHandle::default(); // Assert the table does not contain the test partition - assert!(table.partition_data.get(&PARTITION_KEY.into()).is_none()); + assert!(table.partition_data.by_key(&PARTITION_KEY.into()).is_none()); // Write some test data let pause = table @@ -306,7 +425,7 @@ mod tests { assert!(!pause); // Referencing the partition should succeed - assert!(table.partition_data.get(&PARTITION_KEY.into()).is_some()); + assert!(table.partition_data.by_key(&PARTITION_KEY.into()).is_some()); // And the lifecycle handle was called with the expected values assert_eq!(