From 31115742ecc2b6e23593024dc4fe9d9d45bd293e Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Sat, 10 Apr 2021 13:38:03 -0400 Subject: [PATCH] feat: Add writing of Entry structures to MB Chunk This adds writing of Entry of a vec of TableWriteBatch to the Mutable Buffer Chunk. This is additional to the previous method of writing via ReplicatedWrite. The next step is to remove the old ReplicatedWrite bits. Test helpers for parsing line protocol into Entry and writing line protocol directly to Chunks have also been added. --- internal_types/src/entry.rs | 30 +++- mutable_buffer/src/chunk.rs | 155 +++++++++++++++++++- mutable_buffer/src/column.rs | 271 +++++++++++++++++++++++++++++++++++ mutable_buffer/src/table.rs | 250 ++++++++++++++++++++++++++++++++ 4 files changed, 701 insertions(+), 5 deletions(-) diff --git a/internal_types/src/entry.rs b/internal_types/src/entry.rs index acc067d53c..c5789dfcfe 100644 --- a/internal_types/src/entry.rs +++ b/internal_types/src/entry.rs @@ -430,7 +430,7 @@ impl<'a> TableBatch<'a> { #[derive(Debug)] pub struct Column<'a> { fb: entry_fb::Column<'a>, - row_count: usize, + pub row_count: usize, } impl<'a> Column<'a> { @@ -442,6 +442,18 @@ impl<'a> Column<'a> { self.fb.logical_column_type() } + pub fn is_tag(&self) -> bool { + self.fb.logical_column_type() == entry_fb::LogicalColumnType::Tag + } + + pub fn is_field(&self) -> bool { + self.fb.logical_column_type() == entry_fb::LogicalColumnType::Field + } + + pub fn is_time(&self) -> bool { + self.fb.logical_column_type() == entry_fb::LogicalColumnType::Time + } + pub fn values(&self) -> TypedValuesIterator<'a> { match self.fb.values_type() { entry_fb::ColumnValues::BoolValues => TypedValuesIterator::Bool(BoolIterator { @@ -564,12 +576,22 @@ impl<'a> TypedValuesIterator<'a> { _ => None, } } + + pub fn type_description(&self) -> &str { + match self { + Self::Bool(_) => "bool", + Self::I64(_) => "i64", + Self::F64(_) => "f64", + Self::U64(_) => "u64", + Self::String(_) => "String", + } + } } /// Iterator over the flatbuffers BoolValues #[derive(Debug)] pub struct BoolIterator<'a> { - row_count: usize, + pub row_count: usize, position: usize, null_mask: Option<&'a [u8]>, values: &'a [bool], @@ -599,7 +621,7 @@ impl<'a> Iterator for BoolIterator<'a> { /// Iterator over the flatbuffers I64Values, F64Values, and U64Values. 
#[derive(Debug)] pub struct ValIterator<'a, T: Follow<'a> + Follow<'a, Inner = T>> { - row_count: usize, + pub row_count: usize, position: usize, null_mask: Option<&'a [u8]>, values_iter: VectorIter<'a, T>, @@ -625,7 +647,7 @@ impl<'a, T: Follow<'a> + Follow<'a, Inner = T>> Iterator for ValIterator<'a, T> /// Iterator over the flatbuffers StringValues #[derive(Debug)] pub struct StringIterator<'a> { - row_count: usize, + pub row_count: usize, position: usize, null_mask: Option<&'a [u8]>, values: VectorIter<'a, ForwardsUOffset<&'a str>>, diff --git a/mutable_buffer/src/chunk.rs b/mutable_buffer/src/chunk.rs index 14bacacbfe..269c887c11 100644 --- a/mutable_buffer/src/chunk.rs +++ b/mutable_buffer/src/chunk.rs @@ -6,7 +6,7 @@ use generated_types::wal as wb; use std::collections::{BTreeSet, HashMap}; use data_types::partition_metadata::TableSummary; -use internal_types::{schema::Schema, selection::Selection}; +use internal_types::{entry::TableBatch, schema::Schema, selection::Selection}; use crate::{ column::Column, @@ -14,6 +14,8 @@ use crate::{ pred::{ChunkPredicate, ChunkPredicateBuilder}, table::Table, }; +use data_types::database_rules::WriterId; +use internal_types::entry::ClockValue; use snafu::{OptionExt, ResultExt, Snafu}; use tracker::{MemRegistry, MemTracker}; @@ -151,6 +153,30 @@ impl Chunk { chunk } + pub fn write_table_batches( + &mut self, + clock_value: ClockValue, + writer_id: WriterId, + batches: &[TableBatch<'_>], + ) -> Result<()> { + for batch in batches { + let table_name = batch.name(); + let table_id = self.dictionary.lookup_value_or_insert(table_name); + + let table = self + .tables + .entry(table_id) + .or_insert_with(|| Table::new(table_id)); + + let columns = batch.columns(); + table + .write_columns(&mut self.dictionary, clock_value, writer_id, columns) + .context(TableWrite { table_name })?; + } + + Ok(()) + } + pub fn write_entry(&mut self, entry: &wb::WriteBufferEntry<'_>) -> Result<()> { if let Some(table_batches) = entry.table_batches() { for batch in table_batches { @@ -486,3 +512,130 @@ impl Chunk { matches!(self.table(table_name), Ok(Some(_))) } } + +pub mod test_helpers { + use super::*; + use internal_types::entry::test_helpers::lp_to_entry; + + /// A helper that will write line protocol string to the passed in Chunk. + /// All data will be under a single partition with a clock value and + /// writer id of 0. 
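+    ///
+    /// A minimal, hypothetical usage sketch, mirroring the tests below
+    /// (assumes a `MemRegistry` is in scope for constructing the `Chunk`):
+    ///
+    /// ```ignore
+    /// let registry = MemRegistry::new();
+    /// let mut chunk = Chunk::new(1, &registry);
+    /// write_lp_to_chunk("cpu,host=a val=23 1", &mut chunk).unwrap();
+    /// ```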
+ pub fn write_lp_to_chunk(lp: &str, chunk: &mut Chunk) -> Result<()> { + let entry = lp_to_entry(lp); + + for w in entry.partition_writes().unwrap() { + chunk.write_table_batches(0, 0, &w.table_batches())?; + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::test_helpers::write_lp_to_chunk; + use super::*; + use arrow_deps::arrow::util::pretty::pretty_format_batches; + + #[test] + fn writes_table_batches() { + let mr = MemRegistry::new(); + let mut chunk = Chunk::new(1, &mr); + + let lp = vec![ + "cpu,host=a val=23 1", + "cpu,host=b val=2 1", + "mem,host=a val=23432i 1", + ] + .join("\n"); + + write_lp_to_chunk(&lp, &mut chunk).unwrap(); + + assert_table( + &chunk, + "cpu", + &[ + "+------+------+-----+", + "| host | time | val |", + "+------+------+-----+", + "| a | 1 | 23 |", + "| b | 1 | 2 |", + "+------+------+-----+\n", + ], + ); + + assert_table( + &chunk, + "mem", + &[ + "+------+------+-------+", + "| host | time | val |", + "+------+------+-------+", + "| a | 1 | 23432 |", + "+------+------+-------+\n", + ], + ); + + let lp = vec![ + "cpu,host=c val=11 1", + "mem sval=\"hi\" 2", + "disk val=true 1", + ] + .join("\n"); + + write_lp_to_chunk(&lp, &mut chunk).unwrap(); + + assert_table( + &chunk, + "cpu", + &[ + "+------+------+-----+", + "| host | time | val |", + "+------+------+-----+", + "| a | 1 | 23 |", + "| b | 1 | 2 |", + "| c | 1 | 11 |", + "+------+------+-----+\n", + ], + ); + + assert_table( + &chunk, + "disk", + &[ + "+------+------+", + "| time | val |", + "+------+------+", + "| 1 | true |", + "+------+------+\n", + ], + ); + + assert_table( + &chunk, + "mem", + &[ + "+------+------+------+-------+", + "| host | sval | time | val |", + "+------+------+------+-------+", + "| a | | 1 | 23432 |", + "| | hi | 2 | |", + "+------+------+------+-------+\n", + ], + ); + } + + fn assert_table(chunk: &Chunk, table: &str, data: &[&str]) { + let mut batches = vec![]; + chunk + .table_to_arrow(&mut batches, table, Selection::All) + .unwrap(); + let res = pretty_format_batches(&batches).unwrap(); + let data = data.join("\n"); + assert_eq!( + res, data, + "\n{} table results not as expected:\nEXPECTED:\n{}\nRECEIVED:\n{}", + table, data, res + ); + } +} diff --git a/mutable_buffer/src/column.rs b/mutable_buffer/src/column.rs index 2ba6a89ee9..360a8122fb 100644 --- a/mutable_buffer/src/column.rs +++ b/mutable_buffer/src/column.rs @@ -4,7 +4,9 @@ use snafu::Snafu; use crate::dictionary::Dictionary; use arrow_deps::arrow::datatypes::DataType as ArrowDataType; use data_types::partition_metadata::StatValues; +use generated_types::entry::LogicalColumnType; use internal_types::data::type_description; +use internal_types::entry::TypedValuesIterator; use std::mem; @@ -41,6 +43,275 @@ pub enum Column { } impl Column { + /// Initializes a new column from typed values, the column on a table write + /// batach on an Entry. Will initialize the stats with the first + /// non-null value and update with any other non-null values included. 
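+    ///
+    /// Note: `row_count` is the number of rows already present in the table
+    /// (the caller in `table.rs` passes `row_count_before_insert`), so the new
+    /// column is front-filled with that many nulls before the incoming values
+    /// are appended, keeping all columns the same length.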
+    pub fn new_from_typed_values(
+        dictionary: &mut Dictionary,
+        row_count: usize,
+        logical_type: LogicalColumnType,
+        values: TypedValuesIterator<'_>,
+    ) -> Self {
+        match values {
+            TypedValuesIterator::String(vals) => match logical_type {
+                LogicalColumnType::Tag => {
+                    let mut tag_values = vec![None; row_count];
+                    let mut stats: Option<StatValues<String>> = None;
+
+                    for tag in vals {
+                        let tag_id = match tag {
+                            Some(tag) => {
+                                match stats.as_mut() {
+                                    Some(s) => StatValues::update_string(s, tag),
+                                    None => {
+                                        stats = Some(StatValues::new(tag.to_string()));
+                                    }
+                                }
+
+                                Some(dictionary.lookup_value_or_insert(tag))
+                            }
+                            None => None,
+                        };
+
+                        tag_values.push(tag_id);
+                    }
+
+                    Self::Tag(
+                        tag_values,
+                        stats.expect("can't insert tag column with no values"),
+                    )
+                }
+                LogicalColumnType::Field => {
+                    let mut values = vec![None; row_count];
+                    let mut stats: Option<StatValues<String>> = None;
+
+                    for value in vals {
+                        match value {
+                            Some(v) => {
+                                match stats.as_mut() {
+                                    Some(s) => StatValues::update_string(s, v),
+                                    None => stats = Some(StatValues::new(v.to_string())),
+                                }
+
+                                values.push(Some(v.to_string()));
+                            }
+                            None => values.push(None),
+                        }
+                    }
+
+                    Self::String(
+                        values,
+                        stats.expect("can't insert string column with no values"),
+                    )
+                }
+                _ => panic!("unsupported!"),
+            },
+            TypedValuesIterator::I64(vals) => {
+                let mut values = vec![None; row_count];
+                let mut stats: Option<StatValues<i64>> = None;
+
+                for v in vals {
+                    if let Some(val) = v {
+                        match stats.as_mut() {
+                            Some(s) => s.update(val),
+                            None => stats = Some(StatValues::new(val)),
+                        }
+                    }
+                    values.push(v);
+                }
+
+                Self::I64(
+                    values,
+                    stats.expect("can't insert i64 column with no values"),
+                )
+            }
+            TypedValuesIterator::F64(vals) => {
+                let mut values = vec![None; row_count];
+                let mut stats: Option<StatValues<f64>> = None;
+
+                for v in vals {
+                    if let Some(val) = v {
+                        match stats.as_mut() {
+                            Some(s) => s.update(val),
+                            None => stats = Some(StatValues::new(val)),
+                        }
+                    }
+                    values.push(v);
+                }
+
+                Self::F64(
+                    values,
+                    stats.expect("can't insert f64 column with no values"),
+                )
+            }
+            TypedValuesIterator::U64(vals) => {
+                let mut values = vec![None; row_count];
+                let mut stats: Option<StatValues<u64>> = None;
+
+                for v in vals {
+                    if let Some(val) = v {
+                        match stats.as_mut() {
+                            Some(s) => s.update(val),
+                            None => stats = Some(StatValues::new(val)),
+                        }
+                    }
+                    values.push(v);
+                }
+
+                Self::U64(
+                    values,
+                    stats.expect("can't insert u64 column with no values"),
+                )
+            }
+            TypedValuesIterator::Bool(vals) => {
+                let mut values = vec![None; row_count];
+                let mut stats: Option<StatValues<bool>> = None;
+
+                for v in vals {
+                    if let Some(val) = v {
+                        match stats.as_mut() {
+                            Some(s) => s.update(val),
+                            None => stats = Some(StatValues::new(val)),
+                        }
+                    }
+                    values.push(v);
+                }
+
+                Self::Bool(
+                    values,
+                    stats.expect("can't insert bool column with no values"),
+                )
+            }
+        }
+    }
+
+    /// Pushes typed values, the column from a table write batch on an Entry.
+    /// Updates statistics for any non-null values.
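+    ///
+    /// Returns a `TypeMismatch` error if the incoming values do not match the
+    /// existing column variant, or if string values arrive with the wrong
+    /// logical type (tag data for a `String` column, field data for a `Tag`
+    /// column).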
+ pub fn push_typed_values( + &mut self, + dictionary: &mut Dictionary, + logical_type: LogicalColumnType, + values: TypedValuesIterator<'_>, + ) -> Result<()> { + match (self, values) { + (Self::Bool(col, stats), TypedValuesIterator::Bool(values)) => { + for val in values { + if let Some(v) = val { + stats.update(v) + }; + col.push(val); + } + } + (Self::I64(col, stats), TypedValuesIterator::I64(values)) => { + for val in values { + if let Some(v) = val { + stats.update(v) + }; + col.push(val); + } + } + (Self::F64(col, stats), TypedValuesIterator::F64(values)) => { + for val in values { + if let Some(v) = val { + stats.update(v) + }; + col.push(val); + } + } + (Self::U64(col, stats), TypedValuesIterator::U64(values)) => { + for val in values { + if let Some(v) = val { + stats.update(v) + }; + col.push(val); + } + } + (Self::String(col, stats), TypedValuesIterator::String(values)) => { + if logical_type != LogicalColumnType::Field { + TypeMismatch { + existing_column_type: "String", + inserted_value_type: "tag", + } + .fail()?; + } + + for val in values { + match val { + Some(v) => { + StatValues::update_string(stats, v); + col.push(Some(v.to_string())); + } + None => col.push(None), + } + } + } + (Self::Tag(col, stats), TypedValuesIterator::String(values)) => { + if logical_type != LogicalColumnType::Tag { + TypeMismatch { + existing_column_type: "tag", + inserted_value_type: "String", + } + .fail()?; + } + + for val in values { + match val { + Some(v) => { + StatValues::update_string(stats, v); + let id = dictionary.lookup_value_or_insert(v); + col.push(Some(id)); + } + None => col.push(None), + } + } + } + (existing, values) => TypeMismatch { + existing_column_type: existing.type_description(), + inserted_value_type: values.type_description(), + } + .fail()?, + } + + Ok(()) + } + + /// Pushes None values onto the column until its len is equal to that passed + /// in + pub fn push_nulls_to_len(&mut self, len: usize) { + match self { + Self::Tag(vals, _) => { + while vals.len() < len { + vals.push(None); + } + } + Self::I64(vals, _) => { + while vals.len() < len { + vals.push(None); + } + } + Self::F64(vals, _) => { + while vals.len() < len { + vals.push(None); + } + } + Self::U64(vals, _) => { + while vals.len() < len { + vals.push(None); + } + } + Self::Bool(vals, _) => { + while vals.len() < len { + vals.push(None); + } + } + Self::String(vals, _) => { + while vals.len() < len { + vals.push(None); + } + } + } + } + pub fn with_value( dictionary: &mut Dictionary, capacity: usize, diff --git a/mutable_buffer/src/table.rs b/mutable_buffer/src/table.rs index fe35b2f1f8..8d2e9f837d 100644 --- a/mutable_buffer/src/table.rs +++ b/mutable_buffer/src/table.rs @@ -14,6 +14,7 @@ use crate::{ }; use data_types::partition_metadata::{ColumnSummary, Statistics}; use internal_types::{ + entry, schema::{builder::SchemaBuilder, Schema, TIME_COLUMN_NAME}, selection::Selection, }; @@ -30,6 +31,8 @@ use arrow_deps::{ record_batch::RecordBatch, }, }; +use data_types::database_rules::WriterId; +use internal_types::entry::ClockValue; #[derive(Debug, Snafu)] pub enum Error { @@ -237,6 +240,106 @@ impl Table { Ok(()) } + /// Validates the schema of the passed in columns, then adds their values to + /// the associated columns in the table and updates summary statistics. 
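+    ///
+    /// Columns already in the table but absent from this write are padded with
+    /// nulls so every column ends up with the same row count. A hedged usage
+    /// sketch (argument values as passed by `Chunk::write_table_batches`):
+    ///
+    /// ```ignore
+    /// table.write_columns(&mut dictionary, clock_value, writer_id, batch.columns())?;
+    /// ```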
+    pub fn write_columns(
+        &mut self,
+        dictionary: &mut Dictionary,
+        _clock_value: ClockValue,
+        _writer_id: WriterId,
+        columns: Vec<entry::Column<'_>>,
+    ) -> Result<()> {
+        // get the column ids and validate schema for those that already exist
+        let columns_with_inserts = columns
+            .into_iter()
+            .map(|insert_column| {
+                let column_id = dictionary.lookup_value_or_insert(insert_column.name());
+                let values = insert_column.values();
+
+                if let Some(c) = self.columns.get(&column_id) {
+                    match (&values, c) {
+                        (entry::TypedValuesIterator::Bool(_), Column::Bool(_, _)) => (),
+                        (entry::TypedValuesIterator::U64(_), Column::U64(_, _)) => (),
+                        (entry::TypedValuesIterator::F64(_), Column::F64(_, _)) => (),
+                        (entry::TypedValuesIterator::I64(_), Column::I64(_, _)) => (),
+                        (entry::TypedValuesIterator::String(_), Column::String(_, _)) => {
+                            if !insert_column.is_field() {
+                                InternalColumnTypeMismatch {
+                                    column_id,
+                                    expected_column_type: c.type_description(),
+                                    actual_column_type: values.type_description(),
+                                }
+                                .fail()?
+                            };
+                        }
+                        (entry::TypedValuesIterator::String(_), Column::Tag(_, _)) => {
+                            if !insert_column.is_tag() {
+                                InternalColumnTypeMismatch {
+                                    column_id,
+                                    expected_column_type: c.type_description(),
+                                    actual_column_type: values.type_description(),
+                                }
+                                .fail()?
+                            };
+                        }
+                        _ => InternalColumnTypeMismatch {
+                            column_id,
+                            expected_column_type: c.type_description(),
+                            actual_column_type: values.type_description(),
+                        }
+                        .fail()?,
+                    }
+                }
+
+                Ok((column_id, insert_column.logical_type(), values))
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        let row_count_before_insert = self.row_count();
+
+        for (column_id, logical_type, values) in columns_with_inserts.into_iter() {
+            match self.columns.get_mut(&column_id) {
+                Some(c) => c
+                    .push_typed_values(dictionary, logical_type, values)
+                    .with_context(|| {
+                        let column = dictionary.lookup_id(column_id).unwrap_or("unknown");
+                        ColumnError { column }
+                    })?,
+                None => {
+                    self.columns.insert(
+                        column_id,
+                        Column::new_from_typed_values(
+                            dictionary,
+                            row_count_before_insert,
+                            logical_type,
+                            values,
+                        ),
+                    );
+                }
+            }
+        }
+
+        // ensure all columns have the same number of rows as the one with the most.
+ // This adds nulls to the columns that weren't included in this write + let max_row_count = self + .columns + .values() + .fold(row_count_before_insert, |max, col| { + let len = col.len(); + if max < len { + len + } else { + max + } + }); + + for c in self.columns.values_mut() { + c.push_nulls_to_len(max_row_count); + } + + Ok(()) + } + /// Returns the column selection for all the columns in this table, orderd /// by table name fn all_columns_selection<'a>(&self, chunk: &'a Chunk) -> Result> { @@ -348,6 +451,7 @@ impl Table { for col in &selection.cols { let column = self.column(col.column_id)?; + println!("COLUMN: {:#?}", column); let array = match column { Column::String(vals, _) => { @@ -594,6 +698,7 @@ mod tests { use influxdb_line_protocol::{parse_lines, ParsedLine}; use internal_types::data::split_lines_into_write_entry_partitions; + use internal_types::entry::test_helpers::lp_to_entry; use super::*; use tracker::MemRegistry; @@ -802,6 +907,151 @@ mod tests { ); } + #[test] + fn write_columns_validates_schema() { + let mut dictionary = Dictionary::new(); + let mut table = Table::new(dictionary.lookup_value_or_insert("foo")); + + let lp = "foo,t1=asdf iv=1i,uv=1u,fv=1.0,bv=true,sv=\"hi\" 1"; + let entry = lp_to_entry(&lp); + table + .write_columns( + &mut dictionary, + 0, + 0, + entry + .partition_writes() + .unwrap() + .first() + .unwrap() + .table_batches() + .first() + .unwrap() + .columns(), + ) + .unwrap(); + + let lp = "foo t1=\"string\" 1"; + let entry = lp_to_entry(&lp); + let response = table + .write_columns( + &mut dictionary, + 0, + 0, + entry + .partition_writes() + .unwrap() + .first() + .unwrap() + .table_batches() + .first() + .unwrap() + .columns(), + ) + .err() + .unwrap(); + assert!( + matches!(&response, Error::InternalColumnTypeMismatch {expected_column_type, actual_column_type, ..} if expected_column_type == "tag" && actual_column_type == "String"), + format!("didn't match returned error: {:?}", response) + ); + + let lp = "foo iv=1u 1"; + let entry = lp_to_entry(&lp); + let response = table + .write_columns( + &mut dictionary, + 0, + 0, + entry + .partition_writes() + .unwrap() + .first() + .unwrap() + .table_batches() + .first() + .unwrap() + .columns(), + ) + .err() + .unwrap(); + assert!( + matches!(&response, Error::InternalColumnTypeMismatch {expected_column_type, actual_column_type, ..} if expected_column_type == "i64" && actual_column_type == "u64"), + format!("didn't match returned error: {:?}", response) + ); + + let lp = "foo fv=1i 1"; + let entry = lp_to_entry(&lp); + let response = table + .write_columns( + &mut dictionary, + 0, + 0, + entry + .partition_writes() + .unwrap() + .first() + .unwrap() + .table_batches() + .first() + .unwrap() + .columns(), + ) + .err() + .unwrap(); + assert!( + matches!(&response, Error::InternalColumnTypeMismatch {expected_column_type, actual_column_type, ..} if expected_column_type == "f64" && actual_column_type == "i64"), + format!("didn't match returned error: {:?}", response) + ); + + let lp = "foo bv=1 1"; + let entry = lp_to_entry(&lp); + let response = table + .write_columns( + &mut dictionary, + 0, + 0, + entry + .partition_writes() + .unwrap() + .first() + .unwrap() + .table_batches() + .first() + .unwrap() + .columns(), + ) + .err() + .unwrap(); + assert!( + matches!(&response, Error::InternalColumnTypeMismatch {expected_column_type, actual_column_type, ..} if expected_column_type == "bool" && actual_column_type == "f64"), + format!("didn't match returned error: {:?}", response) + ); + + let lp = "foo 
sv=true 1"; + let entry = lp_to_entry(&lp); + let response = table + .write_columns( + &mut dictionary, + 0, + 0, + entry + .partition_writes() + .unwrap() + .first() + .unwrap() + .table_batches() + .first() + .unwrap() + .columns(), + ) + .err() + .unwrap(); + assert!( + matches!(&response, Error::InternalColumnTypeMismatch {expected_column_type, actual_column_type, ..} if expected_column_type == "String" && actual_column_type == "bool"), + format!("didn't match returned error: {:?}", response) + ); + } + /// Insert the line protocol lines in `lp_lines` into this table fn write_lines_to_table(table: &mut Table, dictionary: &mut Dictionary, lp_lines: Vec<&str>) { let lp_data = lp_lines.join("\n");