refactor: disable incremental snapshot generation

This commit removes the on-demand, incremental snapshot generation
driven by queries.

This functionality is "on hold" due to concerns documented in:

    https://github.com/influxdata/influxdb_iox/issues/5805

Incremental snapshots will be introduced alongside incremental
compactions of those same snapshots.
pull/24376/head
Dom Dwyer 2022-10-21 17:39:22 +02:00
parent 2173f0a400
commit 7b3fa43209
7 changed files with 73 additions and 125 deletions

View File

@ -50,4 +50,8 @@ impl Buffer {
pub(super) fn is_empty(&self) -> bool {
self.buffer.is_none()
}
pub(super) fn buffer(&self) -> Option<&MutableBatch> {
self.buffer.as_ref()
}
}

View File

@ -6,14 +6,11 @@ use data_types::SequenceNumber;
use mutable_batch::MutableBatch;
mod buffering;
mod buffering_with_snapshot;
mod persisting;
mod snapshot;
pub(in crate::data::partition::buffer) use buffering::*;
pub(in crate::data::partition::buffer) use buffering_with_snapshot::*;
pub(crate) use persisting::*;
pub(in crate::data::partition::buffer) use snapshot::*;
use crate::data::SequenceNumberRange;
@ -54,19 +51,14 @@ impl<A, B> Transition<A, B> {
/// lifecycle within a partition buffer:
///
/// ```text
/// ┌──────────────┐
/// │ Buffering │
/// └───────┬──────┘
/// │
/// ▼
/// ┌ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─
/// ┌─────▶ Snapshot ├─────▶ Persisting │
/// │ └ ─ ─ ─ ┬ ─ ─ ─ └ ─ ─ ─ ─ ─ ─ ─
/// │ │
/// │ ▼
/// │ ┌───────────────────────┐
/// └──│ BufferingWithSnapshot │
/// └───────────────────────┘
/// ┌──────────────┐
/// │ Buffering │
/// └───────┬──────┘
/// │
/// ▼
/// ┌ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─
/// Snapshot ├─────▶ Persisting │
/// └ ─ ─ ─ ─ ─ ─ ─ └ ─ ─ ─ ─ ─ ─ ─
/// ```
///
/// Boxes with dashed lines indicate immutable, queryable states that contain
@ -131,13 +123,14 @@ where
/// Returns the current buffer data.
///
/// This is always a cheap method call.
fn get_query_data(&self) -> &[Arc<RecordBatch>] {
fn get_query_data(&self) -> Vec<Arc<RecordBatch>> {
self.state.get_query_data()
}
}
#[cfg(test)]
mod tests {
use snapshot::*;
use std::ops::Deref;
use arrow_util::assert_batches_eq;
@ -158,17 +151,14 @@ mod tests {
// Write some data to a buffer.
buffer
.write(
lp_to_mutable_batch(r#"bananas,tag=platanos great=true 668563242000000042"#).1,
lp_to_mutable_batch(
r#"bananas,tag=platanos great=true,how_much=42 668563242000000042"#,
)
.1,
SequenceNumber::new(0),
)
.expect("write to empty buffer should succeed");
// Snapshot the buffer into an immutable, queryable data format.
let buffer: BufferState<Snapshot> = match buffer.snapshot() {
Transition::Ok(v) => v,
Transition::Unchanged(_) => panic!("did not transition to snapshot state"),
};
// Extract the queryable data from the buffer and validate it.
//
// Keep the data to validate they are ref-counted copies after further
@ -177,23 +167,19 @@ mod tests {
let w1_data = buffer.get_query_data().to_owned();
let expected = vec![
"+-------+----------+--------------------------------+",
"| great | tag | time |",
"+-------+----------+--------------------------------+",
"| true | platanos | 1991-03-10T00:00:42.000000042Z |",
"+-------+----------+--------------------------------+",
"+-------+----------+----------+--------------------------------+",
"| great | how_much | tag | time |",
"+-------+----------+----------+--------------------------------+",
"| true | 42 | platanos | 1991-03-10T00:00:42.000000042Z |",
"+-------+----------+----------+--------------------------------+",
];
assert_batches_eq!(&expected, &[w1_data[0].deref().clone()]);
// Transition the buffer into a mutable state in which it can accept
// writes.
let mut buffer: BufferState<BufferingWithSnapshot> = buffer.into_buffering();
// Apply another write.
buffer
.write(
lp_to_mutable_batch(
r#"bananas,tag=platanos great=true,how_much=1000 668563242000000042"#,
r#"bananas,tag=platanos great=true,how_much=1000 668563242000000043"#,
)
.1,
SequenceNumber::new(1),
@ -201,24 +187,23 @@ mod tests {
.expect("write to empty buffer should succeed");
// Snapshot the buffer into an immutable, queryable data format.
let buffer: BufferState<Snapshot> = buffer.snapshot();
let buffer: BufferState<Snapshot> = match buffer.snapshot() {
Transition::Ok(v) => v,
Transition::Unchanged(_) => panic!("did not transition to snapshot state"),
};
// Verify the second write was buffered.
// Verify the writes are still queryable.
let w2_data = buffer.get_query_data().to_owned();
let expected = vec![
"+-------+----------+----------+--------------------------------+",
"| great | how_much | tag | time |",
"+-------+----------+----------+--------------------------------+",
"| true | 1000 | platanos | 1991-03-10T00:00:42.000000042Z |",
"| true | 42 | platanos | 1991-03-10T00:00:42.000000042Z |",
"| true | 1000 | platanos | 1991-03-10T00:00:42.000000043Z |",
"+-------+----------+----------+--------------------------------+",
];
assert_batches_eq!(&expected, &[w2_data[1].deref().clone()]);
// Verify the first write has not changed, and has not been
// re-ordered.
assert_eq!(w1_data, w2_data[..1]);
// Furthermore, ensure no data was actually copied
assert!(Arc::ptr_eq(&w1_data[0], &w2_data[0]));
assert_eq!(w2_data.len(), 1);
assert_batches_eq!(&expected, &[w2_data[0].deref().clone()]);
// Ensure the same data is returned for a second read.
{

View File

@ -1,8 +1,15 @@
//! A write buffer.
use mutable_batch::MutableBatch;
use std::sync::Arc;
use crate::data::partition::buffer::{mutable_buffer::Buffer, traits::Writeable};
use arrow::record_batch::RecordBatch;
use mutable_batch::MutableBatch;
use schema::selection::Selection;
use crate::data::partition::buffer::{
mutable_buffer::Buffer,
traits::{Queryable, Writeable},
};
use super::{snapshot::Snapshot, BufferState, Transition};
@ -16,6 +23,29 @@ pub(crate) struct Buffering {
buffer: Buffer,
}
/// Implement on-demand querying of the buffered contents without storing the
/// generated snapshot.
///
/// In the future this [`Queryable`] should NOT be implemented for
/// [`Buffering`], and instead snapshots should be incrementally generated and
/// compacted. See https://github.com/influxdata/influxdb_iox/issues/5805 for
/// context.
impl Queryable for Buffering {
fn get_query_data(&self) -> Vec<Arc<RecordBatch>> {
let data = self.buffer.buffer().map(|v| {
Arc::new(
v.to_arrow(Selection::All)
.expect("failed to snapshot buffer data"),
)
});
match data {
Some(v) => vec![v],
None => vec![],
}
}
}
impl Writeable for Buffering {
fn write(&mut self, batch: MutableBatch) -> Result<(), mutable_batch::Error> {
self.buffer.buffer_write(batch)

View File

@ -1,62 +0,0 @@
//! A mutable buffer, containing one or more snapshots.
use std::sync::Arc;
use arrow::record_batch::RecordBatch;
use mutable_batch::MutableBatch;
use crate::data::partition::buffer::{mutable_buffer::Buffer, traits::Writeable};
use super::{snapshot::Snapshot, BufferState};
/// A mutable state that buffers incoming writes while holding at least one
/// previously generated buffer snapshot.
#[derive(Debug)]
pub(crate) struct BufferingWithSnapshot {
/// The buffer for incoming writes.
///
/// NOTE: this buffer MAY be empty.
buffer: Buffer,
/// Snapshots generated from previous buffer contents.
///
/// INVARIANT: this array is always non-empty.
snapshots: Vec<Arc<RecordBatch>>,
}
impl BufferingWithSnapshot {
pub(super) fn new(snapshots: Vec<Arc<RecordBatch>>) -> Self {
Self {
buffer: Buffer::default(),
snapshots,
}
}
}
impl Writeable for BufferingWithSnapshot {
fn write(&mut self, batch: MutableBatch) -> Result<(), mutable_batch::Error> {
// TODO(5806): assert schema compatibility with existing snapshots
self.buffer.buffer_write(batch)
}
}
impl BufferState<BufferingWithSnapshot> {
/// Snapshot the current buffer contents and transition to an immutable,
/// queryable state containing only snapshots.
///
/// This call MAY be a NOP if the buffer has accrued no writes.
pub(crate) fn snapshot(self) -> BufferState<Snapshot> {
assert!(!self.state.snapshots.is_empty());
BufferState {
state: Snapshot::new(
self.state
.snapshots
.into_iter()
.chain(self.state.buffer.snapshot())
.collect(),
),
sequence_range: self.sequence_range,
}
}
}

View File

@ -1,4 +1,4 @@
//! A buffer in the "persisting" state, containing one or more snapshots.
//! A writfield1 buffer, with one or more snapshots.
use std::sync::Arc;
@ -24,8 +24,8 @@ impl Persisting {
}
impl Queryable for Persisting {
fn get_query_data(&self) -> &[Arc<RecordBatch>] {
&self.snapshots
fn get_query_data(&self) -> Vec<Arc<RecordBatch>> {
self.snapshots.clone()
}
}

View File

@ -1,5 +1,4 @@
//! An immutable buffer, containing one or more snapshots in an efficient query
//! format.
//! A writfield1 buffer, with one or more snapshots.
use std::sync::Arc;
@ -7,7 +6,7 @@ use arrow::record_batch::RecordBatch;
use crate::data::partition::buffer::{state_machine::persisting::Persisting, traits::Queryable};
use super::{buffering_with_snapshot::BufferingWithSnapshot, BufferState};
use super::BufferState;
/// An immutable, queryable FSM state containing at least one buffer snapshot.
#[derive(Debug)]
@ -26,20 +25,12 @@ impl Snapshot {
}
impl Queryable for Snapshot {
fn get_query_data(&self) -> &[Arc<RecordBatch>] {
&self.snapshots
fn get_query_data(&self) -> Vec<Arc<RecordBatch>> {
self.snapshots.clone()
}
}
impl BufferState<Snapshot> {
pub(crate) fn into_buffering(self) -> BufferState<BufferingWithSnapshot> {
assert!(!self.state.snapshots.is_empty());
BufferState {
state: BufferingWithSnapshot::new(self.state.snapshots),
sequence_range: self.sequence_range,
}
}
pub(crate) fn into_persisting(self) -> BufferState<Persisting> {
assert!(!self.state.snapshots.is_empty());
BufferState {

View File

@ -13,5 +13,5 @@ pub(crate) trait Writeable: Debug {
/// A state that can return the contents of the buffer as one or more
/// [`RecordBatch`] instances.
pub(crate) trait Queryable: Debug {
fn get_query_data(&self) -> &[Arc<RecordBatch>];
fn get_query_data(&self) -> Vec<Arc<RecordBatch>>;
}