refactor: hoist persistence watermark from buffer

The maximum persisted sequence number is tracked to answer "up to where
has this partition been persisted", used for querying and skipping
writes that have already been applied (though I suspect this is
redundant).

This is a property of the partition, not the actual data buffer, so this
commit hoists it up out of the data buffer and onto the per-partition
data structure, internalising the field in the process (not pub).
pull/24376/head
Dom Dwyer 2022-09-14 15:21:24 +02:00
parent 44e12aa512
commit fc17f2ec2d
6 changed files with 33 additions and 22 deletions

View File

@ -1278,7 +1278,7 @@ mod tests {
let table = table_data.read().await;
let p = table.partition_data.get(&"1970-01-01".into()).unwrap();
assert_eq!(
p.data.max_persisted_sequence_number,
p.max_persisted_sequence_number(),
Some(SequenceNumber::new(1))
);
assert!(p.data.buffer.is_none());

View File

@ -300,7 +300,7 @@ impl NamespaceData {
let partition = t.partition_data.get_mut(partition_key);
if let Some(p) = partition {
p.data.mark_persisted(sequence_number);
p.mark_persisted(sequence_number);
}
}
}

View File

@ -113,14 +113,19 @@ impl SnapshotBatch {
pub(crate) struct PartitionData {
id: PartitionId,
pub(crate) data: DataBuffer,
/// The max_persisted_sequence number for any parquet_file in this
/// partition.
max_persisted_sequence_number: Option<SequenceNumber>,
}
impl PartitionData {
/// Initialize a new partition data buffer
pub fn new(id: PartitionId) -> Self {
pub fn new(id: PartitionId, max_persisted_sequence_number: Option<SequenceNumber>) -> Self {
Self {
id,
data: Default::default(),
max_persisted_sequence_number,
}
}
@ -262,6 +267,19 @@ impl PartitionData {
pub(crate) fn id(&self) -> PartitionId {
self.id
}
/// Return the [`SequenceNumber`] that forms the (inclusive) persistence
/// watermark for this partition.
pub(crate) fn max_persisted_sequence_number(&self) -> Option<SequenceNumber> {
self.max_persisted_sequence_number
}
/// Mark this partition as having completed persistence up to, and
/// including, the specified [`SequenceNumber`].
pub(crate) fn mark_persisted(&mut self, sequence_number: SequenceNumber) {
self.max_persisted_sequence_number = Some(sequence_number);
self.data.mark_persisted();
}
}
#[cfg(test)]
@ -277,6 +295,7 @@ mod tests {
let mut partition_data = PartitionData {
id: PartitionId::new(1),
data: Default::default(),
max_persisted_sequence_number: None,
};
let seq_num1 = SequenceNumber::new(1);
@ -317,7 +336,7 @@ mod tests {
let t_id = 1;
let p_id = 1;
let table_name = "restaurant";
let mut p = PartitionData::new(PartitionId::new(p_id));
let mut p = PartitionData::new(PartitionId::new(p_id), None);
let exec = Executor::new(1);
// ------------------------------------------

View File

@ -38,9 +38,6 @@ pub(crate) struct DataBuffer {
/// Buffer of incoming writes
pub(crate) buffer: Option<BufferBatch>,
/// The max_persisted_sequence number for any parquet_file in this partition
pub(crate) max_persisted_sequence_number: Option<SequenceNumber>,
/// Buffer of tombstones whose time range may overlap with this partition.
/// All tombstones were already applied to corresponding snapshots. This list
/// only keep the ones that come during persisting. The reason
@ -239,16 +236,11 @@ impl DataBuffer {
self.snapshots.as_ref()
}
pub(crate) fn mark_persisted(&mut self, up_to: SequenceNumber) {
self.max_persisted_sequence_number = Some(up_to);
pub(crate) fn mark_persisted(&mut self) {
self.persisting = None;
self.deletes_during_persisting.clear()
}
pub(crate) fn max_persisted_sequence_number(&self) -> Option<SequenceNumber> {
self.max_persisted_sequence_number
}
#[cfg(test)]
pub(super) fn deletes_during_persisting(&self) -> &[Tombstone] {
self.deletes_during_persisting.as_ref()

View File

@ -50,7 +50,7 @@ impl TableData {
pub fn parquet_max_sequence_number(&self) -> Option<SequenceNumber> {
self.partition_data
.values()
.map(|p| p.data.max_persisted_sequence_number())
.map(|p| p.max_persisted_sequence_number())
.max()
.flatten()
}
@ -82,7 +82,7 @@ impl TableData {
};
// skip the write if it has already been persisted
if let Some(max) = partition_data.data.max_persisted_sequence_number() {
if let Some(max) = partition_data.max_persisted_sequence_number() {
if max >= sequence_number {
return Ok(false);
}
@ -148,7 +148,7 @@ impl TableData {
.expect("get_non_persisting should always work"),
persisting: p.get_persisting_data(),
partition_status: PartitionStatus {
parquet_max_sequence_number: p.data.max_persisted_sequence_number(),
parquet_max_sequence_number: p.max_persisted_sequence_number(),
tombstone_max_sequence_number: self.tombstone_max_sequence_number,
},
})
@ -178,10 +178,10 @@ impl TableData {
// for now we just need the max persisted
let max_persisted_sequence_number = files.iter().map(|p| p.max_sequence_number).max();
let mut data = PartitionData::new(partition.id);
data.data.max_persisted_sequence_number = max_persisted_sequence_number;
self.partition_data.insert(partition.partition_key, data);
self.partition_data.insert(
partition.partition_key,
PartitionData::new(partition.id, max_persisted_sequence_number),
);
Ok(())
}

View File

@ -809,7 +809,7 @@ pub(crate) fn make_partitions(
let mut seq_num = seq_num.get();
if two_partitions {
let partition_id = PartitionId::new(2);
let mut p2 = PartitionData::new(partition_id);
let mut p2 = PartitionData::new(partition_id, None);
// Group 4: in buffer of p2
// Fill `buffer`
seq_num += 1;
@ -933,7 +933,7 @@ fn make_first_partition_data(
// ------------------------------------------
// Build the first partition
let mut p1 = PartitionData::new(partition_id);
let mut p1 = PartitionData::new(partition_id, None);
let mut seq_num = 0;
// --------------------