Merge pull request #8331 from influxdata/dom/cached-summary-statistics

perf(ingester): cache summary statistics in partition FSM
pull/24376/head
Dom 2023-07-25 15:56:21 +01:00 committed by GitHub
commit 7df6028bf1
10 changed files with 278 additions and 15 deletions
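At a high level, the change computes row-count and timestamp min/max summaries once, when data enters a snapshot/persisting state, and then serves them from cached fields instead of re-scanning the batches on every read. A minimal, self-contained sketch of that shape (the types and names below are illustrative stand-ins, not the crate's actual API):

// Illustrative stand-in for data_types::TimestampMinMax.
#[derive(Clone, Copy, Debug, PartialEq)]
struct TsMinMax {
    min: i64,
    max: i64,
}

// Illustrative stand-in for a snapshot/persisting state: summaries are
// computed once at construction and cached. (The real states also hold the
// Vec<RecordBatch> itself; omitted here for brevity.)
struct CachedSnapshot {
    rows: usize,  // row count across all batches
    ts: TsMinMax, // min/max of the "time" column
}

impl CachedSnapshot {
    // `batches` stands in for Vec<RecordBatch>; only timestamps are kept so
    // the example is self-contained.
    fn new(batches: &[Vec<i64>]) -> Option<Self> {
        let rows = batches.iter().map(|b| b.len()).sum::<usize>();
        // Scan once, at construction time only.
        let ts = batches
            .iter()
            .flatten()
            .fold(None, |acc: Option<TsMinMax>, &t| match acc {
                None => Some(TsMinMax { min: t, max: t }),
                Some(a) => Some(TsMinMax {
                    min: a.min.min(t),
                    max: a.max.max(t),
                }),
            })?;
        Some(Self { rows, ts })
    }

    // O(1) reads after construction; no re-scan of the stored data.
    fn rows(&self) -> usize {
        self.rows
    }
    fn timestamp_stats(&self) -> TsMinMax {
        self.ts
    }
}

fn main() {
    let snap = CachedSnapshot::new(&[vec![42, 100], vec![668563242000000042]]).unwrap();
    assert_eq!(snap.rows(), 3);
    assert_eq!(
        snap.timestamp_stats(),
        TsMinMax { min: 42, max: 668563242000000042 }
    );
}

In the real change, `Snapshot::new()` and `Persisting::new()` play the role of this constructor, and `PartitionData` merges the cached values across all persisting batches plus the hot buffer.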

View File

@ -1593,7 +1593,7 @@ impl TimestampRange {
///
/// Note this differs subtly (but critically) from a
/// [`TimestampRange`] as the minimum and maximum values are included ([`TimestampRange`] has an exclusive end).
#[derive(Clone, Debug, Copy)]
#[derive(Clone, Debug, Copy, PartialEq)]
pub struct TimestampMinMax {
/// The minimum timestamp value
pub min: i64,

View File

@ -4,7 +4,7 @@ use std::{collections::VecDeque, sync::Arc};
use data_types::{
sequence_number_set::SequenceNumberSet, NamespaceId, PartitionHashId, PartitionId,
PartitionKey, SequenceNumber, TableId, TransitionPartitionId,
PartitionKey, SequenceNumber, TableId, TimestampMinMax, TransitionPartitionId,
};
use mutable_batch::MutableBatch;
use observability_deps::tracing::*;
@ -156,6 +156,50 @@ impl PartitionData {
self.buffer.persist_cost_estimate()
}
/// Returns the number of rows currently buffered in this [`PartitionData`].
///
/// The returned value will always match the row count of the data returned
/// by a subsequent call to [`PartitionData::get_query_data()`], without any
/// column projection applied.
///
/// This value is inclusive of "hot" buffered data, and all currently
/// persisting data.
///
/// This is an `O(n)` operation where `n` is the number of currently
/// persisting batches, plus 1 for the "hot" buffer. Reading the row count
/// of each batch is `O(1)`. This method is expected to be fast.
pub(crate) fn rows(&self) -> usize {
self.persisting.iter().map(|(_, v)| v.rows()).sum::<usize>() + self.buffer.rows()
}
/// Return the timestamp min/max values for the data contained within this
/// [`PartitionData`].
///
/// The returned value will always match the timestamp summary values of the
/// data returned by a subsequent call to
/// [`PartitionData::get_query_data()`] iff the projection provided to the
/// call includes the timestamp column (this method returns the pre-projection values).
///
/// This value is inclusive of "hot" buffered data, and all currently
/// persisting data.
///
/// This is an `O(n)` operation where `n` is the number of currently
/// persisting batches, plus 1 for the "hot" buffer. Reading the timestamp
/// statistics for each batch is `O(1)`. This method is expected to be fast.
pub(crate) fn timestamp_stats(&self) -> Option<TimestampMinMax> {
self.persisting
.iter()
.map(|(_, v)| {
v.timestamp_stats()
.expect("persisting batches must be non-empty")
})
.chain(self.buffer.timestamp_stats())
.reduce(|acc, v| TimestampMinMax {
min: acc.min.min(v.min),
max: acc.max.max(v.max),
})
}
/// Return all data for this partition, ordered by the calls to
/// [`PartitionData::buffer_write()`].
pub(crate) fn get_query_data(&mut self, projection: &OwnedProjection) -> Option<QueryAdaptor> {
@ -185,6 +229,7 @@ impl PartitionData {
);
if data.is_empty() {
debug_assert_eq!(self.rows(), 0);
return None;
}
@ -194,11 +239,32 @@ impl PartitionData {
// is upheld by the FSM, which ensures only non-empty snapshots /
// RecordBatch are generated. Because `data` contains at least one
// RecordBatch, this invariant holds.
Some(QueryAdaptor::new(
self.partition_id,
self.transition_partition_id(),
data,
))
let q = QueryAdaptor::new(self.partition_id, self.transition_partition_id(), data);
// Invariant: the number of rows returned in a query MUST always match
// the row count reported by the rows() method.
//
// The row count is never affected by projection.
debug_assert_eq!(q.num_rows(), self.rows() as u64);
// Invariant: the timestamp min/max MUST match the values reported by
// timestamp_stats(), iff the projection contains the "time" column.
#[cfg(debug_assertions)]
{
if projection
.columns()
.map(|v| v.iter().any(|v| v == schema::TIME_COLUMN_NAME))
.unwrap_or(true)
{
assert_eq!(q.ts_min_max(), self.timestamp_stats());
} else {
// Otherwise the timestamp summary in the query response MUST be
// empty.
assert_eq!(q.ts_min_max(), None);
}
}
Some(q)
}
/// Snapshot and mark all buffered data as persisting.

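For reference, the fold in `timestamp_stats()` above merges per-batch ranges by taking the smallest min and the largest max, yielding `None` only when there is no buffered or persisting data at all. A small self-contained example of that merge, using a local stand-in for `TimestampMinMax`:

// Illustrative stand-in for data_types::TimestampMinMax.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Range {
    min: i64,
    max: i64,
}

// Merge the per-batch ranges; reduce() returns None for an empty input,
// mirroring the Option<TimestampMinMax> return of timestamp_stats().
fn merge(ranges: impl IntoIterator<Item = Range>) -> Option<Range> {
    ranges.into_iter().reduce(|acc, v| Range {
        min: acc.min.min(v.min),
        max: acc.max.max(v.max),
    })
}

fn main() {
    // Two persisting batches plus the hot buffer.
    let merged = merge([
        Range { min: 42, max: 100 },
        Range { min: 10, max: 50 },
        Range { min: 60, max: 668563242000000042 },
    ]);
    assert_eq!(merged, Some(Range { min: 10, max: 668563242000000042 }));

    // No data at all: no summary.
    assert_eq!(merge(std::iter::empty::<Range>()), None);
}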
View File

@ -1,5 +1,5 @@
use arrow::record_batch::RecordBatch;
use data_types::SequenceNumber;
use data_types::{SequenceNumber, TimestampMinMax};
use mutable_batch::MutableBatch;
mod always_some;
@ -74,6 +74,20 @@ impl DataBuffer {
})
}
/// Return the row count for this buffer.
pub(crate) fn rows(&self) -> usize {
match self.0.get() {
FsmState::Buffering(v) => v.rows(),
}
}
/// Return the timestamp min/max values, if this buffer contains data.
pub(crate) fn timestamp_stats(&self) -> Option<TimestampMinMax> {
match self.0.get() {
FsmState::Buffering(v) => v.timestamp_stats(),
}
}
// Deconstruct the [`DataBuffer`] into the underlying FSM in a
// [`Persisting`] state, if the buffer contains any data.
pub(crate) fn into_persisting(self) -> Option<BufferState<Persisting>> {

View File

@ -49,6 +49,8 @@ impl Buffer {
self.buffer.is_none()
}
/// Returns the underlying buffer if this [`Buffer`] contains data,
/// otherwise returns [`None`].
pub(super) fn buffer(&self) -> Option<&MutableBatch> {
self.buffer.as_ref()
}

View File

@ -1,6 +1,6 @@
#![allow(dead_code)]
use arrow::record_batch::RecordBatch;
use data_types::{sequence_number_set::SequenceNumberSet, SequenceNumber};
use data_types::{sequence_number_set::SequenceNumberSet, SequenceNumber, TimestampMinMax};
use mutable_batch::MutableBatch;
mod buffering;
@ -125,6 +125,14 @@ where
fn get_query_data(&self, projection: &OwnedProjection) -> Vec<RecordBatch> {
self.state.get_query_data(projection)
}
fn rows(&self) -> usize {
self.state.rows()
}
fn timestamp_stats(&self) -> Option<TimestampMinMax> {
self.state.timestamp_stats()
}
}
#[cfg(test)]
@ -132,6 +140,7 @@ mod tests {
use std::sync::Arc;
use arrow_util::assert_batches_eq;
use assert_matches::assert_matches;
use mutable_batch_lp::test_helpers::lp_to_mutable_batch;
use schema::Projection;
use snapshot::*;
@ -403,4 +412,72 @@ mod tests {
assert!(!set.contains(SequenceNumber::new(12)));
assert_eq!(set.len(), 1);
}
/// Assert the summary statistics are correctly computed across various FSM
/// states.
#[test]
fn test_summary_statistics() {
let mut buffer: BufferState<Buffering> = BufferState::new();
// Write some data to a buffer.
buffer
.write(
lp_to_mutable_batch(
r#"bananas,tag=platanos great=true,how_much=42 668563242000000042"#,
)
.1,
SequenceNumber::new(0),
)
.expect("write to empty buffer should succeed");
assert_eq!(buffer.rows(), 1);
assert_eq!(
buffer.timestamp_stats(),
Some(TimestampMinMax {
min: 668563242000000042,
max: 668563242000000042
})
);
// Write another row to change the timestamp (it goes backwards!)
buffer
.write(
lp_to_mutable_batch(r#"bananas,tag=platanos great=true,how_much=42 42"#).1,
SequenceNumber::new(0),
)
.unwrap();
assert_eq!(buffer.rows(), 2);
assert_eq!(
buffer.timestamp_stats(),
Some(TimestampMinMax {
min: 42,
max: 668563242000000042
})
);
// Transition to a snapshot.
let buffer = assert_matches!(buffer.snapshot(), Transition::Ok(v) => v);
assert_eq!(buffer.rows(), 2);
assert_eq!(
buffer.timestamp_stats(),
Some(TimestampMinMax {
min: 42,
max: 668563242000000042
})
);
// Transition to persisting
let buffer = buffer.into_persisting();
assert_eq!(buffer.rows(), 2);
assert_eq!(
buffer.timestamp_stats(),
Some(TimestampMinMax {
min: 42,
max: 668563242000000042
})
);
}
}

View File

@ -1,7 +1,9 @@
//! A write buffer.
use arrow::record_batch::RecordBatch;
use mutable_batch::MutableBatch;
use data_types::{StatValues, TimestampMinMax};
use mutable_batch::{column::ColumnData, MutableBatch};
use schema::TIME_COLUMN_NAME;
use super::{snapshot::Snapshot, BufferState, Transition};
use crate::{
@ -41,6 +43,22 @@ impl Queryable for Buffering {
.map(|v| vec![projection.project_mutable_batches(v)])
.unwrap_or_default()
}
fn rows(&self) -> usize {
self.buffer.buffer().map(|v| v.rows()).unwrap_or_default()
}
fn timestamp_stats(&self) -> Option<TimestampMinMax> {
self.buffer
.buffer()
.map(extract_timestamp_summary)
// Safety: unwrapping the timestamp bounds is safe, as any non-empty
// buffer must contain timestamps.
.map(|v| TimestampMinMax {
min: v.min.unwrap(),
max: v.max.unwrap(),
})
}
}
impl Writeable for Buffering {
@ -75,6 +93,18 @@ impl BufferState<Buffering> {
}
}
/// Perform an O(1) extraction of the timestamp column statistics.
fn extract_timestamp_summary(batch: &MutableBatch) -> &StatValues<i64> {
let col = batch
.column(TIME_COLUMN_NAME)
.expect("timestamps must exist for non-empty buffer");
match col.data() {
ColumnData::I64(_data, stats) => stats,
_ => unreachable!(),
}
}
#[cfg(test)]
mod tests {
use super::*;

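The `O(1)` claim for `extract_timestamp_summary()` above relies on the mutable batch keeping running column statistics up to date as rows are written, so the read side only copies them out. A minimal sketch of that pattern with a hypothetical buffer type (not the real `MutableBatch` API):

// Hypothetical, simplified time column: statistics are maintained on the
// write path so reading the summary never scans the stored values.
#[derive(Default)]
struct TimeColumn {
    values: Vec<i64>,
    min: Option<i64>,
    max: Option<i64>,
}

impl TimeColumn {
    fn push(&mut self, ts: i64) {
        self.values.push(ts);
        // Update the running statistics as part of the write.
        self.min = Some(self.min.map_or(ts, |m| m.min(ts)));
        self.max = Some(self.max.map_or(ts, |m| m.max(ts)));
    }

    // O(1): read the cached statistics; None while the column is empty.
    fn stats(&self) -> Option<(i64, i64)> {
        Some((self.min?, self.max?))
    }
}

fn main() {
    let mut col = TimeColumn::default();
    assert_eq!(col.stats(), None);
    col.push(668563242000000042);
    col.push(42);
    assert_eq!(col.stats(), Some((42, 668563242000000042)));
}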
View File

@ -1,7 +1,8 @@
//! A write buffer, with one or more snapshots.
use arrow::record_batch::RecordBatch;
use data_types::sequence_number_set::SequenceNumberSet;
use data_types::{sequence_number_set::SequenceNumberSet, TimestampMinMax};
use iox_query::util::compute_timenanosecond_min_max;
use super::BufferState;
use crate::{
@ -15,11 +16,34 @@ pub(crate) struct Persisting {
///
/// INVARIANT: this array is always non-empty.
snapshots: Vec<RecordBatch>,
/// Statistics describing the data in snapshots.
row_count: usize,
timestamp_stats: TimestampMinMax,
}
impl Persisting {
pub(super) fn new(snapshots: Vec<RecordBatch>) -> Self {
Self { snapshots }
pub(super) fn new(
snapshots: Vec<RecordBatch>,
row_count: usize,
timestamp_stats: TimestampMinMax,
) -> Self {
// Invariant: the summary statistics provided must match the actual
// data.
debug_assert_eq!(
row_count,
snapshots.iter().map(|v| v.num_rows()).sum::<usize>()
);
debug_assert_eq!(
timestamp_stats,
compute_timenanosecond_min_max(snapshots.iter()).unwrap()
);
Self {
snapshots,
row_count,
timestamp_stats,
}
}
}
@ -27,6 +51,14 @@ impl Queryable for Persisting {
fn get_query_data(&self, projection: &OwnedProjection) -> Vec<RecordBatch> {
projection.project_record_batch(&self.snapshots)
}
fn rows(&self) -> usize {
self.row_count
}
fn timestamp_stats(&self) -> Option<TimestampMinMax> {
Some(self.timestamp_stats)
}
}
impl BufferState<Persisting> {

View File

@ -1,6 +1,8 @@
//! A write buffer, with one or more snapshots.
use arrow::record_batch::RecordBatch;
use data_types::TimestampMinMax;
use iox_query::util::compute_timenanosecond_min_max;
use super::BufferState;
use crate::{
@ -15,12 +17,26 @@ pub(crate) struct Snapshot {
///
/// INVARIANT: this array is always non-empty.
snapshots: Vec<RecordBatch>,
/// Statistics describing the data in snapshots.
row_count: usize,
timestamp_stats: TimestampMinMax,
}
impl Snapshot {
pub(super) fn new(snapshots: Vec<RecordBatch>) -> Self {
assert!(!snapshots.is_empty());
Self { snapshots }
// Compute some summary statistics for query pruning/reuse later.
let row_count = snapshots.iter().map(|v| v.num_rows()).sum();
let timestamp_stats = compute_timenanosecond_min_max(snapshots.iter())
.expect("non-empty batch must contain timestamps");
Self {
snapshots,
row_count,
timestamp_stats,
}
}
}
@ -28,13 +44,25 @@ impl Queryable for Snapshot {
fn get_query_data(&self, projection: &OwnedProjection) -> Vec<RecordBatch> {
projection.project_record_batch(&self.snapshots)
}
fn rows(&self) -> usize {
self.row_count
}
fn timestamp_stats(&self) -> Option<TimestampMinMax> {
Some(self.timestamp_stats)
}
}
impl BufferState<Snapshot> {
pub(crate) fn into_persisting(self) -> BufferState<Persisting> {
assert!(!self.state.snapshots.is_empty());
BufferState {
state: Persisting::new(self.state.snapshots),
state: Persisting::new(
self.state.snapshots,
self.state.row_count,
self.state.timestamp_stats,
),
sequence_numbers: self.sequence_numbers,
}
}

View File

@ -3,6 +3,7 @@
use std::fmt::Debug;
use arrow::record_batch::RecordBatch;
use data_types::TimestampMinMax;
use mutable_batch::MutableBatch;
use crate::query::projection::OwnedProjection;
@ -15,5 +16,10 @@ pub(crate) trait Writeable: Debug {
/// A state that can return the contents of the buffer as one or more
/// [`RecordBatch`] instances.
pub(crate) trait Queryable: Debug {
fn rows(&self) -> usize;
fn timestamp_stats(&self) -> Option<TimestampMinMax>;
/// Return the set of [`RecordBatch`] containing ONLY the projected columns.
fn get_query_data(&self, projection: &OwnedProjection) -> Vec<RecordBatch>;
}

View File

@ -138,4 +138,12 @@ impl OwnedProjection {
}
}
}
/// Return the column names in this projection, if specified.
pub(crate) fn columns(&self) -> Option<&[String]> {
match &self.0 {
Projection::All => None,
Projection::Project(v) => Some(v.as_ref()),
}
}
}