feat: decouple read buffer row group size from Datafusion batch size (#3538)

* feat: add chunk builder

* test: test coverage for chunk builder

* refactor: apply suggestions from code review

* refactor: address PR feedback
Edd Robinson 2022-01-26 12:39:29 +00:00 committed by GitHub
parent 107f39d53c
commit 0a0b8b2150
4 changed files with 278 additions and 105 deletions

@@ -357,29 +357,34 @@ fn collect_rub(
     partition_addr: &PartitionAddr,
     metric_registry: &metric::Registry,
 ) -> impl futures::Future<Output = Result<Option<read_buffer::RBChunk>>> {
-    use futures::{future, StreamExt, TryStreamExt};
+    use futures::{future, TryStreamExt};
 
     let db_name = partition_addr.db_name.to_string();
-    let table_name = partition_addr.table_name.to_string();
+    let table_name = Arc::clone(&partition_addr.table_name);
     let chunk_metrics = read_buffer::ChunkMetrics::new(metric_registry, db_name);
 
     async move {
-        let mut adapted_stream = stream.try_filter(|batch| future::ready(batch.num_rows() > 0));
+        let schema = stream.schema();
+        let adapted_stream = stream.try_filter(|batch| future::ready(batch.num_rows() > 0));
 
-        let first_batch = match adapted_stream.next().await {
-            Some(rb_result) => rb_result?,
-            // At least one RecordBatch is required to create a read_buffer::Chunk
-            None => return Ok(None),
-        };
-        let mut chunk = read_buffer::RBChunk::new(table_name, first_batch, chunk_metrics);
+        let mut chunk_builder = read_buffer::RBChunkBuilder::new(table_name.as_ref(), schema)
+            .with_metrics(chunk_metrics);
 
         adapted_stream
-            .try_for_each(|batch| {
-                chunk.upsert_table(batch);
-                future::ready(Ok(()))
-            })
+            .try_for_each(|batch| future::ready(chunk_builder.push_record_batch(batch)))
             .await?;
 
+        // The adapted stream may not have yielded any non-empty record batches.
+        if chunk_builder.is_empty() {
+            return Ok(None);
+        }
+
+        let chunk = chunk_builder
+            .build()
+            .map_err(|e| Error::ReadBufferChunkBuilderError {
+                table_name: table_name.to_string(),
+                source: e,
+            })?;
         Ok(Some(chunk))
     }
 }
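
Previously each non-empty DataFusion batch was upserted into the chunk as its own row group, tying row group size directly to DataFusion's batch size. The builder instead coalesces batches until a row-group minimum is reached (100,000 rows by default; see `DEFAULT_ROW_GROUP_MIN_ROWS` in the read_buffer hunk below). Stripped of the error mapping, the new flow reduces to this sketch (`stream`, `table_name`, and `chunk_metrics` stand for the values captured above; this is an illustration, not the literal committed code):

    let schema = stream.schema();
    let mut builder = read_buffer::RBChunkBuilder::new(table_name.as_ref(), schema)
        .with_metrics(chunk_metrics);

    // Drop empty batches, then fold the remainder into the builder; a row
    // group is snapshotted internally whenever enough rows have accumulated.
    stream
        .try_filter(|batch| future::ready(batch.num_rows() > 0))
        .try_for_each(|batch| future::ready(builder.push_record_batch(batch)))
        .await?;

    // An all-empty stream leaves the builder empty: report "no chunk"
    // rather than failing the build.
    if builder.is_empty() {
        return Ok(None);
    }
    Ok(Some(builder.build()?))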

@@ -37,6 +37,16 @@ pub enum Error {
         chunk_id: u32,
     },
 
+    #[snafu(display(
+        "Error building Read Buffer chunk for table '{}': {}",
+        table_name,
+        source
+    ))]
+    ReadBufferChunkBuilderError {
+        source: read_buffer::Error,
+        table_name: String,
+    },
+
     #[snafu(display("Error reading from object store: {}", source))]
     ReadingObjectStore {
         source: parquet_file::storage::Error,
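
The new variant wraps the underlying `read_buffer::Error` and records which table failed. A hedged sketch of how a caller might discriminate it (the match arms and `warn!` logging here are hypothetical, not part of this commit):

    match collect_rub(stream, &partition_addr, &metric_registry).await {
        Ok(Some(chunk)) => { /* persist the built chunk */ }
        Ok(None) => { /* the stream produced no rows; nothing to persist */ }
        Err(Error::ReadBufferChunkBuilderError { table_name, source }) => {
            // Surface the failing table alongside the underlying builder error.
            warn!(%table_name, %source, "failed to build Read Buffer chunk");
        }
        Err(e) => return Err(e),
    }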

@@ -4,7 +4,7 @@ use crate::{
     schema::{AggregateType, ResultSchema},
     table::{self, Table},
 };
-use arrow::record_batch::RecordBatch;
+use arrow::{error::ArrowError, record_batch::RecordBatch};
 use data_types::{chunk_metadata::ChunkColumnSummary, partition_metadata::TableSummary};
 use metric::{Attributes, CumulativeGauge, CumulativeRecorder, RecorderCollection};
 use observability_deps::tracing::debug;
@@ -16,6 +16,9 @@ use std::{
     convert::TryFrom,
 };
 
+// The desired minimum row group size, used as the default for the `ChunkBuilder`.
+const DEFAULT_ROW_GROUP_MIN_ROWS: usize = 100_000;
+
 #[derive(Debug, Snafu)]
 pub enum Error {
     #[snafu(display("unsupported operation: {}", msg))]
@@ -30,6 +33,15 @@ pub enum Error {
     #[snafu(display("table '{}' does not exist", table_name))]
     TableNotFound { table_name: String },
 
+    #[snafu(display("no data to build chunk for table '{}'", table_name))]
+    ChunkBuilderNoInput { table_name: String },
+
+    #[snafu(display("error building chunk for table '{}': {}", table_name, source))]
+    ChunkBuilderError {
+        table_name: String,
+        source: arrow::error::ArrowError,
+    },
+
     #[snafu(display("column '{}' does not exist in table '{}'", column_name, table_name))]
     ColumnDoesNotExist {
         column_name: String,
@@ -53,25 +65,23 @@ impl Chunk {
     pub fn new(
         table_name: impl Into<String>,
         table_data: RecordBatch,
-        mut metrics: ChunkMetrics,
+        metrics: ChunkMetrics,
     ) -> Self {
         let table_name = table_name.into();
         let row_group = record_batch_to_row_group(&table_name, table_data);
-        let storage_statistics = row_group.column_storage_statistics();
-
-        let table = Table::with_row_group(table_name, row_group);
-
-        metrics.update_column_storage_statistics(&storage_statistics);
-
-        Self { metrics, table }
+
+        Self::new_from_row_group(table_name, row_group, metrics)
     }
 
-    // Only used in tests and benchmarks
     pub(crate) fn new_from_row_group(
         table_name: impl Into<String>,
         row_group: RowGroup,
-        metrics: ChunkMetrics,
+        mut metrics: ChunkMetrics,
     ) -> Self {
+        let storage_statistics = row_group.column_storage_statistics();
+        metrics.update_column_storage_statistics(&storage_statistics);
+
         Self {
             metrics,
             table: Table::with_row_group(table_name, row_group),
@@ -352,6 +362,157 @@ impl std::fmt::Debug for Chunk {
     }
 }
 
+/// A `ChunkBuilder` builds up a chunk from row groups that each aim to
+/// contain at least a specified number of rows. Row groups are built as the
+/// row count threshold is met, reducing the overall memory footprint during
+/// the building phase.
+pub struct ChunkBuilder {
+    table_name: String,
+    schema: arrow::datatypes::SchemaRef,
+
+    current_rows: usize, // running total of rows in `record_batches` below
+    record_batches: Vec<RecordBatch>,
+
+    row_group_min_rows: usize,
+    chunk_metrics: Option<ChunkMetrics>,
+    row_groups: Vec<RowGroup>,
+}
+
+impl ChunkBuilder {
+    pub fn new(table_name: impl Into<String>, schema: arrow::datatypes::SchemaRef) -> Self {
+        Self {
+            table_name: table_name.into(),
+            schema,
+            current_rows: 0,
+            record_batches: vec![],
+            row_group_min_rows: DEFAULT_ROW_GROUP_MIN_ROWS,
+            chunk_metrics: None,
+            row_groups: vec![],
+        }
+    }
+
+    /// Provide metrics for the built `Chunk`. If no metrics are provided then
+    /// no metrics will be tracked for the `Chunk`.
+    pub fn with_metrics(mut self, metrics: ChunkMetrics) -> Self {
+        self.chunk_metrics = Some(metrics);
+        self
+    }
+
+    /// Specify the desired minimum number of rows in a row group. During
+    /// build, one final row group may be smaller than this size.
+    pub fn set_row_group_min_size(mut self, size: usize) -> Self {
+        self.row_group_min_rows = size;
+        self
+    }
+
+    /// Add a new record batch to the chunk. If the minimum number of rows
+    /// threshold is met then all the builder's record batches will be
+    /// compressed into a row group and the record batches cleared.
+    ///
+    /// Note: pushing an empty record batch is currently treated as a no-op.
+    pub fn push_record_batch(&mut self, rb: RecordBatch) -> Result<(), ArrowError> {
+        if rb.num_rows() == 0 {
+            return Ok(());
+        }
+
+        self.current_rows += rb.num_rows();
+        self.record_batches.push(rb);
+
+        if self.current_rows >= self.row_group_min_rows {
+            return self.snapshot_rowgroup();
+        }
+
+        Ok(())
+    }
+
+    // Force a row group snapshot.
+    fn snapshot_rowgroup(&mut self) -> Result<(), ArrowError> {
+        // PERF(edd): This will copy all record batches so that they are in a
+        // contiguous block of memory. To reduce memory overhead we can avoid
+        // the concatenation and teach the `RowGroup` to build itself from a
+        // vec of record batches.
+        let concat_batch =
+            arrow::record_batch::RecordBatch::concat(&self.schema, &self.record_batches)?;
+
+        self.row_groups
+            .push(record_batch_to_row_group(&self.table_name, concat_batch));
+
+        // clear pending batches
+        self.record_batches.clear();
+        self.current_rows = 0;
+        Ok(())
+    }
+
+    /// Returns true if there are no pending record batches or snapshotted
+    /// row groups; effectively the `Chunk` is empty.
+    pub fn is_empty(&self) -> bool {
+        self.record_batches.is_empty() && self.row_groups.is_empty()
+    }
+
+    /// Consume the builder and attempt to create a single `Chunk`. The
+    /// resulting `Chunk` will have at most `n + 1` row groups, where `n` is
+    /// the number of already snapshotted row groups within the builder.
+    pub fn build(mut self) -> Result<Chunk> {
+        // No batches or row groups is an error because we have nothing to
+        // build the chunk with.
+        if self.is_empty() {
+            return ChunkBuilderNoInputSnafu {
+                table_name: self.table_name.clone(),
+            }
+            .fail();
+        // Snapshot remaining batches to a row group (accepting that it may
+        // be smaller than desired)
+        } else if !self.record_batches.is_empty() {
+            self.snapshot_rowgroup().context(ChunkBuilderSnafu {
+                table_name: self.table_name.clone(),
+            })?;
+        }
+
+        // Create new chunk
+        let mut chunk = Chunk::new_from_row_group(
+            self.table_name.clone(),
+            self.row_groups.remove(0),
+            match self.chunk_metrics.take() {
+                // avoid partial move of self
+                Some(metrics) => metrics,
+                None => ChunkMetrics::new_unregistered(),
+            },
+        );
+
+        for rg in self.row_groups {
+            chunk.upsert_table_with_row_group(rg);
+        }
+
+        Ok(chunk)
+    }
+
+    /*
+     *
+     * Test helpers
+     *
+     */
+
+    #[cfg(test)]
+    fn with_record_batch(mut self, rb: RecordBatch) -> Self {
+        self.must_push_record_batch(rb);
+        self
+    }
+
+    #[cfg(test)]
+    fn must_push_record_batch(&mut self, rb: RecordBatch) {
+        self.push_record_batch(rb).unwrap()
+    }
+
+    #[cfg(test)]
+    fn must_snapshot_rowgroup(&mut self) {
+        self.snapshot_rowgroup().unwrap();
+    }
+
+    #[cfg(test)]
+    fn must_build(self) -> Chunk {
+        self.build().unwrap()
+    }
+}
+
 /// The collection of metrics exposed by the Read Buffer. Note: several of
 /// these would be better represented as distributions, but the histogram
 /// story in IOx is not yet figured out.
@@ -669,45 +830,15 @@
         }
     }
 
-    #[derive(Debug, Default)]
-    struct ChunkBuilder {
-        name: Option<String>,
-        record_batch: Option<RecordBatch>,
-        metrics: Option<ChunkMetrics>,
-    }
-
-    impl ChunkBuilder {
-        fn name(mut self, name: impl Into<String>) -> Self {
-            self.name = Some(name.into());
-            self
-        }
-
-        fn record_batch(mut self, record_batch: RecordBatch) -> Self {
-            self.record_batch = Some(record_batch);
-            self
-        }
-
-        fn metrics(mut self, metrics: ChunkMetrics) -> Self {
-            self.metrics = Some(metrics);
-            self
-        }
-
-        fn build(self) -> Chunk {
-            Chunk::new(
-                self.name.unwrap_or_else(|| String::from("a_table")),
-                self.record_batch.unwrap_or_else(gen_recordbatch),
-                self.metrics.unwrap_or_else(ChunkMetrics::new_unregistered),
-            )
-        }
-    }
-
     #[test]
     fn add_remove_tables() {
         let registry = metric::Registry::new();
-        let mut chunk = ChunkBuilder::default()
-            .metrics(ChunkMetrics::new(&registry, "mydb"))
-            .build();
+
+        let rb = gen_recordbatch();
+        let mut chunk = ChunkBuilder::new("mydb", rb.schema())
+            .with_metrics(ChunkMetrics::new(&registry, "mydb"))
+            .with_record_batch(rb)
+            .must_build();
 
         assert_eq!(chunk.rows(), 3);
         assert_eq!(chunk.row_groups(), 1);
@@ -830,7 +961,11 @@
     #[test]
     fn read_filter_table_schema() {
-        let chunk = ChunkBuilder::default().build();
+        let rb = gen_recordbatch();
+        let chunk = ChunkBuilder::new("mydb", rb.schema())
+            .with_record_batch(rb)
+            .must_build();
+
         let schema = chunk.read_filter_table_schema(Selection::All).unwrap();
 
         let exp_schema: Arc<Schema> = SchemaBuilder::new()
@@ -922,11 +1057,9 @@
         // Add a record batch to a single partition
         let rb = RecordBatch::try_new(schema.into(), data).unwrap();
 
-        // The row group gets added to the same chunk each time.
-        let chunk = ChunkBuilder::default()
-            .name("a_table")
-            .record_batch(rb)
-            .build();
+        let mut chunk_builder = ChunkBuilder::new("a_table", rb.schema());
+        chunk_builder.must_push_record_batch(rb);
+        let chunk = chunk_builder.must_build();
 
         let summary = chunk.table_summary();
         assert_eq!("a_table", summary.name);
@@ -1026,22 +1159,22 @@
     }
 
     fn read_filter_setup() -> Chunk {
-        let mut chunk: Option<Chunk> = None;
+        let schema = SchemaBuilder::new()
+            .non_null_tag("env")
+            .non_null_tag("region")
+            .non_null_field("counter", Float64)
+            .field("sketchy_sensor", Int64)
+            .non_null_field("active", Boolean)
+            .field("msg", Utf8)
+            .field("all_null", Utf8)
+            .timestamp()
+            .build()
+            .unwrap();
+
+        let mut chunk_builder = ChunkBuilder::new("Coolverine", schema.clone().into());
 
         // Add a bunch of row groups to a single table in a single chunk
         for &i in &[100, 200, 300] {
-            let schema = SchemaBuilder::new()
-                .non_null_tag("env")
-                .non_null_tag("region")
-                .non_null_field("counter", Float64)
-                .field("sketchy_sensor", Int64)
-                .non_null_field("active", Boolean)
-                .field("msg", Utf8)
-                .field("all_null", Utf8)
-                .timestamp()
-                .build()
-                .unwrap();
-
             let data: Vec<ArrayRef> = vec![
                 Arc::new(
                     vec!["us-west", "us-east", "us-west"]
@@ -1069,22 +1202,11 @@
             ];
 
             // Add a record batch to a single partition
-            let rb = RecordBatch::try_new(schema.into(), data).unwrap();
-
-            // First time through the loop, create a new Chunk. Other times, upsert into the chunk.
-            match chunk {
-                Some(ref mut c) => c.upsert_table(rb),
-                None => {
-                    chunk = Some(
-                        ChunkBuilder::default()
-                            .name("Coolverine")
-                            .record_batch(rb)
-                            .build(),
-                    );
-                }
-            }
+            let rb = RecordBatch::try_new(schema.clone().into(), data).unwrap();
+            chunk_builder.must_push_record_batch(rb);
+            chunk_builder.must_snapshot_rowgroup();
         }
 
-        chunk.unwrap()
+        chunk_builder.must_build()
     }
 
@@ -1238,7 +1360,10 @@
     #[test]
     fn could_pass_predicate() {
-        let chunk = ChunkBuilder::default().build();
+        let rb = gen_recordbatch();
+        let mut chunk_builder = ChunkBuilder::new("table", rb.schema());
+        chunk_builder.must_push_record_batch(rb);
+        let chunk = chunk_builder.must_build();
 
         assert!(
             chunk.could_pass_predicate(Predicate::new(vec![BinaryExpr::from((
@@ -1319,10 +1444,9 @@
         // Create the chunk with the above table
         let rb = RecordBatch::try_new(schema, data).unwrap();
-        let chunk = ChunkBuilder::default()
-            .name("Utopia")
-            .record_batch(rb)
-            .build();
+        let mut chunk_builder = ChunkBuilder::new("Utopia", rb.schema());
+        chunk_builder.must_push_record_batch(rb);
+        let chunk = chunk_builder.must_build();
 
         let result = chunk
             .column_names(
@@ -1391,10 +1515,9 @@
         // Create the chunk with the above table
         let rb = RecordBatch::try_new(schema, data).unwrap();
-        let chunk = ChunkBuilder::default()
-            .name("Utopia")
-            .record_batch(rb)
-            .build();
+        let mut chunk_builder = ChunkBuilder::new("Utopia", rb.schema());
+        chunk_builder.must_push_record_batch(rb);
+        let chunk = chunk_builder.must_build();
 
         let result = chunk
             .column_names(
@@ -1479,10 +1602,9 @@
         // Create the chunk with the above table
         let rb = RecordBatch::try_new(schema, data).unwrap();
-        let chunk = ChunkBuilder::default()
-            .name("my_table")
-            .record_batch(rb)
-            .build();
+        let mut chunk_builder = ChunkBuilder::new("my_table", rb.schema());
+        chunk_builder.must_push_record_batch(rb);
+        let chunk = chunk_builder.must_build();
 
         let result = chunk
             .column_values(
@@ -1536,4 +1658,40 @@
             Err(Error::TableError { .. })
         ));
     }
+
+    #[test]
+    fn chunk_builder() {
+        // test row group configuration
+        let rb = gen_recordbatch();
+        let chunk = ChunkBuilder::new("table_a", rb.schema())
+            .with_record_batch(rb)
+            .must_build();
+
+        assert_eq!(chunk.row_groups(), 1);
+        assert_eq!(chunk.rows(), 3);
+
+        let rb = gen_recordbatch();
+        let mut builder = ChunkBuilder::new("table_a", rb.schema()).set_row_group_min_size(3);
+        builder.must_push_record_batch(rb);
+        builder.must_push_record_batch(gen_recordbatch());
+        builder.must_push_record_batch(gen_recordbatch());
+
+        let chunk = builder.must_build();
+        assert_eq!(chunk.table.name(), "table_a");
+        assert_eq!(chunk.row_groups(), 3);
+        assert_eq!(chunk.rows(), 9);
+
+        // when the chunk is empty an error is returned on build
+        let rb = gen_recordbatch();
+        let builder = ChunkBuilder::new("table_a", rb.schema());
+        assert!(builder.build().is_err());
+
+        // empty record batches are not stored for snapshotting
+        let rb = gen_recordbatch();
+        let mut builder = ChunkBuilder::new("table_a", rb.schema());
+        assert!(builder
+            .push_record_batch(RecordBatch::new_empty(rb.schema()))
+            .is_ok());
+        assert!(builder.is_empty());
+    }
 }
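
Taken together, the builder replaces the old pattern of seeding a `Chunk` with the first record batch and upserting the rest. A standalone usage sketch in the style of the tests above (the table name, schema, and values are invented for illustration, and the test module's imports are assumed to be in scope):

    // One small batch: a float field plus the mandatory time column.
    let schema = SchemaBuilder::new()
        .non_null_field("counter", Float64)
        .timestamp()
        .build()
        .unwrap();
    let data: Vec<ArrayRef> = vec![
        Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])),
        Arc::new(TimestampNanosecondArray::from(vec![10, 20, 30])),
    ];
    let rb = RecordBatch::try_new(schema.clone().into(), data).unwrap();

    // With a 3-row minimum, each pushed batch is immediately snapshotted
    // into its own row group; `build()` then stitches the groups together.
    let mut builder = ChunkBuilder::new("cpu", rb.schema()).set_row_group_min_size(3);
    builder.push_record_batch(rb.clone()).unwrap();
    builder.push_record_batch(rb).unwrap();

    let chunk = builder.build().unwrap();
    assert_eq!(chunk.row_groups(), 2);
    assert_eq!(chunk.rows(), 6);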

@@ -10,7 +10,7 @@ mod value;
 
 // Identifiers that are exported as part of the public API.
 pub use self::schema::*;
-pub use chunk::{Chunk as RBChunk, ChunkMetrics, Error};
+pub use chunk::{Chunk as RBChunk, ChunkBuilder as RBChunkBuilder, ChunkMetrics, Error};
 pub use row_group::{BinaryExpr, Predicate};
 pub use table::ReadFilterResults;
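
Outside the crate the builder is reached through the `RB`-prefixed alias, matching the existing `RBChunk` naming. A consumer would write, roughly (hypothetical caller; `schema` and `batch` assumed in scope, error conversion elided):

    use read_buffer::{RBChunk, RBChunkBuilder};

    let mut builder = RBChunkBuilder::new("cpu", schema);
    builder.push_record_batch(batch)?;
    let chunk: RBChunk = builder.build()?;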