From ce0d189260b156cf38b62688d8287b5275ffa73d Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Thu, 15 Sep 2022 16:19:01 +0200 Subject: [PATCH 1/7] perf: O(1) partition persist mark discovery Changes the ingest code path to eliminate scanning the parquet_files table to discover the last persisted offset per partition, instead utilising the new persisted_sequence_number field on the Partition itself to read the same value. This lookup blocks ingest for the shard, so removing the expensive query from the ingest hot path should improve catch-up time after a restart/deployment. --- ingester/src/data.rs | 9 ++++++++- ingester/src/data/table.rs | 17 ++++------------- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/ingester/src/data.rs b/ingester/src/data.rs index 06ab8f3b2c..9575923c7f 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -1271,6 +1271,11 @@ mod tests { .create_or_get("1970-01-01".into(), shard.id, table.id) .await .unwrap(); + repos + .partitions() + .update_persisted_sequence_number(partition.id, SequenceNumber::new(1)) + .await + .unwrap(); let partition2 = repos .partitions() .create_or_get("1970-01-02".into(), shard.id, table.id) @@ -1330,7 +1335,9 @@ mod tests { let data = NamespaceData::new(namespace.id, &*metrics); - // w1 should be ignored so it shouldn't be present in the buffer + // w1 should be ignored because the per-partition replay offset is set + // to 1 already, so it shouldn't be buffered and the buffer should + // remain empty. let should_pause = data .buffer_operation( DmlOperation::Write(w1), diff --git a/ingester/src/data/table.rs b/ingester/src/data/table.rs index 14dc73a9e3..beb8b53d96 100644 --- a/ingester/src/data/table.rs +++ b/ingester/src/data/table.rs @@ -161,26 +161,17 @@ impl TableData { shard_id: ShardId, catalog: &dyn Catalog, ) -> Result<(), super::Error> { - let mut repos = catalog.repositories().await; - let partition = repos + let partition = catalog + .repositories() + .await .partitions() .create_or_get(partition_key, shard_id, self.table_id) .await .context(super::CatalogSnafu)?; - // get info on the persisted parquet files to use later for replay or for snapshot - // information on query. - let files = repos - .parquet_files() - .list_by_partition_not_to_delete(partition.id) - .await - .context(super::CatalogSnafu)?; - // for now we just need the max persisted - let max_persisted_sequence_number = files.iter().map(|p| p.max_sequence_number).max(); - self.partition_data.insert( partition.partition_key, - PartitionData::new(partition.id, max_persisted_sequence_number), + PartitionData::new(partition.id, partition.persisted_sequence_number), ); Ok(()) From 099dda430e4cda8a1a20db187c06d1cebe02dc12 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 16 Sep 2022 14:03:54 +0000 Subject: [PATCH 2/7] chore(deps): Bump digest from 0.10.3 to 0.10.5 (#5655) Bumps [digest](https://github.com/RustCrypto/traits) from 0.10.3 to 0.10.5. - [Release notes](https://github.com/RustCrypto/traits/releases) - [Commits](https://github.com/RustCrypto/traits/compare/digest-v0.10.3...digest-v0.10.5) --- updated-dependencies: - dependency-name: digest dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Dom --- Cargo.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9f49eb0e87..746217f02a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1218,9 +1218,9 @@ checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8" [[package]] name = "digest" -version = "0.10.3" +version = "0.10.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2fb860ca6fafa5552fb6d0e816a69c8e49f0908bf524e30a90d97c85892d506" +checksum = "adfbc57365a37acbd2ebf2b64d7e69bb766e2fea813521ed536f5d0520dcf86c" dependencies = [ "block-buffer", "crypto-common", From 85d6efafe19627a0107a0d044490e9fcaaa4dbd6 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 16 Sep 2022 17:08:08 +0200 Subject: [PATCH 3/7] refactor: snapshot_to_persisting redundant ID Partition::snapshot_to_persisting() passes the ID of the partition it is calling `snapshot_to_persisting()` on. The partition already knows what its ID is, so at best it's redundant, and at worst, inconsistent with the actual ID. --- ingester/src/data/namespace.rs | 1 - ingester/src/data/partition.rs | 10 ++-------- ingester/src/test_util.rs | 2 +- 3 files changed, 3 insertions(+), 10 deletions(-) diff --git a/ingester/src/data/namespace.rs b/ingester/src/data/namespace.rs index e65e80dd01..9728fd6285 100644 --- a/ingester/src/data/namespace.rs +++ b/ingester/src/data/namespace.rs @@ -236,7 +236,6 @@ impl NamespaceData { partition_data.snapshot_to_persisting_batch( partition_info.partition.shard_id, partition_info.partition.table_id, - partition_info.partition.id, &partition_info.table_name, ) }); diff --git a/ingester/src/data/partition.rs b/ingester/src/data/partition.rs index f2059dd1cc..bd0e0fa496 100644 --- a/ingester/src/data/partition.rs +++ b/ingester/src/data/partition.rs @@ -134,11 +134,10 @@ impl PartitionData { &mut self, shard_id: ShardId, table_id: TableId, - partition_id: PartitionId, table_name: &str, ) -> Option> { self.data - .snapshot_to_persisting(shard_id, table_id, partition_id, table_name) + .snapshot_to_persisting(shard_id, table_id, self.id, table_name) } /// Snapshot whatever is in the buffer and return a new vec of the @@ -463,12 +462,7 @@ mod tests { // ------------------------------------------ // Persisting let p_batch = p - .snapshot_to_persisting_batch( - ShardId::new(s_id), - TableId::new(t_id), - PartitionId::new(p_id), - table_name, - ) + .snapshot_to_persisting_batch(ShardId::new(s_id), TableId::new(t_id), table_name) .unwrap(); // verify data diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index cb554708b3..0d3b031f5d 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -952,7 +952,7 @@ fn make_first_partition_data( if loc.contains(DataLocation::PERSISTING) { // Move group 1 data to persisting - p1.snapshot_to_persisting_batch(shard_id, table_id, partition_id, table_name); + p1.snapshot_to_persisting_batch(shard_id, table_id, table_name); } else if loc.contains(DataLocation::SNAPSHOT) { // move group 1 data to snapshot p1.snapshot().unwrap(); From c7ba0bea911bc08723669424e430b831f0ef3322 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 16 Sep 2022 17:17:16 +0200 Subject: [PATCH 4/7] refactor: ShardId & TableId in PartitionData When we construct a PartitionData we have the ShardId and TableId. This commit stores them in the PartitionData for later use, rather than repeatedly passing them in again when constructing snapshots, at the risk of passing the wrong IDs. --- ingester/src/data/namespace.rs | 6 +----- ingester/src/data/partition.rs | 36 ++++++++++++++++++++++------------ ingester/src/data/table.rs | 7 ++++++- ingester/src/test_util.rs | 6 +++--- 4 files changed, 33 insertions(+), 22 deletions(-) diff --git a/ingester/src/data/namespace.rs b/ingester/src/data/namespace.rs index 9728fd6285..a24658cedc 100644 --- a/ingester/src/data/namespace.rs +++ b/ingester/src/data/namespace.rs @@ -233,11 +233,7 @@ impl NamespaceData { .partition_data .get_mut(&partition_info.partition.partition_key) .and_then(|partition_data| { - partition_data.snapshot_to_persisting_batch( - partition_info.partition.shard_id, - partition_info.partition.table_id, - &partition_info.table_name, - ) + partition_data.snapshot_to_persisting_batch(&partition_info.table_name) }); } diff --git a/ingester/src/data/partition.rs b/ingester/src/data/partition.rs index bd0e0fa496..a72347a012 100644 --- a/ingester/src/data/partition.rs +++ b/ingester/src/data/partition.rs @@ -111,7 +111,12 @@ impl SnapshotBatch { /// Data of an IOx Partition of a given Table of a Namesapce that belongs to a given Shard #[derive(Debug)] pub(crate) struct PartitionData { + /// The catalog ID of the partition this buffer is for. id: PartitionId, + /// The shard and table IDs for this partition. + shard_id: ShardId, + table_id: TableId, + pub(crate) data: DataBuffer, /// The max_persisted_sequence number for any parquet_file in this @@ -121,9 +126,16 @@ pub(crate) struct PartitionData { impl PartitionData { /// Initialize a new partition data buffer - pub fn new(id: PartitionId, max_persisted_sequence_number: Option) -> Self { + pub fn new( + id: PartitionId, + shard_id: ShardId, + table_id: TableId, + max_persisted_sequence_number: Option, + ) -> Self { Self { id, + shard_id, + table_id, data: Default::default(), max_persisted_sequence_number, } @@ -132,12 +144,10 @@ impl PartitionData { /// Snapshot anything in the buffer and move all snapshot data into a persisting batch pub fn snapshot_to_persisting_batch( &mut self, - shard_id: ShardId, - table_id: TableId, table_name: &str, ) -> Option> { self.data - .snapshot_to_persisting(shard_id, table_id, self.id, table_name) + .snapshot_to_persisting(self.shard_id, self.table_id, self.id, table_name) } /// Snapshot whatever is in the buffer and return a new vec of the @@ -291,11 +301,8 @@ mod tests { #[test] fn snapshot_buffer_different_but_compatible_schemas() { - let mut partition_data = PartitionData { - id: PartitionId::new(1), - data: Default::default(), - max_persisted_sequence_number: None, - }; + let mut partition_data = + PartitionData::new(PartitionId::new(1), ShardId::new(1), TableId::new(1), None); let seq_num1 = SequenceNumber::new(1); // Missing tag `t1` @@ -335,7 +342,12 @@ mod tests { let t_id = 1; let p_id = 1; let table_name = "restaurant"; - let mut p = PartitionData::new(PartitionId::new(p_id), None); + let mut p = PartitionData::new( + PartitionId::new(p_id), + ShardId::new(s_id), + TableId::new(t_id), + None, + ); let exec = Executor::new(1); // ------------------------------------------ @@ -461,9 +473,7 @@ mod tests { // ------------------------------------------ // Persisting - let p_batch = p - .snapshot_to_persisting_batch(ShardId::new(s_id), TableId::new(t_id), table_name) - .unwrap(); + let p_batch = p.snapshot_to_persisting_batch(table_name).unwrap(); // verify data assert!(p.data.buffer.is_none()); // always empty after issuing persit diff --git a/ingester/src/data/table.rs b/ingester/src/data/table.rs index beb8b53d96..f579e17e8e 100644 --- a/ingester/src/data/table.rs +++ b/ingester/src/data/table.rs @@ -171,7 +171,12 @@ impl TableData { self.partition_data.insert( partition.partition_key, - PartitionData::new(partition.id, partition.persisted_sequence_number), + PartitionData::new( + partition.id, + shard_id, + self.table_id, + partition.persisted_sequence_number, + ), ); Ok(()) diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index 0d3b031f5d..c1b83f2a0a 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -809,7 +809,7 @@ pub(crate) fn make_partitions( let mut seq_num = seq_num.get(); if two_partitions { let partition_id = PartitionId::new(2); - let mut p2 = PartitionData::new(partition_id, None); + let mut p2 = PartitionData::new(partition_id, shard_id, table_id, None); // Group 4: in buffer of p2 // Fill `buffer` seq_num += 1; @@ -933,7 +933,7 @@ fn make_first_partition_data( // ------------------------------------------ // Build the first partition - let mut p1 = PartitionData::new(partition_id, None); + let mut p1 = PartitionData::new(partition_id, shard_id, table_id, None); let mut seq_num = 0; // -------------------- @@ -952,7 +952,7 @@ fn make_first_partition_data( if loc.contains(DataLocation::PERSISTING) { // Move group 1 data to persisting - p1.snapshot_to_persisting_batch(shard_id, table_id, table_name); + p1.snapshot_to_persisting_batch(table_name); } else if loc.contains(DataLocation::SNAPSHOT) { // move group 1 data to snapshot p1.snapshot().unwrap(); From 07b08fa9cbdf086aa17a987edc9d70ebf0b95a4c Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 16 Sep 2022 17:41:32 +0200 Subject: [PATCH 5/7] refactor: add table name in PartitionData A partition belongs to a table - this commit stores the table name in the PartitionData (which was readily available at construction time) instead of redundantly passing it into various functions at the risk of getting it wrong. --- ingester/src/data.rs | 7 ++++- ingester/src/data/namespace.rs | 15 +++++----- ingester/src/data/partition.rs | 41 ++++++++++++++------------- ingester/src/data/partition/buffer.rs | 6 ++-- ingester/src/data/table.rs | 17 +++++++---- ingester/src/querier_handler.rs | 2 +- ingester/src/query.rs | 8 +++--- ingester/src/test_util.rs | 28 ++++++++---------- 8 files changed, 66 insertions(+), 58 deletions(-) diff --git a/ingester/src/data.rs b/ingester/src/data.rs index 9575923c7f..bfc6b0a574 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -267,7 +267,12 @@ impl Persister for IngesterData { .await .expect("retry forever"); - let persisting_batch = namespace.snapshot_to_persisting(&partition_info).await; + let persisting_batch = namespace + .snapshot_to_persisting( + &partition_info.table_name, + &partition_info.partition.partition_key, + ) + .await; if let Some(persisting_batch) = persisting_batch { // do the CPU intensive work of compaction, de-duplication and sorting diff --git a/ingester/src/data/namespace.rs b/ingester/src/data/namespace.rs index a24658cedc..c69aafd503 100644 --- a/ingester/src/data/namespace.rs +++ b/ingester/src/data/namespace.rs @@ -5,7 +5,7 @@ use std::{ sync::Arc, }; -use data_types::{NamespaceId, PartitionInfo, PartitionKey, SequenceNumber, ShardId}; +use data_types::{NamespaceId, PartitionKey, SequenceNumber, ShardId}; use dml::DmlOperation; use iox_catalog::interface::Catalog; use iox_query::exec::Executor; @@ -183,7 +183,6 @@ impl NamespaceData { table_data .buffer_delete( - table_name, delete.predicate(), shard_id, sequence_number, @@ -224,17 +223,16 @@ impl NamespaceData { /// or persist, None will be returned. pub async fn snapshot_to_persisting( &self, - partition_info: &PartitionInfo, + table_name: &str, + partition_key: &PartitionKey, ) -> Option> { - if let Some(table_data) = self.table_data(&partition_info.table_name) { + if let Some(table_data) = self.table_data(table_name) { let mut table_data = table_data.write().await; return table_data .partition_data - .get_mut(&partition_info.partition.partition_key) - .and_then(|partition_data| { - partition_data.snapshot_to_persisting_batch(&partition_info.table_name) - }); + .get_mut(partition_key) + .and_then(|partition_data| partition_data.snapshot_to_persisting_batch()); } None @@ -270,6 +268,7 @@ impl NamespaceData { Entry::Vacant(v) => { let v = v.insert(Arc::new(tokio::sync::RwLock::new(TableData::new( info.table_id, + table_name, info.tombstone_max_sequence_number, )))); self.table_count.inc(1); diff --git a/ingester/src/data/partition.rs b/ingester/src/data/partition.rs index a72347a012..8fe2adcb9a 100644 --- a/ingester/src/data/partition.rs +++ b/ingester/src/data/partition.rs @@ -116,6 +116,8 @@ pub(crate) struct PartitionData { /// The shard and table IDs for this partition. shard_id: ShardId, table_id: TableId, + /// The name of the table this partition is part of. + table_name: Arc, pub(crate) data: DataBuffer, @@ -130,24 +132,23 @@ impl PartitionData { id: PartitionId, shard_id: ShardId, table_id: TableId, + table_name: Arc, max_persisted_sequence_number: Option, ) -> Self { Self { id, shard_id, table_id, + table_name, data: Default::default(), max_persisted_sequence_number, } } /// Snapshot anything in the buffer and move all snapshot data into a persisting batch - pub fn snapshot_to_persisting_batch( - &mut self, - table_name: &str, - ) -> Option> { + pub fn snapshot_to_persisting_batch(&mut self) -> Option> { self.data - .snapshot_to_persisting(self.shard_id, self.table_id, self.id, table_name) + .snapshot_to_persisting(self.shard_id, self.table_id, self.id, &self.table_name) } /// Snapshot whatever is in the buffer and return a new vec of the @@ -198,12 +199,7 @@ impl PartitionData { /// tombstone-applied snapshot /// . The tombstone is only added in the `deletes_during_persisting` if the `persisting` /// exists - pub(crate) async fn buffer_tombstone( - &mut self, - executor: &Executor, - table_name: &str, - tombstone: Tombstone, - ) { + pub(crate) async fn buffer_tombstone(&mut self, executor: &Executor, tombstone: Tombstone) { self.data.add_tombstone(tombstone.clone()); // ---------------------------------------------------------- @@ -211,7 +207,7 @@ impl PartitionData { // Make a QueryableBatch for all buffer + snapshots + the given tombstone let max_sequence_number = tombstone.sequence_number; let query_batch = match self.data.snapshot_to_queryable_batch( - table_name, + &self.table_name, self.id, Some(tombstone.clone()), ) { @@ -301,8 +297,13 @@ mod tests { #[test] fn snapshot_buffer_different_but_compatible_schemas() { - let mut partition_data = - PartitionData::new(PartitionId::new(1), ShardId::new(1), TableId::new(1), None); + let mut partition_data = PartitionData::new( + PartitionId::new(1), + ShardId::new(1), + TableId::new(1), + "foo".into(), + None, + ); let seq_num1 = SequenceNumber::new(1); // Missing tag `t1` @@ -341,11 +342,11 @@ mod tests { let s_id = 1; let t_id = 1; let p_id = 1; - let table_name = "restaurant"; let mut p = PartitionData::new( PartitionId::new(p_id), ShardId::new(s_id), TableId::new(t_id), + "restaurant".into(), None, ); let exec = Executor::new(1); @@ -387,7 +388,7 @@ mod tests { "day=thu", // delete predicate ); // one row will get deleted, the other is moved to snapshot - p.buffer_tombstone(&exec, "restaurant", ts).await; + p.buffer_tombstone(&exec, ts).await; // verify data assert!(p.data.buffer.is_none()); // always empty after delete @@ -450,7 +451,7 @@ mod tests { ); // two rows will get deleted, one from existing snapshot, one from the buffer being moved // to snpashot - p.buffer_tombstone(&exec, "restaurant", ts).await; + p.buffer_tombstone(&exec, ts).await; // verify data assert!(p.data.buffer.is_none()); // always empty after delete @@ -473,7 +474,7 @@ mod tests { // ------------------------------------------ // Persisting - let p_batch = p.snapshot_to_persisting_batch(table_name).unwrap(); + let p_batch = p.snapshot_to_persisting_batch().unwrap(); // verify data assert!(p.data.buffer.is_none()); // always empty after issuing persit @@ -495,7 +496,7 @@ mod tests { ); // if a query come while persisting, the row with temp=55 will be deleted before // data is sent back to Querier - p.buffer_tombstone(&exec, "restaurant", ts).await; + p.buffer_tombstone(&exec, ts).await; // verify data assert!(p.data.buffer.is_none()); // always empty after delete @@ -563,7 +564,7 @@ mod tests { "temp=60", // delete predicate ); // the row with temp=60 will be removed from the sanphot - p.buffer_tombstone(&exec, "restaurant", ts).await; + p.buffer_tombstone(&exec, ts).await; // verify data assert!(p.data.buffer.is_none()); // always empty after delete diff --git a/ingester/src/data/partition/buffer.rs b/ingester/src/data/partition/buffer.rs index cc64ed6108..48552ad550 100644 --- a/ingester/src/data/partition/buffer.rs +++ b/ingester/src/data/partition/buffer.rs @@ -109,7 +109,7 @@ impl DataBuffer { /// Both buffer and snapshots will be empty after this pub(super) fn snapshot_to_queryable_batch( &mut self, - table_name: &str, + table_name: &Arc, partition_id: PartitionId, tombstone: Option, ) -> Option { @@ -129,7 +129,7 @@ impl DataBuffer { None } else { Some(QueryableBatch::new( - table_name, + Arc::clone(table_name), partition_id, data, tombstones, @@ -164,7 +164,7 @@ impl DataBuffer { shard_id: ShardId, table_id: TableId, partition_id: PartitionId, - table_name: &str, + table_name: &Arc, ) -> Option> { if self.persisting.is_some() { panic!("Unable to snapshot while persisting. This is an unexpected state.") diff --git a/ingester/src/data/table.rs b/ingester/src/data/table.rs index f579e17e8e..99bd428b89 100644 --- a/ingester/src/data/table.rs +++ b/ingester/src/data/table.rs @@ -1,6 +1,6 @@ //! Table level data buffer structures. -use std::collections::BTreeMap; +use std::{collections::BTreeMap, sync::Arc}; use data_types::{DeletePredicate, PartitionKey, SequenceNumber, ShardId, TableId, Timestamp}; use iox_catalog::interface::Catalog; @@ -16,6 +16,7 @@ use crate::lifecycle::LifecycleHandle; #[derive(Debug)] pub(crate) struct TableData { table_id: TableId, + table_name: Arc, // the max sequence number for a tombstone associated with this table tombstone_max_sequence_number: Option, // Map pf partition key to its data @@ -24,9 +25,14 @@ pub(crate) struct TableData { impl TableData { /// Initialize new table buffer - pub fn new(table_id: TableId, tombstone_max_sequence_number: Option) -> Self { + pub fn new( + table_id: TableId, + table_name: &str, + tombstone_max_sequence_number: Option, + ) -> Self { Self { table_id, + table_name: table_name.into(), tombstone_max_sequence_number, partition_data: Default::default(), } @@ -36,11 +42,13 @@ impl TableData { #[cfg(test)] pub fn new_for_test( table_id: TableId, + table_name: &str, tombstone_max_sequence_number: Option, partitions: BTreeMap, ) -> Self { Self { table_id, + table_name: table_name.into(), tombstone_max_sequence_number, partition_data: partitions, } @@ -102,7 +110,6 @@ impl TableData { pub(super) async fn buffer_delete( &mut self, - table_name: &str, predicate: &DeletePredicate, shard_id: ShardId, sequence_number: SequenceNumber, @@ -131,8 +138,7 @@ impl TableData { // modify one partition at a time for data in self.partition_data.values_mut() { - data.buffer_tombstone(executor, table_name, tombstone.clone()) - .await; + data.buffer_tombstone(executor, tombstone.clone()).await; } Ok(()) @@ -175,6 +181,7 @@ impl TableData { partition.id, shard_id, self.table_id, + Arc::clone(&self.table_name), partition.persisted_sequence_number, ), ); diff --git a/ingester/src/querier_handler.rs b/ingester/src/querier_handler.rs index c8e9d37129..2645f010b6 100644 --- a/ingester/src/querier_handler.rs +++ b/ingester/src/querier_handler.rs @@ -191,7 +191,7 @@ async fn prepare_data_to_querier_for_partition( .persisting .unwrap_or_else(|| { QueryableBatch::new( - &request.table, + request.table.clone().into(), unpersisted_partition_data.partition_id, vec![], vec![], diff --git a/ingester/src/query.rs b/ingester/src/query.rs index ff735d96ca..a3d6bc8fb0 100644 --- a/ingester/src/query.rs +++ b/ingester/src/query.rs @@ -57,7 +57,7 @@ pub struct QueryableBatch { pub(crate) delete_predicates: Vec>, /// This is needed to return a reference for a trait function - pub(crate) table_name: String, + pub(crate) table_name: Arc, /// Partition ID pub(crate) partition_id: PartitionId, @@ -66,7 +66,7 @@ pub struct QueryableBatch { impl QueryableBatch { /// Initilaize a QueryableBatch pub fn new( - table_name: &str, + table_name: Arc, partition_id: PartitionId, data: Vec>, deletes: Vec, @@ -75,7 +75,7 @@ impl QueryableBatch { Self { data, delete_predicates, - table_name: table_name.to_string(), + table_name, partition_id, } } @@ -318,7 +318,7 @@ mod tests { // This new queryable batch will convert tombstone to delete predicates let query_batch = - QueryableBatch::new("test_table", PartitionId::new(0), vec![], tombstones); + QueryableBatch::new("test_table".into(), PartitionId::new(0), vec![], tombstones); let predicates = query_batch.delete_predicates(); let expected = vec![ Arc::new(DeletePredicate { diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index c1b83f2a0a..c043820eeb 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -205,7 +205,7 @@ pub fn make_queryable_batch_with_deletes( } Arc::new(QueryableBatch::new( - table_name, + table_name.into(), PartitionId::new(partition_id), snapshots, tombstones, @@ -685,16 +685,18 @@ pub fn make_ingester_data(two_partitions: bool, loc: DataLocation) -> IngesterDa let data_table_id = TableId::new(2); // Make partitions per requested - let partitions = make_partitions(two_partitions, loc, shard_id, data_table_id, TEST_TABLE); + let partitions = make_partitions(two_partitions, loc, shard_id, data_table_id); // Two tables: one empty and one with data of one or two partitions let mut tables = BTreeMap::new(); let empty_tbl = Arc::new(tokio::sync::RwLock::new(TableData::new( empty_table_id, + "test_table", None, ))); let data_tbl = Arc::new(tokio::sync::RwLock::new(TableData::new_for_test( data_table_id, + "test_table", None, partitions, ))); @@ -737,12 +739,11 @@ pub async fn make_ingester_data_with_tombstones(loc: DataLocation) -> IngesterDa let data_table_id = TableId::new(2); // Make partitions per requested - let partitions = - make_one_partition_with_tombstones(&exec, loc, shard_id, data_table_id, TEST_TABLE).await; + let partitions = make_one_partition_with_tombstones(&exec, loc, shard_id, data_table_id).await; // Two tables: one empty and one with data of one or two partitions let mut tables = BTreeMap::new(); - let data_tbl = TableData::new_for_test(data_table_id, None, partitions); + let data_tbl = TableData::new_for_test(data_table_id, TEST_TABLE, None, partitions); tables.insert( TEST_TABLE.to_string(), Arc::new(tokio::sync::RwLock::new(data_tbl)), @@ -776,7 +777,6 @@ pub(crate) fn make_partitions( loc: DataLocation, shard_id: ShardId, table_id: TableId, - table_name: &str, ) -> BTreeMap { // In-memory data includes these rows but split between 4 groups go into // different batches of parittion 1 or partittion 2 as requeted @@ -800,8 +800,7 @@ pub(crate) fn make_partitions( // ------------------------------------------ // Build the first partition let partition_id = PartitionId::new(1); - let (mut p1, seq_num) = - make_first_partition_data(partition_id, loc, shard_id, table_id, table_name); + let (mut p1, seq_num) = make_first_partition_data(partition_id, loc, shard_id, table_id); // ------------------------------------------ // Build the second partition if asked @@ -809,7 +808,7 @@ pub(crate) fn make_partitions( let mut seq_num = seq_num.get(); if two_partitions { let partition_id = PartitionId::new(2); - let mut p2 = PartitionData::new(partition_id, shard_id, table_id, None); + let mut p2 = PartitionData::new(partition_id, shard_id, table_id, TEST_TABLE.into(), None); // Group 4: in buffer of p2 // Fill `buffer` seq_num += 1; @@ -849,7 +848,6 @@ pub(crate) async fn make_one_partition_with_tombstones( loc: DataLocation, shard_id: ShardId, table_id: TableId, - table_name: &str, ) -> BTreeMap { // In-memory data includes these rows but split between 4 groups go into // different batches of parittion 1 or partittion 2 as requeted @@ -869,8 +867,7 @@ pub(crate) async fn make_one_partition_with_tombstones( // ]; let partition_id = PartitionId::new(1); - let (mut p1, seq_num) = - make_first_partition_data(partition_id, loc, shard_id, table_id, table_name); + let (mut p1, seq_num) = make_first_partition_data(partition_id, loc, shard_id, table_id); // Add tombstones // Depending on where the existing data is, they (buffer & snapshot) will be either moved to a new snapshot after @@ -888,7 +885,7 @@ pub(crate) async fn make_one_partition_with_tombstones( 50, // max time of data to get deleted "city=Boston", // delete predicate ); - p1.buffer_tombstone(exec, table_name, ts).await; + p1.buffer_tombstone(exec, ts).await; // Group 4: in buffer of p1 after the tombstone // Fill `buffer` @@ -914,7 +911,6 @@ fn make_first_partition_data( loc: DataLocation, shard_id: ShardId, table_id: TableId, - table_name: &str, ) -> (PartitionData, SequenceNumber) { // In-memory data includes these rows but split between 3 groups go into // different batches of parittion p1 @@ -933,7 +929,7 @@ fn make_first_partition_data( // ------------------------------------------ // Build the first partition - let mut p1 = PartitionData::new(partition_id, shard_id, table_id, None); + let mut p1 = PartitionData::new(partition_id, shard_id, table_id, TEST_TABLE.into(), None); let mut seq_num = 0; // -------------------- @@ -952,7 +948,7 @@ fn make_first_partition_data( if loc.contains(DataLocation::PERSISTING) { // Move group 1 data to persisting - p1.snapshot_to_persisting_batch(table_name); + p1.snapshot_to_persisting_batch(); } else if loc.contains(DataLocation::SNAPSHOT) { // move group 1 data to snapshot p1.snapshot().unwrap(); From b0eb85ddd5f60a2d4c2010c688c09abe4fbe31c4 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 16 Sep 2022 17:52:38 +0200 Subject: [PATCH 6/7] refactor: store ShardId in child nodes Instead of passing the ShardId into each function for child nodes of the Shard, store it. This avoids the possibility of mistakenly passing the wrong value. --- ingester/src/data.rs | 25 ++++++++++++++----------- ingester/src/data/namespace.rs | 28 +++++++++++++--------------- ingester/src/data/shard.rs | 16 ++++++++++++---- ingester/src/data/table.rs | 16 +++++++++++----- ingester/src/handler.rs | 2 +- ingester/src/test_util.rs | 22 ++++++++++++++++------ query_tests/src/scenarios/util.rs | 6 +++++- 7 files changed, 72 insertions(+), 43 deletions(-) diff --git a/ingester/src/data.rs b/ingester/src/data.rs index bfc6b0a574..ebfad96f37 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -166,7 +166,6 @@ impl IngesterData { shard_data .buffer_operation( dml_operation, - shard_id, self.catalog.as_ref(), lifecycle_handle, &self.exec, @@ -653,7 +652,10 @@ mod tests { let mut shards = BTreeMap::new(); let shard_index = ShardIndex::new(0); - shards.insert(shard1.id, ShardData::new(shard_index, Arc::clone(&metrics))); + shards.insert( + shard1.id, + ShardData::new(shard_index, shard1.id, Arc::clone(&metrics)), + ); let object_store: Arc = Arc::new(InMemory::new()); @@ -738,7 +740,7 @@ mod tests { let mut shards = BTreeMap::new(); shards.insert( shard1.id, - ShardData::new(shard1.shard_index, Arc::clone(&metrics)), + ShardData::new(shard1.shard_index, shard1.id, Arc::clone(&metrics)), ); let object_store: Arc = Arc::new(InMemory::new()); @@ -843,11 +845,11 @@ mod tests { let mut shards = BTreeMap::new(); shards.insert( shard1.id, - ShardData::new(shard1.shard_index, Arc::clone(&metrics)), + ShardData::new(shard1.shard_index, shard1.id, Arc::clone(&metrics)), ); shards.insert( shard2.id, - ShardData::new(shard2.shard_index, Arc::clone(&metrics)), + ShardData::new(shard2.shard_index, shard2.id, Arc::clone(&metrics)), ); let object_store: Arc = Arc::new(InMemory::new()); @@ -1099,11 +1101,11 @@ mod tests { let mut shards = BTreeMap::new(); shards.insert( shard1.id, - ShardData::new(shard1.shard_index, Arc::clone(&metrics)), + ShardData::new(shard1.shard_index, shard1.id, Arc::clone(&metrics)), ); shards.insert( shard2.id, - ShardData::new(shard2.shard_index, Arc::clone(&metrics)), + ShardData::new(shard2.shard_index, shard2.id, Arc::clone(&metrics)), ); let object_store: Arc = Arc::new(InMemory::new()); @@ -1338,7 +1340,7 @@ mod tests { ); let exec = Executor::new(1); - let data = NamespaceData::new(namespace.id, &*metrics); + let data = NamespaceData::new(namespace.id, shard.id, &*metrics); // w1 should be ignored because the per-partition replay offset is set // to 1 already, so it shouldn't be buffered and the buffer should @@ -1346,7 +1348,6 @@ mod tests { let should_pause = data .buffer_operation( DmlOperation::Write(w1), - shard.id, catalog.as_ref(), &manager.handle(), &exec, @@ -1368,7 +1369,6 @@ mod tests { // w2 should be in the buffer data.buffer_operation( DmlOperation::Write(w2), - shard.id, catalog.as_ref(), &manager.handle(), &exec, @@ -1410,7 +1410,10 @@ mod tests { let mut shards = BTreeMap::new(); let shard_index = ShardIndex::new(0); - shards.insert(shard1.id, ShardData::new(shard_index, Arc::clone(&metrics))); + shards.insert( + shard1.id, + ShardData::new(shard_index, shard1.id, Arc::clone(&metrics)), + ); let object_store: Arc = Arc::new(InMemory::new()); diff --git a/ingester/src/data/namespace.rs b/ingester/src/data/namespace.rs index c69aafd503..3a63aa2674 100644 --- a/ingester/src/data/namespace.rs +++ b/ingester/src/data/namespace.rs @@ -26,8 +26,11 @@ use crate::lifecycle::LifecycleHandle; #[derive(Debug)] pub struct NamespaceData { namespace_id: NamespaceId, - tables: RwLock>>>, + /// The catalog ID of the shard this namespace is being populated from. + shard_id: ShardId, + + tables: RwLock>>>, table_count: U64Counter, /// The sequence number being actively written, if any. @@ -78,7 +81,7 @@ pub struct NamespaceData { impl NamespaceData { /// Initialize new tables with default partition template of daily - pub fn new(namespace_id: NamespaceId, metrics: &metric::Registry) -> Self { + pub fn new(namespace_id: NamespaceId, shard_id: ShardId, metrics: &metric::Registry) -> Self { let table_count = metrics .register_metric::( "ingester_tables_total", @@ -88,6 +91,7 @@ impl NamespaceData { Self { namespace_id, + shard_id, tables: Default::default(), table_count, buffering_sequence_number: RwLock::new(None), @@ -100,10 +104,12 @@ impl NamespaceData { #[cfg(test)] pub(crate) fn new_for_test( namespace_id: NamespaceId, + shard_id: ShardId, tables: BTreeMap>>, ) -> Self { Self { namespace_id, + shard_id, tables: RwLock::new(tables), table_count: Default::default(), buffering_sequence_number: RwLock::new(None), @@ -117,7 +123,6 @@ impl NamespaceData { pub async fn buffer_operation( &self, dml_operation: DmlOperation, - shard_id: ShardId, catalog: &dyn Catalog, lifecycle_handle: &dyn LifecycleHandle, executor: &Executor, @@ -148,7 +153,7 @@ impl NamespaceData { for (t, b) in write.into_tables() { let table_data = match self.table_data(&t) { Some(t) => t, - None => self.insert_table(shard_id, &t, catalog).await?, + None => self.insert_table(&t, catalog).await?, }; { @@ -159,7 +164,6 @@ impl NamespaceData { sequence_number, b, partition_key.clone(), - shard_id, catalog, lifecycle_handle, ) @@ -176,19 +180,13 @@ impl NamespaceData { let table_name = delete.table_name().context(super::TableNotPresentSnafu)?; let table_data = match self.table_data(table_name) { Some(t) => t, - None => self.insert_table(shard_id, table_name, catalog).await?, + None => self.insert_table(table_name, catalog).await?, }; let mut table_data = table_data.write().await; table_data - .buffer_delete( - delete.predicate(), - shard_id, - sequence_number, - catalog, - executor, - ) + .buffer_delete(delete.predicate(), sequence_number, catalog, executor) .await?; // don't pause writes since deletes don't count towards memory limits @@ -250,14 +248,13 @@ impl NamespaceData { /// Inserts the table or returns it if it happens to be inserted by some other thread async fn insert_table( &self, - shard_id: ShardId, table_name: &str, catalog: &dyn Catalog, ) -> Result>, super::Error> { let mut repos = catalog.repositories().await; let info = repos .tables() - .get_table_persist_info(shard_id, self.namespace_id, table_name) + .get_table_persist_info(self.shard_id, self.namespace_id, table_name) .await .context(super::CatalogSnafu)? .context(super::TableNotFoundSnafu { table_name })?; @@ -269,6 +266,7 @@ impl NamespaceData { let v = v.insert(Arc::new(tokio::sync::RwLock::new(TableData::new( info.table_id, table_name, + self.shard_id, info.tombstone_max_sequence_number, )))); self.table_count.inc(1); diff --git a/ingester/src/data/shard.rs b/ingester/src/data/shard.rs index d69a062137..b3676af045 100644 --- a/ingester/src/data/shard.rs +++ b/ingester/src/data/shard.rs @@ -22,6 +22,8 @@ use crate::lifecycle::LifecycleHandle; pub struct ShardData { /// The shard index for this shard shard_index: ShardIndex, + /// The catalog ID for this shard. + shard_id: ShardId, // New namespaces can come in at any time so we need to be able to add new ones namespaces: RwLock>>, @@ -32,7 +34,7 @@ pub struct ShardData { impl ShardData { /// Initialise a new [`ShardData`] that emits metrics to `metrics`. - pub fn new(shard_index: ShardIndex, metrics: Arc) -> Self { + pub fn new(shard_index: ShardIndex, shard_id: ShardId, metrics: Arc) -> Self { let namespace_count = metrics .register_metric::( "ingester_namespaces_total", @@ -42,6 +44,7 @@ impl ShardData { Self { shard_index, + shard_id, namespaces: Default::default(), metrics, namespace_count, @@ -52,10 +55,12 @@ impl ShardData { #[cfg(test)] pub fn new_for_test( shard_index: ShardIndex, + shard_id: ShardId, namespaces: BTreeMap>, ) -> Self { Self { shard_index, + shard_id, namespaces: RwLock::new(namespaces), metrics: Default::default(), namespace_count: Default::default(), @@ -69,7 +74,6 @@ impl ShardData { pub async fn buffer_operation( &self, dml_operation: DmlOperation, - shard_id: ShardId, catalog: &dyn Catalog, lifecycle_handle: &dyn LifecycleHandle, executor: &Executor, @@ -83,7 +87,7 @@ impl ShardData { }; namespace_data - .buffer_operation(dml_operation, shard_id, catalog, lifecycle_handle, executor) + .buffer_operation(dml_operation, catalog, lifecycle_handle, executor) .await } @@ -112,7 +116,11 @@ impl ShardData { let data = match n.entry(namespace.name) { Entry::Vacant(v) => { - let v = v.insert(Arc::new(NamespaceData::new(namespace.id, &*self.metrics))); + let v = v.insert(Arc::new(NamespaceData::new( + namespace.id, + self.shard_id, + &*self.metrics, + ))); self.namespace_count.inc(1); Arc::clone(v) } diff --git a/ingester/src/data/table.rs b/ingester/src/data/table.rs index 99bd428b89..a3c55a9157 100644 --- a/ingester/src/data/table.rs +++ b/ingester/src/data/table.rs @@ -17,6 +17,10 @@ use crate::lifecycle::LifecycleHandle; pub(crate) struct TableData { table_id: TableId, table_name: Arc, + + /// The catalog ID of the shard this table is being populated from. + shard_id: ShardId, + // the max sequence number for a tombstone associated with this table tombstone_max_sequence_number: Option, // Map pf partition key to its data @@ -28,11 +32,13 @@ impl TableData { pub fn new( table_id: TableId, table_name: &str, + shard_id: ShardId, tombstone_max_sequence_number: Option, ) -> Self { Self { table_id, table_name: table_name.into(), + shard_id, tombstone_max_sequence_number, partition_data: Default::default(), } @@ -43,12 +49,14 @@ impl TableData { pub fn new_for_test( table_id: TableId, table_name: &str, + shard_id: ShardId, tombstone_max_sequence_number: Option, partitions: BTreeMap, ) -> Self { Self { table_id, table_name: table_name.into(), + shard_id, tombstone_max_sequence_number, partition_data: partitions, } @@ -76,14 +84,13 @@ impl TableData { sequence_number: SequenceNumber, batch: MutableBatch, partition_key: PartitionKey, - shard_id: ShardId, catalog: &dyn Catalog, lifecycle_handle: &dyn LifecycleHandle, ) -> Result { let partition_data = match self.partition_data.get_mut(&partition_key) { Some(p) => p, None => { - self.insert_partition(partition_key.clone(), shard_id, catalog) + self.insert_partition(partition_key.clone(), self.shard_id, catalog) .await?; self.partition_data.get_mut(&partition_key).unwrap() } @@ -98,7 +105,7 @@ impl TableData { let should_pause = lifecycle_handle.log_write( partition_data.id(), - shard_id, + self.shard_id, sequence_number, batch.size(), batch.rows(), @@ -111,7 +118,6 @@ impl TableData { pub(super) async fn buffer_delete( &mut self, predicate: &DeletePredicate, - shard_id: ShardId, sequence_number: SequenceNumber, catalog: &dyn Catalog, executor: &Executor, @@ -124,7 +130,7 @@ impl TableData { .tombstones() .create_or_get( self.table_id, - shard_id, + self.shard_id, sequence_number, min_time, max_time, diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs index a8a7308150..7e0d7f5a2b 100644 --- a/ingester/src/handler.rs +++ b/ingester/src/handler.rs @@ -140,7 +140,7 @@ impl IngestHandlerImpl { for s in shard_states.values() { shards.insert( s.id, - ShardData::new(s.shard_index, Arc::clone(&metric_registry)), + ShardData::new(s.shard_index, s.id, Arc::clone(&metric_registry)), ); } let data = Arc::new(IngesterData::new( diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index c043820eeb..19dabb12a3 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -692,11 +692,13 @@ pub fn make_ingester_data(two_partitions: bool, loc: DataLocation) -> IngesterDa let empty_tbl = Arc::new(tokio::sync::RwLock::new(TableData::new( empty_table_id, "test_table", + shard_id, None, ))); let data_tbl = Arc::new(tokio::sync::RwLock::new(TableData::new_for_test( data_table_id, "test_table", + shard_id, None, partitions, ))); @@ -705,14 +707,18 @@ pub fn make_ingester_data(two_partitions: bool, loc: DataLocation) -> IngesterDa // Two namespaces: one empty and one with data of 2 tables let mut namespaces = BTreeMap::new(); - let empty_ns = Arc::new(NamespaceData::new(NamespaceId::new(1), &*metrics)); - let data_ns = Arc::new(NamespaceData::new_for_test(NamespaceId::new(2), tables)); + let empty_ns = Arc::new(NamespaceData::new(NamespaceId::new(1), shard_id, &*metrics)); + let data_ns = Arc::new(NamespaceData::new_for_test( + NamespaceId::new(2), + shard_id, + tables, + )); namespaces.insert(TEST_NAMESPACE_EMPTY.to_string(), empty_ns); namespaces.insert(TEST_NAMESPACE.to_string(), data_ns); // One shard that contains 2 namespaces let shard_index = ShardIndex::new(0); - let shard_data = ShardData::new_for_test(shard_index, namespaces); + let shard_data = ShardData::new_for_test(shard_index, shard_id, namespaces); let mut shards = BTreeMap::new(); shards.insert(shard_id, shard_data); @@ -743,7 +749,7 @@ pub async fn make_ingester_data_with_tombstones(loc: DataLocation) -> IngesterDa // Two tables: one empty and one with data of one or two partitions let mut tables = BTreeMap::new(); - let data_tbl = TableData::new_for_test(data_table_id, TEST_TABLE, None, partitions); + let data_tbl = TableData::new_for_test(data_table_id, TEST_TABLE, shard_id, None, partitions); tables.insert( TEST_TABLE.to_string(), Arc::new(tokio::sync::RwLock::new(data_tbl)), @@ -751,12 +757,16 @@ pub async fn make_ingester_data_with_tombstones(loc: DataLocation) -> IngesterDa // Two namespaces: one empty and one with data of 2 tables let mut namespaces = BTreeMap::new(); - let data_ns = Arc::new(NamespaceData::new_for_test(NamespaceId::new(2), tables)); + let data_ns = Arc::new(NamespaceData::new_for_test( + NamespaceId::new(2), + shard_id, + tables, + )); namespaces.insert(TEST_NAMESPACE.to_string(), data_ns); // One shard that contains 1 namespace let shard_index = ShardIndex::new(0); - let shard_data = ShardData::new_for_test(shard_index, namespaces); + let shard_data = ShardData::new_for_test(shard_index, shard_id, namespaces); let mut shards = BTreeMap::new(); shards.insert(shard_id, shard_data); diff --git a/query_tests/src/scenarios/util.rs b/query_tests/src/scenarios/util.rs index 8cc7c5b992..31a5431d6e 100644 --- a/query_tests/src/scenarios/util.rs +++ b/query_tests/src/scenarios/util.rs @@ -694,7 +694,11 @@ impl MockIngester { let shards = BTreeMap::from([( shard.shard.id, - ShardData::new(shard.shard.shard_index, catalog.metric_registry()), + ShardData::new( + shard.shard.shard_index, + shard.shard.id, + catalog.metric_registry(), + ), )]); let ingester_data = Arc::new(IngesterData::new( catalog.object_store(), From 346ef1c811df718314d5871e706fabda19b53d45 Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Fri, 16 Sep 2022 15:44:22 -0400 Subject: [PATCH 7/7] chore: reduce number of histogram buckets (#5661) --- compactor/src/compact.rs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs index 3664e0864a..b41524dd47 100644 --- a/compactor/src/compact.rs +++ b/compactor/src/compact.rs @@ -170,15 +170,12 @@ impl Compactor { let file_size_buckets = U64HistogramOptions::new([ 50 * 1024, // 50KB - 100 * 1024, // 100KB - 300 * 1024, // 300KB 500 * 1024, // 500 KB 1024 * 1024, // 1 MB 3 * 1024 * 1024, // 3 MB 10 * 1024 * 1024, // 10 MB 30 * 1024 * 1024, // 30 MB 100 * 1024 * 1024, // 100 MB - 300 * 1024 * 1024, // 300 MB 500 * 1024 * 1024, // 500 MB u64::MAX, // Inf ]); @@ -196,17 +193,14 @@ impl Compactor { ); let duration_histogram_options = DurationHistogramOptions::new([ - Duration::from_millis(100), Duration::from_millis(500), - Duration::from_micros(2_000), + Duration::from_millis(1_000), // 1 second Duration::from_millis(5_000), Duration::from_millis(15_000), Duration::from_millis(30_000), Duration::from_millis(60_000), // 1 minute Duration::from_millis(5 * 60_000), - Duration::from_millis(10 * 60_000), - Duration::from_millis(20 * 60_000), - Duration::from_millis(40 * 60_000), + Duration::from_millis(15 * 60_000), Duration::from_millis(60 * 60_000), DURATION_MAX, ]);