From 0a0b8b21506e838cfa9f4dee8e87dba5e0847c41 Mon Sep 17 00:00:00 2001
From: Edd Robinson
Date: Wed, 26 Jan 2022 12:39:29 +0000
Subject: [PATCH] feat: decouple read buffer row group size from Datafusion batch size (#3538)

* feat: add chunk builder
* test: test coverage for chunk builder
* refactor: apply suggestions from code review
* refactor: address PR feedback
---
 db/src/lifecycle.rs       |  31 ++--
 db/src/lifecycle/error.rs |  10 ++
 read_buffer/src/chunk.rs  | 340 ++++++++++++++++++++++++++++----------
 read_buffer/src/lib.rs    |   2 +-
 4 files changed, 278 insertions(+), 105 deletions(-)

diff --git a/db/src/lifecycle.rs b/db/src/lifecycle.rs
index ea9a95236f..5caadcc9ce 100644
--- a/db/src/lifecycle.rs
+++ b/db/src/lifecycle.rs
@@ -357,29 +357,34 @@ fn collect_rub(
     partition_addr: &PartitionAddr,
     metric_registry: &metric::Registry,
 ) -> impl futures::Future<Output = Result<Option<read_buffer::RBChunk>>> {
-    use futures::{future, StreamExt, TryStreamExt};
+    use futures::{future, TryStreamExt};

     let db_name = partition_addr.db_name.to_string();
-    let table_name = partition_addr.table_name.to_string();
+    let table_name = Arc::clone(&partition_addr.table_name);
     let chunk_metrics = read_buffer::ChunkMetrics::new(metric_registry, db_name);

     async move {
-        let mut adapted_stream = stream.try_filter(|batch| future::ready(batch.num_rows() > 0));
+        let schema = stream.schema();
+        let adapted_stream = stream.try_filter(|batch| future::ready(batch.num_rows() > 0));

-        let first_batch = match adapted_stream.next().await {
-            Some(rb_result) => rb_result?,
-            // At least one RecordBatch is required to create a read_buffer::Chunk
-            None => return Ok(None),
-        };
-        let mut chunk = read_buffer::RBChunk::new(table_name, first_batch, chunk_metrics);
+        let mut chunk_builder = read_buffer::RBChunkBuilder::new(table_name.as_ref(), schema)
+            .with_metrics(chunk_metrics);

         adapted_stream
-            .try_for_each(|batch| {
-                chunk.upsert_table(batch);
-                future::ready(Ok(()))
-            })
+            .try_for_each(|batch| future::ready(chunk_builder.push_record_batch(batch)))
             .await?;

+        // The adapted stream may not have yielded any non-empty record batches.
+        if chunk_builder.is_empty() {
+            return Ok(None);
+        }
+
+        let chunk = chunk_builder
+            .build()
+            .map_err(|e| Error::ReadBufferChunkBuilderError {
+                table_name: table_name.to_string(),
+                source: e,
+            })?;
         Ok(Some(chunk))
     }
 }
diff --git a/db/src/lifecycle/error.rs b/db/src/lifecycle/error.rs
index 45cca11858..1063dbaf80 100644
--- a/db/src/lifecycle/error.rs
+++ b/db/src/lifecycle/error.rs
@@ -37,6 +37,16 @@ pub enum Error {
         chunk_id: u32,
     },

+    #[snafu(display(
+        "Error building Read Buffer chunk for table '{}': {}",
+        table_name,
+        source
+    ))]
+    ReadBufferChunkBuilderError {
+        source: read_buffer::Error,
+        table_name: String,
+    },
+
     #[snafu(display("Error reading from object store: {}", source))]
     ReadingObjectStore {
         source: parquet_file::storage::Error,
diff --git a/read_buffer/src/chunk.rs b/read_buffer/src/chunk.rs
index 86a3a3a6c7..f46930007d 100644
--- a/read_buffer/src/chunk.rs
+++ b/read_buffer/src/chunk.rs
@@ -4,7 +4,7 @@ use crate::{
     schema::{AggregateType, ResultSchema},
     table::{self, Table},
 };
-use arrow::record_batch::RecordBatch;
+use arrow::{error::ArrowError, record_batch::RecordBatch};
 use data_types::{chunk_metadata::ChunkColumnSummary, partition_metadata::TableSummary};
 use metric::{Attributes, CumulativeGauge, CumulativeRecorder, RecorderCollection};
 use observability_deps::tracing::debug;
@@ -16,6 +16,9 @@ use std::{
     convert::TryFrom,
 };

+// The desired minimum row group size, used as the default for the `ChunkBuilder`.
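+// Decoupling this threshold from the size of the record batches arriving from
+// DataFusion lets the builder buffer small batches and only compress them into
+// a row group once at least this many rows are pending.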
+const DEFAULT_ROW_GROUP_MIN_ROWS: usize = 100_000;
+
 #[derive(Debug, Snafu)]
 pub enum Error {
     #[snafu(display("unsupported operation: {}", msg))]
@@ -30,6 +33,15 @@ pub enum Error {
     #[snafu(display("table '{}' does not exist", table_name))]
     TableNotFound { table_name: String },

+    #[snafu(display("no data to build chunk for table '{}'", table_name))]
+    ChunkBuilderNoInput { table_name: String },
+
+    #[snafu(display("error building chunk for table '{}': {}", table_name, source))]
+    ChunkBuilderError {
+        table_name: String,
+        source: arrow::error::ArrowError,
+    },
+
     #[snafu(display("column '{}' does not exist in table '{}'", column_name, table_name))]
     ColumnDoesNotExist {
         column_name: String,
@@ -53,25 +65,23 @@ impl Chunk {
     pub fn new(
         table_name: impl Into<String>,
         table_data: RecordBatch,
-        mut metrics: ChunkMetrics,
+        metrics: ChunkMetrics,
     ) -> Self {
         let table_name = table_name.into();
         let row_group = record_batch_to_row_group(&table_name, table_data);
-        let storage_statistics = row_group.column_storage_statistics();
-        let table = Table::with_row_group(table_name, row_group);
-
-        metrics.update_column_storage_statistics(&storage_statistics);
-
-        Self { metrics, table }
+        Self::new_from_row_group(table_name, row_group, metrics)
     }

     // Only used in tests and benchmarks
     pub(crate) fn new_from_row_group(
         table_name: impl Into<String>,
         row_group: RowGroup,
-        metrics: ChunkMetrics,
+        mut metrics: ChunkMetrics,
     ) -> Self {
+        let storage_statistics = row_group.column_storage_statistics();
+        metrics.update_column_storage_statistics(&storage_statistics);
+
         Self {
             metrics,
             table: Table::with_row_group(table_name, row_group),
@@ -352,6 +362,157 @@ impl std::fmt::Debug for Chunk {
     }
 }

+/// A `ChunkBuilder` builds up a chunk from row groups, each of which aims to
+/// contain at least a specified minimum number of rows. Pending record batches
+/// are compressed into a row group as soon as that threshold is met, reducing
+/// the overall memory footprint during the building phase.
+pub struct ChunkBuilder {
+    table_name: String,
+    schema: arrow::datatypes::SchemaRef,
+
+    current_rows: usize, // total rows currently held in `record_batches` below
+    record_batches: Vec<RecordBatch>,
+
+    row_group_min_rows: usize,
+    chunk_metrics: Option<ChunkMetrics>,
+    row_groups: Vec<RowGroup>,
+}
+
+impl ChunkBuilder {
+    pub fn new(table_name: impl Into<String>, schema: arrow::datatypes::SchemaRef) -> Self {
+        Self {
+            table_name: table_name.into(),
+            schema,
+            current_rows: 0,
+            record_batches: vec![],
+            row_group_min_rows: DEFAULT_ROW_GROUP_MIN_ROWS,
+            chunk_metrics: None,
+            row_groups: vec![],
+        }
+    }
+
+    /// Provide metrics for the built `Chunk`. If no metrics are provided then
+    /// none will be tracked for the `Chunk`.
+    pub fn with_metrics(mut self, metrics: ChunkMetrics) -> Self {
+        self.chunk_metrics = Some(metrics);
+        self
+    }
+
+    /// Specify the desired minimum number of rows in a row group. During build,
+    /// one final row group may be smaller than this size.
+    pub fn set_row_group_min_size(mut self, size: usize) -> Self {
+        self.row_group_min_rows = size;
+        self
+    }
+
+    /// Add a new record batch to the chunk. If the minimum row count threshold
+    /// is met then all of the builder's pending record batches will be
+    /// compressed into a row group and the pending batches cleared.
+    ///
+    /// Note: pushing an empty record batch is currently treated as a no-op.
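+    ///
+    /// Returns an error if the pending record batches cannot be concatenated
+    /// into a single row group, e.g., because a batch's schema does not match
+    /// the builder's schema.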
+    pub fn push_record_batch(&mut self, rb: RecordBatch) -> Result<(), ArrowError> {
+        if rb.num_rows() == 0 {
+            return Ok(());
+        }
+
+        self.current_rows += rb.num_rows();
+        self.record_batches.push(rb);
+
+        if self.current_rows >= self.row_group_min_rows {
+            return self.snapshot_rowgroup();
+        }
+
+        Ok(())
+    }
+
+    // Force a row group snapshot.
+    fn snapshot_rowgroup(&mut self) -> Result<(), ArrowError> {
+        // PERF(edd): This will copy all record batches so that they are in a
+        // contiguous block of memory. To reduce memory overhead we can avoid
+        // the concatenation and teach the `RowGroup` to build itself from a
+        // vec of record batches.
+        let concat_batch =
+            arrow::record_batch::RecordBatch::concat(&self.schema, &self.record_batches)?;
+
+        self.row_groups
+            .push(record_batch_to_row_group(&self.table_name, concat_batch));
+
+        // clear pending batches
+        self.record_batches.clear();
+        self.current_rows = 0;
+        Ok(())
+    }
+
+    /// Returns true if there are no pending record batches or snapshotted
+    /// row groups; effectively the `Chunk` is empty.
+    pub fn is_empty(&self) -> bool {
+        self.record_batches.is_empty() && self.row_groups.is_empty()
+    }
+
+    /// Consume the builder and attempt to create a single `Chunk`. The resulting
+    /// `Chunk` will have at most `n + 1` row groups, where `n` is the number of
+    /// row groups already snapshotted within the builder.
+    pub fn build(mut self) -> Result<Chunk> {
+        // No batches or row groups is an error because we have nothing to
+        // build the chunk with.
+        if self.is_empty() {
+            return ChunkBuilderNoInputSnafu {
+                table_name: self.table_name.clone(),
+            }
+            .fail();
+        } else if !self.record_batches.is_empty() {
+            // Snapshot remaining batches to a row group (accepting that it
+            // may be smaller than desired)
+            self.snapshot_rowgroup().context(ChunkBuilderSnafu {
+                table_name: self.table_name.clone(),
+            })?;
+        }
+
+        // Create new chunk
+        let mut chunk = Chunk::new_from_row_group(
+            self.table_name.clone(),
+            self.row_groups.remove(0),
+            match self.chunk_metrics.take() {
+                // avoid partial move of self
+                Some(metrics) => metrics,
+                None => ChunkMetrics::new_unregistered(),
+            },
+        );

+        for rg in self.row_groups {
+            chunk.upsert_table_with_row_group(rg);
+        }
+        Ok(chunk)
+    }
+
+    /*
+     *
+     * Test helpers
+     *
+     */
+
+    #[cfg(test)]
+    fn with_record_batch(mut self, rb: RecordBatch) -> Self {
+        self.must_push_record_batch(rb);
+        self
+    }
+
+    #[cfg(test)]
+    fn must_push_record_batch(&mut self, rb: RecordBatch) {
+        self.push_record_batch(rb).unwrap()
+    }
+
+    #[cfg(test)]
+    fn must_snapshot_rowgroup(&mut self) {
+        self.snapshot_rowgroup().unwrap();
+    }
+
+    #[cfg(test)]
+    fn must_build(self) -> Chunk {
+        self.build().unwrap()
+    }
+}
+
 /// The collection of metrics exposed by the Read Buffer. Note: several of these
 /// would be better represented as distributions, but the histogram story in IOx
 /// is not yet figured out.
@@ -669,45 +830,15 @@ mod test {
         }
     }

-    #[derive(Debug, Default)]
-    struct ChunkBuilder {
-        name: Option<String>,
-        record_batch: Option<RecordBatch>,
-        metrics: Option<ChunkMetrics>,
-    }
-
-    impl ChunkBuilder {
-        fn name(mut self, name: impl Into<String>) -> Self {
-            self.name = Some(name.into());
-            self
-        }
-
-        fn record_batch(mut self, record_batch: RecordBatch) -> Self {
-            self.record_batch = Some(record_batch);
-            self
-        }
-
-        fn metrics(mut self, metrics: ChunkMetrics) -> Self {
-            self.metrics = Some(metrics);
-            self
-        }
-
-        fn build(self) -> Chunk {
-            Chunk::new(
-                self.name.unwrap_or_else(|| String::from("a_table")),
-                self.record_batch.unwrap_or_else(gen_recordbatch),
-                self.metrics.unwrap_or_else(ChunkMetrics::new_unregistered),
-            )
-        }
-    }
-
     #[test]
     fn add_remove_tables() {
         let registry = metric::Registry::new();

-        let mut chunk = ChunkBuilder::default()
-            .metrics(ChunkMetrics::new(&registry, "mydb"))
-            .build();
+        let rb = gen_recordbatch();
+        let mut chunk = ChunkBuilder::new("mydb", rb.schema())
+            .with_metrics(ChunkMetrics::new(&registry, "mydb"))
+            .with_record_batch(rb)
+            .must_build();

         assert_eq!(chunk.rows(), 3);
         assert_eq!(chunk.row_groups(), 1);
@@ -830,7 +961,11 @@ mod test {

     #[test]
     fn read_filter_table_schema() {
-        let chunk = ChunkBuilder::default().build();
+        let rb = gen_recordbatch();
+        let chunk = ChunkBuilder::new("mydb", rb.schema())
+            .with_record_batch(rb)
+            .must_build();
+
         let schema = chunk.read_filter_table_schema(Selection::All).unwrap();

         let exp_schema: Arc<Schema> = SchemaBuilder::new()
@@ -922,11 +1057,9 @@ mod test {
         // Add a record batch to a single partition
         let rb = RecordBatch::try_new(schema.into(), data).unwrap();

-        // The row group gets added to the same chunk each time.
-        let chunk = ChunkBuilder::default()
-            .name("a_table")
-            .record_batch(rb)
-            .build();
+        let mut chunk_builder = ChunkBuilder::new("a_table", rb.schema());
+        chunk_builder.must_push_record_batch(rb);
+        let chunk = chunk_builder.must_build();

         let summary = chunk.table_summary();
         assert_eq!("a_table", summary.name);
@@ -1026,22 +1159,22 @@ mod test {
     }

     fn read_filter_setup() -> Chunk {
-        let mut chunk: Option<Chunk> = None;
+        let schema = SchemaBuilder::new()
+            .non_null_tag("env")
+            .non_null_tag("region")
+            .non_null_field("counter", Float64)
+            .field("sketchy_sensor", Int64)
+            .non_null_field("active", Boolean)
+            .field("msg", Utf8)
+            .field("all_null", Utf8)
+            .timestamp()
+            .build()
+            .unwrap();
+
+        let mut chunk_builder = ChunkBuilder::new("Coolverine", schema.clone().into());

         // Add a bunch of row groups to a single table in a single chunk
         for &i in &[100, 200, 300] {
-            let schema = SchemaBuilder::new()
-                .non_null_tag("env")
-                .non_null_tag("region")
-                .non_null_field("counter", Float64)
-                .field("sketchy_sensor", Int64)
-                .non_null_field("active", Boolean)
-                .field("msg", Utf8)
-                .field("all_null", Utf8)
-                .timestamp()
-                .build()
-                .unwrap();
-
             let data: Vec<ArrayRef> = vec![
                 Arc::new(
                     vec!["us-west", "us-east", "us-west"]
                         .into_iter()
                         .map(Some)
                         .collect::<StringArray>(),
                 ),
             ];

             // Add a record batch to a single partition
-            let rb = RecordBatch::try_new(schema.into(), data).unwrap();
-
-            // First time through the loop, create a new Chunk. Other times, upsert into the chunk.
-            match chunk {
-                Some(ref mut c) => c.upsert_table(rb),
-                None => {
-                    chunk = Some(
-                        ChunkBuilder::default()
-                            .name("Coolverine")
-                            .record_batch(rb)
-                            .build(),
-                    );
-                }
-            }
+            let rb = RecordBatch::try_new(schema.clone().into(), data).unwrap();
+            chunk_builder.must_push_record_batch(rb);
+            chunk_builder.must_snapshot_rowgroup();
         }
-        chunk.unwrap()
+        chunk_builder.must_build()
     }

     #[test]
@@ -1238,7 +1360,10 @@ mod test {
     #[test]
     fn could_pass_predicate() {
-        let chunk = ChunkBuilder::default().build();
+        let rb = gen_recordbatch();
+        let mut chunk_builder = ChunkBuilder::new("table", rb.schema());
+        chunk_builder.must_push_record_batch(rb);
+        let chunk = chunk_builder.must_build();

         assert!(
             chunk.could_pass_predicate(Predicate::new(vec![BinaryExpr::from((
@@ -1319,10 +1444,9 @@ mod test {
         // Create the chunk with the above table
         let rb = RecordBatch::try_new(schema, data).unwrap();
-        let chunk = ChunkBuilder::default()
-            .name("Utopia")
-            .record_batch(rb)
-            .build();
+        let mut chunk_builder = ChunkBuilder::new("Utopia", rb.schema());
+        chunk_builder.must_push_record_batch(rb);
+        let chunk = chunk_builder.must_build();

         let result = chunk
             .column_names(
@@ -1391,10 +1515,9 @@ mod test {
         // Create the chunk with the above table
         let rb = RecordBatch::try_new(schema, data).unwrap();
-        let chunk = ChunkBuilder::default()
-            .name("Utopia")
-            .record_batch(rb)
-            .build();
+        let mut chunk_builder = ChunkBuilder::new("Utopia", rb.schema());
+        chunk_builder.must_push_record_batch(rb);
+        let chunk = chunk_builder.must_build();

         let result = chunk
             .column_names(
@@ -1479,10 +1602,9 @@ mod test {
         // Create the chunk with the above table
         let rb = RecordBatch::try_new(schema, data).unwrap();
-        let chunk = ChunkBuilder::default()
-            .name("my_table")
-            .record_batch(rb)
-            .build();
+        let mut chunk_builder = ChunkBuilder::new("my_table", rb.schema());
+        chunk_builder.must_push_record_batch(rb);
+        let chunk = chunk_builder.must_build();

         let result = chunk
             .column_values(
@@ -1536,4 +1658,40 @@ mod test {
             Err(Error::TableError { .. })
         ));
     }
+
+    #[test]
+    fn chunk_builder() {
+        // test row group configuration
+        let rb = gen_recordbatch();
+        let chunk = ChunkBuilder::new("table_a", rb.schema())
+            .with_record_batch(rb)
+            .must_build();
+
+        assert_eq!(chunk.row_groups(), 1);
+        assert_eq!(chunk.rows(), 3);
+
+        let rb = gen_recordbatch();
+        let mut builder = ChunkBuilder::new("table_a", rb.schema()).set_row_group_min_size(3);
+        builder.must_push_record_batch(rb);
+        builder.must_push_record_batch(gen_recordbatch());
+        builder.must_push_record_batch(gen_recordbatch());
+
+        let chunk = builder.must_build();
+        assert_eq!(chunk.table.name(), "table_a");
+        assert_eq!(chunk.row_groups(), 3);
+        assert_eq!(chunk.rows(), 9);
+
+        // when the chunk is empty, an error is returned on build
+        let rb = gen_recordbatch();
+        let builder = ChunkBuilder::new("table_a", rb.schema());
+        assert!(builder.build().is_err());
+
+        // empty record batches are not stored for snapshotting
+        let rb = gen_recordbatch();
+        let mut builder = ChunkBuilder::new("table_a", rb.schema());
+        assert!(builder
+            .push_record_batch(RecordBatch::new_empty(rb.schema()))
+            .is_ok());
+        assert!(builder.is_empty());
+    }
 }
diff --git a/read_buffer/src/lib.rs b/read_buffer/src/lib.rs
index 421802a8f1..1b29af81c8 100644
--- a/read_buffer/src/lib.rs
+++ b/read_buffer/src/lib.rs
@@ -10,7 +10,7 @@ mod value;

 // Identifiers that are exported as part of the public API.
 pub use self::schema::*;
-pub use chunk::{Chunk as RBChunk, ChunkMetrics, Error};
+pub use chunk::{Chunk as RBChunk, ChunkBuilder as RBChunkBuilder, ChunkMetrics, Error};
 pub use row_group::{BinaryExpr, Predicate};
 pub use table::ReadFilterResults;
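
Usage sketch of the new builder, for illustration only (the function name,
inputs, and the boxed error type below are assumptions, not part of this patch):

    use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
    use read_buffer::{RBChunk, RBChunkBuilder};

    fn batches_to_chunk(
        table_name: &str,
        schema: SchemaRef,
        batches: Vec<RecordBatch>,
    ) -> Result<Option<RBChunk>, Box<dyn std::error::Error>> {
        let mut builder = RBChunkBuilder::new(table_name, schema);
        for batch in batches {
            // Empty batches are no-ops; otherwise a row group is snapshotted
            // whenever at least 100k rows (the default minimum) are pending.
            builder.push_record_batch(batch)?;
        }
        if builder.is_empty() {
            return Ok(None); // only empty batches were pushed
        }
        // Any remaining batches become one final, possibly smaller, row group.
        Ok(Some(builder.build()?))
    }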