diff --git a/Cargo.lock b/Cargo.lock index 9f49eb0e87..746217f02a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1218,9 +1218,9 @@ checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8" [[package]] name = "digest" -version = "0.10.3" +version = "0.10.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2fb860ca6fafa5552fb6d0e816a69c8e49f0908bf524e30a90d97c85892d506" +checksum = "adfbc57365a37acbd2ebf2b64d7e69bb766e2fea813521ed536f5d0520dcf86c" dependencies = [ "block-buffer", "crypto-common", diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs index 3664e0864a..b41524dd47 100644 --- a/compactor/src/compact.rs +++ b/compactor/src/compact.rs @@ -170,15 +170,12 @@ impl Compactor { let file_size_buckets = U64HistogramOptions::new([ 50 * 1024, // 50KB - 100 * 1024, // 100KB - 300 * 1024, // 300KB 500 * 1024, // 500 KB 1024 * 1024, // 1 MB 3 * 1024 * 1024, // 3 MB 10 * 1024 * 1024, // 10 MB 30 * 1024 * 1024, // 30 MB 100 * 1024 * 1024, // 100 MB - 300 * 1024 * 1024, // 300 MB 500 * 1024 * 1024, // 500 MB u64::MAX, // Inf ]); @@ -196,17 +193,14 @@ impl Compactor { ); let duration_histogram_options = DurationHistogramOptions::new([ - Duration::from_millis(100), Duration::from_millis(500), - Duration::from_micros(2_000), + Duration::from_millis(1_000), // 1 second Duration::from_millis(5_000), Duration::from_millis(15_000), Duration::from_millis(30_000), Duration::from_millis(60_000), // 1 minute Duration::from_millis(5 * 60_000), - Duration::from_millis(10 * 60_000), - Duration::from_millis(20 * 60_000), - Duration::from_millis(40 * 60_000), + Duration::from_millis(15 * 60_000), Duration::from_millis(60 * 60_000), DURATION_MAX, ]); diff --git a/ingester/src/data.rs b/ingester/src/data.rs index 06ab8f3b2c..ebfad96f37 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -166,7 +166,6 @@ impl IngesterData { shard_data .buffer_operation( dml_operation, - shard_id, self.catalog.as_ref(), lifecycle_handle, &self.exec, @@ -267,7 +266,12 @@ impl Persister for IngesterData { .await .expect("retry forever"); - let persisting_batch = namespace.snapshot_to_persisting(&partition_info).await; + let persisting_batch = namespace + .snapshot_to_persisting( + &partition_info.table_name, + &partition_info.partition.partition_key, + ) + .await; if let Some(persisting_batch) = persisting_batch { // do the CPU intensive work of compaction, de-duplication and sorting @@ -648,7 +652,10 @@ mod tests { let mut shards = BTreeMap::new(); let shard_index = ShardIndex::new(0); - shards.insert(shard1.id, ShardData::new(shard_index, Arc::clone(&metrics))); + shards.insert( + shard1.id, + ShardData::new(shard_index, shard1.id, Arc::clone(&metrics)), + ); let object_store: Arc = Arc::new(InMemory::new()); @@ -733,7 +740,7 @@ mod tests { let mut shards = BTreeMap::new(); shards.insert( shard1.id, - ShardData::new(shard1.shard_index, Arc::clone(&metrics)), + ShardData::new(shard1.shard_index, shard1.id, Arc::clone(&metrics)), ); let object_store: Arc = Arc::new(InMemory::new()); @@ -838,11 +845,11 @@ mod tests { let mut shards = BTreeMap::new(); shards.insert( shard1.id, - ShardData::new(shard1.shard_index, Arc::clone(&metrics)), + ShardData::new(shard1.shard_index, shard1.id, Arc::clone(&metrics)), ); shards.insert( shard2.id, - ShardData::new(shard2.shard_index, Arc::clone(&metrics)), + ShardData::new(shard2.shard_index, shard2.id, Arc::clone(&metrics)), ); let object_store: Arc = Arc::new(InMemory::new()); @@ -1094,11 +1101,11 @@ mod tests { let mut shards = BTreeMap::new(); shards.insert( shard1.id, - ShardData::new(shard1.shard_index, Arc::clone(&metrics)), + ShardData::new(shard1.shard_index, shard1.id, Arc::clone(&metrics)), ); shards.insert( shard2.id, - ShardData::new(shard2.shard_index, Arc::clone(&metrics)), + ShardData::new(shard2.shard_index, shard2.id, Arc::clone(&metrics)), ); let object_store: Arc = Arc::new(InMemory::new()); @@ -1271,6 +1278,11 @@ mod tests { .create_or_get("1970-01-01".into(), shard.id, table.id) .await .unwrap(); + repos + .partitions() + .update_persisted_sequence_number(partition.id, SequenceNumber::new(1)) + .await + .unwrap(); let partition2 = repos .partitions() .create_or_get("1970-01-02".into(), shard.id, table.id) @@ -1328,13 +1340,14 @@ mod tests { ); let exec = Executor::new(1); - let data = NamespaceData::new(namespace.id, &*metrics); + let data = NamespaceData::new(namespace.id, shard.id, &*metrics); - // w1 should be ignored so it shouldn't be present in the buffer + // w1 should be ignored because the per-partition replay offset is set + // to 1 already, so it shouldn't be buffered and the buffer should + // remain empty. let should_pause = data .buffer_operation( DmlOperation::Write(w1), - shard.id, catalog.as_ref(), &manager.handle(), &exec, @@ -1356,7 +1369,6 @@ mod tests { // w2 should be in the buffer data.buffer_operation( DmlOperation::Write(w2), - shard.id, catalog.as_ref(), &manager.handle(), &exec, @@ -1398,7 +1410,10 @@ mod tests { let mut shards = BTreeMap::new(); let shard_index = ShardIndex::new(0); - shards.insert(shard1.id, ShardData::new(shard_index, Arc::clone(&metrics))); + shards.insert( + shard1.id, + ShardData::new(shard_index, shard1.id, Arc::clone(&metrics)), + ); let object_store: Arc = Arc::new(InMemory::new()); diff --git a/ingester/src/data/namespace.rs b/ingester/src/data/namespace.rs index e65e80dd01..3a63aa2674 100644 --- a/ingester/src/data/namespace.rs +++ b/ingester/src/data/namespace.rs @@ -5,7 +5,7 @@ use std::{ sync::Arc, }; -use data_types::{NamespaceId, PartitionInfo, PartitionKey, SequenceNumber, ShardId}; +use data_types::{NamespaceId, PartitionKey, SequenceNumber, ShardId}; use dml::DmlOperation; use iox_catalog::interface::Catalog; use iox_query::exec::Executor; @@ -26,8 +26,11 @@ use crate::lifecycle::LifecycleHandle; #[derive(Debug)] pub struct NamespaceData { namespace_id: NamespaceId, - tables: RwLock>>>, + /// The catalog ID of the shard this namespace is being populated from. + shard_id: ShardId, + + tables: RwLock>>>, table_count: U64Counter, /// The sequence number being actively written, if any. @@ -78,7 +81,7 @@ pub struct NamespaceData { impl NamespaceData { /// Initialize new tables with default partition template of daily - pub fn new(namespace_id: NamespaceId, metrics: &metric::Registry) -> Self { + pub fn new(namespace_id: NamespaceId, shard_id: ShardId, metrics: &metric::Registry) -> Self { let table_count = metrics .register_metric::( "ingester_tables_total", @@ -88,6 +91,7 @@ impl NamespaceData { Self { namespace_id, + shard_id, tables: Default::default(), table_count, buffering_sequence_number: RwLock::new(None), @@ -100,10 +104,12 @@ impl NamespaceData { #[cfg(test)] pub(crate) fn new_for_test( namespace_id: NamespaceId, + shard_id: ShardId, tables: BTreeMap>>, ) -> Self { Self { namespace_id, + shard_id, tables: RwLock::new(tables), table_count: Default::default(), buffering_sequence_number: RwLock::new(None), @@ -117,7 +123,6 @@ impl NamespaceData { pub async fn buffer_operation( &self, dml_operation: DmlOperation, - shard_id: ShardId, catalog: &dyn Catalog, lifecycle_handle: &dyn LifecycleHandle, executor: &Executor, @@ -148,7 +153,7 @@ impl NamespaceData { for (t, b) in write.into_tables() { let table_data = match self.table_data(&t) { Some(t) => t, - None => self.insert_table(shard_id, &t, catalog).await?, + None => self.insert_table(&t, catalog).await?, }; { @@ -159,7 +164,6 @@ impl NamespaceData { sequence_number, b, partition_key.clone(), - shard_id, catalog, lifecycle_handle, ) @@ -176,20 +180,13 @@ impl NamespaceData { let table_name = delete.table_name().context(super::TableNotPresentSnafu)?; let table_data = match self.table_data(table_name) { Some(t) => t, - None => self.insert_table(shard_id, table_name, catalog).await?, + None => self.insert_table(table_name, catalog).await?, }; let mut table_data = table_data.write().await; table_data - .buffer_delete( - table_name, - delete.predicate(), - shard_id, - sequence_number, - catalog, - executor, - ) + .buffer_delete(delete.predicate(), sequence_number, catalog, executor) .await?; // don't pause writes since deletes don't count towards memory limits @@ -224,22 +221,16 @@ impl NamespaceData { /// or persist, None will be returned. pub async fn snapshot_to_persisting( &self, - partition_info: &PartitionInfo, + table_name: &str, + partition_key: &PartitionKey, ) -> Option> { - if let Some(table_data) = self.table_data(&partition_info.table_name) { + if let Some(table_data) = self.table_data(table_name) { let mut table_data = table_data.write().await; return table_data .partition_data - .get_mut(&partition_info.partition.partition_key) - .and_then(|partition_data| { - partition_data.snapshot_to_persisting_batch( - partition_info.partition.shard_id, - partition_info.partition.table_id, - partition_info.partition.id, - &partition_info.table_name, - ) - }); + .get_mut(partition_key) + .and_then(|partition_data| partition_data.snapshot_to_persisting_batch()); } None @@ -257,14 +248,13 @@ impl NamespaceData { /// Inserts the table or returns it if it happens to be inserted by some other thread async fn insert_table( &self, - shard_id: ShardId, table_name: &str, catalog: &dyn Catalog, ) -> Result>, super::Error> { let mut repos = catalog.repositories().await; let info = repos .tables() - .get_table_persist_info(shard_id, self.namespace_id, table_name) + .get_table_persist_info(self.shard_id, self.namespace_id, table_name) .await .context(super::CatalogSnafu)? .context(super::TableNotFoundSnafu { table_name })?; @@ -275,6 +265,8 @@ impl NamespaceData { Entry::Vacant(v) => { let v = v.insert(Arc::new(tokio::sync::RwLock::new(TableData::new( info.table_id, + table_name, + self.shard_id, info.tombstone_max_sequence_number, )))); self.table_count.inc(1); diff --git a/ingester/src/data/partition.rs b/ingester/src/data/partition.rs index f2059dd1cc..8fe2adcb9a 100644 --- a/ingester/src/data/partition.rs +++ b/ingester/src/data/partition.rs @@ -111,7 +111,14 @@ impl SnapshotBatch { /// Data of an IOx Partition of a given Table of a Namesapce that belongs to a given Shard #[derive(Debug)] pub(crate) struct PartitionData { + /// The catalog ID of the partition this buffer is for. id: PartitionId, + /// The shard and table IDs for this partition. + shard_id: ShardId, + table_id: TableId, + /// The name of the table this partition is part of. + table_name: Arc, + pub(crate) data: DataBuffer, /// The max_persisted_sequence number for any parquet_file in this @@ -121,24 +128,27 @@ pub(crate) struct PartitionData { impl PartitionData { /// Initialize a new partition data buffer - pub fn new(id: PartitionId, max_persisted_sequence_number: Option) -> Self { + pub fn new( + id: PartitionId, + shard_id: ShardId, + table_id: TableId, + table_name: Arc, + max_persisted_sequence_number: Option, + ) -> Self { Self { id, + shard_id, + table_id, + table_name, data: Default::default(), max_persisted_sequence_number, } } /// Snapshot anything in the buffer and move all snapshot data into a persisting batch - pub fn snapshot_to_persisting_batch( - &mut self, - shard_id: ShardId, - table_id: TableId, - partition_id: PartitionId, - table_name: &str, - ) -> Option> { + pub fn snapshot_to_persisting_batch(&mut self) -> Option> { self.data - .snapshot_to_persisting(shard_id, table_id, partition_id, table_name) + .snapshot_to_persisting(self.shard_id, self.table_id, self.id, &self.table_name) } /// Snapshot whatever is in the buffer and return a new vec of the @@ -189,12 +199,7 @@ impl PartitionData { /// tombstone-applied snapshot /// . The tombstone is only added in the `deletes_during_persisting` if the `persisting` /// exists - pub(crate) async fn buffer_tombstone( - &mut self, - executor: &Executor, - table_name: &str, - tombstone: Tombstone, - ) { + pub(crate) async fn buffer_tombstone(&mut self, executor: &Executor, tombstone: Tombstone) { self.data.add_tombstone(tombstone.clone()); // ---------------------------------------------------------- @@ -202,7 +207,7 @@ impl PartitionData { // Make a QueryableBatch for all buffer + snapshots + the given tombstone let max_sequence_number = tombstone.sequence_number; let query_batch = match self.data.snapshot_to_queryable_batch( - table_name, + &self.table_name, self.id, Some(tombstone.clone()), ) { @@ -292,11 +297,13 @@ mod tests { #[test] fn snapshot_buffer_different_but_compatible_schemas() { - let mut partition_data = PartitionData { - id: PartitionId::new(1), - data: Default::default(), - max_persisted_sequence_number: None, - }; + let mut partition_data = PartitionData::new( + PartitionId::new(1), + ShardId::new(1), + TableId::new(1), + "foo".into(), + None, + ); let seq_num1 = SequenceNumber::new(1); // Missing tag `t1` @@ -335,8 +342,13 @@ mod tests { let s_id = 1; let t_id = 1; let p_id = 1; - let table_name = "restaurant"; - let mut p = PartitionData::new(PartitionId::new(p_id), None); + let mut p = PartitionData::new( + PartitionId::new(p_id), + ShardId::new(s_id), + TableId::new(t_id), + "restaurant".into(), + None, + ); let exec = Executor::new(1); // ------------------------------------------ @@ -376,7 +388,7 @@ mod tests { "day=thu", // delete predicate ); // one row will get deleted, the other is moved to snapshot - p.buffer_tombstone(&exec, "restaurant", ts).await; + p.buffer_tombstone(&exec, ts).await; // verify data assert!(p.data.buffer.is_none()); // always empty after delete @@ -439,7 +451,7 @@ mod tests { ); // two rows will get deleted, one from existing snapshot, one from the buffer being moved // to snpashot - p.buffer_tombstone(&exec, "restaurant", ts).await; + p.buffer_tombstone(&exec, ts).await; // verify data assert!(p.data.buffer.is_none()); // always empty after delete @@ -462,14 +474,7 @@ mod tests { // ------------------------------------------ // Persisting - let p_batch = p - .snapshot_to_persisting_batch( - ShardId::new(s_id), - TableId::new(t_id), - PartitionId::new(p_id), - table_name, - ) - .unwrap(); + let p_batch = p.snapshot_to_persisting_batch().unwrap(); // verify data assert!(p.data.buffer.is_none()); // always empty after issuing persit @@ -491,7 +496,7 @@ mod tests { ); // if a query come while persisting, the row with temp=55 will be deleted before // data is sent back to Querier - p.buffer_tombstone(&exec, "restaurant", ts).await; + p.buffer_tombstone(&exec, ts).await; // verify data assert!(p.data.buffer.is_none()); // always empty after delete @@ -559,7 +564,7 @@ mod tests { "temp=60", // delete predicate ); // the row with temp=60 will be removed from the sanphot - p.buffer_tombstone(&exec, "restaurant", ts).await; + p.buffer_tombstone(&exec, ts).await; // verify data assert!(p.data.buffer.is_none()); // always empty after delete diff --git a/ingester/src/data/partition/buffer.rs b/ingester/src/data/partition/buffer.rs index cc64ed6108..48552ad550 100644 --- a/ingester/src/data/partition/buffer.rs +++ b/ingester/src/data/partition/buffer.rs @@ -109,7 +109,7 @@ impl DataBuffer { /// Both buffer and snapshots will be empty after this pub(super) fn snapshot_to_queryable_batch( &mut self, - table_name: &str, + table_name: &Arc, partition_id: PartitionId, tombstone: Option, ) -> Option { @@ -129,7 +129,7 @@ impl DataBuffer { None } else { Some(QueryableBatch::new( - table_name, + Arc::clone(table_name), partition_id, data, tombstones, @@ -164,7 +164,7 @@ impl DataBuffer { shard_id: ShardId, table_id: TableId, partition_id: PartitionId, - table_name: &str, + table_name: &Arc, ) -> Option> { if self.persisting.is_some() { panic!("Unable to snapshot while persisting. This is an unexpected state.") diff --git a/ingester/src/data/shard.rs b/ingester/src/data/shard.rs index d69a062137..b3676af045 100644 --- a/ingester/src/data/shard.rs +++ b/ingester/src/data/shard.rs @@ -22,6 +22,8 @@ use crate::lifecycle::LifecycleHandle; pub struct ShardData { /// The shard index for this shard shard_index: ShardIndex, + /// The catalog ID for this shard. + shard_id: ShardId, // New namespaces can come in at any time so we need to be able to add new ones namespaces: RwLock>>, @@ -32,7 +34,7 @@ pub struct ShardData { impl ShardData { /// Initialise a new [`ShardData`] that emits metrics to `metrics`. - pub fn new(shard_index: ShardIndex, metrics: Arc) -> Self { + pub fn new(shard_index: ShardIndex, shard_id: ShardId, metrics: Arc) -> Self { let namespace_count = metrics .register_metric::( "ingester_namespaces_total", @@ -42,6 +44,7 @@ impl ShardData { Self { shard_index, + shard_id, namespaces: Default::default(), metrics, namespace_count, @@ -52,10 +55,12 @@ impl ShardData { #[cfg(test)] pub fn new_for_test( shard_index: ShardIndex, + shard_id: ShardId, namespaces: BTreeMap>, ) -> Self { Self { shard_index, + shard_id, namespaces: RwLock::new(namespaces), metrics: Default::default(), namespace_count: Default::default(), @@ -69,7 +74,6 @@ impl ShardData { pub async fn buffer_operation( &self, dml_operation: DmlOperation, - shard_id: ShardId, catalog: &dyn Catalog, lifecycle_handle: &dyn LifecycleHandle, executor: &Executor, @@ -83,7 +87,7 @@ impl ShardData { }; namespace_data - .buffer_operation(dml_operation, shard_id, catalog, lifecycle_handle, executor) + .buffer_operation(dml_operation, catalog, lifecycle_handle, executor) .await } @@ -112,7 +116,11 @@ impl ShardData { let data = match n.entry(namespace.name) { Entry::Vacant(v) => { - let v = v.insert(Arc::new(NamespaceData::new(namespace.id, &*self.metrics))); + let v = v.insert(Arc::new(NamespaceData::new( + namespace.id, + self.shard_id, + &*self.metrics, + ))); self.namespace_count.inc(1); Arc::clone(v) } diff --git a/ingester/src/data/table.rs b/ingester/src/data/table.rs index 14dc73a9e3..a3c55a9157 100644 --- a/ingester/src/data/table.rs +++ b/ingester/src/data/table.rs @@ -1,6 +1,6 @@ //! Table level data buffer structures. -use std::collections::BTreeMap; +use std::{collections::BTreeMap, sync::Arc}; use data_types::{DeletePredicate, PartitionKey, SequenceNumber, ShardId, TableId, Timestamp}; use iox_catalog::interface::Catalog; @@ -16,6 +16,11 @@ use crate::lifecycle::LifecycleHandle; #[derive(Debug)] pub(crate) struct TableData { table_id: TableId, + table_name: Arc, + + /// The catalog ID of the shard this table is being populated from. + shard_id: ShardId, + // the max sequence number for a tombstone associated with this table tombstone_max_sequence_number: Option, // Map pf partition key to its data @@ -24,9 +29,16 @@ pub(crate) struct TableData { impl TableData { /// Initialize new table buffer - pub fn new(table_id: TableId, tombstone_max_sequence_number: Option) -> Self { + pub fn new( + table_id: TableId, + table_name: &str, + shard_id: ShardId, + tombstone_max_sequence_number: Option, + ) -> Self { Self { table_id, + table_name: table_name.into(), + shard_id, tombstone_max_sequence_number, partition_data: Default::default(), } @@ -36,11 +48,15 @@ impl TableData { #[cfg(test)] pub fn new_for_test( table_id: TableId, + table_name: &str, + shard_id: ShardId, tombstone_max_sequence_number: Option, partitions: BTreeMap, ) -> Self { Self { table_id, + table_name: table_name.into(), + shard_id, tombstone_max_sequence_number, partition_data: partitions, } @@ -68,14 +84,13 @@ impl TableData { sequence_number: SequenceNumber, batch: MutableBatch, partition_key: PartitionKey, - shard_id: ShardId, catalog: &dyn Catalog, lifecycle_handle: &dyn LifecycleHandle, ) -> Result { let partition_data = match self.partition_data.get_mut(&partition_key) { Some(p) => p, None => { - self.insert_partition(partition_key.clone(), shard_id, catalog) + self.insert_partition(partition_key.clone(), self.shard_id, catalog) .await?; self.partition_data.get_mut(&partition_key).unwrap() } @@ -90,7 +105,7 @@ impl TableData { let should_pause = lifecycle_handle.log_write( partition_data.id(), - shard_id, + self.shard_id, sequence_number, batch.size(), batch.rows(), @@ -102,9 +117,7 @@ impl TableData { pub(super) async fn buffer_delete( &mut self, - table_name: &str, predicate: &DeletePredicate, - shard_id: ShardId, sequence_number: SequenceNumber, catalog: &dyn Catalog, executor: &Executor, @@ -117,7 +130,7 @@ impl TableData { .tombstones() .create_or_get( self.table_id, - shard_id, + self.shard_id, sequence_number, min_time, max_time, @@ -131,8 +144,7 @@ impl TableData { // modify one partition at a time for data in self.partition_data.values_mut() { - data.buffer_tombstone(executor, table_name, tombstone.clone()) - .await; + data.buffer_tombstone(executor, tombstone.clone()).await; } Ok(()) @@ -161,26 +173,23 @@ impl TableData { shard_id: ShardId, catalog: &dyn Catalog, ) -> Result<(), super::Error> { - let mut repos = catalog.repositories().await; - let partition = repos + let partition = catalog + .repositories() + .await .partitions() .create_or_get(partition_key, shard_id, self.table_id) .await .context(super::CatalogSnafu)?; - // get info on the persisted parquet files to use later for replay or for snapshot - // information on query. - let files = repos - .parquet_files() - .list_by_partition_not_to_delete(partition.id) - .await - .context(super::CatalogSnafu)?; - // for now we just need the max persisted - let max_persisted_sequence_number = files.iter().map(|p| p.max_sequence_number).max(); - self.partition_data.insert( partition.partition_key, - PartitionData::new(partition.id, max_persisted_sequence_number), + PartitionData::new( + partition.id, + shard_id, + self.table_id, + Arc::clone(&self.table_name), + partition.persisted_sequence_number, + ), ); Ok(()) diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs index a8a7308150..7e0d7f5a2b 100644 --- a/ingester/src/handler.rs +++ b/ingester/src/handler.rs @@ -140,7 +140,7 @@ impl IngestHandlerImpl { for s in shard_states.values() { shards.insert( s.id, - ShardData::new(s.shard_index, Arc::clone(&metric_registry)), + ShardData::new(s.shard_index, s.id, Arc::clone(&metric_registry)), ); } let data = Arc::new(IngesterData::new( diff --git a/ingester/src/querier_handler.rs b/ingester/src/querier_handler.rs index c8e9d37129..2645f010b6 100644 --- a/ingester/src/querier_handler.rs +++ b/ingester/src/querier_handler.rs @@ -191,7 +191,7 @@ async fn prepare_data_to_querier_for_partition( .persisting .unwrap_or_else(|| { QueryableBatch::new( - &request.table, + request.table.clone().into(), unpersisted_partition_data.partition_id, vec![], vec![], diff --git a/ingester/src/query.rs b/ingester/src/query.rs index ff735d96ca..a3d6bc8fb0 100644 --- a/ingester/src/query.rs +++ b/ingester/src/query.rs @@ -57,7 +57,7 @@ pub struct QueryableBatch { pub(crate) delete_predicates: Vec>, /// This is needed to return a reference for a trait function - pub(crate) table_name: String, + pub(crate) table_name: Arc, /// Partition ID pub(crate) partition_id: PartitionId, @@ -66,7 +66,7 @@ pub struct QueryableBatch { impl QueryableBatch { /// Initilaize a QueryableBatch pub fn new( - table_name: &str, + table_name: Arc, partition_id: PartitionId, data: Vec>, deletes: Vec, @@ -75,7 +75,7 @@ impl QueryableBatch { Self { data, delete_predicates, - table_name: table_name.to_string(), + table_name, partition_id, } } @@ -318,7 +318,7 @@ mod tests { // This new queryable batch will convert tombstone to delete predicates let query_batch = - QueryableBatch::new("test_table", PartitionId::new(0), vec![], tombstones); + QueryableBatch::new("test_table".into(), PartitionId::new(0), vec![], tombstones); let predicates = query_batch.delete_predicates(); let expected = vec![ Arc::new(DeletePredicate { diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index cb554708b3..19dabb12a3 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -205,7 +205,7 @@ pub fn make_queryable_batch_with_deletes( } Arc::new(QueryableBatch::new( - table_name, + table_name.into(), PartitionId::new(partition_id), snapshots, tombstones, @@ -685,16 +685,20 @@ pub fn make_ingester_data(two_partitions: bool, loc: DataLocation) -> IngesterDa let data_table_id = TableId::new(2); // Make partitions per requested - let partitions = make_partitions(two_partitions, loc, shard_id, data_table_id, TEST_TABLE); + let partitions = make_partitions(two_partitions, loc, shard_id, data_table_id); // Two tables: one empty and one with data of one or two partitions let mut tables = BTreeMap::new(); let empty_tbl = Arc::new(tokio::sync::RwLock::new(TableData::new( empty_table_id, + "test_table", + shard_id, None, ))); let data_tbl = Arc::new(tokio::sync::RwLock::new(TableData::new_for_test( data_table_id, + "test_table", + shard_id, None, partitions, ))); @@ -703,14 +707,18 @@ pub fn make_ingester_data(two_partitions: bool, loc: DataLocation) -> IngesterDa // Two namespaces: one empty and one with data of 2 tables let mut namespaces = BTreeMap::new(); - let empty_ns = Arc::new(NamespaceData::new(NamespaceId::new(1), &*metrics)); - let data_ns = Arc::new(NamespaceData::new_for_test(NamespaceId::new(2), tables)); + let empty_ns = Arc::new(NamespaceData::new(NamespaceId::new(1), shard_id, &*metrics)); + let data_ns = Arc::new(NamespaceData::new_for_test( + NamespaceId::new(2), + shard_id, + tables, + )); namespaces.insert(TEST_NAMESPACE_EMPTY.to_string(), empty_ns); namespaces.insert(TEST_NAMESPACE.to_string(), data_ns); // One shard that contains 2 namespaces let shard_index = ShardIndex::new(0); - let shard_data = ShardData::new_for_test(shard_index, namespaces); + let shard_data = ShardData::new_for_test(shard_index, shard_id, namespaces); let mut shards = BTreeMap::new(); shards.insert(shard_id, shard_data); @@ -737,12 +745,11 @@ pub async fn make_ingester_data_with_tombstones(loc: DataLocation) -> IngesterDa let data_table_id = TableId::new(2); // Make partitions per requested - let partitions = - make_one_partition_with_tombstones(&exec, loc, shard_id, data_table_id, TEST_TABLE).await; + let partitions = make_one_partition_with_tombstones(&exec, loc, shard_id, data_table_id).await; // Two tables: one empty and one with data of one or two partitions let mut tables = BTreeMap::new(); - let data_tbl = TableData::new_for_test(data_table_id, None, partitions); + let data_tbl = TableData::new_for_test(data_table_id, TEST_TABLE, shard_id, None, partitions); tables.insert( TEST_TABLE.to_string(), Arc::new(tokio::sync::RwLock::new(data_tbl)), @@ -750,12 +757,16 @@ pub async fn make_ingester_data_with_tombstones(loc: DataLocation) -> IngesterDa // Two namespaces: one empty and one with data of 2 tables let mut namespaces = BTreeMap::new(); - let data_ns = Arc::new(NamespaceData::new_for_test(NamespaceId::new(2), tables)); + let data_ns = Arc::new(NamespaceData::new_for_test( + NamespaceId::new(2), + shard_id, + tables, + )); namespaces.insert(TEST_NAMESPACE.to_string(), data_ns); // One shard that contains 1 namespace let shard_index = ShardIndex::new(0); - let shard_data = ShardData::new_for_test(shard_index, namespaces); + let shard_data = ShardData::new_for_test(shard_index, shard_id, namespaces); let mut shards = BTreeMap::new(); shards.insert(shard_id, shard_data); @@ -776,7 +787,6 @@ pub(crate) fn make_partitions( loc: DataLocation, shard_id: ShardId, table_id: TableId, - table_name: &str, ) -> BTreeMap { // In-memory data includes these rows but split between 4 groups go into // different batches of parittion 1 or partittion 2 as requeted @@ -800,8 +810,7 @@ pub(crate) fn make_partitions( // ------------------------------------------ // Build the first partition let partition_id = PartitionId::new(1); - let (mut p1, seq_num) = - make_first_partition_data(partition_id, loc, shard_id, table_id, table_name); + let (mut p1, seq_num) = make_first_partition_data(partition_id, loc, shard_id, table_id); // ------------------------------------------ // Build the second partition if asked @@ -809,7 +818,7 @@ pub(crate) fn make_partitions( let mut seq_num = seq_num.get(); if two_partitions { let partition_id = PartitionId::new(2); - let mut p2 = PartitionData::new(partition_id, None); + let mut p2 = PartitionData::new(partition_id, shard_id, table_id, TEST_TABLE.into(), None); // Group 4: in buffer of p2 // Fill `buffer` seq_num += 1; @@ -849,7 +858,6 @@ pub(crate) async fn make_one_partition_with_tombstones( loc: DataLocation, shard_id: ShardId, table_id: TableId, - table_name: &str, ) -> BTreeMap { // In-memory data includes these rows but split between 4 groups go into // different batches of parittion 1 or partittion 2 as requeted @@ -869,8 +877,7 @@ pub(crate) async fn make_one_partition_with_tombstones( // ]; let partition_id = PartitionId::new(1); - let (mut p1, seq_num) = - make_first_partition_data(partition_id, loc, shard_id, table_id, table_name); + let (mut p1, seq_num) = make_first_partition_data(partition_id, loc, shard_id, table_id); // Add tombstones // Depending on where the existing data is, they (buffer & snapshot) will be either moved to a new snapshot after @@ -888,7 +895,7 @@ pub(crate) async fn make_one_partition_with_tombstones( 50, // max time of data to get deleted "city=Boston", // delete predicate ); - p1.buffer_tombstone(exec, table_name, ts).await; + p1.buffer_tombstone(exec, ts).await; // Group 4: in buffer of p1 after the tombstone // Fill `buffer` @@ -914,7 +921,6 @@ fn make_first_partition_data( loc: DataLocation, shard_id: ShardId, table_id: TableId, - table_name: &str, ) -> (PartitionData, SequenceNumber) { // In-memory data includes these rows but split between 3 groups go into // different batches of parittion p1 @@ -933,7 +939,7 @@ fn make_first_partition_data( // ------------------------------------------ // Build the first partition - let mut p1 = PartitionData::new(partition_id, None); + let mut p1 = PartitionData::new(partition_id, shard_id, table_id, TEST_TABLE.into(), None); let mut seq_num = 0; // -------------------- @@ -952,7 +958,7 @@ fn make_first_partition_data( if loc.contains(DataLocation::PERSISTING) { // Move group 1 data to persisting - p1.snapshot_to_persisting_batch(shard_id, table_id, partition_id, table_name); + p1.snapshot_to_persisting_batch(); } else if loc.contains(DataLocation::SNAPSHOT) { // move group 1 data to snapshot p1.snapshot().unwrap(); diff --git a/query_tests/src/scenarios/util.rs b/query_tests/src/scenarios/util.rs index 8cc7c5b992..31a5431d6e 100644 --- a/query_tests/src/scenarios/util.rs +++ b/query_tests/src/scenarios/util.rs @@ -694,7 +694,11 @@ impl MockIngester { let shards = BTreeMap::from([( shard.shard.id, - ShardData::new(shard.shard.shard_index, catalog.metric_registry()), + ShardData::new( + shard.shard.shard_index, + shard.shard.id, + catalog.metric_registry(), + ), )]); let ingester_data = Arc::new(IngesterData::new( catalog.object_store(),