feat(ingester): partition buffer state machine
This commit introduces code that is intended to replace the current implicit state machine used by PartitionData. The existing code is still in use, the new code is NOT used in this commit. A follow-up commit will switch over to minimise the diff. This change has two main goals; * encapsulation & simplification for callers * robust implementation so developing correct additions is easier This is a significant refactor of the partition buffering logic to encapsulate the various states of data (buffering, snapshot, persisting and the mixed states between them) within the Partition. This alleviates the rest of the system from having to be concerned with the differences between "buffering" data, and "unpersisted data", "snapshot data", "persisting data", "persisting with snapshots" etc - callers now invoke a method called get_query_data() and they are provided with all the relevant data for a partition. This abstraction change alone significantly reduces code and test complexity in the rest of the ingester. For the second goal, the new implementation leverages an explicit state machine, encoded using typestates. Typestate ensures compile-time correctness of transitions and method calls, and the explicit FSM itself helps ensure the system progresses in the desired manner - this fixes and helps prevent bugs caused by implicit states such as: https://github.com/influxdata/influxdb_iox/issues/5805 This state machine makes the system states explicit and self-descriptive, helping to reduce the cost of developer on-boarding (no prior knowledge of "how this bit works") and reduces ongoing developer burden. This explicit nature also de-risks adding new functionality - it should be relatively easy to add concurrent snapshot generation or incremental compaction without introducing bugs. The state transition logic is abstracted away from callers, minimising the overhead of this strategy.pull/24376/head
parent
9fb1433317
commit
c8fdd76033
|
@ -0,0 +1,99 @@
|
|||
//! A helper type that ensures an `Option` is always `Some` once the guard is
|
||||
//! dropped.
|
||||
|
||||
/// A guard through which a value can be placed back into the [`AlwaysSome`].
|
||||
#[derive(Debug)]
|
||||
#[must_use = "Guard must be used to restore the value"]
|
||||
pub(super) struct Guard<'a, T>(&'a mut Option<T>);
|
||||
|
||||
impl<'a, T> Guard<'a, T> {
|
||||
/// Store `value` in the [`AlwaysSome`] for subsequent
|
||||
/// [`AlwaysSome::take()`] calls.
|
||||
pub(super) fn store(self, value: T) {
|
||||
assert!(self.0.is_none());
|
||||
*self.0 = Some(value);
|
||||
}
|
||||
}
|
||||
|
||||
/// A helper type that aims to ease working with an [`Option`] that must always
|
||||
/// be restored in a given scope.
|
||||
///
|
||||
/// Accessing the value within an [`AlwaysSome`] returns a [`Guard`], which MUST
|
||||
/// be used to store the value before going out of scope. Failure to store a
|
||||
/// value cause a subsequent [`Self::take()`] call to panic.
|
||||
///
|
||||
/// Failing to store a value in the [`Guard`] causes a compiler warning, however
|
||||
/// this does not prevent failing to return a value to the [`AlwaysSome`] as the
|
||||
/// warning can be falsely silenced by using it within one conditional code path
|
||||
/// and not the other.
|
||||
#[derive(Debug)]
|
||||
pub(super) struct AlwaysSome<T>(Option<T>);
|
||||
|
||||
impl<T> Default for AlwaysSome<T>
|
||||
where
|
||||
T: Default,
|
||||
{
|
||||
fn default() -> Self {
|
||||
Self::new(T::default())
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> std::ops::Deref for AlwaysSome<T> {
|
||||
type Target = T;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.0.as_ref().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> AlwaysSome<T> {
|
||||
/// Wrap `value` in an [`AlwaysSome`].
|
||||
pub(super) fn new(value: T) -> Self {
|
||||
Self(Some(value))
|
||||
}
|
||||
|
||||
/// Read the value.
|
||||
pub(super) fn take(&mut self) -> (Guard<'_, T>, T) {
|
||||
let value = std::mem::take(&mut self.0);
|
||||
|
||||
(
|
||||
Guard(&mut self.0),
|
||||
value.expect("AlwaysSome value is None!"),
|
||||
)
|
||||
}
|
||||
|
||||
/// Deconstruct `self`, returning the inner value.
|
||||
pub(crate) fn into_inner(self) -> T {
|
||||
self.0.unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_always_some() {
|
||||
let mut a = AlwaysSome::<usize>::default();
|
||||
|
||||
let (guard, value) = a.take();
|
||||
assert_eq!(value, 0);
|
||||
guard.store(42);
|
||||
|
||||
let (guard, value) = a.take();
|
||||
assert_eq!(value, 42);
|
||||
guard.store(24);
|
||||
|
||||
assert_eq!(a.into_inner(), 24);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic = "AlwaysSome value is None!"]
|
||||
fn test_drops_guard() {
|
||||
let mut a = AlwaysSome::<usize>::default();
|
||||
{
|
||||
let _ = a.take();
|
||||
}
|
||||
let _ = a.take();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,53 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use arrow::record_batch::RecordBatch;
|
||||
use mutable_batch::MutableBatch;
|
||||
use schema::selection::Selection;
|
||||
|
||||
/// A [`Buffer`] is an internal mutable buffer wrapper over a [`MutableBatch`]
|
||||
/// for the [`BufferState`] FSM.
|
||||
///
|
||||
/// A [`Buffer`] can contain no writes.
|
||||
///
|
||||
/// [`BufferState`]: super::super::BufferState
|
||||
#[derive(Debug, Default)]
|
||||
pub(super) struct Buffer {
|
||||
buffer: Option<MutableBatch>,
|
||||
}
|
||||
|
||||
impl Buffer {
|
||||
/// Apply `batch` to the in-memory buffer.
|
||||
///
|
||||
/// # Data Loss
|
||||
///
|
||||
/// If this method returns an error, the data in `batch` is problematic and
|
||||
/// has been discarded.
|
||||
pub(super) fn buffer_write(&mut self, batch: MutableBatch) -> Result<(), mutable_batch::Error> {
|
||||
match self.buffer {
|
||||
Some(ref mut b) => b.extend_from(&batch)?,
|
||||
None => self.buffer = Some(batch),
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Generates a [`RecordBatch`] from the data in this [`Buffer`].
|
||||
///
|
||||
/// If this [`Buffer`] is empty when this method is called, the call is a
|
||||
/// NOP and [`None`] is returned.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// If generating the snapshot fails, this method panics.
|
||||
pub(super) fn snapshot(self) -> Option<Arc<RecordBatch>> {
|
||||
Some(Arc::new(
|
||||
self.buffer?
|
||||
.to_arrow(Selection::All)
|
||||
.expect("failed to snapshot buffer data"),
|
||||
))
|
||||
}
|
||||
|
||||
pub(super) fn is_empty(&self) -> bool {
|
||||
self.buffer.is_none()
|
||||
}
|
||||
}
|
|
@ -0,0 +1,289 @@
|
|||
#![allow(dead_code)]
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow::record_batch::RecordBatch;
|
||||
use data_types::SequenceNumber;
|
||||
use mutable_batch::MutableBatch;
|
||||
|
||||
mod buffering;
|
||||
mod buffering_with_snapshot;
|
||||
mod persisting;
|
||||
mod snapshot;
|
||||
|
||||
pub(in crate::data::partition::buffer) use buffering::*;
|
||||
pub(in crate::data::partition::buffer) use buffering_with_snapshot::*;
|
||||
pub(crate) use persisting::*;
|
||||
pub(in crate::data::partition::buffer) use snapshot::*;
|
||||
|
||||
use crate::data::SequenceNumberRange;
|
||||
|
||||
use super::traits::{Queryable, Writeable};
|
||||
|
||||
/// A result type for fallible transitions.
|
||||
///
|
||||
/// The type system ensures the state machine is always returned to the caller,
|
||||
/// regardless of the transition outcome.
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum Transition<A, B> {
|
||||
/// The transition succeeded, and the new state is contained within.
|
||||
Ok(BufferState<A>),
|
||||
/// The state machine failed to transition due to an invariant not being
|
||||
/// upheld, and the original state is contained within.
|
||||
Unchanged(BufferState<B>),
|
||||
}
|
||||
|
||||
impl<A, B> Transition<A, B> {
|
||||
/// A helper function to construct [`Self::Ok`] variants.
|
||||
pub(super) fn ok(v: A, sequence_range: SequenceNumberRange) -> Transition<A, B> {
|
||||
Self::Ok(BufferState {
|
||||
state: v,
|
||||
sequence_range,
|
||||
})
|
||||
}
|
||||
|
||||
/// A helper function to construct [`Self::Unchanged`] variants.
|
||||
pub(super) fn unchanged(v: BufferState<B>) -> Transition<A, B> {
|
||||
Self::Unchanged(v)
|
||||
}
|
||||
}
|
||||
|
||||
/// A finite state machine for buffering writes, and converting them into a
|
||||
/// queryable data format on-demand.
|
||||
///
|
||||
/// This FSM is used to provide explicit states for each stage of the data
|
||||
/// lifecycle within a partition buffer:
|
||||
///
|
||||
/// ```text
|
||||
/// ┌──────────────┐
|
||||
/// │ Buffering │
|
||||
/// └───────┬──────┘
|
||||
/// │
|
||||
/// ▼
|
||||
/// ┌ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─
|
||||
/// ┌─────▶ Snapshot ├─────▶ Persisting │
|
||||
/// │ └ ─ ─ ─ ┬ ─ ─ ─ └ ─ ─ ─ ─ ─ ─ ─
|
||||
/// │ │
|
||||
/// │ ▼
|
||||
/// │ ┌───────────────────────┐
|
||||
/// └──│ BufferingWithSnapshot │
|
||||
/// └───────────────────────┘
|
||||
/// ```
|
||||
///
|
||||
/// Boxes with dashed lines indicate immutable, queryable states that contain
|
||||
/// data in an efficient data format for query execution ([`RecordBatch`]).
|
||||
///
|
||||
/// Boxes with solid lines indicate a mutable state to which further writes can
|
||||
/// be applied.
|
||||
///
|
||||
/// A [`BufferState`] tracks the bounding [`SequenceNumber`] values it has
|
||||
/// observed, and enforces monotonic writes (w.r.t their [`SequenceNumber`]).
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct BufferState<T> {
|
||||
state: T,
|
||||
sequence_range: SequenceNumberRange,
|
||||
}
|
||||
|
||||
impl BufferState<Buffering> {
|
||||
/// Initialise a new buffer state machine.
|
||||
pub(super) fn new() -> Self {
|
||||
Self {
|
||||
state: Buffering::default(),
|
||||
sequence_range: SequenceNumberRange::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> BufferState<T> {
|
||||
pub fn sequence_number_range(&self) -> &SequenceNumberRange {
|
||||
&self.sequence_range
|
||||
}
|
||||
}
|
||||
|
||||
/// A [`BufferState`] in a mutable state can accept writes and record their
|
||||
/// [`SequenceNumber`].
|
||||
impl<T> BufferState<T>
|
||||
where
|
||||
T: Writeable,
|
||||
{
|
||||
/// The provided [`SequenceNumber`] MUST be for the given [`MutableBatch`].
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// This method panics if it is called non-monotonic writes/sequence
|
||||
/// numbers.
|
||||
pub(crate) fn write(
|
||||
&mut self,
|
||||
batch: MutableBatch,
|
||||
n: SequenceNumber,
|
||||
) -> Result<(), mutable_batch::Error> {
|
||||
self.state.write(batch)?;
|
||||
self.sequence_range.observe(n);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// A [`BufferState`] in a queryable state delegates the read to the current
|
||||
/// state machine state.
|
||||
impl<T> Queryable for BufferState<T>
|
||||
where
|
||||
T: Queryable,
|
||||
{
|
||||
/// Returns the current buffer data.
|
||||
///
|
||||
/// This is always a cheap method call.
|
||||
fn get_query_data(&self) -> &[Arc<RecordBatch>] {
|
||||
self.state.get_query_data()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::ops::Deref;
|
||||
|
||||
use arrow_util::assert_batches_eq;
|
||||
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
|
||||
use schema::selection::Selection;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_buffer_lifecycle() {
|
||||
// Initialise a buffer in the base state.
|
||||
let mut buffer: BufferState<Buffering> = BufferState::new();
|
||||
|
||||
// Validate the sequence number ranges are not populated.
|
||||
assert!(buffer.sequence_number_range().inclusive_min().is_none());
|
||||
assert!(buffer.sequence_number_range().inclusive_max().is_none());
|
||||
|
||||
// Write some data to a buffer.
|
||||
buffer
|
||||
.write(
|
||||
lp_to_mutable_batch(r#"bananas,tag=platanos great=true 668563242000000042"#).1,
|
||||
SequenceNumber::new(0),
|
||||
)
|
||||
.expect("write to empty buffer should succeed");
|
||||
|
||||
// Snapshot the buffer into an immutable, queryable data format.
|
||||
let buffer: BufferState<Snapshot> = match buffer.snapshot() {
|
||||
Transition::Ok(v) => v,
|
||||
Transition::Unchanged(_) => panic!("did not transition to snapshot state"),
|
||||
};
|
||||
|
||||
// Extract the queryable data from the buffer and validate it.
|
||||
//
|
||||
// Keep the data to validate they are ref-counted copies after further
|
||||
// writes below. Note this construct allows the caller to decide when/if
|
||||
// to allocate.
|
||||
let w1_data = buffer.get_query_data().to_owned();
|
||||
|
||||
let expected = vec![
|
||||
"+-------+----------+--------------------------------+",
|
||||
"| great | tag | time |",
|
||||
"+-------+----------+--------------------------------+",
|
||||
"| true | platanos | 1991-03-10T00:00:42.000000042Z |",
|
||||
"+-------+----------+--------------------------------+",
|
||||
];
|
||||
assert_batches_eq!(&expected, &[w1_data[0].deref().clone()]);
|
||||
|
||||
// Transition the buffer into a mutable state in which it can accept
|
||||
// writes.
|
||||
let mut buffer: BufferState<BufferingWithSnapshot> = buffer.into_buffering();
|
||||
|
||||
// Apply another write.
|
||||
buffer
|
||||
.write(
|
||||
lp_to_mutable_batch(
|
||||
r#"bananas,tag=platanos great=true,how_much=1000 668563242000000042"#,
|
||||
)
|
||||
.1,
|
||||
SequenceNumber::new(1),
|
||||
)
|
||||
.expect("write to empty buffer should succeed");
|
||||
|
||||
// Snapshot the buffer into an immutable, queryable data format.
|
||||
let buffer: BufferState<Snapshot> = buffer.snapshot();
|
||||
|
||||
// Verify the second write was buffered.
|
||||
let w2_data = buffer.get_query_data().to_owned();
|
||||
let expected = vec![
|
||||
"+-------+----------+----------+--------------------------------+",
|
||||
"| great | how_much | tag | time |",
|
||||
"+-------+----------+----------+--------------------------------+",
|
||||
"| true | 1000 | platanos | 1991-03-10T00:00:42.000000042Z |",
|
||||
"+-------+----------+----------+--------------------------------+",
|
||||
];
|
||||
assert_batches_eq!(&expected, &[w2_data[1].deref().clone()]);
|
||||
|
||||
// Verify the first write has not changed, and has not been
|
||||
// re-ordered.
|
||||
assert_eq!(w1_data, w2_data[..1]);
|
||||
// Furthermore, ensure no data was actually copied
|
||||
assert!(Arc::ptr_eq(&w1_data[0], &w2_data[0]));
|
||||
|
||||
// Ensure the same data is returned for a second read.
|
||||
{
|
||||
let second_read = buffer.get_query_data();
|
||||
assert_eq!(w2_data, second_read);
|
||||
|
||||
// And that no data was actually copied.
|
||||
let same_arcs = w2_data
|
||||
.iter()
|
||||
.zip(second_read.iter())
|
||||
.all(|(a, b)| Arc::ptr_eq(a, &b));
|
||||
assert!(same_arcs);
|
||||
}
|
||||
|
||||
// Finally transition into the terminal persisting state.
|
||||
let buffer: BufferState<Persisting> = buffer.into_persisting();
|
||||
|
||||
// Validate the sequence number ranges were updated as writes occurred.
|
||||
assert_eq!(
|
||||
buffer.sequence_number_range().inclusive_min(),
|
||||
Some(SequenceNumber::new(0))
|
||||
);
|
||||
assert_eq!(
|
||||
buffer.sequence_number_range().inclusive_max(),
|
||||
Some(SequenceNumber::new(1))
|
||||
);
|
||||
|
||||
// Extract the final buffered result
|
||||
let final_data = buffer.into_data();
|
||||
|
||||
// And once again verify no data was changed, copied or re-ordered.
|
||||
assert_eq!(w2_data, final_data);
|
||||
let same_arcs = w2_data
|
||||
.into_iter()
|
||||
.zip(final_data.into_iter())
|
||||
.all(|(a, b)| Arc::ptr_eq(&a, &b));
|
||||
assert!(same_arcs);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_snapshot_buffer_different_but_compatible_schemas() {
|
||||
let mut buffer = BufferState::new();
|
||||
|
||||
// Missing tag `t1`
|
||||
let (_, mut mb1) = lp_to_mutable_batch(r#"foo iv=1i,uv=774u,fv=1.0,bv=true,sv="hi" 1"#);
|
||||
buffer.state.write(mb1.clone()).unwrap();
|
||||
|
||||
// Missing field `iv`
|
||||
let (_, mb2) = lp_to_mutable_batch(r#"foo,t1=aoeu uv=1u,fv=12.0,bv=false,sv="bye" 10000"#);
|
||||
buffer.state.write(mb2.clone()).unwrap();
|
||||
|
||||
let buffer: BufferState<Snapshot> = match buffer.snapshot() {
|
||||
Transition::Ok(v) => v,
|
||||
Transition::Unchanged(_) => panic!("failed to transition"),
|
||||
};
|
||||
|
||||
assert_eq!(buffer.get_query_data().len(), 1);
|
||||
|
||||
let snapshot = &buffer.get_query_data()[0];
|
||||
|
||||
// Generate the combined buffer from the original inputs to compare
|
||||
// against.
|
||||
mb1.extend_from(&mb2).unwrap();
|
||||
let want = mb1.to_arrow(Selection::All).unwrap();
|
||||
|
||||
assert_eq!(&**snapshot, &want);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,61 @@
|
|||
//! A write buffer.
|
||||
|
||||
use mutable_batch::MutableBatch;
|
||||
|
||||
use crate::data::partition::buffer::{mutable_buffer::Buffer, traits::Writeable};
|
||||
|
||||
use super::{snapshot::Snapshot, BufferState, Transition};
|
||||
|
||||
/// The FSM starting ingest state - a mutable buffer collecting writes.
|
||||
#[derive(Debug, Default)]
|
||||
pub(crate) struct Buffering {
|
||||
/// The buffer for incoming writes.
|
||||
///
|
||||
/// This buffer MAY be empty when no writes have occured since transitioning
|
||||
/// to this state.
|
||||
buffer: Buffer,
|
||||
}
|
||||
|
||||
impl Writeable for Buffering {
|
||||
fn write(&mut self, batch: MutableBatch) -> Result<(), mutable_batch::Error> {
|
||||
self.buffer.buffer_write(batch)
|
||||
}
|
||||
}
|
||||
|
||||
impl BufferState<Buffering> {
|
||||
/// Attempt to generate a snapshot from the data in this buffer.
|
||||
///
|
||||
/// This returns [`Transition::Unchanged`] if this buffer contains no data.
|
||||
pub(crate) fn snapshot(self) -> Transition<Snapshot, Buffering> {
|
||||
if self.state.buffer.is_empty() {
|
||||
// It is a logical error to snapshot an empty buffer.
|
||||
return Transition::unchanged(self);
|
||||
}
|
||||
|
||||
// Generate a snapshot from the buffer.
|
||||
let snap = self
|
||||
.state
|
||||
.buffer
|
||||
.snapshot()
|
||||
.expect("snapshot of non-empty buffer should succeed");
|
||||
|
||||
// And transition to the WithSnapshot state.
|
||||
Transition::ok(Snapshot::new(vec![snap]), self.sequence_range)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_empty_buffer_does_not_snapshot() {
|
||||
let b = BufferState::new();
|
||||
match b.snapshot() {
|
||||
Transition::Ok(_) => panic!("empty buffer should not transition to snapshot state"),
|
||||
Transition::Unchanged(_) => {
|
||||
// OK!
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,62 @@
|
|||
//! A writfield1 buffer, with one or more snapshots.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow::record_batch::RecordBatch;
|
||||
use mutable_batch::MutableBatch;
|
||||
|
||||
use crate::data::partition::buffer::{mutable_buffer::Buffer, traits::Writeable};
|
||||
|
||||
use super::{snapshot::Snapshot, BufferState};
|
||||
|
||||
/// A mutable state that buffers incoming writes while holding at least one
|
||||
/// previously generated buffer snapshot.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct BufferingWithSnapshot {
|
||||
/// The buffer for incoming writes.
|
||||
///
|
||||
/// NOTE: this buffer MAY be empty.
|
||||
buffer: Buffer,
|
||||
|
||||
/// Snapshots generated from previous buffer contents.
|
||||
///
|
||||
/// INVARIANT: this array is always non-empty.
|
||||
snapshots: Vec<Arc<RecordBatch>>,
|
||||
}
|
||||
|
||||
impl BufferingWithSnapshot {
|
||||
pub(super) fn new(snapshots: Vec<Arc<RecordBatch>>) -> Self {
|
||||
Self {
|
||||
buffer: Buffer::default(),
|
||||
snapshots,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Writeable for BufferingWithSnapshot {
|
||||
fn write(&mut self, batch: MutableBatch) -> Result<(), mutable_batch::Error> {
|
||||
// TODO(5806): assert schema compatibility with existing snapshots
|
||||
self.buffer.buffer_write(batch)
|
||||
}
|
||||
}
|
||||
|
||||
impl BufferState<BufferingWithSnapshot> {
|
||||
/// Snapshot the current buffer contents and transition to an immutable,
|
||||
/// queryable state containing only snapshots.
|
||||
///
|
||||
/// This call MAY be a NOP if the buffer has accrued no writes.
|
||||
pub(crate) fn snapshot(self) -> BufferState<Snapshot> {
|
||||
assert!(!self.state.snapshots.is_empty());
|
||||
|
||||
BufferState {
|
||||
state: Snapshot::new(
|
||||
self.state
|
||||
.snapshots
|
||||
.into_iter()
|
||||
.chain(self.state.buffer.snapshot())
|
||||
.collect(),
|
||||
),
|
||||
sequence_range: self.sequence_range,
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
//! A writfield1 buffer, with one or more snapshots.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow::record_batch::RecordBatch;
|
||||
|
||||
use crate::data::partition::buffer::traits::Queryable;
|
||||
|
||||
use super::BufferState;
|
||||
|
||||
/// An immutable set of [`RecordBatch`] in the process of being persisted.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct Persisting {
|
||||
/// Snapshots generated from previous buffer contents to be persisted.
|
||||
///
|
||||
/// INVARIANT: this array is always non-empty.
|
||||
snapshots: Vec<Arc<RecordBatch>>,
|
||||
}
|
||||
|
||||
impl Persisting {
|
||||
pub(super) fn new(snapshots: Vec<Arc<RecordBatch>>) -> Self {
|
||||
Self { snapshots }
|
||||
}
|
||||
}
|
||||
|
||||
impl Queryable for Persisting {
|
||||
fn get_query_data(&self) -> &[Arc<RecordBatch>] {
|
||||
&self.snapshots
|
||||
}
|
||||
}
|
||||
|
||||
impl BufferState<Persisting> {
|
||||
/// Consume `self`, returning the data it holds as a set of [`RecordBatch`].
|
||||
pub(super) fn into_data(self) -> Vec<Arc<RecordBatch>> {
|
||||
self.state.snapshots
|
||||
}
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
//! A writfield1 buffer, with one or more snapshots.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow::record_batch::RecordBatch;
|
||||
|
||||
use crate::data::partition::buffer::{state_machine::persisting::Persisting, traits::Queryable};
|
||||
|
||||
use super::{buffering_with_snapshot::BufferingWithSnapshot, BufferState};
|
||||
|
||||
/// An immutable, queryable FSM state containing at least one buffer snapshot.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct Snapshot {
|
||||
/// Snapshots generated from previous buffer contents.
|
||||
///
|
||||
/// INVARIANT: this array is always non-empty.
|
||||
snapshots: Vec<Arc<RecordBatch>>,
|
||||
}
|
||||
|
||||
impl Snapshot {
|
||||
pub(super) fn new(snapshots: Vec<Arc<RecordBatch>>) -> Self {
|
||||
assert!(!snapshots.is_empty());
|
||||
Self { snapshots }
|
||||
}
|
||||
}
|
||||
|
||||
impl Queryable for Snapshot {
|
||||
fn get_query_data(&self) -> &[Arc<RecordBatch>] {
|
||||
&self.snapshots
|
||||
}
|
||||
}
|
||||
|
||||
impl BufferState<Snapshot> {
|
||||
pub(crate) fn into_buffering(self) -> BufferState<BufferingWithSnapshot> {
|
||||
assert!(!self.state.snapshots.is_empty());
|
||||
BufferState {
|
||||
state: BufferingWithSnapshot::new(self.state.snapshots),
|
||||
sequence_range: self.sequence_range,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn into_persisting(self) -> BufferState<Persisting> {
|
||||
assert!(!self.state.snapshots.is_empty());
|
||||
BufferState {
|
||||
state: Persisting::new(self.state.snapshots),
|
||||
sequence_range: self.sequence_range,
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,17 @@
|
|||
//! Private traits for state machine states.
|
||||
|
||||
use std::{fmt::Debug, sync::Arc};
|
||||
|
||||
use arrow::record_batch::RecordBatch;
|
||||
use mutable_batch::MutableBatch;
|
||||
|
||||
/// A state that can accept writes.
|
||||
pub(crate) trait Writeable: Debug {
|
||||
fn write(&mut self, batch: MutableBatch) -> Result<(), mutable_batch::Error>;
|
||||
}
|
||||
|
||||
/// A state that can return the contents of the buffer as one or more
|
||||
/// [`RecordBatch`] instances.
|
||||
pub(crate) trait Queryable: Debug {
|
||||
fn get_query_data(&self) -> &[Arc<RecordBatch>];
|
||||
}
|
|
@ -0,0 +1,141 @@
|
|||
use data_types::SequenceNumber;
|
||||
|
||||
/// A range of sequence numbers, both inclusive [min, max].
|
||||
#[derive(Debug, Default, PartialEq, Eq, Clone)]
|
||||
pub(crate) struct SequenceNumberRange {
|
||||
range: Option<(SequenceNumber, SequenceNumber)>,
|
||||
}
|
||||
|
||||
impl SequenceNumberRange {
|
||||
pub(crate) fn observe(&mut self, n: SequenceNumber) {
|
||||
self.range = Some(match self.range {
|
||||
Some((min, max)) => {
|
||||
assert!(n > max, "monotonicity violation");
|
||||
(min, n)
|
||||
}
|
||||
None => (n, n),
|
||||
});
|
||||
}
|
||||
|
||||
/// Returns the inclusive lower bound on [`SequenceNumber`] values observed.
|
||||
pub(crate) fn inclusive_min(&self) -> Option<SequenceNumber> {
|
||||
self.range.map(|v| v.0)
|
||||
}
|
||||
|
||||
/// Returns the inclusive upper bound on [`SequenceNumber`] values observed.
|
||||
pub(crate) fn inclusive_max(&self) -> Option<SequenceNumber> {
|
||||
self.range.map(|v| v.1)
|
||||
}
|
||||
|
||||
/// Merge two [`SequenceNumberRange`] instances, returning a new, merged
|
||||
/// instance.
|
||||
///
|
||||
/// The merge result contains the minimum of [`Self::inclusive_min()`] from
|
||||
/// each instance, and the maximum of [`Self::inclusive_max()`].
|
||||
///
|
||||
/// If both `self` and `other` contain no [`SequenceNumber`] observations,
|
||||
/// the returned instance contains no observations.
|
||||
pub(crate) fn merge(&self, other: &Self) -> Self {
|
||||
let merged_range = self
|
||||
.range
|
||||
.into_iter()
|
||||
.chain(other.range.clone())
|
||||
.reduce(|a, b| (a.0.min(b.0), a.1.max(b.1)));
|
||||
|
||||
Self {
|
||||
range: merged_range,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_ranges() {
|
||||
let mut r = SequenceNumberRange::default();
|
||||
|
||||
r.observe(SequenceNumber::new(0));
|
||||
r.observe(SequenceNumber::new(2));
|
||||
r.observe(SequenceNumber::new(3));
|
||||
|
||||
assert_eq!(r.inclusive_min(), Some(SequenceNumber::new(0)));
|
||||
assert_eq!(r.inclusive_max(), Some(SequenceNumber::new(3)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic = "monotonicity violation"]
|
||||
fn test_monotonicity() {
|
||||
let mut r = SequenceNumberRange::default();
|
||||
|
||||
r.observe(SequenceNumber::new(1));
|
||||
r.observe(SequenceNumber::new(3));
|
||||
r.observe(SequenceNumber::new(2));
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic = "monotonicity violation"]
|
||||
fn test_exactly_once() {
|
||||
let mut r = SequenceNumberRange::default();
|
||||
|
||||
r.observe(SequenceNumber::new(1));
|
||||
r.observe(SequenceNumber::new(1));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_merge() {
|
||||
let mut a = SequenceNumberRange::default();
|
||||
let mut b = SequenceNumberRange::default();
|
||||
|
||||
a.observe(SequenceNumber::new(4));
|
||||
b.observe(SequenceNumber::new(2));
|
||||
|
||||
let a_b = a.merge(&b);
|
||||
assert_eq!(a_b.inclusive_min(), Some(SequenceNumber::new(2)));
|
||||
assert_eq!(a_b.inclusive_max(), Some(SequenceNumber::new(4)));
|
||||
|
||||
let b_a = b.merge(&a);
|
||||
assert_eq!(b_a.inclusive_min(), Some(SequenceNumber::new(2)));
|
||||
assert_eq!(b_a.inclusive_max(), Some(SequenceNumber::new(4)));
|
||||
|
||||
assert_eq!(a_b, b_a);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_merge_half_empty() {
|
||||
let mut a = SequenceNumberRange::default();
|
||||
let b = SequenceNumberRange::default();
|
||||
|
||||
a.observe(SequenceNumber::new(4));
|
||||
// B observes nothing
|
||||
|
||||
let a_b = a.merge(&b);
|
||||
assert_eq!(a_b.inclusive_min(), Some(SequenceNumber::new(4)));
|
||||
assert_eq!(a_b.inclusive_max(), Some(SequenceNumber::new(4)));
|
||||
|
||||
let b_a = b.merge(&a);
|
||||
assert_eq!(b_a.inclusive_min(), Some(SequenceNumber::new(4)));
|
||||
assert_eq!(b_a.inclusive_max(), Some(SequenceNumber::new(4)));
|
||||
|
||||
assert_eq!(a_b, b_a);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_merge_both_empty() {
|
||||
let a = SequenceNumberRange::default();
|
||||
let b = SequenceNumberRange::default();
|
||||
|
||||
// Neither observe anything
|
||||
|
||||
let a_b = a.merge(&b);
|
||||
assert_eq!(a_b.inclusive_min(), None);
|
||||
assert_eq!(a_b.inclusive_max(), None);
|
||||
|
||||
let b_a = b.merge(&a);
|
||||
assert_eq!(b_a.inclusive_min(), None);
|
||||
assert_eq!(b_a.inclusive_max(), None);
|
||||
|
||||
assert_eq!(a_b, b_a);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue