diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index b76efaae63..a0dbd14e59 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -1593,7 +1593,7 @@ impl TimestampRange { /// /// Note this differs subtlety (but critically) from a /// [`TimestampRange`] as the minimum and maximum values are included ([`TimestampRange`] has an exclusive end). -#[derive(Clone, Debug, Copy)] +#[derive(Clone, Debug, Copy, PartialEq)] pub struct TimestampMinMax { /// The minimum timestamp value pub min: i64, diff --git a/ingester/src/buffer_tree/partition.rs b/ingester/src/buffer_tree/partition.rs index 856204dd0c..ce3898ec57 100644 --- a/ingester/src/buffer_tree/partition.rs +++ b/ingester/src/buffer_tree/partition.rs @@ -4,7 +4,7 @@ use std::{collections::VecDeque, sync::Arc}; use data_types::{ sequence_number_set::SequenceNumberSet, NamespaceId, PartitionHashId, PartitionId, - PartitionKey, SequenceNumber, TableId, TransitionPartitionId, + PartitionKey, SequenceNumber, TableId, TimestampMinMax, TransitionPartitionId, }; use mutable_batch::MutableBatch; use observability_deps::tracing::*; @@ -156,6 +156,50 @@ impl PartitionData { self.buffer.persist_cost_estimate() } + /// Returns the number of rows currently buffered in this [`PartitionData`]. + /// + /// The returned value will always match the row count of the data returned + /// by a subsequent call to [`PartitionData::get_query_data()`], without any + /// column projection applied. + /// + /// This value is inclusive of "hot" buffered data, and all currently + /// persisting data. + /// + /// This is an `O(n)` operation where `n` is the number of currently + /// persisting batches, plus 1 for the "hot" buffer. Reading the row count + /// of each batch is `O(1)`. This method is expected to be fast. + pub(crate) fn rows(&self) -> usize { + self.persisting.iter().map(|(_, v)| v.rows()).sum::() + self.buffer.rows() + } + + /// Return the timestamp min/max values for the data contained within this + /// [`PartitionData`]. + /// + /// The returned value will always match the timestamp summary values of the + /// data returned by a subsequent call to + /// [`PartitionData::get_query_data()`] iff the projection provided to the + /// call includes the timestamp column (returns pre-projection value). + /// + /// This value is inclusive of "hot" buffered data, and all currently + /// persisting data. + /// + /// This is an `O(n)` operation where `n` is the number of currently + /// persisting batches, plus 1 for the "hot" buffer, Reading the timestamp + /// statistics for each batch is `O(1)`. This method is expected to be fast. + pub(crate) fn timestamp_stats(&self) -> Option { + self.persisting + .iter() + .map(|(_, v)| { + v.timestamp_stats() + .expect("persisting batches must be non-empty") + }) + .chain(self.buffer.timestamp_stats()) + .reduce(|acc, v| TimestampMinMax { + min: acc.min.min(v.min), + max: acc.max.max(v.max), + }) + } + /// Return all data for this partition, ordered by the calls to /// [`PartitionData::buffer_write()`]. pub(crate) fn get_query_data(&mut self, projection: &OwnedProjection) -> Option { @@ -185,6 +229,7 @@ impl PartitionData { ); if data.is_empty() { + debug_assert_eq!(self.rows(), 0); return None; } @@ -194,11 +239,32 @@ impl PartitionData { // is upheld by the FSM, which ensures only non-empty snapshots / // RecordBatch are generated. Because `data` contains at least one // RecordBatch, this invariant holds. - Some(QueryAdaptor::new( - self.partition_id, - self.transition_partition_id(), - data, - )) + let q = QueryAdaptor::new(self.partition_id, self.transition_partition_id(), data); + + // Invariant: the number of rows returned in a query MUST always match + // the row count reported by the rows() method. + // + // The row count is never affected by projection. + debug_assert_eq!(q.num_rows(), self.rows() as u64); + + // Invariant: the timestamp min/max MUST match the values reported by + // timestamp_stats(), iff the projection contains the "time" column. + #[cfg(debug_assertions)] + { + if projection + .columns() + .map(|v| v.iter().any(|v| v == schema::TIME_COLUMN_NAME)) + .unwrap_or(true) + { + assert_eq!(q.ts_min_max(), self.timestamp_stats()); + } else { + // Otherwise the timestamp summary in the query response MUST be + // empty. + assert_eq!(q.ts_min_max(), None); + } + } + + Some(q) } /// Snapshot and mark all buffered data as persisting. diff --git a/ingester/src/buffer_tree/partition/buffer.rs b/ingester/src/buffer_tree/partition/buffer.rs index 91cc13cd44..ad72df4fb3 100644 --- a/ingester/src/buffer_tree/partition/buffer.rs +++ b/ingester/src/buffer_tree/partition/buffer.rs @@ -1,5 +1,5 @@ use arrow::record_batch::RecordBatch; -use data_types::SequenceNumber; +use data_types::{SequenceNumber, TimestampMinMax}; use mutable_batch::MutableBatch; mod always_some; @@ -74,6 +74,20 @@ impl DataBuffer { }) } + /// Return the row count for this buffer. + pub(crate) fn rows(&self) -> usize { + match self.0.get() { + FsmState::Buffering(v) => v.rows(), + } + } + + /// Return the timestamp min/max values, if this buffer contains data. + pub(crate) fn timestamp_stats(&self) -> Option { + match self.0.get() { + FsmState::Buffering(v) => v.timestamp_stats(), + } + } + // Deconstruct the [`DataBuffer`] into the underlying FSM in a // [`Persisting`] state, if the buffer contains any data. pub(crate) fn into_persisting(self) -> Option> { diff --git a/ingester/src/buffer_tree/partition/buffer/mutable_buffer.rs b/ingester/src/buffer_tree/partition/buffer/mutable_buffer.rs index 1d2ef1d581..1fe45b8e24 100644 --- a/ingester/src/buffer_tree/partition/buffer/mutable_buffer.rs +++ b/ingester/src/buffer_tree/partition/buffer/mutable_buffer.rs @@ -49,6 +49,8 @@ impl Buffer { self.buffer.is_none() } + /// Returns the underlying buffer if this [`Buffer`] contains data, + /// otherwise returns [`None`]. pub(super) fn buffer(&self) -> Option<&MutableBatch> { self.buffer.as_ref() } diff --git a/ingester/src/buffer_tree/partition/buffer/state_machine.rs b/ingester/src/buffer_tree/partition/buffer/state_machine.rs index f271f8291b..451edc2eca 100644 --- a/ingester/src/buffer_tree/partition/buffer/state_machine.rs +++ b/ingester/src/buffer_tree/partition/buffer/state_machine.rs @@ -1,6 +1,6 @@ #![allow(dead_code)] use arrow::record_batch::RecordBatch; -use data_types::{sequence_number_set::SequenceNumberSet, SequenceNumber}; +use data_types::{sequence_number_set::SequenceNumberSet, SequenceNumber, TimestampMinMax}; use mutable_batch::MutableBatch; mod buffering; @@ -125,6 +125,14 @@ where fn get_query_data(&self, projection: &OwnedProjection) -> Vec { self.state.get_query_data(projection) } + + fn rows(&self) -> usize { + self.state.rows() + } + + fn timestamp_stats(&self) -> Option { + self.state.timestamp_stats() + } } #[cfg(test)] @@ -132,6 +140,7 @@ mod tests { use std::sync::Arc; use arrow_util::assert_batches_eq; + use assert_matches::assert_matches; use mutable_batch_lp::test_helpers::lp_to_mutable_batch; use schema::Projection; use snapshot::*; @@ -403,4 +412,72 @@ mod tests { assert!(!set.contains(SequenceNumber::new(12))); assert_eq!(set.len(), 1); } + + /// Assert the summary statistics are correctly computed across various FSM + /// states. + #[test] + fn test_summary_statistics() { + let mut buffer: BufferState = BufferState::new(); + + // Write some data to a buffer. + buffer + .write( + lp_to_mutable_batch( + r#"bananas,tag=platanos great=true,how_much=42 668563242000000042"#, + ) + .1, + SequenceNumber::new(0), + ) + .expect("write to empty buffer should succeed"); + + assert_eq!(buffer.rows(), 1); + assert_eq!( + buffer.timestamp_stats(), + Some(TimestampMinMax { + min: 668563242000000042, + max: 668563242000000042 + }) + ); + + // Write another row to change the timestamp (it goes backwards!) + buffer + .write( + lp_to_mutable_batch(r#"bananas,tag=platanos great=true,how_much=42 42"#).1, + SequenceNumber::new(0), + ) + .unwrap(); + + assert_eq!(buffer.rows(), 2); + assert_eq!( + buffer.timestamp_stats(), + Some(TimestampMinMax { + min: 42, + max: 668563242000000042 + }) + ); + + // Transition to a snapshot. + let buffer = assert_matches!(buffer.snapshot(), Transition::Ok(v) => v); + + assert_eq!(buffer.rows(), 2); + assert_eq!( + buffer.timestamp_stats(), + Some(TimestampMinMax { + min: 42, + max: 668563242000000042 + }) + ); + + // Transition to persisting + let buffer = buffer.into_persisting(); + + assert_eq!(buffer.rows(), 2); + assert_eq!( + buffer.timestamp_stats(), + Some(TimestampMinMax { + min: 42, + max: 668563242000000042 + }) + ); + } } diff --git a/ingester/src/buffer_tree/partition/buffer/state_machine/buffering.rs b/ingester/src/buffer_tree/partition/buffer/state_machine/buffering.rs index c0e203b71e..af3daa2468 100644 --- a/ingester/src/buffer_tree/partition/buffer/state_machine/buffering.rs +++ b/ingester/src/buffer_tree/partition/buffer/state_machine/buffering.rs @@ -1,7 +1,9 @@ //! A write buffer. use arrow::record_batch::RecordBatch; -use mutable_batch::MutableBatch; +use data_types::{StatValues, TimestampMinMax}; +use mutable_batch::{column::ColumnData, MutableBatch}; +use schema::TIME_COLUMN_NAME; use super::{snapshot::Snapshot, BufferState, Transition}; use crate::{ @@ -41,6 +43,22 @@ impl Queryable for Buffering { .map(|v| vec![projection.project_mutable_batches(v)]) .unwrap_or_default() } + + fn rows(&self) -> usize { + self.buffer.buffer().map(|v| v.rows()).unwrap_or_default() + } + + fn timestamp_stats(&self) -> Option { + self.buffer + .buffer() + .map(extract_timestamp_summary) + // Safety: unwrapping the timestamp bounds is safe, as any non-empty + // buffer must contain timestamps. + .map(|v| TimestampMinMax { + min: v.min.unwrap(), + max: v.max.unwrap(), + }) + } } impl Writeable for Buffering { @@ -75,6 +93,18 @@ impl BufferState { } } +/// Perform an O(1) extraction of the timestamp column statistics. +fn extract_timestamp_summary(batch: &MutableBatch) -> &StatValues { + let col = batch + .column(TIME_COLUMN_NAME) + .expect("timestamps must exist for non-empty buffer"); + + match col.data() { + ColumnData::I64(_data, stats) => stats, + _ => unreachable!(), + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/ingester/src/buffer_tree/partition/buffer/state_machine/persisting.rs b/ingester/src/buffer_tree/partition/buffer/state_machine/persisting.rs index 174f24c74d..5e3c35935b 100644 --- a/ingester/src/buffer_tree/partition/buffer/state_machine/persisting.rs +++ b/ingester/src/buffer_tree/partition/buffer/state_machine/persisting.rs @@ -1,7 +1,8 @@ //! A writfield1 buffer, with one or more snapshots. use arrow::record_batch::RecordBatch; -use data_types::sequence_number_set::SequenceNumberSet; +use data_types::{sequence_number_set::SequenceNumberSet, TimestampMinMax}; +use iox_query::util::compute_timenanosecond_min_max; use super::BufferState; use crate::{ @@ -15,11 +16,34 @@ pub(crate) struct Persisting { /// /// INVARIANT: this array is always non-empty. snapshots: Vec, + + /// Statistics describing the data in snapshots. + row_count: usize, + timestamp_stats: TimestampMinMax, } impl Persisting { - pub(super) fn new(snapshots: Vec) -> Self { - Self { snapshots } + pub(super) fn new( + snapshots: Vec, + row_count: usize, + timestamp_stats: TimestampMinMax, + ) -> Self { + // Invariant: the summary statistics provided must match the actual + // data. + debug_assert_eq!( + row_count, + snapshots.iter().map(|v| v.num_rows()).sum::() + ); + debug_assert_eq!( + timestamp_stats, + compute_timenanosecond_min_max(snapshots.iter()).unwrap() + ); + + Self { + snapshots, + row_count, + timestamp_stats, + } } } @@ -27,6 +51,14 @@ impl Queryable for Persisting { fn get_query_data(&self, projection: &OwnedProjection) -> Vec { projection.project_record_batch(&self.snapshots) } + + fn rows(&self) -> usize { + self.row_count + } + + fn timestamp_stats(&self) -> Option { + Some(self.timestamp_stats) + } } impl BufferState { diff --git a/ingester/src/buffer_tree/partition/buffer/state_machine/snapshot.rs b/ingester/src/buffer_tree/partition/buffer/state_machine/snapshot.rs index 05834357f4..20858afbe7 100644 --- a/ingester/src/buffer_tree/partition/buffer/state_machine/snapshot.rs +++ b/ingester/src/buffer_tree/partition/buffer/state_machine/snapshot.rs @@ -1,6 +1,8 @@ //! A writfield1 buffer, with one or more snapshots. use arrow::record_batch::RecordBatch; +use data_types::TimestampMinMax; +use iox_query::util::compute_timenanosecond_min_max; use super::BufferState; use crate::{ @@ -15,12 +17,26 @@ pub(crate) struct Snapshot { /// /// INVARIANT: this array is always non-empty. snapshots: Vec, + + /// Statistics describing the data in snapshots. + row_count: usize, + timestamp_stats: TimestampMinMax, } impl Snapshot { pub(super) fn new(snapshots: Vec) -> Self { assert!(!snapshots.is_empty()); - Self { snapshots } + + // Compute some summary statistics for query pruning/reuse later. + let row_count = snapshots.iter().map(|v| v.num_rows()).sum(); + let timestamp_stats = compute_timenanosecond_min_max(snapshots.iter()) + .expect("non-empty batch must contain timestamps"); + + Self { + snapshots, + row_count, + timestamp_stats, + } } } @@ -28,13 +44,25 @@ impl Queryable for Snapshot { fn get_query_data(&self, projection: &OwnedProjection) -> Vec { projection.project_record_batch(&self.snapshots) } + + fn rows(&self) -> usize { + self.row_count + } + + fn timestamp_stats(&self) -> Option { + Some(self.timestamp_stats) + } } impl BufferState { pub(crate) fn into_persisting(self) -> BufferState { assert!(!self.state.snapshots.is_empty()); BufferState { - state: Persisting::new(self.state.snapshots), + state: Persisting::new( + self.state.snapshots, + self.state.row_count, + self.state.timestamp_stats, + ), sequence_numbers: self.sequence_numbers, } } diff --git a/ingester/src/buffer_tree/partition/buffer/traits.rs b/ingester/src/buffer_tree/partition/buffer/traits.rs index 2f4dd4feb2..62390ddb4b 100644 --- a/ingester/src/buffer_tree/partition/buffer/traits.rs +++ b/ingester/src/buffer_tree/partition/buffer/traits.rs @@ -3,6 +3,7 @@ use std::fmt::Debug; use arrow::record_batch::RecordBatch; +use data_types::TimestampMinMax; use mutable_batch::MutableBatch; use crate::query::projection::OwnedProjection; @@ -15,5 +16,10 @@ pub(crate) trait Writeable: Debug { /// A state that can return the contents of the buffer as one or more /// [`RecordBatch`] instances. pub(crate) trait Queryable: Debug { + fn rows(&self) -> usize; + + fn timestamp_stats(&self) -> Option; + + /// Return the set of [`RecordBatch`] containing ONLY the projected columns. fn get_query_data(&self, projection: &OwnedProjection) -> Vec; } diff --git a/ingester/src/query/projection.rs b/ingester/src/query/projection.rs index 59e4f58201..ae1bab2cba 100644 --- a/ingester/src/query/projection.rs +++ b/ingester/src/query/projection.rs @@ -138,4 +138,12 @@ impl OwnedProjection { } } } + + /// Return the column names in this projection, if specified. + pub(crate) fn columns(&self) -> Option<&[String]> { + match &self.0 { + Projection::All => None, + Projection::Project(v) => Some(v.as_ref()), + } + } }