Merge pull request #5644 from influxdata/dom/split-data
refactor: hoist per-partition persistence watermark from bufferpull/24376/head
commit
1bac7792db
|
|
@ -1278,7 +1278,7 @@ mod tests {
|
|||
let table = table_data.read().await;
|
||||
let p = table.partition_data.get(&"1970-01-01".into()).unwrap();
|
||||
assert_eq!(
|
||||
p.data.max_persisted_sequence_number,
|
||||
p.max_persisted_sequence_number(),
|
||||
Some(SequenceNumber::new(1))
|
||||
);
|
||||
assert!(p.data.buffer.is_none());
|
||||
|
|
|
|||
|
|
@ -300,7 +300,7 @@ impl NamespaceData {
|
|||
let partition = t.partition_data.get_mut(partition_key);
|
||||
|
||||
if let Some(p) = partition {
|
||||
p.data.mark_persisted(sequence_number);
|
||||
p.mark_persisted(sequence_number);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -113,14 +113,19 @@ impl SnapshotBatch {
|
|||
pub(crate) struct PartitionData {
|
||||
id: PartitionId,
|
||||
pub(crate) data: DataBuffer,
|
||||
|
||||
/// The max_persisted_sequence number for any parquet_file in this
|
||||
/// partition.
|
||||
max_persisted_sequence_number: Option<SequenceNumber>,
|
||||
}
|
||||
|
||||
impl PartitionData {
|
||||
/// Initialize a new partition data buffer
|
||||
pub fn new(id: PartitionId) -> Self {
|
||||
pub fn new(id: PartitionId, max_persisted_sequence_number: Option<SequenceNumber>) -> Self {
|
||||
Self {
|
||||
id,
|
||||
data: Default::default(),
|
||||
max_persisted_sequence_number,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -262,6 +267,19 @@ impl PartitionData {
|
|||
pub(crate) fn id(&self) -> PartitionId {
|
||||
self.id
|
||||
}
|
||||
|
||||
/// Return the [`SequenceNumber`] that forms the (inclusive) persistence
|
||||
/// watermark for this partition.
|
||||
pub(crate) fn max_persisted_sequence_number(&self) -> Option<SequenceNumber> {
|
||||
self.max_persisted_sequence_number
|
||||
}
|
||||
|
||||
/// Mark this partition as having completed persistence up to, and
|
||||
/// including, the specified [`SequenceNumber`].
|
||||
pub(crate) fn mark_persisted(&mut self, sequence_number: SequenceNumber) {
|
||||
self.max_persisted_sequence_number = Some(sequence_number);
|
||||
self.data.mark_persisted();
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
@ -277,6 +295,7 @@ mod tests {
|
|||
let mut partition_data = PartitionData {
|
||||
id: PartitionId::new(1),
|
||||
data: Default::default(),
|
||||
max_persisted_sequence_number: None,
|
||||
};
|
||||
|
||||
let seq_num1 = SequenceNumber::new(1);
|
||||
|
|
@ -317,7 +336,7 @@ mod tests {
|
|||
let t_id = 1;
|
||||
let p_id = 1;
|
||||
let table_name = "restaurant";
|
||||
let mut p = PartitionData::new(PartitionId::new(p_id));
|
||||
let mut p = PartitionData::new(PartitionId::new(p_id), None);
|
||||
let exec = Executor::new(1);
|
||||
|
||||
// ------------------------------------------
|
||||
|
|
|
|||
|
|
@ -38,9 +38,6 @@ pub(crate) struct DataBuffer {
|
|||
/// Buffer of incoming writes
|
||||
pub(crate) buffer: Option<BufferBatch>,
|
||||
|
||||
/// The max_persisted_sequence number for any parquet_file in this partition
|
||||
pub(crate) max_persisted_sequence_number: Option<SequenceNumber>,
|
||||
|
||||
/// Buffer of tombstones whose time range may overlap with this partition.
|
||||
/// All tombstones were already applied to corresponding snapshots. This list
|
||||
/// only keep the ones that come during persisting. The reason
|
||||
|
|
@ -239,16 +236,11 @@ impl DataBuffer {
|
|||
self.snapshots.as_ref()
|
||||
}
|
||||
|
||||
pub(crate) fn mark_persisted(&mut self, up_to: SequenceNumber) {
|
||||
self.max_persisted_sequence_number = Some(up_to);
|
||||
pub(crate) fn mark_persisted(&mut self) {
|
||||
self.persisting = None;
|
||||
self.deletes_during_persisting.clear()
|
||||
}
|
||||
|
||||
pub(crate) fn max_persisted_sequence_number(&self) -> Option<SequenceNumber> {
|
||||
self.max_persisted_sequence_number
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(super) fn deletes_during_persisting(&self) -> &[Tombstone] {
|
||||
self.deletes_during_persisting.as_ref()
|
||||
|
|
|
|||
|
|
@ -50,7 +50,7 @@ impl TableData {
|
|||
pub fn parquet_max_sequence_number(&self) -> Option<SequenceNumber> {
|
||||
self.partition_data
|
||||
.values()
|
||||
.map(|p| p.data.max_persisted_sequence_number())
|
||||
.map(|p| p.max_persisted_sequence_number())
|
||||
.max()
|
||||
.flatten()
|
||||
}
|
||||
|
|
@ -82,7 +82,7 @@ impl TableData {
|
|||
};
|
||||
|
||||
// skip the write if it has already been persisted
|
||||
if let Some(max) = partition_data.data.max_persisted_sequence_number() {
|
||||
if let Some(max) = partition_data.max_persisted_sequence_number() {
|
||||
if max >= sequence_number {
|
||||
return Ok(false);
|
||||
}
|
||||
|
|
@ -148,7 +148,7 @@ impl TableData {
|
|||
.expect("get_non_persisting should always work"),
|
||||
persisting: p.get_persisting_data(),
|
||||
partition_status: PartitionStatus {
|
||||
parquet_max_sequence_number: p.data.max_persisted_sequence_number(),
|
||||
parquet_max_sequence_number: p.max_persisted_sequence_number(),
|
||||
tombstone_max_sequence_number: self.tombstone_max_sequence_number,
|
||||
},
|
||||
})
|
||||
|
|
@ -178,10 +178,10 @@ impl TableData {
|
|||
// for now we just need the max persisted
|
||||
let max_persisted_sequence_number = files.iter().map(|p| p.max_sequence_number).max();
|
||||
|
||||
let mut data = PartitionData::new(partition.id);
|
||||
data.data.max_persisted_sequence_number = max_persisted_sequence_number;
|
||||
|
||||
self.partition_data.insert(partition.partition_key, data);
|
||||
self.partition_data.insert(
|
||||
partition.partition_key,
|
||||
PartitionData::new(partition.id, max_persisted_sequence_number),
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -809,7 +809,7 @@ pub(crate) fn make_partitions(
|
|||
let mut seq_num = seq_num.get();
|
||||
if two_partitions {
|
||||
let partition_id = PartitionId::new(2);
|
||||
let mut p2 = PartitionData::new(partition_id);
|
||||
let mut p2 = PartitionData::new(partition_id, None);
|
||||
// Group 4: in buffer of p2
|
||||
// Fill `buffer`
|
||||
seq_num += 1;
|
||||
|
|
@ -933,7 +933,7 @@ fn make_first_partition_data(
|
|||
|
||||
// ------------------------------------------
|
||||
// Build the first partition
|
||||
let mut p1 = PartitionData::new(partition_id);
|
||||
let mut p1 = PartitionData::new(partition_id, None);
|
||||
let mut seq_num = 0;
|
||||
|
||||
// --------------------
|
||||
|
|
|
|||
Loading…
Reference in New Issue