feat: Require at least one RecordBatch to create a read_buffer::Chunk via Chunk::new

In the signature only for the moment.
pull/24376/head
Carol (Nichols || Goulding) 2021-07-14 14:42:39 -04:00
parent bbb4462264
commit 6feea3b2d5
8 changed files with 84 additions and 53 deletions

View File

@ -12,8 +12,7 @@ const ONE_MS: i64 = 1_000_000;
fn satisfies_predicate(c: &mut Criterion) {
let rb = generate_row_group(500_000);
let mut chunk = RBChunk::new("table_a", ChunkMetrics::new_unregistered());
chunk.upsert_table(rb);
let chunk = RBChunk::new("table_a", rb, ChunkMetrics::new_unregistered());
// no predicate
benchmark_satisfies_predicate(

View File

@ -17,9 +17,8 @@ const ONE_MS: i64 = 1_000_000;
pub fn read_filter(c: &mut Criterion) {
let mut rng = rand::thread_rng();
let mut chunk = RBChunk::new("table", read_buffer::ChunkMetrics::new_unregistered());
let row_group = generate_row_group(200_000, &mut rng);
read_buffer::benchmarks::upsert_table_with_row_group(&mut chunk, "table", row_group);
let chunk = read_buffer::benchmarks::new_from_row_group("table", row_group);
read_filter_no_pred_vary_proj(c, &chunk);
read_filter_with_pred_vary_proj(c, &chunk);

View File

@ -48,12 +48,32 @@ pub struct Chunk {
}
impl Chunk {
/// Initialises a new `Chunk` with the associated chunk ID.
pub fn new(table_name: impl Into<String>, metrics: ChunkMetrics) -> Self {
Self {
/// Start a new Chunk from the given record batch.
pub fn new(
table_name: impl Into<String>,
table_data: RecordBatch,
metrics: ChunkMetrics,
) -> Self {
let mut c = Self {
metrics,
table: Table::new(table_name.into()),
}
};
c.upsert_table(table_data);
c
}
/// Construct a `Chunk` directly from a pre-built `RowGroup`, bypassing
/// record-batch conversion.
///
/// Only used in tests and benchmarks.
pub(crate) fn new_from_row_group(
    table_name: impl Into<String>,
    row_group: RowGroup,
    metrics: ChunkMetrics,
) -> Self {
    let mut chunk = Self {
        metrics,
        table: Table::new(table_name.into()),
    };
    // Attach the row group straight onto the chunk's table.
    chunk.table.add_row_group(row_group);
    chunk
}
// The total size taken up by an empty instance of `Chunk`.
@ -625,10 +645,7 @@ mod test {
let domain =
registry.register_domain_with_labels("read_buffer", vec![KeyValue::new("db", "mydb")]);
let mut chunk = Chunk::new("a_table", ChunkMetrics::new(&domain));
// Add a new table to the chunk.
chunk.upsert_table(gen_recordbatch());
let mut chunk = Chunk::new("a_table", gen_recordbatch(), ChunkMetrics::new(&domain));
assert_eq!(chunk.rows(), 3);
assert_eq!(chunk.row_groups(), 1);
@ -739,10 +756,11 @@ mod test {
#[test]
fn read_filter_table_schema() {
let mut chunk = Chunk::new("a_table", ChunkMetrics::new_unregistered());
// Add a new table to the chunk.
chunk.upsert_table(gen_recordbatch());
let chunk = Chunk::new(
"a_table",
gen_recordbatch(),
ChunkMetrics::new_unregistered(),
);
let schema = chunk.read_filter_table_schema(Selection::All).unwrap();
let exp_schema: Arc<Schema> = SchemaBuilder::new()
@ -778,8 +796,6 @@ mod test {
#[test]
fn table_summaries() {
let mut chunk = Chunk::new("a_table", ChunkMetrics::new_unregistered());
let schema = SchemaBuilder::new()
.non_null_tag("env")
.non_null_field("temp", Float64)
@ -811,7 +827,7 @@ mod test {
// Add a record batch to a single partition
let rb = RecordBatch::try_new(schema.into(), data).unwrap();
// The row group gets added to the same chunk each time.
chunk.upsert_table(rb);
let chunk = Chunk::new("a_table", rb, ChunkMetrics::new_unregistered());
let summary = chunk.table_summary();
assert_eq!("a_table", summary.name);
@ -874,7 +890,7 @@ mod test {
#[test]
fn read_filter() {
let mut chunk = Chunk::new("Coolverine", ChunkMetrics::new_unregistered());
let mut chunk: Option<Chunk> = None;
// Add a bunch of row groups to a single table in a single chunk
for &i in &[100, 200, 300] {
@ -916,9 +932,23 @@ mod test {
// Add a record batch to a single partition
let rb = RecordBatch::try_new(schema.into(), data).unwrap();
chunk.upsert_table(rb);
// First time through the loop, create a new Chunk. Other times, upsert into the chunk.
match chunk {
Some(ref mut c) => c.upsert_table(rb),
None => {
chunk = Some(Chunk::new(
"Coolverine",
rb,
ChunkMetrics::new_unregistered(),
))
}
}
}
// Chunk should be initialized now.
let chunk = chunk.unwrap();
// Build the operation equivalent to the following query:
//
// SELECT * FROM "table_1"
@ -968,10 +998,11 @@ mod test {
#[test]
fn could_pass_predicate() {
let mut chunk = Chunk::new("a_table", ChunkMetrics::new_unregistered());
// Add table data to the chunk.
chunk.upsert_table(gen_recordbatch());
let chunk = Chunk::new(
"a_table",
gen_recordbatch(),
ChunkMetrics::new_unregistered(),
);
assert!(
chunk.could_pass_predicate(Predicate::new(vec![BinaryExpr::from((
@ -994,8 +1025,7 @@ mod test {
];
let rg = RowGroup::new(6, columns);
let mut chunk = Chunk::new("table_1", ChunkMetrics::new_unregistered());
chunk.table.add_row_group(rg);
let chunk = Chunk::new_from_row_group("table_1", rg, ChunkMetrics::new_unregistered());
// No predicate so at least one row matches
assert!(chunk.satisfies_predicate(&Predicate::default()));
@ -1021,8 +1051,6 @@ mod test {
#[test]
fn column_names() {
let mut chunk = Chunk::new("Utopia", ChunkMetrics::new_unregistered());
let schema = SchemaBuilder::new()
.non_null_tag("region")
.non_null_field("counter", Float64)
@ -1046,9 +1074,9 @@ mod test {
Arc::new(Float64Array::from(vec![Some(11.0), None, Some(12.0)])),
];
// Add the above table to the chunk
// Create the chunk with the above table
let rb = RecordBatch::try_new(schema, data).unwrap();
chunk.upsert_table(rb);
let chunk = Chunk::new("Utopia", rb, ChunkMetrics::new_unregistered());
let result = chunk
.column_names(Predicate::default(), Selection::All, BTreeSet::new())
@ -1089,8 +1117,6 @@ mod test {
#[test]
fn column_values() {
let mut chunk = Chunk::new("my_table", ChunkMetrics::new_unregistered());
let schema = SchemaBuilder::new()
.non_null_tag("region")
.non_null_tag("env")
@ -1116,9 +1142,9 @@ mod test {
)),
];
// Add the above table to a chunk and partition
// Create the chunk with the above table
let rb = RecordBatch::try_new(schema, data).unwrap();
chunk.upsert_table(rb);
let chunk = Chunk::new("my_table", rb, ChunkMetrics::new_unregistered());
let result = chunk
.column_values(

View File

@ -29,14 +29,10 @@ pub mod benchmarks {
Column, RowIDs,
};
pub use crate::row_group::{ColumnType, RowGroup};
use crate::RBChunk;
use crate::{ChunkMetrics, RBChunk};
// Allow external benchmarks to use this crate-only test method
pub fn upsert_table_with_row_group(
chunk: &mut RBChunk,
_table_name: impl Into<String>,
row_group: RowGroup,
) {
chunk.upsert_table_with_row_group(row_group)
pub fn new_from_row_group(table_name: impl Into<String>, row_group: RowGroup) -> RBChunk {
RBChunk::new_from_row_group(table_name, row_group, ChunkMetrics::new_unregistered())
}
}

View File

@ -335,8 +335,8 @@ fn collect_rub(
stream: SendableRecordBatchStream,
db: &Db,
table_name: &str,
) -> impl futures::Future<Output = Result<read_buffer::RBChunk>> {
use futures::{future, TryStreamExt};
) -> impl futures::Future<Output = Result<Option<read_buffer::RBChunk>>> {
use futures::{future, StreamExt, TryStreamExt};
let table_name = table_name.to_string();
let metrics = db
@ -345,17 +345,23 @@ fn collect_rub(
let chunk_metrics = read_buffer::ChunkMetrics::new(&metrics);
async {
let mut chunk = read_buffer::RBChunk::new(table_name, chunk_metrics);
let mut adapted_stream = stream.try_filter(|batch| future::ready(batch.num_rows() > 0));
stream
.try_filter(|batch| future::ready(batch.num_rows() > 0))
let first_batch = match adapted_stream.next().await {
Some(rb_result) => rb_result?,
// At least one RecordBatch is required to create a read_buffer::Chunk
None => return Ok(None),
};
let mut chunk = read_buffer::RBChunk::new(table_name, first_batch, chunk_metrics);
adapted_stream
.try_for_each(|batch| {
chunk.upsert_table(batch);
future::ready(Ok(()))
})
.await?;
Ok(chunk)
Ok(Some(chunk))
}
}

View File

@ -75,7 +75,9 @@ pub(crate) fn compact_chunks(
let physical_plan = ctx.prepare_plan(&plan)?;
let stream = ctx.execute(physical_plan).await?;
let rb_chunk = collect_rub(stream, &db, &table_name).await?;
let rb_chunk = collect_rub(stream, &db, &table_name)
.await?
.expect("chunk has zero rows");
let rb_row_groups = rb_chunk.row_groups();
let new_chunk = {

View File

@ -64,6 +64,9 @@ pub fn move_chunk_to_read_buffer(
// Can drop and re-acquire as lifecycle action prevents concurrent modification
let mut guard = chunk.write();
let rb_chunk =
rb_chunk.expect("Chunks moving to the read buffer should have at least one row");
// update the catalog to say we are done processing
guard
.set_moved(Arc::new(rb_chunk), schema)

View File

@ -92,8 +92,8 @@ pub fn persist_chunks(
)
.await?;
let persisted_rows = to_persist.rows();
let remainder_rows = remainder.rows();
let persisted_rows = to_persist.as_ref().map(|p| p.rows()).unwrap_or(0);
let remainder_rows = remainder.as_ref().map(|r| r.rows()).unwrap_or(0);
let persist_fut = {
let partition = LockableCatalogPartition::new(Arc::clone(&db), partition);
@ -103,11 +103,11 @@ pub fn persist_chunks(
}
// Upsert remainder to catalog
if remainder.rows() > 0 {
if let Some(remainder) = remainder {
partition_write.create_rub_chunk(remainder, Arc::clone(&schema));
}
assert!(to_persist.rows() > 0);
let to_persist = to_persist.expect("should be rows to persist");
let to_persist = LockableCatalogChunk {
db,