From e84186763f378d45a40291e1a9d78b36eb37df59 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Thu, 29 Sep 2022 16:55:09 +0200 Subject: [PATCH] refactor: LifecycleStats tracks Namespace/TableId Changes the lifecycle handle to also track the namespace + table ID in addition to the existing shard ID. Adds asserts to ensure the values never vary for a given partition. --- ingester/src/data/table.rs | 2 + ingester/src/lifecycle.rs | 299 ++++++++++++++++++++++++++---- query_tests/src/scenarios/util.rs | 6 +- 3 files changed, 274 insertions(+), 33 deletions(-) diff --git a/ingester/src/data/table.rs b/ingester/src/data/table.rs index b1e5a3dff0..89127d04bf 100644 --- a/ingester/src/data/table.rs +++ b/ingester/src/data/table.rs @@ -124,6 +124,8 @@ impl TableData { let should_pause = lifecycle_handle.log_write( partition_data.id(), self.shard_id, + self.namespace_id, + self.table_id, sequence_number, batch.size(), batch.rows(), diff --git a/ingester/src/lifecycle.rs b/ingester/src/lifecycle.rs index e87c8dde69..1004a0aa33 100644 --- a/ingester/src/lifecycle.rs +++ b/ingester/src/lifecycle.rs @@ -7,7 +7,7 @@ use std::{collections::BTreeMap, sync::Arc, time::Duration}; -use data_types::{PartitionId, SequenceNumber, ShardId}; +use data_types::{NamespaceId, PartitionId, SequenceNumber, ShardId, TableId}; use iox_time::{Time, TimeProvider}; use metric::{Metric, U64Counter}; use observability_deps::tracing::{error, info, warn}; @@ -26,10 +26,13 @@ pub trait LifecycleHandle: Send + Sync + 'static { /// Logs bytes written into a partition so that it can be tracked for the manager to /// trigger persistence. Returns true if the ingester should pause consuming from the /// write buffer so that persistence can catch up and free up memory. + #[allow(clippy::too_many_arguments)] fn log_write( &self, partition_id: PartitionId, shard_id: ShardId, + namespace_id: NamespaceId, + table_id: TableId, sequence_number: SequenceNumber, bytes_written: usize, rows_written: usize, @@ -60,6 +63,8 @@ impl LifecycleHandle for LifecycleHandleImpl { &self, partition_id: PartitionId, shard_id: ShardId, + namespace_id: NamespaceId, + table_id: TableId, sequence_number: SequenceNumber, bytes_written: usize, rows_written: usize, @@ -73,6 +78,8 @@ impl LifecycleHandle for LifecycleHandleImpl { .or_insert_with(|| PartitionLifecycleStats { shard_id, partition_id, + namespace_id, + table_id, first_write: now, last_write: now, bytes_written: 0, @@ -80,6 +87,10 @@ impl LifecycleHandle for LifecycleHandleImpl { first_sequence_number: sequence_number, }); + assert_eq!(stats.shard_id, shard_id); + assert_eq!(stats.namespace_id, namespace_id); + assert_eq!(stats.table_id, table_id); + stats.bytes_written += bytes_written; stats.last_write = now; stats.rows_written += rows_written; @@ -225,6 +236,10 @@ struct LifecycleStats { struct PartitionLifecycleStats { /// The shard this partition is under shard_id: ShardId, + /// The namespace identifier + namespace_id: NamespaceId, + /// The table identifier + table_id: TableId, /// The partition identifier partition_id: PartitionId, /// Time that the partition received its first write. This is reset anytime @@ -714,14 +729,36 @@ mod tests { let h = m.handle(); // log first two writes at different times - assert!(!h.log_write(PartitionId::new(1), shard_id, SequenceNumber::new(1), 1, 1)); + assert!(!h.log_write( + PartitionId::new(1), + shard_id, + NamespaceId::new(91), + TableId::new(92), + SequenceNumber::new(1), + 1, + 1 + )); time_provider.inc(Duration::from_nanos(10)); - assert!(!h.log_write(PartitionId::new(1), shard_id, SequenceNumber::new(2), 1, 1)); + assert!(!h.log_write( + PartitionId::new(1), + shard_id, + NamespaceId::new(91), + TableId::new(92), + SequenceNumber::new(2), + 1, + 1 + )); // log another write for different partition using a different handle - assert!(!m - .handle() - .log_write(PartitionId::new(2), shard_id, SequenceNumber::new(3), 3, 3)); + assert!(!m.handle().log_write( + PartitionId::new(2), + shard_id, + NamespaceId::new(91), + TableId::new(92), + SequenceNumber::new(3), + 3, + 3 + )); let stats = m.stats(); assert_eq!(stats.total_bytes, 5); @@ -755,11 +792,27 @@ mod tests { let h = m.handle(); // write more than the limit (10) - assert!(h.log_write(partition_id, shard_id, SequenceNumber::new(1), 15, 1)); + assert!(h.log_write( + partition_id, + shard_id, + NamespaceId::new(91), + TableId::new(92), + SequenceNumber::new(1), + 15, + 1 + )); assert!(!h.can_resume_ingest()); // all subsequent writes should also indicate a pause - assert!(h.log_write(partition_id, shard_id, SequenceNumber::new(2), 1, 1)); + assert!(h.log_write( + partition_id, + shard_id, + NamespaceId::new(91), + TableId::new(92), + SequenceNumber::new(2), + 1, + 1 + )); assert!(!h.can_resume_ingest()); // persist the partition @@ -768,7 +821,15 @@ mod tests { // ingest can resume assert!(h.can_resume_ingest()); - assert!(!h.log_write(partition_id, shard_id, SequenceNumber::new(3), 3, 1)); + assert!(!h.log_write( + partition_id, + shard_id, + NamespaceId::new(91), + TableId::new(92), + SequenceNumber::new(3), + 3, + 1 + )); } #[tokio::test] @@ -794,7 +855,15 @@ mod tests { // write more than the limit (10) and don't get stopped, because the // per-partition limit does not pause the server from ingesting. - assert!(!h.log_write(partition_id, shard_id, SequenceNumber::new(1), 1, 50)); + assert!(!h.log_write( + partition_id, + shard_id, + NamespaceId::new(91), + TableId::new(92), + SequenceNumber::new(1), + 1, + 50 + )); assert!(h.can_resume_ingest()); // Rows were counted @@ -808,7 +877,15 @@ mod tests { } // all subsequent writes should also be allowed - assert!(!h.log_write(partition_id, shard_id, SequenceNumber::new(2), 1, 1)); + assert!(!h.log_write( + partition_id, + shard_id, + NamespaceId::new(91), + TableId::new(92), + SequenceNumber::new(2), + 1, + 1 + )); assert!(h.can_resume_ingest()); // persist the partition @@ -823,7 +900,15 @@ mod tests { // ingest can continue assert!(h.can_resume_ingest()); - assert!(!h.log_write(partition_id, shard_id, SequenceNumber::new(3), 1, 1)); + assert!(!h.log_write( + partition_id, + shard_id, + NamespaceId::new(91), + TableId::new(92), + SequenceNumber::new(3), + 1, + 1 + )); } #[tokio::test] @@ -842,7 +927,15 @@ mod tests { let h = m.handle(); // write more than the limit (20) - h.log_write(partition_id, shard_id, SequenceNumber::new(1), 25, 1); + h.log_write( + partition_id, + shard_id, + NamespaceId::new(91), + TableId::new(92), + SequenceNumber::new(1), + 25, + 1, + ); // can not resume ingest as we are overall the pause ingest limit assert!(!h.can_resume_ingest()); @@ -870,7 +963,15 @@ mod tests { // ingest can resume assert!(h.can_resume_ingest()); - assert!(!h.log_write(partition_id, shard_id, SequenceNumber::new(2), 3, 1)); + assert!(!h.log_write( + partition_id, + shard_id, + NamespaceId::new(91), + TableId::new(92), + SequenceNumber::new(2), + 3, + 1 + )); } #[tokio::test] @@ -893,7 +994,15 @@ mod tests { let shard_id = ShardId::new(1); let h = m.handle(); - h.log_write(partition_id, shard_id, SequenceNumber::new(1), 10, 1); + h.log_write( + partition_id, + shard_id, + NamespaceId::new(91), + TableId::new(92), + SequenceNumber::new(1), + 10, + 1, + ); m.maybe_persist(&persister).await; let stats = m.stats(); @@ -908,7 +1017,15 @@ mod tests { // write in data for a new partition so we can be sure it isn't persisted, but the older // one is - h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 6, 1); + h.log_write( + PartitionId::new(2), + shard_id, + NamespaceId::new(91), + TableId::new(92), + SequenceNumber::new(2), + 6, + 1, + ); m.maybe_persist(&persister).await; @@ -948,7 +1065,15 @@ mod tests { let shard_id = ShardId::new(1); let h = m.handle(); - h.log_write(partition_id, shard_id, SequenceNumber::new(1), 10, 1); + h.log_write( + partition_id, + shard_id, + NamespaceId::new(91), + TableId::new(92), + SequenceNumber::new(1), + 10, + 1, + ); m.maybe_persist(&persister).await; let stats = m.stats(); @@ -963,8 +1088,24 @@ mod tests { // write in data for a new partition so we can be sure it isn't persisted, but the older // one is - h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 6, 1); - h.log_write(PartitionId::new(3), shard_id, SequenceNumber::new(3), 7, 1); + h.log_write( + PartitionId::new(2), + shard_id, + NamespaceId::new(91), + TableId::new(92), + SequenceNumber::new(2), + 6, + 1, + ); + h.log_write( + PartitionId::new(3), + shard_id, + NamespaceId::new(91), + TableId::new(92), + SequenceNumber::new(3), + 7, + 1, + ); m.maybe_persist(&persister).await; @@ -1005,7 +1146,15 @@ mod tests { let partition_id = PartitionId::new(1); let persister = Arc::new(TestPersister::default()); - h.log_write(partition_id, shard_id, SequenceNumber::new(1), 4, 1); + h.log_write( + partition_id, + shard_id, + NamespaceId::new(91), + TableId::new(92), + SequenceNumber::new(1), + 4, + 1, + ); m.maybe_persist(&persister).await; @@ -1015,8 +1164,24 @@ mod tests { assert!(!persister.persist_called_for(partition_id)); // introduce a new partition under the limit to verify it doesn't get taken with the other - h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 3, 1); - h.log_write(partition_id, shard_id, SequenceNumber::new(3), 5, 1); + h.log_write( + PartitionId::new(2), + shard_id, + NamespaceId::new(91), + TableId::new(92), + SequenceNumber::new(2), + 3, + 1, + ); + h.log_write( + partition_id, + shard_id, + NamespaceId::new(91), + TableId::new(92), + SequenceNumber::new(3), + 5, + 1, + ); m.maybe_persist(&persister).await; @@ -1055,8 +1220,24 @@ mod tests { let h = m.handle(); let partition_id = PartitionId::new(1); let persister = Arc::new(TestPersister::default()); - h.log_write(partition_id, shard_id, SequenceNumber::new(1), 8, 1); - h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 13, 1); + h.log_write( + partition_id, + shard_id, + NamespaceId::new(91), + TableId::new(92), + SequenceNumber::new(1), + 8, + 1, + ); + h.log_write( + PartitionId::new(2), + shard_id, + NamespaceId::new(91), + TableId::new(92), + SequenceNumber::new(2), + 13, + 1, + ); m.maybe_persist(&persister).await; @@ -1072,8 +1253,24 @@ mod tests { ); // add that partition back in over size - h.log_write(partition_id, shard_id, SequenceNumber::new(3), 20, 1); - h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(4), 21, 1); + h.log_write( + partition_id, + shard_id, + NamespaceId::new(91), + TableId::new(92), + SequenceNumber::new(3), + 20, + 1, + ); + h.log_write( + PartitionId::new(2), + shard_id, + NamespaceId::new(91), + TableId::new(92), + SequenceNumber::new(4), + 21, + 1, + ); // both partitions should now need to be persisted to bring us below the mem threshold of // 20. @@ -1118,11 +1315,35 @@ mod tests { } = TestLifecycleManger::new(config); let h = m.handle(); let persister = Arc::new(TestPersister::default()); - h.log_write(PartitionId::new(1), shard_id, SequenceNumber::new(1), 4, 1); + h.log_write( + PartitionId::new(1), + shard_id, + NamespaceId::new(91), + TableId::new(92), + SequenceNumber::new(1), + 4, + 1, + ); time_provider.inc(Duration::from_nanos(1)); - h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 6, 1); + h.log_write( + PartitionId::new(2), + shard_id, + NamespaceId::new(91), + TableId::new(92), + SequenceNumber::new(2), + 6, + 1, + ); time_provider.inc(Duration::from_nanos(1)); - h.log_write(PartitionId::new(3), shard_id, SequenceNumber::new(3), 3, 1); + h.log_write( + PartitionId::new(3), + shard_id, + NamespaceId::new(91), + TableId::new(92), + SequenceNumber::new(3), + 3, + 1, + ); m.maybe_persist(&persister).await; @@ -1162,7 +1383,15 @@ mod tests { let persister = Arc::new(TestPersister::default()); let shard_id = ShardId::new(1); - h.log_write(partition_id, shard_id, SequenceNumber::new(1), 10, 1); + h.log_write( + partition_id, + shard_id, + NamespaceId::new(91), + TableId::new(92), + SequenceNumber::new(1), + 10, + 1, + ); m.maybe_persist(&persister).await; let stats = m.stats(); @@ -1176,7 +1405,15 @@ mod tests { assert!(!persister.persist_called_for(partition_id)); // write in data for a new partition so we can be sure it isn't persisted, but the older one is - h.log_write(PartitionId::new(2), shard_id, SequenceNumber::new(2), 6, 1); + h.log_write( + PartitionId::new(2), + shard_id, + NamespaceId::new(91), + TableId::new(92), + SequenceNumber::new(2), + 6, + 1, + ); m.maybe_persist(&persister).await; diff --git a/query_tests/src/scenarios/util.rs b/query_tests/src/scenarios/util.rs index 290767e677..d39e716a59 100644 --- a/query_tests/src/scenarios/util.rs +++ b/query_tests/src/scenarios/util.rs @@ -3,8 +3,8 @@ use super::DbScenario; use async_trait::async_trait; use backoff::BackoffConfig; use data_types::{ - DeletePredicate, IngesterMapping, NonEmptyString, ParquetFileId, PartitionId, PartitionKey, - Sequence, SequenceNumber, ShardId, ShardIndex, TombstoneId, + DeletePredicate, IngesterMapping, NamespaceId, NonEmptyString, ParquetFileId, PartitionId, + PartitionKey, Sequence, SequenceNumber, ShardId, ShardIndex, TableId, TombstoneId, }; use dml::{DmlDelete, DmlMeta, DmlOperation, DmlWrite}; use futures::StreamExt; @@ -974,6 +974,8 @@ impl LifecycleHandle for NoopLifecycleHandle { &self, _partition_id: PartitionId, _shard_id: ShardId, + _namespace_id: NamespaceId, + _table_id: TableId, _sequence_number: SequenceNumber, _bytes_written: usize, _rows_written: usize,