Merge pull request #6413 from influxdata/dom/ooo-parittion-persist

feat(ingester2): out-of-order partition persist
pull/24376/head
Dom 2022-12-15 17:15:56 +00:00 committed by GitHub
commit e0886c3cdf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 149 additions and 24 deletions

View File

@ -187,10 +187,6 @@ impl PartitionData {
/// A reference to the persisting data is retained until a corresponding
/// call to [`Self::mark_persisted()`] is made to release it.
///
/// It is an invariant that partitions are persisted in order, as queriers
/// consider writes in the object store as being strictly after writes
/// returned from an ingester.
///
/// Additionally each persistence MAY update the partition sort key, which
/// is not a commutative operations, requiring partition persistence to be
/// serialised (unless it can be known in advance no sort key update is
@ -223,7 +219,10 @@ impl PartitionData {
batch_ident,
);
self.persisting.push_front((batch_ident, fsm));
// Push the new buffer to the back of the persisting queue, so that
// iterating from back to front during queries iterates over writes from
// oldest to newest.
self.persisting.push_back((batch_ident, fsm));
Some(data)
}
@ -236,28 +235,24 @@ impl PartitionData {
/// # Panics
///
/// This method panics if [`Self`] is not marked as undergoing a persist
/// operation, `batch` is not currently being persisted, or `batch` is
/// persisted out-of-order w.r.t other persisting batches. All calls to
/// [`Self::mark_persisted()`] must be preceded by a matching call to
/// [`Self::mark_persisting()`].
/// operation, or `batch` is not currently being persisted.
pub(crate) fn mark_persisted(&mut self, batch: PersistingData) {
// Pop the oldest persist task from the persist queue.
let (old_ident, _oldest) = self
// Find the batch in the persisting queue.
let idx = self
.persisting
.pop_back()
.iter()
.position(|(old, _)| *old == batch.batch_ident())
.expect("no currently persisting batch");
// Currently in ingester1 there is an invariant that partition buffers
// not be persisted out-of-order - the assert below enforces that.
assert_eq!(
old_ident,
batch.batch_ident(),
"out-of-order persist notification received"
);
// Remove the batch from the queue, preserving the order of the queue
// for batch iteration during queries.
let (old_ident, _oldest) = self.persisting.remove(idx).unwrap();
assert_eq!(old_ident, batch.batch_ident(),);
self.completed_persistence_count += 1;
debug!(
batch_ident = %old_ident,
persistence_count = %self.completed_persistence_count,
namespace_id = %self.namespace_id,
table_id = %self.table_id,
@ -762,10 +757,9 @@ mod tests {
}
// Ensure the ordering of snapshots & persisting data is preserved such that
// updates resolve correctly, and batch identifiers are correctly allocated
// and validated in mark_persisted() calls
// updates resolve correctly when queried, and batch identifiers are
// correctly allocated, validated, and removed in mark_persisted() calls
#[tokio::test]
#[should_panic = "out-of-order persist notification received"]
async fn test_out_of_order_persist() {
let mut p = PartitionData::new(
PARTITION_ID,
@ -794,20 +788,151 @@ mod tests {
let persisting_data1 = p.mark_persisting().unwrap();
// Buffer another write, and generate a snapshot by querying it.
let mb = lp_to_mutable_batch(r#"bananas x=3 42"#).1;
let mb = lp_to_mutable_batch(r#"bananas x=2 42"#).1;
p.buffer_write(mb, SequenceNumber::new(3))
.expect("write should succeed");
let data = p.get_query_data().unwrap();
assert_batches_eq!(
[
"+--------------------------------+---+",
"| time | x |",
"+--------------------------------+---+",
"| 1970-01-01T00:00:00.000000042Z | 1 |",
"| 1970-01-01T00:00:00.000000042Z | 2 |",
"+--------------------------------+---+",
],
&*data
.record_batches()
.iter()
.map(Deref::deref)
.cloned()
.collect::<Vec<_>>()
);
// Persist again, moving the last write to the persisting state and
// adding it to the persisting queue.
let persisting_data2 = p.mark_persisting().unwrap();
// Finish persisting the second batch out-of-order!
// Buffer another write, and generate a snapshot by querying it.
let mb = lp_to_mutable_batch(r#"bananas x=3 42"#).1;
p.buffer_write(mb, SequenceNumber::new(4))
.expect("write should succeed");
let data = p.get_query_data().unwrap();
assert_batches_eq!(
[
"+--------------------------------+---+",
"| time | x |",
"+--------------------------------+---+",
"| 1970-01-01T00:00:00.000000042Z | 1 |",
"| 1970-01-01T00:00:00.000000042Z | 2 |",
"| 1970-01-01T00:00:00.000000042Z | 3 |",
"+--------------------------------+---+",
],
&*data
.record_batches()
.iter()
.map(Deref::deref)
.cloned()
.collect::<Vec<_>>()
);
// Persist again, moving the last write to the persisting state and
// adding it to the persisting queue ordered such that querying returns
// the correctly ordered rows (newest rows last).
let persisting_data3 = p.mark_persisting().unwrap();
// Buffer another write, and generate a snapshot by querying it.
let mb = lp_to_mutable_batch(r#"bananas x=4 42"#).1;
p.buffer_write(mb, SequenceNumber::new(5))
.expect("write should succeed");
let data = p.get_query_data().unwrap();
assert_batches_eq!(
[
"+--------------------------------+---+",
"| time | x |",
"+--------------------------------+---+",
"| 1970-01-01T00:00:00.000000042Z | 1 |",
"| 1970-01-01T00:00:00.000000042Z | 2 |",
"| 1970-01-01T00:00:00.000000042Z | 3 |",
"| 1970-01-01T00:00:00.000000042Z | 4 |",
"+--------------------------------+---+",
],
&*data
.record_batches()
.iter()
.map(Deref::deref)
.cloned()
.collect::<Vec<_>>()
);
// Finish persisting the second batch out-of-order! The middle entry,
// ensuring the first and last entries remain ordered.
p.mark_persisted(persisting_data2);
let data = p.get_query_data().unwrap();
assert_batches_eq!(
[
"+--------------------------------+---+",
"| time | x |",
"+--------------------------------+---+",
"| 1970-01-01T00:00:00.000000042Z | 1 |",
"| 1970-01-01T00:00:00.000000042Z | 3 |",
"| 1970-01-01T00:00:00.000000042Z | 4 |",
"+--------------------------------+---+",
],
&*data
.record_batches()
.iter()
.map(Deref::deref)
.cloned()
.collect::<Vec<_>>()
);
// Finish persisting the last batch.
p.mark_persisted(persisting_data3);
let data = p.get_query_data().unwrap();
assert_batches_eq!(
[
"+--------------------------------+---+",
"| time | x |",
"+--------------------------------+---+",
"| 1970-01-01T00:00:00.000000042Z | 1 |",
"| 1970-01-01T00:00:00.000000042Z | 4 |",
"+--------------------------------+---+",
],
&*data
.record_batches()
.iter()
.map(Deref::deref)
.cloned()
.collect::<Vec<_>>()
);
// Finish persisting the first batch.
p.mark_persisted(persisting_data1);
// Assert only the buffered data remains
let data = p.get_query_data().unwrap();
assert_batches_eq!(
[
"+--------------------------------+---+",
"| time | x |",
"+--------------------------------+---+",
"| 1970-01-01T00:00:00.000000042Z | 4 |",
"+--------------------------------+---+",
],
&*data
.record_batches()
.iter()
.map(Deref::deref)
.cloned()
.collect::<Vec<_>>()
);
}
// Ensure an updated sort key is returned.