diff --git a/internal_types/src/entry.rs b/internal_types/src/entry.rs index acc067d53c..c5789dfcfe 100644 --- a/internal_types/src/entry.rs +++ b/internal_types/src/entry.rs @@ -430,7 +430,7 @@ impl<'a> TableBatch<'a> { #[derive(Debug)] pub struct Column<'a> { fb: entry_fb::Column<'a>, - row_count: usize, + pub row_count: usize, } impl<'a> Column<'a> { @@ -442,6 +442,18 @@ impl<'a> Column<'a> { self.fb.logical_column_type() } + pub fn is_tag(&self) -> bool { + self.fb.logical_column_type() == entry_fb::LogicalColumnType::Tag + } + + pub fn is_field(&self) -> bool { + self.fb.logical_column_type() == entry_fb::LogicalColumnType::Field + } + + pub fn is_time(&self) -> bool { + self.fb.logical_column_type() == entry_fb::LogicalColumnType::Time + } + pub fn values(&self) -> TypedValuesIterator<'a> { match self.fb.values_type() { entry_fb::ColumnValues::BoolValues => TypedValuesIterator::Bool(BoolIterator { @@ -564,12 +576,22 @@ impl<'a> TypedValuesIterator<'a> { _ => None, } } + + pub fn type_description(&self) -> &str { + match self { + Self::Bool(_) => "bool", + Self::I64(_) => "i64", + Self::F64(_) => "f64", + Self::U64(_) => "u64", + Self::String(_) => "String", + } + } } /// Iterator over the flatbuffers BoolValues #[derive(Debug)] pub struct BoolIterator<'a> { - row_count: usize, + pub row_count: usize, position: usize, null_mask: Option<&'a [u8]>, values: &'a [bool], @@ -599,7 +621,7 @@ impl<'a> Iterator for BoolIterator<'a> { /// Iterator over the flatbuffers I64Values, F64Values, and U64Values. #[derive(Debug)] pub struct ValIterator<'a, T: Follow<'a> + Follow<'a, Inner = T>> { - row_count: usize, + pub row_count: usize, position: usize, null_mask: Option<&'a [u8]>, values_iter: VectorIter<'a, T>, @@ -625,7 +647,7 @@ impl<'a, T: Follow<'a> + Follow<'a, Inner = T>> Iterator for ValIterator<'a, T> /// Iterator over the flatbuffers StringValues #[derive(Debug)] pub struct StringIterator<'a> { - row_count: usize, + pub row_count: usize, position: usize, null_mask: Option<&'a [u8]>, values: VectorIter<'a, ForwardsUOffset<&'a str>>, diff --git a/mutable_buffer/src/chunk.rs b/mutable_buffer/src/chunk.rs index 14bacacbfe..269c887c11 100644 --- a/mutable_buffer/src/chunk.rs +++ b/mutable_buffer/src/chunk.rs @@ -6,7 +6,7 @@ use generated_types::wal as wb; use std::collections::{BTreeSet, HashMap}; use data_types::partition_metadata::TableSummary; -use internal_types::{schema::Schema, selection::Selection}; +use internal_types::{entry::TableBatch, schema::Schema, selection::Selection}; use crate::{ column::Column, @@ -14,6 +14,8 @@ use crate::{ pred::{ChunkPredicate, ChunkPredicateBuilder}, table::Table, }; +use data_types::database_rules::WriterId; +use internal_types::entry::ClockValue; use snafu::{OptionExt, ResultExt, Snafu}; use tracker::{MemRegistry, MemTracker}; @@ -151,6 +153,30 @@ impl Chunk { chunk } + pub fn write_table_batches( + &mut self, + clock_value: ClockValue, + writer_id: WriterId, + batches: &[TableBatch<'_>], + ) -> Result<()> { + for batch in batches { + let table_name = batch.name(); + let table_id = self.dictionary.lookup_value_or_insert(table_name); + + let table = self + .tables + .entry(table_id) + .or_insert_with(|| Table::new(table_id)); + + let columns = batch.columns(); + table + .write_columns(&mut self.dictionary, clock_value, writer_id, columns) + .context(TableWrite { table_name })?; + } + + Ok(()) + } + pub fn write_entry(&mut self, entry: &wb::WriteBufferEntry<'_>) -> Result<()> { if let Some(table_batches) = entry.table_batches() { for batch in table_batches { @@ -486,3 +512,130 @@ impl Chunk { matches!(self.table(table_name), Ok(Some(_))) } } + +pub mod test_helpers { + use super::*; + use internal_types::entry::test_helpers::lp_to_entry; + + /// A helper that will write line protocol string to the passed in Chunk. + /// All data will be under a single partition with a clock value and + /// writer id of 0. + pub fn write_lp_to_chunk(lp: &str, chunk: &mut Chunk) -> Result<()> { + let entry = lp_to_entry(lp); + + for w in entry.partition_writes().unwrap() { + chunk.write_table_batches(0, 0, &w.table_batches())?; + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::test_helpers::write_lp_to_chunk; + use super::*; + use arrow_deps::arrow::util::pretty::pretty_format_batches; + + #[test] + fn writes_table_batches() { + let mr = MemRegistry::new(); + let mut chunk = Chunk::new(1, &mr); + + let lp = vec![ + "cpu,host=a val=23 1", + "cpu,host=b val=2 1", + "mem,host=a val=23432i 1", + ] + .join("\n"); + + write_lp_to_chunk(&lp, &mut chunk).unwrap(); + + assert_table( + &chunk, + "cpu", + &[ + "+------+------+-----+", + "| host | time | val |", + "+------+------+-----+", + "| a | 1 | 23 |", + "| b | 1 | 2 |", + "+------+------+-----+\n", + ], + ); + + assert_table( + &chunk, + "mem", + &[ + "+------+------+-------+", + "| host | time | val |", + "+------+------+-------+", + "| a | 1 | 23432 |", + "+------+------+-------+\n", + ], + ); + + let lp = vec![ + "cpu,host=c val=11 1", + "mem sval=\"hi\" 2", + "disk val=true 1", + ] + .join("\n"); + + write_lp_to_chunk(&lp, &mut chunk).unwrap(); + + assert_table( + &chunk, + "cpu", + &[ + "+------+------+-----+", + "| host | time | val |", + "+------+------+-----+", + "| a | 1 | 23 |", + "| b | 1 | 2 |", + "| c | 1 | 11 |", + "+------+------+-----+\n", + ], + ); + + assert_table( + &chunk, + "disk", + &[ + "+------+------+", + "| time | val |", + "+------+------+", + "| 1 | true |", + "+------+------+\n", + ], + ); + + assert_table( + &chunk, + "mem", + &[ + "+------+------+------+-------+", + "| host | sval | time | val |", + "+------+------+------+-------+", + "| a | | 1 | 23432 |", + "| | hi | 2 | |", + "+------+------+------+-------+\n", + ], + ); + } + + fn assert_table(chunk: &Chunk, table: &str, data: &[&str]) { + let mut batches = vec![]; + chunk + .table_to_arrow(&mut batches, table, Selection::All) + .unwrap(); + let res = pretty_format_batches(&batches).unwrap(); + let data = data.join("\n"); + assert_eq!( + res, data, + "\n{} table results not as expected:\nEXPECTED:\n{}\nRECEIVED:\n{}", + table, data, res + ); + } +} diff --git a/mutable_buffer/src/column.rs b/mutable_buffer/src/column.rs index 2ba6a89ee9..360a8122fb 100644 --- a/mutable_buffer/src/column.rs +++ b/mutable_buffer/src/column.rs @@ -4,7 +4,9 @@ use snafu::Snafu; use crate::dictionary::Dictionary; use arrow_deps::arrow::datatypes::DataType as ArrowDataType; use data_types::partition_metadata::StatValues; +use generated_types::entry::LogicalColumnType; use internal_types::data::type_description; +use internal_types::entry::TypedValuesIterator; use std::mem; @@ -41,6 +43,275 @@ pub enum Column { } impl Column { + /// Initializes a new column from typed values, the column on a table write + /// batach on an Entry. Will initialize the stats with the first + /// non-null value and update with any other non-null values included. + pub fn new_from_typed_values( + dictionary: &mut Dictionary, + row_count: usize, + logical_type: LogicalColumnType, + values: TypedValuesIterator<'_>, + ) -> Self { + match values { + TypedValuesIterator::String(vals) => match logical_type { + LogicalColumnType::Tag => { + let mut tag_values = vec![None; row_count]; + let mut stats: Option> = None; + + for tag in vals { + let tag_id = match tag { + Some(tag) => { + match stats.as_mut() { + Some(s) => StatValues::update_string(s, tag), + None => { + stats = Some(StatValues::new(tag.to_string())); + } + } + + Some(dictionary.lookup_value_or_insert(tag)) + } + None => None, + }; + + tag_values.push(tag_id); + } + + Self::Tag( + tag_values, + stats.expect("can't insert tag column with no values"), + ) + } + LogicalColumnType::Field => { + let mut values = vec![None; row_count]; + let mut stats: Option> = None; + + for value in vals { + match value { + Some(v) => { + match stats.as_mut() { + Some(s) => StatValues::update_string(s, v), + None => stats = Some(StatValues::new(v.to_string())), + } + + values.push(Some(v.to_string())); + } + None => values.push(None), + } + } + + Self::String( + values, + stats.expect("can't insert string column with no values"), + ) + } + _ => panic!("unsupported!"), + }, + TypedValuesIterator::I64(vals) => { + let mut values = vec![None; row_count]; + let mut stats: Option> = None; + + for v in vals { + if let Some(val) = v { + match stats.as_mut() { + Some(s) => s.update(val), + None => stats = Some(StatValues::new(val)), + } + } + values.push(v); + } + + Self::I64( + values, + stats.expect("can't insert i64 column with no values"), + ) + } + TypedValuesIterator::F64(vals) => { + let mut values = vec![None; row_count]; + let mut stats: Option> = None; + + for v in vals { + if let Some(val) = v { + match stats.as_mut() { + Some(s) => s.update(val), + None => stats = Some(StatValues::new(val)), + } + } + values.push(v); + } + + Self::F64( + values, + stats.expect("can't insert f64 column with no values"), + ) + } + TypedValuesIterator::U64(vals) => { + let mut values = vec![None; row_count]; + let mut stats: Option> = None; + + for v in vals { + if let Some(val) = v { + match stats.as_mut() { + Some(s) => s.update(val), + None => stats = Some(StatValues::new(val)), + } + } + values.push(v); + } + + Self::U64( + values, + stats.expect("can't insert u64 column with no values"), + ) + } + TypedValuesIterator::Bool(vals) => { + let mut values = vec![None; row_count]; + let mut stats: Option> = None; + + for v in vals { + if let Some(val) = v { + match stats.as_mut() { + Some(s) => s.update(val), + None => stats = Some(StatValues::new(val)), + } + } + values.push(v); + } + + Self::Bool( + values, + stats.expect("can't insert bool column with no values"), + ) + } + } + } + + /// Pushes typed values, the column from a table write batch on an Entry. + /// Updates statsistics for any non-null values. + pub fn push_typed_values( + &mut self, + dictionary: &mut Dictionary, + logical_type: LogicalColumnType, + values: TypedValuesIterator<'_>, + ) -> Result<()> { + match (self, values) { + (Self::Bool(col, stats), TypedValuesIterator::Bool(values)) => { + for val in values { + if let Some(v) = val { + stats.update(v) + }; + col.push(val); + } + } + (Self::I64(col, stats), TypedValuesIterator::I64(values)) => { + for val in values { + if let Some(v) = val { + stats.update(v) + }; + col.push(val); + } + } + (Self::F64(col, stats), TypedValuesIterator::F64(values)) => { + for val in values { + if let Some(v) = val { + stats.update(v) + }; + col.push(val); + } + } + (Self::U64(col, stats), TypedValuesIterator::U64(values)) => { + for val in values { + if let Some(v) = val { + stats.update(v) + }; + col.push(val); + } + } + (Self::String(col, stats), TypedValuesIterator::String(values)) => { + if logical_type != LogicalColumnType::Field { + TypeMismatch { + existing_column_type: "String", + inserted_value_type: "tag", + } + .fail()?; + } + + for val in values { + match val { + Some(v) => { + StatValues::update_string(stats, v); + col.push(Some(v.to_string())); + } + None => col.push(None), + } + } + } + (Self::Tag(col, stats), TypedValuesIterator::String(values)) => { + if logical_type != LogicalColumnType::Tag { + TypeMismatch { + existing_column_type: "tag", + inserted_value_type: "String", + } + .fail()?; + } + + for val in values { + match val { + Some(v) => { + StatValues::update_string(stats, v); + let id = dictionary.lookup_value_or_insert(v); + col.push(Some(id)); + } + None => col.push(None), + } + } + } + (existing, values) => TypeMismatch { + existing_column_type: existing.type_description(), + inserted_value_type: values.type_description(), + } + .fail()?, + } + + Ok(()) + } + + /// Pushes None values onto the column until its len is equal to that passed + /// in + pub fn push_nulls_to_len(&mut self, len: usize) { + match self { + Self::Tag(vals, _) => { + while vals.len() < len { + vals.push(None); + } + } + Self::I64(vals, _) => { + while vals.len() < len { + vals.push(None); + } + } + Self::F64(vals, _) => { + while vals.len() < len { + vals.push(None); + } + } + Self::U64(vals, _) => { + while vals.len() < len { + vals.push(None); + } + } + Self::Bool(vals, _) => { + while vals.len() < len { + vals.push(None); + } + } + Self::String(vals, _) => { + while vals.len() < len { + vals.push(None); + } + } + } + } + pub fn with_value( dictionary: &mut Dictionary, capacity: usize, diff --git a/mutable_buffer/src/table.rs b/mutable_buffer/src/table.rs index fe35b2f1f8..8d2e9f837d 100644 --- a/mutable_buffer/src/table.rs +++ b/mutable_buffer/src/table.rs @@ -14,6 +14,7 @@ use crate::{ }; use data_types::partition_metadata::{ColumnSummary, Statistics}; use internal_types::{ + entry, schema::{builder::SchemaBuilder, Schema, TIME_COLUMN_NAME}, selection::Selection, }; @@ -30,6 +31,8 @@ use arrow_deps::{ record_batch::RecordBatch, }, }; +use data_types::database_rules::WriterId; +use internal_types::entry::ClockValue; #[derive(Debug, Snafu)] pub enum Error { @@ -237,6 +240,106 @@ impl Table { Ok(()) } + /// Validates the schema of the passed in columns, then adds their values to + /// the associated columns in the table and updates summary statistics. + pub fn write_columns( + &mut self, + dictionary: &mut Dictionary, + _clock_value: ClockValue, + _writer_id: WriterId, + columns: Vec>, + ) -> Result<()> { + // get the column ids and validate schema for those that already exist + let columns_with_inserts = columns + .into_iter() + .map(|insert_column| { + let column_id = dictionary.lookup_value_or_insert(insert_column.name()); + let values = insert_column.values(); + + if let Some(c) = self.columns.get(&column_id) { + match (&values, c) { + (entry::TypedValuesIterator::Bool(_), Column::Bool(_, _)) => (), + (entry::TypedValuesIterator::U64(_), Column::U64(_, _)) => (), + (entry::TypedValuesIterator::F64(_), Column::F64(_, _)) => (), + (entry::TypedValuesIterator::I64(_), Column::I64(_, _)) => (), + (entry::TypedValuesIterator::String(_), Column::String(_, _)) => { + if !insert_column.is_field() { + InternalColumnTypeMismatch { + column_id, + expected_column_type: c.type_description(), + actual_column_type: values.type_description(), + } + .fail()? + }; + } + (entry::TypedValuesIterator::String(_), Column::Tag(_, _)) => { + if !insert_column.is_tag() { + InternalColumnTypeMismatch { + column_id, + expected_column_type: c.type_description(), + actual_column_type: values.type_description(), + } + .fail()? + }; + } + _ => InternalColumnTypeMismatch { + column_id, + expected_column_type: c.type_description(), + actual_column_type: values.type_description(), + } + .fail()?, + } + } + + Ok((column_id, insert_column.logical_type(), values)) + }) + .collect::>>()?; + + let row_count_before_insert = self.row_count(); + + for (column_id, logical_type, values) in columns_with_inserts.into_iter() { + match self.columns.get_mut(&column_id) { + Some(c) => c + .push_typed_values(dictionary, logical_type, values) + .with_context(|| { + let column = dictionary.lookup_id(column_id).unwrap_or("unknown"); + ColumnError { column } + })?, + None => { + self.columns.insert( + column_id, + Column::new_from_typed_values( + dictionary, + row_count_before_insert, + logical_type, + values, + ), + ); + } + } + } + + // ensure all columns have the same number of rows as the one with the most. + // This adds nulls to the columns that weren't included in this write + let max_row_count = self + .columns + .values() + .fold(row_count_before_insert, |max, col| { + let len = col.len(); + if max < len { + len + } else { + max + } + }); + + for c in self.columns.values_mut() { + c.push_nulls_to_len(max_row_count); + } + + Ok(()) + } + /// Returns the column selection for all the columns in this table, orderd /// by table name fn all_columns_selection<'a>(&self, chunk: &'a Chunk) -> Result> { @@ -348,6 +451,7 @@ impl Table { for col in &selection.cols { let column = self.column(col.column_id)?; + println!("COLUMN: {:#?}", column); let array = match column { Column::String(vals, _) => { @@ -594,6 +698,7 @@ mod tests { use influxdb_line_protocol::{parse_lines, ParsedLine}; use internal_types::data::split_lines_into_write_entry_partitions; + use internal_types::entry::test_helpers::lp_to_entry; use super::*; use tracker::MemRegistry; @@ -802,6 +907,151 @@ mod tests { ); } + #[test] + fn write_columns_validates_schema() { + let mut dictionary = Dictionary::new(); + let mut table = Table::new(dictionary.lookup_value_or_insert("foo")); + + let lp = "foo,t1=asdf iv=1i,uv=1u,fv=1.0,bv=true,sv=\"hi\" 1"; + let entry = lp_to_entry(&lp); + table + .write_columns( + &mut dictionary, + 0, + 0, + entry + .partition_writes() + .unwrap() + .first() + .unwrap() + .table_batches() + .first() + .unwrap() + .columns(), + ) + .unwrap(); + + let lp = "foo t1=\"string\" 1"; + let entry = lp_to_entry(&lp); + let response = table + .write_columns( + &mut dictionary, + 0, + 0, + entry + .partition_writes() + .unwrap() + .first() + .unwrap() + .table_batches() + .first() + .unwrap() + .columns(), + ) + .err() + .unwrap(); + assert!( + matches!(&response, Error::InternalColumnTypeMismatch {expected_column_type, actual_column_type, ..} if expected_column_type == "tag" && actual_column_type == "String"), + format!("didn't match returned error: {:?}", response) + ); + + let lp = "foo iv=1u 1"; + let entry = lp_to_entry(&lp); + let response = table + .write_columns( + &mut dictionary, + 0, + 0, + entry + .partition_writes() + .unwrap() + .first() + .unwrap() + .table_batches() + .first() + .unwrap() + .columns(), + ) + .err() + .unwrap(); + assert!( + matches!(&response, Error::InternalColumnTypeMismatch {expected_column_type, actual_column_type, ..} if expected_column_type == "i64" && actual_column_type == "u64"), + format!("didn't match returned error: {:?}", response) + ); + + let lp = "foo fv=1i 1"; + let entry = lp_to_entry(&lp); + let response = table + .write_columns( + &mut dictionary, + 0, + 0, + entry + .partition_writes() + .unwrap() + .first() + .unwrap() + .table_batches() + .first() + .unwrap() + .columns(), + ) + .err() + .unwrap(); + assert!( + matches!(&response, Error::InternalColumnTypeMismatch {expected_column_type, actual_column_type, ..} if expected_column_type == "f64" && actual_column_type == "i64"), + format!("didn't match returned error: {:?}", response) + ); + + let lp = "foo bv=1 1"; + let entry = lp_to_entry(&lp); + let response = table + .write_columns( + &mut dictionary, + 0, + 0, + entry + .partition_writes() + .unwrap() + .first() + .unwrap() + .table_batches() + .first() + .unwrap() + .columns(), + ) + .err() + .unwrap(); + assert!( + matches!(&response, Error::InternalColumnTypeMismatch {expected_column_type, actual_column_type, ..} if expected_column_type == "bool" && actual_column_type == "f64"), + format!("didn't match returned error: {:?}", response) + ); + + let lp = "foo sv=true 1"; + let entry = lp_to_entry(&lp); + let response = table + .write_columns( + &mut dictionary, + 0, + 0, + entry + .partition_writes() + .unwrap() + .first() + .unwrap() + .table_batches() + .first() + .unwrap() + .columns(), + ) + .err() + .unwrap(); + assert!( + matches!(&response, Error::InternalColumnTypeMismatch {expected_column_type, actual_column_type, ..} if expected_column_type == "String" && actual_column_type == "bool"), + format!("didn't match returned error: {:?}", response) + ); + } + /// Insert the line protocol lines in `lp_lines` into this table fn write_lines_to_table(table: &mut Table, dictionary: &mut Dictionary, lp_lines: Vec<&str>) { let lp_data = lp_lines.join("\n");