feat: Add writing of Entry structures to MB Chunk

This adds writing of Entry of a vec of TableWriteBatch to the Mutable Buffer Chunk. This is additional to the previous method of writing via ReplicatedWrite. The next step is to remove the old ReplicatedWrite bits.

Test helpers for parsing line protocol into Entry and writing line protocol directly to Chunks have also been added.
pull/24376/head
Paul Dix 2021-04-10 13:38:03 -04:00 committed by kodiakhq[bot]
parent 0a3386f24a
commit 31115742ec
4 changed files with 701 additions and 5 deletions

View File

@ -430,7 +430,7 @@ impl<'a> TableBatch<'a> {
#[derive(Debug)]
pub struct Column<'a> {
fb: entry_fb::Column<'a>,
row_count: usize,
pub row_count: usize,
}
impl<'a> Column<'a> {
@ -442,6 +442,18 @@ impl<'a> Column<'a> {
self.fb.logical_column_type()
}
pub fn is_tag(&self) -> bool {
self.fb.logical_column_type() == entry_fb::LogicalColumnType::Tag
}
pub fn is_field(&self) -> bool {
self.fb.logical_column_type() == entry_fb::LogicalColumnType::Field
}
pub fn is_time(&self) -> bool {
self.fb.logical_column_type() == entry_fb::LogicalColumnType::Time
}
pub fn values(&self) -> TypedValuesIterator<'a> {
match self.fb.values_type() {
entry_fb::ColumnValues::BoolValues => TypedValuesIterator::Bool(BoolIterator {
@ -564,12 +576,22 @@ impl<'a> TypedValuesIterator<'a> {
_ => None,
}
}
pub fn type_description(&self) -> &str {
match self {
Self::Bool(_) => "bool",
Self::I64(_) => "i64",
Self::F64(_) => "f64",
Self::U64(_) => "u64",
Self::String(_) => "String",
}
}
}
/// Iterator over the flatbuffers BoolValues
#[derive(Debug)]
pub struct BoolIterator<'a> {
row_count: usize,
pub row_count: usize,
position: usize,
null_mask: Option<&'a [u8]>,
values: &'a [bool],
@ -599,7 +621,7 @@ impl<'a> Iterator for BoolIterator<'a> {
/// Iterator over the flatbuffers I64Values, F64Values, and U64Values.
#[derive(Debug)]
pub struct ValIterator<'a, T: Follow<'a> + Follow<'a, Inner = T>> {
row_count: usize,
pub row_count: usize,
position: usize,
null_mask: Option<&'a [u8]>,
values_iter: VectorIter<'a, T>,
@ -625,7 +647,7 @@ impl<'a, T: Follow<'a> + Follow<'a, Inner = T>> Iterator for ValIterator<'a, T>
/// Iterator over the flatbuffers StringValues
#[derive(Debug)]
pub struct StringIterator<'a> {
row_count: usize,
pub row_count: usize,
position: usize,
null_mask: Option<&'a [u8]>,
values: VectorIter<'a, ForwardsUOffset<&'a str>>,

View File

@ -6,7 +6,7 @@ use generated_types::wal as wb;
use std::collections::{BTreeSet, HashMap};
use data_types::partition_metadata::TableSummary;
use internal_types::{schema::Schema, selection::Selection};
use internal_types::{entry::TableBatch, schema::Schema, selection::Selection};
use crate::{
column::Column,
@ -14,6 +14,8 @@ use crate::{
pred::{ChunkPredicate, ChunkPredicateBuilder},
table::Table,
};
use data_types::database_rules::WriterId;
use internal_types::entry::ClockValue;
use snafu::{OptionExt, ResultExt, Snafu};
use tracker::{MemRegistry, MemTracker};
@ -151,6 +153,30 @@ impl Chunk {
chunk
}
pub fn write_table_batches(
&mut self,
clock_value: ClockValue,
writer_id: WriterId,
batches: &[TableBatch<'_>],
) -> Result<()> {
for batch in batches {
let table_name = batch.name();
let table_id = self.dictionary.lookup_value_or_insert(table_name);
let table = self
.tables
.entry(table_id)
.or_insert_with(|| Table::new(table_id));
let columns = batch.columns();
table
.write_columns(&mut self.dictionary, clock_value, writer_id, columns)
.context(TableWrite { table_name })?;
}
Ok(())
}
pub fn write_entry(&mut self, entry: &wb::WriteBufferEntry<'_>) -> Result<()> {
if let Some(table_batches) = entry.table_batches() {
for batch in table_batches {
@ -486,3 +512,130 @@ impl Chunk {
matches!(self.table(table_name), Ok(Some(_)))
}
}
pub mod test_helpers {
use super::*;
use internal_types::entry::test_helpers::lp_to_entry;
/// A helper that will write line protocol string to the passed in Chunk.
/// All data will be under a single partition with a clock value and
/// writer id of 0.
pub fn write_lp_to_chunk(lp: &str, chunk: &mut Chunk) -> Result<()> {
let entry = lp_to_entry(lp);
for w in entry.partition_writes().unwrap() {
chunk.write_table_batches(0, 0, &w.table_batches())?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::test_helpers::write_lp_to_chunk;
use super::*;
use arrow_deps::arrow::util::pretty::pretty_format_batches;
#[test]
fn writes_table_batches() {
let mr = MemRegistry::new();
let mut chunk = Chunk::new(1, &mr);
let lp = vec![
"cpu,host=a val=23 1",
"cpu,host=b val=2 1",
"mem,host=a val=23432i 1",
]
.join("\n");
write_lp_to_chunk(&lp, &mut chunk).unwrap();
assert_table(
&chunk,
"cpu",
&[
"+------+------+-----+",
"| host | time | val |",
"+------+------+-----+",
"| a | 1 | 23 |",
"| b | 1 | 2 |",
"+------+------+-----+\n",
],
);
assert_table(
&chunk,
"mem",
&[
"+------+------+-------+",
"| host | time | val |",
"+------+------+-------+",
"| a | 1 | 23432 |",
"+------+------+-------+\n",
],
);
let lp = vec![
"cpu,host=c val=11 1",
"mem sval=\"hi\" 2",
"disk val=true 1",
]
.join("\n");
write_lp_to_chunk(&lp, &mut chunk).unwrap();
assert_table(
&chunk,
"cpu",
&[
"+------+------+-----+",
"| host | time | val |",
"+------+------+-----+",
"| a | 1 | 23 |",
"| b | 1 | 2 |",
"| c | 1 | 11 |",
"+------+------+-----+\n",
],
);
assert_table(
&chunk,
"disk",
&[
"+------+------+",
"| time | val |",
"+------+------+",
"| 1 | true |",
"+------+------+\n",
],
);
assert_table(
&chunk,
"mem",
&[
"+------+------+------+-------+",
"| host | sval | time | val |",
"+------+------+------+-------+",
"| a | | 1 | 23432 |",
"| | hi | 2 | |",
"+------+------+------+-------+\n",
],
);
}
fn assert_table(chunk: &Chunk, table: &str, data: &[&str]) {
let mut batches = vec![];
chunk
.table_to_arrow(&mut batches, table, Selection::All)
.unwrap();
let res = pretty_format_batches(&batches).unwrap();
let data = data.join("\n");
assert_eq!(
res, data,
"\n{} table results not as expected:\nEXPECTED:\n{}\nRECEIVED:\n{}",
table, data, res
);
}
}

View File

@ -4,7 +4,9 @@ use snafu::Snafu;
use crate::dictionary::Dictionary;
use arrow_deps::arrow::datatypes::DataType as ArrowDataType;
use data_types::partition_metadata::StatValues;
use generated_types::entry::LogicalColumnType;
use internal_types::data::type_description;
use internal_types::entry::TypedValuesIterator;
use std::mem;
@ -41,6 +43,275 @@ pub enum Column {
}
impl Column {
/// Initializes a new column from typed values, the column on a table write
/// batach on an Entry. Will initialize the stats with the first
/// non-null value and update with any other non-null values included.
pub fn new_from_typed_values(
dictionary: &mut Dictionary,
row_count: usize,
logical_type: LogicalColumnType,
values: TypedValuesIterator<'_>,
) -> Self {
match values {
TypedValuesIterator::String(vals) => match logical_type {
LogicalColumnType::Tag => {
let mut tag_values = vec![None; row_count];
let mut stats: Option<StatValues<String>> = None;
for tag in vals {
let tag_id = match tag {
Some(tag) => {
match stats.as_mut() {
Some(s) => StatValues::update_string(s, tag),
None => {
stats = Some(StatValues::new(tag.to_string()));
}
}
Some(dictionary.lookup_value_or_insert(tag))
}
None => None,
};
tag_values.push(tag_id);
}
Self::Tag(
tag_values,
stats.expect("can't insert tag column with no values"),
)
}
LogicalColumnType::Field => {
let mut values = vec![None; row_count];
let mut stats: Option<StatValues<String>> = None;
for value in vals {
match value {
Some(v) => {
match stats.as_mut() {
Some(s) => StatValues::update_string(s, v),
None => stats = Some(StatValues::new(v.to_string())),
}
values.push(Some(v.to_string()));
}
None => values.push(None),
}
}
Self::String(
values,
stats.expect("can't insert string column with no values"),
)
}
_ => panic!("unsupported!"),
},
TypedValuesIterator::I64(vals) => {
let mut values = vec![None; row_count];
let mut stats: Option<StatValues<i64>> = None;
for v in vals {
if let Some(val) = v {
match stats.as_mut() {
Some(s) => s.update(val),
None => stats = Some(StatValues::new(val)),
}
}
values.push(v);
}
Self::I64(
values,
stats.expect("can't insert i64 column with no values"),
)
}
TypedValuesIterator::F64(vals) => {
let mut values = vec![None; row_count];
let mut stats: Option<StatValues<f64>> = None;
for v in vals {
if let Some(val) = v {
match stats.as_mut() {
Some(s) => s.update(val),
None => stats = Some(StatValues::new(val)),
}
}
values.push(v);
}
Self::F64(
values,
stats.expect("can't insert f64 column with no values"),
)
}
TypedValuesIterator::U64(vals) => {
let mut values = vec![None; row_count];
let mut stats: Option<StatValues<u64>> = None;
for v in vals {
if let Some(val) = v {
match stats.as_mut() {
Some(s) => s.update(val),
None => stats = Some(StatValues::new(val)),
}
}
values.push(v);
}
Self::U64(
values,
stats.expect("can't insert u64 column with no values"),
)
}
TypedValuesIterator::Bool(vals) => {
let mut values = vec![None; row_count];
let mut stats: Option<StatValues<bool>> = None;
for v in vals {
if let Some(val) = v {
match stats.as_mut() {
Some(s) => s.update(val),
None => stats = Some(StatValues::new(val)),
}
}
values.push(v);
}
Self::Bool(
values,
stats.expect("can't insert bool column with no values"),
)
}
}
}
/// Pushes typed values, the column from a table write batch on an Entry.
/// Updates statsistics for any non-null values.
pub fn push_typed_values(
&mut self,
dictionary: &mut Dictionary,
logical_type: LogicalColumnType,
values: TypedValuesIterator<'_>,
) -> Result<()> {
match (self, values) {
(Self::Bool(col, stats), TypedValuesIterator::Bool(values)) => {
for val in values {
if let Some(v) = val {
stats.update(v)
};
col.push(val);
}
}
(Self::I64(col, stats), TypedValuesIterator::I64(values)) => {
for val in values {
if let Some(v) = val {
stats.update(v)
};
col.push(val);
}
}
(Self::F64(col, stats), TypedValuesIterator::F64(values)) => {
for val in values {
if let Some(v) = val {
stats.update(v)
};
col.push(val);
}
}
(Self::U64(col, stats), TypedValuesIterator::U64(values)) => {
for val in values {
if let Some(v) = val {
stats.update(v)
};
col.push(val);
}
}
(Self::String(col, stats), TypedValuesIterator::String(values)) => {
if logical_type != LogicalColumnType::Field {
TypeMismatch {
existing_column_type: "String",
inserted_value_type: "tag",
}
.fail()?;
}
for val in values {
match val {
Some(v) => {
StatValues::update_string(stats, v);
col.push(Some(v.to_string()));
}
None => col.push(None),
}
}
}
(Self::Tag(col, stats), TypedValuesIterator::String(values)) => {
if logical_type != LogicalColumnType::Tag {
TypeMismatch {
existing_column_type: "tag",
inserted_value_type: "String",
}
.fail()?;
}
for val in values {
match val {
Some(v) => {
StatValues::update_string(stats, v);
let id = dictionary.lookup_value_or_insert(v);
col.push(Some(id));
}
None => col.push(None),
}
}
}
(existing, values) => TypeMismatch {
existing_column_type: existing.type_description(),
inserted_value_type: values.type_description(),
}
.fail()?,
}
Ok(())
}
/// Pushes None values onto the column until its len is equal to that passed
/// in
pub fn push_nulls_to_len(&mut self, len: usize) {
match self {
Self::Tag(vals, _) => {
while vals.len() < len {
vals.push(None);
}
}
Self::I64(vals, _) => {
while vals.len() < len {
vals.push(None);
}
}
Self::F64(vals, _) => {
while vals.len() < len {
vals.push(None);
}
}
Self::U64(vals, _) => {
while vals.len() < len {
vals.push(None);
}
}
Self::Bool(vals, _) => {
while vals.len() < len {
vals.push(None);
}
}
Self::String(vals, _) => {
while vals.len() < len {
vals.push(None);
}
}
}
}
pub fn with_value(
dictionary: &mut Dictionary,
capacity: usize,

View File

@ -14,6 +14,7 @@ use crate::{
};
use data_types::partition_metadata::{ColumnSummary, Statistics};
use internal_types::{
entry,
schema::{builder::SchemaBuilder, Schema, TIME_COLUMN_NAME},
selection::Selection,
};
@ -30,6 +31,8 @@ use arrow_deps::{
record_batch::RecordBatch,
},
};
use data_types::database_rules::WriterId;
use internal_types::entry::ClockValue;
#[derive(Debug, Snafu)]
pub enum Error {
@ -237,6 +240,106 @@ impl Table {
Ok(())
}
/// Validates the schema of the passed in columns, then adds their values to
/// the associated columns in the table and updates summary statistics.
pub fn write_columns(
&mut self,
dictionary: &mut Dictionary,
_clock_value: ClockValue,
_writer_id: WriterId,
columns: Vec<entry::Column<'_>>,
) -> Result<()> {
// get the column ids and validate schema for those that already exist
let columns_with_inserts = columns
.into_iter()
.map(|insert_column| {
let column_id = dictionary.lookup_value_or_insert(insert_column.name());
let values = insert_column.values();
if let Some(c) = self.columns.get(&column_id) {
match (&values, c) {
(entry::TypedValuesIterator::Bool(_), Column::Bool(_, _)) => (),
(entry::TypedValuesIterator::U64(_), Column::U64(_, _)) => (),
(entry::TypedValuesIterator::F64(_), Column::F64(_, _)) => (),
(entry::TypedValuesIterator::I64(_), Column::I64(_, _)) => (),
(entry::TypedValuesIterator::String(_), Column::String(_, _)) => {
if !insert_column.is_field() {
InternalColumnTypeMismatch {
column_id,
expected_column_type: c.type_description(),
actual_column_type: values.type_description(),
}
.fail()?
};
}
(entry::TypedValuesIterator::String(_), Column::Tag(_, _)) => {
if !insert_column.is_tag() {
InternalColumnTypeMismatch {
column_id,
expected_column_type: c.type_description(),
actual_column_type: values.type_description(),
}
.fail()?
};
}
_ => InternalColumnTypeMismatch {
column_id,
expected_column_type: c.type_description(),
actual_column_type: values.type_description(),
}
.fail()?,
}
}
Ok((column_id, insert_column.logical_type(), values))
})
.collect::<Result<Vec<_>>>()?;
let row_count_before_insert = self.row_count();
for (column_id, logical_type, values) in columns_with_inserts.into_iter() {
match self.columns.get_mut(&column_id) {
Some(c) => c
.push_typed_values(dictionary, logical_type, values)
.with_context(|| {
let column = dictionary.lookup_id(column_id).unwrap_or("unknown");
ColumnError { column }
})?,
None => {
self.columns.insert(
column_id,
Column::new_from_typed_values(
dictionary,
row_count_before_insert,
logical_type,
values,
),
);
}
}
}
// ensure all columns have the same number of rows as the one with the most.
// This adds nulls to the columns that weren't included in this write
let max_row_count = self
.columns
.values()
.fold(row_count_before_insert, |max, col| {
let len = col.len();
if max < len {
len
} else {
max
}
});
for c in self.columns.values_mut() {
c.push_nulls_to_len(max_row_count);
}
Ok(())
}
/// Returns the column selection for all the columns in this table, orderd
/// by table name
fn all_columns_selection<'a>(&self, chunk: &'a Chunk) -> Result<TableColSelection<'a>> {
@ -348,6 +451,7 @@ impl Table {
for col in &selection.cols {
let column = self.column(col.column_id)?;
println!("COLUMN: {:#?}", column);
let array = match column {
Column::String(vals, _) => {
@ -594,6 +698,7 @@ mod tests {
use influxdb_line_protocol::{parse_lines, ParsedLine};
use internal_types::data::split_lines_into_write_entry_partitions;
use internal_types::entry::test_helpers::lp_to_entry;
use super::*;
use tracker::MemRegistry;
@ -802,6 +907,151 @@ mod tests {
);
}
#[test]
fn write_columns_validates_schema() {
let mut dictionary = Dictionary::new();
let mut table = Table::new(dictionary.lookup_value_or_insert("foo"));
let lp = "foo,t1=asdf iv=1i,uv=1u,fv=1.0,bv=true,sv=\"hi\" 1";
let entry = lp_to_entry(&lp);
table
.write_columns(
&mut dictionary,
0,
0,
entry
.partition_writes()
.unwrap()
.first()
.unwrap()
.table_batches()
.first()
.unwrap()
.columns(),
)
.unwrap();
let lp = "foo t1=\"string\" 1";
let entry = lp_to_entry(&lp);
let response = table
.write_columns(
&mut dictionary,
0,
0,
entry
.partition_writes()
.unwrap()
.first()
.unwrap()
.table_batches()
.first()
.unwrap()
.columns(),
)
.err()
.unwrap();
assert!(
matches!(&response, Error::InternalColumnTypeMismatch {expected_column_type, actual_column_type, ..} if expected_column_type == "tag" && actual_column_type == "String"),
format!("didn't match returned error: {:?}", response)
);
let lp = "foo iv=1u 1";
let entry = lp_to_entry(&lp);
let response = table
.write_columns(
&mut dictionary,
0,
0,
entry
.partition_writes()
.unwrap()
.first()
.unwrap()
.table_batches()
.first()
.unwrap()
.columns(),
)
.err()
.unwrap();
assert!(
matches!(&response, Error::InternalColumnTypeMismatch {expected_column_type, actual_column_type, ..} if expected_column_type == "i64" && actual_column_type == "u64"),
format!("didn't match returned error: {:?}", response)
);
let lp = "foo fv=1i 1";
let entry = lp_to_entry(&lp);
let response = table
.write_columns(
&mut dictionary,
0,
0,
entry
.partition_writes()
.unwrap()
.first()
.unwrap()
.table_batches()
.first()
.unwrap()
.columns(),
)
.err()
.unwrap();
assert!(
matches!(&response, Error::InternalColumnTypeMismatch {expected_column_type, actual_column_type, ..} if expected_column_type == "f64" && actual_column_type == "i64"),
format!("didn't match returned error: {:?}", response)
);
let lp = "foo bv=1 1";
let entry = lp_to_entry(&lp);
let response = table
.write_columns(
&mut dictionary,
0,
0,
entry
.partition_writes()
.unwrap()
.first()
.unwrap()
.table_batches()
.first()
.unwrap()
.columns(),
)
.err()
.unwrap();
assert!(
matches!(&response, Error::InternalColumnTypeMismatch {expected_column_type, actual_column_type, ..} if expected_column_type == "bool" && actual_column_type == "f64"),
format!("didn't match returned error: {:?}", response)
);
let lp = "foo sv=true 1";
let entry = lp_to_entry(&lp);
let response = table
.write_columns(
&mut dictionary,
0,
0,
entry
.partition_writes()
.unwrap()
.first()
.unwrap()
.table_batches()
.first()
.unwrap()
.columns(),
)
.err()
.unwrap();
assert!(
matches!(&response, Error::InternalColumnTypeMismatch {expected_column_type, actual_column_type, ..} if expected_column_type == "String" && actual_column_type == "bool"),
format!("didn't match returned error: {:?}", response)
);
}
/// Insert the line protocol lines in `lp_lines` into this table
fn write_lines_to_table(table: &mut Table, dictionary: &mut Dictionary, lp_lines: Vec<&str>) {
let lp_data = lp_lines.join("\n");