diff --git a/ingester/src/data.rs b/ingester/src/data.rs index 9575923c7f..bfc6b0a574 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -267,7 +267,12 @@ impl Persister for IngesterData { .await .expect("retry forever"); - let persisting_batch = namespace.snapshot_to_persisting(&partition_info).await; + let persisting_batch = namespace + .snapshot_to_persisting( + &partition_info.table_name, + &partition_info.partition.partition_key, + ) + .await; if let Some(persisting_batch) = persisting_batch { // do the CPU intensive work of compaction, de-duplication and sorting diff --git a/ingester/src/data/namespace.rs b/ingester/src/data/namespace.rs index a24658cedc..c69aafd503 100644 --- a/ingester/src/data/namespace.rs +++ b/ingester/src/data/namespace.rs @@ -5,7 +5,7 @@ use std::{ sync::Arc, }; -use data_types::{NamespaceId, PartitionInfo, PartitionKey, SequenceNumber, ShardId}; +use data_types::{NamespaceId, PartitionKey, SequenceNumber, ShardId}; use dml::DmlOperation; use iox_catalog::interface::Catalog; use iox_query::exec::Executor; @@ -183,7 +183,6 @@ impl NamespaceData { table_data .buffer_delete( - table_name, delete.predicate(), shard_id, sequence_number, @@ -224,17 +223,16 @@ impl NamespaceData { /// or persist, None will be returned. pub async fn snapshot_to_persisting( &self, - partition_info: &PartitionInfo, + table_name: &str, + partition_key: &PartitionKey, ) -> Option> { - if let Some(table_data) = self.table_data(&partition_info.table_name) { + if let Some(table_data) = self.table_data(table_name) { let mut table_data = table_data.write().await; return table_data .partition_data - .get_mut(&partition_info.partition.partition_key) - .and_then(|partition_data| { - partition_data.snapshot_to_persisting_batch(&partition_info.table_name) - }); + .get_mut(partition_key) + .and_then(|partition_data| partition_data.snapshot_to_persisting_batch()); } None @@ -270,6 +268,7 @@ impl NamespaceData { Entry::Vacant(v) => { let v = v.insert(Arc::new(tokio::sync::RwLock::new(TableData::new( info.table_id, + table_name, info.tombstone_max_sequence_number, )))); self.table_count.inc(1); diff --git a/ingester/src/data/partition.rs b/ingester/src/data/partition.rs index a72347a012..8fe2adcb9a 100644 --- a/ingester/src/data/partition.rs +++ b/ingester/src/data/partition.rs @@ -116,6 +116,8 @@ pub(crate) struct PartitionData { /// The shard and table IDs for this partition. shard_id: ShardId, table_id: TableId, + /// The name of the table this partition is part of. + table_name: Arc, pub(crate) data: DataBuffer, @@ -130,24 +132,23 @@ impl PartitionData { id: PartitionId, shard_id: ShardId, table_id: TableId, + table_name: Arc, max_persisted_sequence_number: Option, ) -> Self { Self { id, shard_id, table_id, + table_name, data: Default::default(), max_persisted_sequence_number, } } /// Snapshot anything in the buffer and move all snapshot data into a persisting batch - pub fn snapshot_to_persisting_batch( - &mut self, - table_name: &str, - ) -> Option> { + pub fn snapshot_to_persisting_batch(&mut self) -> Option> { self.data - .snapshot_to_persisting(self.shard_id, self.table_id, self.id, table_name) + .snapshot_to_persisting(self.shard_id, self.table_id, self.id, &self.table_name) } /// Snapshot whatever is in the buffer and return a new vec of the @@ -198,12 +199,7 @@ impl PartitionData { /// tombstone-applied snapshot /// . The tombstone is only added in the `deletes_during_persisting` if the `persisting` /// exists - pub(crate) async fn buffer_tombstone( - &mut self, - executor: &Executor, - table_name: &str, - tombstone: Tombstone, - ) { + pub(crate) async fn buffer_tombstone(&mut self, executor: &Executor, tombstone: Tombstone) { self.data.add_tombstone(tombstone.clone()); // ---------------------------------------------------------- @@ -211,7 +207,7 @@ impl PartitionData { // Make a QueryableBatch for all buffer + snapshots + the given tombstone let max_sequence_number = tombstone.sequence_number; let query_batch = match self.data.snapshot_to_queryable_batch( - table_name, + &self.table_name, self.id, Some(tombstone.clone()), ) { @@ -301,8 +297,13 @@ mod tests { #[test] fn snapshot_buffer_different_but_compatible_schemas() { - let mut partition_data = - PartitionData::new(PartitionId::new(1), ShardId::new(1), TableId::new(1), None); + let mut partition_data = PartitionData::new( + PartitionId::new(1), + ShardId::new(1), + TableId::new(1), + "foo".into(), + None, + ); let seq_num1 = SequenceNumber::new(1); // Missing tag `t1` @@ -341,11 +342,11 @@ mod tests { let s_id = 1; let t_id = 1; let p_id = 1; - let table_name = "restaurant"; let mut p = PartitionData::new( PartitionId::new(p_id), ShardId::new(s_id), TableId::new(t_id), + "restaurant".into(), None, ); let exec = Executor::new(1); @@ -387,7 +388,7 @@ mod tests { "day=thu", // delete predicate ); // one row will get deleted, the other is moved to snapshot - p.buffer_tombstone(&exec, "restaurant", ts).await; + p.buffer_tombstone(&exec, ts).await; // verify data assert!(p.data.buffer.is_none()); // always empty after delete @@ -450,7 +451,7 @@ mod tests { ); // two rows will get deleted, one from existing snapshot, one from the buffer being moved // to snpashot - p.buffer_tombstone(&exec, "restaurant", ts).await; + p.buffer_tombstone(&exec, ts).await; // verify data assert!(p.data.buffer.is_none()); // always empty after delete @@ -473,7 +474,7 @@ mod tests { // ------------------------------------------ // Persisting - let p_batch = p.snapshot_to_persisting_batch(table_name).unwrap(); + let p_batch = p.snapshot_to_persisting_batch().unwrap(); // verify data assert!(p.data.buffer.is_none()); // always empty after issuing persit @@ -495,7 +496,7 @@ mod tests { ); // if a query come while persisting, the row with temp=55 will be deleted before // data is sent back to Querier - p.buffer_tombstone(&exec, "restaurant", ts).await; + p.buffer_tombstone(&exec, ts).await; // verify data assert!(p.data.buffer.is_none()); // always empty after delete @@ -563,7 +564,7 @@ mod tests { "temp=60", // delete predicate ); // the row with temp=60 will be removed from the sanphot - p.buffer_tombstone(&exec, "restaurant", ts).await; + p.buffer_tombstone(&exec, ts).await; // verify data assert!(p.data.buffer.is_none()); // always empty after delete diff --git a/ingester/src/data/partition/buffer.rs b/ingester/src/data/partition/buffer.rs index cc64ed6108..48552ad550 100644 --- a/ingester/src/data/partition/buffer.rs +++ b/ingester/src/data/partition/buffer.rs @@ -109,7 +109,7 @@ impl DataBuffer { /// Both buffer and snapshots will be empty after this pub(super) fn snapshot_to_queryable_batch( &mut self, - table_name: &str, + table_name: &Arc, partition_id: PartitionId, tombstone: Option, ) -> Option { @@ -129,7 +129,7 @@ impl DataBuffer { None } else { Some(QueryableBatch::new( - table_name, + Arc::clone(table_name), partition_id, data, tombstones, @@ -164,7 +164,7 @@ impl DataBuffer { shard_id: ShardId, table_id: TableId, partition_id: PartitionId, - table_name: &str, + table_name: &Arc, ) -> Option> { if self.persisting.is_some() { panic!("Unable to snapshot while persisting. This is an unexpected state.") diff --git a/ingester/src/data/table.rs b/ingester/src/data/table.rs index f579e17e8e..99bd428b89 100644 --- a/ingester/src/data/table.rs +++ b/ingester/src/data/table.rs @@ -1,6 +1,6 @@ //! Table level data buffer structures. -use std::collections::BTreeMap; +use std::{collections::BTreeMap, sync::Arc}; use data_types::{DeletePredicate, PartitionKey, SequenceNumber, ShardId, TableId, Timestamp}; use iox_catalog::interface::Catalog; @@ -16,6 +16,7 @@ use crate::lifecycle::LifecycleHandle; #[derive(Debug)] pub(crate) struct TableData { table_id: TableId, + table_name: Arc, // the max sequence number for a tombstone associated with this table tombstone_max_sequence_number: Option, // Map pf partition key to its data @@ -24,9 +25,14 @@ pub(crate) struct TableData { impl TableData { /// Initialize new table buffer - pub fn new(table_id: TableId, tombstone_max_sequence_number: Option) -> Self { + pub fn new( + table_id: TableId, + table_name: &str, + tombstone_max_sequence_number: Option, + ) -> Self { Self { table_id, + table_name: table_name.into(), tombstone_max_sequence_number, partition_data: Default::default(), } @@ -36,11 +42,13 @@ impl TableData { #[cfg(test)] pub fn new_for_test( table_id: TableId, + table_name: &str, tombstone_max_sequence_number: Option, partitions: BTreeMap, ) -> Self { Self { table_id, + table_name: table_name.into(), tombstone_max_sequence_number, partition_data: partitions, } @@ -102,7 +110,6 @@ impl TableData { pub(super) async fn buffer_delete( &mut self, - table_name: &str, predicate: &DeletePredicate, shard_id: ShardId, sequence_number: SequenceNumber, @@ -131,8 +138,7 @@ impl TableData { // modify one partition at a time for data in self.partition_data.values_mut() { - data.buffer_tombstone(executor, table_name, tombstone.clone()) - .await; + data.buffer_tombstone(executor, tombstone.clone()).await; } Ok(()) @@ -175,6 +181,7 @@ impl TableData { partition.id, shard_id, self.table_id, + Arc::clone(&self.table_name), partition.persisted_sequence_number, ), ); diff --git a/ingester/src/querier_handler.rs b/ingester/src/querier_handler.rs index c8e9d37129..2645f010b6 100644 --- a/ingester/src/querier_handler.rs +++ b/ingester/src/querier_handler.rs @@ -191,7 +191,7 @@ async fn prepare_data_to_querier_for_partition( .persisting .unwrap_or_else(|| { QueryableBatch::new( - &request.table, + request.table.clone().into(), unpersisted_partition_data.partition_id, vec![], vec![], diff --git a/ingester/src/query.rs b/ingester/src/query.rs index ff735d96ca..a3d6bc8fb0 100644 --- a/ingester/src/query.rs +++ b/ingester/src/query.rs @@ -57,7 +57,7 @@ pub struct QueryableBatch { pub(crate) delete_predicates: Vec>, /// This is needed to return a reference for a trait function - pub(crate) table_name: String, + pub(crate) table_name: Arc, /// Partition ID pub(crate) partition_id: PartitionId, @@ -66,7 +66,7 @@ pub struct QueryableBatch { impl QueryableBatch { /// Initilaize a QueryableBatch pub fn new( - table_name: &str, + table_name: Arc, partition_id: PartitionId, data: Vec>, deletes: Vec, @@ -75,7 +75,7 @@ impl QueryableBatch { Self { data, delete_predicates, - table_name: table_name.to_string(), + table_name, partition_id, } } @@ -318,7 +318,7 @@ mod tests { // This new queryable batch will convert tombstone to delete predicates let query_batch = - QueryableBatch::new("test_table", PartitionId::new(0), vec![], tombstones); + QueryableBatch::new("test_table".into(), PartitionId::new(0), vec![], tombstones); let predicates = query_batch.delete_predicates(); let expected = vec![ Arc::new(DeletePredicate { diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index c1b83f2a0a..c043820eeb 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -205,7 +205,7 @@ pub fn make_queryable_batch_with_deletes( } Arc::new(QueryableBatch::new( - table_name, + table_name.into(), PartitionId::new(partition_id), snapshots, tombstones, @@ -685,16 +685,18 @@ pub fn make_ingester_data(two_partitions: bool, loc: DataLocation) -> IngesterDa let data_table_id = TableId::new(2); // Make partitions per requested - let partitions = make_partitions(two_partitions, loc, shard_id, data_table_id, TEST_TABLE); + let partitions = make_partitions(two_partitions, loc, shard_id, data_table_id); // Two tables: one empty and one with data of one or two partitions let mut tables = BTreeMap::new(); let empty_tbl = Arc::new(tokio::sync::RwLock::new(TableData::new( empty_table_id, + "test_table", None, ))); let data_tbl = Arc::new(tokio::sync::RwLock::new(TableData::new_for_test( data_table_id, + "test_table", None, partitions, ))); @@ -737,12 +739,11 @@ pub async fn make_ingester_data_with_tombstones(loc: DataLocation) -> IngesterDa let data_table_id = TableId::new(2); // Make partitions per requested - let partitions = - make_one_partition_with_tombstones(&exec, loc, shard_id, data_table_id, TEST_TABLE).await; + let partitions = make_one_partition_with_tombstones(&exec, loc, shard_id, data_table_id).await; // Two tables: one empty and one with data of one or two partitions let mut tables = BTreeMap::new(); - let data_tbl = TableData::new_for_test(data_table_id, None, partitions); + let data_tbl = TableData::new_for_test(data_table_id, TEST_TABLE, None, partitions); tables.insert( TEST_TABLE.to_string(), Arc::new(tokio::sync::RwLock::new(data_tbl)), @@ -776,7 +777,6 @@ pub(crate) fn make_partitions( loc: DataLocation, shard_id: ShardId, table_id: TableId, - table_name: &str, ) -> BTreeMap { // In-memory data includes these rows but split between 4 groups go into // different batches of parittion 1 or partittion 2 as requeted @@ -800,8 +800,7 @@ pub(crate) fn make_partitions( // ------------------------------------------ // Build the first partition let partition_id = PartitionId::new(1); - let (mut p1, seq_num) = - make_first_partition_data(partition_id, loc, shard_id, table_id, table_name); + let (mut p1, seq_num) = make_first_partition_data(partition_id, loc, shard_id, table_id); // ------------------------------------------ // Build the second partition if asked @@ -809,7 +808,7 @@ pub(crate) fn make_partitions( let mut seq_num = seq_num.get(); if two_partitions { let partition_id = PartitionId::new(2); - let mut p2 = PartitionData::new(partition_id, shard_id, table_id, None); + let mut p2 = PartitionData::new(partition_id, shard_id, table_id, TEST_TABLE.into(), None); // Group 4: in buffer of p2 // Fill `buffer` seq_num += 1; @@ -849,7 +848,6 @@ pub(crate) async fn make_one_partition_with_tombstones( loc: DataLocation, shard_id: ShardId, table_id: TableId, - table_name: &str, ) -> BTreeMap { // In-memory data includes these rows but split between 4 groups go into // different batches of parittion 1 or partittion 2 as requeted @@ -869,8 +867,7 @@ pub(crate) async fn make_one_partition_with_tombstones( // ]; let partition_id = PartitionId::new(1); - let (mut p1, seq_num) = - make_first_partition_data(partition_id, loc, shard_id, table_id, table_name); + let (mut p1, seq_num) = make_first_partition_data(partition_id, loc, shard_id, table_id); // Add tombstones // Depending on where the existing data is, they (buffer & snapshot) will be either moved to a new snapshot after @@ -888,7 +885,7 @@ pub(crate) async fn make_one_partition_with_tombstones( 50, // max time of data to get deleted "city=Boston", // delete predicate ); - p1.buffer_tombstone(exec, table_name, ts).await; + p1.buffer_tombstone(exec, ts).await; // Group 4: in buffer of p1 after the tombstone // Fill `buffer` @@ -914,7 +911,6 @@ fn make_first_partition_data( loc: DataLocation, shard_id: ShardId, table_id: TableId, - table_name: &str, ) -> (PartitionData, SequenceNumber) { // In-memory data includes these rows but split between 3 groups go into // different batches of parittion p1 @@ -933,7 +929,7 @@ fn make_first_partition_data( // ------------------------------------------ // Build the first partition - let mut p1 = PartitionData::new(partition_id, shard_id, table_id, None); + let mut p1 = PartitionData::new(partition_id, shard_id, table_id, TEST_TABLE.into(), None); let mut seq_num = 0; // -------------------- @@ -952,7 +948,7 @@ fn make_first_partition_data( if loc.contains(DataLocation::PERSISTING) { // Move group 1 data to persisting - p1.snapshot_to_persisting_batch(table_name); + p1.snapshot_to_persisting_batch(); } else if loc.contains(DataLocation::SNAPSHOT) { // move group 1 data to snapshot p1.snapshot().unwrap();