From b0eb85ddd5f60a2d4c2010c688c09abe4fbe31c4 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 16 Sep 2022 17:52:38 +0200 Subject: [PATCH] refactor: store ShardId in child nodes Instead of passing the ShardId into each function for child nodes of the Shard, store it. This avoids the possibility of mistakenly passing the wrong value. --- ingester/src/data.rs | 25 ++++++++++++++----------- ingester/src/data/namespace.rs | 28 +++++++++++++--------------- ingester/src/data/shard.rs | 16 ++++++++++++---- ingester/src/data/table.rs | 16 +++++++++++----- ingester/src/handler.rs | 2 +- ingester/src/test_util.rs | 22 ++++++++++++++++------ query_tests/src/scenarios/util.rs | 6 +++++- 7 files changed, 72 insertions(+), 43 deletions(-) diff --git a/ingester/src/data.rs b/ingester/src/data.rs index bfc6b0a574..ebfad96f37 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -166,7 +166,6 @@ impl IngesterData { shard_data .buffer_operation( dml_operation, - shard_id, self.catalog.as_ref(), lifecycle_handle, &self.exec, @@ -653,7 +652,10 @@ mod tests { let mut shards = BTreeMap::new(); let shard_index = ShardIndex::new(0); - shards.insert(shard1.id, ShardData::new(shard_index, Arc::clone(&metrics))); + shards.insert( + shard1.id, + ShardData::new(shard_index, shard1.id, Arc::clone(&metrics)), + ); let object_store: Arc = Arc::new(InMemory::new()); @@ -738,7 +740,7 @@ mod tests { let mut shards = BTreeMap::new(); shards.insert( shard1.id, - ShardData::new(shard1.shard_index, Arc::clone(&metrics)), + ShardData::new(shard1.shard_index, shard1.id, Arc::clone(&metrics)), ); let object_store: Arc = Arc::new(InMemory::new()); @@ -843,11 +845,11 @@ mod tests { let mut shards = BTreeMap::new(); shards.insert( shard1.id, - ShardData::new(shard1.shard_index, Arc::clone(&metrics)), + ShardData::new(shard1.shard_index, shard1.id, Arc::clone(&metrics)), ); shards.insert( shard2.id, - ShardData::new(shard2.shard_index, Arc::clone(&metrics)), + ShardData::new(shard2.shard_index, shard2.id, Arc::clone(&metrics)), ); let object_store: Arc = Arc::new(InMemory::new()); @@ -1099,11 +1101,11 @@ mod tests { let mut shards = BTreeMap::new(); shards.insert( shard1.id, - ShardData::new(shard1.shard_index, Arc::clone(&metrics)), + ShardData::new(shard1.shard_index, shard1.id, Arc::clone(&metrics)), ); shards.insert( shard2.id, - ShardData::new(shard2.shard_index, Arc::clone(&metrics)), + ShardData::new(shard2.shard_index, shard2.id, Arc::clone(&metrics)), ); let object_store: Arc = Arc::new(InMemory::new()); @@ -1338,7 +1340,7 @@ mod tests { ); let exec = Executor::new(1); - let data = NamespaceData::new(namespace.id, &*metrics); + let data = NamespaceData::new(namespace.id, shard.id, &*metrics); // w1 should be ignored because the per-partition replay offset is set // to 1 already, so it shouldn't be buffered and the buffer should @@ -1346,7 +1348,6 @@ mod tests { let should_pause = data .buffer_operation( DmlOperation::Write(w1), - shard.id, catalog.as_ref(), &manager.handle(), &exec, @@ -1368,7 +1369,6 @@ mod tests { // w2 should be in the buffer data.buffer_operation( DmlOperation::Write(w2), - shard.id, catalog.as_ref(), &manager.handle(), &exec, @@ -1410,7 +1410,10 @@ mod tests { let mut shards = BTreeMap::new(); let shard_index = ShardIndex::new(0); - shards.insert(shard1.id, ShardData::new(shard_index, Arc::clone(&metrics))); + shards.insert( + shard1.id, + ShardData::new(shard_index, shard1.id, Arc::clone(&metrics)), + ); let object_store: Arc = Arc::new(InMemory::new()); diff --git a/ingester/src/data/namespace.rs b/ingester/src/data/namespace.rs index c69aafd503..3a63aa2674 100644 --- a/ingester/src/data/namespace.rs +++ b/ingester/src/data/namespace.rs @@ -26,8 +26,11 @@ use crate::lifecycle::LifecycleHandle; #[derive(Debug)] pub struct NamespaceData { namespace_id: NamespaceId, - tables: RwLock>>>, + /// The catalog ID of the shard this namespace is being populated from. + shard_id: ShardId, + + tables: RwLock>>>, table_count: U64Counter, /// The sequence number being actively written, if any. @@ -78,7 +81,7 @@ pub struct NamespaceData { impl NamespaceData { /// Initialize new tables with default partition template of daily - pub fn new(namespace_id: NamespaceId, metrics: &metric::Registry) -> Self { + pub fn new(namespace_id: NamespaceId, shard_id: ShardId, metrics: &metric::Registry) -> Self { let table_count = metrics .register_metric::( "ingester_tables_total", @@ -88,6 +91,7 @@ impl NamespaceData { Self { namespace_id, + shard_id, tables: Default::default(), table_count, buffering_sequence_number: RwLock::new(None), @@ -100,10 +104,12 @@ impl NamespaceData { #[cfg(test)] pub(crate) fn new_for_test( namespace_id: NamespaceId, + shard_id: ShardId, tables: BTreeMap>>, ) -> Self { Self { namespace_id, + shard_id, tables: RwLock::new(tables), table_count: Default::default(), buffering_sequence_number: RwLock::new(None), @@ -117,7 +123,6 @@ impl NamespaceData { pub async fn buffer_operation( &self, dml_operation: DmlOperation, - shard_id: ShardId, catalog: &dyn Catalog, lifecycle_handle: &dyn LifecycleHandle, executor: &Executor, @@ -148,7 +153,7 @@ impl NamespaceData { for (t, b) in write.into_tables() { let table_data = match self.table_data(&t) { Some(t) => t, - None => self.insert_table(shard_id, &t, catalog).await?, + None => self.insert_table(&t, catalog).await?, }; { @@ -159,7 +164,6 @@ impl NamespaceData { sequence_number, b, partition_key.clone(), - shard_id, catalog, lifecycle_handle, ) @@ -176,19 +180,13 @@ impl NamespaceData { let table_name = delete.table_name().context(super::TableNotPresentSnafu)?; let table_data = match self.table_data(table_name) { Some(t) => t, - None => self.insert_table(shard_id, table_name, catalog).await?, + None => self.insert_table(table_name, catalog).await?, }; let mut table_data = table_data.write().await; table_data - .buffer_delete( - delete.predicate(), - shard_id, - sequence_number, - catalog, - executor, - ) + .buffer_delete(delete.predicate(), sequence_number, catalog, executor) .await?; // don't pause writes since deletes don't count towards memory limits @@ -250,14 +248,13 @@ impl NamespaceData { /// Inserts the table or returns it if it happens to be inserted by some other thread async fn insert_table( &self, - shard_id: ShardId, table_name: &str, catalog: &dyn Catalog, ) -> Result>, super::Error> { let mut repos = catalog.repositories().await; let info = repos .tables() - .get_table_persist_info(shard_id, self.namespace_id, table_name) + .get_table_persist_info(self.shard_id, self.namespace_id, table_name) .await .context(super::CatalogSnafu)? .context(super::TableNotFoundSnafu { table_name })?; @@ -269,6 +266,7 @@ impl NamespaceData { let v = v.insert(Arc::new(tokio::sync::RwLock::new(TableData::new( info.table_id, table_name, + self.shard_id, info.tombstone_max_sequence_number, )))); self.table_count.inc(1); diff --git a/ingester/src/data/shard.rs b/ingester/src/data/shard.rs index d69a062137..b3676af045 100644 --- a/ingester/src/data/shard.rs +++ b/ingester/src/data/shard.rs @@ -22,6 +22,8 @@ use crate::lifecycle::LifecycleHandle; pub struct ShardData { /// The shard index for this shard shard_index: ShardIndex, + /// The catalog ID for this shard. + shard_id: ShardId, // New namespaces can come in at any time so we need to be able to add new ones namespaces: RwLock>>, @@ -32,7 +34,7 @@ pub struct ShardData { impl ShardData { /// Initialise a new [`ShardData`] that emits metrics to `metrics`. - pub fn new(shard_index: ShardIndex, metrics: Arc) -> Self { + pub fn new(shard_index: ShardIndex, shard_id: ShardId, metrics: Arc) -> Self { let namespace_count = metrics .register_metric::( "ingester_namespaces_total", @@ -42,6 +44,7 @@ impl ShardData { Self { shard_index, + shard_id, namespaces: Default::default(), metrics, namespace_count, @@ -52,10 +55,12 @@ impl ShardData { #[cfg(test)] pub fn new_for_test( shard_index: ShardIndex, + shard_id: ShardId, namespaces: BTreeMap>, ) -> Self { Self { shard_index, + shard_id, namespaces: RwLock::new(namespaces), metrics: Default::default(), namespace_count: Default::default(), @@ -69,7 +74,6 @@ impl ShardData { pub async fn buffer_operation( &self, dml_operation: DmlOperation, - shard_id: ShardId, catalog: &dyn Catalog, lifecycle_handle: &dyn LifecycleHandle, executor: &Executor, @@ -83,7 +87,7 @@ impl ShardData { }; namespace_data - .buffer_operation(dml_operation, shard_id, catalog, lifecycle_handle, executor) + .buffer_operation(dml_operation, catalog, lifecycle_handle, executor) .await } @@ -112,7 +116,11 @@ impl ShardData { let data = match n.entry(namespace.name) { Entry::Vacant(v) => { - let v = v.insert(Arc::new(NamespaceData::new(namespace.id, &*self.metrics))); + let v = v.insert(Arc::new(NamespaceData::new( + namespace.id, + self.shard_id, + &*self.metrics, + ))); self.namespace_count.inc(1); Arc::clone(v) } diff --git a/ingester/src/data/table.rs b/ingester/src/data/table.rs index 99bd428b89..a3c55a9157 100644 --- a/ingester/src/data/table.rs +++ b/ingester/src/data/table.rs @@ -17,6 +17,10 @@ use crate::lifecycle::LifecycleHandle; pub(crate) struct TableData { table_id: TableId, table_name: Arc, + + /// The catalog ID of the shard this table is being populated from. + shard_id: ShardId, + // the max sequence number for a tombstone associated with this table tombstone_max_sequence_number: Option, // Map pf partition key to its data @@ -28,11 +32,13 @@ impl TableData { pub fn new( table_id: TableId, table_name: &str, + shard_id: ShardId, tombstone_max_sequence_number: Option, ) -> Self { Self { table_id, table_name: table_name.into(), + shard_id, tombstone_max_sequence_number, partition_data: Default::default(), } @@ -43,12 +49,14 @@ impl TableData { pub fn new_for_test( table_id: TableId, table_name: &str, + shard_id: ShardId, tombstone_max_sequence_number: Option, partitions: BTreeMap, ) -> Self { Self { table_id, table_name: table_name.into(), + shard_id, tombstone_max_sequence_number, partition_data: partitions, } @@ -76,14 +84,13 @@ impl TableData { sequence_number: SequenceNumber, batch: MutableBatch, partition_key: PartitionKey, - shard_id: ShardId, catalog: &dyn Catalog, lifecycle_handle: &dyn LifecycleHandle, ) -> Result { let partition_data = match self.partition_data.get_mut(&partition_key) { Some(p) => p, None => { - self.insert_partition(partition_key.clone(), shard_id, catalog) + self.insert_partition(partition_key.clone(), self.shard_id, catalog) .await?; self.partition_data.get_mut(&partition_key).unwrap() } @@ -98,7 +105,7 @@ impl TableData { let should_pause = lifecycle_handle.log_write( partition_data.id(), - shard_id, + self.shard_id, sequence_number, batch.size(), batch.rows(), @@ -111,7 +118,6 @@ impl TableData { pub(super) async fn buffer_delete( &mut self, predicate: &DeletePredicate, - shard_id: ShardId, sequence_number: SequenceNumber, catalog: &dyn Catalog, executor: &Executor, @@ -124,7 +130,7 @@ impl TableData { .tombstones() .create_or_get( self.table_id, - shard_id, + self.shard_id, sequence_number, min_time, max_time, diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs index a8a7308150..7e0d7f5a2b 100644 --- a/ingester/src/handler.rs +++ b/ingester/src/handler.rs @@ -140,7 +140,7 @@ impl IngestHandlerImpl { for s in shard_states.values() { shards.insert( s.id, - ShardData::new(s.shard_index, Arc::clone(&metric_registry)), + ShardData::new(s.shard_index, s.id, Arc::clone(&metric_registry)), ); } let data = Arc::new(IngesterData::new( diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index c043820eeb..19dabb12a3 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -692,11 +692,13 @@ pub fn make_ingester_data(two_partitions: bool, loc: DataLocation) -> IngesterDa let empty_tbl = Arc::new(tokio::sync::RwLock::new(TableData::new( empty_table_id, "test_table", + shard_id, None, ))); let data_tbl = Arc::new(tokio::sync::RwLock::new(TableData::new_for_test( data_table_id, "test_table", + shard_id, None, partitions, ))); @@ -705,14 +707,18 @@ pub fn make_ingester_data(two_partitions: bool, loc: DataLocation) -> IngesterDa // Two namespaces: one empty and one with data of 2 tables let mut namespaces = BTreeMap::new(); - let empty_ns = Arc::new(NamespaceData::new(NamespaceId::new(1), &*metrics)); - let data_ns = Arc::new(NamespaceData::new_for_test(NamespaceId::new(2), tables)); + let empty_ns = Arc::new(NamespaceData::new(NamespaceId::new(1), shard_id, &*metrics)); + let data_ns = Arc::new(NamespaceData::new_for_test( + NamespaceId::new(2), + shard_id, + tables, + )); namespaces.insert(TEST_NAMESPACE_EMPTY.to_string(), empty_ns); namespaces.insert(TEST_NAMESPACE.to_string(), data_ns); // One shard that contains 2 namespaces let shard_index = ShardIndex::new(0); - let shard_data = ShardData::new_for_test(shard_index, namespaces); + let shard_data = ShardData::new_for_test(shard_index, shard_id, namespaces); let mut shards = BTreeMap::new(); shards.insert(shard_id, shard_data); @@ -743,7 +749,7 @@ pub async fn make_ingester_data_with_tombstones(loc: DataLocation) -> IngesterDa // Two tables: one empty and one with data of one or two partitions let mut tables = BTreeMap::new(); - let data_tbl = TableData::new_for_test(data_table_id, TEST_TABLE, None, partitions); + let data_tbl = TableData::new_for_test(data_table_id, TEST_TABLE, shard_id, None, partitions); tables.insert( TEST_TABLE.to_string(), Arc::new(tokio::sync::RwLock::new(data_tbl)), @@ -751,12 +757,16 @@ pub async fn make_ingester_data_with_tombstones(loc: DataLocation) -> IngesterDa // Two namespaces: one empty and one with data of 2 tables let mut namespaces = BTreeMap::new(); - let data_ns = Arc::new(NamespaceData::new_for_test(NamespaceId::new(2), tables)); + let data_ns = Arc::new(NamespaceData::new_for_test( + NamespaceId::new(2), + shard_id, + tables, + )); namespaces.insert(TEST_NAMESPACE.to_string(), data_ns); // One shard that contains 1 namespace let shard_index = ShardIndex::new(0); - let shard_data = ShardData::new_for_test(shard_index, namespaces); + let shard_data = ShardData::new_for_test(shard_index, shard_id, namespaces); let mut shards = BTreeMap::new(); shards.insert(shard_id, shard_data); diff --git a/query_tests/src/scenarios/util.rs b/query_tests/src/scenarios/util.rs index 8cc7c5b992..31a5431d6e 100644 --- a/query_tests/src/scenarios/util.rs +++ b/query_tests/src/scenarios/util.rs @@ -694,7 +694,11 @@ impl MockIngester { let shards = BTreeMap::from([( shard.shard.id, - ShardData::new(shard.shard.shard_index, catalog.metric_registry()), + ShardData::new( + shard.shard.shard_index, + shard.shard.id, + catalog.metric_registry(), + ), )]); let ingester_data = Arc::new(IngesterData::new( catalog.object_store(),