diff --git a/mutable_buffer/src/chunk.rs b/mutable_buffer/src/chunk.rs index d198bcdbf3..1bec696cc7 100644 --- a/mutable_buffer/src/chunk.rs +++ b/mutable_buffer/src/chunk.rs @@ -89,8 +89,14 @@ pub struct MBChunk { } impl MBChunk { - pub fn new(table_name: impl AsRef, metrics: ChunkMetrics) -> Self { - let table_name = Arc::from(table_name.as_ref()); + /// Create a new batch and write the contents of the [`TableBatch`] into it. Chunks + /// shouldn't exist without some data. + pub fn new( + metrics: ChunkMetrics, + sequence: Option<&Sequence>, + batch: TableBatch<'_>, + ) -> Result { + let table_name = Arc::from(batch.name()); let mut chunk = Self { table_name, @@ -98,8 +104,13 @@ impl MBChunk { metrics, snapshot: Mutex::new(None), }; + + let columns = batch.columns(); + chunk.write_columns(sequence, columns)?; + chunk.metrics.memory_bytes.set(chunk.size()); - chunk + + Ok(chunk) } /// Write the contents of a [`TableBatch`] into this Chunk. @@ -362,11 +373,49 @@ pub mod test_helpers { Ok(()) } + + pub fn write_lp_to_new_chunk(lp: &str) -> Result { + let entry = lp_to_entry(lp); + let mut chunk: Option = None; + + for w in entry.partition_writes().unwrap() { + let table_batches = w.table_batches(); + // ensure they are all to the same table + let table_names: BTreeSet = + table_batches.iter().map(|b| b.name().to_string()).collect(); + + assert!( + table_names.len() <= 1, + "Can only write 0 or one tables to chunk. Found {:?}", + table_names + ); + + for batch in table_batches { + let seq = Some(Sequence::new(1, 5)); + + match chunk { + Some(ref mut c) => c.write_table_batch(seq.as_ref(), batch)?, + None => { + chunk = Some(MBChunk::new( + ChunkMetrics::new_unregistered(), + seq.as_ref(), + batch, + )?); + } + } + } + } + + Ok(chunk.expect("Must write at least one table batch to create a chunk")) + } } #[cfg(test)] mod tests { - use super::{test_helpers::write_lp_to_chunk, *}; + use super::{ + test_helpers::{write_lp_to_chunk, write_lp_to_new_chunk}, + *, + }; use arrow::datatypes::DataType as ArrowDataType; use arrow_util::assert_batches_eq; use data_types::partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics}; @@ -376,11 +425,8 @@ mod tests { #[test] fn writes_table_batches() { - let mut chunk = MBChunk::new("cpu", ChunkMetrics::new_unregistered()); - let lp = vec!["cpu,host=a val=23 1", "cpu,host=b val=2 1"].join("\n"); - - write_lp_to_chunk(&lp, &mut chunk).unwrap(); + let chunk = write_lp_to_new_chunk(&lp).unwrap(); assert_batches_eq!( vec![ @@ -397,18 +443,13 @@ mod tests { #[test] fn writes_table_3_batches() { - let mut chunk = MBChunk::new("cpu", ChunkMetrics::new_unregistered()); - let lp = vec!["cpu,host=a val=23 1", "cpu,host=b val=2 1"].join("\n"); - - write_lp_to_chunk(&lp, &mut chunk).unwrap(); + let mut chunk = write_lp_to_new_chunk(&lp).unwrap(); let lp = vec!["cpu,host=c val=11 1"].join("\n"); - write_lp_to_chunk(&lp, &mut chunk).unwrap(); let lp = vec!["cpu,host=a val=14 2"].join("\n"); - write_lp_to_chunk(&lp, &mut chunk).unwrap(); assert_batches_eq!( @@ -428,14 +469,13 @@ mod tests { #[test] fn test_summary() { - let mut chunk = MBChunk::new("cpu", ChunkMetrics::new_unregistered()); let lp = r#" cpu,host=a val=23 1 cpu,host=b,env=prod val=2 1 cpu,host=c,env=stage val=11 1 cpu,host=a,env=prod val=14 2 "#; - write_lp_to_chunk(&lp, &mut chunk).unwrap(); + let chunk = write_lp_to_new_chunk(&lp).unwrap(); let summary = chunk.table_summary(); assert_eq!( @@ -491,11 +531,9 @@ mod tests { #[test] #[cfg(not(feature = "nocache"))] fn test_snapshot() { - let mut chunk = MBChunk::new("cpu", ChunkMetrics::new_unregistered()); - let lp = vec!["cpu,host=a val=23 1", "cpu,host=b val=2 1"].join("\n"); + let mut chunk = write_lp_to_new_chunk(&lp).unwrap(); - write_lp_to_chunk(&lp, &mut chunk).unwrap(); let s1 = chunk.snapshot(); let s2 = chunk.snapshot(); @@ -514,15 +552,12 @@ mod tests { #[test] fn table_size() { - let mut chunk = MBChunk::new("h2o", ChunkMetrics::new_unregistered()); - let lp = vec![ "h2o,state=MA,city=Boston temp=70.4 100", "h2o,state=MA,city=Boston temp=72.4 250", ] .join("\n"); - - write_lp_to_chunk(&lp, &mut chunk).unwrap(); + let mut chunk = write_lp_to_new_chunk(&lp).unwrap(); let s1 = chunk.size(); write_lp_to_chunk(&lp, &mut chunk).unwrap(); @@ -537,11 +572,8 @@ mod tests { #[test] fn test_to_arrow_schema_all() { - let mut chunk = MBChunk::new("h2o", ChunkMetrics::new_unregistered()); - let lp = "h2o,state=MA,city=Boston float_field=70.4,int_field=8i,uint_field=42u,bool_field=t,string_field=\"foo\" 100"; - - write_lp_to_chunk(lp, &mut chunk).unwrap(); + let chunk = write_lp_to_new_chunk(lp).unwrap(); let selection = Selection::All; let actual_schema = chunk.schema(selection).unwrap(); @@ -566,11 +598,8 @@ mod tests { #[test] fn test_to_arrow_schema_subset() { - let mut chunk = MBChunk::new("h2o", ChunkMetrics::new_unregistered()); - let lp = "h2o,state=MA,city=Boston float_field=70.4 100"; - - write_lp_to_chunk(lp, &mut chunk).unwrap(); + let chunk = write_lp_to_new_chunk(lp).unwrap(); let selection = Selection::Some(&["float_field"]); let actual_schema = chunk.schema(selection).unwrap(); @@ -588,27 +617,12 @@ mod tests { #[test] fn write_columns_validates_schema() { - let mut table = MBChunk::new("table_name", ChunkMetrics::new_unregistered()); let sequencer_id = 1; let sequence_number = 5; let sequence = Some(Sequence::new(sequencer_id, sequence_number)); let lp = "foo,t1=asdf iv=1i,uv=1u,fv=1.0,bv=true,sv=\"hi\" 1"; - let entry = lp_to_entry(&lp); - table - .write_columns( - sequence.as_ref(), - entry - .partition_writes() - .unwrap() - .first() - .unwrap() - .table_batches() - .first() - .unwrap() - .columns(), - ) - .unwrap(); + let mut table = write_lp_to_new_chunk(lp).unwrap(); let lp = "foo t1=\"string\" 1"; let entry = lp_to_entry(&lp); diff --git a/server/src/db.rs b/server/src/db.rs index eece23ef64..ab0f215a27 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -764,25 +764,27 @@ impl Db { "mutable_buffer", self.metric_labels.clone(), ); - let mut mb_chunk = MBChunk::new( - table_batch.name(), + let chunk_result = MBChunk::new( MutableBufferChunkMetrics::new( &metrics, self.catalog.metrics().memory().mutable_buffer(), ), - ); + sequence, + table_batch, + ) + .context(WriteEntryInitial { partition_key }); - if let Err(e) = mb_chunk - .write_table_batch(sequence, table_batch) - .context(WriteEntryInitial { partition_key }) - { - if errors.len() < MAX_ERRORS_PER_SEQUENCED_ENTRY { - errors.push(e); + match chunk_result { + Ok(mb_chunk) => { + partition.create_open_chunk(mb_chunk); + } + Err(e) => { + if errors.len() < MAX_ERRORS_PER_SEQUENCED_ENTRY { + errors.push(e); + } + continue; } - continue; } - - partition.create_open_chunk(mb_chunk); } }; partition.update_last_write_at(); diff --git a/server/src/db/catalog.rs b/server/src/db/catalog.rs index 758eb56b9e..e7e7fe8ab6 100644 --- a/server/src/db/catalog.rs +++ b/server/src/db/catalog.rs @@ -323,15 +323,13 @@ mod tests { let write = entry.partition_writes().unwrap().remove(0); let batch = write.table_batches().remove(0); - let mut mb_chunk = mutable_buffer::chunk::MBChunk::new( - batch.name(), - mutable_buffer::chunk::ChunkMetrics::new_unregistered(), - ); - let sequence = Some(Sequence::new(1, 5)); - mb_chunk - .write_table_batch(sequence.as_ref(), batch) - .unwrap(); + let mb_chunk = mutable_buffer::chunk::MBChunk::new( + mutable_buffer::chunk::ChunkMetrics::new_unregistered(), + sequence.as_ref(), + batch, + ) + .unwrap(); partition.create_open_chunk(mb_chunk); } diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs index fff1ae9282..9367745505 100644 --- a/server/src/db/catalog/chunk.rs +++ b/server/src/db/catalog/chunk.rs @@ -245,7 +245,6 @@ impl CatalogChunk { chunk: mutable_buffer::chunk::MBChunk, metrics: ChunkMetrics, ) -> Self { - assert!(chunk.rows() > 0, "chunk must not be empty"); assert_eq!(chunk.table_name(), &addr.table_name); let stage = ChunkStage::Open { mb_chunk: chunk }; @@ -275,8 +274,6 @@ impl CatalogChunk { schema: Schema, metrics: ChunkMetrics, ) -> Self { - assert!(chunk.rows() > 0, "chunk must not be empty"); - // TODO: Move RUB to single table (#1295) let summaries = chunk.table_summaries(); assert_eq!(summaries.len(), 1); @@ -834,15 +831,6 @@ mod tests { assert!(matches!(chunk.stage(), &ChunkStage::Open { .. })); } - #[test] - #[should_panic(expected = "chunk must not be empty")] - fn test_new_open_empty() { - let addr = chunk_addr(); - // fails with empty MBChunk - let mb_chunk = MBChunk::new(&addr.table_name, MBChunkMetrics::new_unregistered()); - CatalogChunk::new_open(addr, mb_chunk, ChunkMetrics::new_unregistered()); - } - #[tokio::test] async fn test_freeze() { let mut chunk = make_open_chunk(); @@ -927,15 +915,12 @@ mod tests { } fn make_mb_chunk(table_name: &str, sequencer_id: u32) -> MBChunk { - let mut mb_chunk = MBChunk::new(table_name, MBChunkMetrics::new_unregistered()); let entry = lp_to_entry(&format!("{} bar=1 10", table_name)); let write = entry.partition_writes().unwrap().remove(0); let batch = write.table_batches().remove(0); let sequence = Some(Sequence::new(sequencer_id, 1)); - mb_chunk - .write_table_batch(sequence.as_ref(), batch) - .unwrap(); - mb_chunk + + MBChunk::new(MBChunkMetrics::new_unregistered(), sequence.as_ref(), batch).unwrap() } async fn make_parquet_chunk(addr: ChunkAddr) -> ParquetChunk { diff --git a/server_benchmarks/benches/snapshot.rs b/server_benchmarks/benches/snapshot.rs index aeec73e72c..b643bfd0e0 100644 --- a/server_benchmarks/benches/snapshot.rs +++ b/server_benchmarks/benches/snapshot.rs @@ -13,8 +13,7 @@ fn snapshot_chunk(chunk: &MBChunk) { } fn chunk(count: usize) -> MBChunk { - // m0 is hard coded into tag_values.lp.gz - let mut chunk = MBChunk::new("m0", ChunkMetrics::new_unregistered()); + let mut chunk: Option = None; let raw = include_bytes!("../../tests/fixtures/lineproto/tag_values.lp.gz"); let mut gz = GzDecoder::new(&raw[..]); @@ -26,13 +25,27 @@ fn chunk(count: usize) -> MBChunk { for entry in lp_to_entries(&lp, &hour_partitioner()) { for write in entry.partition_writes().iter().flatten() { for batch in write.table_batches() { - chunk.write_table_batch(sequence.as_ref(), batch).unwrap(); + match chunk { + Some(ref mut c) => { + c.write_table_batch(sequence.as_ref(), batch).unwrap(); + } + None => { + chunk = Some( + MBChunk::new( + ChunkMetrics::new_unregistered(), + sequence.as_ref(), + batch, + ) + .unwrap(), + ); + } + } } } } } - chunk + chunk.expect("Must write at least one table batch to create a chunk") } pub fn snapshot_mb(c: &mut Criterion) { diff --git a/server_benchmarks/benches/write.rs b/server_benchmarks/benches/write.rs index 52c457b0df..b86c14a821 100644 --- a/server_benchmarks/benches/write.rs +++ b/server_benchmarks/benches/write.rs @@ -9,15 +9,28 @@ use std::io::Read; #[inline] fn write_chunk(count: usize, entries: &[Entry]) { - // m0 is hard coded into tag_values.lp.gz - let mut chunk = MBChunk::new("m0", ChunkMetrics::new_unregistered()); + let mut chunk: Option = None; let sequence = Some(Sequence::new(1, 5)); for _ in 0..count { for entry in entries { for write in entry.partition_writes().iter().flatten() { for batch in write.table_batches() { - chunk.write_table_batch(sequence.as_ref(), batch).unwrap(); + match chunk { + Some(ref mut c) => { + c.write_table_batch(sequence.as_ref(), batch).unwrap(); + } + None => { + chunk = Some( + MBChunk::new( + ChunkMetrics::new_unregistered(), + sequence.as_ref(), + batch, + ) + .unwrap(), + ); + } + } } } }