diff --git a/ingester/src/data.rs b/ingester/src/data.rs index 0c29b371c6..819c951beb 100644 --- a/ingester/src/data.rs +++ b/ingester/src/data.rs @@ -1278,7 +1278,7 @@ mod tests { let table = table_data.read().await; let p = table.partition_data.get(&"1970-01-01".into()).unwrap(); assert_eq!( - p.data.max_persisted_sequence_number, + p.max_persisted_sequence_number(), Some(SequenceNumber::new(1)) ); assert!(p.data.buffer.is_none()); diff --git a/ingester/src/data/namespace.rs b/ingester/src/data/namespace.rs index d63289ad93..e65e80dd01 100644 --- a/ingester/src/data/namespace.rs +++ b/ingester/src/data/namespace.rs @@ -300,7 +300,7 @@ impl NamespaceData { let partition = t.partition_data.get_mut(partition_key); if let Some(p) = partition { - p.data.mark_persisted(sequence_number); + p.mark_persisted(sequence_number); } } } diff --git a/ingester/src/data/partition.rs b/ingester/src/data/partition.rs index 32a991e9a9..f2059dd1cc 100644 --- a/ingester/src/data/partition.rs +++ b/ingester/src/data/partition.rs @@ -113,14 +113,19 @@ impl SnapshotBatch { pub(crate) struct PartitionData { id: PartitionId, pub(crate) data: DataBuffer, + + /// The max_persisted_sequence number for any parquet_file in this + /// partition. + max_persisted_sequence_number: Option, } impl PartitionData { /// Initialize a new partition data buffer - pub fn new(id: PartitionId) -> Self { + pub fn new(id: PartitionId, max_persisted_sequence_number: Option) -> Self { Self { id, data: Default::default(), + max_persisted_sequence_number, } } @@ -262,6 +267,19 @@ impl PartitionData { pub(crate) fn id(&self) -> PartitionId { self.id } + + /// Return the [`SequenceNumber`] that forms the (inclusive) persistence + /// watermark for this partition. + pub(crate) fn max_persisted_sequence_number(&self) -> Option { + self.max_persisted_sequence_number + } + + /// Mark this partition as having completed persistence up to, and + /// including, the specified [`SequenceNumber`]. + pub(crate) fn mark_persisted(&mut self, sequence_number: SequenceNumber) { + self.max_persisted_sequence_number = Some(sequence_number); + self.data.mark_persisted(); + } } #[cfg(test)] @@ -277,6 +295,7 @@ mod tests { let mut partition_data = PartitionData { id: PartitionId::new(1), data: Default::default(), + max_persisted_sequence_number: None, }; let seq_num1 = SequenceNumber::new(1); @@ -317,7 +336,7 @@ mod tests { let t_id = 1; let p_id = 1; let table_name = "restaurant"; - let mut p = PartitionData::new(PartitionId::new(p_id)); + let mut p = PartitionData::new(PartitionId::new(p_id), None); let exec = Executor::new(1); // ------------------------------------------ diff --git a/ingester/src/data/partition/buffer.rs b/ingester/src/data/partition/buffer.rs index 9de62192ea..cc64ed6108 100644 --- a/ingester/src/data/partition/buffer.rs +++ b/ingester/src/data/partition/buffer.rs @@ -38,9 +38,6 @@ pub(crate) struct DataBuffer { /// Buffer of incoming writes pub(crate) buffer: Option, - /// The max_persisted_sequence number for any parquet_file in this partition - pub(crate) max_persisted_sequence_number: Option, - /// Buffer of tombstones whose time range may overlap with this partition. /// All tombstones were already applied to corresponding snapshots. This list /// only keep the ones that come during persisting. The reason @@ -239,16 +236,11 @@ impl DataBuffer { self.snapshots.as_ref() } - pub(crate) fn mark_persisted(&mut self, up_to: SequenceNumber) { - self.max_persisted_sequence_number = Some(up_to); + pub(crate) fn mark_persisted(&mut self) { self.persisting = None; self.deletes_during_persisting.clear() } - pub(crate) fn max_persisted_sequence_number(&self) -> Option { - self.max_persisted_sequence_number - } - #[cfg(test)] pub(super) fn deletes_during_persisting(&self) -> &[Tombstone] { self.deletes_during_persisting.as_ref() diff --git a/ingester/src/data/table.rs b/ingester/src/data/table.rs index c84857c805..14dc73a9e3 100644 --- a/ingester/src/data/table.rs +++ b/ingester/src/data/table.rs @@ -50,7 +50,7 @@ impl TableData { pub fn parquet_max_sequence_number(&self) -> Option { self.partition_data .values() - .map(|p| p.data.max_persisted_sequence_number()) + .map(|p| p.max_persisted_sequence_number()) .max() .flatten() } @@ -82,7 +82,7 @@ impl TableData { }; // skip the write if it has already been persisted - if let Some(max) = partition_data.data.max_persisted_sequence_number() { + if let Some(max) = partition_data.max_persisted_sequence_number() { if max >= sequence_number { return Ok(false); } @@ -148,7 +148,7 @@ impl TableData { .expect("get_non_persisting should always work"), persisting: p.get_persisting_data(), partition_status: PartitionStatus { - parquet_max_sequence_number: p.data.max_persisted_sequence_number(), + parquet_max_sequence_number: p.max_persisted_sequence_number(), tombstone_max_sequence_number: self.tombstone_max_sequence_number, }, }) @@ -178,10 +178,10 @@ impl TableData { // for now we just need the max persisted let max_persisted_sequence_number = files.iter().map(|p| p.max_sequence_number).max(); - let mut data = PartitionData::new(partition.id); - data.data.max_persisted_sequence_number = max_persisted_sequence_number; - - self.partition_data.insert(partition.partition_key, data); + self.partition_data.insert( + partition.partition_key, + PartitionData::new(partition.id, max_persisted_sequence_number), + ); Ok(()) } diff --git a/ingester/src/test_util.rs b/ingester/src/test_util.rs index fdd101dcb7..cb554708b3 100644 --- a/ingester/src/test_util.rs +++ b/ingester/src/test_util.rs @@ -809,7 +809,7 @@ pub(crate) fn make_partitions( let mut seq_num = seq_num.get(); if two_partitions { let partition_id = PartitionId::new(2); - let mut p2 = PartitionData::new(partition_id); + let mut p2 = PartitionData::new(partition_id, None); // Group 4: in buffer of p2 // Fill `buffer` seq_num += 1; @@ -933,7 +933,7 @@ fn make_first_partition_data( // ------------------------------------------ // Build the first partition - let mut p1 = PartitionData::new(partition_id); + let mut p1 = PartitionData::new(partition_id, None); let mut seq_num = 0; // --------------------