From 2a134a3ac1809dd127ab071da6a9c21c9934f1f0 Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Sat, 3 Apr 2021 12:10:22 -0400 Subject: [PATCH 1/3] feat: implement line protocol to flatbuffers This implements a builder for converting line protocol to the Entry flatbuffers. It also contains wrapper structs to make working with the flatbuffers a little easier. The flatbuffer needed an addition to keep track of how many bits in the null mask should be used. --- data_types/src/database_rules.rs | 8 + .../protos/influxdata/iox/write/v1/entry.fbs | 7 +- generated_types/src/entry_generated.rs | 50 +- internal_types/src/entry.rs | 1544 +++++++++++++++++ internal_types/src/lib.rs | 1 + 5 files changed, 1596 insertions(+), 14 deletions(-) create mode 100644 internal_types/src/entry.rs diff --git a/data_types/src/database_rules.rs b/data_types/src/database_rules.rs index dd06f11dc7..3ef57659c2 100644 --- a/data_types/src/database_rules.rs +++ b/data_types/src/database_rules.rs @@ -741,6 +741,14 @@ impl TryFrom for TemplatePart { } } +/// ShardId maps to a nodegroup that holds the shard. +pub type ShardId = u8; + +/// Assigns a given line to a specific shard id. +pub trait Sharder { + fn shard(&self, _line: &ParsedLine<'_>) -> Result; +} + +/// ShardConfig defines rules for assigning a line/row to an individual /// host or a group of hosts. 
A shard /// is a logical concept, but the usage is meant to split data into diff --git a/generated_types/protos/influxdata/iox/write/v1/entry.fbs b/generated_types/protos/influxdata/iox/write/v1/entry.fbs index 92c5aaadad..e180a72cfa 100644 --- a/generated_types/protos/influxdata/iox/write/v1/entry.fbs +++ b/generated_types/protos/influxdata/iox/write/v1/entry.fbs @@ -58,8 +58,11 @@ table Delete { // A collection of rows in a table in column oriented representation table TableWriteBatch { name: string; - // every column must have the same number of values in its null bitmask + // every column must have the same number of bytes in its null_mask columns: [Column]; + // specifies how many bits of the last byte of the null mask are valid. Thus the + // number of rows will be: (null_mask.len() - 1) * 8 + bits_in_last_null_byte + bits_in_last_null_byte: uint8; } enum LogicalColumnType : byte { IOx, Tag, Field, Time } @@ -87,6 +90,8 @@ table Column { // array that index is located in. Here's what it might look like: // position: 0 8 9 24 // bit: 00100011 00111000 00000001 + // An on bit (1) indicates that the value at that position is null. If there are + // no null values in the column, the null_mask is omitted from the flatbuffers. 
null_mask: [ubyte]; } diff --git a/generated_types/src/entry_generated.rs b/generated_types/src/entry_generated.rs index 0e89318fc1..45d222fb24 100644 --- a/generated_types/src/entry_generated.rs +++ b/generated_types/src/entry_generated.rs @@ -1086,11 +1086,13 @@ pub mod influxdata { if let Some(x) = args.name { builder.add_name(x); } + builder.add_bits_in_last_null_byte(args.bits_in_last_null_byte); builder.finish() } pub const VT_NAME: flatbuffers::VOffsetT = 4; pub const VT_COLUMNS: flatbuffers::VOffsetT = 6; + pub const VT_BITS_IN_LAST_NULL_BYTE: flatbuffers::VOffsetT = 8; #[inline] pub fn name(&self) -> Option<&'a str> { @@ -1108,6 +1110,12 @@ pub mod influxdata { flatbuffers::Vector<'a, flatbuffers::ForwardsUOffset>, >>(TableWriteBatch::VT_COLUMNS, None) } + #[inline] + pub fn bits_in_last_null_byte(&self) -> u8 { + self._tab + .get::(TableWriteBatch::VT_BITS_IN_LAST_NULL_BYTE, Some(0)) + .unwrap() + } } impl flatbuffers::Verifiable for TableWriteBatch<'_> { @@ -1126,6 +1134,11 @@ pub mod influxdata { .visit_field::>, >>(&"columns", Self::VT_COLUMNS, false)? + .visit_field::( + &"bits_in_last_null_byte", + Self::VT_BITS_IN_LAST_NULL_BYTE, + false, + )? 
.finish(); Ok(()) } @@ -1137,6 +1150,7 @@ pub mod influxdata { flatbuffers::Vector<'a, flatbuffers::ForwardsUOffset>>, >, >, + pub bits_in_last_null_byte: u8, } impl<'a> Default for TableWriteBatchArgs<'a> { #[inline] @@ -1144,6 +1158,7 @@ pub mod influxdata { TableWriteBatchArgs { name: None, columns: None, + bits_in_last_null_byte: 0, } } } @@ -1172,6 +1187,14 @@ pub mod influxdata { ); } #[inline] + pub fn add_bits_in_last_null_byte(&mut self, bits_in_last_null_byte: u8) { + self.fbb_.push_slot::( + TableWriteBatch::VT_BITS_IN_LAST_NULL_BYTE, + bits_in_last_null_byte, + 0, + ); + } + #[inline] pub fn new( _fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>, ) -> TableWriteBatchBuilder<'a, 'b> { @@ -1193,6 +1216,7 @@ pub mod influxdata { let mut ds = f.debug_struct("TableWriteBatch"); ds.field("name", &self.name()); ds.field("columns", &self.columns()); + ds.field("bits_in_last_null_byte", &self.bits_in_last_null_byte()); ds.finish() } } @@ -1224,8 +1248,8 @@ pub mod influxdata { args: &'args ColumnArgs<'args>, ) -> flatbuffers::WIPOffset> { let mut builder = ColumnBuilder::new(_fbb); - if let Some(x) = args.null_bitmask { - builder.add_null_bitmask(x); + if let Some(x) = args.null_mask { + builder.add_null_mask(x); } if let Some(x) = args.values { builder.add_values(x); @@ -1242,7 +1266,7 @@ pub mod influxdata { pub const VT_LOGICAL_COLUMN_TYPE: flatbuffers::VOffsetT = 6; pub const VT_VALUES_TYPE: flatbuffers::VOffsetT = 8; pub const VT_VALUES: flatbuffers::VOffsetT = 10; - pub const VT_NULL_BITMASK: flatbuffers::VOffsetT = 12; + pub const VT_NULL_MASK: flatbuffers::VOffsetT = 12; #[inline] pub fn name(&self) -> Option<&'a str> { @@ -1273,10 +1297,10 @@ pub mod influxdata { ) } #[inline] - pub fn null_bitmask(&self) -> Option<&'a [u8]> { + pub fn null_mask(&self) -> Option<&'a [u8]> { self._tab .get::>>( - Column::VT_NULL_BITMASK, + Column::VT_NULL_MASK, None, ) .map(|v| v.safe_slice()) @@ -1363,7 +1387,7 @@ pub mod influxdata { _ => Ok(()), } })? 
- .visit_field::>>(&"null_bitmask", Self::VT_NULL_BITMASK, false)? + .visit_field::>>(&"null_mask", Self::VT_NULL_MASK, false)? .finish(); Ok(()) } @@ -1373,7 +1397,7 @@ pub mod influxdata { pub logical_column_type: LogicalColumnType, pub values_type: ColumnValues, pub values: Option>, - pub null_bitmask: Option>>, + pub null_mask: Option>>, } impl<'a> Default for ColumnArgs<'a> { #[inline] @@ -1383,7 +1407,7 @@ pub mod influxdata { logical_column_type: LogicalColumnType::IOx, values_type: ColumnValues::NONE, values: None, - null_bitmask: None, + null_mask: None, } } } @@ -1427,13 +1451,13 @@ pub mod influxdata { ); } #[inline] - pub fn add_null_bitmask( + pub fn add_null_mask( &mut self, - null_bitmask: flatbuffers::WIPOffset>, + null_mask: flatbuffers::WIPOffset>, ) { self.fbb_.push_slot_always::>( - Column::VT_NULL_BITMASK, - null_bitmask, + Column::VT_NULL_MASK, + null_mask, ); } #[inline] @@ -1507,7 +1531,7 @@ pub mod influxdata { ds.field("values", &x) } }; - ds.field("null_bitmask", &self.null_bitmask()); + ds.field("null_mask", &self.null_mask()); ds.finish() } } diff --git a/internal_types/src/entry.rs b/internal_types/src/entry.rs new file mode 100644 index 0000000000..fd13f69a90 --- /dev/null +++ b/internal_types/src/entry.rs @@ -0,0 +1,1544 @@ +//! This module contains helper code for building `Entry` and `SequencedEntry` +//! from line protocol and the `DatabaseRules` configuration. 
+ +use crate::schema::TIME_COLUMN_NAME; +use data_types::database_rules::{Error as DataError, Partitioner, ShardId, Sharder}; +use generated_types::entry as entry_fb; +use influxdb_line_protocol::{FieldValue, ParsedLine}; + +use std::{collections::BTreeMap, convert::TryFrom}; + +use chrono::{DateTime, Utc}; +use flatbuffers::{FlatBufferBuilder, Follow, ForwardsUOffset, Vector, VectorIter, WIPOffset}; +use ouroboros::self_referencing; +use snafu::{ResultExt, Snafu}; +use std::fmt::Formatter; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Error generating partition key {}", source))] + GeneratingPartitionKey { source: DataError }, + + #[snafu(display("Error getting shard id {}", source))] + GeneratingShardId { source: DataError }, + + #[snafu(display("column type mismatch"))] + ColumnTypeMismatch, + + #[snafu(display("table {} has column {} with two different types", table, column))] + TableColumnTypeMismatch { table: String, column: String }, +} + +pub type Result = std::result::Result; + +/// Converts parsed line protocol into a collection of ShardedEntry with the +/// underlying flatbuffers bytes generated. 
+pub fn lines_to_sharded_entries( + lines: &[ParsedLine<'_>], + sharder: &impl Sharder, + partitioner: &impl Partitioner, +) -> Result> { + let default_time = Utc::now(); + let mut sharded_lines = BTreeMap::new(); + + for line in lines { + let shard_id = sharder.shard(line).context(GeneratingShardId)?; + let partition_key = partitioner + .partition_key(line, &default_time) + .context(GeneratingPartitionKey)?; + let table = line.series.measurement.as_str(); + + sharded_lines + .entry(shard_id) + .or_insert_with(BTreeMap::new) + .entry(partition_key) + .or_insert_with(BTreeMap::new) + .entry(table) + .or_insert_with(Vec::new) + .push(line); + } + + let default_time = Utc::now(); + + let mut sharded_entries = Vec::with_capacity(sharded_lines.len()); + for (shard_id, partitions) in sharded_lines.into_iter() { + let entry = build_sharded_entry(shard_id, partitions, &default_time)?; + sharded_entries.push(entry); + } + + Ok(sharded_entries) +} + +fn build_sharded_entry( + shard_id: ShardId, + partitions: BTreeMap>>>, + default_time: &DateTime, +) -> Result { + let mut fbb = flatbuffers::FlatBufferBuilder::new_with_capacity(1024); + + let mut partition_writes = Vec::with_capacity(partitions.len()); + for (partition_key, tables) in partitions.into_iter() { + let write = build_partition_write(&mut fbb, partition_key, tables, default_time)?; + partition_writes.push(write); + } + let partition_writes = fbb.create_vector(&partition_writes); + + let write_operations = entry_fb::WriteOperations::create( + &mut fbb, + &entry_fb::WriteOperationsArgs { + partition_writes: Some(partition_writes), + }, + ); + let entry = entry_fb::Entry::create( + &mut fbb, + &entry_fb::EntryArgs { + operation_type: entry_fb::Operation::write, + operation: Some(write_operations.as_union_value()), + }, + ); + + fbb.finish(entry, None); + + let (mut data, idx) = fbb.collapse(); + let entry = Entry::try_from(data.split_off(idx)) + .expect("Flatbuffer data just constructed should be valid"); + + 
Ok(ShardedEntry { shard_id, entry }) +} + +fn build_partition_write<'a>( + fbb: &mut FlatBufferBuilder<'a>, + partition_key: String, + tables: BTreeMap<&str, Vec<&'a ParsedLine<'_>>>, + default_time: &DateTime, +) -> Result>> { + let partition_key = fbb.create_string(&partition_key); + + let mut table_batches = Vec::with_capacity(tables.len()); + for (table_name, lines) in tables.into_iter() { + let batch = build_table_write_batch(fbb, table_name, lines, default_time)?; + table_batches.push(batch); + } + let table_batches = fbb.create_vector(&table_batches); + + Ok(entry_fb::PartitionWrite::create( + fbb, + &entry_fb::PartitionWriteArgs { + key: Some(partition_key), + table_batches: Some(table_batches), + }, + )) +} + +fn build_table_write_batch<'a>( + fbb: &mut FlatBufferBuilder<'a>, + table_name: &str, + lines: Vec<&'a ParsedLine<'_>>, + default_time: &DateTime, +) -> Result>> { + let mut columns = BTreeMap::new(); + for (i, line) in lines.iter().enumerate() { + let row_number = i + 1; + + if let Some(tagset) = &line.series.tag_set { + for (key, value) in tagset { + let key = key.as_str(); + let builder = columns + .entry(key) + .or_insert_with(ColumnBuilder::new_tag_column); + builder.null_to_row(row_number); + if builder.push_tag(value.as_str()).is_err() { + return TableColumnTypeMismatch { + table: table_name, + column: key, + } + .fail(); + } + } + } + + for (key, val) in &line.field_set { + let key = key.as_str(); + + match val { + FieldValue::Boolean(b) => { + let builder = columns + .entry(key) + .or_insert_with(ColumnBuilder::new_bool_column); + builder.null_to_row(row_number); + if builder.push_bool(*b).is_err() { + return TableColumnTypeMismatch { + table: table_name, + column: key, + } + .fail(); + } + } + FieldValue::U64(v) => { + let builder = columns + .entry(key) + .or_insert_with(ColumnBuilder::new_u64_column); + builder.null_to_row(row_number); + if builder.push_u64(*v).is_err() { + return TableColumnTypeMismatch { + table: table_name, + column: 
key, + } + .fail(); + } + } + FieldValue::F64(v) => { + let builder = columns + .entry(key) + .or_insert_with(ColumnBuilder::new_f64_column); + builder.null_to_row(row_number); + if builder.push_f64(*v).is_err() { + return TableColumnTypeMismatch { + table: table_name, + column: key, + } + .fail(); + } + } + FieldValue::I64(v) => { + let builder = columns + .entry(key) + .or_insert_with(ColumnBuilder::new_i64_column); + builder.null_to_row(row_number); + if builder.push_i64(*v).is_err() { + return TableColumnTypeMismatch { + table: table_name, + column: key, + } + .fail(); + } + } + FieldValue::String(v) => { + let builder = columns + .entry(key) + .or_insert_with(ColumnBuilder::new_string_column); + builder.null_to_row(row_number); + if builder.push_string(v.as_str()).is_err() { + return TableColumnTypeMismatch { + table: table_name, + column: key, + } + .fail(); + } + } + } + } + + let builder = columns + .entry(TIME_COLUMN_NAME) + .or_insert_with(ColumnBuilder::new_time_column); + if builder + .push_time( + line.timestamp + .unwrap_or_else(|| default_time.timestamp_nanos()), + ) + .is_err() + { + return TableColumnTypeMismatch { + table: table_name, + column: TIME_COLUMN_NAME, + } + .fail(); + } + + for b in columns.values_mut() { + b.null_to_row(row_number + 1); + } + } + + let columns = columns + .into_iter() + .map(|(column_name, builder)| builder.build_flatbuffer(fbb, column_name)) + .collect::>(); + let columns = fbb.create_vector(&columns); + + let table_name = fbb.create_string(table_name); + + Ok(entry_fb::TableWriteBatch::create( + fbb, + &entry_fb::TableWriteBatchArgs { + name: Some(table_name), + columns: Some(columns), + bits_in_last_null_byte: bits_in_last_byte(lines.len()), + }, + )) +} + +/// Holds a shard id to the associated entry +#[derive(Debug)] +pub struct ShardedEntry { + pub shard_id: ShardId, + pub entry: Entry, +} + +/// Wrapper type for the flatbuffer Entry struct. 
Has convenience methods for +/// iterating through the partitioned writes. +#[self_referencing] +#[derive(Debug, PartialEq)] +pub struct Entry { + data: Vec, + #[borrows(data)] + #[covariant] + fb: entry_fb::Entry<'this>, +} + +impl Entry { + /// Returns the Flatbuffers struct for the Entry + pub fn fb(&self) -> &entry_fb::Entry<'_> { + self.borrow_fb() + } + + pub fn partition_writes(&self) -> Option>> { + match self.fb().operation_as_write().as_ref() { + Some(w) => w + .partition_writes() + .as_ref() + .map(|w| w.iter().map(|fb| PartitionWrite { fb }).collect::>()), + None => None, + } + } +} + +impl TryFrom> for Entry { + type Error = flatbuffers::InvalidFlatbuffer; + + fn try_from(data: Vec) -> Result { + EntryTryBuilder { + data, + fb_builder: |data| flatbuffers::root::>(data), + } + .try_build() + } +} + +/// Wrapper struct for the flatbuffers PartitionWrite. Has convenience methods +/// for iterating through the table batches. +#[derive(Debug)] +pub struct PartitionWrite<'a> { + fb: entry_fb::PartitionWrite<'a>, +} + +impl<'a> PartitionWrite<'a> { + pub fn key(&self) -> Option<&str> { + self.fb.key() + } + + pub fn table_batches(&self) -> Vec> { + match self.fb.table_batches().as_ref() { + Some(batches) => batches + .iter() + .map(|fb| TableBatch { fb }) + .collect::>(), + None => vec![], + } + } +} + +/// Wrapper struct for the flatbuffers TableBatch. Has convenience methods for +/// iterating through the data in columnar format. 
+#[derive(Debug)] +pub struct TableBatch<'a> { + fb: entry_fb::TableWriteBatch<'a>, +} + +impl<'a> TableBatch<'a> { + pub fn name(&self) -> Option<&str> { + self.fb.name() + } + + pub fn columns(&self) -> Vec> { + match self.fb.columns().as_ref() { + Some(columns) => { + let row_count = self.row_count(); + columns + .iter() + .map(|fb| Column { fb, row_count }) + .collect::>() + } + None => vec![], + } + } + + pub fn row_count(&self) -> usize { + if let Some(cols) = self.fb.columns() { + if let Some(c) = cols.iter().next() { + match c.null_mask() { + Some(m) => { + let rows = m.len() * 8; + let bits = self.fb.bits_in_last_null_byte() as usize; + + return rows + bits - 8; + } + None => { + return match c.values_type() { + entry_fb::ColumnValues::BoolValues => { + c.values_as_bool_values().unwrap().values().unwrap().len() + } + entry_fb::ColumnValues::U64Values => { + c.values_as_u64values().unwrap().values().unwrap().len() + } + entry_fb::ColumnValues::F64Values => { + c.values_as_f64values().unwrap().values().unwrap().len() + } + entry_fb::ColumnValues::I64Values => { + c.values_as_i64values().unwrap().values().unwrap().len() + } + entry_fb::ColumnValues::StringValues => { + c.values_as_string_values().unwrap().values().unwrap().len() + } + entry_fb::ColumnValues::BytesValues => { + c.values_as_bytes_values().unwrap().values().unwrap().len() + } + _ => panic!("invalid column flatbuffers"), + } + } + } + } + } + + 0 + } +} + +/// Wrapper struct for the flatbuffers Column. Has a convenience method to +/// return an iterator for the values in the column. 
+#[derive(Debug)] +pub struct Column<'a> { + fb: entry_fb::Column<'a>, + row_count: usize, +} + +impl<'a> Column<'a> { + pub fn name(&self) -> Option<&str> { + self.fb.name() + } + + pub fn logical_type(&self) -> entry_fb::LogicalColumnType { + self.fb.logical_column_type() + } + + pub fn values(&self) -> TypedValuesIterator<'a> { + match self.fb.values_type() { + entry_fb::ColumnValues::BoolValues => TypedValuesIterator::Bool(BoolIterator { + row_count: self.row_count, + position: 0, + null_mask: self.fb.null_mask(), + value_position: 0, + values: self + .fb + .values_as_bool_values() + .expect("invalid flatbuffers") + .values() + .unwrap_or(&[]), + }), + entry_fb::ColumnValues::StringValues => { + let values = self + .fb + .values_as_string_values() + .expect("invalid flatbuffers") + .values() + .expect("flatbuffers StringValues must have string values set") + .iter(); + + TypedValuesIterator::String(StringIterator { + row_count: self.row_count, + position: 0, + null_mask: self.fb.null_mask(), + values, + }) + } + entry_fb::ColumnValues::I64Values => { + let values_iter = self + .fb + .values_as_i64values() + .expect("invalid flatbuffers") + .values() + .unwrap_or_else(|| Vector::new(&[], 0)) + .iter(); + + TypedValuesIterator::I64(ValIterator { + row_count: self.row_count, + position: 0, + null_mask: self.fb.null_mask(), + values_iter, + }) + } + entry_fb::ColumnValues::F64Values => { + let values_iter = self + .fb + .values_as_f64values() + .expect("invalid flatbuffers") + .values() + .unwrap_or_else(|| Vector::new(&[], 0)) + .iter(); + + TypedValuesIterator::F64(ValIterator { + row_count: self.row_count, + position: 0, + null_mask: self.fb.null_mask(), + values_iter, + }) + } + entry_fb::ColumnValues::U64Values => { + let values_iter = self + .fb + .values_as_u64values() + .expect("invalid flatbuffers") + .values() + .unwrap_or_else(|| Vector::new(&[], 0)) + .iter(); + + TypedValuesIterator::U64(ValIterator { + row_count: self.row_count, + position: 0, + 
null_mask: self.fb.null_mask(), + values_iter, + }) + } + entry_fb::ColumnValues::BytesValues => unimplemented!(), + _ => panic!("unknown fb values type"), + } + } +} + +/// Wrapper for the iterators for the underlying column types. +#[derive(Debug)] +pub enum TypedValuesIterator<'a> { + Bool(BoolIterator<'a>), + I64(ValIterator<'a, i64>), + F64(ValIterator<'a, f64>), + U64(ValIterator<'a, u64>), + String(StringIterator<'a>), +} + +impl<'a> TypedValuesIterator<'a> { + pub fn bool_values(self) -> Option>> { + match self { + Self::Bool(b) => Some(b.collect::>()), + _ => None, + } + } + + pub fn i64_values(self) -> Option>> { + match self { + Self::I64(v) => Some(v.collect::>()), + _ => None, + } + } + + pub fn f64_values(self) -> Option>> { + match self { + Self::F64(v) => Some(v.collect::>()), + _ => None, + } + } + + pub fn u64_values(self) -> Option>> { + match self { + Self::U64(v) => Some(v.collect::>()), + _ => None, + } + } +} + +/// Iterator over the flatbuffers BoolValues +#[derive(Debug)] +pub struct BoolIterator<'a> { + row_count: usize, + position: usize, + null_mask: Option<&'a [u8]>, + values: &'a [bool], + value_position: usize, +} + +impl<'a> Iterator for BoolIterator<'a> { + type Item = Option; + + fn next(&mut self) -> Option { + if self.position >= self.row_count || self.value_position >= self.values.len() { + return None; + } + + self.position += 1; + if is_null_value(self.position, &self.null_mask) { + return Some(None); + } + + let val = Some(self.values[self.value_position]); + self.value_position += 1; + + Some(val) + } +} + +/// Iterator over the flatbuffers I64Values, F64Values, and U64Values. 
+#[derive(Debug)] +pub struct ValIterator<'a, T: Follow<'a> + Follow<'a, Inner = T>> { + row_count: usize, + position: usize, + null_mask: Option<&'a [u8]>, + values_iter: VectorIter<'a, T>, +} + +impl<'a, T: Follow<'a> + Follow<'a, Inner = T>> Iterator for ValIterator<'a, T> { + type Item = Option; + + fn next(&mut self) -> Option { + if self.position >= self.row_count { + return None; + } + + self.position += 1; + if is_null_value(self.position, &self.null_mask) { + return Some(None); + } + + Some(self.values_iter.next()) + } +} + +/// Iterator over the flatbuffers StringValues +#[derive(Debug)] +pub struct StringIterator<'a> { + row_count: usize, + position: usize, + null_mask: Option<&'a [u8]>, + values: VectorIter<'a, ForwardsUOffset<&'a str>>, +} + +impl<'a> Iterator for StringIterator<'a> { + type Item = Option<&'a str>; + + fn next(&mut self) -> Option { + if self.position >= self.row_count { + return None; + } + + self.position += 1; + if is_null_value(self.position, &self.null_mask) { + return Some(None); + } + + Some(self.values.next()) + } +} + +struct NullMaskBuilder { + bytes: Vec, + position: usize, +} + +const BITS_IN_BYTE: usize = 8; +const LEFT_MOST_BIT_TRUE: u8 = 128; + +impl NullMaskBuilder { + fn new() -> Self { + Self { + bytes: vec![0], + position: 1, + } + } + + fn push(&mut self, is_null: bool) { + if self.position > BITS_IN_BYTE { + self.bytes.push(0); + self.position = 1; + } + + if is_null { + let val: u8 = LEFT_MOST_BIT_TRUE >> (self.position - 1); + let last_byte_position = self.bytes.len() - 1; + self.bytes[last_byte_position] += val; + } + + self.position += 1; + } + + #[allow(dead_code)] + fn to_bool_vec(&self) -> Vec { + (1..self.row_count() + 1) + .map(|r| is_null_value(r, &Some(&self.bytes))) + .collect::>() + } + + fn row_count(&self) -> usize { + self.bytes.len() * BITS_IN_BYTE - BITS_IN_BYTE + self.position - 1 + } + + fn has_nulls(&self) -> bool { + for b in &self.bytes { + if *b > 0 { + return true; + } + } + + false + } +} 
+ +impl std::fmt::Debug for NullMaskBuilder { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + for i in 1..self.row_count() { + let bit = if is_null_value(i, &Some(&self.bytes)) { + 1 + } else { + 0 + }; + + write!(f, "{}", bit)?; + if i % 4 == 0 { + write!(f, " ")?; + } + } + + Ok(()) + } +} + +fn is_null_value(row: usize, mask: &Option<&[u8]>) -> bool { + match mask { + Some(mask) => { + let position = bits_in_last_byte(row); + + let mut byte = row / BITS_IN_BYTE; + if position == BITS_IN_BYTE as u8 { + byte -= 1; + } + + if byte >= mask.len() { + return true; + } + + mask[byte] & (LEFT_MOST_BIT_TRUE >> (position - 1)) > 0 + } + None => false, + } +} + +fn bits_in_last_byte(row_count: usize) -> u8 { + let position = (row_count % BITS_IN_BYTE) as u8; + if position == 0 { + BITS_IN_BYTE as u8 + } else { + position + } +} + +#[derive(Debug)] +struct ColumnBuilder<'a> { + nulls: NullMaskBuilder, + values: ColumnRaw<'a>, +} + +impl<'a> ColumnBuilder<'a> { + fn new_tag_column() -> Self { + Self { + nulls: NullMaskBuilder::new(), + values: ColumnRaw::Tag(Vec::new()), + } + } + + fn new_string_column() -> Self { + Self { + nulls: NullMaskBuilder::new(), + values: ColumnRaw::String(Vec::new()), + } + } + + fn new_time_column() -> Self { + Self { + nulls: NullMaskBuilder::new(), + values: ColumnRaw::Time(Vec::new()), + } + } + + fn new_bool_column() -> Self { + Self { + nulls: NullMaskBuilder::new(), + values: ColumnRaw::Bool(Vec::new()), + } + } + + fn new_u64_column() -> Self { + Self { + nulls: NullMaskBuilder::new(), + values: ColumnRaw::U64(Vec::new()), + } + } + + fn new_f64_column() -> Self { + Self { + nulls: NullMaskBuilder::new(), + values: ColumnRaw::F64(Vec::new()), + } + } + + fn new_i64_column() -> Self { + Self { + nulls: NullMaskBuilder::new(), + values: ColumnRaw::I64(Vec::new()), + } + } + + // ensures there are at least as many rows (or nulls) to row_number - 1 + fn null_to_row(&mut self, row_number: usize) { + let mut row_count = 
self.nulls.row_count(); + + while row_count < row_number - 1 { + self.nulls.push(true); + row_count += 1; + } + } + + fn push_tag(&mut self, value: &'a str) -> Result<()> { + match &mut self.values { + ColumnRaw::Tag(values) => { + self.nulls.push(false); + values.push(value) + } + _ => return ColumnTypeMismatch.fail(), + } + + Ok(()) + } + + fn push_string(&mut self, value: &'a str) -> Result<()> { + match &mut self.values { + ColumnRaw::String(values) => { + self.nulls.push(false); + values.push(value) + } + _ => return ColumnTypeMismatch.fail(), + } + + Ok(()) + } + + fn push_time(&mut self, value: i64) -> Result<()> { + match &mut self.values { + ColumnRaw::Time(times) => { + times.push(value); + self.nulls.push(false); + } + _ => return ColumnTypeMismatch.fail(), + } + + Ok(()) + } + + fn push_bool(&mut self, value: bool) -> Result<()> { + match &mut self.values { + ColumnRaw::Bool(values) => { + values.push(value); + self.nulls.push(false); + } + _ => return ColumnTypeMismatch.fail(), + } + + Ok(()) + } + + fn push_u64(&mut self, value: u64) -> Result<()> { + match &mut self.values { + ColumnRaw::U64(values) => { + values.push(value); + self.nulls.push(false); + } + _ => return ColumnTypeMismatch.fail(), + } + + Ok(()) + } + + fn push_f64(&mut self, value: f64) -> Result<()> { + match &mut self.values { + ColumnRaw::F64(values) => { + values.push(value); + self.nulls.push(false); + } + _ => return ColumnTypeMismatch.fail(), + } + + Ok(()) + } + + fn push_i64(&mut self, value: i64) -> Result<()> { + match &mut self.values { + ColumnRaw::I64(values) => { + values.push(value); + self.nulls.push(false); + } + _ => return ColumnTypeMismatch.fail(), + } + + Ok(()) + } + + fn build_flatbuffer( + &self, + fbb: &mut FlatBufferBuilder<'a>, + column_name: &str, + ) -> WIPOffset> { + let name = Some(fbb.create_string(column_name)); + let null_mask = if self.nulls.has_nulls() { + Some(fbb.create_vector_direct(&self.nulls.bytes)) + } else { + None + }; + + let 
(logical_column_type, values_type, values) = match &self.values { + ColumnRaw::Tag(values) => { + let values = values + .iter() + .map(|v| fbb.create_string(v)) + .collect::>(); + let values = fbb.create_vector(&values); + let values = entry_fb::StringValues::create( + fbb, + &entry_fb::StringValuesArgs { + values: Some(values), + }, + ); + + ( + entry_fb::LogicalColumnType::Tag, + entry_fb::ColumnValues::StringValues, + values.as_union_value(), + ) + } + ColumnRaw::String(values) => { + let values = values + .iter() + .map(|v| fbb.create_string(v)) + .collect::>(); + let values = fbb.create_vector(&values); + let values = entry_fb::StringValues::create( + fbb, + &entry_fb::StringValuesArgs { + values: Some(values), + }, + ); + + ( + entry_fb::LogicalColumnType::Field, + entry_fb::ColumnValues::StringValues, + values.as_union_value(), + ) + } + ColumnRaw::Time(values) => { + let values = fbb.create_vector(&values); + let values = entry_fb::I64Values::create( + fbb, + &entry_fb::I64ValuesArgs { + values: Some(values), + }, + ); + + ( + entry_fb::LogicalColumnType::Time, + entry_fb::ColumnValues::I64Values, + values.as_union_value(), + ) + } + ColumnRaw::I64(values) => { + let values = fbb.create_vector(&values); + let values = entry_fb::I64Values::create( + fbb, + &entry_fb::I64ValuesArgs { + values: Some(values), + }, + ); + + ( + entry_fb::LogicalColumnType::Field, + entry_fb::ColumnValues::I64Values, + values.as_union_value(), + ) + } + ColumnRaw::Bool(values) => { + let values = fbb.create_vector(&values); + let values = entry_fb::BoolValues::create( + fbb, + &entry_fb::BoolValuesArgs { + values: Some(values), + }, + ); + + ( + entry_fb::LogicalColumnType::Field, + entry_fb::ColumnValues::BoolValues, + values.as_union_value(), + ) + } + ColumnRaw::F64(values) => { + let values = fbb.create_vector(&values); + let values = entry_fb::F64Values::create( + fbb, + &entry_fb::F64ValuesArgs { + values: Some(values), + }, + ); + + ( + entry_fb::LogicalColumnType::Field, 
+ entry_fb::ColumnValues::F64Values, + values.as_union_value(), + ) + } + ColumnRaw::U64(values) => { + let values = fbb.create_vector(&values); + let values = entry_fb::U64Values::create( + fbb, + &entry_fb::U64ValuesArgs { + values: Some(values), + }, + ); + + ( + entry_fb::LogicalColumnType::Field, + entry_fb::ColumnValues::U64Values, + values.as_union_value(), + ) + } + }; + + entry_fb::Column::create( + fbb, + &entry_fb::ColumnArgs { + name, + logical_column_type, + values_type, + values: Some(values), + null_mask, + }, + ) + } +} + +#[derive(Debug)] +enum ColumnRaw<'a> { + Tag(Vec<&'a str>), + Time(Vec), + I64(Vec), + F64(Vec), + U64(Vec), + String(Vec<&'a str>), + Bool(Vec), +} + +#[cfg(test)] +mod tests { + use super::*; + + use influxdb_line_protocol::parse_lines; + + #[test] + fn shards_lines() { + let lp = vec![ + "cpu,host=a,region=west user=23.1,system=66.1 123", + "mem,host=a,region=west used=23432 123", + "foo bar=true 21", + ] + .join("\n"); + let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect(); + + let sharded_entries = + lines_to_sharded_entries(&lines, &sharder(2), &partitioner(1)).unwrap(); + + assert_eq!(sharded_entries.len(), 2); + assert_eq!(sharded_entries[0].shard_id, 0); + assert_eq!(sharded_entries[1].shard_id, 1); + } + + #[test] + fn multiple_partitions() { + let lp = vec![ + "cpu,host=a,region=west user=23.1,system=66.1 123", + "mem,host=a,region=west used=23432 123", + "asdf foo=\"bar\" 9999", + ] + .join("\n"); + let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect(); + + let sharded_entries = + lines_to_sharded_entries(&lines, &sharder(1), &partitioner(2)).unwrap(); + + let partition_writes = sharded_entries[0].entry.partition_writes().unwrap(); + assert_eq!(partition_writes.len(), 2); + assert_eq!(partition_writes[0].key().unwrap(), "key_0"); + assert_eq!(partition_writes[1].key().unwrap(), "key_1"); + } + + #[test] + fn multiple_tables() { + let lp = vec![ + "cpu val=1 55", + "mem val=23 10", + "cpu 
val=88 100", + "disk foo=23.2 110", + "mem val=55 111", + ] + .join("\n"); + let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect(); + + let sharded_entries = + lines_to_sharded_entries(&lines, &sharder(1), &partitioner(1)).unwrap(); + + let partition_writes = sharded_entries[0].entry.partition_writes().unwrap(); + let table_batches = partition_writes[0].table_batches(); + + assert_eq!(table_batches.len(), 3); + assert_eq!(table_batches[0].name().unwrap(), "cpu"); + assert_eq!(table_batches[1].name().unwrap(), "disk"); + assert_eq!(table_batches[2].name().unwrap(), "mem"); + } + + #[test] + fn logical_column_types() { + let lp = vec!["a,host=a val=23 983", "a,host=a,region=west val2=23.2 2343"].join("\n"); + let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect(); + + let sharded_entries = + lines_to_sharded_entries(&lines, &sharder(1), &partitioner(1)).unwrap(); + + let partition_writes = sharded_entries[0].entry.partition_writes().unwrap(); + let table_batches = partition_writes[0].table_batches(); + let batch = &table_batches[0]; + + let columns = batch.columns(); + + assert_eq!(columns.len(), 5); + + assert_eq!(columns[0].name().unwrap(), "host"); + assert_eq!(columns[0].logical_type(), entry_fb::LogicalColumnType::Tag); + + assert_eq!(columns[1].name().unwrap(), "region"); + assert_eq!(columns[1].logical_type(), entry_fb::LogicalColumnType::Tag); + + assert_eq!(columns[2].name().unwrap(), "time"); + assert_eq!(columns[2].logical_type(), entry_fb::LogicalColumnType::Time); + + assert_eq!(columns[3].name().unwrap(), "val"); + assert_eq!( + columns[3].logical_type(), + entry_fb::LogicalColumnType::Field + ); + + assert_eq!(columns[4].name().unwrap(), "val2"); + assert_eq!( + columns[4].logical_type(), + entry_fb::LogicalColumnType::Field + ); + } + + #[test] + fn columns_without_nulls() { + let lp = vec![ + "a,host=a ival=23i,fval=1.2,uval=7u,sval=\"hi\",bval=true 1", + "a,host=b ival=22i,fval=2.2,uval=1u,sval=\"world\",bval=false 2", + ] + 
.join("\n"); + let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect(); + + let sharded_entries = + lines_to_sharded_entries(&lines, &sharder(1), &partitioner(1)).unwrap(); + + let partition_writes = sharded_entries + .first() + .unwrap() + .entry + .partition_writes() + .unwrap(); + let table_batches = partition_writes.first().unwrap().table_batches(); + let batch = table_batches.first().unwrap(); + + let columns = batch.columns(); + + assert_eq!(batch.row_count(), 2); + assert_eq!(columns.len(), 7); + + let col = columns.get(0).unwrap(); + assert_eq!(col.name().unwrap(), "bval"); + let values = col.values().bool_values().unwrap(); + assert_eq!(&values, &[Some(true), Some(false)]); + + let col = columns.get(1).unwrap(); + assert_eq!(col.name().unwrap(), "fval"); + let values = col.values().f64_values().unwrap(); + assert_eq!(&values, &[Some(1.2), Some(2.2)]); + + let col = columns.get(2).unwrap(); + assert_eq!(col.name().unwrap(), "host"); + let values = match col.values() { + TypedValuesIterator::String(v) => v, + _ => panic!("wrong type"), + }; + let values = values.collect::>(); + assert_eq!(&values, &[Some("a"), Some("b")]); + + let col = columns.get(3).unwrap(); + assert_eq!(col.name().unwrap(), "ival"); + let values = col.values().i64_values().unwrap(); + assert_eq!(&values, &[Some(23), Some(22)]); + + let col = columns.get(4).unwrap(); + assert_eq!(col.name().unwrap(), "sval"); + let values = match col.values() { + TypedValuesIterator::String(v) => v, + _ => panic!("wrong type"), + }; + let values = values.collect::>(); + assert_eq!(&values, &[Some("hi"), Some("world")]); + + let col = columns.get(5).unwrap(); + assert_eq!(col.name().unwrap(), TIME_COLUMN_NAME); + let values = col.values().i64_values().unwrap(); + assert_eq!(&values, &[Some(1), Some(2)]); + + let col = columns.get(6).unwrap(); + assert_eq!(col.name().unwrap(), "uval"); + let values = col.values().u64_values().unwrap(); + assert_eq!(&values, &[Some(7), Some(1)]); + } + + #[test] + 
fn columns_with_nulls() { + let lp = vec![ + "a,host=a val=23i 983", + "a,host=a,region=west val2=23.2 2343", + "a val=21i,bool=true,string=\"hello\" 222", + ] + .join("\n"); + let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect(); + + let sharded_entries = + lines_to_sharded_entries(&lines, &sharder(1), &partitioner(1)).unwrap(); + + let partition_writes = sharded_entries + .first() + .unwrap() + .entry + .partition_writes() + .unwrap(); + let table_batches = partition_writes.first().unwrap().table_batches(); + let batch = table_batches.first().unwrap(); + + let columns = batch.columns(); + + assert_eq!(batch.row_count(), 3); + assert_eq!(columns.len(), 7); + + let col = columns.get(0).unwrap(); + assert_eq!(col.name().unwrap(), "bool"); + assert_eq!(col.logical_type(), entry_fb::LogicalColumnType::Field); + let values = col.values().bool_values().unwrap(); + assert_eq!(&values, &[None, None, Some(true)]); + + let col = columns.get(1).unwrap(); + assert_eq!(col.name().unwrap(), "host"); + assert_eq!(col.logical_type(), entry_fb::LogicalColumnType::Tag); + let values = match col.values() { + TypedValuesIterator::String(v) => v, + _ => panic!("wrong type"), + }; + let values = values.collect::>(); + assert_eq!(&values, &[Some("a"), Some("a"), None]); + + let col = columns.get(2).unwrap(); + assert_eq!(col.name().unwrap(), "region"); + assert_eq!(col.logical_type(), entry_fb::LogicalColumnType::Tag); + let values = match col.values() { + TypedValuesIterator::String(v) => v, + _ => panic!("wrong type"), + }; + let values = values.collect::>(); + assert_eq!(&values, &[None, Some("west"), None]); + + let col = columns.get(3).unwrap(); + assert_eq!(col.name().unwrap(), "string"); + assert_eq!(col.logical_type(), entry_fb::LogicalColumnType::Field); + let values = match col.values() { + TypedValuesIterator::String(v) => v, + _ => panic!("wrong type"), + }; + let values = values.collect::>(); + assert_eq!(&values, &[None, None, Some("hello")]); + + let col = 
columns.get(4).unwrap(); + assert_eq!(col.name().unwrap(), TIME_COLUMN_NAME); + assert_eq!(col.logical_type(), entry_fb::LogicalColumnType::Time); + let values = col.values().i64_values().unwrap(); + assert_eq!(&values, &[Some(983), Some(2343), Some(222)]); + + let col = columns.get(5).unwrap(); + assert_eq!(col.name().unwrap(), "val"); + assert_eq!(col.logical_type(), entry_fb::LogicalColumnType::Field); + let values = col.values().i64_values().unwrap(); + assert_eq!(&values, &[Some(23), None, Some(21)]); + + let col = columns.get(6).unwrap(); + assert_eq!(col.name().unwrap(), "val2"); + assert_eq!(col.logical_type(), entry_fb::LogicalColumnType::Field); + let values = col.values().f64_values().unwrap(); + assert_eq!(&values, &[None, Some(23.2), None]); + } + + #[test] + fn null_mask_builder() { + let mut m = NullMaskBuilder::new(); + m.push(true); + m.push(false); + m.push(true); + assert_eq!(m.row_count(), 3); + assert_eq!(m.to_bool_vec(), vec![true, false, true]); + } + + #[test] + fn null_mask_builder_eight_edge_case() { + let mut m = NullMaskBuilder::new(); + m.push(false); + m.push(true); + m.push(true); + m.push(false); + m.push(false); + m.push(true); + m.push(true); + m.push(false); + assert_eq!(m.row_count(), 8); + assert_eq!( + m.to_bool_vec(), + vec![false, true, true, false, false, true, true, false] + ); + } + + #[test] + fn null_mask_builder_more_than_eight() { + let mut m = NullMaskBuilder::new(); + m.push(false); + m.push(true); + m.push(true); + m.push(false); + m.push(false); + m.push(true); + m.push(false); + m.push(false); + m.push(false); + m.push(true); + assert_eq!(m.row_count(), 10); + assert_eq!( + m.to_bool_vec(), + vec![false, true, true, false, false, true, false, false, false, true] + ); + } + + #[test] + fn row_count_edge_cases() { + let lp = vec!["a val=1i 1"].join("\n"); + let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect(); + + let sharded_entries = + lines_to_sharded_entries(&lines, &sharder(1), 
&partitioner(1)).unwrap(); + let partition_writes = sharded_entries + .first() + .unwrap() + .entry + .partition_writes() + .unwrap(); + let table_batches = partition_writes.first().unwrap().table_batches(); + let batch = table_batches.first().unwrap(); + let columns = batch.columns(); + + assert_eq!(batch.row_count(), 1); + let col = columns.get(1).unwrap(); + assert_eq!(col.name().unwrap(), "val"); + let values = col.values().i64_values().unwrap(); + assert_eq!(&values, &[Some(1)]); + + let lp = vec![ + "a val=1i 1", + "a val=1i 2", + "a val=1i 3", + "a val=1i 4", + "a val=1i 5", + "a val=1i 6", + "a val2=1i 7", + "a val=1i 8", + ] + .join("\n"); + let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect(); + + let sharded_entries = + lines_to_sharded_entries(&lines, &sharder(1), &partitioner(1)).unwrap(); + let partition_writes = sharded_entries + .first() + .unwrap() + .entry + .partition_writes() + .unwrap(); + let table_batches = partition_writes.first().unwrap().table_batches(); + let batch = table_batches.first().unwrap(); + let columns = batch.columns(); + + assert_eq!(batch.row_count(), 8); + let col = columns.get(1).unwrap(); + assert_eq!(col.name().unwrap(), "val"); + let values = col.values().i64_values().unwrap(); + assert_eq!( + &values, + &[ + Some(1), + Some(1), + Some(1), + Some(1), + Some(1), + Some(1), + None, + Some(1) + ] + ); + + let lp = vec![ + "a val=1i 1", + "a val=1i 2", + "a val=1i 3", + "a val=1i 4", + "a val=1i 5", + "a val=1i 6", + "a val2=1i 7", + "a val=1i 8", + "a val=1i 9", + ] + .join("\n"); + let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect(); + + let sharded_entries = + lines_to_sharded_entries(&lines, &sharder(1), &partitioner(1)).unwrap(); + let partition_writes = sharded_entries + .first() + .unwrap() + .entry + .partition_writes() + .unwrap(); + let table_batches = partition_writes.first().unwrap().table_batches(); + let batch = table_batches.first().unwrap(); + let columns = batch.columns(); + + 
assert_eq!(batch.row_count(), 9); + let col = columns.get(1).unwrap(); + assert_eq!(col.name().unwrap(), "val"); + let values = col.values().i64_values().unwrap(); + assert_eq!( + &values, + &[ + Some(1), + Some(1), + Some(1), + Some(1), + Some(1), + Some(1), + None, + Some(1), + Some(1) + ] + ); + } + + #[test] + fn missing_times() { + let lp = vec!["a val=1i", "a val=2i 123"].join("\n"); + let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect(); + + let t = Utc::now().timestamp_nanos(); + + let sharded_entries = + lines_to_sharded_entries(&lines, &sharder(1), &partitioner(1)).unwrap(); + + let partition_writes = sharded_entries + .first() + .unwrap() + .entry + .partition_writes() + .unwrap(); + let table_batches = partition_writes.first().unwrap().table_batches(); + let batch = table_batches.first().unwrap(); + let columns = batch.columns(); + + let col = columns.get(0).unwrap(); + assert_eq!(col.name().unwrap(), TIME_COLUMN_NAME); + let values = col.values().i64_values().unwrap(); + assert!(values[0].unwrap() > t); + assert_eq!(values[1], Some(123)); + } + + #[test] + fn field_type_conflict() { + let lp = vec!["a val=1i 1", "a val=2.1 123"].join("\n"); + let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect(); + + let sharded_entries = lines_to_sharded_entries(&lines, &sharder(1), &partitioner(1)); + + assert!(sharded_entries.is_err()); + } + + #[test] + fn logical_type_conflict() { + let lp = vec!["a,host=a val=1i 1", "a host=\"b\" 123"].join("\n"); + let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect(); + + let sharded_entries = lines_to_sharded_entries(&lines, &sharder(1), &partitioner(1)); + + assert!(sharded_entries.is_err()); + } + + fn sharder(count: u8) -> TestSharder { + TestSharder { + count, + n: std::cell::RefCell::new(0), + } + } + + // For each line passed to shard returns a shard id from [0, count) in order + struct TestSharder { + count: u8, + n: std::cell::RefCell, + } + + impl Sharder for TestSharder { + fn 
shard(&self, _line: &ParsedLine<'_>) -> Result { + let n = *self.n.borrow(); + self.n.replace(n + 1); + Ok(n % self.count) + } + } + + fn partitioner(count: u8) -> TestPartitioner { + TestPartitioner { + count, + n: std::cell::RefCell::new(0), + } + } + + // For each line passed to partition_key returns a key with a number from [0, + // count) + struct TestPartitioner { + count: u8, + n: std::cell::RefCell, + } + + impl Partitioner for TestPartitioner { + fn partition_key( + &self, + _line: &ParsedLine<'_>, + _default_time: &DateTime, + ) -> data_types::database_rules::Result { + let n = *self.n.borrow(); + self.n.replace(n + 1); + Ok(format!("key_{}", n % self.count)) + } + } +} diff --git a/internal_types/src/lib.rs b/internal_types/src/lib.rs index 3488faf07c..999d428765 100644 --- a/internal_types/src/lib.rs +++ b/internal_types/src/lib.rs @@ -8,5 +8,6 @@ pub mod data; pub mod once; +pub mod entry; pub mod schema; pub mod selection; From a7659addbe8362aae6ceff16bbaddde7ab9e7e6e Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Tue, 6 Apr 2021 17:56:07 -0400 Subject: [PATCH 2/3] chore: cleanup entry.rs based on PR feedback and add better errors --- data_types/src/database_rules.rs | 4 +- .../protos/influxdata/iox/write/v1/entry.fbs | 3 - generated_types/src/entry_generated.rs | 24 -- internal_types/src/entry.rs | 294 ++++++++++-------- 4 files changed, 173 insertions(+), 152 deletions(-) diff --git a/data_types/src/database_rules.rs b/data_types/src/database_rules.rs index 3ef57659c2..6ee605cac6 100644 --- a/data_types/src/database_rules.rs +++ b/data_types/src/database_rules.rs @@ -742,11 +742,11 @@ impl TryFrom for TemplatePart { } /// ShardId maps to a nodegroup that holds the the shard. -pub type ShardId = u8; +pub type ShardId = u16; /// Assigns a given line to a specific shard id. 
pub trait Sharder { - fn shard(&self, _line: &ParsedLine<'_>) -> Result; + fn shard(&self, line: &ParsedLine<'_>) -> Result; } /// ShardConfig defines rules for assigning a line/row to an individual diff --git a/generated_types/protos/influxdata/iox/write/v1/entry.fbs b/generated_types/protos/influxdata/iox/write/v1/entry.fbs index e180a72cfa..770bfb09c8 100644 --- a/generated_types/protos/influxdata/iox/write/v1/entry.fbs +++ b/generated_types/protos/influxdata/iox/write/v1/entry.fbs @@ -60,9 +60,6 @@ table TableWriteBatch { name: string; // every column must have the same number of bytes in its null_mask columns: [Column]; - // specifies how many bits of the last byte of the null mask are valid. Thus the - // number of rows will be: (null_mask.len() - 1) * 8 + bits_in_last_null_byte - bits_in_last_null_byte: uint8; } enum LogicalColumnType : byte { IOx, Tag, Field, Time } diff --git a/generated_types/src/entry_generated.rs b/generated_types/src/entry_generated.rs index 45d222fb24..afa226b8d8 100644 --- a/generated_types/src/entry_generated.rs +++ b/generated_types/src/entry_generated.rs @@ -1086,13 +1086,11 @@ pub mod influxdata { if let Some(x) = args.name { builder.add_name(x); } - builder.add_bits_in_last_null_byte(args.bits_in_last_null_byte); builder.finish() } pub const VT_NAME: flatbuffers::VOffsetT = 4; pub const VT_COLUMNS: flatbuffers::VOffsetT = 6; - pub const VT_BITS_IN_LAST_NULL_BYTE: flatbuffers::VOffsetT = 8; #[inline] pub fn name(&self) -> Option<&'a str> { @@ -1110,12 +1108,6 @@ pub mod influxdata { flatbuffers::Vector<'a, flatbuffers::ForwardsUOffset>, >>(TableWriteBatch::VT_COLUMNS, None) } - #[inline] - pub fn bits_in_last_null_byte(&self) -> u8 { - self._tab - .get::(TableWriteBatch::VT_BITS_IN_LAST_NULL_BYTE, Some(0)) - .unwrap() - } } impl flatbuffers::Verifiable for TableWriteBatch<'_> { @@ -1134,11 +1126,6 @@ pub mod influxdata { .visit_field::>, >>(&"columns", Self::VT_COLUMNS, false)? 
- .visit_field::( - &"bits_in_last_null_byte", - Self::VT_BITS_IN_LAST_NULL_BYTE, - false, - )? .finish(); Ok(()) } @@ -1150,7 +1137,6 @@ pub mod influxdata { flatbuffers::Vector<'a, flatbuffers::ForwardsUOffset>>, >, >, - pub bits_in_last_null_byte: u8, } impl<'a> Default for TableWriteBatchArgs<'a> { #[inline] @@ -1158,7 +1144,6 @@ pub mod influxdata { TableWriteBatchArgs { name: None, columns: None, - bits_in_last_null_byte: 0, } } } @@ -1187,14 +1172,6 @@ pub mod influxdata { ); } #[inline] - pub fn add_bits_in_last_null_byte(&mut self, bits_in_last_null_byte: u8) { - self.fbb_.push_slot::( - TableWriteBatch::VT_BITS_IN_LAST_NULL_BYTE, - bits_in_last_null_byte, - 0, - ); - } - #[inline] pub fn new( _fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>, ) -> TableWriteBatchBuilder<'a, 'b> { @@ -1216,7 +1193,6 @@ pub mod influxdata { let mut ds = f.debug_struct("TableWriteBatch"); ds.field("name", &self.name()); ds.field("columns", &self.columns()); - ds.field("bits_in_last_null_byte", &self.bits_in_last_null_byte()); ds.finish() } } diff --git a/internal_types/src/entry.rs b/internal_types/src/entry.rs index fd13f69a90..fdf2e61b74 100644 --- a/internal_types/src/entry.rs +++ b/internal_types/src/entry.rs @@ -22,14 +22,32 @@ pub enum Error { #[snafu(display("Error getting shard id {}", source))] GeneratingShardId { source: DataError }, - #[snafu(display("column type mismatch"))] - ColumnTypeMismatch, + #[snafu(display( + "table {} has column {} {} with new data on line {}", + table, + column, + source, + line_number + ))] + TableColumnTypeMismatch { + table: String, + column: String, + line_number: usize, + source: ColumnError, + }, +} - #[snafu(display("table {} has column {} with two different types", table, column))] - TableColumnTypeMismatch { table: String, column: String }, +#[derive(Debug, Snafu)] +pub enum ColumnError { + #[snafu(display("type mismatch: expected {} but got {}", expected_type, new_type))] + ColumnTypeMismatch { + new_type: String, + 
expected_type: String, + }, } pub type Result = std::result::Result; +type ColumnResult = std::result::Result; /// Converts parsed line protocol into a collection of ShardedEntry with the /// underlying flatbuffers bytes generated. @@ -60,11 +78,10 @@ pub fn lines_to_sharded_entries( let default_time = Utc::now(); - let mut sharded_entries = Vec::with_capacity(sharded_lines.len()); - for (shard_id, partitions) in sharded_lines.into_iter() { - let entry = build_sharded_entry(shard_id, partitions, &default_time)?; - sharded_entries.push(entry); - } + let sharded_entries = sharded_lines + .into_iter() + .map(|(shard_id, partitions)| build_sharded_entry(shard_id, partitions, &default_time)) + .collect::>>()?; Ok(sharded_entries) } @@ -76,11 +93,12 @@ fn build_sharded_entry( ) -> Result { let mut fbb = flatbuffers::FlatBufferBuilder::new_with_capacity(1024); - let mut partition_writes = Vec::with_capacity(partitions.len()); - for (partition_key, tables) in partitions.into_iter() { - let write = build_partition_write(&mut fbb, partition_key, tables, default_time)?; - partition_writes.push(write); - } + let partition_writes = partitions + .into_iter() + .map(|(partition_key, tables)| { + build_partition_write(&mut fbb, partition_key, tables, default_time) + }) + .collect::>>()?; let partition_writes = fbb.create_vector(&partition_writes); let write_operations = entry_fb::WriteOperations::create( @@ -114,11 +132,10 @@ fn build_partition_write<'a>( ) -> Result>> { let partition_key = fbb.create_string(&partition_key); - let mut table_batches = Vec::with_capacity(tables.len()); - for (table_name, lines) in tables.into_iter() { - let batch = build_table_write_batch(fbb, table_name, lines, default_time)?; - table_batches.push(batch); - } + let table_batches = tables + .into_iter() + .map(|(table_name, lines)| build_table_write_batch(fbb, table_name, lines, default_time)) + .collect::>>()?; let table_batches = fbb.create_vector(&table_batches); 
Ok(entry_fb::PartitionWrite::create( @@ -147,13 +164,13 @@ fn build_table_write_batch<'a>( .entry(key) .or_insert_with(ColumnBuilder::new_tag_column); builder.null_to_row(row_number); - if builder.push_tag(value.as_str()).is_err() { - return TableColumnTypeMismatch { + builder + .push_tag(value.as_str()) + .context(TableColumnTypeMismatch { table: table_name, column: key, - } - .fail(); - } + line_number: i, + })?; } } @@ -166,65 +183,57 @@ fn build_table_write_batch<'a>( .entry(key) .or_insert_with(ColumnBuilder::new_bool_column); builder.null_to_row(row_number); - if builder.push_bool(*b).is_err() { - return TableColumnTypeMismatch { - table: table_name, - column: key, - } - .fail(); - } + builder.push_bool(*b).context(TableColumnTypeMismatch { + table: table_name, + column: key, + line_number: i, + })?; } FieldValue::U64(v) => { let builder = columns .entry(key) .or_insert_with(ColumnBuilder::new_u64_column); builder.null_to_row(row_number); - if builder.push_u64(*v).is_err() { - return TableColumnTypeMismatch { - table: table_name, - column: key, - } - .fail(); - } + builder.push_u64(*v).context(TableColumnTypeMismatch { + table: table_name, + column: key, + line_number: i, + })?; } FieldValue::F64(v) => { let builder = columns .entry(key) .or_insert_with(ColumnBuilder::new_f64_column); builder.null_to_row(row_number); - if builder.push_f64(*v).is_err() { - return TableColumnTypeMismatch { - table: table_name, - column: key, - } - .fail(); - } + builder.push_f64(*v).context(TableColumnTypeMismatch { + table: table_name, + column: key, + line_number: i, + })?; } FieldValue::I64(v) => { let builder = columns .entry(key) .or_insert_with(ColumnBuilder::new_i64_column); builder.null_to_row(row_number); - if builder.push_i64(*v).is_err() { - return TableColumnTypeMismatch { - table: table_name, - column: key, - } - .fail(); - } + builder.push_i64(*v).context(TableColumnTypeMismatch { + table: table_name, + column: key, + line_number: i, + })?; } FieldValue::String(v) 
=> { let builder = columns .entry(key) .or_insert_with(ColumnBuilder::new_string_column); builder.null_to_row(row_number); - if builder.push_string(v.as_str()).is_err() { - return TableColumnTypeMismatch { + builder + .push_string(v.as_str()) + .context(TableColumnTypeMismatch { table: table_name, column: key, - } - .fail(); - } + line_number: i, + })?; } } } @@ -232,19 +241,16 @@ fn build_table_write_batch<'a>( let builder = columns .entry(TIME_COLUMN_NAME) .or_insert_with(ColumnBuilder::new_time_column); - if builder + builder .push_time( line.timestamp .unwrap_or_else(|| default_time.timestamp_nanos()), ) - .is_err() - { - return TableColumnTypeMismatch { + .context(TableColumnTypeMismatch { table: table_name, column: TIME_COLUMN_NAME, - } - .fail(); - } + line_number: i, + })?; for b in columns.values_mut() { b.null_to_row(row_number + 1); @@ -264,7 +270,6 @@ fn build_table_write_batch<'a>( &entry_fb::TableWriteBatchArgs { name: Some(table_name), columns: Some(columns), - bits_in_last_null_byte: bits_in_last_byte(lines.len()), }, )) } @@ -367,37 +372,34 @@ impl<'a> TableBatch<'a> { pub fn row_count(&self) -> usize { if let Some(cols) = self.fb.columns() { if let Some(c) = cols.iter().next() { - match c.null_mask() { - Some(m) => { - let rows = m.len() * 8; - let bits = self.fb.bits_in_last_null_byte() as usize; + let null_count = match c.null_mask() { + Some(m) => m.iter().map(|b| b.count_ones() as usize).sum(), + None => 0, + }; - return rows + bits - 8; + let value_count = match c.values_type() { + entry_fb::ColumnValues::BoolValues => { + c.values_as_bool_values().unwrap().values().unwrap().len() } - None => { - return match c.values_type() { - entry_fb::ColumnValues::BoolValues => { - c.values_as_bool_values().unwrap().values().unwrap().len() - } - entry_fb::ColumnValues::U64Values => { - c.values_as_u64values().unwrap().values().unwrap().len() - } - entry_fb::ColumnValues::F64Values => { - c.values_as_f64values().unwrap().values().unwrap().len() - } - 
entry_fb::ColumnValues::I64Values => { - c.values_as_i64values().unwrap().values().unwrap().len() - } - entry_fb::ColumnValues::StringValues => { - c.values_as_string_values().unwrap().values().unwrap().len() - } - entry_fb::ColumnValues::BytesValues => { - c.values_as_bytes_values().unwrap().values().unwrap().len() - } - _ => panic!("invalid column flatbuffers"), - } + entry_fb::ColumnValues::U64Values => { + c.values_as_u64values().unwrap().values().unwrap().len() } - } + entry_fb::ColumnValues::F64Values => { + c.values_as_f64values().unwrap().values().unwrap().len() + } + entry_fb::ColumnValues::I64Values => { + c.values_as_i64values().unwrap().values().unwrap().len() + } + entry_fb::ColumnValues::StringValues => { + c.values_as_string_values().unwrap().values().unwrap().len() + } + entry_fb::ColumnValues::BytesValues => { + c.values_as_bytes_values().unwrap().values().unwrap().len() + } + _ => panic!("invalid column flatbuffers"), + }; + + return value_count + null_count; } } @@ -703,11 +705,12 @@ impl std::fmt::Debug for NullMaskBuilder { fn is_null_value(row: usize, mask: &Option<&[u8]>) -> bool { match mask { Some(mask) => { - let position = bits_in_last_byte(row); - + let mut position = (row % BITS_IN_BYTE) as u8; let mut byte = row / BITS_IN_BYTE; - if position == BITS_IN_BYTE as u8 { + + if position == 0 { byte -= 1; + position = BITS_IN_BYTE as u8; } if byte >= mask.len() { @@ -720,15 +723,6 @@ fn is_null_value(row: usize, mask: &Option<&[u8]>) -> bool { } } -fn bits_in_last_byte(row_count: usize) -> u8 { - let position = (row_count % BITS_IN_BYTE) as u8; - if position == 0 { - BITS_IN_BYTE as u8 - } else { - position - } -} - #[derive(Debug)] struct ColumnBuilder<'a> { nulls: NullMaskBuilder, @@ -795,85 +789,127 @@ impl<'a> ColumnBuilder<'a> { } } - fn push_tag(&mut self, value: &'a str) -> Result<()> { + fn push_tag(&mut self, value: &'a str) -> ColumnResult<()> { match &mut self.values { ColumnRaw::Tag(values) => { self.nulls.push(false); 
values.push(value) } - _ => return ColumnTypeMismatch.fail(), + _ => { + return ColumnTypeMismatch { + new_type: "tag", + expected_type: self.type_description(), + } + .fail() + } } Ok(()) } - fn push_string(&mut self, value: &'a str) -> Result<()> { + fn push_string(&mut self, value: &'a str) -> ColumnResult<()> { match &mut self.values { ColumnRaw::String(values) => { self.nulls.push(false); values.push(value) } - _ => return ColumnTypeMismatch.fail(), + _ => { + return ColumnTypeMismatch { + new_type: "string", + expected_type: self.type_description(), + } + .fail() + } } Ok(()) } - fn push_time(&mut self, value: i64) -> Result<()> { + fn push_time(&mut self, value: i64) -> ColumnResult<()> { match &mut self.values { ColumnRaw::Time(times) => { times.push(value); self.nulls.push(false); } - _ => return ColumnTypeMismatch.fail(), + _ => { + return ColumnTypeMismatch { + new_type: "time", + expected_type: self.type_description(), + } + .fail() + } } Ok(()) } - fn push_bool(&mut self, value: bool) -> Result<()> { + fn push_bool(&mut self, value: bool) -> ColumnResult<()> { match &mut self.values { ColumnRaw::Bool(values) => { values.push(value); self.nulls.push(false); } - _ => return ColumnTypeMismatch.fail(), + _ => { + return ColumnTypeMismatch { + new_type: "bool", + expected_type: self.type_description(), + } + .fail() + } } Ok(()) } - fn push_u64(&mut self, value: u64) -> Result<()> { + fn push_u64(&mut self, value: u64) -> ColumnResult<()> { match &mut self.values { ColumnRaw::U64(values) => { values.push(value); self.nulls.push(false); } - _ => return ColumnTypeMismatch.fail(), + _ => { + return ColumnTypeMismatch { + new_type: "u64", + expected_type: self.type_description(), + } + .fail() + } } Ok(()) } - fn push_f64(&mut self, value: f64) -> Result<()> { + fn push_f64(&mut self, value: f64) -> ColumnResult<()> { match &mut self.values { ColumnRaw::F64(values) => { values.push(value); self.nulls.push(false); } - _ => return ColumnTypeMismatch.fail(), + _ 
=> { + return ColumnTypeMismatch { + new_type: "f64", + expected_type: self.type_description(), + } + .fail() + } } Ok(()) } - fn push_i64(&mut self, value: i64) -> Result<()> { + fn push_i64(&mut self, value: i64) -> ColumnResult<()> { match &mut self.values { ColumnRaw::I64(values) => { values.push(value); self.nulls.push(false); } - _ => return ColumnTypeMismatch.fail(), + _ => { + return ColumnTypeMismatch { + new_type: "i64", + expected_type: self.type_description(), + } + .fail() + } } Ok(()) @@ -1018,6 +1054,18 @@ impl<'a> ColumnBuilder<'a> { }, ) } + + fn type_description(&self) -> &str { + match self.values { + ColumnRaw::String(_) => "string", + ColumnRaw::I64(_) => "i64", + ColumnRaw::F64(_) => "f64", + ColumnRaw::U64(_) => "u64", + ColumnRaw::Time(_) => "time", + ColumnRaw::Tag(_) => "tag", + ColumnRaw::Bool(_) => "bool", + } + } } #[derive(Debug)] @@ -1495,7 +1543,7 @@ mod tests { assert!(sharded_entries.is_err()); } - fn sharder(count: u8) -> TestSharder { + fn sharder(count: u16) -> TestSharder { TestSharder { count, n: std::cell::RefCell::new(0), @@ -1504,12 +1552,12 @@ mod tests { // For each line passed to shard returns a shard id from [0, count) in order struct TestSharder { - count: u8, - n: std::cell::RefCell, + count: u16, + n: std::cell::RefCell, } impl Sharder for TestSharder { - fn shard(&self, _line: &ParsedLine<'_>) -> Result { + fn shard(&self, _line: &ParsedLine<'_>) -> Result { let n = *self.n.borrow(); self.n.replace(n + 1); Ok(n % self.count) From 531ffe7ab0240536af8abedcd2041207c97ba6c2 Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Tue, 6 Apr 2021 18:53:22 -0400 Subject: [PATCH 3/3] chore: fix fmt and add comment to entry.fbs --- generated_types/protos/influxdata/iox/write/v1/entry.fbs | 4 +++- internal_types/src/lib.rs | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/generated_types/protos/influxdata/iox/write/v1/entry.fbs b/generated_types/protos/influxdata/iox/write/v1/entry.fbs index 770bfb09c8..ca5bedacbf 
100644 --- a/generated_types/protos/influxdata/iox/write/v1/entry.fbs +++ b/generated_types/protos/influxdata/iox/write/v1/entry.fbs @@ -58,7 +58,9 @@ table Delete { // A collection of rows in a table in column oriented representation table TableWriteBatch { name: string; - // every column must have the same number of bytes in its null_mask + // every column must have the same number of bytes in its null_mask. They also must + // have the same number of rows n such that for each column c: + // c.values().len() + count_ones(null_mask) = n columns: [Column]; } diff --git a/internal_types/src/lib.rs b/internal_types/src/lib.rs index 999d428765..e0abff17f5 100644 --- a/internal_types/src/lib.rs +++ b/internal_types/src/lib.rs @@ -7,7 +7,7 @@ )] pub mod data; -pub mod once; pub mod entry; +pub mod once; pub mod schema; pub mod selection;