diff --git a/ingester/src/data/partition/buffer/mutable_buffer.rs b/ingester/src/data/partition/buffer/mutable_buffer.rs index cd607a37bd..6b8ed74ae3 100644 --- a/ingester/src/data/partition/buffer/mutable_buffer.rs +++ b/ingester/src/data/partition/buffer/mutable_buffer.rs @@ -50,4 +50,8 @@ impl Buffer { pub(super) fn is_empty(&self) -> bool { self.buffer.is_none() } + + pub(super) fn buffer(&self) -> Option<&MutableBatch> { + self.buffer.as_ref() + } } diff --git a/ingester/src/data/partition/buffer/state_machine.rs b/ingester/src/data/partition/buffer/state_machine.rs index 24c73a1538..278d0384f9 100644 --- a/ingester/src/data/partition/buffer/state_machine.rs +++ b/ingester/src/data/partition/buffer/state_machine.rs @@ -6,14 +6,11 @@ use data_types::SequenceNumber; use mutable_batch::MutableBatch; mod buffering; -mod buffering_with_snapshot; mod persisting; mod snapshot; pub(in crate::data::partition::buffer) use buffering::*; -pub(in crate::data::partition::buffer) use buffering_with_snapshot::*; pub(crate) use persisting::*; -pub(in crate::data::partition::buffer) use snapshot::*; use crate::data::SequenceNumberRange; @@ -54,19 +51,14 @@ impl Transition { /// lifecycle within a partition buffer: /// /// ```text -/// ┌──────────────┐ -/// │ Buffering │ -/// └───────┬──────┘ -/// │ -/// ▼ -/// ┌ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ -/// ┌─────▶ Snapshot ├─────▶ Persisting │ -/// │ └ ─ ─ ─ ┬ ─ ─ ─ └ ─ ─ ─ ─ ─ ─ ─ -/// │ │ -/// │ ▼ -/// │ ┌───────────────────────┐ -/// └──│ BufferingWithSnapshot │ -/// └───────────────────────┘ +/// ┌──────────────┐ +/// │ Buffering │ +/// └───────┬──────┘ +/// │ +/// ▼ +/// ┌ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ +/// Snapshot ├─────▶ Persisting │ +/// └ ─ ─ ─ ─ ─ ─ ─ └ ─ ─ ─ ─ ─ ─ ─ /// ``` /// /// Boxes with dashed lines indicate immutable, queryable states that contain @@ -131,13 +123,14 @@ where /// Returns the current buffer data. /// /// This is always a cheap method call. - fn get_query_data(&self) -> &[Arc] { + fn get_query_data(&self) -> Vec> { self.state.get_query_data() } } #[cfg(test)] mod tests { + use snapshot::*; use std::ops::Deref; use arrow_util::assert_batches_eq; @@ -158,17 +151,14 @@ mod tests { // Write some data to a buffer. buffer .write( - lp_to_mutable_batch(r#"bananas,tag=platanos great=true 668563242000000042"#).1, + lp_to_mutable_batch( + r#"bananas,tag=platanos great=true,how_much=42 668563242000000042"#, + ) + .1, SequenceNumber::new(0), ) .expect("write to empty buffer should succeed"); - // Snapshot the buffer into an immutable, queryable data format. - let buffer: BufferState = match buffer.snapshot() { - Transition::Ok(v) => v, - Transition::Unchanged(_) => panic!("did not transition to snapshot state"), - }; - // Extract the queryable data from the buffer and validate it. // // Keep the data to validate they are ref-counted copies after further @@ -177,23 +167,19 @@ mod tests { let w1_data = buffer.get_query_data().to_owned(); let expected = vec![ - "+-------+----------+--------------------------------+", - "| great | tag | time |", - "+-------+----------+--------------------------------+", - "| true | platanos | 1991-03-10T00:00:42.000000042Z |", - "+-------+----------+--------------------------------+", + "+-------+----------+----------+--------------------------------+", + "| great | how_much | tag | time |", + "+-------+----------+----------+--------------------------------+", + "| true | 42 | platanos | 1991-03-10T00:00:42.000000042Z |", + "+-------+----------+----------+--------------------------------+", ]; assert_batches_eq!(&expected, &[w1_data[0].deref().clone()]); - // Transition the buffer into a mutable state in which it can accept - // writes. - let mut buffer: BufferState = buffer.into_buffering(); - // Apply another write. buffer .write( lp_to_mutable_batch( - r#"bananas,tag=platanos great=true,how_much=1000 668563242000000042"#, + r#"bananas,tag=platanos great=true,how_much=1000 668563242000000043"#, ) .1, SequenceNumber::new(1), @@ -201,24 +187,23 @@ mod tests { .expect("write to empty buffer should succeed"); // Snapshot the buffer into an immutable, queryable data format. - let buffer: BufferState = buffer.snapshot(); + let buffer: BufferState = match buffer.snapshot() { + Transition::Ok(v) => v, + Transition::Unchanged(_) => panic!("did not transition to snapshot state"), + }; - // Verify the second write was buffered. + // Verify the writes are still queryable. let w2_data = buffer.get_query_data().to_owned(); let expected = vec![ "+-------+----------+----------+--------------------------------+", "| great | how_much | tag | time |", "+-------+----------+----------+--------------------------------+", - "| true | 1000 | platanos | 1991-03-10T00:00:42.000000042Z |", + "| true | 42 | platanos | 1991-03-10T00:00:42.000000042Z |", + "| true | 1000 | platanos | 1991-03-10T00:00:42.000000043Z |", "+-------+----------+----------+--------------------------------+", ]; - assert_batches_eq!(&expected, &[w2_data[1].deref().clone()]); - - // Verify the first write has not changed, and has not been - // re-ordered. - assert_eq!(w1_data, w2_data[..1]); - // Furthermore, ensure no data was actually copied - assert!(Arc::ptr_eq(&w1_data[0], &w2_data[0])); + assert_eq!(w2_data.len(), 1); + assert_batches_eq!(&expected, &[w2_data[0].deref().clone()]); // Ensure the same data is returned for a second read. { diff --git a/ingester/src/data/partition/buffer/state_machine/buffering.rs b/ingester/src/data/partition/buffer/state_machine/buffering.rs index 85ab01bbcd..5faac058fa 100644 --- a/ingester/src/data/partition/buffer/state_machine/buffering.rs +++ b/ingester/src/data/partition/buffer/state_machine/buffering.rs @@ -1,8 +1,15 @@ //! A write buffer. -use mutable_batch::MutableBatch; +use std::sync::Arc; -use crate::data::partition::buffer::{mutable_buffer::Buffer, traits::Writeable}; +use arrow::record_batch::RecordBatch; +use mutable_batch::MutableBatch; +use schema::selection::Selection; + +use crate::data::partition::buffer::{ + mutable_buffer::Buffer, + traits::{Queryable, Writeable}, +}; use super::{snapshot::Snapshot, BufferState, Transition}; @@ -16,6 +23,29 @@ pub(crate) struct Buffering { buffer: Buffer, } +/// Implement on-demand querying of the buffered contents without storing the +/// generated snapshot. +/// +/// In the future this [`Queryable`] should NOT be implemented for +/// [`Buffering`], and instead snapshots should be incrementally generated and +/// compacted. See https://github.com/influxdata/influxdb_iox/issues/5805 for +/// context. +impl Queryable for Buffering { + fn get_query_data(&self) -> Vec> { + let data = self.buffer.buffer().map(|v| { + Arc::new( + v.to_arrow(Selection::All) + .expect("failed to snapshot buffer data"), + ) + }); + + match data { + Some(v) => vec![v], + None => vec![], + } + } +} + impl Writeable for Buffering { fn write(&mut self, batch: MutableBatch) -> Result<(), mutable_batch::Error> { self.buffer.buffer_write(batch) diff --git a/ingester/src/data/partition/buffer/state_machine/buffering_with_snapshot.rs b/ingester/src/data/partition/buffer/state_machine/buffering_with_snapshot.rs deleted file mode 100644 index 4b2af6d379..0000000000 --- a/ingester/src/data/partition/buffer/state_machine/buffering_with_snapshot.rs +++ /dev/null @@ -1,62 +0,0 @@ -//! A mutable buffer, containing one or more snapshots. - -use std::sync::Arc; - -use arrow::record_batch::RecordBatch; -use mutable_batch::MutableBatch; - -use crate::data::partition::buffer::{mutable_buffer::Buffer, traits::Writeable}; - -use super::{snapshot::Snapshot, BufferState}; - -/// A mutable state that buffers incoming writes while holding at least one -/// previously generated buffer snapshot. -#[derive(Debug)] -pub(crate) struct BufferingWithSnapshot { - /// The buffer for incoming writes. - /// - /// NOTE: this buffer MAY be empty. - buffer: Buffer, - - /// Snapshots generated from previous buffer contents. - /// - /// INVARIANT: this array is always non-empty. - snapshots: Vec>, -} - -impl BufferingWithSnapshot { - pub(super) fn new(snapshots: Vec>) -> Self { - Self { - buffer: Buffer::default(), - snapshots, - } - } -} - -impl Writeable for BufferingWithSnapshot { - fn write(&mut self, batch: MutableBatch) -> Result<(), mutable_batch::Error> { - // TODO(5806): assert schema compatibility with existing snapshots - self.buffer.buffer_write(batch) - } -} - -impl BufferState { - /// Snapshot the current buffer contents and transition to an immutable, - /// queryable state containing only snapshots. - /// - /// This call MAY be a NOP if the buffer has accrued no writes. - pub(crate) fn snapshot(self) -> BufferState { - assert!(!self.state.snapshots.is_empty()); - - BufferState { - state: Snapshot::new( - self.state - .snapshots - .into_iter() - .chain(self.state.buffer.snapshot()) - .collect(), - ), - sequence_range: self.sequence_range, - } - } -} diff --git a/ingester/src/data/partition/buffer/state_machine/persisting.rs b/ingester/src/data/partition/buffer/state_machine/persisting.rs index 239747ff6b..925631e793 100644 --- a/ingester/src/data/partition/buffer/state_machine/persisting.rs +++ b/ingester/src/data/partition/buffer/state_machine/persisting.rs @@ -1,4 +1,4 @@ -//! A buffer in the "persisting" state, containing one or more snapshots. +//! A writfield1 buffer, with one or more snapshots. use std::sync::Arc; @@ -24,8 +24,8 @@ impl Persisting { } impl Queryable for Persisting { - fn get_query_data(&self) -> &[Arc] { - &self.snapshots + fn get_query_data(&self) -> Vec> { + self.snapshots.clone() } } diff --git a/ingester/src/data/partition/buffer/state_machine/snapshot.rs b/ingester/src/data/partition/buffer/state_machine/snapshot.rs index dd82501c29..85093254b9 100644 --- a/ingester/src/data/partition/buffer/state_machine/snapshot.rs +++ b/ingester/src/data/partition/buffer/state_machine/snapshot.rs @@ -1,5 +1,4 @@ -//! An immutable buffer, containing one or more snapshots in an efficient query -//! format. +//! A writfield1 buffer, with one or more snapshots. use std::sync::Arc; @@ -7,7 +6,7 @@ use arrow::record_batch::RecordBatch; use crate::data::partition::buffer::{state_machine::persisting::Persisting, traits::Queryable}; -use super::{buffering_with_snapshot::BufferingWithSnapshot, BufferState}; +use super::BufferState; /// An immutable, queryable FSM state containing at least one buffer snapshot. #[derive(Debug)] @@ -26,20 +25,12 @@ impl Snapshot { } impl Queryable for Snapshot { - fn get_query_data(&self) -> &[Arc] { - &self.snapshots + fn get_query_data(&self) -> Vec> { + self.snapshots.clone() } } impl BufferState { - pub(crate) fn into_buffering(self) -> BufferState { - assert!(!self.state.snapshots.is_empty()); - BufferState { - state: BufferingWithSnapshot::new(self.state.snapshots), - sequence_range: self.sequence_range, - } - } - pub(crate) fn into_persisting(self) -> BufferState { assert!(!self.state.snapshots.is_empty()); BufferState { diff --git a/ingester/src/data/partition/buffer/traits.rs b/ingester/src/data/partition/buffer/traits.rs index bff6dfda8f..24241910b6 100644 --- a/ingester/src/data/partition/buffer/traits.rs +++ b/ingester/src/data/partition/buffer/traits.rs @@ -13,5 +13,5 @@ pub(crate) trait Writeable: Debug { /// A state that can return the contents of the buffer as one or more /// [`RecordBatch`] instances. pub(crate) trait Queryable: Debug { - fn get_query_data(&self) -> &[Arc]; + fn get_query_data(&self) -> Vec>; }