perf(ingester2): partition persist queue
This commit swaps the existing single "persisting batch" slot (field) in a PartitionData for an ordered queue of outstanding partitions. This decouples marking a partition buffer for persistence from the actual persistence operation, allowing them to proceed at different rates.

This reduces the complexity of persistence management, but also allows us to gracefully handle "hot" partitions; for example, this problematic scenario in the ingester(1) implementation during recovery:

* Writes come into a partition, reaching a size/row/hotness limit
* Partition is enqueued for persistence
* More writes come into the new buffer, exceeding the same limit
* Cannot persist the hot buffer because of outstanding persist op

Without this change the only possibilities in this situation are:

* stop ingest for the partition and error all writes that (partially!) write to the partition, or
* continue accepting writes, allowing the partition to exceed the limit that marked it for persistence in the first place

The latter is what the ingester(1) implementation does today, which results in partitions exceeding their row/size/age limits, which exist to limit the cost of generating the Parquet file from the buffer - this is a significant contributor to instability during recovery.

This strategy enforces the configured limits on the partition buffer, but does not block / slow down recovery while persistence is completed.

pull/24376/head
parent
94cd66fd99
commit
4f928fbd0c
|
@ -1,17 +1,21 @@
|
|||
//! Partition level data buffer structures.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::{collections::VecDeque, sync::Arc};
|
||||
|
||||
use data_types::{NamespaceId, PartitionId, PartitionKey, SequenceNumber, TableId};
|
||||
use mutable_batch::MutableBatch;
|
||||
use observability_deps::tracing::*;
|
||||
use schema::sort::SortKey;
|
||||
|
||||
use self::buffer::{traits::Queryable, BufferState, DataBuffer, Persisting};
|
||||
use self::{
|
||||
buffer::{traits::Queryable, BufferState, DataBuffer, Persisting},
|
||||
persisting::{BatchIdent, PersistingData},
|
||||
};
|
||||
use super::{namespace::NamespaceName, table::TableName};
|
||||
use crate::{deferred_load::DeferredLoad, query_adaptor::QueryAdaptor};
|
||||
|
||||
mod buffer;
|
||||
mod persisting;
|
||||
pub(crate) mod resolver;
|
||||
|
||||
/// The load state of the [`SortKey`] for a given partition.
|
||||
|
@ -69,8 +73,18 @@ pub(crate) struct PartitionData {
|
|||
/// A [`DataBuffer`] for incoming writes.
|
||||
buffer: DataBuffer,
|
||||
|
||||
/// The currently persisting [`DataBuffer`], if any.
|
||||
persisting: Option<BufferState<Persisting>>,
|
||||
/// The currently persisting [`DataBuffer`] instances, if any.
|
||||
///
|
||||
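/// This queue is ordered from newest at the head, to oldest at the tail:
|
||||
/// new batches are pushed to the front and the oldest batch is popped from
|
||||
/// the back, so forward iteration visits the newest batch first.
|
||||
/// NOTE(review): the previous wording said forward iteration matches write
|
||||
/// order - confirm whether readers iterating this queue need `.rev()`.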
|
||||
///
|
||||
/// The [`BatchIdent`] is a generational counter that is used to tag each
|
||||
/// persisting with a unique, opaque identifier.
|
||||
persisting: VecDeque<(BatchIdent, BufferState<Persisting>)>,
|
||||
|
||||
/// The number of persist operations started over the lifetime of this
|
||||
/// [`PartitionData`].
|
||||
started_persistence_count: BatchIdent,
|
||||
}
|
||||
|
||||
impl PartitionData {
|
||||
|
@ -94,7 +108,8 @@ impl PartitionData {
|
|||
table_id,
|
||||
table_name,
|
||||
buffer: DataBuffer::default(),
|
||||
persisting: None,
|
||||
persisting: VecDeque::with_capacity(1),
|
||||
started_persistence_count: BatchIdent::default(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -133,7 +148,7 @@ impl PartitionData {
|
|||
let data = self
|
||||
.persisting
|
||||
.iter()
|
||||
.flat_map(|b| b.get_query_data())
|
||||
.flat_map(|(_, b)| b.get_query_data())
|
||||
.chain(buffered_data)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
|
@ -167,42 +182,45 @@ impl PartitionData {
|
|||
/// A reference to the persisting data is retained until a corresponding
|
||||
/// call to [`Self::mark_persisted()`] is made to release it.
|
||||
///
|
||||
/// # Panics
|
||||
/// It is an invariant that partitions are persisted in order, as queriers
|
||||
/// consider writes in the object store as being strictly after writes
|
||||
/// returned from an ingester.
|
||||
///
|
||||
/// This method panics if [`Self`] contains data already an ongoing persist
|
||||
/// operation. All calls to [`Self::mark_persisting()`] must be followed by
|
||||
/// a matching call to [`Self::mark_persisted()`] before a new persist can
|
||||
/// begin.
|
||||
pub(crate) fn mark_persisting(&mut self) -> Option<QueryAdaptor> {
|
||||
// Assert that there is at most one persist operation per partition
|
||||
// ongoing at any one time.
|
||||
//
|
||||
// This is not a system invariant, however the system MUST make
|
||||
// persisted partitions visible in monotonic order w.r.t their sequence
|
||||
// numbers.
|
||||
assert!(
|
||||
self.persisting.is_none(),
|
||||
"starting persistence on partition in persisting state"
|
||||
);
|
||||
|
||||
let persisting = std::mem::take(&mut self.buffer).into_persisting()?;
|
||||
/// Additionally each persistence MAY update the partition sort key, which
|
||||
/// is not a commutative operation, requiring partition persistence to be
|
||||
/// serialised (unless it can be known in advance no sort key update is
|
||||
/// necessary for a given persistence).
|
||||
pub(crate) fn mark_persisting(&mut self) -> Option<PersistingData> {
|
||||
let fsm = std::mem::take(&mut self.buffer).into_persisting()?;
|
||||
|
||||
// From this point on, all code MUST be infallible or the buffered data
|
||||
// contained within persisting may be dropped.
|
||||
|
||||
// Increment the "started persist" counter.
|
||||
//
|
||||
// This is used to cheaply identify batches given to the
|
||||
// mark_persisted() call.
|
||||
let batch_ident = self.started_persistence_count.next();
|
||||
|
||||
debug!(
|
||||
namespace_id = %self.namespace_id,
|
||||
table_id = %self.table_id,
|
||||
table_name = %self.table_name,
|
||||
partition_id = %self.partition_id,
|
||||
partition_key = %self.partition_key,
|
||||
%batch_ident,
|
||||
"marking partition as persisting"
|
||||
);
|
||||
|
||||
let data = persisting.get_query_data();
|
||||
self.persisting = Some(persisting);
|
||||
// Wrap the persisting data in the type wrapper
|
||||
let data = PersistingData::new(
|
||||
QueryAdaptor::new(self.partition_id, fsm.get_query_data()),
|
||||
batch_ident,
|
||||
);
|
||||
|
||||
Some(QueryAdaptor::new(self.partition_id, data))
|
||||
self.persisting.push_front((batch_ident, fsm));
|
||||
|
||||
Some(data)
|
||||
}
|
||||
|
||||
/// Mark this partition as having completed persistence of the specified
|
||||
|
@ -217,16 +235,30 @@ impl PartitionData {
|
|||
/// persisted out-of-order w.r.t other persisting batches. All calls to
|
||||
/// [`Self::mark_persisted()`] must be preceded by a matching call to
|
||||
/// [`Self::mark_persisting()`].
|
||||
pub(crate) fn mark_persisted(&mut self, _batch: QueryAdaptor) {
|
||||
// It is an invariant that partitions are persisted in order, as
|
||||
// queriers consider writes in the object store as being strictly after
|
||||
// writes returned from an ingester.
|
||||
pub(crate) fn mark_persisted(&mut self, batch: PersistingData) {
|
||||
// Pop the oldest persist task from the persist queue.
|
||||
let (old_ident, _oldest) = self
|
||||
.persisting
|
||||
.pop_back()
|
||||
.expect("no currently persisting batch");
|
||||
|
||||
assert!(
|
||||
self.persisting.is_some(),
|
||||
"must be a persisting batch when marking complete"
|
||||
// Currently in ingester1 there is an invariant that partition buffers
|
||||
// not be persisted out-of-order - the assert below enforces that.
|
||||
assert_eq!(
|
||||
old_ident,
|
||||
batch.batch_ident(),
|
||||
"out-of-order persist notification received"
|
||||
);
|
||||
|
||||
debug!(
|
||||
namespace_id = %self.namespace_id,
|
||||
table_id = %self.table_id,
|
||||
table_name = %self.table_name,
|
||||
partition_id = %self.partition_id,
|
||||
partition_key = %self.partition_key,
|
||||
batch_ident = %batch.batch_ident(),
|
||||
"marking partition persistence complete"
|
||||
);
|
||||
self.persisting = None;
|
||||
}
|
||||
|
||||
pub(crate) fn partition_id(&self) -> PartitionId {
|
||||
|
@ -414,6 +446,9 @@ mod tests {
|
|||
p.buffer_write(mb, SequenceNumber::new(1))
|
||||
.expect("write should succeed");
|
||||
|
||||
// No persist has been started yet, so the counter must still be 0.
|
||||
assert_eq!(p.started_persistence_count.get(), 0);
|
||||
|
||||
// Begin persisting the partition.
|
||||
let persisting_data = p.mark_persisting().expect("must contain existing data");
|
||||
// And validate the data being persisted.
|
||||
|
@ -436,6 +471,11 @@ mod tests {
|
|||
.collect::<Vec<_>>()
|
||||
);
|
||||
|
||||
// Ensure the batch ident is increased after a persist call.
|
||||
assert_eq!(p.started_persistence_count.get(), 1);
|
||||
// And the batch is correctly identified
|
||||
assert_eq!(persisting_data.batch_ident().get(), 1);
|
||||
|
||||
// Buffer another write during an ongoing persist.
|
||||
let mb = lp_to_mutable_batch(r#"bananas,city=Madrid people=4,pigeons="none" 20"#).1;
|
||||
p.buffer_write(mb, SequenceNumber::new(2))
|
||||
|
@ -468,6 +508,9 @@ mod tests {
|
|||
// The persist now "completes".
|
||||
p.mark_persisted(persisting_data);
|
||||
|
||||
// Completing a persist must not change the started-persist counter.
|
||||
assert_eq!(p.started_persistence_count.get(), 1);
|
||||
|
||||
// Querying the buffer should now return only the second write.
|
||||
{
|
||||
let data = p.get_query_data().expect("must have data");
|
||||
|
@ -493,7 +536,8 @@ mod tests {
|
|||
}
|
||||
|
||||
// Ensure the ordering of snapshots & persisting data is preserved such that
|
||||
// updates resolve correctly.
|
||||
// updates resolve correctly, and batch identifiers are correctly allocated
|
||||
// and validated in mark_persisted() calls
|
||||
#[tokio::test]
|
||||
async fn test_record_batch_ordering() {
|
||||
// A helper function to dedupe the record batches in [`QueryAdaptor`]
|
||||
|
@ -579,10 +623,13 @@ mod tests {
|
|||
)
|
||||
.await;
|
||||
|
||||
// No persist has been started yet, so the counter must still be 0.
|
||||
assert_eq!(p.started_persistence_count.get(), 0);
|
||||
|
||||
// Begin persisting the data, moving the buffer to the persisting state.
|
||||
|
||||
let persisting_data = p.mark_persisting().unwrap();
|
||||
assert_eq!(persisting_data.record_batches().len(), 1);
|
||||
let persisting_data1 = p.mark_persisting().unwrap();
|
||||
assert_eq!(persisting_data1.record_batches().len(), 1);
|
||||
assert_deduped(
|
||||
&[
|
||||
"+--------------------------------+---+",
|
||||
|
@ -591,10 +638,15 @@ mod tests {
|
|||
"| 1970-01-01T00:00:00.000000042Z | 2 |",
|
||||
"+--------------------------------+---+",
|
||||
],
|
||||
persisting_data.clone(),
|
||||
(*persisting_data1).clone(),
|
||||
)
|
||||
.await;
|
||||
|
||||
// Ensure the batch ident is increased after a persist call.
|
||||
assert_eq!(p.started_persistence_count.get(), 1);
|
||||
// And the batch is correctly identified
|
||||
assert_eq!(persisting_data1.batch_ident().get(), 1);
|
||||
|
||||
// Buffer another write, and generate a snapshot by querying it.
|
||||
let mb = lp_to_mutable_batch(r#"bananas x=3 42"#).1;
|
||||
p.buffer_write(mb, SequenceNumber::new(3))
|
||||
|
@ -613,11 +665,11 @@ mod tests {
|
|||
)
|
||||
.await;
|
||||
|
||||
// Finish persisting.
|
||||
p.mark_persisted(persisting_data);
|
||||
// Persist again, moving the last write to the persisting state and
|
||||
// adding it to the persisting queue.
|
||||
|
||||
// And assert the correct value remains.
|
||||
assert_eq!(p.get_query_data().unwrap().record_batches().len(), 1);
|
||||
let persisting_data2 = p.mark_persisting().unwrap();
|
||||
assert_eq!(persisting_data2.record_batches().len(), 1);
|
||||
assert_deduped(
|
||||
&[
|
||||
"+--------------------------------+---+",
|
||||
|
@ -626,9 +678,118 @@ mod tests {
|
|||
"| 1970-01-01T00:00:00.000000042Z | 3 |",
|
||||
"+--------------------------------+---+",
|
||||
],
|
||||
(*persisting_data2).clone(),
|
||||
)
|
||||
.await;
|
||||
|
||||
// Ensure the batch ident is increased after a persist call.
|
||||
assert_eq!(p.started_persistence_count.get(), 2);
|
||||
// And the batch is correctly identified
|
||||
assert_eq!(persisting_data1.batch_ident().get(), 1);
|
||||
assert_eq!(persisting_data2.batch_ident().get(), 2);
|
||||
|
||||
// Buffer another write, and generate a snapshot by querying it.
|
||||
let mb = lp_to_mutable_batch(r#"bananas x=4 42"#).1;
|
||||
p.buffer_write(mb, SequenceNumber::new(3))
|
||||
.expect("write should succeed");
|
||||
|
||||
assert_eq!(p.get_query_data().unwrap().record_batches().len(), 3);
|
||||
assert_deduped(
|
||||
&[
|
||||
"+--------------------------------+---+",
|
||||
"| time | x |",
|
||||
"+--------------------------------+---+",
|
||||
"| 1970-01-01T00:00:00.000000042Z | 4 |",
|
||||
"+--------------------------------+---+",
|
||||
],
|
||||
p.get_query_data().unwrap(),
|
||||
)
|
||||
.await;
|
||||
|
||||
// Finish persisting the first batch.
|
||||
p.mark_persisted(persisting_data1);
|
||||
|
||||
// And assert the correct value remains.
|
||||
assert_eq!(p.get_query_data().unwrap().record_batches().len(), 2);
|
||||
assert_deduped(
|
||||
&[
|
||||
"+--------------------------------+---+",
|
||||
"| time | x |",
|
||||
"+--------------------------------+---+",
|
||||
"| 1970-01-01T00:00:00.000000042Z | 4 |",
|
||||
"+--------------------------------+---+",
|
||||
],
|
||||
p.get_query_data().unwrap(),
|
||||
)
|
||||
.await;
|
||||
|
||||
// Finish persisting the second batch.
|
||||
p.mark_persisted(persisting_data2);
|
||||
|
||||
// And assert the correct value remains.
|
||||
assert_eq!(p.get_query_data().unwrap().record_batches().len(), 1);
|
||||
assert_deduped(
|
||||
&[
|
||||
"+--------------------------------+---+",
|
||||
"| time | x |",
|
||||
"+--------------------------------+---+",
|
||||
"| 1970-01-01T00:00:00.000000042Z | 4 |",
|
||||
"+--------------------------------+---+",
|
||||
],
|
||||
p.get_query_data().unwrap(),
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_eq!(p.started_persistence_count.get(), 2);
|
||||
}
|
||||
|
||||
// Ensure that completing persist operations out of order is rejected:
|
||||
// mark_persisted() must panic when given a batch that is not the oldest
|
||||
// outstanding entry in the persist queue.
|
||||
#[tokio::test]
|
||||
#[should_panic = "out-of-order persist notification received"]
|
||||
async fn test_out_of_order_persist() {
|
||||
let mut p = PartitionData::new(
|
||||
PARTITION_ID,
|
||||
PARTITION_KEY.clone(),
|
||||
NamespaceId::new(3),
|
||||
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
|
||||
NAMESPACE_NAME.clone()
|
||||
})),
|
||||
TableId::new(4),
|
||||
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
|
||||
TABLE_NAME.clone()
|
||||
})),
|
||||
SortKeyState::Provided(None),
|
||||
);
|
||||
|
||||
// Perform the initial write.
|
||||
//
|
||||
// In the next series of writes this test will overwrite the value of x
|
||||
// and assert the deduped resulting state.
|
||||
let mb = lp_to_mutable_batch(r#"bananas x=1 42"#).1;
|
||||
p.buffer_write(mb, SequenceNumber::new(1))
|
||||
.expect("write should succeed");
|
||||
|
||||
// Begin persisting the data, moving the buffer to the persisting state.
|
||||
|
||||
let persisting_data1 = p.mark_persisting().unwrap();
|
||||
|
||||
// Buffer another write, and generate a snapshot by querying it.
|
||||
let mb = lp_to_mutable_batch(r#"bananas x=3 42"#).1;
|
||||
p.buffer_write(mb, SequenceNumber::new(3))
|
||||
.expect("write should succeed");
|
||||
|
||||
// Persist again, moving the last write to the persisting state and
|
||||
// adding it to the persisting queue.
|
||||
|
||||
let persisting_data2 = p.mark_persisting().unwrap();
|
||||
|
||||
// Finish persisting the second batch out-of-order!
|
||||
p.mark_persisted(persisting_data2);
|
||||
|
||||
// Finish persisting the first batch.
|
||||
p.mark_persisted(persisting_data1);
|
||||
}
|
||||
|
||||
// Ensure an updated sort key is returned.
|
||||
|
@ -765,33 +926,6 @@ mod tests {
|
|||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[should_panic(expected = "must be a persisting batch when marking complete")]
|
||||
async fn test_mark_persisted_not_persisting() {
|
||||
let mut p = PartitionData::new(
|
||||
PARTITION_ID,
|
||||
PARTITION_KEY.clone(),
|
||||
NamespaceId::new(3),
|
||||
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
|
||||
NAMESPACE_NAME.clone()
|
||||
})),
|
||||
TableId::new(4),
|
||||
Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
|
||||
TABLE_NAME.clone()
|
||||
})),
|
||||
SortKeyState::Provided(None),
|
||||
);
|
||||
|
||||
// Write some data
|
||||
let mb = lp_to_mutable_batch(r#"bananas,city=London people=2,pigeons="millions" 10"#).1;
|
||||
p.buffer_write(mb, SequenceNumber::new(2))
|
||||
.expect("write should succeed");
|
||||
|
||||
let not_persisting = p.get_query_data().unwrap();
|
||||
|
||||
p.mark_persisted(not_persisting);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_mark_persisting_no_data() {
|
||||
let mut p = PartitionData::new(
|
||||
|
@ -812,7 +946,6 @@ mod tests {
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[should_panic(expected = "starting persistence on partition in persisting state")]
|
||||
async fn test_mark_persisting_twice() {
|
||||
let mut p = PartitionData::new(
|
||||
PARTITION_ID,
|
||||
|
@ -833,8 +966,7 @@ mod tests {
|
|||
.expect("write should succeed");
|
||||
|
||||
assert!(p.mark_persisting().is_some());
|
||||
|
||||
p.mark_persisting();
|
||||
assert!(p.mark_persisting().is_none());
|
||||
}
|
||||
|
||||
// Ensure an empty PartitionData does not panic due to constructing an empty
|
||||
|
|
|
@ -0,0 +1,84 @@
|
|||
use std::fmt::Display;
|
||||
|
||||
use crate::query_adaptor::QueryAdaptor;
|
||||
|
||||
/// An opaque generational identifier of a buffer in a [`PartitionData`].
|
||||
///
|
||||
/// [`PartitionData`]: super::PartitionData
|
||||
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
|
||||
pub(super) struct BatchIdent(u64);
|
||||
|
||||
impl BatchIdent {
|
||||
/// Return the next unique value.
|
||||
pub(super) fn next(&mut self) -> Self {
|
||||
self.0 += 1;
|
||||
Self(self.0)
|
||||
}
|
||||
|
||||
/// Only for tests, this allows reading the opaque identifier to assert the
|
||||
/// value changing between persist ops.
|
||||
#[cfg(test)]
|
||||
pub(super) fn get(&self) -> u64 {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for BatchIdent {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
self.0.fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
/// A type wrapper over [`QueryAdaptor`] that statically ensures only batches of
|
||||
/// data from [`PartitionData::mark_persisting()`] are given to
|
||||
/// [`PartitionData::mark_persisted()`].
|
||||
///
|
||||
/// Cloning this type is relatively cheap.
|
||||
///
|
||||
/// [`PartitionData::mark_persisting()`]: super::PartitionData::mark_persisting
|
||||
/// [`PartitionData::mark_persisted()`]: super::PartitionData::mark_persisted
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct PersistingData {
|
||||
data: QueryAdaptor,
|
||||
batch_ident: BatchIdent,
|
||||
}
|
||||
|
||||
impl PersistingData {
|
||||
pub(super) fn new(data: QueryAdaptor, batch_ident: BatchIdent) -> Self {
|
||||
Self { data, batch_ident }
|
||||
}
|
||||
|
||||
pub(super) fn batch_ident(&self) -> BatchIdent {
|
||||
self.batch_ident
|
||||
}
|
||||
|
||||
pub(crate) fn query_adaptor(&self) -> QueryAdaptor {
|
||||
self.data.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::Deref for PersistingData {
|
||||
type Target = QueryAdaptor;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.data
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    /// A default ident reads as 0, and each `next()` call both returns and
    /// stores the following integer.
    #[test]
    fn test_batch_ident() {
        let mut ident = BatchIdent::default();
        assert_eq!(ident.get(), 0);

        for want in 1..=2_u64 {
            // The returned copy carries the incremented value...
            assert_eq!(ident.next().get(), want);
            // ...and the counter itself has advanced to match.
            assert_eq!(ident.get(), want);
        }
    }
}
|
Loading…
Reference in New Issue