fix: Take a TableBatch in the MBChunk constructor

This ensures that every MBChunk contains data.
pull/24376/head
Carol (Nichols || Goulding) 2021-07-08 15:49:18 -04:00
parent 57501e7f59
commit 22495dd355
6 changed files with 116 additions and 91 deletions

View File

@ -89,8 +89,14 @@ pub struct MBChunk {
}
impl MBChunk {
pub fn new(table_name: impl AsRef<str>, metrics: ChunkMetrics) -> Self {
let table_name = Arc::from(table_name.as_ref());
/// Create a new batch and write the contents of the [`TableBatch`] into it. Chunks
/// shouldn't exist without some data.
pub fn new(
metrics: ChunkMetrics,
sequence: Option<&Sequence>,
batch: TableBatch<'_>,
) -> Result<Self> {
let table_name = Arc::from(batch.name());
let mut chunk = Self {
table_name,
@ -98,8 +104,13 @@ impl MBChunk {
metrics,
snapshot: Mutex::new(None),
};
let columns = batch.columns();
chunk.write_columns(sequence, columns)?;
chunk.metrics.memory_bytes.set(chunk.size());
chunk
Ok(chunk)
}
/// Write the contents of a [`TableBatch`] into this Chunk.
@ -362,11 +373,49 @@ pub mod test_helpers {
Ok(())
}
/// Parse the given line protocol and write it into a freshly created
/// [`MBChunk`], returning the populated chunk.
///
/// Panics if the line protocol spans more than one table, since an
/// `MBChunk` holds data for exactly one table.
pub fn write_lp_to_new_chunk(lp: &str) -> Result<MBChunk> {
    let entry = lp_to_entry(lp);
    let mut result: Option<MBChunk> = None;

    for partition_write in entry.partition_writes().unwrap() {
        let batches = partition_write.table_batches();

        // Every batch in this write must target the same table.
        let table_names: BTreeSet<String> =
            batches.iter().map(|b| b.name().to_string()).collect();
        assert!(
            table_names.len() <= 1,
            "Can only write 0 or one tables to chunk. Found {:?}",
            table_names
        );

        for batch in batches {
            let seq = Some(Sequence::new(1, 5));
            // First batch creates the chunk; subsequent batches append to it.
            result = Some(match result.take() {
                Some(mut existing) => {
                    existing.write_table_batch(seq.as_ref(), batch)?;
                    existing
                }
                None => MBChunk::new(ChunkMetrics::new_unregistered(), seq.as_ref(), batch)?,
            });
        }
    }

    Ok(result.expect("Must write at least one table batch to create a chunk"))
}
}
#[cfg(test)]
mod tests {
use super::{test_helpers::write_lp_to_chunk, *};
use super::{
test_helpers::{write_lp_to_chunk, write_lp_to_new_chunk},
*,
};
use arrow::datatypes::DataType as ArrowDataType;
use arrow_util::assert_batches_eq;
use data_types::partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics};
@ -376,11 +425,8 @@ mod tests {
#[test]
fn writes_table_batches() {
let mut chunk = MBChunk::new("cpu", ChunkMetrics::new_unregistered());
let lp = vec!["cpu,host=a val=23 1", "cpu,host=b val=2 1"].join("\n");
write_lp_to_chunk(&lp, &mut chunk).unwrap();
let chunk = write_lp_to_new_chunk(&lp).unwrap();
assert_batches_eq!(
vec![
@ -397,18 +443,13 @@ mod tests {
#[test]
fn writes_table_3_batches() {
let mut chunk = MBChunk::new("cpu", ChunkMetrics::new_unregistered());
let lp = vec!["cpu,host=a val=23 1", "cpu,host=b val=2 1"].join("\n");
write_lp_to_chunk(&lp, &mut chunk).unwrap();
let mut chunk = write_lp_to_new_chunk(&lp).unwrap();
let lp = vec!["cpu,host=c val=11 1"].join("\n");
write_lp_to_chunk(&lp, &mut chunk).unwrap();
let lp = vec!["cpu,host=a val=14 2"].join("\n");
write_lp_to_chunk(&lp, &mut chunk).unwrap();
assert_batches_eq!(
@ -428,14 +469,13 @@ mod tests {
#[test]
fn test_summary() {
let mut chunk = MBChunk::new("cpu", ChunkMetrics::new_unregistered());
let lp = r#"
cpu,host=a val=23 1
cpu,host=b,env=prod val=2 1
cpu,host=c,env=stage val=11 1
cpu,host=a,env=prod val=14 2
"#;
write_lp_to_chunk(&lp, &mut chunk).unwrap();
let chunk = write_lp_to_new_chunk(&lp).unwrap();
let summary = chunk.table_summary();
assert_eq!(
@ -491,11 +531,9 @@ mod tests {
#[test]
#[cfg(not(feature = "nocache"))]
fn test_snapshot() {
let mut chunk = MBChunk::new("cpu", ChunkMetrics::new_unregistered());
let lp = vec!["cpu,host=a val=23 1", "cpu,host=b val=2 1"].join("\n");
let mut chunk = write_lp_to_new_chunk(&lp).unwrap();
write_lp_to_chunk(&lp, &mut chunk).unwrap();
let s1 = chunk.snapshot();
let s2 = chunk.snapshot();
@ -514,15 +552,12 @@ mod tests {
#[test]
fn table_size() {
let mut chunk = MBChunk::new("h2o", ChunkMetrics::new_unregistered());
let lp = vec![
"h2o,state=MA,city=Boston temp=70.4 100",
"h2o,state=MA,city=Boston temp=72.4 250",
]
.join("\n");
write_lp_to_chunk(&lp, &mut chunk).unwrap();
let mut chunk = write_lp_to_new_chunk(&lp).unwrap();
let s1 = chunk.size();
write_lp_to_chunk(&lp, &mut chunk).unwrap();
@ -537,11 +572,8 @@ mod tests {
#[test]
fn test_to_arrow_schema_all() {
let mut chunk = MBChunk::new("h2o", ChunkMetrics::new_unregistered());
let lp = "h2o,state=MA,city=Boston float_field=70.4,int_field=8i,uint_field=42u,bool_field=t,string_field=\"foo\" 100";
write_lp_to_chunk(lp, &mut chunk).unwrap();
let chunk = write_lp_to_new_chunk(lp).unwrap();
let selection = Selection::All;
let actual_schema = chunk.schema(selection).unwrap();
@ -566,11 +598,8 @@ mod tests {
#[test]
fn test_to_arrow_schema_subset() {
let mut chunk = MBChunk::new("h2o", ChunkMetrics::new_unregistered());
let lp = "h2o,state=MA,city=Boston float_field=70.4 100";
write_lp_to_chunk(lp, &mut chunk).unwrap();
let chunk = write_lp_to_new_chunk(lp).unwrap();
let selection = Selection::Some(&["float_field"]);
let actual_schema = chunk.schema(selection).unwrap();
@ -588,27 +617,12 @@ mod tests {
#[test]
fn write_columns_validates_schema() {
let mut table = MBChunk::new("table_name", ChunkMetrics::new_unregistered());
let sequencer_id = 1;
let sequence_number = 5;
let sequence = Some(Sequence::new(sequencer_id, sequence_number));
let lp = "foo,t1=asdf iv=1i,uv=1u,fv=1.0,bv=true,sv=\"hi\" 1";
let entry = lp_to_entry(&lp);
table
.write_columns(
sequence.as_ref(),
entry
.partition_writes()
.unwrap()
.first()
.unwrap()
.table_batches()
.first()
.unwrap()
.columns(),
)
.unwrap();
let mut table = write_lp_to_new_chunk(lp).unwrap();
let lp = "foo t1=\"string\" 1";
let entry = lp_to_entry(&lp);

View File

@ -764,25 +764,27 @@ impl Db {
"mutable_buffer",
self.metric_labels.clone(),
);
let mut mb_chunk = MBChunk::new(
table_batch.name(),
let chunk_result = MBChunk::new(
MutableBufferChunkMetrics::new(
&metrics,
self.catalog.metrics().memory().mutable_buffer(),
),
);
sequence,
table_batch,
)
.context(WriteEntryInitial { partition_key });
if let Err(e) = mb_chunk
.write_table_batch(sequence, table_batch)
.context(WriteEntryInitial { partition_key })
{
if errors.len() < MAX_ERRORS_PER_SEQUENCED_ENTRY {
errors.push(e);
match chunk_result {
Ok(mb_chunk) => {
partition.create_open_chunk(mb_chunk);
}
Err(e) => {
if errors.len() < MAX_ERRORS_PER_SEQUENCED_ENTRY {
errors.push(e);
}
continue;
}
continue;
}
partition.create_open_chunk(mb_chunk);
}
};
partition.update_last_write_at();

View File

@ -323,15 +323,13 @@ mod tests {
let write = entry.partition_writes().unwrap().remove(0);
let batch = write.table_batches().remove(0);
let mut mb_chunk = mutable_buffer::chunk::MBChunk::new(
batch.name(),
mutable_buffer::chunk::ChunkMetrics::new_unregistered(),
);
let sequence = Some(Sequence::new(1, 5));
mb_chunk
.write_table_batch(sequence.as_ref(), batch)
.unwrap();
let mb_chunk = mutable_buffer::chunk::MBChunk::new(
mutable_buffer::chunk::ChunkMetrics::new_unregistered(),
sequence.as_ref(),
batch,
)
.unwrap();
partition.create_open_chunk(mb_chunk);
}

View File

@ -245,7 +245,6 @@ impl CatalogChunk {
chunk: mutable_buffer::chunk::MBChunk,
metrics: ChunkMetrics,
) -> Self {
assert!(chunk.rows() > 0, "chunk must not be empty");
assert_eq!(chunk.table_name(), &addr.table_name);
let stage = ChunkStage::Open { mb_chunk: chunk };
@ -275,8 +274,6 @@ impl CatalogChunk {
schema: Schema,
metrics: ChunkMetrics,
) -> Self {
assert!(chunk.rows() > 0, "chunk must not be empty");
// TODO: Move RUB to single table (#1295)
let summaries = chunk.table_summaries();
assert_eq!(summaries.len(), 1);
@ -834,15 +831,6 @@ mod tests {
assert!(matches!(chunk.stage(), &ChunkStage::Open { .. }));
}
#[test]
#[should_panic(expected = "chunk must not be empty")]
fn test_new_open_empty() {
let addr = chunk_addr();
// fails with empty MBChunk
let mb_chunk = MBChunk::new(&addr.table_name, MBChunkMetrics::new_unregistered());
CatalogChunk::new_open(addr, mb_chunk, ChunkMetrics::new_unregistered());
}
#[tokio::test]
async fn test_freeze() {
let mut chunk = make_open_chunk();
@ -927,15 +915,12 @@ mod tests {
}
fn make_mb_chunk(table_name: &str, sequencer_id: u32) -> MBChunk {
let mut mb_chunk = MBChunk::new(table_name, MBChunkMetrics::new_unregistered());
let entry = lp_to_entry(&format!("{} bar=1 10", table_name));
let write = entry.partition_writes().unwrap().remove(0);
let batch = write.table_batches().remove(0);
let sequence = Some(Sequence::new(sequencer_id, 1));
mb_chunk
.write_table_batch(sequence.as_ref(), batch)
.unwrap();
mb_chunk
MBChunk::new(MBChunkMetrics::new_unregistered(), sequence.as_ref(), batch).unwrap()
}
async fn make_parquet_chunk(addr: ChunkAddr) -> ParquetChunk {

View File

@ -13,8 +13,7 @@ fn snapshot_chunk(chunk: &MBChunk) {
}
fn chunk(count: usize) -> MBChunk {
// m0 is hard coded into tag_values.lp.gz
let mut chunk = MBChunk::new("m0", ChunkMetrics::new_unregistered());
let mut chunk: Option<MBChunk> = None;
let raw = include_bytes!("../../tests/fixtures/lineproto/tag_values.lp.gz");
let mut gz = GzDecoder::new(&raw[..]);
@ -26,13 +25,27 @@ fn chunk(count: usize) -> MBChunk {
for entry in lp_to_entries(&lp, &hour_partitioner()) {
for write in entry.partition_writes().iter().flatten() {
for batch in write.table_batches() {
chunk.write_table_batch(sequence.as_ref(), batch).unwrap();
match chunk {
Some(ref mut c) => {
c.write_table_batch(sequence.as_ref(), batch).unwrap();
}
None => {
chunk = Some(
MBChunk::new(
ChunkMetrics::new_unregistered(),
sequence.as_ref(),
batch,
)
.unwrap(),
);
}
}
}
}
}
}
chunk
chunk.expect("Must write at least one table batch to create a chunk")
}
pub fn snapshot_mb(c: &mut Criterion) {

View File

@ -9,15 +9,28 @@ use std::io::Read;
#[inline]
fn write_chunk(count: usize, entries: &[Entry]) {
// m0 is hard coded into tag_values.lp.gz
let mut chunk = MBChunk::new("m0", ChunkMetrics::new_unregistered());
let mut chunk: Option<MBChunk> = None;
let sequence = Some(Sequence::new(1, 5));
for _ in 0..count {
for entry in entries {
for write in entry.partition_writes().iter().flatten() {
for batch in write.table_batches() {
chunk.write_table_batch(sequence.as_ref(), batch).unwrap();
match chunk {
Some(ref mut c) => {
c.write_table_batch(sequence.as_ref(), batch).unwrap();
}
None => {
chunk = Some(
MBChunk::new(
ChunkMetrics::new_unregistered(),
sequence.as_ref(),
batch,
)
.unwrap(),
);
}
}
}
}
}