From fc17f2ec2dbf503b7b9cbe7bfddb422e6fd7182d Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Wed, 14 Sep 2022 15:21:24 +0200 Subject: [PATCH] refactor: hoist persistence watermark from buffer The maximum persisted sequence number is tracked to answer "up to where has this partition been persisted", used for querying and skipping writes that have already been applied (though I suspect this is redundant). This is a property of the partition, not the actual data buffer, so this commit hoists it up out of the data buffer and onto the per-partition data structure, internalising the field in the process (not pub). --- ingester/src/data.rs | 2 +- ingester/src/data/namespace.rs | 2 +- ingester/src/data/partition.rs | 23 +++++++++++++++++++++-- ingester/src/data/partition/buffer.rs | 10 +--------- ingester/src/data/table.rs | 14 +++++++------- ingester/src/test_util.rs | 4 ++-- 6 files changed, 33 insertions(+), 22 deletions(-) diff --git a/ingester/src/data.rs b/ingester/src/data.rs index 0c29b371c6..819c951beb 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -1278,7 +1278,7 @@ mod tests { let table = table_data.read().await; let p = table.partition_data.get(&"1970-01-01".into()).unwrap(); assert_eq!( - p.data.max_persisted_sequence_number, + p.max_persisted_sequence_number(), Some(SequenceNumber::new(1)) ); assert!(p.data.buffer.is_none()); diff --git a/ingester/src/data/namespace.rs b/ingester/src/data/namespace.rs index d63289ad93..e65e80dd01 100644 --- a/ingester/src/data/namespace.rs +++ b/ingester/src/data/namespace.rs @@ -300,7 +300,7 @@ impl NamespaceData { let partition = t.partition_data.get_mut(partition_key); if let Some(p) = partition { - p.data.mark_persisted(sequence_number); + p.mark_persisted(sequence_number); } } } diff --git a/ingester/src/data/partition.rs b/ingester/src/data/partition.rs index 32a991e9a9..f2059dd1cc 100644 --- a/ingester/src/data/partition.rs +++ b/ingester/src/data/partition.rs @@ -113,14 +113,19 @@ impl SnapshotBatch { 
pub(crate) struct PartitionData { id: PartitionId, pub(crate) data: DataBuffer, + + /// The max_persisted_sequence number for any parquet_file in this + /// partition. + max_persisted_sequence_number: Option<SequenceNumber>, } impl PartitionData { /// Initialize a new partition data buffer - pub fn new(id: PartitionId) -> Self { + pub fn new(id: PartitionId, max_persisted_sequence_number: Option<SequenceNumber>) -> Self { Self { id, data: Default::default(), + max_persisted_sequence_number, } } @@ -262,6 +267,19 @@ impl PartitionData { pub(crate) fn id(&self) -> PartitionId { self.id } + + /// Return the [`SequenceNumber`] that forms the (inclusive) persistence + /// watermark for this partition. + pub(crate) fn max_persisted_sequence_number(&self) -> Option<SequenceNumber> { + self.max_persisted_sequence_number + } + + /// Mark this partition as having completed persistence up to, and + /// including, the specified [`SequenceNumber`]. + pub(crate) fn mark_persisted(&mut self, sequence_number: SequenceNumber) { + self.max_persisted_sequence_number = Some(sequence_number); + self.data.mark_persisted(); + } } #[cfg(test)] @@ -277,6 +295,7 @@ mod tests { let mut partition_data = PartitionData { id: PartitionId::new(1), data: Default::default(), + max_persisted_sequence_number: None, }; let seq_num1 = SequenceNumber::new(1); @@ -317,7 +336,7 @@ mod tests { let t_id = 1; let p_id = 1; let table_name = "restaurant"; - let mut p = PartitionData::new(PartitionId::new(p_id)); + let mut p = PartitionData::new(PartitionId::new(p_id), None); let exec = Executor::new(1); // ------------------------------------------ diff --git a/ingester/src/data/partition/buffer.rs b/ingester/src/data/partition/buffer.rs index 9de62192ea..cc64ed6108 100644 --- a/ingester/src/data/partition/buffer.rs +++ b/ingester/src/data/partition/buffer.rs @@ -38,9 +38,6 @@ pub(crate) struct DataBuffer { /// Buffer of incoming writes pub(crate) buffer: Option, - /// The max_persisted_sequence number for any parquet_file in this partition - pub(crate) 
max_persisted_sequence_number: Option<SequenceNumber>, - /// Buffer of tombstones whose time range may overlap with this partition. /// All tombstones were already applied to corresponding snapshots. This list /// only keep the ones that come during persisting. The reason @@ -239,16 +236,11 @@ impl DataBuffer { self.snapshots.as_ref() } - pub(crate) fn mark_persisted(&mut self, up_to: SequenceNumber) { - self.max_persisted_sequence_number = Some(up_to); + pub(crate) fn mark_persisted(&mut self) { self.persisting = None; self.deletes_during_persisting.clear() } - pub(crate) fn max_persisted_sequence_number(&self) -> Option<SequenceNumber> { - self.max_persisted_sequence_number - } - #[cfg(test)] pub(super) fn deletes_during_persisting(&self) -> &[Tombstone] { self.deletes_during_persisting.as_ref() diff --git a/ingester/src/data/table.rs b/ingester/src/data/table.rs index c84857c805..14dc73a9e3 100644 --- a/ingester/src/data/table.rs +++ b/ingester/src/data/table.rs @@ -50,7 +50,7 @@ impl TableData { pub fn parquet_max_sequence_number(&self) -> Option<SequenceNumber> { self.partition_data .values() - .map(|p| p.data.max_persisted_sequence_number()) + .map(|p| p.max_persisted_sequence_number()) .max() .flatten() } @@ -82,7 +82,7 @@ impl TableData { }; // skip the write if it has already been persisted - if let Some(max) = partition_data.data.max_persisted_sequence_number() { + if let Some(max) = partition_data.max_persisted_sequence_number() { if max >= sequence_number { return Ok(false); } @@ -148,7 +148,7 @@ impl TableData { .expect("get_non_persisting should always work"), persisting: p.get_persisting_data(), partition_status: PartitionStatus { - parquet_max_sequence_number: p.data.max_persisted_sequence_number(), + parquet_max_sequence_number: p.max_persisted_sequence_number(), tombstone_max_sequence_number: self.tombstone_max_sequence_number, }, }) @@ -178,10 +178,10 @@ impl TableData { // for now we just need the max persisted let max_persisted_sequence_number = files.iter().map(|p| 
p.max_sequence_number).max(); - let mut data = PartitionData::new(partition.id); - data.data.max_persisted_sequence_number = max_persisted_sequence_number; - - self.partition_data.insert(partition.partition_key, data); + self.partition_data.insert( + partition.partition_key, + PartitionData::new(partition.id, max_persisted_sequence_number), + ); Ok(()) } diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index fdd101dcb7..cb554708b3 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -809,7 +809,7 @@ pub(crate) fn make_partitions( let mut seq_num = seq_num.get(); if two_partitions { let partition_id = PartitionId::new(2); - let mut p2 = PartitionData::new(partition_id); + let mut p2 = PartitionData::new(partition_id, None); // Group 4: in buffer of p2 // Fill `buffer` seq_num += 1; @@ -933,7 +933,7 @@ fn make_first_partition_data( // ------------------------------------------ // Build the first partition - let mut p1 = PartitionData::new(partition_id); + let mut p1 = PartitionData::new(partition_id, None); let mut seq_num = 0; // --------------------